Кроме основной нашей цели это приложение дополнительно демонстрирует:
• Практическое использование принудительного завершения (отмены) потоков «извне» с управлением состоянием завершаемости потоков и расстановкой точек отмены, о чем мы уже говорили ранее.
• Использование атомарных (непрерываемых) операций (например, atomic_add_value()
), о которых мы будем говорить чуть позже.
• Использование реентерабельных форм функций стандартной библиотеки, безопасных в многопоточной среде ( rand_r()
вместо rand()
).
Один производитель — T потребителей
#include
#include
#include
#include
#include
#include
#include
#include
const int D = 10;
unsigned int T = 2;
static sem_t sem;
pthread_t* tid;
void* writer(void* data) {
unsigned long i = (int)(data); // общий размер выборки
unsigned int s = 1;
while (i-- > 0) {
delay((long)rand_r(&s) * D / RAND_MAX + 1);
sem_post(&sem); // объект данных произведен
}
for (i = 0; i < T; i++) pthread_cancel(tid[i + 1]);
return NULL;
}
static char *str; // строка результирующей диагностики
static volatile unsigned ind = 0;
void* reader(void*) {
char tid[8];
sprintf(tid, "%X", pthread_self());
unsigned int s = rand();
pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
while(true) {
sem_wait(&sem); // получен объект данных
str[atomic_add_value(&ind, 1)] = *tid;
pthread_testcancel();
delay((long)rand_r(&s) * D * T / RAND_MAX + 1);
}
return NULL;
}
int main(int argc, char *argv[]) {
unsigned long N = 1000;
int opt, val;
while ((opt = getopt(argc, argv, "n:t:")) != -1) {
switch(opt) {
case 'n':
if (sscanf(optarg, "%i", &val) != 1)
cout << "parse command line error" << endl, exit(EXIT_FAILURE);
if (val > 0) N = val;
break;
case 't':
if (sscanf(optarg, "%i", &val) != 1)
cout << "parse command line error" << endl, exit(EXIT_FAILURE);
if (val > 0) T = val;
break;
default:
exit(EXIT_FAILURE);
}
}
str = new char[N + 1];
tid = new pthread_t[T + 1];
if (sem_init(&sem, 0, 0))
perror("semaphore init"), exit(EXIT_FAILURE);
if (pthread_create(tid, NULL, writer, (void*)N) >= EOK)
perror("writer create error"), exit(EXIT_FAILURE);
for (int i = 0; i < T; i++)
if (pthread_create(tid + i + 1, NULL, reader, NULL) != EOK)
perror("reader create error"), exit(EXIT_FAILURE);
for (int i = 0; i < T; i++)
pthread_join(tid[i], NULL);
sem_destroy(&sem);
delete [] tid;
str[ind] = '\0';
cout << str << endl;
delete [] str;
exit(EXIT_SUCCESS);
}
Вот как выглядит результат выполнения этой программы (во избежание внесения дополнительного синхронизма в качестве общего числа циклов «производства» и числа потоков потребителей выбраны взаимно простые числа):
# sy22 -n200 -t13
3456789ABCDEF7936A8547E39DCB45F67A59B84D37EC64F395B6AEF78B9DF34CB53B86A5FEDF975B3A8EC46FB8AD954736FA78C3ED46F7B594EC7B83AC6F9D4BCE569A73F86BCAD74C536EB79F5C8DA5B463EFBC7D937AEC85FDE4566CAF69DE7F385CA6
Хорошо видно, как строго последовательный поначалу порядок доступа потребителей к объектам данных десинхронизируется и становится хаотическим: каждый освободившийся потребитель приступает к работе над следующим объектом данных, как только тот становится доступен.
Атомарные операции не относятся к элементам синхронизации параллельных ветвей программы. Но им следует уделить внимание по двум причинам. Во-первых, атомарные операции — это простое и эффективное средство, позволяющее во многих случаях избежать использования механизмов синхронизации. А во-вторых, атомарные операции зачастую выпадают из рассмотрения из-за их двойственного положения: при обсуждении параллелизма и синхронизации они не рассматриваются, потому что не являются элементами синхронизации, а при обсуждении последовательных программ не рассматриваются потому, что здесь в них просто нет необходимости.
Атомарные операции — это операции, для которых гарантируется их непрерываемость даже при выполнении на симметричных мультипроцессорных платформах. Выполнение атомарных операций не прерывается даже асинхронными аппаратными прерываниями. Таким образом, эта группа операций является также и безопасной в многопоточном окружении.
Действительно, наиболее часто примитивы синхронизации применяются для создания критической секции кода с целью предотвращения возможности одновременного воздействия на объекты данных со стороны нескольких параллельно развивающихся ветвей программы.
При одновременной работе с данными из различных потоков состояние данных после такого воздействия должно считаться «неопределенным», при этом последствия могут быть более тяжкими, чем просто некорректное состояние данных - структура сложных объектов может быть просто разрушена.
Читать дальше
Конец ознакомительного отрывка
Купить книгу