В данном случае я остановился на std::promise
, потому что такое поведение больше походит на поведение std::find
. Надо только не забыть о случае, когда искомого элемента в указанном диапазоне нет. Поэтому необходимо дождаться завершения всех потоков перед тем, как получать значение из будущего результата. Если просто блокировать исполнение при обращении к get()
, то при условии, что искомого элемента нет, мы будем ждать вечно. Получившийся код приведён ниже.
Листинг 8.9.Параллельная реализация алгоритма find()
template
Iterator parallel_find(Iterator first, Iterator last,
MatchType match) {
struct find_element { ←
(1)
void operator()(Iterator begin, Iterator end,
MatchType match,
std::promise* result,
std::atomic* done_flag) {
try {
for(; (begin != end) && !done_flag->load(); ++begin) {←
(2)
if (*begin == match) {
result->set_value(begin);←
(3)
done_flag->store(true); ←
(4)
return;
}
}
} catch (...) { ←
(5)
try {
result->set_exception(std::current_exception());←
(6)
done_flag->store(true);
} catch (...) ←
(7)
{}
}
}
};
unsigned long const length = std::distance(first, last);
if (!length)
return last;
unsigned long const min_per_thread = 25;
unsigned long const max_threads =
(length + min_per_thread — 1) / min_per_thread;
unsigned long const hardware_threads =
std::thread::hardware_concurrency();
unsigned long const num_threads =
std::min(
hardware_threads != 0 ? hardware_threads : 2, max_threads);
unsigned long const block_size = length / num_threads;
std::promise result; ←
(8)
std::atomic done_flag(false);←
(9)
std::vector threads(num_threads — 1); {←
(10)
join_threads joiner(threads);
Iterator block_start = first;
for (unsigned long i = 0; i < (num_threads - 1); ++i) {
Iterator block_end = block_start;
std::advance(block_end, block_size);
threads[i] = std::thread(find_element(), ←
(11)
block_start, block_end, match,
&result, &done_flag);
block_start = block_end;
}
find_element()(
block_start, last, match, &result, &done_flag);←
(12)
if (!done_flag.load()) { ←
(13)
return last;
}
return result.get_future().get() ←
(14)
}
В основе своей код в листинге 8.9 похож на предыдущие примеры. На этот раз вся работа производится в операторе вызова, определенном в локальном классе find_element
(1). Здесь мы в цикле обходим элементы из назначенного потоку блока, проверяя флаг на каждой итерации (2). Если искомый элемент найден, то мы записываем окончательный результат в объект-обещание (3)и перед возвратом устанавливаем флаг done_flag
(4).
Если было возбуждено исключение, то его перехватит универсальный обработчик (5)и попытается сохранить исключение в обещании (6)перед установкой done_flag
. Но установка значения объекта-обещания может возбудить исключение, если значение уже установлено, поэтому мы перехватываем и игнорируем любые возникающие здесь исключения (7).
Это означает, что если поток, вызвавший find_element
, найдет искомый элемент или возбудит исключение, то все остальные потоки увидят поднятый флаг done_flag
и прекратят работу. Если несколько потоков одновременно найдут искомое или возбудят исключение, то возникнет гонка за установку результата в обещании. Но это безобидная гонка: победивший поток считается «первым», и установленный им результат приемлем.
В самой функции parallel_find
мы определяем обещание (8)и флаг прекращения поиска (9), которые передаем новым потокам вместе с диапазоном для просмотра (11). Кроме того, главный поток пользуется классом find_element
для поиска среди оставшихся элементов (12). Как уже отмечалось, мы должны дождаться завершения всех потоков перед тем, как проверять результат, потому что искомого элемента может вообще не оказаться. Для этого мы заключаем код запуска и присоединения потоков в блок (10), так что к моменту проверки флага (13)все потоки гарантировано присоединены. Если элемент был найден, то, обратившись к функции get()
объекта std::future
, мы либо получим результат из обещания, либо возбудим сохраненное исключение.
Как и раньше, в этой реализации предполагается, что мы собираемся использовать все доступные аппаратные потоки или располагаем каким-то механизмом, который позволит заранее определить количество потоков для предварительного разделения между ними работы. И снова мы можем упростить код, воспользовавшись функцией std::async
и рекурсивным разбиением данных, если готовы принять автоматический механизм масштабирования, скрытый в стандартной библиотеке С++. Реализация parallel_find
с применением std::async
приведена в листинге ниже.
Читать дальше