2.4. Задание количества потоков во время выполнения
В стандартной библиотеке С++ есть функция std::thread::hardware_concurrency()
, которая поможет нам решить эту задачу. Она возвращает число потоков, которые могут работать по-настоящему параллельно. В многоядерной системе это может быть, например, количество процессорных ядер. Возвращаемое значение всего лишь оценка; более того, функция может возвращать 0, если получить требуемую информацию невозможно. Однако эту оценку можно с пользой применить для разбиения задачи на несколько потоков.
В листинге 2.8 приведена простая реализация параллельной версии std::accumulate
. Она распределяет работу между несколькими потоками и, чтобы не создавать слишком много потоков, задает ограничение снизу на количество элементов, обрабатываемых одним потоком. Отмстим, что в этой реализации предполагается, что ни одна операция не возбуждает исключений, хотя в принципе исключения возможны; например, конструктор std::thread
возбуждает исключение, если не может создать новый поток. Но если добавить в этот алгоритм обработку исключений, он перестанет быть таким простым; эту тему мы рассмотрим в главе 8.
Листинг 2.8.Наивная реализация параллельной версии алгоритма std::accumulate
template
struct accumulate_block {
void operator()(Iterator first, Iterator last, T& result) {
result = std::accumulate(first, last, result);
}
};
template
T parallel_accumulate(Iterator first, Iterator last, T init) {
unsigned long const length = std::distance(first, last);
if (!length) ←
(1)
return init;
unsigned long const min_per_thread = 25;
unsigned long const max_threads =
(length+min_per_thread - 1) / min_per_thread; ←
(2)
unsigned long const hardware_threads =
std::thread::hardware_concurrency();
unsigned long const num_threads = ←
(3)
std::min(
hardware.threads != 0 ? hardware_threads : 2, max_threads);
unsigned long const block_size = length / num_threads; ←
(4)
std::vector results(num_threads);
std::vector threads(num_threads - 1); ←
(5)
Iterator block_start = first;
for(unsigned long i = 0; i < (num_threads - 1); ++i) {
Iterator block_end = block_start;
std::advance(block_end, block_size); ←
(6)
threads[i] = std::thread( ←
(7)
accumulate_block(),
block_start, block_end, std::ref(results(i)));
block_start = block_end; ←
(8)
}
accumulate_block()(
block_start, last, results[num_threads-1]); ←
(9)
std::for_each(threads.begin(), threads.end(),
std::mem_fn(&std::thread::join)); ←
(10)
return
std::accumulate(results.begin(), results.end(), init); ←
(11)
}
Хотя функция довольно длинная, по существу она очень проста. Если входной диапазон пуст (1), то мы сразу возвращаем начальное значение init
. В противном случае диапазон содержит хотя бы один элемент, поэтому мы можем разделить количество элементов на минимальный размер блока и получить максимальное число потоков (2).
Это позволит избежать создания 32 потоков на 32-ядерной машине, если диапазон состоит всего из пяти элементов.
Число запускаемых потоков равно минимуму из только что вычисленного максимума и количества аппаратных потоков (3): мы не хотим запускать больше потоков, чем может поддержать оборудование (это называется превышением лимита ), так как из-за контекстных переключений при большем количестве потоков производительность снизится. Если функция std::thread::hardware_concurrency()
вернула 0, то мы берем произвольно выбранное число, я решил остановиться на 2. Мы не хотим запускать слишком много потоков, потому что на одноядерной машине это только замедлило бы программу. Но и слишком мало потоков тоже плохо, так как это означало бы отказ от возможного параллелизма.
Каждый поток будет обрабатывать количество элементов, равное длине диапазона, поделенной на число потоков (4). Пусть вас не пугает случай, когда одно число нацело не делится на другое, — ниже мы рассмотрим его.
Теперь, зная, сколько необходимо потоков, мы можем создать вектор std::vector
для хранения промежуточных результатов и вектор std::vector
для хранения потоков (5). Отметим, что запускать нужно на один поток меньше, чем num_threads
, потому что один поток у нас уже есть.
Запуск потоков производится в обычном цикле: мы сдвигаем итератор block_end
в конец текущего блока (6)и запускаем новый поток для аккумулирования результатов по этому блоку (7). Начало нового блока совпадает с концом текущего (8).
После того как все потоки запущены, главный поток может обработать последний блок (9). Именно здесь обрабатывается случай деления с остатком: мы знаем, что конец последнего блока — last
, а сколько в нем элементов, не имеет значения.
Читать дальше