Сигналы¶
Сигналы позволяют разобщенным приложениям получать уведомления, когда определенные действия происходят в другом месте приложения.
Celery поставляется с множеством сигналов, к которым ваше приложение может подключиться, чтобы дополнить поведение определенных действий.
Основы¶
Несколько видов событий вызывают сигналы, вы можете подключиться к этим сигналам для выполнения действий по мере их срабатывания.
Пример подключения к сигналу after_task_publish:
from celery.signals import after_task_publish
@after_task_publish.connect
def task_sent_handler(sender=None, headers=None, body=None, **kwargs):
# information about task are located in headers for task messages
# using the task protocol version 2.
info = headers if 'task' in headers else body
print('after_task_publish for task id {info[id]}'.format(
info=info,
))
Некоторые сигналы также имеют отправителя, по которому их можно фильтровать. Например, сигнал after_task_publish использует имя задачи в качестве отправителя, поэтому, предоставив аргумент sender в connect, вы можете подключить свой обработчик для вызова каждый раз, когда публикуется задача с именем «proj.tasks.add»:
@after_task_publish.connect(sender='proj.tasks.add')
def task_sent_handler(sender=None, headers=None, body=None, **kwargs):
# information about task are located in headers for task messages
# using the task protocol version 2.
info = headers if 'task' in headers else body
print('after_task_publish for task id {info[id]}'.format(
info=info,
))
Сигналы используют ту же реализацию, что и django.core.dispatch. В результате другие параметры ключевого слова (например, signal) передаются всем обработчикам сигналов по умолчанию.
Лучшая практика для обработчиков сигналов - принимать произвольные аргументы в виде ключевых слов (т.е. **kwargs). Таким образом, новые версии Celery смогут добавлять дополнительные аргументы, не ломая пользовательский код.
Сигналы¶
Сигналы задачи¶
before_task_publish¶
Добавлено в версии 3.1.
Отправляется перед публикацией задания. Обратите внимание, что это выполняется в процессе отправки задания.
Отправитель - это имя отправляемого задания.
Приводит аргументы:
bodyТело сообщения задачи.
Это отображение, содержащее поля сообщения задачи, см. Версия 2 и << 1 >>> для справки о возможных полях, которые могут быть определены.
exchangeИмя обмена для отправки или объект
Exchange.routing_keyКлюч маршрутизации, который следует использовать при отправке сообщения.
headersСопоставление заголовков приложений (может быть изменено).
propertiesСвойства сообщения (могут быть изменены)
declareСписок сущностей (
Exchange,Queue, илиbinding), которые необходимо объявить перед публикацией сообщения. Может быть изменен.retry_policyОтображение опций повторных попыток. Может быть любым аргументом
kombu.Connection.ensure()и может быть изменен.
after_task_publish¶
Отправляется, когда задача была отправлена брокеру. Обратите внимание, что это выполняется в процессе, который отправил задачу.
Отправитель - это имя отправляемого задания.
Приводит аргументы:
headersЗаголовки сообщений задачи, см. Версия 2 и << 1 >>> для справки о возможных полях, которые могут быть определены.
bodyТело сообщения задачи, см. Версия 2 и << 1 >>> для справки о возможных полях, которые могут быть определены.
exchangeИмя используемого обмена или объекта
Exchange.routing_keyИспользуется ключ маршрутизации.
task_prerun¶
Отправляется перед выполнением задачи.
Отправитель - это выполняемый объект задачи.
Приводит аргументы:
task_idИдентификатор задания, которое должно быть выполнено.
taskВыполняемое задание.
argsЗадачи позиционные аргументы.
kwargsАргументы ключевого слова tasks.
task_postrun¶
Отправляется после выполнения задания.
Sender - это объект задания, который выполняется.
Приводит аргументы:
task_idИдентификатор задания, которое должно быть выполнено.
taskВыполняемое задание.
argsЗадачи позиционные аргументы.
kwargsАргументы ключевого слова tasks.
retvalВозвращаемое значение задачи.
stateНазвание результирующего государства.
task_retry¶
Отправляется, когда задание будет повторно опробовано.
Отправителем является объект задачи.
Приводит аргументы:
requestТекущий запрос задачи.
reasonПричина для повторной попытки (обычно это экземпляр исключения, но всегда может быть принудительным
str).einfoПодробная информация об исключении, включая трассировку (объект
billiard.einfo.ExceptionInfo).
task_success¶
Отправляется при успешном выполнении задания.
Sender - это объект задания, который выполняется.
Приводит аргументы
resultВозвращаемое значение задачи.
task_failure¶
Отправляется при неудачном выполнении задания.
Sender - это объект задания, который выполняется.
Приводит аргументы:
task_idId задания.
exceptionВозникает экземпляр исключения.
argsПозиционные аргументы, с которыми была вызвана задача.
kwargsАргументы с ключевыми словами, с которыми была вызвана задача.
tracebackОбъект трассировки стека.
einfoЭкземпляр
billiard.einfo.ExceptionInfo.
task_internal_error¶
Отправляется при возникновении внутренней ошибки Celery во время выполнения задачи.
Sender - это объект задания, который выполняется.
Приводит аргументы:
task_idId задания.
argsПозиционные аргументы, с которыми была вызвана задача.
kwargsАргументы с ключевыми словами, с которыми была вызвана задача.
requestСловарь исходного запроса. Он предоставляется, поскольку
task.requestможет быть не готов к моменту возникновения исключения.exceptionВозникает экземпляр исключения.
tracebackОбъект трассировки стека.
einfoЭкземпляр
billiard.einfo.ExceptionInfo.
task_received¶
Отправляется, когда задание получено от брокера и готово к выполнению.
Отправитель - это объект потребителя.
Приводит аргументы:
requestЭто экземпляр
Request, а неtask.request. При использовании пула prefork этот сигнал отправляется в родительский процесс, поэтомуtask.requestнедоступен и не должен использоваться. Вместо него используйте этот объект, так как у них много общих полей.
task_revoked¶
Отправляется, когда задание отзывается/прекращается работником.
Отправитель - объект задачи, который отзывается/прекращается.
Приводит аргументы:
requestЭто экземпляр
Request, а неtask.request. При использовании пула prefork этот сигнал отправляется в родительский процесс, поэтомуtask.requestнедоступен и не должен использоваться. Вместо него используйте этот объект, так как у них много общих полей.terminatedУстанавливается в
True, если задание было завершено.signumНомер сигнала, используемый для завершения задачи. Если это
Noneи завершение равноTrue, то следует считатьTERM.expiredУстанавливается в
True, если срок выполнения задания истек.
task_unknown¶
Отправляется, когда рабочий получает сообщение для задачи, которая не зарегистрирована.
Отправителем является рабочий Consumer.
Приводит аргументы:
nameИмя задания не найдено в реестре.
idИдентификатор задачи, найденный в сообщении.
messageОбъект необработанного сообщения.
excПроизошедшая ошибка.
task_rejected¶
Отправляется, когда рабочий получает сообщение неизвестного типа в одну из своих очередей задач.
Отправителем является рабочий Consumer.
Приводит аргументы:
messageОбъект необработанного сообщения.
excОшибка, которая произошла (если таковая имеется).
Призывные сигналы¶
import_modules¶
Этот сигнал посылается, когда программа (worker, beat, shell) и т.д. просит импортировать модули в настройках include и << 1 >>>.
Отправителем является экземпляр приложения.
Рабочие сигналы¶
celeryd_after_setup¶
Этот сигнал посылается после установки рабочего экземпляра, но до вызова его запуска. Это означает, что все очереди из опции celery worker -Q включены, протоколирование настроено и так далее.
Его можно использовать для добавления пользовательских очередей, которые всегда должны потребляться, не обращая внимания на опцию celery worker -Q. Вот пример, который устанавливает прямую очередь для каждого рабочего, эти очереди могут быть использованы для маршрутизации задачи к любому конкретному рабочему:
from celery.signals import celeryd_after_setup
@celeryd_after_setup.connect
def setup_direct_queue(sender, instance, **kwargs):
queue_name = '{0}.dq'.format(sender) # sender is the nodename of the worker
instance.app.amqp.queues.select_add(queue_name)
Приводит аргументы:
senderИмя узла рабочего.
instanceЭто экземпляр
celery.apps.worker.Worker, который должен быть инициализирован. Обратите внимание, что пока были установлены только атрибутыappиhostname(имя узла), а остальная часть__init__не была выполнена.confКонфигурация текущего приложения.
celeryd_init¶
Это первый сигнал, посылаемый при запуске celery worker. sender - это имя хоста рабочего, поэтому этот сигнал может быть использован для настройки конфигурации рабочего:
from celery.signals import celeryd_init
@celeryd_init.connect(sender='worker12@example.com')
def configure_worker12(conf=None, **kwargs):
conf.task_default_rate_limit = '10/m'
или настроить конфигурацию для нескольких работников, вы можете не указывать отправителя при подключении:
from celery.signals import celeryd_init
@celeryd_init.connect
def configure_workers(sender=None, conf=None, **kwargs):
if sender in ('worker1@example.com', 'worker2@example.com'):
conf.task_default_rate_limit = '10/m'
if sender == 'worker3@example.com':
conf.worker_prefetch_multiplier = 0
Приводит аргументы:
senderИмя узла рабочего.
instanceЭто экземпляр
celery.apps.worker.Worker, который должен быть инициализирован. Обратите внимание, что пока были установлены только атрибутыappиhostname(имя узла), а остальная часть__init__не была выполнена.confКонфигурация текущего приложения.
optionsПараметры, передаваемые рабочему из аргументов командной строки (включая значения по умолчанию).
worker_init¶
Отправляется перед запуском рабочего.
worker_ready¶
Отправляется, когда работник готов принять работу.
heartbeat_sent¶
Отправляется, когда Celery посылает сердцебиение рабочего.
Отправителем является экземпляр celery.worker.heartbeat.Heart.
worker_shutting_down¶
Отправляется, когда рабочий начинает процесс выключения.
Приводит аргументы:
sigСигнал POSIX, который был получен.
howМетод отключения, теплый или холодный.
exitcodeКод завершения, который будет использоваться при завершении основного процесса.
worker_process_init¶
Отправляется во все дочерние процессы пула при их запуске.
Обратите внимание, что обработчики, присоединенные к этому сигналу, не должны блокироваться более чем на 4 секунды, иначе процесс будет убит, если он не смог запуститься.
worker_process_shutdown¶
Отправляется во все дочерние процессы пула непосредственно перед их завершением.
Примечание: Нет никакой гарантии, что этот сигнал будет отправлен, аналогично блокам finally невозможно гарантировать, что обработчики будут вызваны при выключении, а если они будут вызваны, то могут быть прерваны во время работы.
Приводит аргументы:
pidpid дочернего процесса, который собирается завершить работу.
exitcodeКод завершения, который будет использоваться при завершении дочернего процесса.
worker_shutdown¶
Высылается, когда рабочий собирается отключиться.
Сигналы удара¶
beat_init¶
Отправляется при запуске celery beat (автономном или встроенном).
Отправителем является экземпляр celery.beat.Service.
beat_embedded_init¶
Отправляется в дополнение к сигналу beat_init, когда celery beat запускается как встроенный процесс.
Отправителем является экземпляр celery.beat.Service.
Сигналы эвентов¶
eventlet_pool_started¶
Отправляется, когда пул эвентов был запущен.
Отправителем является экземпляр celery.concurrency.eventlet.TaskPool.
eventlet_pool_preshutdown¶
Отправляется при завершении работы, непосредственно перед тем, как пул eventlet будет запрошен для ожидания оставшихся работников.
Отправителем является экземпляр celery.concurrency.eventlet.TaskPool.
eventlet_pool_postshutdown¶
Отправляется, когда пул был присоединен и рабочий готов к отключению.
Отправителем является экземпляр celery.concurrency.eventlet.TaskPool.
eventlet_pool_apply¶
Отправляется всякий раз, когда задание применяется к пулу.
Отправителем является экземпляр celery.concurrency.eventlet.TaskPool.
Приводит аргументы:
targetЦелевая функция.
argsПозиционные аргументы.
kwargsАргументы с ключевыми словами.
Регистрация сигналов¶
setup_logging¶
Celery не будет конфигурировать регистраторы, если этот сигнал подключен, поэтому вы можете использовать его для полной отмены конфигурации регистрации своими собственными.
Если вы хотите дополнить конфигурацию протоколирования, установленную Celery, вы можете использовать сигналы after_setup_logger и << 1 >>>.
Приводит аргументы:
loglevelУровень объекта протоколирования.
logfileИмя файла журнала.
formatСтрока формата журнала.
colorizeУкажите, будут ли сообщения журнала окрашены или нет.
after_setup_logger¶
Отправляется после настройки каждого глобального регистратора (не регистраторов задач). Используется для дополнения конфигурации регистрации.
Приводит аргументы:
loggerОбъект регистратора.
loglevelУровень объекта протоколирования.
logfileИмя файла журнала.
formatСтрока формата журнала.
colorizeУкажите, будут ли сообщения журнала окрашены или нет.
after_setup_task_logger¶
Отправляется после настройки каждого отдельного регистратора задач. Используется для дополнения конфигурации регистрации.
Приводит аргументы:
loggerОбъект регистратора.
loglevelУровень объекта протоколирования.
logfileИмя файла журнала.
formatСтрока формата журнала.
colorizeУкажите, будут ли сообщения журнала окрашены или нет.
Командные сигналы¶
user_preload_options¶
Этот сигнал посылается после того, как любая из программ командной строки Celery закончит разбор опций предварительной загрузки пользователя.
Его можно использовать для добавления дополнительных аргументов командной строки к команде celery umbrella:
from celery import Celery
from celery import signals
from celery.bin.base import Option
app = Celery()
app.user_options['preload'].add(Option(
'--monitoring', action='store_true',
help='Enable our external monitoring utility, blahblah',
))
@signals.user_preload_options.connect
def handle_preload_options(options, **kwargs):
if options['monitoring']:
enable_monitoring()
Sender - это экземпляр Command, а значение зависит от вызванной программы (например, для команды umbrella это будет объект CeleryCommand).
Приводит аргументы:
appЭкземпляр приложения.
optionsОтображение разобранных опций предварительной загрузки пользователя (со значениями по умолчанию).
Утратившие силу сигналы¶
task_sent¶
Этот сигнал устарел, вместо него используйте after_task_publish.