Эта реализация действительно очень простая — в ней для ожидания используется спинлок, поэтому она не идеальна для случая, когда потоки могут ждать долго, и совсем не годится в ситуации, когда в каждый момент времени может существовать более count потоков, выполнивших wait()
. Если требуется, чтобы и в этих случаях барьер работал правильно, то следует использовать более надежную (но и более сложную) реализацию. Кроме того, я ограничился последовательно согласованными операциями над атомарными переменными, потому что они проще для понимания, но вы, возможно, захотите ослабить ограничения на порядок доступа к памяти. Такая глобальная синхронизация дорого обходится в массивно параллельных архитектурах, так как строку кэша, содержащую состояние барьера, приходится передавать всем участвующим процессорам (см. обсуждение перебрасывания кэша в разделе 8.2.2). Поэтому нужно тщательно продумать, является ли это решение оптимальным.
Как бы то ни было, барьер — это именно то, что нам необходимо в данном случае; имеется фиксированное число потоков, которые должны синхронизироваться на каждой итерации цикла. Ну почти фиксированное. Как вы, наверное, помните, элементы, расположенные близко к началу списка, получают окончательные значения всего через пару шагов. Это означает, что либо все потоки должны крутиться в цикле, пока не будет обработан весь диапазон, либо барьер должен поддерживать выпадение потоков, уменьшая значение count
. Я предпочитаю второй вариант, чтобы не заставлять потоки выполнять бессмысленную работу в ожидании завершения последнего шага.
Но тогда нужно сделать count
атомарной переменной, чтобы ее можно было изменять из нескольких потоков без внешней синхронизации:
std::atomic count;
Инициализация не меняется, но при переустановке spaces
теперь нужно явно загружать spaces
с помощью операции load()
:
spaces = count.load();
Больше никаких изменений в wait()
вносить не надо, но необходима новая функция-член для уменьшения count. Назовем ее done_waiting()
, потому что с ее помощью поток заявляет, что больше ждать не будет.
void done_waiting() {
--count; ←
(1)
if (!--spaces) { ←
(2)
spaces = count.load();←
(3)
++generation;
}
}
Прежде всего мы уменьшаем count
(1)
, чтобы при следующей переустановке spaces
было отражено новое, меньшее прежнего, количество ожидающих потоков. Затем уменьшаем количество свободных мест spaces
(2). Если этого не сделать, то остальные потоки будут ждать вечно, так как при инициализации spaces
было задано старое, большее, значение. Если текущий поток является последним в партии, то количество мест нужно переустановить и увеличить generation
на единицу (3)— так же, как в wait()
. Основное отличие от wait()
заключается в том, что поток не должен ждать — он же сам объявляет, что больше ждать не будет!
Теперь мы готовы написать вторую реализацию алгоритма вычисления частичных сумм. На каждом шаге каждый поток вызывает функцию wait()
барьера, чтобы все потоки пересекли его вместе, а, закончив работу, поток вызывает функцию done_waiting()
, чтобы уменьшить счетчик. Если наряду с исходным диапазоном использовать второй буфер, то барьер обеспечивает необходимую синхронизацию. На каждом шаге потоки либо читают исходный диапазон и записывают новое значение в буфер, либо наоборот — читают буфер и записывают новое значение в исходный диапазон. На следующем шаге исходный диапазон и буфер меняются местами. Тем самым мы гарантируем отсутствие гонки между операциями чтения и записи в разных потоках. Перед выходом из цикла каждый поток должен позаботиться о записи окончательного значения в исходный диапазон. В листинге 8.13 все это собрано воедино.
Листинг 8.13.Параллельная реализация partial_sum
методом попарных обновлений
struct barrier {
std::atomic count;
std::atomic spaces;
std::atomic generation;
barrier(unsigned count_) :
count(count_), spaces(count_), generation(0) {}
void wait() {
unsigned const gen = generation.load();
if (!--spaces) {
spaces = count.load();
++generation;
} else {
while (generation.load() == gen) {
std::this_thread::yield();
}
}
}
void done_waiting() {
--count;
if (!--spaces) {
spaces = count.load();
++generation;
}
}
};
template
Читать дальше