Кроме того, try_pop()
удерживает tail_mutex
лишь на очень короткое время, необходимое для защиты чтения tail
. Следовательно, почти все действия внутри try_pop()
могут производиться одновременно с вызовом push()
. Объем операций, выполняемых под защитой мьютекса head_mutex
также совсем невелик; дорогостоящая операция delete
(в деструкторе указателя на узел) производится вне блокировки. Это увеличивает потенциальное число одновременных обращений к try_pop()
; в каждый момент времени только один поток может вызывать pop_head()
, зато несколько потоков могут удалять старые узлы и безопасно возвращать данные.
Ожидание поступления элемента
Ну хорошо, код в листинге 6.6 дает в наше распоряжение потокобезопасную очередь с мелкогранулярными блокировками, но он поддерживает только функцию try_pop()
(и к тому же всего в одном варианте). А как насчет таких удобных функций wait_and_pop()
, которые мы написали в листинге 6.2? Сможем ли мы реализовать идентичный интерфейс, сохранив мелкогранулярные блокировки?
Ответ, разумеется, — да, только вот как это сделать? Модифицировать push()
несложно: нужно лишь добавить вызов data_cond.notify_one()
в конец функции, как и было в листинге 6.2. Но на самом деле не всё так просто; мы же связались с мелкогранулярными блокировками для того, чтобы увеличить уровень параллелизма. Если оставить мьютекс захваченным на все время вызова notify_one()
(как в листинге 6.2), то поток, разбуженный до того, как мьютекс освобожден, должен будет ждать мьютекса. С другой стороны, если освободить мьютекс до обращения к notify_one(), то ожидающий поток сможет захватить его сразу, как проснётся (если, конечно, какой-то другой поток не успеет раньше). Это небольшое улучшение, но в некоторых случаях оно бывает полезно.
Функция wait_and_pop()
сложнее, потому что мы должны решить, где поместить ожидание, какой задать предикат и какой мьютекс захватить. Мы ждем условия «очередь не пуста», оно представляется выражением head != tail
. Если записать его в таком виде, то придется захватывать и head_mutex
, и tail_mutex
, но, разбирая код в листинге 6.6, мы уже поняли, что захватывать tail_mutex
нужно только для чтения tail
, а не для самого сравнения, та же логика применима и здесь. Если записать предикат в виде head != get_tail()
, то нужно будет захватить только head_mutex
и использовать уже полученную блокировку для защиты data_cond.wait()
. Прочий код такой же, как в try_pop()
.
Второй перегруженный вариант try_pop()
и соответствующий ему вариант wait_and_pop()
нуждаются в тщательном осмыслении. Если просто заменить возврат указателя std::shared_ptr<>
, полученного из old_head
, копирующим присваиванием параметру value
, то функция перестанет быть безопасной относительно исключений. В этот момент элемент данных уже удален из очереди и мьютекс освобожден, осталось только вернуть данные вызывающей программе. Однако, если копирующее присваивание возбудит исключение (а почему бы и нет?), то элемент данных будет потерян, потому что вернуть его в то же место очереди, где он был, уже невозможно.
Если фактический тип T
, которым конкретизируется шаблон, обладает не возбуждающими исключений оператором перемещающего присваивания или операцией обмена ( swap
), то так поступить можно, но ведь мы ищем общее решение, применимое к любому типу T
. В таком случае следует поместить операции, способные возбудить исключения, в защищенную область перед тем, как удалять узел из списка. Это означает, что нам необходим еще один перегруженный вариант pop_head()
, который извлекает сохраненное значение до модификации списка.
Напротив, модификация функции empty() тривиальна: нужно просто захватить head_mutex
и выполнить проверку head == get_tail()
(см. листинг 6.10). Окончательный код очереди приведён в листингах 6.7, 6.8, 6.9 и 6.10.
Листинг 6.7.Потокобезопасная очередь с блокировкой и ожиданием: внутренние данные и интерфейс
template
class threadsafe_queue {
private:
struct node {
std::shared_ptr data;
std::unique_ptr next;
};
std::mutex head_mutex;
std::unique_ptr head;
std::mutex tail_mutex;
node* tail;
std::condition_variable data_cond;
public:
threadsafe_queue():
head(new node), tail(head.get()) {}
threadsafe_queue(const threadsafe_queue& other) = delete;
threadsafe_queue& operator=(
const threadsafe_queue& other) = delete;
std::shared_ptr try_pop();
Читать дальше