Сигналы

Сигналы позволяют разобщенным приложениям получать уведомления, когда определенные действия происходят в другом месте приложения.

Celery поставляется с множеством сигналов, к которым ваше приложение может подключиться, чтобы дополнить поведение определенных действий.

Основы

Несколько видов событий вызывают сигналы, вы можете подключиться к этим сигналам для выполнения действий по мере их срабатывания.

Пример подключения к сигналу after_task_publish:

from celery.signals import after_task_publish

@after_task_publish.connect
def task_sent_handler(sender=None, headers=None, body=None, **kwargs):
    # information about task are located in headers for task messages
    # using the task protocol version 2.
    info = headers if 'task' in headers else body
    print('after_task_publish for task id {info[id]}'.format(
        info=info,
    ))

Некоторые сигналы также имеют отправителя, по которому их можно фильтровать. Например, сигнал after_task_publish использует имя задачи в качестве отправителя, поэтому, предоставив аргумент sender в connect, вы можете подключить свой обработчик для вызова каждый раз, когда публикуется задача с именем «proj.tasks.add»:

@after_task_publish.connect(sender='proj.tasks.add')
def task_sent_handler(sender=None, headers=None, body=None, **kwargs):
    # information about task are located in headers for task messages
    # using the task protocol version 2.
    info = headers if 'task' in headers else body
    print('after_task_publish for task id {info[id]}'.format(
        info=info,
    ))

Сигналы используют ту же реализацию, что и django.core.dispatch. В результате другие параметры ключевого слова (например, signal) передаются всем обработчикам сигналов по умолчанию.

Лучшая практика для обработчиков сигналов - принимать произвольные аргументы в виде ключевых слов (т.е. **kwargs). Таким образом, новые версии Celery смогут добавлять дополнительные аргументы, не ломая пользовательский код.

Сигналы

Сигналы задачи

before_task_publish

Добавлено в версии 3.1.

Отправляется перед публикацией задания. Обратите внимание, что это выполняется в процессе отправки задания.

Отправитель - это имя отправляемого задания.

Приводит аргументы:

  • body

    Тело сообщения задачи.

    Это отображение, содержащее поля сообщения задачи, см. Версия 2 и << 1 >>> для справки о возможных полях, которые могут быть определены.

  • exchange

    Имя обмена для отправки или объект Exchange.

  • routing_key

    Ключ маршрутизации, который следует использовать при отправке сообщения.

  • headers

    Сопоставление заголовков приложений (может быть изменено).

  • properties

    Свойства сообщения (могут быть изменены)

  • declare

    Список сущностей (Exchange, Queue, или binding), которые необходимо объявить перед публикацией сообщения. Может быть изменен.

  • retry_policy

    Отображение опций повторных попыток. Может быть любым аргументом kombu.Connection.ensure() и может быть изменен.

after_task_publish

Отправляется, когда задача была отправлена брокеру. Обратите внимание, что это выполняется в процессе, который отправил задачу.

Отправитель - это имя отправляемого задания.

Приводит аргументы:

  • headers

    Заголовки сообщений задачи, см. Версия 2 и << 1 >>> для справки о возможных полях, которые могут быть определены.

  • body

    Тело сообщения задачи, см. Версия 2 и << 1 >>> для справки о возможных полях, которые могут быть определены.

  • exchange

    Имя используемого обмена или объекта Exchange.

  • routing_key

    Используется ключ маршрутизации.

task_prerun

Отправляется перед выполнением задачи.

Отправитель - это выполняемый объект задачи.

Приводит аргументы:

  • task_id

    Идентификатор задания, которое должно быть выполнено.

  • task

    Выполняемое задание.

  • args

    Задачи позиционные аргументы.

  • kwargs

    Аргументы ключевого слова tasks.

task_postrun

Отправляется после выполнения задания.

Sender - это объект задания, который выполняется.

Приводит аргументы:

  • task_id

    Идентификатор задания, которое должно быть выполнено.

  • task

    Выполняемое задание.

  • args

    Задачи позиционные аргументы.

  • kwargs

    Аргументы ключевого слова tasks.

  • retval

    Возвращаемое значение задачи.

  • state

    Название результирующего государства.

task_retry

Отправляется, когда задание будет повторно опробовано.

Отправителем является объект задачи.

Приводит аргументы:

  • request

    Текущий запрос задачи.

  • reason

    Причина для повторной попытки (обычно это экземпляр исключения, но всегда может быть принудительным str).

  • einfo

    Подробная информация об исключении, включая трассировку (объект billiard.einfo.ExceptionInfo).

task_success

Отправляется при успешном выполнении задания.

Sender - это объект задания, который выполняется.

Приводит аргументы

  • result

    Возвращаемое значение задачи.

task_failure

Отправляется при неудачном выполнении задания.

Sender - это объект задания, который выполняется.

Приводит аргументы:

  • task_id

    Id задания.

  • exception

    Возникает экземпляр исключения.

  • args

    Позиционные аргументы, с которыми была вызвана задача.

  • kwargs

    Аргументы с ключевыми словами, с которыми была вызвана задача.

  • traceback

    Объект трассировки стека.

  • einfo

    Экземпляр billiard.einfo.ExceptionInfo.

task_internal_error

Отправляется при возникновении внутренней ошибки Celery во время выполнения задачи.

Sender - это объект задания, который выполняется.

Приводит аргументы:

  • task_id

    Id задания.

  • args

    Позиционные аргументы, с которыми была вызвана задача.

  • kwargs

    Аргументы с ключевыми словами, с которыми была вызвана задача.

  • request

    Словарь исходного запроса. Он предоставляется, поскольку task.request может быть не готов к моменту возникновения исключения.

  • exception

    Возникает экземпляр исключения.

  • traceback

    Объект трассировки стека.

  • einfo

    Экземпляр billiard.einfo.ExceptionInfo.

task_received

Отправляется, когда задание получено от брокера и готово к выполнению.

Отправитель - это объект потребителя.

Приводит аргументы:

  • request

    Это экземпляр Request, а не task.request. При использовании пула prefork этот сигнал отправляется в родительский процесс, поэтому task.request недоступен и не должен использоваться. Вместо него используйте этот объект, так как у них много общих полей.

task_revoked

Отправляется, когда задание отзывается/прекращается работником.

Отправитель - объект задачи, который отзывается/прекращается.

Приводит аргументы:

  • request

    Это экземпляр Request, а не task.request. При использовании пула prefork этот сигнал отправляется в родительский процесс, поэтому task.request недоступен и не должен использоваться. Вместо него используйте этот объект, так как у них много общих полей.

  • terminated

    Устанавливается в True, если задание было завершено.

  • signum

    Номер сигнала, используемый для завершения задачи. Если это None и завершение равно True, то следует считать TERM.

  • expired

    Устанавливается в True, если срок выполнения задания истек.

task_unknown

Отправляется, когда рабочий получает сообщение для задачи, которая не зарегистрирована.

Отправителем является рабочий Consumer.

Приводит аргументы:

  • name

    Имя задания не найдено в реестре.

  • id

    Идентификатор задачи, найденный в сообщении.

  • message

    Объект необработанного сообщения.

  • exc

    Произошедшая ошибка.

task_rejected

Отправляется, когда рабочий получает сообщение неизвестного типа в одну из своих очередей задач.

Отправителем является рабочий Consumer.

Приводит аргументы:

  • message

    Объект необработанного сообщения.

  • exc

    Ошибка, которая произошла (если таковая имеется).

Призывные сигналы

import_modules

Этот сигнал посылается, когда программа (worker, beat, shell) и т.д. просит импортировать модули в настройках include и << 1 >>>.

Отправителем является экземпляр приложения.

Рабочие сигналы

celeryd_after_setup

Этот сигнал посылается после установки рабочего экземпляра, но до вызова его запуска. Это означает, что все очереди из опции celery worker -Q включены, протоколирование настроено и так далее.

Его можно использовать для добавления пользовательских очередей, которые всегда должны потребляться, не обращая внимания на опцию celery worker -Q. Вот пример, который устанавливает прямую очередь для каждого рабочего, эти очереди могут быть использованы для маршрутизации задачи к любому конкретному рабочему:

from celery.signals import celeryd_after_setup

@celeryd_after_setup.connect
def setup_direct_queue(sender, instance, **kwargs):
    queue_name = '{0}.dq'.format(sender)  # sender is the nodename of the worker
    instance.app.amqp.queues.select_add(queue_name)

Приводит аргументы:

  • sender

    Имя узла рабочего.

  • instance

    Это экземпляр celery.apps.worker.Worker, который должен быть инициализирован. Обратите внимание, что пока были установлены только атрибуты app и hostname (имя узла), а остальная часть __init__ не была выполнена.

  • conf

    Конфигурация текущего приложения.

celeryd_init

Это первый сигнал, посылаемый при запуске celery worker. sender - это имя хоста рабочего, поэтому этот сигнал может быть использован для настройки конфигурации рабочего:

from celery.signals import celeryd_init

@celeryd_init.connect(sender='worker12@example.com')
def configure_worker12(conf=None, **kwargs):
    conf.task_default_rate_limit = '10/m'

или настроить конфигурацию для нескольких работников, вы можете не указывать отправителя при подключении:

from celery.signals import celeryd_init

@celeryd_init.connect
def configure_workers(sender=None, conf=None, **kwargs):
    if sender in ('worker1@example.com', 'worker2@example.com'):
        conf.task_default_rate_limit = '10/m'
    if sender == 'worker3@example.com':
        conf.worker_prefetch_multiplier = 0

Приводит аргументы:

  • sender

    Имя узла рабочего.

  • instance

    Это экземпляр celery.apps.worker.Worker, который должен быть инициализирован. Обратите внимание, что пока были установлены только атрибуты app и hostname (имя узла), а остальная часть __init__ не была выполнена.

  • conf

    Конфигурация текущего приложения.

  • options

    Параметры, передаваемые рабочему из аргументов командной строки (включая значения по умолчанию).

worker_init

Отправляется перед запуском рабочего.

worker_ready

Отправляется, когда работник готов принять работу.

heartbeat_sent

Отправляется, когда Celery посылает сердцебиение рабочего.

Отправителем является экземпляр celery.worker.heartbeat.Heart.

worker_shutting_down

Отправляется, когда рабочий начинает процесс выключения.

Приводит аргументы:

  • sig

    Сигнал POSIX, который был получен.

  • how

    Метод отключения, теплый или холодный.

  • exitcode

    Код завершения, который будет использоваться при завершении основного процесса.

worker_process_init

Отправляется во все дочерние процессы пула при их запуске.

Обратите внимание, что обработчики, присоединенные к этому сигналу, не должны блокироваться более чем на 4 секунды, иначе процесс будет убит, если он не смог запуститься.

worker_process_shutdown

Отправляется во все дочерние процессы пула непосредственно перед их завершением.

Примечание: Нет никакой гарантии, что этот сигнал будет отправлен, аналогично блокам finally невозможно гарантировать, что обработчики будут вызваны при выключении, а если они будут вызваны, то могут быть прерваны во время работы.

Приводит аргументы:

  • pid

    pid дочернего процесса, который собирается завершить работу.

  • exitcode

    Код завершения, который будет использоваться при завершении дочернего процесса.

worker_shutdown

Высылается, когда рабочий собирается отключиться.

Сигналы удара

beat_init

Отправляется при запуске celery beat (автономном или встроенном).

Отправителем является экземпляр celery.beat.Service.

beat_embedded_init

Отправляется в дополнение к сигналу beat_init, когда celery beat запускается как встроенный процесс.

Отправителем является экземпляр celery.beat.Service.

Сигналы эвентов

eventlet_pool_started

Отправляется, когда пул эвентов был запущен.

Отправителем является экземпляр celery.concurrency.eventlet.TaskPool.

eventlet_pool_preshutdown

Отправляется при завершении работы, непосредственно перед тем, как пул eventlet будет запрошен для ожидания оставшихся работников.

Отправителем является экземпляр celery.concurrency.eventlet.TaskPool.

eventlet_pool_postshutdown

Отправляется, когда пул был присоединен и рабочий готов к отключению.

Отправителем является экземпляр celery.concurrency.eventlet.TaskPool.

eventlet_pool_apply

Отправляется всякий раз, когда задание применяется к пулу.

Отправителем является экземпляр celery.concurrency.eventlet.TaskPool.

Приводит аргументы:

  • target

    Целевая функция.

  • args

    Позиционные аргументы.

  • kwargs

    Аргументы с ключевыми словами.

Регистрация сигналов

setup_logging

Celery не будет конфигурировать регистраторы, если этот сигнал подключен, поэтому вы можете использовать его для полной отмены конфигурации регистрации своими собственными.

Если вы хотите дополнить конфигурацию протоколирования, установленную Celery, вы можете использовать сигналы after_setup_logger и << 1 >>>.

Приводит аргументы:

  • loglevel

    Уровень объекта протоколирования.

  • logfile

    Имя файла журнала.

  • format

    Строка формата журнала.

  • colorize

    Укажите, будут ли сообщения журнала окрашены или нет.

after_setup_logger

Отправляется после настройки каждого глобального регистратора (не регистраторов задач). Используется для дополнения конфигурации регистрации.

Приводит аргументы:

  • logger

    Объект регистратора.

  • loglevel

    Уровень объекта протоколирования.

  • logfile

    Имя файла журнала.

  • format

    Строка формата журнала.

  • colorize

    Укажите, будут ли сообщения журнала окрашены или нет.

after_setup_task_logger

Отправляется после настройки каждого отдельного регистратора задач. Используется для дополнения конфигурации регистрации.

Приводит аргументы:

  • logger

    Объект регистратора.

  • loglevel

    Уровень объекта протоколирования.

  • logfile

    Имя файла журнала.

  • format

    Строка формата журнала.

  • colorize

    Укажите, будут ли сообщения журнала окрашены или нет.

Командные сигналы

user_preload_options

Этот сигнал посылается после того, как любая из программ командной строки Celery закончит разбор опций предварительной загрузки пользователя.

Его можно использовать для добавления дополнительных аргументов командной строки к команде celery umbrella:

from celery import Celery
from celery import signals
from celery.bin.base import Option

app = Celery()
app.user_options['preload'].add(Option(
    '--monitoring', action='store_true',
    help='Enable our external monitoring utility, blahblah',
))

@signals.user_preload_options.connect
def handle_preload_options(options, **kwargs):
    if options['monitoring']:
        enable_monitoring()

Sender - это экземпляр Command, а значение зависит от вызванной программы (например, для команды umbrella это будет объект CeleryCommand).

Приводит аргументы:

  • app

    Экземпляр приложения.

  • options

    Отображение разобранных опций предварительной загрузки пользователя (со значениями по умолчанию).

Утратившие силу сигналы

task_sent

Этот сигнал устарел, вместо него используйте after_task_publish.

Back to Top