Освобождение от блокировок одного потока с помощью другого
Чтобы вновь сделать код свободным от блокировок, нам нужно придумать, как ожидающий поток может продвигаться вперед, даже если поток, находящийся в push()
, застрял. Один из способов — помочь застрявшему потоку, выполнив за него часть работы.
В данном случае мы точно знаем, что нужно сделать: указатель next
в хвостовом узле требуется установить на новый фиктивный узел, и тогда сам указатель tail
можно будет обновить. Все фиктивные узлы эквивалентны, поэтому не имеет значения, какой из них использовать — созданный потоком, который успешно поместил в очередь данные, или потоком, ожидающим входа в push()
. Если сделать указатель next
в узле атомарным, то для его установки можно будет применить compare_exchange_strong()
. После того как указатель next
установлен, в цикле по compare_exchange_weak()
можно будет установить tail
, проверяя при этом, указывает ли он по-прежнему на тот же самый исходный узел. Если это не так, значит, узел обновил какой-то другой поток, так что можно прекратить попытки и перейти в начало цикла. Реализация этой идеи потребует также небольшого изменения pop()
, где нужно будет загрузить указатель next
; эта модификация показана в листинге ниже.
Листинг 7.20.Модификация pop()
с целью помочь при выполнении push()
template
class lock_free_queue {
private:
struct node {
std::atomic data;
std::atomic count;
std::atomic next;←
(1)
};
public:
std::unique_ptr pop() {
counted_node_ptr old_head =
head.load(std::memory_order_relaxed);
for (;;) {
increase_external_count(head, old_head);
node* const ptr = old_head.ptr;
if (ptr == tail.load().ptr) {
return std::unique_ptr();
}
counted_node_ptr next = ptr->next.load();←
(2)
if (head.compare_exchange_strong(old_head, next)) {
T* const res = ptr->data.exchange(nullptr);
free_external_counter(old_head);
return std::unique_ptr(res);
}
ptr->release_ref();
}
}
};
Как я и говорил, изменения тривиальны: указатель next
теперь атомарный (1), поэтому операция load
в точке (2)атомарна. В данном примере мы используем упорядочение по умолчанию memory_order_seq_cst
, поэтому явное обращение к load()
можно было бы опустить, полагаясь на операцию загрузки в операторе неявного преобразования к типу counted_node_ptr
, но явное обращение будет напоминать нам, куда впоследствии добавить явное задание порядка обращения к памяти.
Листинг 7.21.Пример реализации функции push()
, освобождаемой от блокировок благодаря помощи извне
template
class lock_free_queue {
private:
void set_new_tail(counted_node_ptr &old_tail, ←
(1)
counted_node_ptr const &new_tail) {
node* const current_tail_ptr = old_tail.ptr;
while (!tail.compare_exchange_weak(old_tail, new_tail) &&←
(2)
old_tail.ptr == current_tail_ptr);
if (old_tail.ptr == current_tail_ptr)←
(3)
free_external_counter(old_tail); ←
(4)
else
current_tail_ptr->release_ref(); ←
(5)
}
public:
void push(T new_value) {
std::unique_ptr new_data(new T(new_value));
counted_node_ptr new_next;
new_next.ptr = new node;
new_next.external_count = 1;
counted_node_ptr old_tail = tail.load();
for (;;) {
increase_external_count(tail, old_tail);
T* old_data = nullptr;
if (old_tail.ptr->data.compare_exchange_strong( ←
(6)
old_data, new_data.get())) {
counted_node_ptr old_next = {0};
if (!old_tail.ptr->next.compare_exchange_strong(←
(7)
old_next, new_next)) {
delete new_next.ptr;←
(8)
new_next = old_next;←
(9)
}
set_new_tail(old_tail, new_next);
new_data.release();
break;
} else { ←
(10)
counted_node_ptr old_next = {0};
if (old_tail.ptr->next.compare_exchange_strong(←
(11)
old_next, new_next)) {
old_next = new_next; ←
(12)
new_next.ptr = new node;←
(13)
}
set_new_tail(old_tail, old_next);←
(14)
}
}
}
};
В целом похоже на исходную версию push()
из листинга 7.15, но есть и несколько принципиальных отличий. Если указатель data
действительно установлен (6), то нужно обработать случай, когда нам помог другой поток, и кроме того появилась ветвь else
, в которой один поток оказывает помощь другому (10).
Установив указатель data
в узле (6), новая версия push()
изменяет указатель next
, вызывая compare_exchange_strong()
(7). Мы используем compare_exchange_strong()
, чтобы избежать цикла. Если обмен завершился неудачно, значит, другой поток уже установил указатель next
, поэтому нам ни к чему узел, выделенный в начале, и его можно удалить (8). Мы также хотим использовать значение next
, установленное другим потоком, для обновления tail
(9).
Читать дальше