celery.contrib.migrate
Message migration tools (Broker <-> Broker).
- class celery.contrib.migrate.State
Migration progress state.
- count = 0
- filtered = 0
- property strtotal
- total_apx = 0
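The counters above are what a progress callback gets to read. The following is a minimal sketch, assuming the state argument passed to a move() callback exposes the count, filtered, and total_apx attributes listed here. The meaning given to each counter (count as messages seen, filtered as messages moved) is an assumption, and format_progress and on_moved are hypothetical helper names.

```python
from types import SimpleNamespace


def format_progress(state):
    """Build a one-line progress string from a State-like object."""
    # total_apx starts at 0, so fall back to "?" until a total is known.
    total = state.total_apx or "?"
    return f"seen {state.count}/{total}, moved {state.filtered}"


def on_moved(state, body, message):
    # Signature (state, body, message), as documented for the callback
    # argument of move().
    print(format_progress(state))


# Stand-in object so the sketch runs without a broker (not the real State).
fake_state = SimpleNamespace(count=3, filtered=1, total_apx=0)
on_moved(fake_state, body={}, message=None)
```

Keeping the formatting in its own function makes the callback easy to check without a running broker.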
- exception celery.contrib.migrate.StopFiltering
Semi-predicate used to signal filter stop.
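A semi-predicate here means a signal that does not fit into a plain true/false return value, so it is raised as an exception instead. The sketch below illustrates the general pattern only. StopScan, make_limited, and scan are hypothetical stand-ins written for this example; they are not the library's code, and how StopFiltering is raised and caught internally is not shown in this reference.

```python
class StopScan(Exception):
    """Hypothetical stand-in for a 'stop filtering' signal."""


def make_limited(predicate, limit):
    """Wrap a predicate so it raises StopScan once `limit` items were examined."""
    seen = 0

    def wrapped(item):
        nonlocal seen
        if seen >= limit:
            raise StopScan()
        seen += 1
        return predicate(item)

    return wrapped


def scan(items, predicate):
    """Collect matching items until the predicate signals a stop."""
    matched = []
    try:
        for item in items:
            if predicate(item):
                matched.append(item)
    except StopScan:
        # The exception carries no error; it only ends the loop early.
        pass
    return matched


is_even = make_limited(lambda n: n % 2 == 0, limit=5)
first_matches = scan(range(10), is_even)
```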
- celery.contrib.migrate.migrate_task(producer, body_, message, queues=None)
Migrate single task message.
- celery.contrib.migrate.migrate_tasks(source, dest, migrate=<function migrate_task>, app=None, queues=None, **kwargs)
Migrate tasks from one broker to another.
- celery.contrib.migrate.move(predicate, connection=None, exchange=None, routing_key=None, source=None, app=None, callback=None, limit=None, transform=None, **kwargs)
Find tasks by filtering them and move the tasks to a new queue.
- Parameters:
  - predicate (Callable) – Filter function used to decide which messages to move. Must accept the standard (body, message) signature used by Kombu consumer callbacks. To have the message moved, the predicate must return one of:
    - a tuple of (exchange, routing_key), or
    - a Queue instance, or
    - any other true value, meaning the specified exchange and routing_key arguments will be used.
  - connection (kombu.Connection) – Custom connection to use.
  - source (List[Union[str, kombu.Queue]]) – Optional list of source queues to use instead of the default (queues in task_queues). This list can also contain Queue instances.
  - exchange (str, kombu.Exchange) – Default destination exchange.
  - routing_key (str) – Default destination routing key.
  - limit (int) – Limit number of messages to filter.
  - callback (Callable) – Callback called after a message is moved, with signature (state, body, message).
  - transform (Callable) – Optional function to transform the return value (destination) of the filter function.

Also supports the same keyword arguments as start_filter().

To demonstrate, the move_task_by_id() operation can be implemented like this:

    def is_wanted_task(body, message):
        if body['id'] == wanted_id:
            return Queue('foo', exchange=Exchange('foo'),
                         routing_key='foo')

    move(is_wanted_task)

or with a transform:

    def transform(value):
        if isinstance(value, str):
            return Queue(value, Exchange(value), value)
        return value

    move(is_wanted_task, transform=transform)

Note
The predicate may also return a tuple of (exchange, routing_key) to specify the destination to where the task should be moved, or a Queue instance. Any other true value means that the task will be moved to the default exchange/routing_key.
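The three kinds of predicate return value can be summarised as a small decision function. This is an illustration of the rule stated above, not the library's implementation: resolve_destination is a hypothetical name, and a Queue is recognised by duck typing (any object with exchange and routing_key attributes) so that the sketch runs without Kombu installed.

```python
from types import SimpleNamespace


def resolve_destination(result, default_exchange, default_routing_key):
    """Return (exchange, routing_key) for a predicate result, or None."""
    if not result:
        # A false value means the message is not moved.
        return None
    # Queue-like object (assumption: duck-typed instead of kombu.Queue).
    if hasattr(result, "exchange") and hasattr(result, "routing_key"):
        return result.exchange, result.routing_key
    # Explicit (exchange, routing_key) tuple.
    if isinstance(result, tuple) and len(result) == 2:
        return result
    # Any other true value falls back to the default arguments.
    return default_exchange, default_routing_key


# Stand-in for a Queue instance, for illustration only.
queue_like = SimpleNamespace(exchange="foo", routing_key="foo")

explicit = resolve_destination(("ex", "key"), "default", "default_key")
from_queue = resolve_destination(queue_like, "default", "default_key")
fallback = resolve_destination(True, "default", "default_key")
skipped = resolve_destination(None, "default", "default_key")
```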
- celery.contrib.migrate.move_by_idmap(map, **kwargs)
Move tasks by matching from a task_id: queue mapping, where queue is the queue to move the task to.
Example
>>> move_by_idmap({
...     '5bee6e82-f4ac-468e-bd3d-13e8600250bc': Queue('name'),
...     'ada8652d-aef3-466b-abd2-becdaf1b82b3': Queue('name'),
...     '3a2b140d-7db1-41ba-ac90-c36a0ef4ab1f': Queue('name')},
...     queues=['hipri'])
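When every id goes to the same destination, as in the example above, the mapping can be built from a plain list of ids. The sketch below also shows what a lookup-based predicate over such a mapping might look like. It assumes the message body is a dict carrying the task id under 'id', as the move() example in this reference does; idmap_for and make_id_predicate are hypothetical helper names, and destinations are written as (exchange, routing_key) tuples so the code runs without Kombu.

```python
def idmap_for(task_ids, destination):
    """Map every task id in `task_ids` to the same destination."""
    return dict.fromkeys(task_ids, destination)


def make_id_predicate(id_to_dest):
    """Build a (body, message) predicate that looks the task id up in a dict."""

    def predicate(body, message):
        # Returns the destination for known ids and None (falsy) otherwise.
        return id_to_dest.get(body.get("id"))

    return predicate


ids = [
    "5bee6e82-f4ac-468e-bd3d-13e8600250bc",
    "ada8652d-aef3-466b-abd2-becdaf1b82b3",
]
idmap = idmap_for(ids, ("name", "name"))
wanted = make_id_predicate(idmap)
```

A mapping built this way has the same shape as the first argument in the example above, with Queue('name') in place of the tuple.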
- celery.contrib.migrate.move_by_taskmap(map, **kwargs)
Move tasks by matching from a task_name: queue mapping, where queue is the queue to move the task to.
Example
>>> move_by_taskmap({
...     'tasks.add': Queue('name'),
...     'tasks.mul': Queue('name'),
... })
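A name-based mapping combines naturally with the transform argument of move(): the mapping can hold bare queue names and the transform can expand them. The following sketch assumes the message body is a dict with the task name under 'task', and that the exchange and routing key share the queue's name (the same simplification the transform example for move() makes). make_name_predicate and to_destination are hypothetical names.

```python
def make_name_predicate(name_to_queue):
    """Build a (body, message) predicate keyed on the task name."""

    def predicate(body, message):
        # Assumption: the task name is stored under the 'task' key.
        return name_to_queue.get(body.get("task"))

    return predicate


def to_destination(value):
    """Transform: expand a bare queue name into (exchange, routing_key)."""
    if isinstance(value, str):
        return (value, value)
    return value


by_name = make_name_predicate({"tasks.add": "math", "tasks.mul": "math"})

# What a filter would end up with after applying the transform to a match.
matched = by_name({"task": "tasks.add"}, None)
destination = to_destination(matched) if matched else None
```

With the real API, the pair would presumably be passed as move(by_name, transform=to_destination).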
- celery.contrib.migrate.move_direct(predicate, connection=None, exchange=None, routing_key=None, source=None, app=None, callback=None, limit=None, *, transform=<function worker_direct>, **kwargs)
Find tasks by filtering them and move the tasks to a new queue.
- Parameters: the same as for move(), including the rules for the predicate's return value and the keyword arguments of start_filter(). The only difference shown in the signature is that transform is keyword-only and defaults to worker_direct, so the destination returned by the predicate is passed through worker_direct before the message is moved.
- celery.contrib.migrate.move_direct_by_id(task_id, dest, **kwargs)
Find a task by id and move it to another queue.
- celery.contrib.migrate.move_task_by_id(task_id, dest, **kwargs)
Find a task by id and move it to another queue.
- celery.contrib.migrate.republish(producer, message, exchange=None, routing_key=None, remove_props=None)
Republish message.
- celery.contrib.migrate.start_filter(app, conn, filter, limit=None, timeout=1.0, ack_messages=False, tasks=None, queues=None, callback=None, forever=False, on_declare_queue=None, consume_from=None, state=None, accept=None, **kwargs)
Filter tasks.
- celery.contrib.migrate.task_id_eq(task_id, body, message)
Return true if task id equals task_id.
- celery.contrib.migrate.task_id_in(ids, body, message)
Return true if task id is member of set ids.
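Both helpers take the value to compare against as their first argument, followed by (body, message). That shape lends itself to functools.partial: binding the first argument leaves a callable with the (body, message) signature that predicates use. The sketch below uses local stand-ins, id_equals and id_in, that mirror the documented signatures. They are not the library functions, and they assume the body is a dict with the task id under 'id'.

```python
from functools import partial


def id_equals(task_id, body, message):
    """Stand-in with the same argument order as task_id_eq."""
    return body["id"] == task_id


def id_in(ids, body, message):
    """Stand-in with the same argument order as task_id_in."""
    return body["id"] in ids


# Binding the first argument yields a two-argument (body, message) predicate.
is_wanted = partial(id_equals, "abc")
is_any_wanted = partial(id_in, {"abc", "def"})

single_hit = is_wanted({"id": "abc"}, None)
set_hit = is_any_wanted({"id": "def"}, None)
set_miss = is_any_wanted({"id": "xyz"}, None)
```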