celery.app.control¶
Worker Remote Control Client.
Client for worker remote control commands.
Server implementation is in celery.worker.control.
There are two types of remote control commands:
Inspect commands: Have no side effects; they usually just return some value found in the worker, like the list of currently registered tasks or the list of active tasks. Commands are accessible via the Inspect class.
Control commands: Perform side effects, like adding a new queue to consume from. Commands are accessible via the Control class.
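A minimal sketch of both command styles, assuming a running broker and at least one worker; the broker URL is a placeholder and 'feeds' is a hypothetical queue name:

    from celery import Celery

    app = Celery('proj', broker='amqp://guest@localhost//')  # placeholder broker URL

    # Inspect command: read-only, gathers data from the workers.
    print(app.control.inspect().active())

    # Control command: has a side effect (workers start consuming a new queue).
    app.control.add_consumer('feeds', reply=True)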
- class celery.app.control.Control(app=None)[source]¶
Worker remote control client.
- class Mailbox(namespace, type='direct', connection=None, clock=None, accept=None, serializer=None, producer_pool=None, queue_ttl=None, queue_expires=None, reply_queue_ttl=None, reply_queue_expires=10.0)¶
Process Mailbox.
- Node(hostname=None, state=None, channel=None, handlers=None)¶
- abcast(command, kwargs=None)¶
- accept = ['json']¶
Only accepts json messages by default.
- call(destination, command, kwargs=None, timeout=None, callback=None, channel=None)¶
- cast(destination, command, kwargs=None)¶
- connection = None¶
Connection (if bound).
- exchange = None¶
mailbox exchange (init by constructor).
- exchange_fmt = '%s.pidbox'¶
- get_queue(hostname)¶
- get_reply_queue()¶
- multi_call(command, kwargs=None, timeout=1, limit=None, callback=None, channel=None)¶
- namespace = None¶
Name of application.
- node_cls¶
alias of
Node
- property oid¶
- producer_or_acquire(producer=None, channel=None)¶
- property producer_pool¶
- reply_exchange = None¶
exchange to send replies to.
- reply_exchange_fmt = 'reply.%s.pidbox'¶
- property reply_queue¶
- serializer = None¶
Message serializer
- type = 'direct'¶
Exchange type (usually direct, or fanout for broadcast).
- add_consumer(queue, exchange=None, exchange_type='direct', routing_key=None, options=None, destination=None, **kwargs)[source]¶
Tell all (or specific) workers to start consuming from a new queue.
Only the queue name is required: if only the queue is specified, the exchange and routing key will be set to the same name (like automatic queues do).
Note
This command does not respect the default queue/exchange options in the configuration.
- Parameters:
queue (str) – Name of queue to start consuming from.
exchange (str) – Optional name of exchange.
exchange_type (str) – Type of exchange (defaults to 'direct').
routing_key (str) – Optional routing key.
options (Dict) – Additional options as supported by kombu.entity.Queue.from_dict().
See also
broadcast() for supported keyword arguments.
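A usage sketch, assuming the app instance from the sketch above; the queue, exchange, routing key, and worker name are hypothetical, and reply=True is one of the broadcast() keyword arguments:

    # Simplest form: queue, exchange and routing key all get the same name.
    app.control.add_consumer('feeds', reply=True)

    # Explicit exchange/routing key, sent to a single worker only.
    app.control.add_consumer(
        'analytics',
        exchange='events',
        exchange_type='topic',
        routing_key='events.#',
        destination=['celery@worker1'],
        reply=True,
    )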
- autoscale(max, min, destination=None, **kwargs)[source]¶
Change worker(s) autoscale setting.
See also
Supports the same arguments as broadcast().
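A brief sketch, assuming the worker was started with autoscaling enabled; the worker name is a placeholder:

    # Allow between 3 and 10 pool processes on one specific worker.
    app.control.autoscale(10, 3, destination=['celery@worker1'], reply=True)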
- broadcast(command, arguments=None, destination=None, connection=None, reply=False, timeout=1.0, limit=None, callback=None, channel=None, pattern=None, matcher=None, **extra_kwargs)[source]¶
Broadcast a control command to the celery workers.
- Parameters:
command (str) – Name of command to send.
arguments (Dict) – Keyword arguments for the command.
destination (List) – If set, a list of the hosts to send the command to; when empty, broadcast to all workers.
connection (kombu.Connection) – Custom broker connection to use; if not set, a connection will be acquired from the pool.
reply (bool) – Wait for and return the reply.
timeout (float) – Timeout in seconds to wait for the reply.
limit (int) – Limit number of replies.
callback (Callable) – Callback called immediately for each reply received.
pattern (str) – Custom pattern string to match.
matcher (Callable) – Custom matcher to run the pattern to match.
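A hedged sketch of calling a remote control command directly through broadcast(); 'ping' is a built-in inspect command, and the timeout and limit values are illustrative:

    # Send 'ping' to all workers and collect replies for up to 2 seconds.
    replies = app.control.broadcast('ping', reply=True, timeout=2)

    # Stop waiting as soon as the first reply arrives.
    first = app.control.broadcast('ping', reply=True, limit=1)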
- cancel_consumer(queue, destination=None, **kwargs)[source]¶
Tell all (or specific) workers to stop consuming from queue.
See also
Supports the same arguments as broadcast().
- disable_events(destination=None, **kwargs)[source]¶
Tell all (or specific) workers to disable events.
See also
Supports the same arguments as broadcast().
- discard_all(connection=None)¶
Discard all waiting tasks.
This will ignore all tasks waiting for execution, and they will be deleted from the messaging server.
- Parameters:
connection (kombu.Connection) – Optional specific connection instance to use. If not provided, a connection will be acquired from the connection pool.
- Returns:
the number of tasks discarded.
- Return type:
int
- election(id, topic, action=None, connection=None)[source]¶
- enable_events(destination=None, **kwargs)[source]¶
Tell all (or specific) workers to enable events.
See also
Supports the same arguments as broadcast().
- heartbeat(destination=None, **kwargs)[source]¶
Tell worker(s) to send a heartbeat immediately.
See also
Supports the same arguments as broadcast().
- ping(destination=None, timeout=1.0, **kwargs)[source]¶
Ping all (or specific) workers.
>>> app.control.ping()
[{'celery@node1': {'ok': 'pong'}}, {'celery@node2': {'ok': 'pong'}}]
>>> app.control.ping(destination=['celery@node2'])
[{'celery@node2': {'ok': 'pong'}}]
- Returns:
List of {HOSTNAME: {'ok': 'pong'}} dictionaries.
- Return type:
List[Dict]
See also
broadcast() for supported keyword arguments.
- pool_grow(n=1, destination=None, **kwargs)[source]¶
Tell all (or specific) workers to grow the pool by n.
See also
Supports the same arguments as broadcast().
- pool_restart(modules=None, reload=False, reloader=None, destination=None, **kwargs)[source]¶
Restart the execution pools of all or specific workers.
- Keyword Arguments:
modules (Sequence[str]) – List of modules to reload.
reload (bool) – Flag to enable module reloading. Default is False.
reloader (Any) – Function to reload a module.
See also
Supports the same arguments as broadcast().
- pool_shrink(n=1, destination=None, **kwargs)[source]¶
Tell all (or specific) workers to shrink the pool by n.
See also
Supports the same arguments as broadcast().
- purge(connection=None)[source]¶
Discard all waiting tasks.
This will ignore all tasks waiting for execution, and they will be deleted from the messaging server.
- Parameters:
connection (kombu.Connection) – Optional specific connection instance to use. If not provided, a connection will be acquired from the connection pool.
- Returns:
the number of tasks discarded.
- Return type:
int
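A short sketch; note that this deletes waiting task messages from the broker, so it is usually reserved for development or staging environments:

    # Remove every task message currently waiting in the queues this app uses.
    discarded = app.control.purge()
    print(f'{discarded} task(s) discarded')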
- rate_limit(task_name, rate_limit, destination=None, **kwargs)[source]¶
Tell workers to set a new rate limit for task by type.
- Parameters:
task_name (str) – Name of task to change rate limit for.
rate_limit (int, str) – The rate limit as tasks per second, or a rate limit string ('100/m', etc.; see celery.app.task.Task.rate_limit for more information).
See also
broadcast() for supported keyword arguments.
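A usage sketch; the task name below is a placeholder for a task registered in your app:

    # Limit the (hypothetical) send_email task to 100 executions per minute on all workers.
    app.control.rate_limit('proj.tasks.send_email', '100/m', reply=True)

    # Or apply a different limit on one worker only.
    app.control.rate_limit('proj.tasks.send_email', '10/s', destination=['celery@worker1'])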
- revoke(task_id, destination=None, terminate=False, signal='SIGTERM', **kwargs)[source]¶
Tell all (or specific) workers to revoke a task by id (or list of ids).
If a task is revoked, the workers will ignore the task and not execute it after all.
- Parameters:
task_id (Union[str, list]) – Id of the task to revoke (or a list of ids).
terminate (bool) – Also terminate the process currently working on the task (if any).
signal (str) – Name of signal to send to the process if terminate. Default is TERM.
See also
broadcast() for supported keyword arguments.
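A brief sketch; the task id is an illustrative placeholder (normally taken from AsyncResult.id):

    task_id = 'd9078da5-9915-40a0-bfa1-392c7bde42ed'  # placeholder id
    app.control.revoke(task_id)                        # skip the task if it has not started yet

    # Revoke and also terminate the process currently executing the task.
    app.control.revoke(task_id, terminate=True, signal='SIGKILL')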
- revoke_by_stamped_headers(headers, destination=None, terminate=False, signal='SIGTERM', **kwargs)[source]¶
Tell all (or specific) workers to revoke a task by headers.
If a task is revoked, the workers will ignore the task and not execute it after all.
- Parameters:
headers (dict[str, Union[str, list]]) – Headers to match when revoking tasks.
See also
broadcast() for supported keyword arguments.
- shutdown(destination=None, **kwargs)[source]¶
Shutdown worker(s).
See also
Supports the same arguments as broadcast().
- terminate(task_id, destination=None, signal='SIGTERM', **kwargs)[source]¶
Tell all (or specific) workers to terminate a task by id (or list of ids).
See also
This is just a shortcut to revoke() with the terminate argument enabled.
- time_limit(task_name, soft=None, hard=None, destination=None, **kwargs)[source]¶
Tell workers to set time limits for a task by type.
- Parameters:
task_name (str) – Name of task to change time limits for.
soft (float) – New soft time limit (in seconds).
hard (float) – New hard time limit (in seconds).
**kwargs (Any) – arguments passed on to broadcast().
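A brief sketch with a placeholder task name; soft and hard limits are given in seconds as documented above:

    # Give the (hypothetical) crawl task a 60s soft / 120s hard limit on all workers.
    app.control.time_limit('proj.tasks.crawl', soft=60, hard=120, reply=True)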
- class celery.app.control.Inspect(destination=None, timeout=1.0, callback=None, connection=None, app=None, limit=None, pattern=None, matcher=None)[source]¶
API for inspecting workers.
This class provides a proxy for accessing the Inspect API of workers. The API is defined in celery.worker.control.
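Instances are normally obtained through app.control.inspect() rather than constructed directly; a short sketch with a placeholder worker name:

    # Inspect every worker, or restrict to specific nodes with a longer timeout.
    i = app.control.inspect()
    i = app.control.inspect(destination=['celery@worker1'], timeout=2.0)

    print(i.registered())   # tasks known to the worker(s)
    print(i.active())       # tasks currently being executed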
- active(safe=None)[source]¶
Return list of tasks currently executed by workers.
- Parameters:
safe (Boolean) – Set to True to disable deserialization.
- Returns:
Dictionary {HOSTNAME: [TASK_INFO,...]}.
- Return type:
Dict
See also
For TASK_INFO details see the query_task() return value.
- active_queues()[source]¶
Return information about queues from which worker consumes tasks.
- Returns:
Dictionary {HOSTNAME: [QUEUE_INFO, QUEUE_INFO,...]}.
- Return type:
Dict
Here is the list of QUEUE_INFO fields:
- name
- exchange
  - name
  - type
  - arguments
  - durable
  - passive
  - auto_delete
  - delivery_mode
  - no_declare
- routing_key
- queue_arguments
- binding_arguments
- consumer_arguments
- durable
- exclusive
- auto_delete
- no_ack
- alias
- bindings
- no_declare
- expires
- message_ttl
- max_length
- max_length_bytes
- max_priority
See also
See the RabbitMQ/AMQP documentation for more details about queue_info fields.
Note
The queue_info fields are RabbitMQ/AMQP oriented. Not all fields apply to other transports.
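A short sketch that prints the queue names each worker consumes from, using the name field documented above (the result is None when no worker replies, hence the or {}):

    queues = app.control.inspect().active_queues() or {}
    for worker, infos in queues.items():
        print(worker, [q['name'] for q in infos])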
- app = None¶
- clock()[source]¶
Get the Clock value on workers.
>>> app.control.inspect().clock()
{'celery@node1': {'clock': 12}}
- Returns:
Dictionary {HOSTNAME: CLOCK_VALUE}.
- Return type:
Dict
- conf(with_defaults=False)[source]¶
Return configuration of each worker.
- Parameters:
with_defaults (bool) – if set to True, the method also returns configuration options with default values.
- Returns:
Dictionary {HOSTNAME: WORKER_CONFIGURATION}.
- Return type:
Dict
See also
WORKER_CONFIGURATION is a dictionary containing current configuration options. See Configuration and defaults for possible values.
- hello(from_node, revoked=None)[source]¶
- memdump(samples=10)[source]¶
Dump statistics of previous memsample requests.
Note
Requires the psutil library.
- memsample()[source]¶
Return sample current RSS memory usage.
Note
Requires the psutil library.
- objgraph(type='Request', n=200, max_depth=10)[source]¶
Create graph of uncollected objects (memory-leak debugging).
- Parameters:
n (int) – Max number of objects to graph.
max_depth (int) – Traverse at most n levels deep.
type (str) – Name of object to graph. Default is 'Request'.
- Returns:
Dictionary {'filename': FILENAME}
- Return type:
Dict
Note
Requires the objgraph library.
- ping(destination=None)[source]¶
Ping all (or specific) workers.
>>> app.control.inspect().ping()
{'celery@node1': {'ok': 'pong'}, 'celery@node2': {'ok': 'pong'}}
>>> app.control.inspect().ping(destination=['celery@node1'])
{'celery@node1': {'ok': 'pong'}}
- Parameters:
destination (List) – If set, a list of the hosts to send the command to; when empty, broadcast to all workers.
- Returns:
Dictionary {HOSTNAME: {'ok': 'pong'}}.
- Return type:
Dict
See also
broadcast() for supported keyword arguments.
- query_task(*ids)[source]¶
Return detail of tasks currently executed by workers.
- Parameters:
*ids (str) – IDs of tasks to be queried.
- Returns:
Dictionary {HOSTNAME: {TASK_ID: [STATE, TASK_INFO]}}.
- Return type:
Dict
Here is the list of TASK_INFO fields:
- id - ID of the task
- name - Name of the task
- args - Positional arguments passed to the task
- kwargs - Keyword arguments passed to the task
- type - Type of the task
- hostname - Hostname of the worker processing the task
- time_start - Time of processing start
- acknowledged - True when task was acknowledged to broker
- delivery_info - Dictionary containing delivery information
  - exchange - Name of exchange where task was published
  - routing_key - Routing key used when task was published
  - priority - Priority used when task was published
  - redelivered - True if the task was redelivered
- worker_pid - PID of worker processing the task
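A hedged sketch combining active() and query_task(); the dictionary shapes follow the field list above, and the result is None when no worker replies:

    i = app.control.inspect()
    for worker, tasks in (i.active() or {}).items():
        for task in tasks:
            # Look up [STATE, TASK_INFO] for each task id currently executing.
            print(worker, i.query_task(task['id']))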
- registered(*taskinfoitems)[source]¶
Return all registered tasks per worker.
>>> app.control.inspect().registered()
{'celery@node1': ['task1', 'task2']}
>>> app.control.inspect().registered('serializer', 'max_retries')
{'celery@node1': ['task_foo [serializer=json max_retries=3]', 'task_bar [serializer=json max_retries=3]']}
- registered_tasks(*taskinfoitems)¶
Return all registered tasks per worker.
>>> app.control.inspect().registered()
{'celery@node1': ['task1', 'task2']}
>>> app.control.inspect().registered('serializer', 'max_retries')
{'celery@node1': ['task_foo [serializer=json max_retries=3]', 'task_bar [serializer=json max_retries=3]']}
- report()[source]¶
Return human readable report for each worker.
- Returns:
Dictionary {HOSTNAME: {'ok': REPORT_STRING}}.
- Return type:
Dict
- reserved(safe=None)[source]¶
Return list of currently reserved tasks, not including scheduled/active.
- Returns:
Dictionary {HOSTNAME: [TASK_INFO,...]}.
- Return type:
Dict
See also
For TASK_INFO details see the query_task() return value.
- revoked()[source]¶
Return list of revoked tasks.
>>> app.control.inspect().revoked()
{'celery@node1': ['16f527de-1c72-47a6-b477-c472b92fef7a']}
- Returns:
Dictionary {HOSTNAME: [TASK_ID, ...]}.
- Return type:
Dict
- scheduled(safe=None)[source]¶
Return list of scheduled tasks with details.
- Returns:
Dictionary {HOSTNAME: [TASK_SCHEDULED_INFO,...]}.
- Return type:
Dict
Here is the list of TASK_SCHEDULED_INFO fields:
- eta - scheduled time for task execution as string in ISO 8601 format
- priority - priority of the task
- request - field containing TASK_INFO value.
See also
For more details about TASK_INFO see the query_task() return value.
- stats()[source]¶
Return statistics of worker.
- Returns:
Dictionary {HOSTNAME: STAT_INFO}.
- Return type:
Dict
Here is the list of STAT_INFO fields:
- broker - Section for broker information.
  - connect_timeout - Timeout in seconds (int/float) for establishing a new connection.
  - heartbeat - Current heartbeat value (set by client).
  - hostname - Node name of the remote broker.
  - insist - No longer used.
  - login_method - Login method used to connect to the broker.
  - port - Port of the remote broker.
  - ssl - SSL enabled/disabled.
  - transport - Name of transport used (e.g., amqp or redis).
  - transport_options - Options passed to transport.
  - uri_prefix - Some transports expect the host name to be a URL. E.g. redis+socket:///tmp/redis.sock. In this example the URI prefix will be redis.
  - userid - User id used to connect to the broker with.
  - virtual_host - Virtual host used.
- clock - Value of the worker's logical clock. This is a positive integer and should be increasing every time you receive statistics.
- uptime - Number of seconds since the worker controller was started.
- pid - Process id of the worker instance (Main process).
- pool - Pool-specific section.
  - max-concurrency - Max number of processes/threads/green threads.
  - max-tasks-per-child - Max number of tasks a thread may execute before being recycled.
  - processes - List of PIDs (or thread-ids).
  - put-guarded-by-semaphore - Internal.
  - timeouts - Default values for time limits.
  - writes - Specific to the prefork pool, this shows the distribution of writes to each process in the pool when using async I/O.
- prefetch_count - Current prefetch count value for the task consumer.
- rusage - System usage statistics. The fields available may be different on your platform. From getrusage(2):
  - stime - Time spent in operating system code on behalf of this process.
  - utime - Time spent executing user instructions.
  - maxrss - The maximum resident size used by this process (in kilobytes).
  - idrss - Amount of non-shared memory used for data (in kilobytes times ticks of execution).
  - isrss - Amount of non-shared memory used for stack space (in kilobytes times ticks of execution).
  - ixrss - Amount of memory shared with other processes (in kilobytes times ticks of execution).
  - inblock - Number of times the file system had to read from the disk on behalf of this process.
  - oublock - Number of times the file system has to write to disk on behalf of this process.
  - majflt - Number of page faults that were serviced by doing I/O.
  - minflt - Number of page faults that were serviced without doing I/O.
  - msgrcv - Number of IPC messages received.
  - msgsnd - Number of IPC messages sent.
  - nvcsw - Number of times this process voluntarily invoked a context switch.
  - nivcsw - Number of times an involuntary context switch took place.
  - nsignals - Number of signals received.
  - nswap - The number of times this process was swapped entirely out of memory.
- total - Map of task names and the total number of tasks with that type the worker has accepted since start-up.
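A short sketch reading a couple of the fields documented above; it assumes at least one worker replies:

    stats = app.control.inspect().stats() or {}
    for worker, info in stats.items():
        print(worker, 'uptime:', info['uptime'], 'pid:', info['pid'])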
- celery.app.control.flatten_reply(reply)[source]¶
Flatten node replies.
Convert from a list of replies in this format:
[{'a@example.com': reply}, {'b@example.com': reply}]
into this format:
{'a@example.com': reply, 'b@example.com': reply}
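A minimal sketch combining broadcast() replies with flatten_reply(); the worker names in the result depend on your cluster:

    from celery.app.control import flatten_reply

    replies = app.control.broadcast('ping', reply=True)  # list of single-key dicts
    by_worker = flatten_reply(replies)                    # {'celery@worker1': {'ok': 'pong'}, ...}
    print(by_worker)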