Приложение¶
Библиотека Celery должна быть инстанцирована перед использованием, этот экземпляр называется приложением (или app для краткости).
Приложение является потокобезопасным, поэтому несколько приложений Celery с различными конфигурациями, компонентами и задачами могут сосуществовать в одном пространстве процессов.
Давайте создадим его сейчас:
>>> from celery import Celery
>>> app = Celery()
>>> app
<Celery __main__:0x100469fd0>
Последняя строка показывает текстовое представление приложения: включая имя класса app (Celery
), имя текущего главного модуля (__main__
) и адрес памяти объекта (0x100469fd0
).
Основное название¶
Только одно из них важно, и это имя главного модуля. Давайте рассмотрим, почему это так.
Когда вы посылаете сообщение о задаче в Celery, это сообщение не содержит исходного кода, а только имя задачи, которую вы хотите выполнить. Это работает подобно тому, как работают имена хостов в интернете: каждый рабочий поддерживает отображение имен задач на их фактические функции, называемое реестром задач.
Всякий раз, когда вы определяете задачу, эта задача также добавляется в локальный реестр:
>>> @app.task
... def add(x, y):
... return x + y
>>> add
<@task: __main__.add>
>>> add.name
__main__.add
>>> app.tasks['__main__.add']
<@task: __main__.add>
и здесь вы снова видите __main__
; всякий раз, когда Celery не может определить, к какому модулю принадлежит функция, он использует имя главного модуля для генерации начала имени задачи.
Это является проблемой только в ограниченном наборе случаев использования:
Если модуль, в котором определена задача, выполняется как программа.
Если приложение создается в оболочке Python (REPL).
Например, здесь, где модуль задач также используется для запуска рабочего с app.worker_main()
:
tasks.py
:
from celery import Celery
app = Celery()
@app.task
def add(x, y): return x + y
if __name__ == '__main__':
app.worker_main()
При выполнении этого модуля задачи будут называться, начиная с «__main__
», но когда модуль импортируется другим процессом, скажем, для вызова задачи, задачи будут называться, начиная с «tasks
». (настоящее имя модуля):
>>> from tasks import add
>>> add.name
tasks.add
Вы можете задать другое имя для главного модуля:
>>> app = Celery('tasks')
>>> app.main
'tasks'
>>> @app.task
... def add(x, y):
... return x + y
>>> add.name
tasks.add
См.также
Конфигурация¶
Существует несколько опций, которые могут изменить работу Celery. Эти параметры можно установить непосредственно на экземпляре приложения или использовать специальный модуль конфигурации.
Конфигурация доступна в виде app.conf
:
>>> app.conf.timezone
'Europe/London'
где вы также можете напрямую задать значения конфигурации:
>>> app.conf.enable_utc = True
или обновить несколько ключей одновременно, используя метод update
:
>>> app.conf.update(
... enable_utc=True,
... timezone='Europe/London',
...)
Объект конфигурации состоит из нескольких словарей, которые просматриваются по порядку:
Изменения, внесенные во время выполнения.
Модуль конфигурации (при наличии)
Конфигурация по умолчанию (
celery.app.defaults
).
Вы можете даже добавить новые источники по умолчанию, используя метод app.add_defaults()
.
См.также
Перейдите в раздел Configuration reference для получения полного списка всех доступных настроек и их значений по умолчанию.
config_from_object
¶
Метод app.config_from_object()
загружает конфигурацию из объекта конфигурации.
Это может быть модуль конфигурации или любой объект с атрибутами конфигурации.
Обратите внимание, что любая ранее заданная конфигурация будет сброшена при вызове config_from_object()
. Если вы хотите установить дополнительную конфигурацию, вы должны сделать это после.
Пример 1: Использование имени модуля¶
Метод app.config_from_object()
может принимать полное имя модуля Python или даже имя атрибута Python, например: "celeryconfig"
, "myproj.config.celery"
или "myproj.config:CeleryConfig"
:
from celery import Celery
app = Celery()
app.config_from_object('celeryconfig')
Модуль celeryconfig
может выглядеть следующим образом:
celeryconfig.py
:
enable_utc = True
timezone = 'Europe/London'
и приложение сможет использовать его до тех пор, пока import celeryconfig
возможно.
Пример 2: Передача фактического объекта модуля¶
Вы также можете передать уже импортированный объект модуля, но это не всегда рекомендуется.
Совет
Рекомендуется использовать имя модуля, так как это означает, что модуль не нужно сериализовать при использовании пула prefork. Если у вас возникли проблемы с конфигурацией или ошибки pickle, попробуйте использовать имя модуля.
import celeryconfig
from celery import Celery
app = Celery()
app.config_from_object(celeryconfig)
Пример 3: Использование класса/объекта конфигурации¶
from celery import Celery
app = Celery()
class Config:
enable_utc = True
timezone = 'Europe/London'
app.config_from_object(Config)
# or using the fully qualified name of the object:
# app.config_from_object('module:Config')
config_from_envvar
¶
app.config_from_envvar()
берет имя модуля конфигурации из переменной окружения
Например – для загрузки конфигурации из модуля, указанного в переменной окружения с именем CELERY_CONFIG_MODULE
:
import os
from celery import Celery
#: Set default configuration module name
os.environ.setdefault('CELERY_CONFIG_MODULE', 'celeryconfig')
app = Celery()
app.config_from_envvar('CELERY_CONFIG_MODULE')
Затем вы можете указать модуль конфигурации для использования через среду:
$ CELERY_CONFIG_MODULE="celeryconfig.prod" celery worker -l INFO
Конфигурация с цензурой¶
Если вы когда-нибудь захотите распечатать конфигурацию в качестве отладочной информации или чего-то подобного, вы также можете захотеть отфильтровать конфиденциальную информацию, такую как пароли и ключи API.
Celery поставляется с несколькими утилитами, полезными для представления конфигурации, одна из них - humanize()
:
>>> app.conf.humanize(with_defaults=False, censored=True)
Этот метод возвращает конфигурацию в виде табулированной строки. По умолчанию она будет содержать только изменения конфигурации, но вы можете включить встроенные ключи и значения по умолчанию, включив аргумент with_defaults
.
Если вместо этого вы хотите работать с конфигурацией как со словарем, вы можете использовать метод table()
:
>>> app.conf.table(with_defaults=False, censored=True)
Обратите внимание, что Celery не сможет удалить всю конфиденциальную информацию, поскольку он просто использует регулярное выражение для поиска ключей с общим названием. Если вы добавляете пользовательские настройки, содержащие конфиденциальную информацию, вам следует назвать ключи именами, которые Celery идентифицирует как секретные.
Настройка конфигурации будет подвергнута цензуре, если имя содержит любую из этих подстрок:
API
, TOKEN
, KEY
, SECRET
, PASS
, SIGNATURE
, DATABASE
Лень¶
Экземпляр приложения является «ленивым», то есть он не будет оцениваться до тех пор, пока он действительно не понадобится.
Создание экземпляра Celery
приведет только к следующему:
Создайте логический экземпляр часов, используемый для событий.
Создайте реестр задач.
Установить себя в качестве текущего приложения (но не в том случае, если аргумент
set_as_current
был отключен)Вызовите обратный вызов
app.on_init()
(по умолчанию ничего не делает).
Декораторы app.task()
не создают задачи в момент определения задачи, вместо этого они откладывают создание задачи либо на момент использования задачи, либо после финализации приложения,
Этот пример показывает, что задача не создается до тех пор, пока вы не используете задачу или не получите доступ к атрибуту (в данном случае repr()
):
>>> @app.task
>>> def add(x, y):
... return x + y
>>> type(add)
<class 'celery.local.PromiseProxy'>
>>> add.__evaluated__()
False
>>> add # <-- causes repr(add) to happen
<@task: __main__.add>
>>> add.__evaluated__()
True
Финализация приложения происходит либо явно путем вызова app.finalize()
– либо неявно путем обращения к атрибуту app.tasks
.
Доработка объекта будет:
Копирование задач, которые должны быть разделены между приложениями
По умолчанию задачи являются общими, но если аргумент
shared
в декораторе задачи отключен, то задача будет приватной для приложения, к которому она привязана.Оцените все находящиеся на рассмотрении декораторы задач.
Убедитесь, что все задачи привязаны к текущему приложению.
Задачи привязываются к приложению, чтобы они могли считывать значения по умолчанию из конфигурации.
Разрыв цепи¶
Хотя можно зависеть от установки текущего приложения, лучшей практикой является постоянная передача экземпляра приложения всему, что в нем нуждается.
Я называю это «цепочкой приложений», поскольку она создает цепочку экземпляров в зависимости от передаваемого приложения.
Следующий пример считается плохой практикой:
from celery import current_app
class Scheduler:
def run(self):
app = current_app
Вместо этого он должен принимать в качестве аргумента app
:
class Scheduler:
def __init__(self, app):
self.app = app
Внутри Celery использует функцию celery.app.app_or_default()
, чтобы все работало и в API совместимости на основе модулей
from celery.app import app_or_default
class Scheduler:
def __init__(self, app=None):
self.app = app_or_default(app)
В процессе разработки вы можете установить переменную окружения CELERY_TRACE_APP
, чтобы вызвать исключение при разрыве цепочки приложений:
$ CELERY_TRACE_APP=1 celery worker -l INFO
Абстрактные задачи¶
Все задачи, созданные с помощью декоратора task()
, будут наследоваться от базового класса приложения Task
.
Вы можете указать другой базовый класс, используя аргумент base
:
@app.task(base=OtherTask):
def add(x, y):
return x + y
Для создания пользовательского класса задачи необходимо наследоваться от нейтрального базового класса: celery.Task
.
from celery import Task
class DebugTask(Task):
def __call__(self, *args, **kwargs):
print('TASK STARTING: {0.name}[{0.request.id}]'.format(self))
return self.run(*args, **kwargs)
Совет
Если вы переопределите метод __call__
задачи, то очень важно, чтобы вы также вызвали self.run
для выполнения тела задачи. Не вызывайте super().__call__
. Метод __call__
нейтрального базового класса celery.Task
присутствует только для справки. Для оптимизации он был развернут в celery.app.trace.build_tracer.trace_task
, который вызывает run
непосредственно на пользовательском классе задачи, если не определен метод __call__
.
Нейтральный базовый класс является особенным, поскольку он еще не привязан к какому-либо конкретному приложению. Когда задача будет привязана к приложению, она будет считывать конфигурацию, чтобы установить значения по умолчанию, и так далее.
Для реализации базового класса необходимо создать задачу, используя декоратор app.task()
:
@app.task(base=DebugTask)
def add(x, y):
return x + y
Можно даже изменить базовый класс по умолчанию для приложения, изменив его атрибут app.Task()
:
>>> from celery import Celery, Task
>>> app = Celery()
>>> class MyBaseTask(Task):
... queue = 'hipri'
>>> app.Task = MyBaseTask
>>> app.Task
<unbound MyBaseTask>
>>> @app.task
... def add(x, y):
... return x + y
>>> add
<@task: __main__.add>
>>> add.__class__.mro()
[<class add of <Celery __main__:0x1012b4410>>,
<unbound MyBaseTask>,
<unbound Task>,
<type 'object'>]