Функция pop_task_from_other_thread_queue()
(4)обходит очереди, принадлежащие всем потокам пула, пытаясь занять задачу у каждой. Чтобы не случилось так, что все потоки занимают задачи у первого потока в списке, каждый поток начинает просмотр с позиции, равной его собственному индексу (5).
Теперь у нас имеется пул потоков, пригодный для самых разных целей. Разумеется, есть масса способов улучшить его для работы в конкретной ситуации, но это я оставляю в качестве упражнения для читателя. В частности, мы совсем не исследовали идею динамического изменения размера пула, так чтобы обеспечить оптимальное использование процессоров, даже когда потоки блокированы в ожидании какого-то события, например, завершения ввода/вывода или освобождения мьютекса.
Следующим в нашем списке «продвинутых» приёмов управления потоками стоит прерывание потоков.
Часто бывает необходимо сообщить долго работающему потоку о том, что пришло время остановиться. Например, потому что это рабочий поток пула, а мы собираемся уничтожить сам пул, или потому что пользователь отменил работу, выполняемую этим потоком. Причин миллион. Но идея в любом случае одна и та же: послать из одного потока другому сигнал с требованием прекратить работу до ее естественного завершения, и сделать это так, чтобы поток завершился корректно, а не просто выбить почву у него из-под ног.
Можно было бы придумывать такой механизм специально для каждого случая, но это, пожалуй, перебор. Мало того что общий механизм в дальнейшем упростит написание кода, так он еще и позволит писать код, допускающий прерывание, не заботясь о том, где конкретно он используется. Стандарт C++11 такого механизма не предоставляет, но реализовать его самостоятельно не слишком сложно. Я покажу, как это сделать, но сначала взгляну на проблему с точки зрения интерфейса запуска и прерывания потока, а не с точки зрения самого прерываемого потока.
9.2.1. Запуск и прерывание другого потока
Начнем с рассмотрения внешнего интерфейса. Что нам нужно от допускающего прерывание потока? На самом элементарном уровне интерфейс должен быть таким же, как у std::thread
, но с дополнительной функцией interrupt()
:
class interruptible_thread {
public:
template
interruptible_thread(FunctionType f);
void join();
void detach();
bool joinable() const;
void interrupt();
};
В реализации можно было бы использовать std::thread
для управления потоком и какую-то структуру данных для обработки прерывания. А как это выглядит с точки зрения самого потока? Как минимум, нужна возможность сказать: «Меня можно прерывать здесь», то есть нам требуется точка прерывания . Чтобы не передавать дополнительные данные, соответствующая функция должна вызываться без параметров: interruption_point()
. Отсюда следует, что относящаяся к прерываниям структура данных должна быть доступна через переменную типа thread_local
, которая устанавливается при запуске потока. Поэтому, когда поток обращается к функции interruption_point()
, та проверяет структуру данных для текущего исполняемого потока. С реализацией interruption_point()
мы познакомимся ниже.
Флаг типа thread_local
— основная причина, по которой мы не можем использовать для управления потоком просто класс std::thread
; память для него нужно выделить таким образом, чтобы к ней имел доступ как экземпляр interruptible_thread
, так и вновь запущенный поток. Для этого функцию, переданную конструктору, можно специальным образом обернуть перед тем, как передавать конструктору std::thread
. Как это делается, показано в следующем листинге.
Листинг 9.9.Простая реализация interruptible_thread
class interrupt_flag {
public:
void set();
bool is_set() const;
};
thread_local interrupt_flag this_thread_interrupt_flag; ←
(1)
class interruptible_thread {
std::thread internal_thread;
interrupt_flag* flag;
public:
template
interruptible_thread(FunctionType f) {
std::promise p; ←
(2)
internal_thread = std::thread([f,&p] { ←
(3)
p.set_value(&this_thread_interrupt_flag);
f(); ←
(4)
});
flag = p.get_future().get(); ←
(5)
}
void interrupt() {
if (flag) {
flag->set(); ←
(6)
}
}
};
Переданная функция f
обертывается лямбда-функцией (3), которая хранит копию f
и ссылку на локальный объект-обещание p
(2). Перед тем как вызывать переданную функцию (4), лямбда-функция устанавливает в качестве значения обещания адрес переменной this_thread_interrupt_flag
(объявленной с модификатором thread_local
(1)) в новом потоке. Затем вызывающий поток дожидается готовности будущего результата, ассоциированного с обещанием, и сохраняет этот результат в переменной-члене flag
(5). Отметим, что лямбда-функция исполняется в новом потоке и хранит висячую ссылку на локальную переменную p
, но ничего страшного в этом нет, так как конструктор interruptible_thread
ждет, пока на p
не останется ссылок в новом потоке, и только потом возвращает управление. Еще отметим, что эта реализация не обрабатывает присоединение или отсоединение потока. Мы сами должны позаботиться об очистке переменной flag
в случае выхода или отсоединения потока, чтобы избежать появления висячего указателя.
Читать дальше