Поскольку используются будущие результаты, массива results
больше нет, поэтому мы должны сохранить результат обработки последнего блока в переменной (7), а не в элементе массива. Кроме того, поскольку мы получаем значения из будущих результатов, проще не вызывать std::accumulate
, а написать простой цикл for
, в котором к переданному начальному значению (8)будут прибавляться значения, полученные из каждого будущего результата (9). Если какая-то задача возбудит исключение, то оно будет запомнено в будущем результате и повторно возбуждено при обращении к get()
. Наконец, перед тем как возвращать окончательный результат вызывающей программе, мы прибавляем результат обработки последнего блока (10).
Таким образом, мы устранили одну из потенциальных проблем: исключения, возбужденные в рабочих потоках, повторно возбуждаются в главном. Если исключения возникнут в нескольких рабочих потоках, то вверх распространится только одно, но это не очень страшно. Если вы считаете, что это все-таки важно, то можете воспользоваться чем-то вроде класса std::nested_exception
, чтобы собрать все такие исключения и передать их главному потоку.
Осталось решить проблему утечки потоков в случае, когда исключение возникает между моментом запуска первого потока и присоединением всех запущенных. Для этого проще всего перехватить любое исключение, дождаться присоединения потоков, которые все еще находятся в состоянии joinable()
, а потом возбудить исключение повторно:
try {
for (unsigned long i = 0; i < (num_threads - 1); ++i) {
// ... как и раньше
}
T last_result = accumulate_block()(block_start, last);
std::for_each(threads.begin(), threads.end(),
std::mem_fn(&std::thread::join));
} catch (...) {
for (unsigned long i = 0; i < (num_thread - 1); ++i) {
if (threads[i].joinable())
thread[i].join();
}
throw;
}
Теперь все работает. Все потоки будут присоединены вне зависимости от того, как завершилась обработка блока. Однако блоки try-catch
выглядят некрасиво, и часть кода дублируется. Мы присоединяем потоки как в «нормальной» ветке, так и в блоке catch
. Дублирование кода — вещь почти всегда нежелательная, потому что изменения придётся вносить в несколько мест. Давайте лучше перенесём этот код в деструктор — ведь именно такова идиома очистки ресурсов в С++. Вот как выглядит этот класс:
class join_threads {
std::vector& threads;
public:
explicit join_threads(std::vector& threads_):
threads(threads_) {}
~join_threads() {
for (unsigned long i = 0; i < threads.size(); ++i) {
if (threads[i].joinable())
threads[i].join();
}
}
};
Это похоже на класс thread_guard
из листинга 2.3, только на этот раз мы «охраняем» целый вектор потоков. Теперь наш код упрощается.
Листинг 8.4.Безопасная относительно исключений версия std::accumulate
template
T parallel_accumulate(Iterator first, Iterator last, T init) {
unsigned long const length = std::distance(first, last);
if (!length)
return init;
unsigned long const min_per_thread = 25;
unsigned long const max_threads =
(length + min_per_thread - 1) / min_per_thread;
unsigned long const hardware_threads =
std::thread::hardware_concurrency();
unsigned long const num_threads =
std::min(
hardware_threads i = 0 ? hardware_threads : 2, max_threads);
unsigned long const block_size = length / num_threads;
std::vector > futures(num_threads — 1);
std::vector threads(num_threads - 1);
join_threads joiner(threads); ←
(1)
Iterator block_start = first;
for (unsigned long i = 0; i < (num_threads - 1); ++i) {
Iterator block_end = block_start;
std::advance(block_end, block_size);
std::packaged_task task(
accumulate_block());
futures[i] = task.get_future();
threads[i] =
std::thread(std:move(task), block_start, block_end);
block_start = block_end;
}
T last_result = accumulate_block()(block_start, last);
T result = init;
for (unsigned long i = 0; i < (num_threads - 1); ++i) {
result + = futures[i].get(); ←
(2)
}
result += last_result;
return result;
}
Создав контейнер потоков, мы создаем объект написанного выше класса (1), который присоединяет все завершившиеся потоки. Теперь явный цикл присоединения можно удалить, точно зная, что при выходе из функции потоки будут присоединены. Отметим, что вызовы futures[i].get()
(2)приостанавливают выполнение программы до готовности результатов, поэтому в этой точке явно присоединять потоки не нужно. Этим данный код отличается от первоначальной версии в листинге 8.2, где присоединять потоки было необходимо для нрав ильного заполнения вектора results
. Мало того что мы получили безопасный относительно исключений код, так еще и функция стала короче, потому что код присоединения вынесен в новый класс (который, кстати, можно использовать и в других программах).
Читать дальше