Но оставим в стороне эффективность и перейдем к коду. В листинге 8.11 приведена реализация первого подхода.
Листинг 8.11.Параллельное вычисление частичных сумм путём разбиения задачи на части
template
void parallel_partial_sum(Iterator first, Iterator last) {
typedef typename Iterator::value_type value_type;
struct process_chunk { ←
(1)
void operator()(Iterator begin, Iterator last,
std::future* previous_end_value,
std::promise* end_value) {
try {
Iterator end = last;
++end;
std::partial_sum(begin, end, begin); ←
(2)
if (previous_end_value) { ←
(3)
value_type& addend = previous_end_value->get();←
(4)
*last += addend; ←
(5)
if (end_value) {
end_value->set_value(*last); ←
(6)
}
std::for_each(begin, last, [addend](value_type& item) {←
(7)
item += addend;
});
} else if (end_value) {
end_value->set_value(*last); ←
(8)
}
} catch (...) { ←
(9)
if (end_value) {
end_value->set_exception(std::current_exception()); ←
(10)
} else {
throw; ←
(11)
}
}
}
};
unsigned long const length = std::distance(first, last);
if (!length)
return last;
unsigned long const min_per_thread = 25; ←
(12)
unsigned long const max_threads =
(length + min_per_thread - 1) / min_per_thread;
unsigned long const hardware_threads =
std::thread::hardware_concurrency();
unsigned long const num_threads =
std::min(
hardware_threads!= 0 ? hardware_threads : 2, max_threads);
unsigned long const block_size = length / num_threads;
typedef typename Iterator::value_type value_type;
std::vector threads(num_threads - 1);←
(13)
std::vector >
end_values(num_threads - 1); ←
(14)
std::vector >
previous_end_values; ←
(15)
previous_end_values.reserve(num_threads - 1); ←
(16)
join_threads joiner(threads);
Iterator block_start = first;
for (unsigned long i = 0; i < (num_threads - 1); ++i) {
Iterator block_last = block_start;
std::advance(block_last, block_size – 1); ←
(17)
threads[i] = std::thread(process_chunk(), ←
(18)
block_start, block_last,
(i !=0 ) ? &previous_end_values[i - 1] : 0, &end_values[i]);
block_start = block_last;
++block_start; ←
(19)
previous_end_values.push_back(end_values[i].get_future());←
(20)
}
Iterator final_element = block_start;
std::advance(
final_element, std::distance(block_start, last) - 1);←
(21)
process_chunk()(block_start, final_element, ←
(22)
(num_threads > 1) ? &previous_end_values.back() : 0, 0);
}
Общая структура этого кода не отличается от рассмотренных ранее алгоритмов: разбиение задачи на блоки с указанием минимального размера блока, обрабатываемого одним потоком (12). В данном случае, помимо вектора потоков (13), мы завели вектор обещаний (14), в котором будут храниться значения последних элементов в каждом блоке, и вектор будущих результатов (15), используемый для получения последнего значения из предыдущего блока. Так как мы знаем, сколько всего понадобится будущих результатов, то можем заранее зарезервировать для них место в векторе (16), чтобы избежать перераспределения памяти при запуске потоков.
Главный цикл выглядит так же, как раньше, только на этот раз нам нужен итератор, который указывает на последний элемент в блоке (17), а не на элемент за последним, чтобы можно было распространить последние элементы поддиапазонов. Собственно обработка производится в объекте-функции process_chunk
, который мы рассмотрим ниже; в качестве аргументов передаются итераторы, указывающие на начало и конец блока, а также будущий результат, в котором будет храниться последнее значение из предыдущего диапазона (если таковой существует), и объект-обещание для хранения последнего значения в текущем диапазоне (18).
Запустив поток, мы можем обновить итератор, указывающий на начало блока, не забыв продвинуть его на одну позицию вперед (за последний элемент предыдущего блока) (19), и поместить будущий результат, в котором будет храниться последнее значение в текущем блоке, в вектор будущих результатов, откуда он будет извлечён на следующей итерации цикла (20).
Перед тем как обрабатывать последний блок, мы должны получить итератор, указывающий на последний элемент (21), который передается в process_chunk
(22). Алгоритм std::partial_sum
не возвращает значения, поэтому по завершении обработки последнего блока больше ничего делать не надо. Можно считать, что операция закончилась, когда все потоки завершили выполнение.
Теперь настало время поближе познакомиться с объектом-функцией process_chunk
, который собственно и делает всю содержательную работу (1). Сначала вызывается std::partial_sum
для всего блока, включая последний элемент (2), но затем нам нужно узнать, первый это блок или нет (3). Если это не первый блок, то должно быть вычислено значение previous_end_value
для предыдущего блока, поэтому нам придется его подождать (4). Чтобы добиться максимального распараллеливания, мы затем сразу же обновляем последний элемент (5), так чтобы это значение можно было передать дальше, в следующий блок (если таковой имеется) (6). Сделав это, мы можем с помощью std::for_each
и простой лямбда-функции (7)обновить остальные элементы диапазона.
Читать дальше