Проще всего будет добавить в класс thread_pool
новую функцию, чтобы исполнять задачу из очереди и управлять циклом самостоятельно. Так мы и поступим. Более развитые реализации пула могли бы включить дополнительную логику в функцию ожидания или добавить другие функции ожидания для обработки этого случая, быть может, даже назначая приоритеты ожидаемым задачам. В листинге ниже приведена новая функция run_pending_task()
, а модифицированный алгоритм Quicksort, в котором она используется, показан в листинге 9.5.
Листинг 9.4.Реализация функции run_pending_task()
void thread_pool::run_pending_task() {
function_wrapper task;
if (work_queue.try_pop(task)) {
task();
} else {
std::this_thread::yield();
}
}
Код run_pending_task()
вынесен из главного цикла внутри функции worker_thread()
, которую теперь можно будет изменить, так чтобы она вызывала run_pending_task()
. Функция run_pending_task()
пытается получить задачу из очереди и в случае успеха выполняет ее; если очередь пуста, то функция уступает управление ОС, чтобы та могла запланировать другой поток. Показанная ниже реализация Quicksort гораздо проще, чем версия в листинге 8.1, потому что вся логика управления потоками перенесена в пул.
Листинг 9.5.Реализация Quicksort на основе пула потоков
template
struct sorter { ←
(1)
thread_pool pool; ←
(2)
std::list do_sort(std::list& chunk_data) {
if (chunk_data.empty()) {
return chunk_data;
}
std::list result;
result.splice(result.begin(), chunk_data, chunk_data.begin());
T const& partition_val = *result.begin();
typename std::list::iterator divide_point =
std::partition(chunk_data.begin(), chunk_data.end(),
[&](T const& val){ return val < partition_val; });
std::list new_lower_chunk;
new_lower_chunk.splice(new_lower_chunk.end(),
chunk_data, chunk_data.begin(),
divide_point);
std::future > new_lower = ←
(3)
pool.submit(std::bind(&sorter::do_sort, this,
std::move(new_lower_chunk)));
std::list new_higher(do_sort(chunk_data));
result.splice(result.end(), new_higher);
while (!new_lower.wait_for(std::chrono::seconds(0)) ==
std::future_status::timeout) {
pool.run_pending_task(); ←
(4)
}
result.splice(result.begin(), new_lower.get());
return result;
}
};
template
std::list parallel_quick_sort(std::list input) {
if (input.empty()) {
return input;
}
sorter s;
return s.do_sort(input);
}
Как и в листинге 8.1, реальная работа делегируется функции-члену do_sort()
шаблона класса sorter
(1), хотя в данном случае этот шаблон нужен лишь для обертывания экземпляра thread_pool
(2).
Управление потоками и задачами теперь свелось к отправке задачи пулу (3)и исполнению находящихся в очереди задач в цикле ожидания (4). Это гораздо проще, чем в листинге 8.1, где нужно было явно управлять потоками и стеком подлежащих сортировке блоков. При отправке задачи пулу мы используем функцию std::bind()
, чтобы связать указатель this
с do_sort()
и передать подлежащий сортировке блок. В данном случае мы вызываем std::move()
, чтобы данные new_lower_chunk
перемещались, а не копировались.
Мы решили проблему взаимоблокировки, возникающую из- за того, что одни потоки ждут других, но этот пул все еще далек от идеала. Отметим хотя бы, что все вызовы submit()
и run_pending_task()
обращаются к одной и той же очереди. В главе 8 мы видели, что модификация одного набора данных из разных потоков может негативно сказаться на производительности, стало быть, с этим нужно что-то делать.
9.1.4. Предотвращение конкуренции за очередь работ
Всякий раз, как поток вызывает функцию submit()
экземпляра пула потоков, он помещает новый элемент в единственную разделяемую очередь работ. А рабочие потоки постоянно извлекают элементы из той же очереди. Следовательно, по мере увеличения числа процессоров будет возрастать конкуренция за очередь работ. Это может ощутимо отразиться на производительности; даже при использовании свободной от блокировок очереди, в которой нет явного ожидания, драгоценное время может тратиться на перебрасывание кэша.
Чтобы избежать перебрасывания кэша, мы можем завести по одной очереди работ на каждый поток. Тогда каждый поток будет помещать новые элементы в свою собственную очередь и брать работы из глобальной очереди работ только тогда, когда в его очереди работ нет. В следующем листинге приведена реализация с использованием переменной типа thread_local
, благодаря которой каждый поток обладает собственной очередью работ в дополнение к глобальной.
Читать дальше