7.2.6. Потокобезопасная очередь без блокировок
Очередь отличается от стека прежде всего тем, что операции push()
и pop()
обращаются к разным частям структуры данных, тогда как в стеке та и другая работают с головным узлом списка. Следовательно, и проблемы синхронизации тоже другие. Требуется сделать так, чтобы изменения, произведенные на одном конце, были видны при доступе с другого конца. Однако структура функции try_pop()
в листинге 6.6 не так уж сильно отличается от структуры pop()
в простом свободном от блокировок стеке в листинге 7.2, поэтому можно с достаточными основаниями предположить, что и весь свободный от блокировок код будет схожим. Посмотрим, так ли это.
Если взять листинг 6.6 за основу, то нам понадобятся два указателя на node
: один для головы списка ( head
), второй — для хвоста ( tail
). Поскольку мы собираемся обращаться к ним из нескольких потоков, то надо бы сделать эти указатели атомарными и расстаться с соответствующими мьютексами. Начнём с этого небольшого изменения и посмотрим, куда оно нас приведет. Результат показан в листинге ниже.
Листинг 7.13.Свободная от блокировок очередь с одним производителем и одним потребителем
template
class lock_free_queue {
private:
struct node {
std::shared_ptr data;
node* next;
node():
next(nullptr) {}
};
std::atomic head;
std::atomic tail;
node* pop_head() {
node* const old_head = head.load();
if (old_head == tail.load()) {←
(1)
return nullptr;
}
head.store(old_head->next);
return old_head;
}
public:
lock_free_queue():
head(new node), tail(head.load()) {}
lock_free_queue(const lock_free_queue& other) = delete;
lock_free_queue& operator=(
const lock_free_queue& other) = delete;
~lock_free_queue() {
while(node* const old_head = head.load()) {
head.store(old_head->next);
delete old_head;
}
}
std::shared_ptr pop() {
node* old_head = pop_head();
if (!old_head) {
return std::shared_ptr();
}
std::shared_ptr const res(old_head->data);←
(2)
delete old_head;
return res;
}
void push(T new_value) {
std::shared_ptr new_data(std::make_shared(new_value));
node* p = new node; ←
(3)
node* const old_tail = tail.load(); ←
(4)
old_tail->data.swap(new_data); ←
(5)
old_tail->next = p; ←
(6)
tail.store(p); ←
(7)
}
};
На первый взгляд, неплохо, и если в каждый момент времени существует только один поток, вызывающий push()
, и только один поток, вызывающий pop()
, то вообще всё прекрасно. Важно отметить, что в этом случае существует отношение происходит-раньше между push()
и pop()
, благодаря которому извлечение данных безопасно. Сохранение tail
(7)синхронизируется-с загрузкой tail
(1), сохранение указателя на data
в предыдущем узле (5)расположено перед сохранением tail
, а загрузка tail
расположена перед загрузкой указателя на data
(2), поэтому сохранение data
происходит раньше его загрузки, и всё замечательно. Таким образом, мы получили корректно обслуживаемую очередь с одним производителем и одним потребителем .
Проблемы начинаются, когда несколько потоков вызывают push()
или pop()
одновременно. Сначала рассмотрим push()
. Если два потока одновременно вызывают push()
, то оба выделяют память для нового фиктивного узла (3), оба читают одно и то же значение tail
(4)и, следовательно, оба изменяют данные-члены data
и next
одного и того же узла (5), (6). А это уже гонка за данными!
Аналогичные проблемы возникают в pop_head()
. Если два потока вызывают эту функцию одновременно, то оба читают одно и то же значение head
, и оба перезаписывают старое значение одним и тем же указателем next
. Оба потока теперь думают, что получили один и тот же узел, — прямой путь к катастрофе. Мы должны не только сделать так, чтобы лишь один поток извлекал данный элемент, но и позаботиться о том, чтобы другие потоки могли безопасно обращаться к члену next
узла, который прочитали из head
. Это точно та же проблема, с которой мы сталкивались при написании pop()
для свободного от блокировок стека, поэтому и любое из предложенных тогда решений можно применить здесь.
Итак, проблему pop()
можно считать решенной, но как быть с push()
? Здесь трудность заключается в том, что для получения требуемого отношения происходит-раньше между push()
и pop()
мы должны заполнить поля фиктивного узла до обновления tail
. Но это означает, что одновременные вызовы push()
конкурируют за те же самые данные, так как был прочитал один и тот же указатель tail
.
Читать дальше