char future[8];
} THARG;
/* Сгруппированные сообщения, посылаемые передающим потоком потребителю.*/
typedef struct t2r_msg_tag {
volatile DWORD num_msgs; /* Количество содержащихся сообщений. */
msg_block_t messages[TBLOCK_SIZE];
} t2r_msg_t;
queue_t p2tq, t2rq, *r2cq_array;
static volatile DWORD ShutDown = 0;
static DWORD EventTimeout = 50;
DWORD _tmain(DWORD argc, LPTSTR * argv[]) {
DWORD tstatus, nthread, ithread, goal, thid;
HANDLE *producer_th, *consumer_th, transmitter_th, receiver_th;
THARG *producer_arg, *consumer_arg;
nthread = atoi(argv[1]);
goal = atoi(argv[2]);
producer_th = malloc(nthread * sizeof(HANDLE));
producer_arg = calloc(nthread, sizeof(THARG));
consumer_th = malloc(nthread * sizeof(HANDLE));
consumer_arg = calloc(nthread, sizeof(THARG));
q_initialize(&p2tq, sizeof(msg_block_t), P2T_QLEN);
q_initialize(&t2rq, sizeof(t2r_msg_t), T2R_QLEN);
/* Распределить ресурсы, инициализировать очереди "принимающий поток/потребитель" для каждого потребителя. */
r2cq_array = calloc(nthread, sizeof(queue_t));
for (ithread = 0; ithread < nthread; ithread++) {
/* Инициализировать очередь r2с для потока данного потребителя. */
q_initialize(&r2cq_array[ithread], sizeof(msg_block_t), R2C_QLEN);
/* Заполнить аргументы потока. */
consumer_arg[ithread].thread_number = ithread;
consumer_arg[ithread].work_goal = goal;
consumer_arg[ithread].work_done = 0;
consumer_th[ithread] = (HANDLE)_beginthreadex(NULL, 0, consumer, (PVOID)&consumer_arg[ithread], 0, &thid);
producer_arg[ithread].thread_number = ithread;
producer_arg[ithread].work_goal = goal;
producer_arg[ithread].work_done = 0;
producer_th[ithread] = (HANDLE)_beginthreadex(NULL, 0, producer, (PVOID)&producer_arg[ithread], 0, &thid);
}
transraitter_th = (HANDLE)_beginthreadex(NULL, 0, transmitter, NULL, 0, &thid);
receiver_th = (HANDLE)_beginthreadex (NULL, 0, receiver, NULL, 0, &thid);
_tprintf(_T("ХОЗЯИН: Выполняются все потоки\n"));
/* Ждать завершения потоков производителя. */
for (ithread = 0; ithread < nthread; ithread++) {
WaitForSingleObject(producer_th[ithread], INFINITE);
_tprintf(_T("ХОЗЯИН: производитель %d выработал %d единичных сообщений\n"), ithread, producer_arg[ithread].work_done);
}
/* Производители завершили работу. */
_tprintf(_T("ХОЗЯИН: Все потоки производителя выполнили свою работу.\n"));
/* Ждать завершения потоков потребителя. */
for (ithread = 0; ithread < nthread; ithread++) {
WaitForSingleObject(consumer_th[ithread], INFINITE);
_tprintf(_T("ХОЗЯИН: потребитель %d принял %d одиночных сообщений\n"), ithread, consumer_arg[ithread].work_done);
}
_tprintf(_T("ХОЗЯИН: Все потоки потребителя выполнили свою работу.\n"));
ShutDown = 1; /* Установить флаг завершения работы. */
/* Завершить выполнение и перейти в состояние ожидания передающих и принимающих потоков. */
/* Эта процедура завершения работает нормально, поскольку и передающий,*/
/* и принимающий потоки не владеют иными ресурсами, кроме мьютекса, */
/* которые они могли бы покинуть по завершении выполнения, не уступив прав владения ими. Можете ли вы улучшить эту процедуру? */
TerminateThread(transmitter_th, 0);
TerminateThread(receiver_th, 0);
WaitForSingleObject(transmitter_th, INFINITE);
WaitForSingleObject(receiver_th, INFINITE);
q_destroy(&p2tq);
q_destroy(&t2rq);
for (ithread = 0; ithread < nthread; ithread++) q_destroy(&r2cq_array [ithread]);
free(r2cq_array);
free(producer_th);
free(consumer_th);
free(producer_arg);
free(consumer_arg);
_tprintf(_T("Система завершила работу. Останов системы\n"));
return 0;
}
DWORD WINAPI producer(PVOID arg) {
THARG * parg;
DWORD ithread, tstatus;
msg_block_t msg;
parg = (THARG *)arg;
ithread = parg->thread_number;
while (parg->work_done < parg->work_goal) {
/* Вырабатывать единичные сообщения, пока их общее количество */
/* не станет равным "goal". */
/* Сообщения снабжаются адресами отправителя и адресата, которые в */
/* нашем примере одинаковы для всех сообщений, но в общем случае */
/* могут быть различными. */
delay_cpu(DELAY_COUNT * rand() / RAND_MAX);
message_fill(&msg, ithread, ithread, parg->work_done);
/* Поместить сообщение в очередь. */
tstatus = q_put(&p2tq, &msg, sizeof(msg), INFINITE);
parg->work_done++;
}
return 0;
}
DWORD WINAPI transmitter(PVOID arg) {
/* Получись несколько сообщений от производителя, объединяя их в одно*/
/* составное сообщение, предназначенное для принимающего потока. */
DWORD tstatus, im;
t2r_msg_t t2r_msg = {0};
msg_block_t p2t_msg;
while (!ShutDown) {
t2r_msg.num_msgs = 0;
/* Упаковать сообщения для передачи принимающему потоку. */
for (im = 0; im < TBLOCK_SIZE; im++) {
tstatus = q_get(&p2tq, &p2t_msg, sizeof(p2t_msg), INFINITE);
if (tstatus != 0) break;
memcpy(&t2r_msg.messages[im], &p2t_msg, sizeof(p2t_msg));
Читать дальше