Задачи маршрутизации¶
Примечание
Альтернативные концепции маршрутизации, такие как topic и fanout, доступны не для всех транспортов, пожалуйста, обратитесь к transport comparison table.
Основы¶
Автоматическая маршрутизация¶
Самый простой способ маршрутизации - использовать настройку task_create_missing_queues
(по умолчанию включена).
При включении этой настройки именованная очередь, которая еще не определена в task_queues
, будет создана автоматически. Это облегчает выполнение простых задач маршрутизации.
Допустим, у вас есть два сервера, x и y, которые выполняют обычные задачи, и один сервер z, который выполняет только задачи, связанные с кормами. Вы можете использовать следующую конфигурацию:
task_routes = {'feed.tasks.import_feed': {'queue': 'feeds'}}
При включении этого маршрута задачи, связанные с импортными фидами, будут направляться в очередь фиды
, а все остальные задачи будут направляться в очередь по умолчанию (названную сельдерей
по историческим причинам).
В качестве альтернативы можно использовать сопоставление шаблонов glob или даже регулярные выражения для сопоставления всех задач в пространстве имен feed.tasks
:
app.conf.task_routes = {'feed.tasks.*': {'queue': 'feeds'}}
Если порядок соответствия шаблонов важен, то вместо этого следует указать маршрутизатор в формате items:
task_routes = ([
('feed.tasks.*', {'queue': 'feeds'}),
('web.tasks.*', {'queue': 'web'}),
(re.compile(r'(video|image)\.tasks\..*'), {'queue': 'media'}),
],)
Примечание
Настройка task_routes
может быть либо словарем, либо списком объектов маршрутизатора, поэтому в данном случае нам нужно указать настройку в виде кортежа, содержащего список.
После установки маршрутизатора вы можете запустить сервер z для обработки только очереди фидов следующим образом:
user@z:/$ celery -A proj worker -Q feeds
Вы можете указать столько очередей, сколько хотите, поэтому вы можете сделать этот серверный процесс очередью по умолчанию:
user@z:/$ celery -A proj worker -Q feeds,celery
Изменение имени очереди по умолчанию¶
Вы можете изменить имя очереди по умолчанию с помощью следующей конфигурации:
app.conf.task_default_queue = 'default'
Как определяются очереди¶
Смысл этой функции в том, чтобы скрыть сложный протокол AMQP для пользователей с базовыми потребностями. Тем не менее, вам может быть интересно, как объявляются эти очереди.
Очередь с именем видео
будет создана со следующими настройками:
{'exchange': 'video',
'exchange_type': 'direct',
'routing_key': 'video'}
НеAMQP-бэкенды, такие как Redis или SQS, не поддерживают обмены, поэтому они требуют, чтобы обмен имел то же имя, что и очередь. Использование этой конструкции гарантирует, что она будет работать и для них.
Ручная маршрутизация¶
Скажем, у вас есть два сервера, x и y, которые обрабатывают обычные задачи, и один сервер z, который обрабатывает только задачи, связанные с кормами, вы можете использовать эту конфигурацию:
from kombu import Queue
app.conf.task_default_queue = 'default'
app.conf.task_queues = (
Queue('default', routing_key='task.#'),
Queue('feed_tasks', routing_key='feed.#'),
)
app.conf.task_default_exchange = 'tasks'
app.conf.task_default_exchange_type = 'topic'
app.conf.task_default_routing_key = 'task.default'
task_queues
- это список экземпляров Queue
. Если вы не задали значения обмена или типа обмена для ключа, они будут взяты из параметров task_default_exchange
и << 3 >>>.
Чтобы направить задачу в очередь feed_tasks, вы можете добавить запись в настройку task_routes
:
task_routes = {
'feeds.tasks.import_feed': {
'queue': 'feed_tasks',
'routing_key': 'feed.import',
},
}
Вы также можете переопределить это, используя аргумент routing_key Task.apply_async()
, или send_task()
:
>>> from feeds.tasks import import_feed
>>> import_feed.apply_async(args=['http://cnn.com/rss'],
... queue='feed_tasks',
... routing_key='feed.import')
Чтобы заставить сервер z потреблять только из очереди фидов, вы можете запустить его с опцией celery worker -Q
:
user@z:/$ celery -A proj worker -Q feed_tasks --hostname=z@%h
Серверы x и y должны быть настроены на потребление из очереди по умолчанию:
user@x:/$ celery -A proj worker -Q default --hostname=x@%h
user@y:/$ celery -A proj worker -Q default --hostname=y@%h
Если хотите, вы можете даже поручить своему работнику по переработке кормов выполнять и обычные задачи, возможно, в периоды, когда работы много:
user@z:/$ celery -A proj worker -Q feed_tasks,default --hostname=z@%h
Если у вас есть другая очередь, но на другой бирже, которую вы хотите добавить, просто укажите пользовательский обмен и тип обмена:
from kombu import Exchange, Queue
app.conf.task_queues = (
Queue('feed_tasks', routing_key='feed.#'),
Queue('regular_tasks', routing_key='task.#'),
Queue('image_tasks', exchange=Exchange('mediatasks', type='direct'),
routing_key='image.compress'),
)
Если вы запутались в этих терминах, вам стоит почитать об AMQP.
См.также
В дополнение к Приоритеты сообщений Redis ниже, есть Rabbits and Warrens, отличная статья в блоге, описывающая очереди и обмены. Есть также CloudAMQP tutorial, For users of RabbitMQ the RabbitMQ FAQ, который может быть полезен как источник информации.
Специальные варианты маршрутизации¶
Приоритеты сообщений RabbitMQ¶
- поддерживаемые транспорты:
RabbitMQ
Добавлено в версии 4.0.
Очереди могут быть настроены на поддержку приоритетов путем установки аргумента x-max-priority
:
from kombu import Exchange, Queue
app.conf.task_queues = [
Queue('tasks', Exchange('tasks'), routing_key='tasks',
queue_arguments={'x-max-priority': 10}),
]
Значение по умолчанию для всех очередей может быть установлено с помощью параметра task_queue_max_priority
:
app.conf.task_queue_max_priority = 10
Приоритет по умолчанию для всех задач также может быть задан с помощью параметра task_default_priority
:
app.conf.task_default_priority = 5
Приоритеты сообщений Redis¶
- поддерживаемые транспорты:
Redis
Хотя транспорт Celery Redis учитывает поле приоритета, сам Redis не имеет понятия о приоритетах. Пожалуйста, прочитайте это примечание, прежде чем пытаться реализовать приоритеты в Redis, поскольку вы можете столкнуться с неожиданным поведением.
Чтобы начать планировать задания на основе приоритетов, необходимо настроить транспортную опцию queue_order_strategy.
app.conf.broker_transport_options = {
'queue_order_strategy': 'priority',
}
Поддержка приоритетов реализована путем создания n списков для каждой очереди. Это означает, что хотя существует 10 (0-9) уровней приоритета, по умолчанию они объединены в 4 уровня для экономии ресурсов. Это означает, что очередь с именем celery в действительности будет разделена на 4 очереди:
['celery0', 'celery3', 'celery6', 'celery9']
Если вам нужно больше уровней приоритета, вы можете установить транспортный параметр priority_steps:
app.conf.broker_transport_options = {
'priority_steps': list(range(10)),
'queue_order_strategy': 'priority',
}
При этом учтите, что это никогда не будет так же хорошо, как приоритеты, реализованные на уровне сервера, и может быть в лучшем случае приблизительным. Но это может быть достаточно хорошо для вашего приложения.
AMQP Primer¶
Сообщения¶
Сообщение состоит из заголовков и тела. Celery использует заголовки для хранения типа содержимого сообщения и его кодировки. Тип содержимого обычно является форматом сериализации, используемым для сериализации сообщения. Тело содержит имя задания для выполнения, идентификатор задания (UUID), аргументы для его применения и некоторые дополнительные мета-данные - например, количество повторных попыток или ETA.
Это пример сообщения задачи, представленного в виде словаря Python:
{'task': 'myapp.tasks.add',
'id': '54086c5e-6193-4575-8308-dbab76798756',
'args': [4, 4],
'kwargs': {}}
Производители, потребители и брокеры¶
Клиент, отправляющий сообщения, обычно называется издателем, или производителем, а организация, получающая сообщения, называется потребителем.
Брокер* - это сервер сообщений, который направляет сообщения от производителей к потребителям.
Скорее всего, вы увидите, что эти термины часто используются в материалах, связанных с AMQP.
Обмены, очереди и ключи маршрутизации¶
Сообщения отправляются на биржи.
Обмен направляет сообщения в одну или несколько очередей. Существует несколько типов обменов, обеспечивающих различные способы маршрутизации или реализующих различные сценарии обмена сообщениями.
Сообщение ожидает в очереди до тех пор, пока кто-то его не употребит.
Сообщение удаляется из очереди, когда оно было подтверждено.
Для отправки и получения сообщений необходимо выполнить следующие действия:
Создать обмен
Создайте очередь
Привязать очередь к обмену.
Celery автоматически создает сущности, необходимые для работы очередей в task_queues
(за исключением случаев, когда параметр auto_declare очереди установлен в False
).
Вот пример конфигурации очереди с тремя очередями: одна для видео, одна для изображений и одна очередь по умолчанию для всего остального:
from kombu import Exchange, Queue
app.conf.task_queues = (
Queue('default', Exchange('default'), routing_key='default'),
Queue('videos', Exchange('media'), routing_key='media.video'),
Queue('images', Exchange('media'), routing_key='media.image'),
)
app.conf.task_default_queue = 'default'
app.conf.task_default_exchange_type = 'direct'
app.conf.task_default_routing_key = 'default'
Типы обмена¶
Тип обмена определяет, как сообщения направляются через обмен. Типы обмена, определенные в стандарте: direct, topic, fanout and headers. Also non-standard exchange types are available as plug-ins to RabbitMQ, like the last-value-cache plug-in Майкла Бриджена.
Прямые обмены¶
Прямые обмены совпадают по точным ключам маршрутизации, поэтому очередь, связанная ключом маршрутизации video, получает только сообщения с этим ключом маршрутизации.
Тематические обмены¶
Обмен темами соответствует ключам маршрутизации, используя слова, разделенные точками, и символы подстановки: *
(соответствует одному слову) и #
(соответствует нулю или более слов).
С такими клавишами маршрутизации, как usa.news
, usa.weather
, norway.news
и norway.weather
, привязки могут быть такими: *.news
(все новости), usa.#
(все события в США) или usa.weather
(все погодные события в США).
Практическая работа с API¶
Celery поставляется с инструментом celery amqp, который используется для доступа к AMQP API через командную строку, обеспечивая доступ к таким задачам администрирования, как создание/удаление очередей и обменов, очистка очередей или отправка сообщений. Он также может быть использован для брокеров, не относящихся к AMQP, но различные реализации могут не реализовать все команды.
Вы можете записывать команды непосредственно в аргументы celery amqp, или просто запустить без аргументов, чтобы запустить его в shell-режиме:
$ celery -A proj amqp
-> connecting to amqp://guest@localhost:5672/.
-> connected.
1>
Здесь 1>
- это подсказка. Число 1 - это количество команд, которые вы выполнили на данный момент. Введите help
для получения списка доступных команд. Также поддерживается автозавершение, поэтому вы можете начать вводить команду, а затем нажать клавишу tab для отображения списка возможных совпадений.
Давайте создадим очередь, в которую можно отправлять сообщения:
$ celery -A proj amqp
1> exchange.declare testexchange direct
ok.
2> queue.declare testqueue
ok. queue:testqueue messages:0 consumers:0.
3> queue.bind testqueue testexchange testkey
ok.
Это создало прямой обмен testexchange
, и очередь с именем testqueue
. Очередь привязана к обмену с помощью ключа маршрутизации testkey
.
С этого момента все сообщения, отправленные на обмен testexchange
с ключом маршрутизации testkey
, будут перемещаться в эту очередь. Вы можете отправить сообщение с помощью команды basic.publish
:
4> basic.publish 'This is a message!' testexchange testkey
ok.
Теперь, когда сообщение отправлено, вы можете получить его снова. Здесь можно использовать команду basic.get
, которая синхронно опрашивает новые сообщения в очереди (это нормально для задач обслуживания, но для сервисов лучше использовать basic.consume
).
Вытащить сообщение из очереди:
5> basic.get testqueue
{'body': 'This is a message!',
'delivery_info': {'delivery_tag': 1,
'exchange': u'testexchange',
'message_count': 0,
'redelivered': False,
'routing_key': u'testkey'},
'properties': {}}
AMQP использует квитирование для подтверждения того, что сообщение было получено и успешно обработано. Если сообщение не было подтверждено и канал потребителя закрыт, сообщение будет доставлено другому потребителю.
Обратите внимание на тег доставки, указанный в структуре выше; в канале соединения каждое полученное сообщение имеет уникальный тег доставки, этот тег используется для подтверждения сообщения. Также обратите внимание, что теги доставки не являются уникальными для всех соединений, поэтому в другом клиенте тег доставки 1 может указывать на другое сообщение, чем в этом канале.
Вы можете подтвердить полученное сообщение с помощью basic.ack
:
6> basic.ack 1
ok.
Для очистки после нашей тестовой сессии необходимо удалить созданные вами сущности:
7> queue.delete testqueue
ok. 0 messages deleted.
8> exchange.delete testexchange
ok.
Задачи маршрутизации¶
Определение очередей¶
В Celery доступные очереди определяются параметром task_queues
.
Вот пример конфигурации очереди с тремя очередями: одна для видео, одна для изображений и одна очередь по умолчанию для всего остального:
default_exchange = Exchange('default', type='direct')
media_exchange = Exchange('media', type='direct')
app.conf.task_queues = (
Queue('default', default_exchange, routing_key='default'),
Queue('videos', media_exchange, routing_key='media.video'),
Queue('images', media_exchange, routing_key='media.image')
)
app.conf.task_default_queue = 'default'
app.conf.task_default_exchange = 'default'
app.conf.task_default_routing_key = 'default'
Здесь task_default_queue
будет использоваться для маршрутизации задач, у которых нет явного маршрута.
Обмен по умолчанию, тип обмена и ключ маршрутизации будут использоваться в качестве значений маршрутизации по умолчанию для задач, а также в качестве значений по умолчанию для записей в task_queues
.
Также поддерживается несколько привязок к одной очереди. Вот пример двух ключей маршрутизации, которые привязаны к одной очереди:
from kombu import Exchange, Queue, binding
media_exchange = Exchange('media', type='direct')
CELERY_QUEUES = (
Queue('media', [
binding(media_exchange, routing_key='media.video'),
binding(media_exchange, routing_key='media.image'),
]),
)
Указание места назначения задачи¶
Место назначения задания определяется следующим (по порядку):
Аргументы маршрутизации для
Task.apply_async()
.Атрибуты, связанные с маршрутизацией, определенные на самом
Task
.Маршрутизаторы определено в
task_routes
.
Считается лучшей практикой не кодировать эти параметры жестко, а оставлять их в качестве опций конфигурации, используя Маршрутизаторы; Это наиболее гибкий подход, но разумные значения по умолчанию все же могут быть установлены в качестве атрибутов задачи.
Маршрутизаторы¶
Маршрутизатор - это функция, которая определяет варианты маршрутизации для задачи.
Все, что вам нужно для определения нового маршрутизатора - это определить функцию с сигнатурой (name, args, kwargs, options, task=None, **kw)
:
def route_task(name, args, kwargs, options, task=None, **kw):
if name == 'myapp.tasks.compress_video':
return {'exchange': 'video',
'exchange_type': 'topic',
'routing_key': 'video.compress'}
Если вы вернете клавишу queue
, она расширится с заданными настройками этой очереди в task_queues
:
{'queue': 'video', 'routing_key': 'video.compress'}
становится –>
{'queue': 'video',
'exchange': 'video',
'exchange_type': 'topic',
'routing_key': 'video.compress'}
Вы устанавливаете классы маршрутизатора, добавляя их в настройку task_routes
:
task_routes = (route_task,)
Функции маршрутизатора также могут быть добавлены по имени:
task_routes = ('myapp.routers.route_task',)
Для простых сопоставлений имя задачи -> маршрут, как в примере с маршрутизатором выше, вы можете просто подставить dict в task_routes
, чтобы получить такое же поведение:
task_routes = {
'myapp.tasks.compress_video': {
'queue': 'video',
'routing_key': 'video.compress',
},
}
Затем маршрутизаторы будут пройдены по порядку, он остановится на первом маршрутизаторе, вернувшем истинное значение, и использует его в качестве конечного маршрута для задания.
Можно также определить несколько маршрутизаторов в последовательности:
task_routes = [
route_task,
{
'myapp.tasks.compress_video': {
'queue': 'video',
'routing_key': 'video.compress',
},
]
Затем маршрутизаторы будут посещаться по очереди, и будет выбран первый, который вернет значение.
Если вы используете Redis или RabbitMQ, вы также можете указать приоритет очередипо умолчанию в маршруте.
task_routes = {
'myapp.tasks.compress_video': {
'queue': 'video',
'routing_key': 'video.compress',
'priority': 10,
},
}
Аналогично, вызов apply_async для задачи отменяет приоритет по умолчанию.
task.apply_async(priority=0)
Порядок приоритетов и реакция кластера
Важно отметить, что из-за предварительной выборки рабочих задач, если несколько задач подаются одновременно, они могут сначала оказаться вне порядка приоритета. Отключение предварительной выборки для рабочих предотвратит эту проблему, но может привести к менее чем идеальной производительности для небольших и быстрых задач. В большинстве случаев простое уменьшение worker_prefetch_multiplier до 1 является более простым и чистым способом повысить отзывчивость вашей системы без затрат на полное отключение предварительной выборки.
Обратите внимание, что при использовании брокера redis значения приоритетов сортируются в обратном порядке: 0 - наивысший приоритет.
Трансляция¶
Celery также может поддерживать широковещательную маршрутизацию. Вот пример обмена broadcast_tasks
, который доставляет копии заданий всем подключенным к нему работникам:
from kombu.common import Broadcast
app.conf.task_queues = (Broadcast('broadcast_tasks'),)
app.conf.task_routes = {
'tasks.reload_cache': {
'queue': 'broadcast_tasks',
'exchange': 'broadcast_tasks'
}
}
Теперь задание tasks.reload_cache
будет отправлено каждому рабочему, потребляющему из этой очереди.
Вот еще один пример широковещательной маршрутизации, на этот раз с расписанием celery beat:
from kombu.common import Broadcast
from celery.schedules import crontab
app.conf.task_queues = (Broadcast('broadcast_tasks'),)
app.conf.beat_schedule = {
'test-task': {
'task': 'tasks.reload_cache',
'schedule': crontab(minute=0, hour='*/3'),
'options': {'exchange': 'broadcast_tasks'}
},
}
Трансляция и результаты
Обратите внимание, что Celery result не определяет, что произойдет, если две задачи будут иметь одинаковый task_id. Если одна и та же задача распределена между несколькими работниками, то история состояний может не сохраниться.
В этом случае целесообразно установить атрибут task.ignore_result
.