celery.contrib.testing.mocks
¶
Справочник по API¶
Useful mocks for unit testing.
- celery.contrib.testing.mocks.ContextMock(*args, **kwargs)[исходный код]¶
Mock that mocks
with
statement contexts.
- celery.contrib.testing.mocks.TaskMessage(name: str, id: Optional[str] = None, args: Sequence = (), kwargs: Optional[Mapping] = None, callbacks: Optional[Sequence[Signature]] = None, errbacks: Optional[Sequence[Signature]] = None, chain: Optional[Sequence[Signature]] = None, shadow: Optional[str] = None, utc: Optional[bool] = None, **options: Any) Any [исходный код]¶
Create task message in protocol 2 format.
- celery.contrib.testing.mocks.TaskMessage1(name: str, id: Optional[str] = None, args: Sequence = (), kwargs: Optional[Mapping] = None, callbacks: Optional[Sequence[Signature]] = None, errbacks: Optional[Sequence[Signature]] = None, chain: Optional[Sequence[Signature]] = None, **options: Any) Any [исходный код]¶
Create task message in protocol 1 format.
- celery.contrib.testing.mocks.task_message_from_sig(app: ~celery.app.base.Celery, sig: ~celery.canvas.Signature, utc: bool = True, TaskMessage: ~typing.Any = <function TaskMessage>) Any [исходный код]¶
Create task message from
celery.Signature
.Пример
>>> m = task_message_from_sig(app, add.s(2, 2)) >>> amqp_client.basic_publish(m, exchange='ex', routing_key='rkey')