celery.contrib.testing.manager

Справочник по API

Integration testing utilities.

class celery.contrib.testing.manager.Manager(app, **kwargs)[исходный код]

Test helpers for task integration tests.

class celery.contrib.testing.manager.ManagerMixin[исходный код]

Mixin that adds Manager capabilities.

assert_accepted(ids, interval=0.5, desc='waiting for tasks to be accepted', **policy)[исходный код]
assert_received(ids, interval=0.5, desc='waiting for tasks to be received', **policy)[исходный код]
assert_result_tasks_in_progress_or_completed(async_results, interval=0.5, desc='waiting for tasks to be started or completed', **policy)[исходный код]
assert_task_state_from_result(fun, results, interval=0.5, **policy)[исходный код]
assert_task_worker_state(fun, ids, interval=0.5, **policy)[исходный код]
ensure_not_for_a_while(fun, catch, desc='thing', max_retries=20, interval_start=0.1, interval_step=0.02, interval_max=1.0, emit_warning=False, **options)[исходный код]

Make sure something does not happen (at least for a while).

inspect(timeout=3.0)[исходный код]
is_accepted(ids, **kwargs)[исходный код]
is_received(ids, **kwargs)[исходный код]
static is_result_task_in_progress(results, **kwargs)[исходный код]
join(r, propagate=False, max_retries=10, **kwargs)[исходный код]
missing_results(r: Sequence[AsyncResult]) Sequence[str][исходный код]
query_task_states(ids, timeout=0.5)[исходный код]
query_tasks(ids, timeout=0.5)[исходный код]
remark(s: str, sep: str = '-') None[исходный код]
retry_over_time(*args, **kwargs)[исходный код]
true_or_raise(fun, *args, **kwargs)[исходный код]
wait_for(fun: Callable, catch: Sequence[Any], desc: str = 'thing', args: Tuple = (), kwargs: Optional[Dict] = None, errback: Optional[Callable] = None, max_retries: int = 10, interval_start: float = 0.1, interval_step: float = 0.5, interval_max: float = 5.0, emit_warning: bool = False, **options: Any) Any[исходный код]

Wait for event to happen.

The catch argument specifies the exception that means the event has not happened yet.

exception celery.contrib.testing.manager.Sentinel[исходный код]

Signifies the end of something.

Back to Top