Сигналы¶
Сигналы позволяют разобщенным приложениям получать уведомления, когда определенные действия происходят в другом месте приложения.
Celery поставляется с множеством сигналов, к которым ваше приложение может подключиться, чтобы дополнить поведение определенных действий.
Основы¶
Несколько видов событий вызывают сигналы, вы можете подключиться к этим сигналам для выполнения действий по мере их срабатывания.
Пример подключения к сигналу after_task_publish
:
from celery.signals import after_task_publish
@after_task_publish.connect
def task_sent_handler(sender=None, headers=None, body=None, **kwargs):
# information about task are located in headers for task messages
# using the task protocol version 2.
info = headers if 'task' in headers else body
print('after_task_publish for task id {info[id]}'.format(
info=info,
))
Некоторые сигналы также имеют отправителя, по которому их можно фильтровать. Например, сигнал after_task_publish
использует имя задачи в качестве отправителя, поэтому, предоставив аргумент sender
в connect
, вы можете подключить свой обработчик для вызова каждый раз, когда публикуется задача с именем «proj.tasks.add»:
@after_task_publish.connect(sender='proj.tasks.add')
def task_sent_handler(sender=None, headers=None, body=None, **kwargs):
# information about task are located in headers for task messages
# using the task protocol version 2.
info = headers if 'task' in headers else body
print('after_task_publish for task id {info[id]}'.format(
info=info,
))
Сигналы используют ту же реализацию, что и django.core.dispatch
. В результате другие параметры ключевого слова (например, signal) передаются всем обработчикам сигналов по умолчанию.
Лучшая практика для обработчиков сигналов - принимать произвольные аргументы в виде ключевых слов (т.е. **kwargs
). Таким образом, новые версии Celery смогут добавлять дополнительные аргументы, не ломая пользовательский код.
Сигналы¶
Сигналы задачи¶
before_task_publish
¶
Добавлено в версии 3.1.
Отправляется перед публикацией задания. Обратите внимание, что это выполняется в процессе отправки задания.
Отправитель - это имя отправляемого задания.
Приводит аргументы:
body
Тело сообщения задачи.
Это отображение, содержащее поля сообщения задачи, см. Версия 2 и << 1 >>> для справки о возможных полях, которые могут быть определены.
exchange
Имя обмена для отправки или объект
Exchange
.routing_key
Ключ маршрутизации, который следует использовать при отправке сообщения.
headers
Сопоставление заголовков приложений (может быть изменено).
properties
Свойства сообщения (могут быть изменены)
declare
Список сущностей (
Exchange
,Queue
, илиbinding
), которые необходимо объявить перед публикацией сообщения. Может быть изменен.retry_policy
Отображение опций повторных попыток. Может быть любым аргументом
kombu.Connection.ensure()
и может быть изменен.
after_task_publish
¶
Отправляется, когда задача была отправлена брокеру. Обратите внимание, что это выполняется в процессе, который отправил задачу.
Отправитель - это имя отправляемого задания.
Приводит аргументы:
headers
Заголовки сообщений задачи, см. Версия 2 и << 1 >>> для справки о возможных полях, которые могут быть определены.
body
Тело сообщения задачи, см. Версия 2 и << 1 >>> для справки о возможных полях, которые могут быть определены.
exchange
Имя используемого обмена или объекта
Exchange
.routing_key
Используется ключ маршрутизации.
task_prerun
¶
Отправляется перед выполнением задачи.
Отправитель - это выполняемый объект задачи.
Приводит аргументы:
task_id
Идентификатор задания, которое должно быть выполнено.
task
Выполняемое задание.
args
Задачи позиционные аргументы.
kwargs
Аргументы ключевого слова tasks.
task_postrun
¶
Отправляется после выполнения задания.
Sender - это объект задания, который выполняется.
Приводит аргументы:
task_id
Идентификатор задания, которое должно быть выполнено.
task
Выполняемое задание.
args
Задачи позиционные аргументы.
kwargs
Аргументы ключевого слова tasks.
retval
Возвращаемое значение задачи.
state
Название результирующего государства.
task_retry
¶
Отправляется, когда задание будет повторно опробовано.
Отправителем является объект задачи.
Приводит аргументы:
request
Текущий запрос задачи.
reason
Причина для повторной попытки (обычно это экземпляр исключения, но всегда может быть принудительным
str
).einfo
Подробная информация об исключении, включая трассировку (объект
billiard.einfo.ExceptionInfo
).
task_success
¶
Отправляется при успешном выполнении задания.
Sender - это объект задания, который выполняется.
Приводит аргументы
result
Возвращаемое значение задачи.
task_failure
¶
Отправляется при неудачном выполнении задания.
Sender - это объект задания, который выполняется.
Приводит аргументы:
task_id
Id задания.
exception
Возникает экземпляр исключения.
args
Позиционные аргументы, с которыми была вызвана задача.
kwargs
Аргументы с ключевыми словами, с которыми была вызвана задача.
traceback
Объект трассировки стека.
einfo
Экземпляр
billiard.einfo.ExceptionInfo
.
task_internal_error
¶
Отправляется при возникновении внутренней ошибки Celery во время выполнения задачи.
Sender - это объект задания, который выполняется.
Приводит аргументы:
task_id
Id задания.
args
Позиционные аргументы, с которыми была вызвана задача.
kwargs
Аргументы с ключевыми словами, с которыми была вызвана задача.
request
Словарь исходного запроса. Он предоставляется, поскольку
task.request
может быть не готов к моменту возникновения исключения.exception
Возникает экземпляр исключения.
traceback
Объект трассировки стека.
einfo
Экземпляр
billiard.einfo.ExceptionInfo
.
task_received
¶
Отправляется, когда задание получено от брокера и готово к выполнению.
Отправитель - это объект потребителя.
Приводит аргументы:
request
Это экземпляр
Request
, а неtask.request
. При использовании пула prefork этот сигнал отправляется в родительский процесс, поэтомуtask.request
недоступен и не должен использоваться. Вместо него используйте этот объект, так как у них много общих полей.
task_revoked
¶
Отправляется, когда задание отзывается/прекращается работником.
Отправитель - объект задачи, который отзывается/прекращается.
Приводит аргументы:
request
Это экземпляр
Request
, а неtask.request
. При использовании пула prefork этот сигнал отправляется в родительский процесс, поэтомуtask.request
недоступен и не должен использоваться. Вместо него используйте этот объект, так как у них много общих полей.terminated
Устанавливается в
True
, если задание было завершено.signum
Номер сигнала, используемый для завершения задачи. Если это
None
и завершение равноTrue
, то следует считатьTERM
.expired
Устанавливается в
True
, если срок выполнения задания истек.
task_unknown
¶
Отправляется, когда рабочий получает сообщение для задачи, которая не зарегистрирована.
Отправителем является рабочий Consumer
.
Приводит аргументы:
name
Имя задания не найдено в реестре.
id
Идентификатор задачи, найденный в сообщении.
message
Объект необработанного сообщения.
exc
Произошедшая ошибка.
task_rejected
¶
Отправляется, когда рабочий получает сообщение неизвестного типа в одну из своих очередей задач.
Отправителем является рабочий Consumer
.
Приводит аргументы:
message
Объект необработанного сообщения.
exc
Ошибка, которая произошла (если таковая имеется).
Призывные сигналы¶
import_modules
¶
Этот сигнал посылается, когда программа (worker, beat, shell) и т.д. просит импортировать модули в настройках include
и << 1 >>>.
Отправителем является экземпляр приложения.
Рабочие сигналы¶
celeryd_after_setup
¶
Этот сигнал посылается после установки рабочего экземпляра, но до вызова его запуска. Это означает, что все очереди из опции celery worker -Q
включены, протоколирование настроено и так далее.
Его можно использовать для добавления пользовательских очередей, которые всегда должны потребляться, не обращая внимания на опцию celery worker -Q
. Вот пример, который устанавливает прямую очередь для каждого рабочего, эти очереди могут быть использованы для маршрутизации задачи к любому конкретному рабочему:
from celery.signals import celeryd_after_setup
@celeryd_after_setup.connect
def setup_direct_queue(sender, instance, **kwargs):
queue_name = '{0}.dq'.format(sender) # sender is the nodename of the worker
instance.app.amqp.queues.select_add(queue_name)
Приводит аргументы:
sender
Имя узла рабочего.
instance
Это экземпляр
celery.apps.worker.Worker
, который должен быть инициализирован. Обратите внимание, что пока были установлены только атрибутыapp
иhostname
(имя узла), а остальная часть__init__
не была выполнена.conf
Конфигурация текущего приложения.
celeryd_init
¶
Это первый сигнал, посылаемый при запуске celery worker. sender
- это имя хоста рабочего, поэтому этот сигнал может быть использован для настройки конфигурации рабочего:
from celery.signals import celeryd_init
@celeryd_init.connect(sender='worker12@example.com')
def configure_worker12(conf=None, **kwargs):
conf.task_default_rate_limit = '10/m'
или настроить конфигурацию для нескольких работников, вы можете не указывать отправителя при подключении:
from celery.signals import celeryd_init
@celeryd_init.connect
def configure_workers(sender=None, conf=None, **kwargs):
if sender in ('worker1@example.com', 'worker2@example.com'):
conf.task_default_rate_limit = '10/m'
if sender == 'worker3@example.com':
conf.worker_prefetch_multiplier = 0
Приводит аргументы:
sender
Имя узла рабочего.
instance
Это экземпляр
celery.apps.worker.Worker
, который должен быть инициализирован. Обратите внимание, что пока были установлены только атрибутыapp
иhostname
(имя узла), а остальная часть__init__
не была выполнена.conf
Конфигурация текущего приложения.
options
Параметры, передаваемые рабочему из аргументов командной строки (включая значения по умолчанию).
worker_init
¶
Отправляется перед запуском рабочего.
worker_ready
¶
Отправляется, когда работник готов принять работу.
heartbeat_sent
¶
Отправляется, когда Celery посылает сердцебиение рабочего.
Отправителем является экземпляр celery.worker.heartbeat.Heart
.
worker_shutting_down
¶
Отправляется, когда рабочий начинает процесс выключения.
Приводит аргументы:
sig
Сигнал POSIX, который был получен.
how
Метод отключения, теплый или холодный.
exitcode
Код завершения, который будет использоваться при завершении основного процесса.
worker_process_init
¶
Отправляется во все дочерние процессы пула при их запуске.
Обратите внимание, что обработчики, присоединенные к этому сигналу, не должны блокироваться более чем на 4 секунды, иначе процесс будет убит, если он не смог запуститься.
worker_process_shutdown
¶
Отправляется во все дочерние процессы пула непосредственно перед их завершением.
Примечание: Нет никакой гарантии, что этот сигнал будет отправлен, аналогично блокам finally
невозможно гарантировать, что обработчики будут вызваны при выключении, а если они будут вызваны, то могут быть прерваны во время работы.
Приводит аргументы:
pid
pid дочернего процесса, который собирается завершить работу.
exitcode
Код завершения, который будет использоваться при завершении дочернего процесса.
worker_shutdown
¶
Высылается, когда рабочий собирается отключиться.
Сигналы удара¶
beat_init
¶
Отправляется при запуске celery beat (автономном или встроенном).
Отправителем является экземпляр celery.beat.Service
.
beat_embedded_init
¶
Отправляется в дополнение к сигналу beat_init
, когда celery beat запускается как встроенный процесс.
Отправителем является экземпляр celery.beat.Service
.
Сигналы эвентов¶
eventlet_pool_started
¶
Отправляется, когда пул эвентов был запущен.
Отправителем является экземпляр celery.concurrency.eventlet.TaskPool
.
eventlet_pool_preshutdown
¶
Отправляется при завершении работы, непосредственно перед тем, как пул eventlet будет запрошен для ожидания оставшихся работников.
Отправителем является экземпляр celery.concurrency.eventlet.TaskPool
.
eventlet_pool_postshutdown
¶
Отправляется, когда пул был присоединен и рабочий готов к отключению.
Отправителем является экземпляр celery.concurrency.eventlet.TaskPool
.
eventlet_pool_apply
¶
Отправляется всякий раз, когда задание применяется к пулу.
Отправителем является экземпляр celery.concurrency.eventlet.TaskPool
.
Приводит аргументы:
target
Целевая функция.
args
Позиционные аргументы.
kwargs
Аргументы с ключевыми словами.
Регистрация сигналов¶
setup_logging
¶
Celery не будет конфигурировать регистраторы, если этот сигнал подключен, поэтому вы можете использовать его для полной отмены конфигурации регистрации своими собственными.
Если вы хотите дополнить конфигурацию протоколирования, установленную Celery, вы можете использовать сигналы after_setup_logger
и << 1 >>>.
Приводит аргументы:
loglevel
Уровень объекта протоколирования.
logfile
Имя файла журнала.
format
Строка формата журнала.
colorize
Укажите, будут ли сообщения журнала окрашены или нет.
after_setup_logger
¶
Отправляется после настройки каждого глобального регистратора (не регистраторов задач). Используется для дополнения конфигурации регистрации.
Приводит аргументы:
logger
Объект регистратора.
loglevel
Уровень объекта протоколирования.
logfile
Имя файла журнала.
format
Строка формата журнала.
colorize
Укажите, будут ли сообщения журнала окрашены или нет.
after_setup_task_logger
¶
Отправляется после настройки каждого отдельного регистратора задач. Используется для дополнения конфигурации регистрации.
Приводит аргументы:
logger
Объект регистратора.
loglevel
Уровень объекта протоколирования.
logfile
Имя файла журнала.
format
Строка формата журнала.
colorize
Укажите, будут ли сообщения журнала окрашены или нет.
Командные сигналы¶
user_preload_options
¶
Этот сигнал посылается после того, как любая из программ командной строки Celery закончит разбор опций предварительной загрузки пользователя.
Его можно использовать для добавления дополнительных аргументов командной строки к команде celery umbrella:
from celery import Celery
from celery import signals
from celery.bin.base import Option
app = Celery()
app.user_options['preload'].add(Option(
'--monitoring', action='store_true',
help='Enable our external monitoring utility, blahblah',
))
@signals.user_preload_options.connect
def handle_preload_options(options, **kwargs):
if options['monitoring']:
enable_monitoring()
Sender - это экземпляр Command
, а значение зависит от вызванной программы (например, для команды umbrella это будет объект CeleryCommand
).
Приводит аргументы:
app
Экземпляр приложения.
options
Отображение разобранных опций предварительной загрузки пользователя (со значениями по умолчанию).
Утратившие силу сигналы¶
task_sent
¶
Этот сигнал устарел, вместо него используйте after_task_publish
.