Протокол сообщений¶
Сообщения о задачах¶
Версия 2¶
Определение¶
properties = {
'correlation_id': uuid task_id,
'content_type': string mimetype,
'content_encoding': string encoding,
# optional
'reply_to': string queue_or_url,
}
headers = {
'lang': string 'py'
'task': string task,
'id': uuid task_id,
'root_id': uuid root_id,
'parent_id': uuid parent_id,
'group': uuid group_id,
# optional
'meth': string method_name,
'shadow': string alias_name,
'eta': iso8601 ETA,
'expires': iso8601 expires,
'retries': int retries,
'timelimit': (soft, hard),
'argsrepr': str repr(args),
'kwargsrepr': str repr(kwargs),
'origin': str nodename,
}
body = (
object[] args,
Mapping kwargs,
Mapping embed {
'callbacks': Signature[] callbacks,
'errbacks': Signature[] errbacks,
'chain': Signature[] chain,
'chord': Signature chord_callback,
}
)
Пример¶
В этом примере отправляется сообщение задачи с использованием версии 2 протокола:
# chain: add(add(add(2, 2), 4), 8) == 2 + 2 + 4 + 8
import json
import os
import socket
task_id = uuid()
args = (2, 2)
kwargs = {}
basic_publish(
message=json.dumps((args, kwargs, None)),
application_headers={
'lang': 'py',
'task': 'proj.tasks.add',
'argsrepr': repr(args),
'kwargsrepr': repr(kwargs),
'origin': '@'.join([os.getpid(), socket.gethostname()])
}
properties={
'correlation_id': task_id,
'content_type': 'application/json',
'content_encoding': 'utf-8',
}
)
Изменения по сравнению с версией 1¶
Версия протокола, определяемая по наличию заголовка сообщения
task
.Поддержка нескольких языков с помощью заголовка
lang
.Работник может перенаправить сообщение работнику, поддерживающему данный язык.
Мета-данные перенесены в заголовки.
Это означает, что рабочие/посредники могут проверять сообщение и принимать решения на основе заголовков без декодирования полезной нагрузки (которая может быть специфичной для языка, например, сериализованной сериализатором pickle, специфичным для Python).
Всегда UTC
Больше нет флага
utc
, поэтому любая информация о времени, в которой отсутствует часовой пояс, будет ожидаться как время UTC.Body - только для данных, специфичных для конкретного языка.
Python хранит args/kwargs и встроенные сигнатуры в body.
Если сообщение использует необработанную кодировку, то необработанные данные будут переданы в качестве единственного аргумента функции.
Java/C и т.д. могут использовать документ Thrift/protobuf в качестве тела
origin
- имя узла, отправляющего задание.Отправка актеру на основе заголовков
task
, << 1 >>>meth
не используется Python, но может быть использован в будущем для указания пар класс+метод.Цепь приобретает выделенное поле.
Сокращение цепочки в рекурсивный аргумент
callbacks
вызывает проблемы при превышении предела рекурсии.Это исправлено в новом протоколе сообщений путем указания списка подписей, каждая задача затем выводит задачу из списка при отправке следующего сообщения:
execute_task(message) chain = embed['chain'] if chain: sig = maybe_signature(chain.pop()) sig.apply_async(chain=chain)
correlation_id
заменяет полеtask_id
.Поля
root_id
иparent_id
помогают отслеживать рабочие потоки.shadow
позволяет указать другое имя для журналов, мониторы могут использоваться для таких понятий, как задачи, вызывающие функцию, указанную в качестве аргумента:from celery.utils.imports import qualname class PickleTask(Task): def unpack_args(self, fun, args=()): return fun, args def apply_async(self, args, kwargs, **options): fun, real_args = self.unpack_args(*args) return super().apply_async( (fun, real_args, kwargs), shadow=qualname(fun), **options ) @app.task(base=PickleTask) def call(fun, args, kwargs): return fun(*args, **kwargs)
Версия 1¶
В версии 1 протокола все поля хранятся в теле сообщения: это означает, что рабочие и промежуточные потребители должны десериализовать полезную нагрузку, чтобы прочитать поля.
Тело сообщения¶
task
- строка:
Название задания. обязательно
id
- строка:
Уникальный идентификатор задания (UUID). обязательно.
args
- список:
Список аргументов. Будет пустым списком, если не указан.
kwargs
- словарь:
Словарь аргументов ключевых слов. Будет пустым словарем, если не указан.
retries
- int:
Текущее количество повторных попыток выполнения этого задания. По умолчанию равно 0, если не указано.
eta
- строка (ISO 8601):
Расчетное время прибытия. Это дата и время в формате ISO 8601. Если это время не указано, сообщение не будет запланировано, но будет выполнено как можно скорее.
expires
- строка (ISO 8601):
Добавлено в версии 2.0.2.
Дата истечения срока действия. Это дата и время в формате ISO 8601. Если дата не указана, срок действия сообщения не истечет. Срок действия сообщения истечет, когда сообщение будет получено и дата истечения срока действия будет превышена.
taskset
- строка:
Группа, в которую входит это задание (если есть).
chord
- Подпись:
Добавлено в версии 2.3.
Обозначает, что данное задание является одной из частей заголовка аккорда. Значение этого ключа является телом аккорда, которое должно быть выполнено после возвращения всех задач в заголовке.
utc
- bool:
Добавлено в версии 2.5.
Если время истинно, то используется часовой пояс UTC, если нет, то должен использоваться текущий местный часовой пояс.
callbacks
- <список>Подпись:
Добавлено в версии 3.0.
Список сигнатур для вызова при успешном завершении задачи.
errbacks
- <список>Подпись:
Добавлено в версии 3.0.
Список сигнатур для вызова в случае возникновения ошибки при выполнении задачи.
timelimit
- < кортеж>(float, float).:
Добавлено в версии 3.1.
Настройки ограничения времени выполнения задачи. Это кортеж из значений жесткого и мягкого лимита времени (int/float или
None
для отсутствия лимита).Пример значения, задающего мягкое ограничение времени в 3 секунды и жесткое ограничение времени в 10 секунд:
{'timelimit': (3.0, 10.0)}
Пример сообщения¶
Это пример вызова задачи celery.task.ping в формате json:
{"id": "4cc7438e-afd4-4f8f-a2f3-f46567e7ca77",
"task": "celery.task.PingTask",
"args": [],
"kwargs": {},
"retries": 0,
"eta": "2009-11-17T12:30:56.527191"}
Сериализация задач¶
С помощью заголовка сообщения content_type поддерживается несколько типов форматов сериализации.
MIME-типы, поддерживаемые по умолчанию, показаны в следующей таблице.
Схема
Тип MIME
json
приложение/json
yaml
приложение/x-yaml
маринованный огурец
application/x-python-serialize
msgpack
application/x-msgpack
Сообщения о событиях¶
Сообщения о событиях всегда сериализуются в формате JSON и могут содержать произвольные поля тела сообщения.
Начиная с версии 4.0. тело может состоять либо из одного отображения (одно событие), либо из списка отображений (несколько событий).
Существуют также стандартные поля, которые всегда должны присутствовать в сообщении о событии:
Стандартные поля тела¶
строка
type
Тип события. Это строка, содержащая категорию и действие, разделенные разделителем тире (например,
task-succeeded
).строка
hostname
Полное имя хоста, на котором произошло событие.
unsigned long long
clock
Значение логических часов для этого события (временная метка Лампорта).
float
timestamp
Временная метка UNIX, соответствующая времени, когда произошло событие.
signed short
utcoffset
Это поле описывает часовой пояс отправляющего узла и указывается как количество часов впереди/позади UTC (например, -2 или +1).
unsigned long long
pid
Идентификатор процесса, в котором произошло событие.
Стандартные типы событий¶
Список стандартных типов событий и их полей см. в Ссылка на событие.
Пример сообщения¶
Это поля сообщения для события task-succeeded
:
properties = {
'routing_key': 'task.succeeded',
'exchange': 'celeryev',
'content_type': 'application/json',
'content_encoding': 'utf-8',
'delivery_mode': 1,
}
headers = {
'hostname': 'worker1@george.vandelay.com',
}
body = {
'type': 'task-succeeded',
'hostname': 'worker1@george.vandelay.com',
'pid': 6335,
'clock': 393912923921,
'timestamp': 1401717709.101747,
'utcoffset': -1,
'uuid': '9011d855-fdd1-4f8f-adb3-a413b499eafb',
'retval': '4',
'runtime': 0.0003212,
)