Листинг 9.6.Пул с очередями в поточно-локальной памяти
class thread_pool {
thread_safe_queue pool_work_queue;
typedef std::queue local_queue_type;←
(1)
static thread_local std::unique_ptr
local_work_queue; ←
(2)
void worker_thread() {
local_work_queue.reset(new local_queue_type);←
(3)
while (!done) {
run_pending_task();
}
}
public:
template
std::future::type>
submit(FunctionType f) {
typedef typename std::result_of::type
result_type;
std::packaged_task task(f);
std::future res(task.get_future());
if (local_work_queue) { ←
(4)
local_work_queue->push(std::move(task));
} else {
pool_work_queue.push(std::move(task)); ←
(5)
}
return res;
}
void run_pending_task() {
function_wrapper task;
if (local_work_queue && !local_work_queue->empty()) {←
(6)
task = std::move(local_work_queue->front());
local_work_queue->pop();
task();
} else if(pool_work_queue.try_pop(task)) { ←
(7)
task();
} else {
std::this_thread::yield();
}
}
// остальное, как и раньше
};
Для хранения очереди работ в поточно-локальной памяти мы воспользовались указателем std::unique_ptr<>
(2), потому что не хотим, чтобы у потоков, не входящих в пул, была очередь; этот указатель инициализируется в функции worker_thread()
до начала цикла обработки (3). Деструктор std::unique_ptr<>
позаботится об уничтожении очереди работ по завершении потока.
Затем функция submit()
проверяет, есть ли у текущего потока очередь работ (4). Если есть, то это поток из пула, и мы можем поместить задачу в локальную очередь. В противном случае задачу следует помещать в очередь пула, как и раньше (5).
Аналогичная проверка имеется в функции run_pending_task()
(6), только на этот раз нужно еще проверить, есть ли что-нибудь в локальной очереди. Если есть, то можно извлечь элемент из начала очереди и обработать его. Обратите внимание, что локальная очередь может быть обычным объектом std::queue<>
(1), так как к ней обращается только один поток. Если в локальной очереди задач нет, то мы проверяем очередь пула, как и раньше (7).
Таким образом мы действительно уменьшаем конкуренцию, но если распределение работ неравномерно, то может случиться, что в очереди одного потока скопится много задач, тогда как остальным будет нечем заняться. Например, в случае Quicksort только самый первый блок попадает в очередь пула, а остальные окажутся в локальной очереди того потока, который этот блок обработал. Это сводит на нет всю идею пула потоков.
К счастью, у этой проблемы есть решение: позволить потоку занимать (steal) работы из очередей других потоков, если ни в его собственной, ни в глобальной очереди ничего нет.
Если мы хотим, чтобы «безработный» поток мог брать работы из очереди другого потока, то эта очередь должна быть доступна занимающему потоку в run_pending_tasks()
. Для этого каждый поток должен зарегистрировать свою очередь в пуле или получать очередь от пула. Кроме того, необходимо позаботиться о надлежащей синхронизации и защите очереди работ, чтобы не нарушались инварианты.
Можно написать свободную от блокировок очередь, которая позволит потоку-владельцу помещать и извлекать элементы с одного конца, а другим потокам — занимать элементы с другого конца, однако реализация такой очереди выходит за рамки данной книги. Чтобы продемонстрировать идею, мы поступим проще — воспользуемся мьютексом для защиты данных очереди. Мы надеемся, что занимание работ — редкое событие, поэтому конкуренция за мьютекс будет невелика, и накладные расходы на такую очередь окажутся минимальны. Ниже приведена простая реализация с блокировками.
Листинг 9.7.Очередь с блокировкой, допускающей занимание работ
class work_stealing_queue {
private:
typedef function_wrapper data_type;
std::deque the_queue; ←
(1)
mutable std::mutex the_mutex;
public:
work_stealing_queue() {}
work_stealing_queue(const work_stealing_queue& other)=delete;
work_stealing_queue& operator=(
const work_stealing_queue& other)=delete;
void push(data_type data) { ←
(2)
std::lock_guard lock(the_mutex);
the_queue.push_front(std::move(data));
}
bool empty() const {
std::lock_guard lock(the_mutex);
return the_queue.empty();
}
bool try_pop(data_type& res) { ←
(3)
std::lock_guard lock(the_mutex);
if (the_queue.empty()) {
return false;
Читать дальше