Как уже упоминалось, обязательной для модуля является лишь функция xx put()
. Рассмотрим ситуацию, когда модули потока не содержат процедур xx service()
. В этом случае, проиллюстрированном на рис. 5.19, каждый предыдущий модуль вызывает функцию xx put()
следующего, передавая ему сообщение, с помощью функции ядра putnext(9F) . Функция xx put()
немедленно вызывает putnext(9F) и т.д.:
xxput(queue_t *q, mblk_t *mp) {
putnext(q, mp);
}
Рис. 5.19. Передача данных без управления потоком
Когда данные достигают драйвера, он передает их непосредственно устройству. Если устройство занято, или драйвер не может немедленно обработать данные, сообщение уничтожается. В данном примере никакого управления потоком не происходит, и очереди сообщений не используются.
Хотя такой вариант может применяться для некоторых драйверов (как правило, для псевдоустройств, например, /dev/null), в общем случае устройство не может быть все время готово к обработке данных, а потеря данных из-за занятости устройства недопустима. Таким образом, в потоке может происходить блокирование передачи данных [60] Блокирование передачи может происходить не только в драйвере (оконечном модуле) потока из-за занятости устройства. Возможна ситуация, когда отдельный модуль вынужден отложить обработку сообщений до наступления некоторого события.
, и эта ситуация не должна приводить к потере сообщений, во избежание которой необходим согласованный между модулями механизм управления потоком. Для этого сообщения обрабатываются и буферизуются в соответствующей очереди модуля, а их передача возлагается на функцию xx service()
, вызываемую ядром автоматически. Для каждой очереди определены две ватерлинии — верхняя и нижняя, которые используются для контроля заполненности очереди. Если число сообщений превышает верхнюю ватерлинию, очередь считается переполненной, и передача сообщений блокируется, пока их число не станет меньше нижней ватерлинии.
Рассмотрим пример потока, модули 1 и 3 которого поддерживают управление потоком данных, а модуль 2 — нет. Другими словами, модуль 2 не имеет процедуры xx service()
. Когда сообщение достигает модуля 3, вызывается его функция xx put()
. После необходимой обработки сообщения, оно помещается в очередь модуля 3 с помощью функции putq(9F) . Если при этом число сообщений в очереди превышает верхнюю ватерлинию, putq(9F) устанавливает специальный флаг, сигнализирующий о том, что очередь переполнена:
mod1put(queue_t* q, mblk_t* mp) {
/* Необходимая обработка сообщения */
...
putq(q, mp);
}
Через некоторое время ядро автоматически запускает процедуру xx service()
модуля 3. Для каждого сообщения очереди xxput()
вызывает функцию canput(9F) , которая проверяет заполненность очереди следующего по потоку модуля. Функция canput(9F) имеет вид:
#include
int canput(queue_t* q);
Заметим, что canput(9F) проверяет заполненность очереди следующего модуля, реализующего механизм управления передачей данных, т.е. производящего обработку очереди с помощью процедуры xx service()
. В противном случае, как уже говорилось, очередь модуля не принимает участия в передаче данных. В нашем примере, canput(9F) проверит заполненность очереди записи модуля 1. Функция возвращает истинное значение, если очередь может принять сообщение, и ложное — в противном случае. В зависимости от результата проверки процедура xx service()
либо передаст сообщение следующему модулю (в нашем примере — модулю 2, который после необходимой обработки сразу же передаст его модулю 1), либо вернет сообщение обратно в очередь, если следующая очередь переполнена.
Описанная схема показана на рис. 5.20. Ниже приведен скелет процедуры xx service()
модуля 3, иллюстрирующий описанный алгоритм передачи сообщений с использованием механизма управления передачей данных.
Рис. 5.20. Управление потоком данных
mod1service(queue_t *q) {
mblk_t* mp;
while ((mp = getq(q)) != NULL) {
if (canput(q->q_next))
putnext(q, mp);
else {
putbq(q, mp);
break;
}
}
В этом примере функция getq(9F) используется для извлечения следующего сообщения из очереди, а функция putbq(9F) — для помещения сообщения в начало очереди. Если модуль 1 блокирует передачу, т.е. canput(9F) вернет "ложно", процедура xx service()
завершает свою работу, и сообщения начинают буферизоваться в очереди модуля 3. При этом очередь временно исключается из списка очередей, ожидающих обработки, и процедура xx service()
для нее вызываться не будет. Данная ситуация продлится до тех пор, пока число сообщений очереди записи модуля 1 не станет меньше нижней ватерлинии.
Читать дальше