Частный случай ожидания завершения запущенной задачи возникает, когда главный поток нуждается в вычисленном ей результате. Мы уже встречались с такой ситуацией выше, например, в функции parallel_accumulate()
из главы 2. В таком случае путем использования будущих результатов мы можем объединить ожидание с передачей результата. В листинге 9.2 приведен код модифицированного пула потоков, который разрешает ожидать завершения задачи и передает возвращенный ей результат ожидающему потоку. Поскольку экземпляры класса std::packaged_task<>
допускают только перемещение , но не копирование , мы больше не можем воспользоваться классом std::function<>
для обертывания элементов очереди, потому что std::function<>
требует, чтобы в обернутых объектах-функциях был определён копирующий конструктор. Вместо этого мы напишем специальный класс-обертку, умеющий работать с объектами, обладающими только перемещающим конструктором. Это простой маскирующий тип класс (type-erasure class), в котором определён оператор вызова. Нам нужно поддержать функции, которые не принимают параметров и возвращают void
, поэтому оператор всего лишь вызывает виртуальный метод call()
, который в свою очередь вызывает обернутую функцию.
Листинг 9.2.Пул потоков, ожидающий завершения задачи
class function_wrapper {
struct impl_base {
virtual void call() = 0;
virtual ~impl_base() {}
};
std::unique_ptr impl;
template
struct impl_type: impl_base {
F f;
impl_type(F&& f_): f(std::move(f_)) {}
void call() { f(); }
};
public:
template function_wrapper(F&& f):
impl(new impl_type(std::move(f))) {}
void operator()() { impl->call(); }
function_wrapper() = default;
function_wrapper(function_wrapper&& other):
impl(std::move(other.impl)) {}
function_wrapper& operator=(function_wrapper&& other) {
impl = std::move(other.impl);
return *this;
}
function_wrapper(const function_wrapper&) = delete;
function_wrapper(function_wrapper&) = delete;
function_wrapper& operator=(const function_wrapper&) = delete;
};
class thread_pool {
thread_safe_queue work_queue;←┐
│
Используем
void worker_thread() │
function_
{ │
wrapper
while (!done) │
вместо std::
{ │
function
function_wrapper task; ←┘
if (work_queue.try_pop(task))
task();
else
std::this_thread::yield();
}
}
public:
template
std::future::type>←
(1)
submit(FunctionType f) {
typedef typename std::result_of::type
result_type; ←
(2)
std::packaged_task task(std::move(f));←
(3)
std::future res(task.get_future()); ←
(4)
work_queue.push(std::move(task)); ←
(5)
return res; ←
(6)
}
// остальное, как и раньше
};
Прежде всего отметим, что модифицированная функция submit()
(1)возвращает объект std::future<>
, который будет содержать возвращенное задачей значение и позволит вызывающей программе ждать ее завершения. Для этого нам необходимо знать тип значения, возвращаемого переданной функцией f
, и здесь на помощь приходит шаблон std::result_of<>
: std::result_of::type
— это тип результата, возвращенного вызовом объекта типа FunctionType
(например, f
) без аргументов. Выражение std::result_of<>
мы используем также в определении псевдонима типа result_type
(2)внутри функции.
Затем f
обертывается объектом std::packaged_task
(3), потому что f
— функция или допускающий вызов объект, который не принимает параметров и возвращает результат типа result_type
. Теперь мы можем получить будущий результат из std::packaged_task<>
(4), перед тем как помещать задачу в очередь (5)и возвращать будущий результат (6). Отметим, что при помещении задачи в очередь мы должны использовать функцию std::move()
, потому что класс std::packaged_task<> не допускает копирования. Именно поэтому в очереди хранятся объекты function_wrapper
, а не объекты типа.
Этот пул позволяет ожидать завершения задач и получать возвращаемые ими результаты. В листинге ниже показано, как выглядит функция parallel_accumulate
, работающая с таким пулом потоков.
Листинг 9.3.Функция parallel_accumulate
, реализованная с помощью пула потоков, допускающего ожидание задач
template
T parallel_accumulate(Iterator first, Iterator last, T init) {
unsigned long const length = std::distance(first, last);
if (!length)
return init;
unsigned long const block_size = 25;
Читать дальше