std::future_status::ready) { ←
(13)
try_sort_chunk(); ←
(14)
}
result.splice(result.begin(), new_lower.get());
return result;
}
void sort_chunk(boost::shared_ptr const& chunk) {
chunk->promise.set_value(do_sort(chunk->data));←
(15)
}
void sort_thread() {
while (!end_of_data) { ←
(16)
try_sort_chunk(); ←
(17)
std::this_thread::yield();←
(18)
}
}
};
template
std::list parallel_quick_sort(std::list input) { ←
(19)
if (input.empty()) {
return input;
}
sorter s;
return s.do_sort(input); ←
(20)
}
Здесь функция parallel_quick_sort
(19)делегирует большую часть работы классу sorter
(1), который объединяет стек неотсортированных блоков (2)с множеством потоков (3). Основные действия производятся в функции-члене do_sort
(9), которая занимается обычным разбиением данных на две части (10). Но на этот раз она не запускает новый поток для каждого блока, а помещает его в стек (11)и запускает новый поток, только если еще есть незанятые процессоры (12). Поскольку меньший блок, возможно, обрабатывается другим потоком, нам необходимо дождаться его готовности (13). Чтобы не простаивать зря (в том случае, когда данный поток единственный или все остальные уже заняты), мы пытаемся обработать блок, находящийся в стеке (14). Функция try_sort_chunk
извлекает поток из стека (7), сортирует его (8)и сохраняет результаты в обещании promise
, так чтобы их смог получить поток, который поместил в стек данный блок (15).
Запущенные потоки крутятся в цикле, пытаясь отсортировать блоки, находящиеся в стеке (17), ожидая, пока будет установлен флаг end_of_data
(16). В промежутке между проверками они уступают процессор другим потокам (18), чтобы у тех был шанс поместить в стек новые блоки. Эта программа полагается на то, что деструктор класса sorter
(4)разберется с запущенными потоками. Когда все данные будут отсортированы, do_sort
вернет управление (хотя рабочие потоки все еще выполняются), так что главный поток вернется из parallel_quick_sort
(20)и, стало быть, уничтожит объект sorter
. В этот момент деструктор поднимет флаг end_of_data
(5)и дождется завершения рабочих потоков (6). Поднятый флаг является для функции потока указанием выйти из цикла (16).
При таком подходе не возникает проблемы неограниченного количества потоков, с которой мы сталкивались, когда spawn_task
каждый раз запускает новый поток. С другой стороны, мы не полагаемся на то, что стандартная библиотека С++ выберет количество рабочих потоков за нас, как то происходит при использовании std::async()
. Мы сами ограничиваем число потоков значением std::thread::hardware_concurrency()
, чтобы избежать чрезмерно большого количества контекстных переключений. Однако же взамен нас поджидает другая потенциальная проблема: управление потоками и взаимодействие между ними сильно усложняют код. Кроме того, хотя потоки и обрабатывают разные элементы данных, все они обращаются к стеку для добавления новых блоков и извлечения блоков для сортировки. Из-за этой острой конкуренции производительность может снизиться, пусть даже мы используем свободный от блокировок (и, значит, неблокирующий) стек. Почему так происходит, мы увидим чуть ниже.
Этот подход представляет собой специализированную версию пула потоков — существует набор потоков, которые получают задачи из списка ожидающих работ, выполняют их, а затем снова обращаются к списку. Некоторые потенциальные проблемы, свойственные пулам потоков (в том числе конкуренция за список работ), и способы их решения рассматриваются в главе 9. Задача масштабирования приложения на несколько процессоров более детально обсуждается в этой главе ниже (см. раздел 8.2.1).
Как при предварительном, так и при рекурсивном распределении данных мы предполагаем, что сами данные фиксированы заранее, а нам нужно лишь найти удачный способ их разбиения. Но так бывает не всегда; если данные порождаются динамически или поступают из внешнего источника, то такой подход не годится. В этом случае имеет смысл распределять работу по типам задач, а не по данным.
8.1.3. Распределение работы по типам задач
Распределение работы между потоками путем назначения каждому потоку блока данных (заранее или рекурсивно во время обработки) все же опирается на предположение о том, что все потоки производят одни те же действия с данными. Альтернативный подход — заводить специализированные потоки, каждый из которых решает отдельную задачу — как водопроводчики и электрики на стройке. Потоки могут даже работать с одними и теми же данными, но обрабатывать их по-разному.
Читать дальше