thread_safe_queue > work_queue;←
(1)
std::vector threads; ←
(2)
join_threads joiner; ←
(3)
void worker_thread() {
while (!done) { ←
(4)
std::function task;
if (work_queue.try_pop(task)) { ←
(5)
task(); ←
(6)
} else {
std::this_thread::yield(); ←
(7)
}
}
}
public:
thread_pool():
done(false), joiner(threads) {
unsigned const thread_count =
std::thread::hardware_concurrency();←
(8)
try {
for (unsigned i = 0; i < thread_count; ++i) {
threads.push_back(
std::thread(&thread_pool::worker_thread, this)); ←
(9)
}
} catch (...) {
done = true; ←
(10)
throw;
}
}
~thread_pool() {
done = true; ←
(11)
}
template
void submit(FunctionType f) {
work_queue.push(std::function(f)); ←
(12)
}
};
Здесь мы определили вектор рабочих потоков (2)и используем одну из потокобезопасных очередей из главы 6 (1)для хранения очереди работ. В данном случае пользователь не может ждать завершения задачи, а задача не может возвращать значения, поэтому для инкапсуляции задач можно использовать тип std::function
. Функция submit()
обертывает переданную функцию или допускающий вызов объект в объект std::function
и помещает его в очередь (12).
Потоки запускаются в конструкторе; их количество равно значению, возвращаемому функцией std::thread::hardware_concurrency()
, то есть мы создаем столько потоков, сколько может поддержать оборудование (8). Все эти потоки исполняют функцию-член нашего класса worker_thread()
(9).
Запуск потока может завершиться исключением, поэтому необходимо позаботиться о том, чтобы уже запущенные к этому моменту потоки корректно завершались. Для этого мы включили блок try-catch
, который в случае исключения поднимает флаг done
(10). Кроме того, мы воспользовались классом join_threads
из главы 8 (3), чтобы обеспечить присоединение всех потоков. То же самое происходит в деструкторе: мы просто поднимаем флаг done
(11), а объект join_threads
гарантирует, что потоки завершатся до уничтожения пула. Отметим, что порядок объявления членов важен: и флаг done
и объект worker_queue
должны быть объявлены раньше вектора threads
, который, в свою очередь, должен быть объявлен раньше joiner
. Только тогда деструкторы членов класса будут вызываться в правильном порядке; в частности, нельзя уничтожать очередь раньше, чем остановлены все потоки.
Сама функция worker_thread
проста до чрезвычайности: в цикле, который продолжается, пока не поднят флаг done
(4), она извлекает задачи из очереди (5)и выполняет их (6). Если в очереди нет задач, функция вызывает std::this_thread::yield()
(7), чтобы немного отдохнуть и дать возможность поработать другим потокам.
Часто даже такого простого пула потоков достаточно, особенно если задачи независимы, не возвращают значений и не выполняют блокирующих операций. Но бывает и по-другому: во-первых, у программы могут быть более жесткие требования, а, во-вторых, в таком пуле возможны проблемы, в частности, из-за взаимоблокировок. Кроме того, в простых случаях иногда лучше прибегнуть к функции std::async
, как неоднократно демонстрировалось в главе 8. В этой главе мы рассмотрим и более изощренные реализации пула потоков с дополнительными возможностями, которые призваны либо удовлетворить особые потребности пользователя, либо уменьшить количество потенциальных ошибок. Для начала разрешим ожидать завершения переданной пулу задачи.
9.1.2. Ожидание задачи, переданной пулу потоков
В примерах из главы 8, где потоки запускались явно, главный поток после распределения работы между потоками всегда ждал завершения запущенных потоков. Тем самым гарантировалось, что вызывающая программа получит управление только после полного завершения задачи. При использовании пула потоков ждать нужно завершения задачи, переданной пулу, а не самих рабочих потоков. Это похоже на то, как мы ждали будущих результатов при работе с std::async
в главе 8. В случае простого пула потоков, показанного в листинге 9.1, организовывать ожидание придется вручную, применяя механизмы, описанные в главе 4: условные переменные и будущие результаты. Это усложняет код; намного удобнее было бы ждать задачу напрямую.
За счет переноса сложности в сам пул потоков мы сумеем добиться желаемого. Функция submit()
могла бы возвращать некий описатель задачи, по которому затем можно было бы ждать ее завершения. Сам описатель должен был бы инкапсулировать условную переменную или будущий результат. Это упростило бы код, пользующийся пулом потоков.
Читать дальше