celery.events
¶
Monitoring Event Receiver+Dispatcher.
Events is a stream of messages sent for certain actions occurring
in the worker (and clients if task_send_sent_event
is enabled), used for monitoring purposes.
- celery.events.Event(type, _fields=None, __dict__=<class 'dict'>, __now__=<built-in function time>, **fields)[исходный код]¶
Create an event.
Заметки
An event is simply a dictionary: the only required field is
type
. Atimestamp
field will be set to the current time if not provided.
- class celery.events.EventDispatcher(connection=None, hostname=None, enabled=True, channel=None, buffer_while_offline=True, app=None, serializer=None, groups=None, delivery_mode=1, buffer_group=None, buffer_limit=24, on_send_buffered=None)[исходный код]¶
Dispatches event messages.
- Параметры:
connection (kombu.Connection) – Connection to the broker.
hostname (str) – Hostname to identify ourselves as, by default uses the hostname returned by
anon_nodename()
.groups (Sequence[str]) – List of groups to send events for.
send()
will ignore send requests to groups not in this list. If this isNone
, all events will be sent. Example groups include"task"
and"worker"
.enabled (bool) – Set to
False
to not actually publish any events, makingsend()
a no-op.channel (kombu.Channel) – Can be used instead of connection to specify an exact channel to use when sending events.
buffer_while_offline (bool) – If enabled events will be buffered while the connection is down.
flush()
must be called as soon as the connection is re-established.
Примечание
You need to
close()
this after use.- DISABLED_TRANSPORTS = {'sql'}¶
- app = None¶
- close()[исходный код]¶
Close the event dispatcher.
- disable()[исходный код]¶
- enable()[исходный код]¶
- extend_buffer(other)[исходный код]¶
Copy the outbound buffer of another instance.
- flush(errors=True, groups=True)[исходный код]¶
Flush the outbound buffer.
- on_disabled = None¶
- on_enabled = None¶
- publish(type, fields, producer, blind=False, Event=<function Event>, **kwargs)[исходный код]¶
Publish event using custom
Producer
.- Параметры:
type (str) – Event type name, with group separated by dash (-). fields: Dictionary of event fields, must be json serializable.
producer (kombu.Producer) – Producer instance to use: only the
publish
method will be called.retry (bool) – Retry in the event of connection failure.
retry_policy (Mapping) – Map of custom retry policy options. See
ensure()
.blind (bool) – Don’t set logical clock value (also don’t forward the internal logical clock).
Event (Callable) – Event type used to create event. Defaults to
Event()
.utcoffset (Callable) – Function returning the current utc offset in hours.
- property publisher¶
- send(type, blind=False, utcoffset=<function utcoffset>, retry=False, retry_policy=None, Event=<function Event>, **fields)[исходный код]¶
Send event.
- Параметры:
type (str) – Event type name, with group separated by dash (-).
retry (bool) – Retry in the event of connection failure.
retry_policy (Mapping) – Map of custom retry policy options. See
ensure()
.blind (bool) – Don’t set logical clock value (also don’t forward the internal logical clock).
Event (Callable) – Event type used to create event, defaults to
Event()
.utcoffset (Callable) – unction returning the current utc offset in hours.
**fields (Any) – Event fields – must be json serializable.
- class celery.events.EventReceiver(channel, handlers=None, routing_key='#', node_id=None, app=None, queue_prefix=None, accept=None, queue_ttl=None, queue_expires=None)[исходный код]¶
Capture events.
- Параметры:
connection (kombu.Connection) – Connection to the broker.
handlers (Mapping[Callable]) – Event handlers. This is a map of event type names and their handlers. The special handler «*» captures all events that don’t have a handler.
- app = None¶
- capture(limit=None, timeout=None, wakeup=True)[исходный код]¶
Open up a consumer capturing events.
This has to run in the main process, and it will never stop unless
EventDispatcher.should_stop
is set to True, or forced viaKeyboardInterrupt
orSystemExit
.
- property connection¶
- event_from_message(body, localize=True, now=<built-in function time>, tzfields=operator.itemgetter('utcoffset', 'timestamp'), adjust_timestamp=<function adjust_timestamp>, CLIENT_CLOCK_SKEW=-1)[исходный код]¶
- get_consumers(Consumer, channel)[исходный код]¶
- itercapture(limit=None, timeout=None, wakeup=True)[исходный код]¶
- on_consume_ready(connection, channel, consumers, wakeup=True, **kwargs)[исходный код]¶
- process(type, event)[исходный код]¶
Process event by dispatching to configured handler.
- wakeup_workers(channel=None)[исходный код]¶
- celery.events.get_exchange(conn, name='celeryev')[исходный код]¶
Get exchange used for sending events.
- Параметры:
conn (kombu.Connection) – Connection used for sending/receiving events.
name (str) – Name of the exchange. Default is
celeryev
.
Примечание
The event type changes if Redis is used as the transport (from topic -> fanout).
- celery.events.group_from(type)[исходный код]¶
Get the group part of an event type name.
Пример
>>> group_from('task-sent') 'task'
>>> group_from('custom-my-event') 'custom'