Что нового в Celery 4.0 (latentcall)¶
- Автор:
Спросите Солема (
ask at celeryproject.org
)
Celery - это простая, гибкая и надежная распределенная система для обработки огромного количества сообщений, предоставляющая операторам инструменты, необходимые для обслуживания такой системы.
Это очередь задач, ориентированная на обработку в реальном времени, а также поддерживающая планирование задач.
Celery имеет большое и разнообразное сообщество пользователей и разработчиков, вам стоит присоединиться к нам on IRC или << 1 >>>.
Чтобы узнать больше о сельдерее, вам следует перейти по ссылке introduction.
Хотя эта версия обратно совместима с предыдущими версиями, важно, чтобы вы прочитали следующий раздел.
Эта версия официально поддерживается на CPython 2.7, 3.4 и 3.5. и также поддерживается на PyPy.
Предисловие¶
Добро пожаловать в Celery 4!
Это масштабный релиз с изменениями более чем двухлетней давности. Он не только содержит множество новых функций, но и исправляет огромный список ошибок, поэтому во многих отношениях его можно назвать «Снежным барсом».
Следующая основная версия Celery будет поддерживать только Python 3.5, где мы планируем использовать преимущества новой библиотеки asyncio.
Этот выпуск был бы невозможен без поддержки моего работодателя, Robinhood (мы принимаем на работу!).
Спросите Солема
Посвящается Себастьяну «Зебу» Бьорнеруду (RIP), с особой благодарностью Ty Wilkins, за разработку нашего нового логотипа, всем соавторам, которые помогли сделать это, и моим коллегам из Robinhood.
Стена вкладчиков¶
Аарон МакМиллин, Адам Чейнз, Адам Ренберг, Адриано Мартинс де Хесус, Адриан Гине, Ахмет Демир, Айтор Гомес-Гуари, Алан Жустино, Альберт Ванг, Алекс Кошелев, Алекс Рэттрей, Алекс Уильямс, Александр Кошелев, Александр Лебедев, Александр Обловатный, Алексей Котляров, Али Бозоргхан, Алиса Зоя Беван-МакГрегор, Аллард Хове, Алман Один, Амир Рустамзаде, Андреа Раббаглиетти, Андреа Роза, Андрей Фокау, Андрей Родионов, Андрей Стюарт, Андрей Юрчук, Анейл Маллаварапу, Арески Белаид, Арменак Бабурян, Артур Вюйяр, Артём Коваль, Асиф Сайфуддин Ауви, Аск Солем, Бальтазар Рубероль, Батист Билер, Беркер Пексаг, Берт Вандербауведе, Брендан Смитиман, Брайан Боутерс, Брайс Грофф, Кэмерон Уилл, ЧангБо Гуо, Крис Кларк, Крис Дьюри, Крис Эрвей, Крис Харрис, Крис Мартин, Чиллар Ананд, Колин Макинтош, Конрад Крамер, Кори Фарвелл, Крейг Джеллик, Каллен Роудс, Даллас Марлоу, Дэниел Девайн, Дэниел Уоллес, Данило Барген, Даванум Шринивас, Дейв Смит, Дэвид Баумголд, Дэвид Харриган, Дэвид Правек, Деннис Бракхан, Дерек Андерсон, Дмитрий Дыгало, Дмитрий Малиновский, Донгвейминг, Дудаш Адам, Дастин Дж. Митчелл, Эд Морли, Эдвард Беттс, Элои Ривар, Эммануэль Казенаве, Фахад Сиддики, Фатих Суку, Феанил Патель, Федерико Фикарелли, Феликс Шварц, Феликс Ян, Фернандо Роча, Флавио Гросси, Франтишек Холоп, Гао Цзянмяо, Джордж Уивелл, Джеральд Манипон, Жиль Дартигелонгу, Джино Ледесма, Грег Уилбур, Гийом Сеген, Хэнк Джон, Хогни Гилфасон, Илья Георгиевский, Ионел Кристиан Мэрэриш, Иван Ларин, Джеймс Пулек, Джаред Льюис, Джейсон Витч, Джаспер Брайант-Грин, Джефф Уидман, Джереми Тиллман, Джереми Зафран, Джослин Делаланде, Джо Джевник, Джо Санфорд, Джон Андерсон, Джон Бархэм, Джон Киркхэм, Джон Уитлок, Джонатан Ванаско, Джошуа Харлоу, Жуан Рикардо, Хуан Карлос Феррер, Хуан Росси, Джастин Патрин, Кай Гронер, Кевин Харви, Кевин Ричардсон, Кому Вайрагу, Константинос Коукопулос, Коухей Маеда, Красекумар Рамараджу, Кшиштоф Буйневич, Латиция М. Haskins, Len Buckens, Lev Berman, lidongming, Lorenzo Mancini, Lucas Wiman, Luke Pomfrey, Luyun Xie, Maciej Obuchowski, Manuel Kaufmann, Marat Sharafutdinov, Marc Sibson, Marcio Ribeiro, Marin Atanasov Nikolov, Mathieu Fenniak, Mark Parncutt, Mauro Rocco, Максим Бошемин, Максим Вдб, Мгер Мовсисян, Майкл Акилина, Майкл Дуэйн Муринг, Майкл Пермана, Микаэль Пенхард, Майк Эттвуд, Митчел Хамферис, Мохамед Абуэльсауд, Моррис Твид, Мортон Фокс, Моше ван дер Стерр, Нат Уильямс, Натан Ван Гхим, Николя Унравел, Ник Найби, Омер Кац, Омер Корнер, Ори Хох, Пол Пирс, Пауло Бу, Павел Капышин, Филип Гарнеро, Пьер Ферсинг, Петр Кильчук, Петр Маслянка, Квентин Праде, Радек Чайка, Рагурам Шринивасан, Рэнди Барлоу, Рафаэль Мишель, Реми Леоне, Роберт Куп, Роберт Колба, Рокаллит Вульф, Родольфо Карвальо, Роджер Ху, Ромуальд Брюне, Ронгзе Жу, Росс Дин, Райан Лаки, Реми Грейнхофер, Самуэль Жиффар, Самуэль Жайле, Сергей Азовсков, Сергей Тихонов, Сынха Ким, Саймон Пеетерс, Спенсер Е. Olson, Srinivas Garlapati, Stephen Milner, Steve Peak, Steven Sklar, Stuart Axon, Sukrit Khera, Tadej Janež, Taha Jahangir, Takeshi Kanemoto, Tayfun Sen, Tewfik Sadaoui, Thomas French, Thomas Grainger, Tomas Machalek, Tobias Schottdorf, Tocho Tochev, Valentyn Klindukh, Vic Kumar, Vladimir Bolshakov, Vladimir Gorbunov, Уэйн Чанг, Виланд Хоффманн, Видо ден Холландер, Уил Лэнгфорд, Уилл Томпсон, Уильям Кинг, Юрий Селиванов, Витис Банайтис, Зоран Павлович, Синь Ли, 許邱翔, @allenling, @alzeih, @bastb, @bee-keeper, @ffeast, @firefly4268, @flyingfoxlee, @gdw2, @gitaarik, @hankjin, @lvh, @m-vdb, @kindule, @mdk: , @michael-k, @mozillazg, @nokrik, @ocean1, @orlo666, @raducc, @wanglei, @worldexception, @xBeAsTx.
Примечание
Эта стена была автоматически сгенерирована из истории git, поэтому, к сожалению, она не включает людей, которые помогают в более важных вещах, таких как ответы на вопросы списка рассылки.
Обновление с Celery 3.1¶
Шаг 1: Обновление до Celery 3.1.25¶
Если вы еще не сделали этого, первым шагом будет обновление до Celery 3.1.25.
Эта версия добавляет совместимость с новым протоколом сообщений, так что вы можете постепенно переходить с версии 3.1 на версию 4.0.
Сначала разверните рабочих, обновив их до версии 3.1.25. Это означает, что эти рабочие могут обрабатывать сообщения, отправленные клиентами, использующими версии 3.1 и 4.0.
После обновления рабочих систем вы можете обновить клиентов (например, веб-серверы).
Шаг 2: Обновите конфигурацию с новыми именами параметров¶
В этой версии радикально изменены названия параметров конфигурации, чтобы они были более согласованными.
Изменения полностью обратно совместимы, поэтому у вас есть возможность подождать, пока старые названия настроек не будут устаревшими, но для облегчения перехода мы включили утилиту командной строки, которая автоматически переписывает ваши настройки.
Дополнительную информацию см. в разделе Имена настроек в нижнем регистре.
Шаг 3: Прочитайте важные примечания в этом документе¶
Убедитесь, что вас не коснется ни одно из важных замечаний по обновлению, упомянутых в следующем разделе.
Особенно важно отметить, что Celery теперь проверяет аргументы, которые вы отправляете в задачу, сопоставляя их с сигнатурой (Проверка аргументов задачи).
Шаг 4: Обновление до Celery 4.0¶
В этот момент вы можете обновить своих работников и клиентов новой версией.
Важные замечания¶
Отказано в поддержке Python 2.6¶
Celery теперь требует Python 2.7 или более поздней версии, а также отказывается от поддержки Python 3.3, поэтому поддерживаются следующие версии:
CPython 2.7
CPython 3.4
CPython 3.5
PyPy 5.4 (
pypy2
)PyPy 5.5-alpha (
pypy3
)
Последняя основная версия с поддержкой Python 2¶
Начиная с Celery 5.0 будет поддерживаться только Python 3.5+.
Чтобы убедиться, что вас не затронет это изменение, вам следует привязать версию Celery в вашем файле требований либо к конкретной версии: celery==4.0.0
, либо к диапазону: celery>=4.0,<5.0
.
Отказ от поддержки Python 2 позволит нам удалить огромное количество кода совместимости, а переход на Python 3.5 позволит нам использовать преимущества типизации, async/await, asyncio и подобных концепций, которым нет альтернативы в старых версиях.
Celery 4.x будет продолжать работать на Python 2.7, 3.4, 3.5; так же как Celery 3.x продолжает работать на Python 2.6.
Поддержка Django¶
Celery 4.x требует Django 1.8 или более поздней версии, но мы действительно рекомендуем использовать по крайней мере Django 1.9 для новой функции transaction.on_commit
.
Частой проблемой при вызове задач из Django является ситуация, когда задача связана с изменением модели, и вы хотите отменить задачу, если транзакция будет откатана, или обеспечить выполнение задачи только после того, как изменения будут записаны в базу данных.
transaction.atomic
позволяет решить эту проблему, добавив задачу в качестве обратного вызова, который будет вызываться только при фиксации транзакции.
Пример использования:
from functools import partial
from django.db import transaction
from .models import Article, Log
from .tasks import send_article_created_notification
def create_article(request):
with transaction.atomic():
article = Article.objects.create(**request.POST)
# send this task only if the rest of the transaction succeeds.
transaction.on_commit(partial(
send_article_created_notification.delay, article_id=article.pk))
Log.objects.create(type=Log.ARTICLE_CREATED, object_pk=article.pk)
Удаленные функции¶
Microsoft Windows больше не поддерживается.
Набор тестов пройден, и Celery, похоже, работает с Windows, но мы не даем никаких гарантий, поскольку не можем диагностировать проблемы на этой платформе. Если вы являетесь компанией, нуждающейся в поддержке на этой платформе, пожалуйста, свяжитесь с нами.
Jython больше не поддерживается.
Характеристики удалены для простоты¶
Механизм задания Webhook (
celery.task.http
) был удален.В настоящее время легко использовать модуль requests для написания задач webhook вручную. Мы бы с удовольствием использовали запросы, но мы просто не можем этого сделать, так как в сообществе Python есть очень активная толпа «против зависимостей».
Если вам нужна обратная совместимость, вы можете просто скопировать + вставить версию модуля 3.1 и убедиться, что он импортирован рабочим: https://github.com/celery/celery/blob/3.1/celery/task/http.py.
Задачи больше не отправляют сообщения об ошибках.
Это также удаляет поддержку
app.mail_admins
, и любую функциональность, связанную с отправкой электронной почты.celery.contrib.batches
был удален.Это была экспериментальная функция, поэтому на нее не распространяются наши гарантии по срокам вывода из эксплуатации.
Вы можете скопировать и использовать существующий код батчей для использования в своих проектах: https://github.com/celery/celery/blob/3.1/celery/contrib/batches.py.
Функции удалены из-за отсутствия финансирования¶
В релизе 3.1 мы объявили, что некоторые транспорты переведены в статус экспериментальных, и что официальной поддержки этих транспортов не будет.
Поскольку этот тонкий намек на необходимость финансирования не удался, мы полностью убрали их, нарушив обратную совместимость.
Использование Django ORM в качестве брокера больше не поддерживается.
Вы все еще можете использовать Django ORM в качестве бэкенда результатов: смотрите раздел django-celery-results - Использование Django ORM/Cache в качестве бэкенда результатов для получения дополнительной информации.
Использование SQLAlchemy в качестве брокера больше не поддерживается.
Вы по-прежнему можете использовать SQLAlchemy в качестве бэкенда результатов.
Использование CouchDB в качестве брокера больше не поддерживается.
Вы по-прежнему можете использовать CouchDB в качестве бэкенда результатов.
Использование IronMQ в качестве брокера больше не поддерживается.
Использование Beanstalk в качестве брокера больше не поддерживается.
Кроме того, некоторые функции были полностью удалены, так что при попытке их использования будет возникать исключение:
Функция
--autoreload
была удалена.Это была экспериментальная функция, и на нее не распространяются наши гарантии по срокам изъятия. Флаг полностью удален, поэтому при его наличии рабочий будет аварийно завершать работу при запуске. К счастью, этот флаг не используется в производственных системах.
Экспериментальный пул
threads
больше не поддерживается и был удален.Функция
force_execv
больше не поддерживается.Команда
celery worker
теперь игнорирует установки--no-execv
,--force-execv
иCELERYD_FORCE_EXECV
.В версии 5.0 этот флаг будет полностью удален, и рабочий будет выдавать ошибку.
Старый унаследованный бэкенд результатов «amqp» был устаревшим и будет удален в Celery 5.0.
Пожалуйста, используйте бэкенд результатов
rpc
для вызовов в стиле RPC, и постоянный бэкенд результатов для многопользовательских результатов.
Мы считаем, что большинство из них можно исправить без значительных усилий, поэтому если вы заинтересованы в возвращении какой-либо из этих функций, пожалуйста, свяжитесь с нами.
А теперь хорошие новости…
Новый протокол сообщений о задачах¶
В этой версии представлен совершенно новый протокол сообщений о задачах - первое серьезное изменение протокола с начала проекта.
Новый протокол включен по умолчанию в этой версии, и поскольку новая версия не имеет обратной совместимости, вы должны быть осторожны при обновлении.
Версия 3.1.25 была выпущена для обеспечения совместимости с новым протоколом, поэтому самый простой способ обновления - сначала перейти на эту версию, а затем перейти на 4.0 в ходе второго развертывания.
Если вы хотите продолжать использовать старый протокол, вы также можете настроить номер используемой версии протокола:
app = Celery()
app.conf.task_protocol = 1
Подробнее о возможностях, доступных в новом протоколе, читайте в разделе новостей, расположенном далее в этом документе.
Имена настроек в нижнем регистре¶
В погоне за красотой все настройки теперь переименованы в строчные буквы, а названия некоторых настроек были переименованы для единообразия.
Это изменение полностью обратно совместимо, поэтому вы по-прежнему можете использовать имена настроек в верхнем регистре, но мы хотели бы, чтобы вы обновились как можно скорее, и вы можете сделать это автоматически с помощью команды celery upgrade settings:
$ celery upgrade settings proj/settings.py
Эта команда изменит ваш модуль на месте, чтобы использовать новые имена в нижнем регистре (если вы хотите использовать верхний регистр с префиксом «CELERY
», см. блок ниже), и сохранит резервную копию в proj/settings.py.orig
.
Для пользователей Django и тех, кто хочет сохранить имена в верхнем регистре
Если вы загружаете конфигурацию Celery из модуля настроек Django, то вам лучше использовать имена в верхнем регистре.
Вы также хотите использовать префикс CELERY_
, чтобы настройки Celery не столкнулись с настройками Django, используемыми другими приложениями.
Для этого сначала нужно преобразовать файл настроек для использования новой согласованной схемы именования и добавить префикс ко всем настройкам, связанным с Celery:
$ celery upgrade settings proj/settings.py --django
После обновления файла настроек вам необходимо явно установить префикс в вашем модуле proj/celery.py
:
app.config_from_object('django.conf:settings', namespace='CELERY')
Самый актуальный пример интеграции Django Celery вы можете найти здесь: Первые шаги в работе с Django.
Примечание
Это также добавит префикс к настройкам, которые ранее не имели его, например, BROKER_URL
должно быть написано CELERY_BROKER_URL
с пространством имен CELERY
CELERY_BROKER_URL
.
К счастью, вам не придется вручную изменять файлы, так как программа celery upgrade settings --django сделает все правильно.
Загрузчик попытается определить, использует ли ваша конфигурация новый формат, и будет действовать соответственно, но это также означает, что вам не разрешается смешивать и сопоставлять новые и старые имена параметров, если только вы не предоставите значение для обеих альтернатив.
Основным отличием от предыдущих версий, помимо имен в нижнем регистре, является переименование некоторых префиксов, например, celerybeat_
в beat_
, celeryd_
в worker_
.
Префикс celery_
также был удален, и параметры, связанные с задачами, из этого пространства имен теперь имеют префикс task_
, а параметры, связанные с рабочими - worker_
.
Кроме этого, большинство настроек будут такими же в нижнем регистре, за исключением нескольких специальных:
Имя установки |
Заменить на |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Полную таблицу изменений можно посмотреть в Новые настройки строчных букв.
Json теперь является сериализатором по умолчанию¶
Наконец-то пришло время положить конец господству pickle
в качестве механизма сериализации по умолчанию, и начиная с этой версии сериализатором по умолчанию является json.
Это изменение было announced with the release of Celery 3.1.
Если вы все еще полагаетесь на pickle
в качестве сериализатора по умолчанию, то вам необходимо настроить свое приложение перед обновлением до версии 4.0:
task_serializer = 'pickle'
result_serializer = 'pickle'
accept_content = {'pickle'}
Сериализатор Json теперь также поддерживает некоторые дополнительные типы:
-
Преобразуется в json-текст в формате ISO-8601.
-
Преобразуется в текст в формате json.
django.utils.functional.Promise
Только для Django: Ленивые строки, используемые для перевода и т.д., оцениваются и пытаются преобразоваться в тип json.
-
Преобразуется в текст в формате json.
Вы также можете определить метод __json__
в ваших пользовательских классах для поддержки сериализации JSON (должен возвращать тип, совместимый с json):
class Person:
first_name = None
last_name = None
address = None
def __json__(self):
return {
'first_name': self.first_name,
'last_name': self.last_name,
'address': self.address,
}
Базовый класс Task больше не регистрирует задачи автоматически¶
Класс Task
больше не использует специальный мета-класс, который автоматически регистрирует задачу в реестре задач.
Вместо этого теперь это обрабатывается декораторами app.task
.
Если вы все еще используете задания, основанные на классах, то вам нужно зарегистрировать их вручную:
class CustomTask(Task):
def run(self):
print('running')
CustomTask = app.register_task(CustomTask())
Лучшей практикой является использование пользовательских классов задач только для переопределения общего поведения, а затем использование декоратора задач для реализации задачи:
@app.task(bind=True, base=CustomTask)
def custom(self):
print('running')
Это изменение также означает, что атрибут задания abstract
больше не имеет никакого эффекта.
Проверка аргументов задачи¶
Аргументы задачи теперь проверяются при вызове задачи, даже асинхронно:
>>> @app.task
... def add(x, y):
... return x + y
>>> add.delay(8, 8)
<AsyncResult: f59d71ca-1549-43e0-be41-4e8821a83c0c>
>>> add.delay(8)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "celery/app/task.py", line 376, in delay
return self.apply_async(args, kwargs)
File "celery/app/task.py", line 485, in apply_async
check_arguments(*(args or ()), **(kwargs or {}))
TypeError: add() takes exactly 2 arguments (1 given)
Вы можете отключить проверку аргументов для любой задачи, установив ее атрибут typing
в значение False
:
>>> @app.task(typing=False)
... def add(x, y):
... return x + y
Или, если вы хотите полностью отключить это для всех задач, вы можете передать strict_typing=False
при создании приложения:
app = Celery(..., strict_typing=False)
Redis Events не имеет обратной совместимости¶
Опции транспорта Redis fanout_patterns
и << 1 >>> теперь включены по умолчанию.
Работники/мониторы без включенных флагов не смогут видеть работников с отключенным флагом. Они по-прежнему могут выполнять задания, но не смогут получать сообщения мониторинга друг друга.
Вы можете выполнить обновление с обратной совместимостью, сначала настроив рабочие и мониторы 3.1 для включения настроек, перед окончательным обновлением до 4.0:
BROKER_TRANSPORT_OPTIONS = {
'fanout_patterns': True,
'fanout_prefix': True,
}
Пересмотр приоритетов Redis¶
Приоритет 0 теперь самый низкий, 9 - самый высокий.
Это изменение было сделано для того, чтобы поддержка приоритетов соответствовала тому, как она работает в AMQP.
Внесено Алексом Кошелевым.
Django: Автообнаружение теперь поддерживает конфигурации приложений Django¶
Теперь функцию autodiscover_tasks()
можно вызывать без аргументов, и обработчик Django автоматически найдет ваши установленные приложения:
app.autodiscover_tasks()
Интеграция Django example in the documentation была обновлена для использования вызова без аргументов.
Это также обеспечивает совместимость с новыми, эээ, AppConfig
вещами, появившимися в последних версиях Django.
Прямые очереди рабочих больше не используют автоудаление¶
Рабочие/клиенты, работающие под управлением версии 4.0, больше не смогут отправлять прямые сообщения рабочим, работающим под управлением более старых версий, и наоборот.
Если вы полагаетесь на прямые сообщения рабочих, вам следует сначала обновить рабочих и клиентов 3.x, чтобы они использовали новые настройки маршрутизации, заменив celery.utils.worker_direct()
на эту реализацию:
from kombu import Exchange, Queue
worker_direct_exchange = Exchange('C.dq2')
def worker_direct(hostname):
return Queue(
'{hostname}.dq2'.format(hostname),
exchange=worker_direct_exchange,
routing_key=hostname,
)
Эта функция закрыла проблему #2492.
Удалены старые программы командной строки¶
При установке Celery больше не будут устанавливаться программы celeryd
, celerybeat
и celeryd-multi
.
Об этом было объявлено с выходом Celery 3.1, но у вас могут оставаться скрипты, указывающие на старые имена, поэтому обязательно обновите их, чтобы использовать новую команду umbrella:
Программа |
Новый статус |
Замена |
---|---|---|
|
УДАЛЕНО |
celery worker |
|
УДАЛЕНО |
celery beat |
|
УДАЛЕНО |
celery multi |
Новости¶
Основные моменты нового протокола¶
Новый протокол устраняет многие проблемы старого протокола и позволяет реализовать некоторые давно востребованные функции:
Большинство данных теперь отправляются в виде заголовков сообщений, а не сериализуются вместе с телом сообщения.
В первой версии протокола рабочий всегда должен был десериализовать сообщение, чтобы иметь возможность прочитать мета-данные задачи, такие как идентификатор задачи, имя и т. д. Это также означало, что рабочий был вынужден дважды декодировать данные, сначала десериализуя сообщение при получении, снова сериализуя сообщение для отправки дочернему процессу, а затем дочерний процесс снова десериализует сообщение.
Сохранение полей метаданных в заголовках сообщений означает, что рабочему не нужно декодировать полезную нагрузку перед передачей задания дочернему процессу, а также то, что теперь рабочий может перенаправить задание, написанное на языке, отличном от Python, другому рабочему.
Новый заголовок сообщения
lang
может быть использован для указания языка программирования, на котором написано задание.Worker сохраняет результаты для внутренних ошибок, таких как
ContentDisallowed
, и других ошибок десериализации.Worker сохраняет результаты и отправляет события мониторинга для незарегистрированных ошибок задачи.
Worker вызывает callbacks/errbacks, даже если результат отправляется родительским процессом (например,
WorkerLostError
при завершении дочернего процесса, ошибки десериализации, незарегистрированные задачи).Новый заголовок
origin
содержит информацию о процессе, отправляющем задание (имя рабочего узла, или информацию о PID и имени хоста).Новый заголовок
shadow
позволяет изменять имя задачи, используемое в журналах.Это полезно для диспетчеризации подобных шаблонов, например, задачи, которая вызывает любую функцию, использующую pickle (не делайте этого дома):
from celery import Task from celery.utils.imports import qualname class call_as_task(Task): def shadow_name(self, args, kwargs, options): return 'call_as_task:{0}'.format(qualname(args[0])) def run(self, fun, *args, **kwargs): return fun(*args, **kwargs) call_as_task = app.register_task(call_as_task())
Новые поля
argsrepr
иkwargsrepr
содержат текстовые представления аргументов задачи (возможно, усеченные) для использования в журналах, мониторах и т.д.Это означает, что рабочему не нужно десериализовывать полезную нагрузку сообщения, чтобы отобразить аргументы задачи в информационных целях.
Цепочки теперь используют специальное поле
chain
, что позволяет поддерживать цепочки из тысяч и более задач.Новые заголовки
parent_id
и << 1 >>> добавляют информацию об отношениях задачи с другими задачами.parent_id
является идентификатором задачи, которая вызвала эту задачуroot_id
- это первая задача в рабочем потоке.
Эти поля могут быть использованы для улучшения мониторов, таких как цветок, для группировки связанных сообщений вместе (например, цепочки, группы, аккорды, полные рабочие потоки и т.д.).
app.TaskProducer
заменяется наapp.amqp.create_task_message()
иapp.amqp.send_task_message()
.Разделение обязанностей на создание и отправку означает, что людям, которые хотят отправлять сообщения, используя AMQP-клиент Python напрямую, не нужно реализовывать протокол.
Метод
app.amqp.create_task_message()
вызывает либоapp.amqp.as_task_v2()
, либоapp.amqp.as_task_v1()
в зависимости от настроенного протокола задачи, и возвращает специальный кортежtask_message
, содержащий заголовки, свойства и тело сообщения задачи.
См.также
Новый протокол задач полностью задокументирован здесь: Версия 2.
Улучшение бассейна в Префорке¶
Задачи теперь регистрируются в дочернем процессе¶
Логирование успеха/неудачи задачи теперь происходит из дочернего процесса, выполняющего задачу. В результате утилиты протоколирования, такие как Sentry, могут получить полную информацию о задачах, включая переменные в стеке трассировки.
-Ofair
теперь является стратегией планирования по умолчанию¶
Чтобы снова включить поведение по умолчанию в версии 3.1, используйте опцию командной строки -Ofast
.
Было много путаницы относительно того, что делает опция командной строки -Ofair
, и использование термина «prefetch» в объяснениях, вероятно, не помогло, учитывая, насколько запутанной является эта терминология в AMQP.
Когда рабочий Celery, использующий пул prefork, получает задание, ему необходимо делегировать это задание дочернему процессу для выполнения.
Пул prefork имеет настраиваемое количество дочерних процессов (--concurrency
), которые могут быть использованы для выполнения задач, и каждый дочерний процесс использует трубы/сокеты для связи с родительским процессом:
inqueue (pipe/socket): родитель отправляет задание дочернему процессу
outqueue (pipe/socket): дочерняя программа отправляет результат/обратное значение родительской программе.
В Celery 3.1 механизм планирования по умолчанию просто посылал задачу первому inqueue
, который был доступен для записи, с некоторой эвристикой, чтобы убедиться, что мы выполняем круговое распределение между ними, чтобы каждый дочерний процесс получал одинаковое количество задач.
Это означает, что в стратегии планирования по умолчанию рабочий может отправлять задания тому же дочернему процессу, который уже выполняет задание. Если эта задача долго выполняется, она может надолго заблокировать ожидающую задачу. Что еще хуже, сотни коротко выполняющихся задач могут застрять за долго выполняющейся задачей, даже если есть дочерние процессы, свободные для выполнения работы.
Стратегия планирования -Ofair
была добавлена, чтобы избежать этой ситуации, и когда она включена, она добавляет правило, что никакая задача не должна быть отправлена в дочерний процесс, который уже выполняет задачу.
Стратегия справедливого планирования может работать немного хуже, если у вас есть только краткосрочные задачи.
Ограничение размера резидентной памяти дочернего процесса¶
Теперь вы можете ограничить максимальный объем памяти, выделяемой на один дочерний процесс пула prefork, установив параметр worker --max-memory-per-child
, или параметр worker_max_memory_per_child
.
Ограничение касается объема памяти RSS/резидента и указывается в килобайтах.
Дочерний процесс, превысивший лимит, будет завершен и заменен новым процессом после возвращения текущего выполняемого задания.
Дополнительную информацию см. в разделе Максимальное количество памяти на одну детскую установку.
Внесено Дэйвом Смитом.
Один лог-файл на каждый дочерний процесс¶
Init-scrips и celery multi теперь используют опцию формата файла журнала %I (например, /var/log/celery/%n%I.log
).
Это изменение было необходимо для обеспечения того, чтобы каждый дочерний процесс имел отдельный файл журнала после переноса регистрации задач в дочерний процесс, так как запись нескольких процессов в один и тот же файл журнала может привести к повреждению.
Вам рекомендуется обновить свои init-скрипты и аргументы celery multi, чтобы использовать эту новую опцию.
Перевозки¶
Поддержка приоритетной очереди RabbitMQ¶
Дополнительную информацию см. в разделе Приоритеты сообщений RabbitMQ.
Предоставлено Джеральдом Манипоном.
Настройте URL-адрес брокера отдельно для чтения/записи¶
Добавлены новые параметры broker_read_url
и << 1 >>>, чтобы можно было предоставлять отдельные URL брокера для соединений, используемых для потребления/публикации.
В дополнение к опциям конфигурации, в API приложения были добавлены два новых метода:
app.connection_for_read()
app.connection_for_write()
Теперь их следует использовать вместо app.connection()
для указания намерения требуемого соединения.
Примечание
Доступны два пула соединений: app.pool
(чтение) и app.producer_pool
(запись). Последний на самом деле дает не соединения, а полные экземпляры kombu.Producer
.
def publish_some_message(app, producer=None):
with app.producer_or_acquire(producer) as producer:
...
def consume_messages(app, connection=None):
with app.connection_or_acquire(connection) as connection:
...
Поддержка расширений очередей RabbitMQ¶
Объявления очередей теперь могут задавать TTL сообщения и время истечения очереди напрямую, используя аргументы message_ttl
и << 1 >>>.
В Queue
были добавлены новые аргументы, позволяющие напрямую и удобно конфигурировать расширения очередей RabbitMQ в объявлениях очередей:
Queue(expires=20.0)
Установка времени истечения очереди в плавающих секундах.
См.
kombu.Queue.expires
.Queue(message_ttl=30.0)
Установка времени жизни сообщения очереди в секундах в формате float.
См.
kombu.Queue.message_ttl
.Queue(max_length=1000)
Установите максимальную длину очереди (количество сообщений) как int.
См.
kombu.Queue.max_length
.Queue(max_length_bytes=1000)
Установите максимальную длину очереди (общий размер сообщения в байтах) как int.
См.
kombu.Queue.max_length_bytes
.Queue(max_priority=10)
Объявить очередь приоритетной очередью, которая направляет сообщения на основе поля
priority
сообщения.См.
kombu.Queue.max_priority
.
Транспорт Amazon SQS теперь официально поддерживается¶
Транспорт брокера SQS был переписан для использования асинхронного ввода-вывода и, таким образом, присоединился к RabbitMQ, Redis и QPid в качестве официально поддерживаемых транспортов.
Новая реализация также использует преимущества длинного опроса и устраняет несколько проблем, связанных с использованием SQS в качестве брокера.
Эта работа была спонсирована компанией Nextdoor.
Транспорт Apache QPid теперь официально поддерживается¶
Внесено Брайаном Баутерсом.
Redis: поддержка Sentinel¶
Вы можете направить соединение на список URL-адресов, например:
sentinel://0.0.0.0:26379;sentinel://0.0.0.0:26380/...
где каждый сентинел разделяется символом ;. Множественные сентинелы обрабатываются конструктором kombu.Connection
, и помещаются в альтернативный список серверов для подключения в случае обрыва соединения.
При участии Сергея Азовскова, и Лоренцо Манчини.
Задачи¶
Декоратор автоповтора задачи¶
Написание пользовательской обработки повторных попыток для событий исключений настолько распространено, что теперь мы имеем встроенную поддержку для этого.
Для этого в декораторах задач теперь поддерживается новый аргумент autoretry_for
, в котором можно указать кортеж исключений для автоматической повторной попытки:
from twitter.exceptions import FailWhaleError
@app.task(autoretry_for=(FailWhaleError,))
def refresh_timeline(user):
return twitter.refresh_timeline(user)
Дополнительную информацию см. в разделе Автоматические повторные попытки для известных исключений.
Внесено Дмитрием Малиновским.
Task.replace
Улучшения¶
self.replace(signature)
теперь может заменить любую задачу, аккорд или группу, а сигнатура, на которую производится замена, может быть аккордом, группой или любым другим типом сигнатуры.Больше не наследует callbacks и errbacks существующей задачи.
Если вы заменяете узел в дереве, то вы не ожидаете, что новый узел унаследует дочерние элементы старого узла.
Task.replace_in_chord
было удалено, вместо него используйте.replace
.Если замена является группой, эта группа будет автоматически преобразована в аккорд, в котором обратный вызов «накапливает» результаты групповых задач.
Новая встроенная задача (для этого была добавлена celery.accumulate)
При участии Стива Морина, и Аска Солема.
Удаленное выполнение заданий¶
Новый task_remote_tracebacks
сделает трассировку задач более полезной, внедряя стек удаленного рабочего.
Эта функция требует дополнительной библиотеки tblib.
Внесено Ионелом Кристианом Мэриешем.
Обработка ошибок подключения к задаче¶
Ошибки, связанные с соединением, возникающие при отправке задания, теперь повторно отображаются как ошибка kombu.exceptions.OperationalError
:
>>> try:
... add.delay(2, 2)
... except add.OperationalError as exc:
... print('Could not send task %r: %r' % (add, exc))
Дополнительную информацию см. в разделе Обработка ошибок подключения.
Gevent/Eventlet: Выделенный поток для получения результатов¶
При использовании gevent или eventlet за потребление событий теперь отвечает один поток.
Это означает, что если у вас много вызовов, получающих результаты, то для их получения будет выделен отдельный поток:
result = add.delay(2, 2)
# this call will delegate to the result consumer thread:
# once the consumer thread has received the result this greenlet can
# continue.
value = result.get(timeout=3)
Это делает выполнение вызовов RPC при использовании gevent/eventlet намного лучше.
AsyncResult.then(on_success, on_error)
¶
API AsyncResult был расширен для поддержки протокола promise
.
В настоящее время это работает только с бэкендами результатов RPC (amqp) и Redis, но позволяет подключать обратные вызовы для завершения задач:
import gevent.monkey
monkey.patch_all()
import time
from celery import Celery
app = Celery(broker='amqp://', backend='rpc')
@app.task
def add(x, y):
return x + y
def on_result_ready(result):
print('Received result for id %r: %r' % (result.id, result.result,))
add.delay(2, 2).then(on_result_ready)
time.sleep(3) # run gevent event loop for a while.
Здесь продемонстрировано использование gevent, но на самом деле этот API более полезен в циклах событий с обратным вызовом, таких как twisted, или tornado.
Новый API маршрутизатора задач¶
Параметр task_routes
теперь может содержать функции, а маршруты map теперь поддерживают шаблоны glob и regexes.
Вместо использования классов маршрутизаторов теперь можно просто определить функцию:
def route_for_task(name, args, kwargs, options, task=None, **kwargs):
from proj import tasks
if name == tasks.add.name:
return {'queue': 'hipri'}
Если вам не нужны аргументы, вы можете использовать аргументы start, только убедитесь, что вы всегда принимаете также аргументы star, чтобы у нас была возможность добавить больше функций в будущем:
def route_for_task(name, *args, **kwargs):
from proj import tasks
if name == tasks.add.name:
return {'queue': 'hipri', 'priority': 9}
И аргумент options
, и новый аргумент с ключевым словом task
являются новыми для маршрутизаторов в стиле функций, и облегчают написание маршрутизаторов на основе параметров выполнения или свойств задачи.
Необязательный аргумент ключевого слова task
не будет установлен, если задача вызывается по имени с помощью app.send_task()
.
Дополнительные примеры, включая использование glob/regexes в маршрутизаторах, смотрите в task_routes
и << 1 >>>.
Рефактор холста¶
Реализация canvas/work-flow была сильно доработана для устранения некоторых давно нерешенных проблем.
Обратные вызовы ошибок теперь могут принимать реальные экземпляры исключений и трассировки (проблема #2538).
>>> add.s(2, 2).on_error(log_error.s()).delay()
Где
log_error
может быть определено как:@app.task def log_error(request, exc, traceback): with open(os.path.join('/var/errors', request.id), 'a') as fh: print('--\n\n{0} {1} {2}'.format( task_id, exc, traceback), file=fh)
Дополнительные примеры см. в разделе Холст: Проектирование рабочих потоков.
chain(a, b, c)
теперь работает так же, какa | b | c
.Это означает, что chain больше не может возвращать экземпляр
chain
, вместо этого он может оптимизировать рабочий процесс таким образом, что, например, две группы, соединенные в цепочку, становятся одной группой.Теперь группы внутри групп разворачиваются в одну группу (выпуск #1509).
задачи chunks/map/starmap теперь маршрутизируются на основе целевой задачи
аккорды и цепочки теперь могут быть неизменяемыми.
Исправлена ошибка, при которой сериализованные подписи не преобразовывались обратно в подписи (проблема #2078)
Исправление внесено Россом Дином.
Исправлена проблема, при которой цепочки и группы не работали при использовании сериализации JSON (проблема #2076).
Исправление внесено Россом Дином.
Создание аккорда больше не приводит к множественным значениям для аргумента ключевого слова „task_id“ (проблема #2225).
Исправление внесено Анейлом Маллаварапу.
Исправлена проблема, при которой возвращался неверный результат, когда цепочка содержала аккорд в качестве предпоследней задачи.
Исправление внесено Анейлом Маллаварапу.
Специальный случай
group(A.s() | group(B.s() | C.s()))
теперь работает.Цепочка: Исправлена ошибка с неправильным набором id, когда подзадача также является цепочкой.
group | group
теперь сглажена в одну группу (выпуск #2573).Исправлена проблема, при которой
group | task
не обновлялся до аккорда (проблема #2922).Аккорды теперь правильно устанавливают ссылки
result.parent
.chunks
/map
/starmap
теперь маршрутизируются на основе целевой задачи.Signature.link
теперь работает, когда аргумент скалярный (не список)(Выпуск №2019).
group()
теперь правильно пересылает аргументы ключевых слов (проблема #3426).Исправление внесено Самуэлем Гиффардом.
chord
>, где группа заголовков состоит только из одной задачи, теперь превращается в простую цепочку.Передача аргумента
link
вgroup.apply_async()
теперь вызывает ошибку (проблема #3508).chord | sig
теперь присоединяется к обратному вызову аккорда (проблема #3356).
Периодические задачи¶
Новый API для настройки периодических задач¶
Этот новый API позволяет использовать подписи при определении периодических задач, устраняя вероятность неправильного ввода имен задач.
Примером нового API является here.
Оптимизированная реализация Beat¶
Реализация celery beat была оптимизирована для миллионов периодических задач путем использования кучи для планирования записей.
При участии Аска Солема и Александра Кошелева.
Планирование задач на основе восхода, заката, рассвета и сумерек¶
Дополнительную информацию см. в разделе Солнечные графики.
Внесено Марком Парнкаттом.
Бэкенды результатов¶
RPC Result Backend зрелый¶
Множество ошибок в ранее экспериментальном бэкенде результатов RPC были исправлены и теперь могут быть рассмотрены для производственного использования.
Вклад внесли Аск Солем, Моррис Твид.
Redis: Оптимизация бэкенда результатов¶
result.get()
теперь используется pub/sub для потоковой передачи результатов задач¶
Вызов result.get()
при использовании бэкенда результатов Redis раньше был очень дорогим, поскольку он использовал опрос для ожидания, пока результат станет доступен. Интервал опроса по умолчанию в 0,5 секунды не способствовал повышению производительности, но был необходим, чтобы избежать зацикливания.
Новая реализация использует механизмы Redis Pub/Sub для немедленной публикации и получения результатов, что значительно улучшает время выполнения задачи.
При участии Ярослава Жаворонкова и Аска Солема.
Новая оптимизированная реализация соединения аккордов¶
Это была экспериментальная функция, представленная в Celery 3.1, которую можно было включить, только добавив ?new_join=1
в конфигурацию URL бэкенда результата.
Мы считаем, что реализация была протестирована достаточно тщательно, чтобы считать ее стабильной и включенной по умолчанию.
Новая реализация значительно снижает накладные расходы на аккорды, и особенно при больших аккордах выигрыш в производительности может быть огромным.
Представлен новый бэкенд результатов Riak¶
Дополнительную информацию см. в разделе conf-riak-result-backend.
Вклад внесли Жиль Дартигуэлонг, Alman One и NoKriK.
Представлен новый бэкенд результатов CouchDB¶
Дополнительную информацию см. в разделе Настройки бэкенда CouchDB.
Внесено Натаном Ван Гимом.
Представлен новый бэкенд результатов Consul¶
Добавьте поддержку Consul в качестве бэкенда, используя хранилище ключей/значений Consul.
Consul имеет HTTP API, через который вы можете хранить ключи с их значениями.
Бэкенд расширяет KeyValueStoreBackend и реализует большинство методов.
В основном для установки, получения и удаления объектов.
Это позволяет Celery хранить результаты выполнения задачи в K/V хранилище Consul.
Consul также позволяет установить TTL для ключей, используя Sessions из Consul. Таким образом, бэкенд поддерживает автоматическое истечение срока действия результатов задач.
Для получения дополнительной информации о Консуле посетите сайт https://consul.io/.
Бэкенд использует python-consul для общения с HTTP API. Этот пакет полностью совместим с Python 3, как и этот бэкенд:
$ pip install python-consul
Это установит необходимый пакет для взаимодействия с HTTP API Consul из Python.
Вы также можете указать consul в качестве расширения в зависимости от Celery:
$ pip install celery[consul]
Дополнительную информацию см. в разделе Пакеты.
Внесено Видо ден Холландером.
Совершенно новый бэкенд результатов Cassandra¶
Совершенно новый бэкенд Cassandra, использующий новую библиотеку cassandra-driver, заменяет старый бэкенд результатов, использующий старую библиотеку pycassa.
Дополнительную информацию см. в разделе Настройки бэкенда Cassandra.
Для зависимости от Celery с использованием Cassandra в качестве бэкенда результатов:
$ pip install celery[cassandra]
Вы также можете объединить несколько требований к расширению, для получения дополнительной информации см. раздел Пакеты.
Представлен новый бэкенд результатов Elasticsearch¶
Дополнительную информацию см. в разделе Настройки бэкенда Elasticsearch.
Для зависимости от Celery с Elasticsearch в качестве результата используйте bakend:
$ pip install celery[elasticsearch]
Вы также можете объединить несколько требований к расширению, для получения дополнительной информации см. раздел Пакеты.
При участии Ахмета Демира.
Представлен новый бэкенд результатов файловой системы¶
Дополнительную информацию см. в разделе Настройки бэкенда файловой системы.
При участии Моше ван дер Стерре.
Пакетирование событий¶
Теперь события буферизируются в рабочем и отправляются в виде списка, что уменьшает накладные расходы, необходимые для отправки событий мониторинга.
Для авторов пользовательских мониторов событий не потребуется никаких действий, если вы используете помощники Python Celery (<< 0 >>>) для реализации вашего монитора.
Однако, если вы разбираете необработанные сообщения о событиях, вы должны учитывать пакетные сообщения о событиях, поскольку они отличаются от обычных сообщений о событиях следующим образом:
Ключ маршрутизации для пакета сообщений о событиях будет установлен в
<event-group>.multi
, если единственная группа событий, передаваемых в пакет, в настоящее времяtask
(что дает ключ маршрутизацииtask.multi
).Тело сообщения будет представлять собой сериализованный список словарей вместо словаря. Каждый элемент списка можно рассматривать как обычное тело сообщения о событии.
Другие новости…¶
Требования¶
Задачи¶
В настоящее время «anon-exchange» используется для простой прямой маршрутизации по имени.
Это увеличивает производительность, поскольку полностью обходит таблицу маршрутизации, а также повышает надежность транспорта брокера Redis.
Пустой ResultSet теперь оценивается как True.
Исправление внесено Колином Макинтошем.
Ключ маршрутизации по умолчанию (
task_default_routing_key
) и имя обмена (task_default_exchange
) теперь берутся из настройкиtask_default_queue
.Это означает, что для изменения имени очереди по умолчанию теперь нужно задать только один параметр.
Новая настройка
task_reject_on_worker_lost
и атрибут задачиreject_on_worker_lost
определяют, что произойдет, когда дочерний рабочий процесс, выполняющий задачу late ack, будет завершен.Внесено Майклом Пермана.
Task.subtask
переименовано вTask.signature
с псевдонимом.Task.subtask_from_request
переименовано вTask.signature_from_request
с псевдонимом.Атрибут
delivery_mode
дляkombu.Queue
теперь соблюдается (выпуск #1953).Маршруты в
task-routes
теперь могут напрямую указывать экземплярQueue
.Пример:
task_routes = {'proj.tasks.add': {'queue': Queue('add')}}
AsyncResult
теперь возвращаетValueError
, если task_id равен None. (Проблема #1996).Повторные задания не пересылали настройки истечения срока выполнения (проблема #3297).
result.get()
теперь поддерживает аргументon_message
для установки обратного вызова, который будет вызываться для каждого полученного сообщения.Добавлены новые абстрактные классы:
-
Похоже на задачу.
-
Похоже на подпись под заданием.
-
Task.replace
теперь правильно пересылает обратные вызовы (проблема #2722).Исправление внесено Nicolas Unravel.
Task.replace
: Добавление к цепочке/аккорду (закрывает #3232)Исправлена проблема #3232, добавляющая сигнатуру в цепочку (если она есть). Исправлено подавление аккордов, если заданная сигнатура содержит один.
Исправление внесено @honux.
Повторное выполнение задачи теперь выполняется и в режиме eager.
Исправление внесено Феанилом Пателем.
Побить¶
Исправлен бесконечный цикл crontab при неверной дате.
Если вхождение никогда не может быть достигнуто (например, 31 апреля), попытка достичь следующего вхождения вызовет бесконечный цикл.
Попробуйте исправить это, вызвав ошибку
RuntimeError
после 2 000 итераций(Также в процессе добавлена проверка на високосные годы в кронтабе)
Исправление внесено Ромуальдом Брюне.
Теперь обеспечивает выход программы с ненулевым кодом выхода, когда исключение завершает работу сервиса.
Исправление внесено Simon Peeters.
Приложение¶
Даты теперь всегда учитывают часовые пояса, даже если
enable_utc
отключено (выпуск #943).Исправление внесено Омером Кацем.
Конфигурация: Предварительная конфигурация приложения теперь также сохраняется вместе с конфигурацией.
Исправление внесено Джереми Зафраном.
- Теперь приложение может изменить способ создания имен задач, используя
метод
gen_task_name()
.Внесено Дмитрием Малиновским.
В приложении появилось новое свойство
app.current_worker_task
, которое возвращает задачу, над которой в данный момент ведется работа (илиNone
). (Проблема #2100).
Ведение журнала¶
get_task_logger()
теперь вызывает исключение при попытке использовать имя «celery» или «celery.task» (проблема #3475).
Пулы исполнения¶
Eventlet/Gevent: теперь включена функция AMQP heartbeat (выпуск #3338).
Эвентлет/Гевент: Исправлено состояние гонки, приводящее к ошибкам «одновременного чтения» (проблема #2755).
Prefork: Пул префорков теперь использует
poll
вместоselect
там, где это возможно (проблема #2373).Prefork: Исправлена ошибка, при которой пул отказывался выключать рабочего (проблема #2606).
Эвентлет: Теперь возвращает размер пула в команде celery inspect stats.
Внесено Александром Обловатным.
Тестирование¶
Celery теперь является плагином pytest, включающим фикстуры, полезные для модульного и интеграционного тестирования.
Более подробную информацию см. в testing user guide.
Перевозки¶
amqps://
теперь можно указать, что требуется SSL.Транспорт Redis: Транспорт Redis теперь поддерживает опцию
broker_use_ssl
.Внесено Робертом Колба.
Сериализатор JSON теперь вызывает
obj.__json__
для неподдерживаемых типов.Это означает, что теперь вы можете определить метод
__json__
для пользовательских типов, которые могут быть сведены к встроенному типу json.Пример:
class Person: first_name = None last_name = None address = None def __json__(self): return { 'first_name': self.first_name, 'last_name': self.last_name, 'address': self.address, }
Сериализатор JSON теперь работает с datetime, Django promise, UUID и Decimal.
Новый
Queue.consumer_arguments
может использоваться для возможности установки приоритета потребителя черезx-priority
.См. https://www.rabbitmq.com/consumer-priority.html
Пример:
consumer = Consumer(channel, consumer_arguments={'x-priority': 3})
Очередь/обмен: Добавлена опция
no_declare
(также включена для внутренних обменов amq.).
Программы¶
Все программы теперь отключают цвета, если управляющий терминал не является TTY.
celery worker: Аргумент
-q
теперь отключает баннер запуска.celery worker: Сообщение «рабочий готов» теперь регистрируется с серьезностью info, а не warn.
celery multi: Формат
%n
теперь является синонимом%N
, чтобы соответствовать формату celery worker.celery inspect/celery control: теперь поддерживается новая опция
--json
для вывода в формате json.celery inspect registered: теперь игнорирует встроенные задачи.
celery purge теперь принимает параметры
-Q
и-X
, используемые для указания того, какие очереди включать и исключать из очистки.Новое celery logtool: Утилита для фильтрации и разбора лог-файлов celery worker
celery multi: теперь проходит через форматы файлов журнала %i и %I.
Общие:
%p
теперь можно использовать для расширения до полного имени рабочего узла в аргументах log-file/pid-file.- Новый параметр командной строки
--executable
теперь доступен для демонизирующих программ (celery worker и celery beat).Предоставлено Бертом Вандербауведом.
celery worker: поддерживает новую опцию
--prefetch-multiplier
.Внесено Микаэлем Пенхардом.
Аргумент
--loader
теперь действует всегда, даже если задан аргумент app (проблема №3405).inspect/control теперь принимает команды из реестра
Это означает, что пользовательские команды дистанционного управления также могут быть использованы из командной строки.
Обратите внимание, что необходимо указать аргументы/и тип аргументов, чтобы аргументы были правильно переданы в командную строку.
Теперь есть два декоратора, использование которых зависит от типа команды: @inspect_command + @control_command:
from celery.worker.control import control_command @control_command( args=[('n', int)] signature='[N=1]', ) def something(state, n=1, **kwargs): ...
Здесь
args
- это список аргументов, поддерживаемых командой. Список должен содержать кортежи(argument_name, type)
.signature
- это просто подсказка командной строки, используемая, например, вcelery -A proj control --help
.Команды также поддерживают вариадные аргументы, что означает, что все оставшиеся аргументы будут добавлены к одной переменной. Здесь демонстрируется команда
terminate
, которая принимает аргумент signal и переменное количество task_ids:from celery.worker.control import control_command @control_command( args=[('signal', str)], signature='<signal> [id1, [id2, [..., [idN]]]]', variadic='ids', ) def terminate(state, signal, ids, **kwargs): ...
Теперь эту команду можно вызвать с помощью:
$ celery -A proj control terminate SIGKILL id1 id2 id3`
Дополнительную информацию см. в разделе Написание собственных команд дистанционного управления.
Рабочий¶
Улучшения и исправления для
LimitedSet
.Избавление от утечки памяти + добавление
minlen
размера набора: минимальный остаточный размер набора после работы в течение некоторого времени.minlen
элементы сохраняются, даже если они должны были быть исключены.Проблемы со старым и еще более старым кодом:
В некоторых сценариях (например, при многократном добавлении элемента) куча будет расти.
Быстрое добавление большого количества предметов не позволит очистить их достаточно быстро (если вообще возможно).
При общении с другими рабочими были отправлены revoked._data, но они были обработаны на другой стороне как iterable. Это означает присвоение этим ключам новой (текущей) метки времени. Таким образом, рабочие могли перерабатывать элементы вечно. В сочетании с 1) и 2) это означает, что при большом количестве рабочих скоро закончится память.
Теперь все эти проблемы должны быть устранены.
Это должно исправить проблемы #3095, #3086.
Внесено Дэвидом Правеком.
Новые настройки для управления очередями команд дистанционного управления.
-
Установите время истечения очереди как для очереди команд удаленного управления, так и для очереди ответов удаленного управления.
-
Установите время жизни сообщений для очередей команд удаленного управления и очередей ответов удаленного управления.
Внесено Аланом Джустино.
-
Сигнал
worker_shutdown
теперь всегда вызывается во время выключения.Ранее он не вызывался, если экземпляр worker сначала был собран gc.
Теперь Worker запускает потребителя команд удаленного управления только в том случае, если используемый транспорт брокера действительно поддерживает их.
Gossip теперь устанавливает
x-message-ttl
для очереди событий значение heartbeat_interval s. (Выпуск #2005).Теперь сохраняет код выхода (проблема #2024).
Теперь отклоняет сообщения с недопустимым значением ETA (вместо ack, что означает, что они будут отправлены на биржу мертвых букв, если таковая настроена).
Исправлено падение при использовании аргумента
-purge
.Уровень журнала для неустранимых ошибок изменен с
error
наcritical
.Улучшенная точность ограничения скорости.
Учет отсутствующей информации о часовом поясе в поле «Срок действия задачи».
Исправление внесено Альбертом Вангом.
- Рабочий больше не имеет
Queues
шагов загрузки, так как он теперь лишним.
- Рабочий больше не имеет
Теперь строка «Получено задание» выдается даже для отозванных заданий. (Проблема №3155).
Теперь соблюдается установка
broker_connection_retry
.Исправление внесено Нат Уильямс.
Новые настройки
control_queue_ttl
иcontrol_queue_expires
теперь позволяют настраивать TTL сообщений команд удаленного управления, а также время истечения очереди.Внесено Аланом Джустино.
Новое
celery.worker.state.requests
позволяет выполнять O(1) просмотр активных/резервных задач по идентификатору.Автомасштабирование не всегда обновляло keep-alive при уменьшении масштаба.
Исправление внесено Филипом Гарнеро.
Исправлена опечатка
options_list
->option_list
.Исправление внесено Грегом Уилбуром.
Некоторые аргументы командной строки worker и аргументы класса
Worker()
были переименованы для согласованности.Все они имеют псевдонимы для обратной совместимости.
--send-events
->--task-events
--schedule
->--schedule-filename
--maxtasksperchild
->--max-tasks-per-child
Beat(scheduler_cls=)
->Beat(scheduler=)
Worker(send_events=True)
->Worker(task_events=True)
Worker(task_time_limit=)
->Worker(time_limit=
)Worker(task_soft_time_limit=)
->Worker(soft_time_limit=)
Worker(state_db=)
->Worker(statedb=)
Worker(working_directory=)
->Worker(workdir=)
Отладочные утилиты¶
celery.contrib.rdb
: Изменен баннер удаленного отладчика, чтобы можно было легко скопировать и вставить адрес (больше нет точки в адресе).Внесено Джонатаном Ванаско.
Исправлена совместимость с последними версиями psutil (проблема #3262).
Сигналы¶
App: Новые сигналы для настройки/финализации приложений:
Задача: Новые сигналы задачи для отклоненных сообщений задачи:
celery.signals.task_rejected
.celery.signals.task_unknown
.
Рабочий: Новый сигнал для отправки события сердцебиения.
celery.signals.heartbeat_sent
Внесено Кевином Ричардсоном.
События¶
Сообщения о событиях теперь используют опцию RabbitMQ
x-message-ttl
для обеспечения отбрасывания старых сообщений о событиях.По умолчанию - 5 секунд, но его можно изменить с помощью параметра
event_queue_ttl
.Task.send_event
теперь автоматически повторяет отправку события при разрыве соединения, в соответствии с настройками повтора публикации задачи.Мониторы событий теперь по умолчанию устанавливают значение
event_queue_expires
.Очереди теперь будут заканчиваться через 60 секунд после того, как монитор перестанет их потреблять.
Исправлена ошибка, при которой значение None не обрабатывалось должным образом.
Исправление внесено Dongweiming.
Новая настройка
event_queue_prefix
теперь может быть использована для изменения префикса очереди по умолчаниюceleryev
для очередей приемников событий.Внесено Такеши Канемото.
State.tasks_by_type
иState.tasks_by_worker
теперь могут быть использованы в качестве отображения для быстрого доступа к этой информации.
Развертывание¶
Общие init-скрипты теперь поддерживают переменные окружения
CELERY_SU
иCELERYD_SU_ARGS
для установки пути и аргументов для su (su(1)).Общие init-скрипты теперь лучше поддерживают FreeBSD и другие BSD-системы, выполняя поиск конфигурационного файла
/usr/local/etc/
.Прислано Таха Джахангиром.
Общий init-скрипт: Исправлена странная ошибка для
celerybeat
, когда перезапуск не всегда срабатывал (проблема #3018).Сценарий systemd init теперь использует оболочку при выполнении служб.
Внесено Томасом Мачалеком.
Бэкенды результатов¶
Redis: Теперь таймаут сокета по умолчанию составляет 120 секунд.
Значение по умолчанию можно изменить с помощью новой настройки
redis_socket_timeout
.Внесено Рагурамом Шринивасаном.
Очереди результатов RPC Backend теперь автоматически удаляются по умолчанию (выпуск #2001).
Бэкенд RPC: Исправлена проблема, при которой исключение не десериализовывалось должным образом с помощью json-сериализатора (проблема #2518).
Исправление внесено Аллардом Хёве.
CouchDB: бэкенд, используемый для двойного json-кодирования результатов.
Исправление внесено Эндрю Стюартом.
CouchDB: Исправлена опечатка, из-за которой бэкенд не был найден (проблема #3287).
Исправление внесено Эндрю Стюартом.
MongoDB: Теперь поддерживается установка параметра
result_serialzier
в значениеbson
для использования собственного сериализатора библиотек MongoDB.При участии Давиде Кварта.
- MongoDB: работа с URI была улучшена для использования
имя базы данных, пользователя и пароль из URI, если они предоставлены.
Внесено Самуэлем Джаиллетом.
Бэкенд результатов SQLAlchemy: Теперь игнорирует все опции движка результатов при использовании NullPool (выпуск #1930).
Бэкенд результатов SQLAlchemy: Теперь устанавливает максимальный размер символа равным 155, чтобы справиться с поврежденной мозгом реализацией MySQL Unicode (проблема #1748).
Общие: Все исключения/предупреждения Celery теперь наследуются от common
CeleryError
/CeleryWarning
. (Проблема #2643).
Улучшение документации¶
Внесено
Адам Чейнз
Амир Рустамзаде
Артур Вюйяр
Батист Билер
Беркер Пексаг
Брайс Грофф
Дэниел Девайн
Эдвард Беттс
Джейсон Ветч
Джефф Видман
Мацей Обуховский
Мануэль Продавец
Максим Бошемин
Митчел Хамферис
Павел Капышин
Пьер Ферсинг
Рик
Стивен Скляр
Тайфун Сен
Виланд Хоффманн
Реорганизация, амортизация и удаления¶
Несовместимые изменения¶
Префорк: Вызов
result.get()
или присоединение любого результата внутри задачи теперь вызываетRuntimeError
.В предыдущих версиях это приводило к появлению предупреждения.
celery.worker.consumer
теперь является пакетом, а не модулем.Модуль
celery.worker.job
переименован вcelery.worker.request
.Бить:
Scheduler.Publisher
/<.publisher
переименован в.Producer
/.producer
.Результат: Аргумент/атрибут task_name
app.AsyncResult
был удален.Раньше это поле использовалось для совместимости с
pickle
, но теперь в нем нет необходимости.Бэкенды: Аргументы с именем
status
переименованы вstate
.Бэкенды:
backend.get_status()
переименован вbackend.get_state()
.Бэкенды:
backend.maybe_reraise()
переименован в.maybe_throw()
В API promise используется .throw(), поэтому это изменение было сделано, чтобы сделать его более последовательным.
Имеется псевдоним, поэтому вы можете использовать maybe_reraise до версии Celery 5.0.
Внеплановые переезды¶
Экспериментальная функция
celery.contrib.methods
была удалена, так как в ее реализации было слишком много ошибок, чтобы быть полезной.Инит-скрипты CentOS были удалены.
Они не добавляют никаких возможностей по сравнению с общими init-скриптами, поэтому рекомендуется использовать их или что-то вроде supervisor.
Реорганизация Амортизация¶
Эти символы были переименованы, и хотя в этой версии есть псевдоним для обратной совместимости, они будут удалены в Celery 5.0, поэтому убедитесь, что вы переименовали их как можно скорее, чтобы убедиться, что они не сломаются в этом релизе.
Есть шанс, что вы будете использовать только первый из этого списка, но никогда не знаешь:
celery.utils.worker_direct
->celery.utils.nodenames.worker_direct()
.celery.utils.nodename
->celery.utils.nodenames.nodename()
.celery.utils.anon_nodename
->celery.utils.nodenames.anon_nodename()
.celery.utils.nodesplit
->celery.utils.nodenames.nodesplit()
.celery.utils.default_nodename
->celery.utils.nodenames.default_nodename()
.celery.utils.node_format
->celery.utils.nodenames.node_format()
.celery.utils.host_format
->celery.utils.nodenames.host_format()
.
Плановые демонтажи¶
Модули¶
Модуль
celery.worker.job
был переименован вcelery.worker.request
.Это был внутренний модуль, поэтому он не должен был оказывать никакого влияния. Теперь он является частью общедоступного API, поэтому не должен больше меняться.
Модуль
celery.task.trace
был переименован вcelery.app.trace
, поскольку пакетcelery.task
постепенно закрывается. Модуль будет удален в версии 5.0, поэтому, пожалуйста, измените любой импорт из:from celery.task.trace import X
к:
from celery.app.trace import X
Старые псевдонимы совместимости в модуле
celery.loaders
были удалены.Удалено
celery.loaders.current_loader()
, используйте:current_app.loader
Удалено
celery.loaders.load_settings()
, используйте:current_app.conf
Результат¶
AsyncResult.serializable()
иcelery.result.from_serializable
.была удалена:
Используйте вместо этого:
>>> tup = result.as_tuple() >>> from celery.result import result_from_tuple >>> result = result_from_tuple(tup)
Удалено
BaseAsyncResult
, вместо этого используйтеAsyncResult
для проверки экземпляров.Удалено
TaskSetResult
, вместо него используйтеGroupResult
.TaskSetResult.total
->len(GroupResult)
TaskSetResult.taskset_id
->GroupResult.id
Удалено
ResultSet.subtasks
, вместо него используйтеResultSet.results
.
TaskSet¶
TaskSet был удален, так как в Celery 3.0 он был заменен конструкцией group
.
Если у вас есть код, подобный этому:
>>> from celery.task import TaskSet
>>> TaskSet(add.subtask((i, i)) for i in xrange(10)).apply_async()
Вам нужно заменить это на:
>>> from celery import group
>>> group(add.s(i, i) for i in xrange(10))()
События¶
Удаления для класса
celery.events.state.Worker
:Worker._defaults
атрибут.Используйте
{k: getattr(worker, k) for k in worker._fields}
.Worker.update_heartbeat
Используйте
Worker.event(None, timestamp, received)
Worker.on_online
Используйте
Worker.event('online', timestamp, received, fields)
Worker.on_offline
Используйте
Worker.event('offline', timestamp, received, fields)
Worker.on_heartbeat
Используйте
Worker.event('heartbeat', timestamp, received, fields)
Удаления для класса
celery.events.state.Task
:Task._defaults
атрибут.Используйте
{k: getattr(task, k) for k in task._fields}
.Task.on_sent
Используйте
Worker.event('sent', timestamp, received, fields)
Task.on_received
Используйте
Task.event('received', timestamp, received, fields)
Task.on_started
Используйте
Task.event('started', timestamp, received, fields)
Task.on_failed
Используйте
Task.event('failed', timestamp, received, fields)
Task.on_retried
Используйте
Task.event('retried', timestamp, received, fields)
Task.on_succeeded
Используйте
Task.event('succeeded', timestamp, received, fields)
Task.on_revoked
Используйте
Task.event('revoked', timestamp, received, fields)
Task.on_unknown_event
Используйте
Task.event(short_type, timestamp, received, fields)
Task.update
Используйте
Task.event(short_type, timestamp, received, fields)
Task.merge
Свяжитесь с нами, если вам это необходимо.
Магические аргументы ключевых слов¶
Поддержка очень старых магических ключевых слов-аргументов, принимаемых задачами, окончательно удалена в этой версии.
Если вы все еще используете их, вам придется переписать любую задачу, все еще использующую старый модуль celery.decorators
и зависящую от аргументов ключевых слов, передаваемых задаче, например:
from celery.decorators import task
@task()
def add(x, y, task_id=None):
print('My task id is %r' % (task_id,))
следует переписать в:
from celery import task
@task(bind=True)
def add(self, x, y):
print('My task id is {0.request.id}'.format(self))
Удаленные настройки¶
Следующие настройки были удалены и больше не поддерживаются:
Настройки ведения журнала¶
Имя установки |
Заменить на |
---|---|
|
|
|
|
|
|
|
|
|
celerymon устарел, используйте flower |
|
celerymon устарел, используйте flower |
|
celerymon устарел, используйте flower |
Настройки задачи¶
Имя установки |
Заменить на |
---|---|
|
Н/Д |
Изменения во внутреннем API¶
Модуль
celery.datastructures
переименован вcelery.utils.collections
.Модуль
celery.utils.timeutils
переименован вcelery.utils.time
.celery.utils.datastructures.DependencyGraph
переместилось вcelery.utils.graph
.celery.utils.jsonify
теперьcelery.utils.serialization.jsonify()
.celery.utils.strtobool
теперьcelery.utils.serialization.strtobool()
.celery.utils.is_iterable
был удален.Вместо этого используйте:
isinstance(x, collections.Iterable)
celery.utils.lpmerge
теперьcelery.utils.collections.lpmerge()
.celery.utils.cry
теперьcelery.utils.debug.cry()
.celery.utils.isatty
теперьcelery.platforms.isatty()
.celery.utils.gen_task_name
теперьcelery.utils.imports.gen_task_name()
.celery.utils.deprecated
теперьcelery.utils.deprecated.Callable()
celery.utils.deprecated_property
теперьcelery.utils.deprecated.Property()
.celery.utils.warn_deprecated
теперьcelery.utils.deprecated.warn()