Листинг 6.2.Потокобезопасная очередь с блокировками и условными переменными
template
class threadsafe_queue {
private:
mutable std::mutex mut;
std::queue data_queue;
std::condition_variable data_cond;
public:
threadsafe_queue() {}
void push(T new_value) {
std::lock_guard lk(mut);
data_queue.push(std::move(data));
data_cond.notify_one(); ←
(1)
}
void wait_and_pop(T& value) { ←
(2)
std::unique_lock lk(mut);
data_cond.wait(lk, [this]{return !data_queue.empty();});
value = std::move(data_queue.front());
data_queue.pop();
}
std::shared_ptr wait_and_pop() ←
(3)
std::unique_lock lk(mut);
data_cond.wait(lk, [this] {return !data_queue.empty();});←
(4)
std::shared_ptr res(
std::make_shared(std::move(data_queue.front())));
data_queue.pop();
return res;
}
bool try_pop(T& value) {
std::lock_guard lk(mut);
if (data_queue.empty())
return false;
value = std::move(data_queue.front());
data_queue.pop();
return true;
}
std::shared_ptr try_pop() {
std::lock_guard lk(mut);
if (data_queue.empty())
return std::shared_ptr(); ←
(5)
std::shared_ptr res(
std::make_shared(std::move(data_queue.front())));
data_queue.pop();
return res;
}
bool empty() const {
std::lock_guard lk(mut);
return data_queue.empty();
}
};
Структурно очередь в листинге 6.2 реализована аналогично стеку в листинге 6.1, отличие только в обращениях к функции data_cond.notify_one()
в push()
(1)и в наличии двух вариантов функции wait_and_pop()
(2), (3). Оба перегруженных варианта try_pop()
почти идентичны функциям pop()
в листинге 6.1 с тем отличием, что не возбуждают исключение, если очередь пуста. Вместо этого одна функция возвращает булевское значение, показывающее, были ли извлечены данные, а вторая — возвращающая указатель на данные (5)— указатель NULL
. Точно так же можно было бы поступить и в случае стека. Таким образом, если оставить в стороне функции wait_and_pop()
, то применим тот же анализ, который мы провели для стека.
Новые функции wait_and_pop()
решают проблему ожидания значения в очереди, с которой мы столкнулись при обсуждении стека; вместо того чтобы раз за разом вызывать empty()
, ожидающий поток может просто вызвать wait_and_pop()
, а структура данных обслужит этот вызов с помощью условной переменной. Обращение к data_cond.wait()
не вернет управление, пока во внутренней очереди не появится хотя бы один элемент, так что мы можем не беспокоиться но поводу того, что в этом месте кода возможна пустая очередь. При этом данные по-прежнему защищаются мьютексом. Таким образом, функции wait_and_pop()
не вводят новых состояний гонки, не создают возможности взаимоблокировок и не нарушают никаких инвариантов.
В части безопасности относительно исключений есть мелкая неприятность — если помещения данных в очередь ожидают несколько потоков, то лишь один из них будет разбужен в результате вызова data_cond.notify_one()
. Однако если этот поток возбудит исключение в wait_and_pop()
, например при конструировании std::shared_ptr<>
(4), то ни один из оставшихся потоков разбужен не будет. Если это неприемлемо, то можно заменить notify_one()
на data_cond.notify_all()
, тогда будут разбужены все потоки, но за это придётся заплатить — большая часть из них сразу же уснет снова, увидев, что очередь по-прежнему пуста. Другой вариант — включить в wait_and_pop()
обращение к notify_one()
в случае исключения, тогда другой поток сможет попытаться извлечь находящееся в очереди значение. Третий вариант — перенести инициализацию std::shared_ptr<>
в push()
и сохранять экземпляры std::shared_ptr<>
, а не сами значения данных. Тогда при копировании std::shared_ptr<>
из внутренней очереди std::queue<>
никаких исключений возникнуть не может, и wait_and_pop()
становится безопасной. В следующем листинге приведена реализация очереди, переработанная с учетом высказанных соображений.
Листинг 6.3.Потокобезопасная очередь, в которой хранятся объекты std::shared_ptr
template
class threadsafe_queue {
private:
mutable std::mutex mut;
std::queue > data_queue;
std::condition_variable data_cond;
public:
threadsafe_queue() {}
void wait_and_pop(T& value) {
std::unique_lock lk(mut);
data_cond.wait(lk, [this]{return !data_queue.empty();});
value = std::move(*data_queue.front()); ←
(1)
data_queue.pop();
}
bool try_pop(T& value) {
std::lock_guard lk(mut);
Читать дальше