bool try_pop(T& value);
std::shared_ptr wait_and_pop();
void wait_and_pop(T& value);
void push(T new_value);
void empty();
};
Код, помещающий новые узлы в очередь, прост — его реализация (показанная в листинге ниже) близка к той, что мы видели раньше.
Листинг 6.8.Потокобезопасная очередь с блокировкой и ожиданием: добавление новых значений
template
void threadsafe_queue::push(T new_value) {
std::shared_ptr new_data(
std::make_shared(std::move(new_value)));
std::unique_ptr p(new node);
{
std::lock_guard tail_lock(tail_mutex);
tail->data = new_data;
node* const new_tail = p.get();
tail->next = std::move(p);
tail = new_tail;
}
data_cond.notify_one();
}
Как уже отмечалось, вся сложность сосредоточена в части pop . В листинге ниже показана реализация функции-члена wait_and_pop()
и относящихся к ней вспомогательных функций.
Листинг 6.9.Потокобезопасная очередь с блокировкой и ожиданием: wait_and_pop
template
class threadsafe_queue {
private:
node* get_tail() {
std::lock_guard tail_lock(tail_mutex);
return tail;
}
std::unique_ptr pop_head() {←
(1)
std::unique_ptr old_head = std::move(head);
head = std::move(old_head->next);
return old_head;
}
std::unique_lock wait_for_data() {←
(2)
std::unique_lock head_lock(head_mutex);
data_cond.wait(
head_lock, [&]{return head.get() != get_tail();});
return std::move(head_lock); ←
(3)
}
std::unique_ptr wait_pop_head() {
std::unique_lock head_lock(wait_for_data());←
(4)
return pop_head();
}
std::unique_ptr wait_pop_head(T& value) {
std::unique_lock head_lock(wait_for_data());←
(5)
value = std::move(*head->data);
return pop_head();
}
public:
std::shared_ptr wait_and_pop() {
std::unique_ptr const old_head = wait_pop_head();
return old_head->data;
}
void wait_and_pop(T& value) {
std::unique_ptr const old_head = wait_pop_head(value);
}
};
В реализации извлечения из очереди используется несколько небольших вспомогательных функций, которые упрощают код и позволяют устранить дублирование, например: pop_head()
(1)(модификация списка в результате удаления головного элемента) и wait_for_data()
(2)(ожидание появления данных в очереди). Особенно стоит отметить функцию wait_for_data()
, потому что она не только ждет условную переменную, используя лямбда-функцию в качестве предиката, но и возвращает объект блокировки вызывающей программе (3). Тем самым мы гарантируем, что та же самая блокировка удерживается, пока данные модифицируются в соответствующем перегруженном варианте wait_pop_head()
(4), (5). Функция pop_head()
используется также в функции try_pop()
, показанной ниже.
Листинг 6.10.Потокобозопасная очередь с блокировкой и ожиданием: try_pop()
и empty()
template
class threadsafe_queue {
private:
std::unique_ptr try_pop_head() {
std::lock_guard head_lock(head_mutex);
if (head.get() == get_tail()) {
return std::unique_ptr();
}
return pop_head();
}
std::unique_ptr try_pop_head(T& value) {
std::lock_guard head_lock(head_mutex);
if (head.get() == get_tail()) {
return std::unique_ptr();
}
value = std::move(*head->data);
return pop_head();
}
public:
std::shared_ptr try_pop() {
std::unique_ptr old_head = try_pop_head();
return old_head ? old_head->data : std::shared_ptr();
}
bool try_pop(T& value) {
std::unique_ptr const old_head = try_pop_head(value);
return old_head;
}
void empty() {
std::lock_guard head_lock(head_mutex);
return (head.get() == get_tail());
}
};
Эта реализация очереди ляжет в основу очереди без блокировок, которую мы будем рассматривать в главе 7. Данная очередь неограниченная , то есть в нее можно помещать и помещать данные, ничего не удаляя, пока не кончится память. Альтернативой является ограниченная очередь, максимальная длина которой задается при создании. Попытка поместить элемент в заполненную очередь либо завершается ошибкой, либо приводит к приостановке потока до тех пор, пока из очереди не будет удален хотя бы один элемент. Ограниченные очереди бывают полезны для равномерного распределения задач между потоками (см. главу 8). Такая дисциплина не дает потоку (или потокам), заполняющему очередь, намного обогнать потоки, читающие из очереди.
Показанную реализацию неограниченной очереди легко преобразовать в очередь с ограниченной длиной, введя ожидание условной переменной в функцию push()
. Вместо того чтобы ждать, пока в очереди появятся элементы (как pop()
), мы должны будем ждать, когда число элементов в ней окажется меньше максимума. Дальнейшее обсуждение ограниченных очередей выходит за рамки этой книги, мы же перейдём от очередей к более сложным структурам данных.
Читать дальше