Хотя это действенная техника, применима она не в любой ситуации. Иногда данные не удается заранее распределить равномерно, а как это сделать, становится понятно только в процессе обработки. Особенно наглядно это проявляется в таких рекурсивных алгоритмах, как Quicksort; здесь нужен другой подход.
8.1.2. Рекурсивное распределение данных
Алгоритм Quicksort состоит из двух шагов: разбиение данных на две части — до и после опорного элемента в смысле требуемого упорядочения, и рекурсивная сортировка обеих «половин». Невозможно распараллелить этот алгоритм, разбив данные заранее, потому что состав каждой «половины» становится известен только в процессе обработки элементов. Поэтому распараллеливание по необходимости должно быть рекурсивным. На каждом уровне рекурсии производится больше вызовов функции quick_sort
, потому что предстоит отсортировать элементы, меньшие опорного, и большие опорного. Эти рекурсивные вызовы не зависят друг от друга, так как обращаются к разным элементам, поэтому являются идеальными кандидатами для параллельного выполнения. На рис. 8.2 изображено такое рекурсивное разбиение.
Рис. 8.2.Рекурсивное разбиение данных
В главе 4 была приведена подобная реализация. Вместо того чтобы просто вызывать функцию рекурсивно для большей и меньшей «половины», мы с помощью std::async()
на каждом шаге запускали асинхронную задачу для меньшей половины. Вызывая std::async()
, мы просим стандартную библиотеку С++ самостоятельно решить, имеет ли смысл действительно выполнять задачу в новом потоке или лучше сделать это синхронно.
Это существенно: при сортировке большого набора данных запуск нового потока для каждого рекурсивного вызова быстро приводит к образованию чрезмерного количества потоков. Как мы увидим ниже, когда потоков слишком много, производительность может не возрасти, а наоборот упасть . Кроме того, если набор данных очень велик, то потоков может просто не хватить. Сама идея рекурсивного разбиения на задачи хороша, нужно только строже контролировать число запущенных потоков. В простых случаях с этим справляется std::async()
, но есть и другие варианты.
Один из них — воспользоваться функцией std::thread::hardware_concurrency()
для выбора нужного числа потоков, как мы делали в параллельной версии accumulate()
из листинга 2.8. Тогда вместо того чтобы запускать новый поток для каждого рекурсивного вызова, мы можем просто поместить подлежащий сортировке блок данных в потокобезопасный стек типа того, что был описан в главах 6 и 7. Если потоку больше нечего делать — потому что он закончил обработку всех своих блоков или потому что ждет, когда необходимый ему блок будет отсортирован, то он может взять блок из стека и заняться его сортировкой.
В следующем листинге показана реализация этой идеи.
Листинг 8.1.Параллельный алгоритм Quicksort с применением стека блоков, ожидающих сортировки
template
struct sorter { ←
(1)
struct chunk_to_sort {
std::list data;
std::promise > promise;
};
thread_safe_stack chunks; ←
(2)
std::vector threads; ←
(3)
unsigned const max_thread_count;
std::atomic end_of_data;
sorter():
max_thread_count(std::thread::hardware_concurrency() - 1),
end_of_data(false) {}
~sorter() { ←
(4)
end_of_data = true; ←
(5)
for (unsigned i = 0; i < threads.size(); ++i) {
threads[i].join(); ←
(6)
}
}
void try_sort_chunk() {
boost::shared_ptr chunk = chunks.pop();←
(7)
if (chunk) {
sort_chunk(chunk); ←
(8)
}
}
std::list do_sort(std::list& chunk_data) { ←
(9)
if (chunk_data.empty()) {
return chunk_data;
}
std::list result;
result.splice(result.begin(), chunk_data, chunk_data.begin());
T const& partition_val = *result.begin();
typename std::list::iterator divide_point = ←
(10)
std::partition(chunk_data.begin(), chunk_data.end(),
[&](T const& val) {return val < partition_val; });
chunk_to_sort new_lower_chunk;
new_lower_chunk.data.splice(new_lower_chunk.data.end(),
chunk_data, chunk_data.begin(),
divide_point);
std::future > new_lower =
new_lower_chunk.promise.get_future();
chunks.push(std::move(new_lower_chunk)); ←
(11)
if (threads.size() < max_thread_count) { ←
(12)
threads.push_back(std::thread(&sorter::sort_thread, this));
}
std::list new_higher(do_sort(chunk_data));
result.splice(result.end(), new_higher);
while (new_lower.wait_for(std::chrono::seconds(0)) !=
Читать дальше