celery.events

Monitoring Event Receiver+Dispatcher.

Events is a stream of messages sent for certain actions occurring in the worker (and clients if task_send_sent_event is enabled), used for monitoring purposes.

celery.events.Event(type, _fields=None, __dict__=<class 'dict'>, __now__=<built-in function time>, **fields)[исходный код]

Create an event.

Заметки

An event is simply a dictionary: the only required field is type. A timestamp field will be set to the current time if not provided.

class celery.events.EventDispatcher(connection=None, hostname=None, enabled=True, channel=None, buffer_while_offline=True, app=None, serializer=None, groups=None, delivery_mode=1, buffer_group=None, buffer_limit=24, on_send_buffered=None)[исходный код]

Dispatches event messages.

Параметры:
  • connection (kombu.Connection) – Connection to the broker.

  • hostname (str) – Hostname to identify ourselves as, by default uses the hostname returned by anon_nodename().

  • groups (Sequence[str]) – List of groups to send events for. send() will ignore send requests to groups not in this list. If this is None, all events will be sent. Example groups include "task" and "worker".

  • enabled (bool) – Set to False to not actually publish any events, making send() a no-op.

  • channel (kombu.Channel) – Can be used instead of connection to specify an exact channel to use when sending events.

  • buffer_while_offline (bool) – If enabled events will be buffered while the connection is down. flush() must be called as soon as the connection is re-established.

Примечание

You need to close() this after use.

DISABLED_TRANSPORTS = {'sql'}
app = None
close()[исходный код]

Close the event dispatcher.

disable()[исходный код]
enable()[исходный код]
extend_buffer(other)[исходный код]

Copy the outbound buffer of another instance.

flush(errors=True, groups=True)[исходный код]

Flush the outbound buffer.

on_disabled = None
on_enabled = None
publish(type, fields, producer, blind=False, Event=<function Event>, **kwargs)[исходный код]

Publish event using custom Producer.

Параметры:
  • type (str) – Event type name, with group separated by dash (-). fields: Dictionary of event fields, must be json serializable.

  • producer (kombu.Producer) – Producer instance to use: only the publish method will be called.

  • retry (bool) – Retry in the event of connection failure.

  • retry_policy (Mapping) – Map of custom retry policy options. See ensure().

  • blind (bool) – Don’t set logical clock value (also don’t forward the internal logical clock).

  • Event (Callable) – Event type used to create event. Defaults to Event().

  • utcoffset (Callable) – Function returning the current utc offset in hours.

property publisher
send(type, blind=False, utcoffset=<function utcoffset>, retry=False, retry_policy=None, Event=<function Event>, **fields)[исходный код]

Send event.

Параметры:
  • type (str) – Event type name, with group separated by dash (-).

  • retry (bool) – Retry in the event of connection failure.

  • retry_policy (Mapping) – Map of custom retry policy options. See ensure().

  • blind (bool) – Don’t set logical clock value (also don’t forward the internal logical clock).

  • Event (Callable) – Event type used to create event, defaults to Event().

  • utcoffset (Callable) – unction returning the current utc offset in hours.

  • **fields (Any) – Event fields – must be json serializable.

class celery.events.EventReceiver(channel, handlers=None, routing_key='#', node_id=None, app=None, queue_prefix=None, accept=None, queue_ttl=None, queue_expires=None)[исходный код]

Capture events.

Параметры:
  • connection (kombu.Connection) – Connection to the broker.

  • handlers (Mapping[Callable]) – Event handlers. This is a map of event type names and their handlers. The special handler «*» captures all events that don’t have a handler.

app = None
capture(limit=None, timeout=None, wakeup=True)[исходный код]

Open up a consumer capturing events.

This has to run in the main process, and it will never stop unless EventDispatcher.should_stop is set to True, or forced via KeyboardInterrupt or SystemExit.

property connection
event_from_message(body, localize=True, now=<built-in function time>, tzfields=operator.itemgetter('utcoffset', 'timestamp'), adjust_timestamp=<function adjust_timestamp>, CLIENT_CLOCK_SKEW=-1)[исходный код]
get_consumers(Consumer, channel)[исходный код]
itercapture(limit=None, timeout=None, wakeup=True)[исходный код]
on_consume_ready(connection, channel, consumers, wakeup=True, **kwargs)[исходный код]
process(type, event)[исходный код]

Process event by dispatching to configured handler.

wakeup_workers(channel=None)[исходный код]
celery.events.get_exchange(conn, name='celeryev')[исходный код]

Get exchange used for sending events.

Параметры:
  • conn (kombu.Connection) – Connection used for sending/receiving events.

  • name (str) – Name of the exchange. Default is celeryev.

Примечание

The event type changes if Redis is used as the transport (from topic -> fanout).

celery.events.group_from(type)[исходный код]

Get the group part of an event type name.

Пример

>>> group_from('task-sent')
'task'
>>> group_from('custom-my-event')
'custom'
Back to Top