void parallel_partial_sum(Iterator first, Iterator last) {
typedef typename Iterator::value_type value_type;
struct process_element { ←
(1)
void operator()(Iterator first, Iterator last,
std::vector& buffer,
unsigned i, barrier& b) {
value_type& ith_element = *(first + i);
bool update_source = false;
for (unsigned step = 0, stride = 1;
stride <= i; ++step, stride *= 2) {
value_type const& source = (step % 2) ? ←
(2)
buffer[i] : ith_element;
value_type& dest = (step % 2) ?
ith_element:buffer[i];
value_type const& addend = (step % 2) ? ←
(3)
buffer[i - stride] : *(first + i – stride);
dest = source + addend; ←
(4)
update_source = !(step % 2);
b.wait(); ←
(5)
}
if (update_source) { ←
(6)
ith_element = buffer[i];
}
b.done_waiting(); ←
(7)
}
};
unsigned long const length = std::distance(first, last);
if (length <= 1)
return;
std::vector buffer(length);
barrier b(length);
std::vector threads(length - 1); ←
(8)
join_threads joiner(threads);
Iterator block_start = first;
for (unsigned long i = 0; i < (length - 1); ++i) {
threads[i] = std::thread(process_element(), first, last,←
(9)
std::ref(buffer), i, std::ref(b));
}
process_element()(first, last, buffer, length - 1, b); ←
(10)
}
Общая структура кода вам, наверное, уже понятна. Имеется класс с оператором вызова ( process_element
), который выполняет содержательную работу (1)и вызывается из нескольких потоков (9), хранящихся в векторе (8), а также из главного потока (10). Важное отличие заключается в том, что теперь число потоков зависит от числа элементов в списке, а не от результата, возвращаемого функцией std::thread::hardware_concurrency
. Я уже говорил, что эта идея не слишком удачна, если только вы не работаете на машине с массивно параллельной архитектурой, где потоки обходятся дешево. Но это неотъемлемая часть самой идеи решения. Можно было бы обойтись и меньшим числом потоков, поручив каждому обработку нескольких значений из исходного диапазона, но тогда при относительно небольшом количестве потоков этот алгоритм оказался бы менее эффективен, чем алгоритм с прямым распространением.
Так или иначе, основная работа выполняется в операторе вызове из класса process_element
. На каждом шаге берется либо i -ый элемент из исходного диапазона, либо i -ый элемент из буфера (2)и складывается с предшествующим элементом, отстоящим от него на расстояние stride
(3); результат сохраняется в буфере, если мы читали из исходного диапазона, и в исходном диапазоне — если мы читали из буфера (4). Перед тем как переходить к следующему шагу, мы ждем у барьера (5). Работа заканчивается, когда элемент, отстоящий на расстояние stride
, оказывается слева от начала диапазона. В этом случае мы должно обновить элемент в исходном диапазоне, если сохраняли окончательный результат в буфере (6). Наконец, мы вызываем функцию done_waiting()
(7), сообщая барьеру, что больше ждать не будем.
Отметим, что это решение не безопасно относительно исключений. Если один из рабочих потоков возбудит исключение в process_element
, то приложение аварийно завершится. Решить эту проблему можно было бы, воспользовавшись std::promise
для запоминания исключения, как в реализации parallel_find
из листинга 8.9, или просто с помощью объекта std::exception_ptr
, защищенного мьютексом.
Вот и подошли к концу обещанные три примера. Надеюсь, они помогли уложить в сознании соображения, высказанные в разделе 8.1, 8.2, 8.3 и 8.4, и показали, как описанные приемы воплощаются в реальном коде.
Эта глава оказалась очень насыщенной. Мы начали с описания различных способов распределения работы между потоками, в частности предварительного распределения данных и использования нескольких потоков для формирования конвейера. Затем мы остановились на низкоуровневых аспектах производительности многопоточного кода, обратив внимание на феномен ложного разделения и проблему конкуренции за данные. Далее мы перешли к вопросу о том, как порядок доступа к данным влияет на производительность программы. После этого мы поговорили о дополнительных соображениях при проектировании параллельных программ, в частности о безопасности относительно исключений и о масштабируемости. И закончили главу рядом примеров реализации параллельных алгоритмов, на которых показали, какие проблемы могут возникать при проектировании многопоточного кода.
Читать дальше