Что нового в Celery 3.1 (Cipater)¶
- Автор:
Спросите Солема (
ask at celeryproject.org
)
Celery - это простая, гибкая и надежная распределенная система для обработки огромного количества сообщений, предоставляющая операторам инструменты, необходимые для обслуживания такой системы.
Это очередь задач, ориентированная на обработку в реальном времени, а также поддерживающая планирование задач.
Celery имеет большое и разнообразное сообщество пользователей и разработчиков, вам стоит присоединиться к нам on IRC или << 1 >>>.
Чтобы узнать больше о сельдерее, вам следует перейти по ссылке introduction.
Хотя эта версия обратно совместима с предыдущими версиями, важно, чтобы вы прочитали следующий раздел.
Эта версия официально поддерживается на CPython 2.6, 2.7 и 3.3, а также поддерживается на PyPy.
Предисловие¶
Тупики давно мучают наших рабочих, и хотя они редкость, они неприемлемы. Они также печально известны тем, что их очень трудно диагностировать и воспроизвести, поэтому, чтобы облегчить эту работу, я написал набор стресс-тестов, который бомбардирует рабочий различными задачами в попытке сломать его.
Что произойдет, если тысячи дочерних процессов рабочего будут убиваться каждую секунду? Что, если мы также будем убивать соединение с брокером каждые 10 секунд? Это примеры того, что пакет стресс-тестов будет делать с рабочим, и он повторно проводит эти тесты, используя различные комбинации конфигурации, чтобы найти крайние ошибки.
В итоге мне пришлось переписать пул prefork, чтобы избежать использования семафора POSIX. Это было чрезвычайно сложно, но после нескольких месяцев напряженной работы рабочий наконец-то прошел набор стресс-тестов.
Возможно, мы найдем еще больше ошибок, но хорошая новость заключается в том, что теперь у нас есть инструмент для их воспроизведения, так что если вам не повезет и вы столкнетесь с ошибкой, мы напишем тест для нее и устраним ее!
Обратите внимание, что я также перевел многие брокерские транспорты в статус экспериментальных: единственными транспортами, рекомендованными для использования в производстве, на сегодняшний день являются RabbitMQ и Redis.
У меня нет ресурсов, чтобы поддерживать их все, поэтому ошибки остаются нерешенными. Я бы хотел, чтобы кто-нибудь взял на себя ответственность за эти транспорты или пожертвовал ресурсы для их улучшения, но в нынешней ситуации я не думаю, что качество соответствует остальной кодовой базе, поэтому я не могу рекомендовать их для использования в производстве.
В следующей версии Celery 4.0 основное внимание будет уделено производительности и удалению редко используемых частей библиотеки. Также началась работа над новым протоколом сообщений, поддержкой нескольких языков и многим другим. Первоначальный проект можно найти here.
Это, вероятно, был самый трудный релиз, над которым я работал, поэтому ни одно вступление к этому журналу изменений не будет полным без огромного спасибо всем, кто внес свой вклад и помог мне протестировать его!
Спасибо за вашу поддержку!
- Спроси Солема
Важные замечания¶
Отказано в поддержке Python 2.5¶
Для Celery теперь требуется Python 2.6 или более поздней версии.
Новая двойная кодовая база работает как на Python 2, так и на 3, не требуя инструмента переноса 2to3
.
Примечание
Это также последняя версия, поддерживающая Python 2.6! Начиная с Celery 4.0 и далее потребуется Python 2.7 или более поздняя версия.
Последняя версия, включающая Pickle по умолчанию¶
Начиная с Celery 4.0 сериализатором по умолчанию будет json.
Если вы зависите от принятия pickle, вам следует подготовиться к этому изменению, явно разрешив своему работнику потреблять pickled-сообщения с помощью параметра CELERY_ACCEPT_CONTENT
:
CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']
Убедитесь, что вы выбрали только те форматы сериализации, которые вы действительно будете использовать, и убедитесь, что вы должным образом защитили свой брокер от нежелательного доступа (см. Security Guide).
Рабочий выдаст предупреждение об устаревании, если вы не определите этот параметр.
Старые программы командной строки удалены и устарели¶
Все должны перейти на новую команду celery umbrella, поэтому мы постепенно сокращаем старые названия команд.
В этой версии мы удалили все команды, которые не используются в init-скриптах. Остальные будут удалены в версии 4.0.
Программа |
Новый статус |
Замена |
---|---|---|
|
ДЕПРЕКАТИРОВАНО |
celery worker |
|
ДЕПРЕКАТИРОВАНО |
celery beat |
|
ДЕПРЕКАТИРОВАНО |
celery multi |
|
УДАЛЕНО |
celery inspect|control |
|
УДАЛЕНО |
celery events |
|
УДАЛЕНО |
celery amqp |
Если это не новая установка, то вы можете удалить старые команды:
$ pip uninstall celery
$ # repeat until it fails
# ...
$ pip uninstall celery
$ pip install celery
Пожалуйста, выполните celery --help для получения помощи с помощью команды umbrella.
Новости¶
Улучшение бассейна в Префорке¶
Эти улучшения активны только в том случае, если вы используете транспорт с поддержкой async. Это означает, что на данный момент поддерживаются только RabbitMQ (AMQP) и Redis, а для других транспортов по-прежнему будет использоваться реализация резервного копирования на основе потоков.
Теперь пул использует одну очередь IPC на каждый дочерний процесс.
Ранее пул разделял одну очередь между всеми дочерними процессами, используя семафор POSIX в качестве мьютекса для достижения эксклюзивного доступа на чтение и запись.
Семафор POSIX теперь удален, и каждый дочерний процесс получает выделенную очередь. Это означает, что рабочему потребуется больше файловых дескрипторов (два дескриптора на процесс), но это также означает, что производительность повысилась, и мы можем отправлять работу отдельным дочерним процессам.
Семафоры POSIX не освобождаются при завершении процесса, поэтому уничтожение процессов может привести к тупику, если это произойдет во время получения семафора. Хорошего решения для исправления этого не существует, поэтому лучшим вариантом было удалить семафор.
Асинхронные операции записи
Теперь пул использует асинхронный ввод-вывод для отправки работы дочерним процессам.
Обнаружение потерянного процесса теперь происходит мгновенно.
Если дочерний процесс был убит или загадочно завершен, пул ранее должен был ждать 30 секунд, прежде чем пометить задание символом
WorkerLostError
. Он должен был это делать, потому что out-queue была общей для всех процессов, и пул не мог быть уверен, выполнил ли процесс задачу или нет. Поэтому был выбран произвольный тайм-аут в 30 секунд, так как считалось, что к этому моменту out-queue будет исчерпан.Этот тайм-аут больше не нужен, и поэтому задачу можно пометить как неудавшуюся, как только пул получит уведомление о завершении процесса.
Исправлены редкие состояния гонки
О большинстве из этих ошибок нам не сообщали, но они были обнаружены во время проведения нового набора стресс-тестов.
Оговорки¶
Django поддерживается из коробки¶
Celery 3.0 представил новый блестящий API, но, к сожалению, в нем не было решения для пользователей Django.
В этой версии ситуация меняется, поскольку Django теперь поддерживается в ядре, и новые пользователи Django, приходящие в Celery, теперь должны использовать новый API напрямую.
В сообществе Django принято, что для каждой библиотеки существует отдельный пакет django-x
, действующий как мост между Django и библиотекой.
Наличие отдельного проекта для пользователей Django было болью для Celery, с несколькими трекерами проблем и несколькими источниками документации, и, наконец, начиная с версии 3.0, у нас даже были разные API.
В этой версии мы бросаем вызов тому, что конвенция и пользователи Django будут использовать ту же библиотеку, тот же API и ту же документацию, что и все остальные.
Не стоит торопиться переносить существующий код для использования нового API, но если вы хотите поэкспериментировать с ним, вам следует знать об этом:
Вам необходимо использовать экземпляр приложения Celery.
Новый API Celery, представленный в версии 3.0, требует от пользователей инстанцирования библиотеки путем создания приложения:
from celery import Celery app = Celery()
Вам необходимо явно интегрировать Celery с Django
Celery не будет автоматически использовать настройки Django, поэтому вы можете либо настроить Celery отдельно, либо указать ему использовать настройки Django:
app.config_from_object('django.conf:settings')
Он также не будет автоматически обходить установленные приложения в поисках модулей задач. Если вам нужно такое поведение, вы должны явно передать список экземпляров Django в приложение Celery:
from django.conf import settings app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
Вы больше не используете
manage.py
Вместо этого вы используете непосредственно команду celery:
$ celery -A proj worker -l info
Чтобы это работало, ваш модуль приложения должен хранить переменную окружения
DJANGO_SETTINGS_MODULE
, см. пример в Django guide.
Чтобы начать работу с новым API, вам следует сначала прочитать учебник Первые шаги с сельдереем, а затем - инструкции по работе с Django в Первые шаги в работе с Django.
Исправления и улучшения, применяемые библиотекой django-celery, теперь автоматически применяются ядром Celery, когда оно обнаруживает, что установлена переменная окружения DJANGO_SETTINGS_MODULE
.
Дистрибутив поставляется с новым примером проекта, использующего Django в examples/django
:
https://github.com/celery/celery/tree/3.1/examples/django
Для некоторых функций по-прежнему требуется библиотека django-celery:
Celery не реализует базу данных Django и бэкенды результатов кэширования.
- Celery не поставляется с периодической задачей на основе базы данных
планировщик.
Примечание
Если вы все еще используете старый API при обновлении до Celery 3.1, то вы должны убедиться, что ваш модуль настроек содержит строку djcelery.setup_loader()
, поскольку это больше не будет происходить как побочный эффект импорта модуля django-celery.
Новым пользователям (или если вы перешли на новый API) строка setup_loader
больше не нужна, и вы должны убедиться, что она удалена.
События теперь упорядочиваются с использованием логического времени¶
Идеальная синхронизация физических часов невозможна, поэтому использование временных меток для упорядочивания событий в распределенной системе ненадежно.
Сообщения о событиях Celery уже некоторое время включают логическое значение часов, но начиная с этой версии это поле также используется для их упорядочивания.
Кроме того, события теперь записывают информацию о часовом поясе, включая новое поле utcoffset
в сообщение о событии. Это знаковое целое число, показывающее разницу с временем UTC в часах, поэтому, например, событие, отправленное из часового пояса Европа/Лондон в летнее время, будет иметь смещение в 1.
app.events.Receiver
будет автоматически преобразовывать временные метки в местный часовой пояс.
Примечание
Логические часы синхронизируются с другими узлами в том же кластере (соседями), поэтому это означает, что логическая эпоха начнется в тот момент, когда запустится первый рабочий в кластере.
Если все рабочие будут выключены, значение часов будет потеряно и сброшено на 0. Для защиты от этого, вы должны указать опцию celery worker --statedb
, чтобы рабочий мог сохранить значение часов при выключении.
Вы можете заметить, что логические часы представляют собой целочисленное значение и очень быстро увеличиваются. Однако не беспокойтесь о том, что значение переполнится, поскольку даже в самых загруженных кластерах может пройти несколько тысячелетий, прежде чем часы превысят значение в 64 бита.
Новый формат имени рабочего узла (name@host
)¶
Имена узлов теперь строятся из двух элементов: name и host-name, разделенных „@“.
Это изменение было сделано для более легкого определения нескольких экземпляров, работающих на одной машине.
Если пользовательское имя не указано, то рабочий будет использовать имя „celery“ по умолчанию, в результате чего полное имя узла будет „celery@hostname“:
$ celery worker -n example.com
celery@example.com
Чтобы также задать имя, необходимо включить символ @:
$ celery worker -n worker1@example.com
worker1@example.com
Рабочий будет идентифицировать себя, используя полное имя узла в событиях и широковещательных сообщениях, поэтому если раньше рабочий идентифицировал себя как „worker1.example.com“, то теперь он будет использовать „celery@worker1.example.com“.
Помните, что аргумент -n
также поддерживает простые замены переменных, поэтому если текущее имя хоста george.example.com, то макрос %h
расширится до него:
$ celery worker -n worker1@%h
worker1@george.example.com
Возможные замены следующие:
Переменная |
Замена |
---|---|
|
Полное имя хоста (включая доменное имя) |
|
Только доменное имя |
|
Только имя хоста (без доменного имени) |
|
Символ |
Связанные задачи¶
Декоратор задач теперь может создавать «связанные задачи», что означает, что задача будет получать аргумент self
.
@app.task(bind=True)
def send_twitter_status(self, oauth, tweet):
try:
twitter = Twitter(oauth)
twitter.update_status(tweet)
except (Twitter.FailWhaleError, Twitter.LoginError) as exc:
raise self.retry(exc=exc)
Использование связанных задач теперь является рекомендуемым подходом, когда вам нужен доступ к экземпляру задачи или контексту запроса. Ранее вместо этого приходилось обращаться к имени задачи (send_twitter_status.retry
), но это могло привести к проблемам в некоторых конфигурациях.
Mingle: Синхронизация рабочих¶
Теперь рабочий будет пытаться синхронизироваться с другими рабочими в том же кластере.
Синхронизированные данные в настоящее время включают отозванные задания и логические часы.
Это происходит только при запуске и вызывает секундную задержку при запуске для сбора широковещательных ответов от других рабочих.
Вы можете отключить этот шаг загрузки с помощью опции celery worker --without-mingle
.
Сплетни: Рабочий <-> Рабочее общение¶
Рабочие теперь пассивно подписываются на события, связанные с рабочими, такие как сердцебиение.
Это означает, что рабочий знает, что делают другие рабочие, и может обнаружить, если они уходят в оффлайн. В настоящее время это используется только для синхронизации часов, но есть много возможностей для будущих дополнений, и вы можете написать расширения, которые используют это преимущество уже сейчас.
Некоторые идеи включают протоколы консенсуса, перенаправление задачи лучшему работнику (на основе использования ресурсов или локальности данных) или перезапуск работников при сбое.
Мы считаем, что хотя это небольшое дополнение, оно открывает удивительные возможности.
Вы можете отключить этот шаг загрузки с помощью опции celery worker --without-gossip
.
Бутстепы: Расширение рабочего¶
Написав bootsteps, вы теперь можете легко расширить потребительскую часть worker для добавления дополнительных возможностей, например, пользовательских потребителей сообщений.
Рабочий уже некоторое время использует bootsteps, но они никогда не были документированы. В этой версии потребительская часть рабочего также была переписана для использования bootsteps, а в новом руководстве Расширения и бутстепы документированы примеры расширения рабочего, включая добавление пользовательских потребителей сообщений.
Дополнительную информацию см. в руководстве Расширения и бутстепы.
Примечание
Шаги загрузки, написанные для старых версий, не будут совместимы с этой версией, так как API значительно изменился.
Старый API был экспериментальным и внутренним, но если вам не повезло использовать его, пожалуйста, свяжитесь с нами в списке рассылки, и мы поможем вам перенести bootstep на новый API.
Новый бэкенд результатов RPC¶
Эта новая экспериментальная версия бэкенда результатов amqp
является хорошей альтернативой для использования в классических сценариях RPC, где процесс, инициирующий задачу, всегда является процессом для получения результата.
Он использует Kombu для отправки и получения результатов, и каждый клиент использует уникальную очередь для отправки ответов. Это позволяет избежать значительных накладных расходов оригинального бэкенда результатов amqp, который создает одну очередь для каждой задачи.
По умолчанию результаты, отправленные с помощью этого бэкенда, не сохраняются, поэтому они не переживут перезапуск брокера. Вы можете включить параметр CELERY_RESULT_PERSISTENT
, чтобы изменить это.
CELERY_RESULT_BACKEND = 'rpc'
CELERY_RESULT_PERSISTENT = True
Обратите внимание, что аккорды в настоящее время не поддерживаются бэкендом RPC.
Временные ограничения теперь могут быть установлены клиентом¶
В Calling API были добавлены две новые опции: time_limit
и soft_time_limit
:
>>> res = add.apply_async((2, 2), time_limit=10, soft_time_limit=8)
>>> res = add.subtask((2, 2), time_limit=10, soft_time_limit=8).delay()
>>> res = add.s(2, 2).set(time_limit=10, soft_time_limit=8).delay()
При участии Мгера Мовсисяна.
Redis: широковещательные сообщения и виртуальные хосты¶
В настоящее время широковещательные сообщения видны всем виртуальным хостам при использовании транспорта Redis. Теперь это можно исправить, включив префикс для всех каналов, чтобы сообщения разделялись:
BROKER_TRANSPORT_OPTIONS = {'fanout_prefix': True}
Обратите внимание, что вы не сможете общаться с рабочими, работающими на более старых версиях, или с рабочими, у которых эта настройка не включена.
Эта настройка будет использоваться по умолчанию в будущей версии.
Связано с проблемой #1490.
pytz заменяет python-dateutil зависимость¶
Celery больше не зависит от библиотеки python-dateutil, вместо этого была добавлена новая зависимость от библиотеки pytz.
Библиотека pytz уже была рекомендована для точной поддержки часовых поясов.
Это также означает, что зависимости одинаковы как для Python 2, так и для Python 3, и что файл requirements/default-py3k.txt
был удален.
Поддержка дополнительных требований setuptools¶
Pip теперь поддерживает формат дополнительных требований setuptools, поэтому мы убрали старое понятие bundles, и вместо этого указываем setuptools extras.
Вы устанавливаете дополнения, указывая их в скобках:
$ pip install celery[redis,mongodb]
Вышеперечисленное установит зависимости для Redis и MongoDB. Вы можете перечислить столько дополнений, сколько захотите.
Предупреждение
Вы больше не можете использовать пакеты celery-with-*
, так как они не будут обновлены для использования Celery 3.1.
Расширение |
Ввод требований |
Тип |
---|---|---|
Redis |
|
транспорт, бэкенд результатов |
MongoDB |
|
транспорт, бэкенд результатов |
CouchDB |
|
транспорт |
Beanstalk |
|
транспорт |
ZeroMQ |
|
транспорт |
Смотритель зоопарка |
|
транспорт |
SQLAlchemy |
|
транспорт, бэкенд результатов |
librabbitmq |
|
транспорт (C amqp клиент) |
Полный список с примерами находится в разделе Пакеты.
subtask.__call__()
теперь выполняет задание напрямую¶
Недоразумение привело к тому, что Signature.__call__
является псевдонимом .delay
, но это не соответствует API вызова Task
, который вызывает базовый метод задачи.
Это означает, что:
@app.task
def add(x, y):
return x + y
add.s(2, 2)()
теперь делает то же самое, что и прямой вызов задачи:
>>> add(2, 2)
Другие новости¶
Теперь зависит от Kombu 3.0.
Теперь зависит от billiard версии 3.3.
Worker теперь будет аварийно завершаться, если запущен от имени пользователя root с включенным pickle.
Canvas:
group.apply_async
иchain.apply_async
больше не запускают отдельное задание.То, что примитивы групп и аккордов поддерживают «вызов API», как и другие подзадачи, было хорошей идеей, но на практике это было бесполезно и часто путало пользователей. Если вы все еще хотите такого поведения, вы можете определить задачу, которая будет делать это за вас.
Новый метод
Signature.freeze()
может использоваться для «финализации» сигнатур/подзадач.Обычная подпись:
>>> s = add.s(2, 2) >>> result = s.freeze() >>> result <AsyncResult: ffacf44b-f8a1-44e9-80a3-703150151ef2> >>> s.delay() <AsyncResult: ffacf44b-f8a1-44e9-80a3-703150151ef2>
Группа:
>>> g = group(add.s(2, 2), add.s(4, 4)) >>> result = g.freeze() <GroupResult: e1094b1d-08fc-4e14-838e-6d601b99da6d [ 70c0fb3d-b60e-4b22-8df7-aa25b9abc86d, 58fcd260-2e32-4308-a2ea-f5be4a24f7f4]> >>> g() <GroupResult: e1094b1d-08fc-4e14-838e-6d601b99da6d [70c0fb3d-b60e-4b22-8df7-aa25b9abc86d, 58fcd260-2e32-4308-a2ea-f5be4a24f7f4]>
Определено поведение исключения аккордов (проблема #1172).
Начиная с этой версии, обратный вызов аккорда будет менять состояние на FAILURE, когда задача, являющаяся частью аккорда, вызывает исключение.
Смотрите больше на Обработка ошибок.
Новая возможность указывать дополнительные параметры командной строки для программ worker и beat.
Атрибут
app.user_options
может быть использован для добавления дополнительных аргументов командной строки и ожидает опций в стилеoptparse
:from celery import Celery from celery.bin import Option app = Celery() app.user_options['worker'].add( Option('--my-argument'), )
Дополнительную информацию см. в руководстве Расширения и бутстепы.
Все события теперь включают поле
pid
, которое является идентификатором процесса, отправившего событие.Пульс события теперь рассчитывается на основе времени, когда событие было получено монитором, а не времени, о котором сообщил работник.
Это означает, что работник с несинхронизированными часами больше не будет отображаться в мониторах как «Offline».
Теперь предупреждение выдается, если разница между временем отправителя и внутренним временем превышает 15 секунд, что свидетельствует о рассинхронизации часов.
Поддержка монотонных часов.
Монотонные часы теперь используются для тайм-аутов и составления расписания.
Функция монотонных часов встроена, начиная с Python 3.4, но у нас также есть запасные реализации для Linux и macOS.
celery worker теперь поддерживает новый аргумент
--detach
для запуска рабочего в качестве демона в фоновом режиме.app.events.Receiver
теперь устанавливает полеlocal_received
для входящих событий, которое устанавливается на время, когда событие было получено.app.events.Dispatcher
теперь принимает аргументgroups
, который определяет белый список групп событий, которые будут отправлены.Тип события - это строка, разделенная символами „-“, где часть перед первым символом „-“ - это группа. В настоящее время существует только две группы:
worker
иtask
.Диспетчер, инстанцированный следующим образом:
>>> app.events.Dispatcher(connection, groups=['worker'])
будет отправлять только события, связанные с рабочими, и молча отбрасывать любые попытки отправки событий, связанных с любой другой группой.
Новая настройка
BROKER_FAILOVER_STRATEGY
.Этот параметр может быть использован для изменения стратегии обхода отказа транспорта, может быть либо вызываемым параметром, возвращающим итерабельную переменную, либо именем встроенной в Kombu стратегии обхода отказа. По умолчанию это «round-robin».
При участии Мэтта Уайза.
Result.revoke
больше не будет ждать ответов.Вы можете добавить аргумент
reply=True
, если вы действительно хотите дождаться ответов от рабочих.Улучшена поддержка задач link и link_error для аккордов.
Внесено Стивом Морином.
Рабочий: Теперь выдает предупреждение, если параметр
CELERYD_POOL
установлен для включения пулов eventlet/gevent.Опция -P всегда должна использоваться для выбора пула событий/событий, чтобы гарантировать, что исправления будут применены как можно раньше.
Если вы запускаете рабочий в обертке (например, в Django
manage.py
), то вы должны применять патчи вручную, например, создав альтернативную обертку, которая обезьянничает патчи в начале программы перед импортом любых других модулей.Теперь есть команда „inspect clock“, которая будет собирать текущее значение логических часов из рабочих.
celery inspect stats теперь содержит идентификатор основного процесса рабочего.
При участии Мгера Мовсисяна.
Новая команда удаленного управления для сброса рабочей конфигурации.
Пример:
$ celery inspect conf
Значения конфигурации будут преобразованы в значения, поддерживаемые JSON, где это возможно.
При участии Мгера Мовсисяна.
Новые настройки
CELERY_EVENT_QUEUE_TTL
и << 1 >>>.Они контролируют, когда очередь событий мониторов будет удалена, и как долго будут видны события, опубликованные в этой очереди. Поддерживается только в RabbitMQ.
Новый бэкенд результатов Couchbase.
Этот бэкенд результатов позволяет хранить и извлекать результаты задач, используя Couchbase.
Смотрите Настройки бэкенда Couchbase для получения дополнительной информации о настройке этого бэкенда результатов.
При участии Алена Масьеро.
CentOS init-script теперь поддерживает запуск нескольких рабочих экземпляров.
Подробности см. в заголовке сценария.
Внесено Джонатаном Джорданом.
AsyncResult.iter_native
теперь устанавливает параметр интервала по умолчанию на 0,5Исправление предоставлено Иданом Камара
Новая настройка
BROKER_LOGIN_METHOD
.Этот параметр можно использовать для указания альтернативного метода входа в систему для AMQP-транспорта.
При участии Адриена Гине
Команда удаленного управления
dump_conf
теперь будет выдавать строковое представление для типов, не совместимых с JSON.Функция celery.security.setup_security теперь
app.setup_security()
.Повторное выполнение задания теперь распространяет значение истечения срока действия сообщения (проблема #980).
Значение пересылается в is, поэтому время истечения срока действия не изменится. Чтобы обновить время истечения срока действия, нужно передать новый аргумент expires в
retry()
.Worker теперь аварийно завершается при возникновении ошибки канала.
Ошибки канала зависят от транспорта и представляют собой список исключений, возвращаемых
Connection.channel_errors
. Для RabbitMQ это означает, что Celery аварийно завершит работу, если проверка эквивалентности для одной из очередей вCELERY_QUEUES
не совпадает, что имеет смысл, поскольку это сценарий, где требуется ручное вмешательство.Вызов
AsyncResult.get()
на цепочке теперь распространяет ошибки для предыдущих задач (проблема #1014).Родительский атрибут
AsyncResult
теперь восстанавливается при использовании сериализации JSON (проблема #1014).Журналы отключения рабочих теперь регистрируются со степенью предупреждения вместо ошибки.
При участии Криса Адамса.
events.State
больше не аварийно завершается при получении неизвестных типов событий.SQLAlchemy Result Backend: Новая настройка
CELERY_RESULT_DB_TABLENAMES
может быть использована для изменения имени используемых таблиц базы данных.При участии Райана Петрелло.
- SQLAlchemy Result Backend: Теперь вызывает
enginge.dispose
после форка (Выпуск #1564).
Если вы создаете свои собственные движки SQLAlchemy, то вы также должны убедиться, что они закрываются после форка в рабочем:
from multiprocessing.util import register_after_fork engine = create_engine(*engine_args) register_after_fork(engine, engine.dispose)
- SQLAlchemy Result Backend: Теперь вызывает
Был написан набор нагрузочных тестов для Celery worker.
Он находится в каталоге
funtests/stress
в репозитории git. Там есть файл README для начала работы.Регистратор с именем
celery.concurrency
был переименован вcelery.pool
.Новая утилита командной строки
celery graph
.Эта утилита создает графики в формате GraphViz dot.
Вы можете создавать графики на основе установленных в данный момент загрузок:
# Create graph of currently installed bootsteps in both the worker # and consumer name-spaces. $ celery graph bootsteps | dot -T png -o steps.png # Graph of the consumer name-space only. $ celery graph bootsteps consumer | dot -T png -o consumer_only.png # Graph of the worker name-space only. $ celery graph bootsteps worker | dot -T png -o worker_only.png
Или графики рабочих в кластере:
# Create graph from the current cluster $ celery graph workers | dot -T png -o workers.png # Create graph from a specified list of workers $ celery graph workers nodes:w1,w2,w3 | dot -T png workers.png # also specify the number of threads in each worker $ celery graph workers nodes:w1,w2,w3 threads:2,4,6 # …also specify the broker and backend URLs shown in the graph $ celery graph workers broker:amqp:// backend:redis:// # …also specify the max number of workers/threads shown (wmax/tmax), # enumerating anything that exceeds that number. $ celery graph workers wmax:10 tmax:3
Изменен способ маринования экземпляров приложений.
Теперь приложения могут определять метод
__reduce_keys__
, который используется вместо старого атрибутаAppPickler
. Например, если ваше приложение определяет пользовательский атрибут „foo“, который должен быть сохранен при мариновании, вы можете определить__reduce_keys__
как таковой:import celery class Celery(celery.Celery): def __init__(self, *args, **kwargs): super(Celery, self).__init__(*args, **kwargs) self.foo = kwargs.get('foo') def __reduce_keys__(self): return super(Celery, self).__reduce_keys__().update( foo=self.foo, )
Это гораздо более удобный способ добавить поддержку пикинга пользовательских атрибутов. Старый вариант
AppPickler
все еще поддерживается, но его использование не рекомендуется, и мы хотели бы убрать его в будущей версии.Возможность трассировки импорта в целях отладки.
Параметр
C_IMPDEBUG
может быть установлен для отслеживания импорта по мере его возникновения:$ C_IMDEBUG=1 celery worker -l info
$ C_IMPDEBUG=1 celery shell
Заголовки сообщений теперь доступны как часть запроса задания.
Пример добавления и извлечения значения заголовка:
@app.task(bind=True) def t(self): return self.request.headers.get('sender') >>> t.apply_async(headers={'sender': 'George Costanza'})
Новый сигнал
before_task_publish
диспетчеризируется перед отправкой сообщения задачи и может быть использован для изменения полей конечного сообщения (выпуск #1281).Новый сигнал
after_task_publish
заменяет старый сигналtask_sent
.Сигнал
task_sent
теперь устарел и не должен использоваться.Новый сигнал
worker_process_shutdown
рассылается в дочерних процессах пула prefork при их выходе.Внесен Даниэлем М Таубом.
celery.platforms.PIDFile
переименовано вcelery.platforms.Pidfile
.Бэкенд MongoDB: Теперь можно настраивать с помощью URL:
MongoDB Backend: Больше не используется устаревшее
pymongo.Connection
.Бэкенд MongoDB: Теперь отключается
auto_start_request
.Бэкенд MongoDB: Теперь включает
use_greenlets
при использовании eventlet/gevent.subtask()
/maybe_subtask()
переименованы вsignature()
/<< 3 >>>.Псевдонимы по-прежнему доступны для обратной совместимости.
Свойство
correlation_id
message теперь автоматически устанавливается на id задачи.Поля сообщения задачи
eta
и << 1 >>> теперь включают информацию о часовом поясе.Все методы result backends
store_result
/mark_as_*
теперь должны принимать аргумент в виде ключевого словаrequest
.События теперь выдают предупреждение, если используется неработающая библиотека
yajl
.Сигнал
celeryd_init
теперь принимает дополнительный аргумент в виде ключевого слова:option
.Это отображение разобранных аргументов командной строки, которое можно использовать для подготовки новых аргументов предварительной загрузки (
app.user_options['preload']
).Новый обратный вызов:
app.on_configure()
.Этот обратный вызов вызывается, когда приложение собирается сконфигурировать (требуется ключ конфигурации).
Рабочий: Больше не вилка на
HUP
.Это означает, что рабочий будет повторно использовать один и тот же pid для лучшей поддержки внешних супервизоров процессов.
При участии Джамиля Аль-Азиза.
Рабочий: Сообщение журнала
Got task from broker …
было изменено наReceived task …
.Рабочий: Сообщение журнала
Skipping revoked task …
было изменено наDiscarding revoked task …
.Оптимизация: Улучшена производительность
ResultSet.join_native()
.При участии Стаса Рудакова.
Сигнал
task_revoked
теперь принимает новый аргументrequest
(проблема #1555).Сигнал revoked отправляется после удаления запроса задачи из стека, поэтому вместо него необходимо использовать объект
Request
для получения информации о задаче.Worker: Новый аргумент командной строки
-X
для исключения очередей (проблема #1399).Добавляет
C_FAKEFORK
переменную окружения для простой отладки init-script/celery multi.Это означает, что теперь вы можете это сделать:
$ C_FAKEFORK=1 celery multi start 10
или:
$ C_FAKEFORK=1 /etc/init.d/celeryd start
чтобы избежать шага демонизации для просмотра ошибок, которые не видны из-за отсутствия stdout/stderr.
В общий init-скрипт была добавлена команда
dryrun
, которая включает эту опцию.Новый публичный API для выталкивания и выталкивания из текущего стека задач:
celery.app.push_current_task()
иcelery.app.pop_current_task`()
.RetryTaskError
было переименовано вRetry
.Старое название все еще доступно для обратной совместимости.
Новое полупредикатное исключение
Reject
.Это исключение может быть поднято в
reject
/requeue
сообщении задачи, примеры см. в Отклонить.Semipredicates документировано: (Retry/Ignore/Reject).
Плановые демонтажи¶
Настройка
BROKER_INSIST
и аргументinsist
для~@connection
больше не поддерживаются.Настройка
CELERY_AMQP_TASK_RESULT_CONNECTION_MAX
больше не поддерживается.Вместо этого используйте
BROKER_POOL_LIMIT
.Настройка
CELERY_TASK_ERROR_WHITELIST
больше не поддерживается.Вместо этого следует установить атрибут
ErrorMail
класса задачи. Вы также можете сделать это с помощьюCELERY_ANNOTATIONS
:from celery import Celery from celery.utils.mail import ErrorMail class MyErrorMail(ErrorMail): whitelist = (KeyError, ImportError) def should_send(self, context, exc): return isinstance(exc, self.whitelist) app = Celery() app.conf.CELERY_ANNOTATIONS = { '*': { 'ErrorMail': MyErrorMails, } }
Функции, создающие соединения брокера, больше не поддерживают аргумент
connect_timeout
.Теперь его можно установить только с помощью параметра
BROKER_CONNECTION_TIMEOUT
. Это связано с тем, что функции больше не создают соединения напрямую, а получают их из пула соединений.Настройка
CELERY_AMQP_TASK_RESULT_EXPIRES
больше не поддерживается.Вместо этого используйте
CELERY_TASK_RESULT_EXPIRES
.
Изменения в сроках амортизации¶
Исправления¶
AMQP Backend: join не преобразовывал исключения при использовании сериализатора json.
Неабстрактные классы задач теперь совместно используются приложениями (выпуск #1150).
Обратите внимание, что неабстрактные классы задач не должны использоваться в новом API. Пользовательские классы задач следует создавать только при использовании их в качестве базового класса в декораторе
@task
.Это исправление обеспечивает обратную совместимость со старыми версиями Celery, так что неабстрактные классы задач работают, даже если модуль импортируется несколько раз, так что приложение также инстанцируется несколько раз.
Рабочий: Обходное решение для ошибок Unicode в журналах (проблема #427).
Методы задач:
.apply_async
теперь работает правильно, если список args равен None (проблема #1459).Пулы Eventlet/gevent/solo/threads теперь правильно обрабатывают ошибки
BaseException
, вызванные задачами.<<< Команды дистанционного управления
autoscale
иpool_grow
/<< 2 >>> теперь также будут автоматически увеличивать и уменьшать счетчик предварительной выборки потребителей.Исправление внесено Дэниелом М. Таубом.
Команды
celery control pool_
не приводили строковые аргументы к int.Аккорды Redis/Cache: Результат обратного вызова теперь устанавливается на отказ, если группа исчезла из базы данных (проблема #1094).
Рабочий: Теперь следит за тем, чтобы процесс выключения не инициировался более одного раза.
Программы: celery multi теперь правильно обрабатывает опции
-f
и << 2 >>> (проблема #1541).
Внутренние изменения¶
Модуль
celery.task.trace
был переименован вcelery.app.trace
.Модуль
celery.concurrency.processes
был переименован вcelery.concurrency.prefork
.Классы, которые больше не возвращаются к использованию приложения по умолчанию:
Бэкенды результатов (
celery.backends.base.BaseBackend
)celery.worker.Consumer
Это означает, что при инстанцировании этих классов вы должны передать определенное приложение.
EventDispatcher.copy_buffer
переименовано вapp.events.Dispatcher.extend_buffer()
.Удален неиспользуемый и никогда не документированный глобальный экземпляр
celery.events.state.state
.app.events.Receiver
теперь является подклассомkombu.mixins.ConsumerMixin
.celery.apps.worker.Worker
был отрефакторен как подклассcelery.worker.WorkController
.Это устраняет множество дублирующих функций.
Метод
Celery.with_default_connection
был удален в пользуwith app.connection_or_acquire
(app.connection_or_acquire()
)Класс
celery.results.BaseDictBackend
был удален и заменен наcelery.results.BaseBackend
.