1. Загрузить из контейнера-источника доступные на данный момент задачи.
2. Уточнить состояние очереди задач на предмет того, какие задачи уже выполнены или еще выполняются.
3. Для каждой из нерешенных задач породить контейнеры-исполнители с соответствующим интерфейсом.
4. При успешном завершении контейнера-исполнителя зафи-ксировать, что задача выполнена.
180Часть III. Паттерны проектирования систем пакетных вычислений Этот алгоритм прост на словах, но в действительности его не так легко реализовать. К счастью, оркестратор Kubernetes имеет несколько возможностей, которые значительно упрощают его реализацию. А именно: в Kubernetes есть объект Job , который позволяет обеспечить надежную работу очереди задач. Объект Job можно настроить так, чтобы он запускал соответствующий контейнер-исполнитель либо разово, либо пока задача не будет успешно выполнена. Если контейнер-исполнитель настроить, чтобы он выполнялся до завершения задачи, то, даже когда машина в кластере откажет, задача в конце концов будет вы-полнена успешно.
Таким образом, построение очереди задач существенно упро-щается, так как оркестратор берет на себя ответственность за надежное исполнение задач.
Кроме того, Kubernetes позволяет аннотировать задачи, что дает нам возможность пометить каждый объект-задачу названием обрабатываемого элемента очереди задач. Становится проще различать задачи, обрабатываемые и завершенные как успешно, так и с ошибкой.
Это значит, что мы можем реализовать очередь задач поверх ор-кестратора Kubernetes, не используя собственного хранилища. Все это существенно упрощает задачу построения инфраструк-туры очереди задач.
Следовательно, подробный алгоритм работы контейнера — дис-петчера очереди задач выглядит следующим образом.
Повторять бесконечно.
1. Получить список задач посредством интерфейса контейне-ра — источника задач.
2. Получить список заданий, обслуживающих данную очередь Глава 10. Системы на основе очередей задач 181
3. Выделить на основе этих списков перечень необработанных задач.
4. Для каждой необработанной задачи создать объект Job , который порождает соответствующий контейнер-испол-нитель.
Приведу сценарий на языке Python, реализующий такую оче-редь:
import requests
import json
from kubernetes import client, config
import time
namespace = "default"
def make_container(item, obj):
container = client.V1Container()
container.image = "my/worker-image"
container.name = "worker"
return container
def make_job(item):
response =
requests.get("http://localhost:8000/items/{}".format(item)) obj = json.loads(response.text)
job = client.V1Job()
job.metadata = client.V1ObjectMeta()
job.metadata.name = item
job.spec = client.V1JobSpec()
job.spec.template = client.V1PodTemplate() job.spec.template.spec = client.V1PodTemplateSpec() job.spec.template.spec.restart_policy = "Never" job.spec.template.spec.containers = [
make_container(item, obj)
]
return job
def update_queue(batch):
response = requests.get("http://localhost:8000/items") 182Часть III. Паттерны проектирования систем пакетных вычислений obj = json.loads(response.text)
items = obj['items']
ret = batch.list_namespaced_job(namespace, watch=False) for item in items:
found = False
for i in ret.items:
if i.metadata.name == item:
found = True
if not found:
# Функция создает объект Job, пропущена
# для краткости
job = make_job(item)
batch.create_namespaced_job(namespace, job)
config.load_kube_config()
batch = client.BatchV1Api()
while True:
update_queue(batch)
time.sleep(10)
Практикум. Реализация генератора миниатюр видеофайлов
В качестве примера использования очереди задач рассмотрим задачу генерации миниатюр видеофайлов. На основе этих ми-ниатюр пользователи принимают решение о том, какие видео они хотят посмотреть.
Для реализации миниатюр понадобится два контейнера. Пер-вый — для источника задач. Проще всего будет размещать за-дачи на общем сетевом диске, подключенном, например, по NFS (Network File System, сетевая файловая система). Источник задач получает список файлов в этом каталоге и передает их вызывающей стороне.
Глава 10. Системы на основе очередей задач 183
Приведу простую программу на NodeJS:
const http = require('http');
const fs = require('fs');
const port = 8080;
const path = process.env.MEDIA_PATH;
const requestHandler = (request, response) => { console.log(request.url);
fs.readdir(path + '/*.mp4', (err, items) => { var msg = {
'kind': 'ItemList',
'apiVersion': 'v1',
'items': []
};
if (!items) {
return msg;
}
for (var i = 0; i < items.length; i++) {
msg.items.push(items[i]);
}
response.end(JSON.stringify(msg));
});
}
const server = http.createServer(requestHandler); server.listen(port, (err) => {
if (err) {
return console.log('Ошибка запуска сервера', err);
}
console.log(`сервер запущен на порте ${port}`) });
Данный источник определяет список фильмов, подлежащих обработке. Для извлечения миниатюр используется утилита ffmpeg.
184Часть III. Паттерны проектирования систем пакетных вычислений Можно создать контейнер, запускающий такую команду: ffmpeg -i ${INPUT_FILE} -frames:v 100 thumb.png Команда извлекает один из каждых 100 кадров (параметр -frames:v 100 ) и сохраняет его в формате PNG (например, thumb1.png , thumb2.png и т. д.).
Читать дальше