Руководство по мониторингу и управлению¶
Введение¶
Существует несколько инструментов для мониторинга и проверки кластеров Celery.
В этом документе описаны некоторые из них, а также функции, связанные с мониторингом, такие как события и широковещательные команды.
Рабочие¶
Утилиты командной строки управления (inspect
/control
)¶
celery также может использоваться для проверки и управления рабочими узлами (и, в некоторой степени, задачами).
Чтобы перечислить все доступные команды, выполните следующие действия:
$ celery --help
или получить помощь по конкретной команде:
$ celery <command> --help
Команды¶
оболочка: Переход в оболочку Python.
В локали будет включена переменная
celery
: это текущее приложение. Также все известные задачи будут автоматически добавлены в locals (если не установлен флаг--without-tasks
).Использует Ipython, bpython, или обычный python в таком порядке, если установлен. Вы можете принудительно использовать реализацию с помощью
--ipython
,--bpython
, или--python
.status: Список активных узлов в этом кластере
$ celery -A proj status
результат: Показать результат выполнения задания
$ celery -A proj result -t tasks.add 4e196aa4-0141-4601-8138-7aa33db0f577
Обратите внимание, что имя задачи можно опустить, если задача не использует пользовательский бэкенд результатов.
purge: Очистка сообщений из всех настроенных очередей задач.
Эта команда удалит все сообщения из очередей, настроенных в параметре
CELERY_QUEUES
:Предупреждение
Эта операция не отменяется, и сообщения будут удалены навсегда!
$ celery -A proj purge
Вы также можете указать очереди для очистки с помощью опции -Q:
$ celery -A proj purge -Q celery,foo,bar
и исключить очереди из очистки с помощью опции -X:
$ celery -A proj purge -X celery
инспекция активна: Список активных задач
$ celery -A proj inspect active
Это все задачи, которые выполняются в настоящее время.
осмотр запланирован: Список запланированных задач ETA
$ celery -A proj inspect scheduled
Это задания, зарезервированные рабочим, когда у них установлен аргумент eta или countdown.
проверить зарезервированное: Список зарезервированных задач
$ celery -A proj inspect reserved
Здесь будут перечислены все задания, которые были предварительно обработаны рабочим и в настоящее время ожидают выполнения (не включает задания с установленным значением ETA).
инспектировать отозванные: Список истории отозванных заданий
$ celery -A proj inspect revoked
проверить зарегистрированные: Список зарегистрированных задач
$ celery -A proj inspect registered
инспектировать статистику: Показать статистику по работникам (см. Статистика)
$ celery -A proj inspect stats
инспектировать запрос_задачи: Показать информацию о задаче(ях) по идентификатору.
Любой работник, имеющий задание в этом наборе идентификаторов зарезервировано/активно, будет отвечать статусом и информацией.
$ celery -A proj inspect query_task e9f6c8f0-fec9-4ae8-a8c6-cf8c8451d4f8
Вы также можете запросить информацию о нескольких задачах:
$ celery -A proj inspect query_task id1 id2 ... idN
контроль enable_events: Включить события
$ celery -A proj control enable_events
контроль disable_events: Отключить события
$ celery -A proj control disable_events
migrate: Перенос задач с одного брокера на другой (ЭКСПЕРИМЕНТАЛЬНО).
$ celery -A proj migrate redis://localhost amqp://localhost
Эта команда перенесет все задачи на одном брокере на другой. Поскольку эта команда является новой и экспериментальной, вы должны убедиться в наличии резервной копии данных перед выполнением.
Примечание
Все команды inspect
и control
поддерживают аргумент --timeout
, Это количество секунд ожидания ответа. Возможно, вам придется увеличить этот таймаут, если вы не получаете ответа из-за задержки.
Указание узлов назначения¶
По умолчанию команды inspect и control действуют на всех рабочих. Вы можете указать одного или список рабочих, используя аргумент --destination
:
$ celery -A proj inspect -d w1@e.com,w2@e.com reserved
$ celery -A proj control -d w1@e.com,w2@e.com enable_events
Цветок: Веб-монитор Celery в реальном времени¶
Flower - это веб-инструмент для мониторинга и администрирования Celery в режиме реального времени. Он находится в стадии активной разработки, но уже является незаменимым инструментом. Будучи рекомендуемым монитором для Celery, он заменяет монитор Django-Admin, celerymon
и монитор на основе << 1 >>>.
Flower произносится как «поток», но вы можете использовать и ботанический вариант, если хотите.
Характеристики¶
Мониторинг в реальном времени с помощью Celery Events
Ход выполнения задачи и история
Возможность показать детали задачи (аргументы, время начала, время выполнения и многое другое)
Графики и статистика
Дистанционное управление
Просмотр статуса работника и статистики
Выключение и перезапуск рабочих экземпляров
Управление размером пула рабочих и настройками автомасштабирования
Просмотр и изменение очередей, которые потребляет рабочий экземпляр
Просмотр текущих запущенных задач
Просмотр запланированных задач (расчетное время прибытия/обратный отсчет)
Просмотр зарезервированных и отозванных заданий
Применять временные и тарифные ограничения
Средство просмотра конфигурации
Отмена или прекращение выполнения заданий
HTTP API
Списочные работники
Выключение рабочего
Перезапуск пула рабочих
Увеличение резерва работников
Сокращение резерва работников
Рабочий пул автомасштабирования
Начать потребление из очереди
Остановить потребление из очереди
Перечислить задачи
Перечислите (просмотренные) типы задач
Получить информацию о задаче
Выполнить задание
Выполнение задания по имени
Получить результат задачи
Изменение мягких и жестких временных ограничений для задачи
Изменение предела скорости для задания
Отмена задания
Аутентификация OpenID
Скриншоты
Больше скриншотов:
Использование¶
Вы можете использовать pip для установки Flower:
$ pip install flower
Выполнение команды flower запустит веб-сервер, который вы сможете посетить:
$ celery -A proj flower
По умолчанию используется порт http://localhost:5555, но вы можете изменить его с помощью аргумента –port:
$ celery -A proj flower --port=5555
URL брокера также может быть передан через аргумент --broker
:
$ celery flower --broker=amqp://guest:guest@localhost:5672//
or
$ celery flower --broker=redis://guest:guest@localhost:6379/0
Затем вы можете посетить цветок в своем веб-браузере:
$ open http://localhost:5555
Flower имеет гораздо больше возможностей, чем описано здесь, включая возможности авторизации. Ознакомьтесь с official documentation для получения дополнительной информации.
сельдерейные события: Монитор проклятий¶
Добавлено в версии 2.0.
celery events - это простой монитор на языке curses, отображающий историю задач и рабочих. Вы можете просмотреть результат и трассировку заданий, а также он поддерживает некоторые команды управления, такие как ограничение скорости и отключение рабочих. Этот монитор был создан в качестве пробного варианта, и вы, вероятно, захотите использовать вместо него Flower.
Начало:
$ celery -A proj events
Вы должны увидеть экран, похожий на:
celery events также используется для запуска камер моментальных снимков (см. Снимки:
$ celery -A proj events --camera=<camera-class> --frequency=1.0
и включает в себя инструмент для сброса событий в stdout
:
$ celery -A proj events --dump
Для получения полного списка опций используйте --help
:
$ celery events --help
RabbitMQ¶
Для управления кластером Celery важно знать, как можно контролировать RabbitMQ.
RabbitMQ поставляется с командой rabbitmqctl(1), с помощью которой можно перечислить очереди, обмены, привязки, длину очереди, использование памяти каждой очереди, а также управлять пользователями, виртуальными хостами и их правами.
Примечание
В этих примерах используется виртуальный хост по умолчанию ("/"
), если вы используете пользовательский виртуальный хост, вам необходимо добавить аргумент -p
к команде, например: rabbitmqctl list_queues -p my_vhost …
Осмотр очередей¶
Нахождение количества задач в очереди:
$ rabbitmqctl list_queues name messages messages_ready \
messages_unacknowledged
Здесь messages_ready - это количество сообщений, готовых к доставке (отправленных, но не полученных), messages_unacknowledged - это количество сообщений, которые были получены рабочим, но еще не подтверждены (это означает, что они находятся в процессе выполнения или были зарезервированы). messages - это сумма готовых и непризнанных сообщений.
Выяснение количества работников, потребляющих в данный момент из очереди:
$ rabbitmqctl list_queues name consumers
Определение объема памяти, выделенной для очереди:
$ rabbitmqctl list_queues name memory
- Совет:
Добавление опции
-q
к rabbitmqctl(1) делает вывод более легким для разбора.
Redis¶
Если вы используете Redis в качестве брокера, вы можете контролировать кластер Celery с помощью команды redis-cli(1) для получения списка длин очередей.
Осмотр очередей¶
Нахождение количества задач в очереди:
$ redis-cli -h HOST -p PORT -n DATABASE_NUMBER llen QUEUE_NAME
Очередь по умолчанию называется celery. Чтобы получить все доступные очереди, вызовите:
$ redis-cli -h HOST -p PORT -n DATABASE_NUMBER keys \*
Примечание
Ключи очереди существуют только тогда, когда в них есть задания, поэтому если ключ не существует, это просто означает, что в этой очереди нет сообщений. Это происходит потому, что в Redis список, в котором нет элементов, автоматически удаляется, и поэтому он не отображается в выводе команды keys, а llen для такого списка возвращает 0.
Также, если вы используете Redis для других целей, вывод команды keys будет включать несвязанные значения, хранящиеся в базе данных. Рекомендуемый способ обойти это - использовать выделенный DATABASE_NUMBER для Celery, вы также можете использовать номера баз данных для отделения приложений Celery друг от друга (виртуальные хосты), но это не повлияет на события мониторинга, используемые, например, Flower, поскольку команды Redis pub/sub являются глобальными, а не основанными на базе данных.
Мунин¶
Это список известных плагинов Munin, которые могут быть полезны при обслуживании кластера Celery.
rabbitmq-munin
: Плагины Munin для RabbitMQ.celery_tasks
: Отслеживает количество раз выполнения каждого типа задач (требует celerymon).celery_tasks_states
: Отслеживает количество задач в каждом состоянии (требует celerymon).
События¶
Рабочий имеет возможность отправлять сообщение всякий раз, когда происходит какое-то событие. Эти события затем перехватываются такими инструментами, как Flower, и celery events для мониторинга кластера.
Снимки¶
Добавлено в версии 2.1.
Даже один рабочий может производить огромное количество событий, поэтому хранить историю всех событий на диске может быть очень дорого.
Последовательность событий описывает состояние кластера в данный период времени, делая периодические снимки этого состояния, вы можете сохранить всю историю, но при этом лишь периодически записывать ее на диск.
Для создания снимков вам нужен класс Camera, с помощью которого вы можете определить, что должно происходить при каждом захвате состояния; вы можете записать его в базу данных, отправить по электронной почте или что-то еще.
celery events затем используется для получения снимков с помощью камеры, например, если вы хотите фиксировать состояние каждые 2 секунды с помощью камеры myapp.Camera
вы запускаете celery events со следующими аргументами:
$ celery -A proj events -c myapp.Camera --frequency=2.0
Пользовательская камера¶
Камеры могут быть полезны, если вам нужно фиксировать события и делать что-то с этими событиями через определенный интервал времени. Для обработки событий в реальном времени следует использовать app.events.Receiver
напрямую, как в Обработка в режиме реального времени.
Вот пример камеры, сбрасывающей снимок на экран:
from pprint import pformat
from celery.events.snapshot import Polaroid
class DumpCam(Polaroid):
clear_after = True # clear after flush (incl, state.event_count).
def on_shutter(self, state):
if not state.event_count:
# No new events since last snapshot.
return
print('Workers: {0}'.format(pformat(state.workers, indent=4)))
print('Tasks: {0}'.format(pformat(state.tasks, indent=4)))
print('Total: {0.event_count} events, {0.task_count} tasks'.format(
state))
Подробнее об объектах состояния см. в справке API для celery.events.state
.
Теперь вы можете использовать этот кулачок с celery events, указав его с опцией -c
:
$ celery -A proj events -c myapp.DumpCam --frequency=2.0
Или вы можете использовать его программно следующим образом:
from celery import Celery
from myapp import DumpCam
def main(app, freq=1.0):
state = app.events.State()
with app.connection() as connection:
recv = app.events.Receiver(connection, handlers={'*': state.event})
with DumpCam(state, freq=freq):
recv.capture(limit=None, timeout=None)
if __name__ == '__main__':
app = Celery(broker='amqp://guest@localhost//')
main(app)
Обработка в режиме реального времени¶
Для обработки событий в режиме реального времени вам необходимо следующее
Потребитель событий (это
Receiver
)Набор обработчиков, вызываемых при поступлении событий.
Вы можете иметь различные обработчики для каждого типа событий, или же можно использовать универсальный обработчик („*“).
Государство (необязательно)
app.events.State
- это удобное представление в памяти задач и работников в кластере, которое обновляется по мере поступления событий.В нем заключены решения для многих общих задач, таких как проверка того, жив ли работник (путем проверки сердцебиения), объединение полей событий по мере их поступления, обеспечение синхронизации временных меток и так далее.
Объединив их, вы сможете легко обрабатывать события в режиме реального времени:
from celery import Celery
def my_monitor(app):
state = app.events.State()
def announce_failed_tasks(event):
state.event(event)
# task name is sent only with -received event, and state
# will keep track of this for us.
task = state.tasks.get(event['uuid'])
print('TASK FAILED: %s[%s] %s' % (
task.name, task.uuid, task.info(),))
with app.connection() as connection:
recv = app.events.Receiver(connection, handlers={
'task-failed': announce_failed_tasks,
'*': state.event,
})
recv.capture(limit=None, timeout=None, wakeup=True)
if __name__ == '__main__':
app = Celery(broker='amqp://guest@localhost//')
my_monitor(app)
Примечание
Аргумент wakeup
в capture
посылает сигнал всем рабочим, чтобы заставить их отправить heartbeat. Таким образом, при запуске монитора вы можете сразу увидеть рабочих.
Вы можете прослушивать определенные события, указывая обработчики:
from celery import Celery
def my_monitor(app):
state = app.events.State()
def announce_failed_tasks(event):
state.event(event)
# task name is sent only with -received event, and state
# will keep track of this for us.
task = state.tasks.get(event['uuid'])
print('TASK FAILED: %s[%s] %s' % (
task.name, task.uuid, task.info(),))
with app.connection() as connection:
recv = app.events.Receiver(connection, handlers={
'task-failed': announce_failed_tasks,
})
recv.capture(limit=None, timeout=None, wakeup=True)
if __name__ == '__main__':
app = Celery(broker='amqp://guest@localhost//')
my_monitor(app)
Ссылка на событие¶
Этот список содержит события, отправленные рабочим, и их аргументы.
События задачи¶
отправлено задание¶
- подпись:
task-sent(uuid, name, args, kwargs, retries, eta, expires, queue, exchange, routing_key, root_id, parent_id)
Отправляется, когда сообщение задачи опубликовано и включен параметр task_send_sent_event
.
полученное задание¶
- подпись:
task-received(uuid, name, args, kwargs, retries, eta, hostname, timestamp, root_id, parent_id)
Отправляется, когда работник получает задание.
запущенное задание¶
- подпись:
task-started(uuid, hostname, timestamp, pid)
Отправляется непосредственно перед выполнением задания работником.
задача выполнена¶
- подпись:
task-succeeded(uuid, result, runtime, hostname, timestamp)
Отправляется при успешном выполнении задания.
Время выполнения - это время, затраченное на выполнение задания с использованием пула (начиная с момента отправки задания в рабочий пул и заканчивая обратным вызовом обработчика результата пула).
не справился с заданием¶
- подпись:
task-failed(uuid, exception, traceback, hostname, timestamp)
Отправляется, если выполнение задания не удалось.
отклонено задание¶
- подпись:
task-rejected(uuid, requeued)
Задание было отклонено работником, возможно, для повторной постановки в очередь или перемещения в очередь мертвых букв.
отмененное задание¶
- подпись:
task-revoked(uuid, terminated, signum, expired)
Отправляется, если задание было отозвано (обратите внимание, что это, скорее всего, будет отправлено более чем одним работником).
terminated
устанавливается в true, если процесс задачи был завершен,а поле
signum
установлено на используемый сигнал.
expired
устанавливается в true, если срок выполнения задания истек.
задание-отказ¶
- подпись:
task-retried(uuid, exception, traceback, hostname, timestamp)
Отправляется, если задание не удалось, но будет повторно выполнено в будущем.
Рабочие мероприятия¶
рабочий-онлайн¶
- подпись:
worker-online(hostname, timestamp, freq, sw_ident, sw_ver, sw_sys)
Работник подключился к брокеру и находится в сети.
hostname: Имя узла рабочего.
timestamp: Временная метка события.
freq: Частота ударов сердца в секундах (float).
sw_ident: Имя рабочего программного обеспечения (например,
py-celery
).sw_ver: Версия программного обеспечения (например, 2.2.0).
sw_sys: Операционная система (например, Linux/Darwin).
worker-heartbeat¶
- подпись:
worker-heartbeat(hostname, timestamp, freq, sw_ident, sw_ver, sw_sys, active, processed)
Отправляется каждую минуту, если рабочий не отправил сердцебиение в течение 2 минут, он считается отключенным.
hostname: Имя узла рабочего.
timestamp: Временная метка события.
freq: Частота ударов сердца в секундах (float).
sw_ident: Имя рабочего программного обеспечения (например,
py-celery
).sw_ver: Версия программного обеспечения (например, 2.2.0).
sw_sys: Операционная система (например, Linux/Darwin).
active: Количество текущих выполняемых задач.
processed: Общее количество заданий, обработанных этим работником.
рабочий-офлайн¶
- подпись:
worker-offline(hostname, timestamp, freq, sw_ident, sw_ver, sw_sys)
Рабочий отсоединился от брокера.