Рефактор «Большого экземпляр໶
Ветка app является незавершенной работой по устранению использования глобальной конфигурации в Celery.
Теперь Celery можно инстанцировать, и несколько экземпляров Celery могут существовать в одном и том же пространстве процессов. Кроме того, крупные детали можно настраивать, не прибегая к «обезьяньим» исправлениям.
Примеры¶
Создание экземпляра Celery:
>>> from celery import Celery
>>> app = Celery()
>>> app.config_from_object('celeryconfig')
>>> #app.config_from_envvar('CELERY_CONFIG_MODULE')
Создание задач:
@app.task
def add(x, y):
return x + y
Создание пользовательских подклассов Task:
Task = celery.create_task_cls()
class DebugTask(Task):
def on_failure(self, *args, **kwargs):
import pdb
pdb.set_trace()
@app.task(base=DebugTask)
def add(x, y):
return x + y
Запуск рабочего:
worker = celery.Worker(loglevel='INFO')
Получение доступа к конфигурации:
celery.conf.task_always_eager = True
celery.conf['task_always_eager'] = True
Контролирующие работники:
>>> celery.control.inspect().active()
>>> celery.control.rate_limit(add.name, '100/m')
>>> celery.control.broadcast('shutdown')
>>> celery.control.discard_all()
Другие интересные атрибуты:
# Establish broker connection.
>>> celery.broker_connection()
# AMQP Specific features.
>>> celery.amqp
>>> celery.amqp.Router
>>> celery.amqp.get_queues()
>>> celery.amqp.get_task_consumer()
# Loader
>>> celery.loader
# Default backend
>>> celery.backend
Как вы, вероятно, видите, это действительно открывает новые возможности настройки.
Утративший силу¶
celery.task.ping
celery.task.PingTask
Уступает команде дистанционного управления ping. Будет удалена в Celery 2.3.
Псевдонимы (ожидает устаревания)¶
celery.task.base
.Task
-> {app.Task
/celery.app.task.Task
}
celery.task.sets
.TaskSet
-> {app.TaskSet
}
celery.decorators
/celery.task
.task
-> {app.task
}
celery.execute
.apply_async
-> {task.apply_async
}.apply
-> {task.apply
}.send_task
-> {app.send_task
}<<< 0 >> -> нет альтернатив.
celery.log
.get_default_logger
-> {app.log.get_default_logger
}.setup_logger
-> {app.log.setup_logger
}.get_task_logger
-> {app.log.get_task_logger
}.setup_task_logger
-> {app.log.setup_task_logger
}.setup_logging_subsystem
-> {app.log.setup_logging_subsystem
}.redirect_stdouts_to_logger
-> {app.log.redirect_stdouts_to_logger
}
celery.messaging
.establish_connection
-> {app.broker_connection
}.with_connection
-> {app.with_connection
}.get_consumer_set
-> {app.amqp.get_task_consumer
}.TaskPublisher
-> {app.amqp.TaskPublisher
}.TaskConsumer
-> {app.amqp.TaskConsumer
}.ConsumerSet
-> {app.amqp.ConsumerSet
}
celery.conf.*
-> {app.conf
}ПРИМЕЧАНИЕ: Все ключи конфигурации теперь называются так же, как и в конфигурации. Таким образом, доступ к ключу
task_always_eager
осуществляется как:>>> app.conf.task_always_eager
вместо:
>>> from celery import conf >>> conf.always_eager
.get_queues
-> {app.amqp.get_queues
}
celery.task.control
.broadcast
-> {app.control.broadcast
}.rate_limit
-> {app.control.rate_limit
}.ping
-> {app.control.ping
}.revoke
-> {app.control.revoke
}.discard_all
-> {app.control.discard_all
}.inspect
-> {app.control.inspect
}
celery.utils.info
.humanize_seconds
->celery.utils.time.humanize_seconds
.textindent
->celery.utils.textindent
.get_broker_info
-> {app.amqp.get_broker_info
}.format_broker_info
-> {app.amqp.format_broker_info
}.format_queues
-> {app.amqp.format_queues
}
Использование приложений по умолчанию¶
Для обеспечения обратной совместимости должно быть возможно использовать все классы/функции без передачи явного экземпляра приложения.
Это достигается тем, что все объекты, зависящие от приложения, используют default_app
, если экземпляр приложения отсутствует.
from celery.app import app_or_default
class SomeClass:
def __init__(self, app=None):
self.app = app_or_default(app)
Проблема такого подхода заключается в том, что существует вероятность того, что экземпляр приложения будет потерян по пути, а все вроде бы работает нормально. Проверить утечку экземпляра приложения сложно. Можно использовать переменную окружения CELERY_TRACE_APP
, при включении которой celery.app.app_or_default()
будет вызывать исключение всякий раз, когда придется возвращаться к экземпляру приложения по умолчанию.
Дерево зависимостей приложений¶
- {
app
} celery.loaders.base.BaseLoader
celery.backends.base.BaseBackend
- {
app.TaskSet
} celery.task.sets.TaskSet
(app.TaskSet
)
- {
- [
app.TaskSetResult
] celery.result.TaskSetResult
(app.TaskSetResult
)
- [
- {
- {
app.AsyncResult
} celery.result.BaseAsyncResult
/celery.result.AsyncResult
- {
celery.bin.worker.WorkerCommand
celery.apps.worker.Worker
celery.worker.WorkerController
celery.worker.consumer.Consumer
celery.worker.request.Request
celery.events.EventDispatcher
celery.worker.control.ControlDispatch
celery.worker.control.registry.Panel
celery.pidbox.BroadcastPublisher
celery.pidbox.BroadcastConsumer
celery.beat.EmbeddedService
celery.bin.events.EvCommand
celery.events.snapshot.evcam
celery.events.snapshot.Polaroid
celery.events.EventReceiver
celery.events.cursesmon.evtop
celery.events.EventReceiver
celery.events.cursesmon.CursesMonitor
celery.events.dumper
celery.events.EventReceiver
celery.bin.amqp.AMQPAdmin
celery.bin.beat.BeatCommand
celery.apps.beat.Beat
celery.beat.Service
celery.beat.Scheduler