В программе 10.5 (ThreeStage.c) предусмотрено создание нескольких этапов производства и потребления, на каждой из которых поддерживается очередь рабочих заданий, подлежащих обработке. Каждая очередь имеет ограниченную, конечную длину. Всего существует три конвейерных ступени, соединяющих четыре этапа обработки. Программа имеет следующую структуру:
• Производители (producers) периодически создают единичные сообщения, дополненные контрольными суммами, используя для этого ту же функцию, что и в программе 8.2, если не считать того, что в каждом сообщении содержится дополнительное поле адресата, указывающее поток потребителя (consumer), для которой предназначено это сообщение, причем каждый производитель связывается только с одним потребителем. Количество пар "производитель/потребитель" задается в виде параметра командной строки. Далее производитель посылает одиночное сообщение передающему потоку (transmitter), помещая его в очередь передачи сообщений. Если очередь заполнена, производитель ждет, пока ее состояние не изменится.
• Передающий поток объединяет имеющиеся единичные сообщения (но не более пяти за один раз) и создает одно передаваемое сообщение, которое содержит заголовок и ряд единичных сообщений. Затем передающий поток помещает каждое передаваемое сообщение в очередь приема сообщений (receiver), блокируясь, если очередь заполнена. В общем случае передатчик и приемник могут связываться между собой через сетевое соединение. Произвольно выбранное здесь значение коэффициента блокирования (blocking factor), равное 5:1, легко поддается регулировке.
• Принимающий поток обрабатывает единичные сообщения, входящие в состав каждого передаваемого сообщения, и помещает каждое из них в соответствующую очередь потребителя, если она не заполнена.
• Каждый поток потребителя получает одиночные сообщения по мере их поступления и записывает сообщение в файл журнала регистрации.
Блок-схема системы представлена на рис. 10.1. Обратите внимание, что эта система моделирует сетевое соединение, в котором сообщения, относящиеся к различным парам "отправитель/получатель" объединяются и передаются по общему каналу связи.
Рис. 10.1.Многоступенчатый конвейер
В программе 10.5 предложен вариант реализации, в котором используются функции очереди из программы 10.4. Функции генерации и отображения сообщений здесь не представлены, но они взяты из программы 8.1. При этом, наряду с контрольными суммами и данными, в блоки сообщений введены поля производителя и адресата.
Программа 10.5. ThreeStage.с: многоступенчатыйконвейер
/* Глава 10. ThreeStage.с */
/* Трехступенчатая система производитель/потребитель. */
/* Использование: ThreeStage npc goal. */
/* Запустить "npc" пар потоков производителя и потребителя. */
/* Каждый производитель должен сгенерировать в общей сложности */
/* "goal" сообщений, каждое из которых снабжается меткой, указывающей */
/* потребителя, для которого оно предназначено. */
/* Сообщения отправляются "передающему потоку", который, прежде чем */
/* отправить группу сообщений "принимающему потоку", выполняет некоторую*/
/* дополнительную обработку. Наконец, принимающий поток отправляет сообщения потокам потребителя. */
#include "EvryThng.h"
#include "SynchObj.h"
#include "messages.h"
#include
#define DELAY_COUNT 1000
#define MAX_THREADS 1024
/* Размеры и коэффициенты блокирования очередей. Эти величины являются */
/* произвольными и могут регулироваться для обеспечения оптимальной */
/* производительности. Текущие значения не являются сбалансированными. */
#define TBLOCK_SIZE 5 /*Передающий поток формирует группы из 5 сообщений.*/
#define TBLOCK_TIMEOUT 50 /*Интервал ожидания сообщений передающим потоком.*/
#define P2T_QLEN 10 /* Размер очереди "производитель/передающий поток". */
#define T2R_QLEN 4 /*Размер очереди "передающий поток/принимающий поток".*/
#define R2C_QLEN 4 /* Размер очереди "принимающий поток/потребитель" -- */
/* для каждого потребителя существует только одна очередь.*/
DWORD WINAPI producer(PVOID);
DWORD WINAPI consumer(PVOID);
DWORD WINAPI transmitter(PVOID);
DWORD WINAPI receiver(PVOID);
typedef struct _THARG {
volatile DWORD thread_number;
volatile DWORD work_goal; /* Используется потоками производителей. */
volatile DWORD work_done; /* Используется потоками производителей и потребителей. */ '
Читать дальше