Холст: Проектирование рабочих потоков¶
Подписи¶
Добавлено в версии 2.0.
Вы только что узнали, как вызвать задачу с помощью метода tasks delay
в руководстве calling, и часто это все, что вам нужно, но иногда вы можете захотеть передать сигнатуру вызова задачи другому процессу или в качестве аргумента другой функции.
signature()
оборачивает аргументы, аргументы ключевых слов и параметры выполнения одного вызова задачи таким образом, что его можно передавать функциям или даже сериализовать и передавать по проводам.
Вы можете создать сигнатуру для задачи
add
, используя ее имя следующим образом:>>> from celery import signature >>> signature('tasks.add', args=(2, 2), countdown=10) tasks.add(2, 2)
Эта задача имеет сигнатуру четности 2 (два аргумента):
(2, 2)
, и устанавливает параметр выполнения обратного отсчета на 10.или вы можете создать его, используя метод задачи
signature
:>>> add.signature((2, 2), countdown=10) tasks.add(2, 2)
Существует также короткий путь с использованием аргументов звезды:
>>> add.s(2, 2) tasks.add(2, 2)
Также поддерживаются аргументы с ключевыми словами:
>>> add.s(2, 2, debug=True) tasks.add(2, 2, debug=True)
Из любого экземпляра подписи можно просмотреть различные поля:
>>> s = add.signature((2, 2), {'debug': True}, countdown=10) >>> s.args (2, 2) >>> s.kwargs {'debug': True} >>> s.options {'countdown': 10}
Он поддерживает «API вызова»
delay
,apply_async
и т.д., включая вызов напрямую (__call__
).Вызов сигнатуры выполнит задание inline в текущем процессе:
>>> add(2, 2) 4 >>> add.s(2, 2)() 4
delay
- это наше любимое сокращение доapply_async
, принимающее звездные аргументы:>>> result = add.delay(2, 2) >>> result.get() 4
apply_async
принимает те же аргументы, что и методapp.Task.apply_async()
:>>> add.apply_async(args, kwargs, **options) >>> add.signature(args, kwargs, **options).apply_async() >>> add.apply_async((2, 2), countdown=1) >>> add.signature((2, 2), countdown=1).apply_async()
Вы не можете определить опции с помощью
s()
, но цепной вызовset
позаботится об этом:>>> add.s(2, 2).set(countdown=1) proj.tasks.add(2, 2)
Частицы¶
С помощью подписи вы можете выполнить задание в рабочем:
>>> add.s(2, 2).delay()
>>> add.s(2, 2).apply_async(countdown=1)
Или вы можете вызвать его непосредственно в текущем процессе:
>>> add.s(2, 2)()
4
Указание дополнительных args, kwargs или опций к apply_async
/delay
создает партиции:
Любые добавленные аргументы будут добавлены к аргументам в сигнатуре:
>>> partial = add.s(2) # incomplete signature >>> partial.delay(4) # 4 + 2 >>> partial.apply_async((4,)) # same
Любые добавленные аргументы ключевых слов будут объединены с kwargs в сигнатуре, при этом новые аргументы ключевых слов будут иметь приоритет:
>>> s = add.s(2, 2) >>> s.delay(debug=True) # -> add(2, 2, debug=True) >>> s.apply_async(kwargs={'debug': True}) # same
Любые добавленные опции будут объединены с опциями в подписи, при этом новые опции будут иметь приоритет:
>>> s = add.signature((2, 2), countdown=10) >>> s.apply_async(countdown=1) # countdown is now 1
Вы также можете клонировать подписи для создания производных:
>>> s = add.s(2)
proj.tasks.add(2)
>>> s.clone(args=(4,), kwargs={'debug': True})
proj.tasks.add(4, 2, debug=True)
Неизменность¶
Добавлено в версии 3.0.
Частицы предназначены для использования с обратными вызовами, любые связанные задачи или хордовые обратные вызовы будут применяться с результатом родительской задачи. Иногда вы хотите указать обратный вызов, который не принимает дополнительных аргументов, и в этом случае вы можете установить сигнатуру неизменяемой:
>>> add.apply_async((2, 2), link=reset_buffers.signature(immutable=True))
Ярлык .si()
также может быть использован для создания неизменяемых подписей:
>>> add.apply_async((2, 2), link=reset_buffers.si())
Когда сигнатура неизменяема, можно установить только параметры выполнения, поэтому невозможно вызвать сигнатуру с частичными args/kwargs.
Примечание
В этом учебнике я иногда использую префиксный оператор ~ для подписей. Возможно, вам не стоит использовать его в своем производственном коде, но это удобное сокращение при экспериментах в оболочке Python:
>>> ~sig
>>> # is the same as
>>> sig.delay().get()
Обратные вызовы¶
Добавлено в версии 3.0.
Обратные вызовы могут быть добавлены к любой задаче с помощью аргумента link
к apply_async
:
add.apply_async((2, 2), link=other_task.s())
Обратный вызов будет применен только в случае успешного завершения задачи, и он будет применен с возвращаемым значением родительской задачи в качестве аргумента.
Как я уже упоминал ранее, любые аргументы, которые вы добавите к сигнатуре, будут добавлены к аргументам, указанным в самой сигнатуре!
Если у вас есть подпись:
>>> sig = add.s(10)
тогда sig.delay(result) становится:
>>> add.apply_async(args=(result, 10))
…
Теперь давайте вызовем нашу задачу add
с обратным вызовом, используя частичные аргументы:
>>> add.apply_async((2, 2), link=add.s(8))
Как и ожидалось, сначала будет запущена одна задача, вычисляющая 2 + 2, затем другая, вычисляющая 4 + 8.
Примитивы¶
Добавлено в версии 3.0.
Примитивы также сами являются сигнатурными объектами, поэтому их можно комбинировать любым способом для составления сложных рабочих потоков.
Вот несколько примеров:
Простая цепь
Вот простая цепочка, первая задача выполняется, передавая свое возвращаемое значение следующей задаче в цепочке, и так далее.
>>> from celery import chain >>> # 2 + 2 + 4 + 8 >>> res = chain(add.s(2, 2), add.s(4), add.s(8))() >>> res.get() 16
Это также можно записать с помощью труб:
>>> (add.s(2, 2) | add.s(4) | add.s(8))().get() 16
Неизменяемые подписи
Подписи могут быть частичными, поэтому аргументы могут быть добавлены к существующим аргументам, но это не всегда нужно, например, если вам не нужен результат предыдущей задачи в цепочке.
В этом случае вы можете пометить сигнатуру как неизменяемую, чтобы аргументы не могли быть изменены:
>>> add.signature((2, 2), immutable=True)
Для этого также существует сочетание клавиш
.si()
, и это предпочтительный способ создания подписей:>>> add.si(2, 2)
Теперь вместо этого вы можете создать цепочку независимых задач:
>>> res = (add.si(2, 2) | add.si(4, 4) | add.si(8, 8))() >>> res.get() 16 >>> res.parent.get() 8 >>> res.parent.parent.get() 4
Простая группа
Вы можете легко создать группу задач для параллельного выполнения:
>>> from celery import group >>> res = group(add.s(i, i) for i in range(10))() >>> res.get(timeout=1) [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
Простой аккорд
Примитив chord позволяет нам добавить обратный вызов, который будет вызван, когда все задачи в группе завершат выполнение. Это часто требуется для алгоритмов, которые не являются ужасно параллельными:
>>> from celery import chord >>> res = chord((add.s(i, i) for i in range(10)), xsum.s())() >>> res.get() 90
Приведенный выше пример создает 10 задач, которые запускаются параллельно, и когда все они завершаются, возвращаемые значения объединяются в список и отправляются в задачу
xsum
.Тело аккорда также может быть неизменяемым, чтобы возвращаемое значение группы не передавалось обратному вызову:
>>> chord((import_contact.s(c) for c in contacts), ... notify_complete.si(import_id)).apply_async()
Обратите внимание на использование
.si
выше; это создает неизменяемую сигнатуру, что означает, что любые новые переданные аргументы (включая возвращаемое значение предыдущей задачи) будут игнорироваться.Взорвите свой разум, сочетая
Цепочки тоже могут быть частичными:
>>> c1 = (add.s(4) | mul.s(8)) # (16 + 4) * 8 >>> res = c1(16) >>> res.get() 160
это означает, что вы можете комбинировать цепочки:
# ((4 + 16) * 2 + 4) * 8 >>> c2 = (add.s(4, 16) | mul.s(2) | (add.s(4) | mul.s(8))) >>> res = c2() >>> res.get() 352
Соединяя группу с другой задачей, вы автоматически повышаете ее статус до аккорда:
>>> c3 = (group(add.s(i, i) for i in range(10)) | xsum.s()) >>> res = c3() >>> res.get() 90
Группы и аккорды также принимают частичные аргументы, поэтому в цепочке возвращаемое значение предыдущей задачи передается всем задачам в группе:
>>> new_user_workflow = (create_user.s() | group( ... import_contacts.s(), ... send_welcome_email.s())) ... new_user_workflow.delay(username='artv', ... first='Art', ... last='Vandelay', ... email='art@vandelay.com')
Если вы не хотите передавать аргументы в группу, вы можете сделать подписи в группе неизменяемыми:
>>> res = (add.s(4, 4) | group(add.si(i, i) for i in range(10)))() >>> res.get() <GroupResult: de44df8c-821d-4c84-9a6a-44769c738f98 [ bc01831b-9486-4e51-b046-480d7c9b78de, 2650a1b8-32bf-4771-a645-b0a35dcc791b, dcbee2a5-e92d-4b03-b6eb-7aec60fd30cf, 59f92e0a-23ea-41ce-9fad-8645a0e7759c, 26e1e707-eccf-4bf4-bbd8-1e1729c3cce3, 2d10a5f4-37f0-41b2-96ac-a973b1df024d, e13d3bdb-7ae3-4101-81a4-6f17ee21df2d, 104b2be0-7b75-44eb-ac8e-f9220bdfa140, c5c551a5-0386-4973-aa37-b65cbeb2624b, 83f72d71-4b71-428e-b604-6f16599a9f37]> >>> res.parent.get() 8
Цепи¶
Добавлено в версии 3.0.
Задачи могут быть связаны между собой: связанная задача вызывается при успешном завершении задачи:
>>> res = add.apply_async((2, 2), link=mul.s(16))
>>> res.get()
4
Связанная задача будет применена с результатом ее родительской задачи в качестве первого аргумента. В приведенном выше случае, когда результатом было 4, это приведет к mul(4, 16)
.
Результаты будут отслеживать все подзадачи, вызванные исходной задачей, и к ним можно получить доступ из экземпляра результата:
>>> res.children
[<AsyncResult: 8c350acf-519d-4553-8a53-4ad3a5c5aeb4>]
>>> res.children[0].get()
64
Экземпляр результата также имеет метод collect()
, который рассматривает результат как граф, позволяя вам выполнять итерации по результатам:
>>> list(res.collect())
[(<AsyncResult: 7b720856-dc5f-4415-9134-5c89def5664e>, 4),
(<AsyncResult: 8c350acf-519d-4553-8a53-4ad3a5c5aeb4>, 64)]
По умолчанию collect()
вызовет исключение IncompleteStream
, если граф не полностью сформирован (одна из задач еще не завершена), но вы можете получить и промежуточное представление графа:
>>> for result, value in res.collect(intermediate=True):
....
Вы можете связать вместе столько задач, сколько пожелаете, и подписи тоже могут быть связаны:
>>> s = add.s(2, 2)
>>> s.link(mul.s(4))
>>> s.link(log_result.s())
Вы также можете добавить обратные вызовы ошибок, используя метод on_error:
>>> add.s(2, 2).on_error(log_error.s()).delay()
Это приведет к следующему вызову .apply_async
при применении сигнатуры:
>>> add.apply_async((2, 2), link_error=log_error.s())
Рабочий не будет вызывать errback как задачу, а вместо этого вызовет функцию errback напрямую, чтобы передать ей необработанные объекты запроса, исключения и трассировки.
Вот пример эррбэка:
from __future__ import print_function
import os
from proj.celery import app
@app.task
def log_error(request, exc, traceback):
with open(os.path.join('/var/errors', request.id), 'a') as fh:
print('--\n\n{0} {1} {2}'.format(
request.id, exc, traceback), file=fh)
Чтобы еще больше упростить процесс соединения задач между собой, существует специальная сигнатура chain
, которая позволяет соединять задачи в цепочку:
>>> from celery import chain
>>> from proj.tasks import add, mul
>>> # (4 + 4) * 8 * 10
>>> res = chain(add.s(4, 4), mul.s(8), mul.s(10))
proj.tasks.add(4, 4) | proj.tasks.mul(8) | proj.tasks.mul(10)
Вызов цепочки вызовет задачи в текущем процессе и вернет результат последней задачи в цепочке:
>>> res = chain(add.s(4, 4), mul.s(8), mul.s(10))()
>>> res.get()
640
Он также устанавливает атрибуты parent
, чтобы вы могли работать по цепочке вверх для получения промежуточных результатов:
>>> res.parent.get()
64
>>> res.parent.parent.get()
8
>>> res.parent.parent
<AsyncResult: eeaad925-6778-4ad1-88c8-b2a63d017933>
Цепочки также можно создавать с помощью оператора |
(pipe):
>>> (add.s(2, 2) | mul.s(8) | mul.s(10)).apply_async()
Графики¶
Кроме того, вы можете работать с графиком результатов в виде DependencyGraph
:
>>> res = chain(add.s(4, 4), mul.s(8), mul.s(10))()
>>> res.parent.parent.graph
285fa253-fcf8-42ef-8b95-0078897e83e6(1)
463afec2-5ed4-4036-b22d-ba067ec64f52(0)
872c3995-6fa0-46ca-98c2-5a19155afcf0(2)
285fa253-fcf8-42ef-8b95-0078897e83e6(1)
463afec2-5ed4-4036-b22d-ba067ec64f52(0)
Вы даже можете конвертировать эти графики в формат dot:
>>> with open('graph.dot', 'w') as fh:
... res.parent.parent.graph.to_dot(fh)
и создавать образы:
$ dot -Tpng graph.dot -o graph.png
Группы¶
Добавлено в версии 3.0.
Группа может использоваться для параллельного выполнения нескольких задач.
Функция group
принимает список сигнатур:
>>> from celery import group
>>> from proj.tasks import add
>>> group(add.s(2, 2), add.s(4, 4))
(proj.tasks.add(2, 2), proj.tasks.add(4, 4))
Если вы вызовете группу, задачи будут применяться одна за другой в текущем процессе, и будет возвращен экземпляр GroupResult
, который можно использовать для отслеживания результатов, или сообщить, сколько задач готово и так далее:
>>> g = group(add.s(2, 2), add.s(4, 4))
>>> res = g()
>>> res.get()
[4, 8]
Группа также поддерживает итераторы:
>>> group(add.s(i, i) for i in range(100))()
Группа - это объект подписи, поэтому ее можно использовать в сочетании с другими подписями.
Групповые обратные вызовы и обработка ошибок¶
Группы также могут иметь связанные с ними сигнатуры обратного вызова и возврата, однако их поведение может быть несколько неожиданным из-за того, что группы не являются реальными задачами и просто передают связанные задачи вниз к их инкапсулированным сигнатурам. Это означает, что возвращаемые значения группы не собираются для передачи в связанную сигнатуру обратного вызова. В качестве примера, следующий фрагмент, использующий простую задачу add(a, b), является ошибочным, поскольку связанная сигнатура add.s() не получит конечный результат группы, как можно было бы ожидать.
>>> g = group(add.s(2, 2), add.s(4, 4))
>>> g.link(add.s())
>>> res = g()
[4, 8]
Обратите внимание, что возвращаются завершенные результаты первых двух задач, но сигнатура обратного вызова будет выполняться в фоновом режиме и вызовет исключение, поскольку она не получила два ожидаемых аргумента.
Групповые возвраты передаются в инкапсулированные сигнатуры, что открывает возможность того, что возврат, связанный только один раз, будет вызываться более одного раза, если несколько задач в группе окажутся неудачными. В качестве примера можно привести следующий фрагмент, использующий задачу fail(), которая вызывает исключение, и ожидается, что сигнатура log_error() будет вызываться один раз для каждой сбойной задачи, выполняемой в группе.
>>> g = group(fail.s(), fail.s())
>>> g.link_error(log_error.s())
>>> res = g()
Учитывая это, обычно рекомендуется создавать идемпотентные или счетные задачи, которые терпимы к многократным вызовам для использования в качестве возвратов.
Для этих случаев лучше использовать класс chord
, который поддерживается в некоторых реализациях бэкенда.
Групповые результаты¶
Групповая задача также возвращает специальный результат, этот результат работает так же, как и результаты обычных задач, за исключением того, что он работает над группой в целом:
>>> from celery import group
>>> from tasks import add
>>> job = group([
... add.s(2, 2),
... add.s(4, 4),
... add.s(8, 8),
... add.s(16, 16),
... add.s(32, 32),
... ])
>>> result = job.apply_async()
>>> result.ready() # have all subtasks completed?
True
>>> result.successful() # were all subtasks successful?
True
>>> result.get()
[4, 8, 16, 32, 64]
GroupResult
принимает список экземпляров AsyncResult
и работает с ними так, как если бы это была одна задача.
Он поддерживает следующие операции:
successful()
Возвращает
True
, если все подзадачи завершились успешно (например, не вызвали исключения).failed()
Верните
True
, если какая-либо из подзадач не выполнилась.waiting()
Возвращает
True
, если какая-либо из подзадач еще не готова.ready()
Верните
True
, если все подзадачи готовы.completed_count()
Возвращает количество выполненных подзадач.
revoke()
Отмените все подзадачи.
join()
Соберите результаты всех подзадач и верните их в том же порядке, в котором они были вызваны (в виде списка).
Аккорды¶
Добавлено в версии 2.3.
Примечание
Задачи, используемые внутри аккорда, не должны не игнорировать свои результаты. Если бэкенд результатов отключен для любой задачи (заголовка или тела) в вашем аккорде, вы должны прочитать «Важные замечания». В настоящее время аккорды не поддерживают бэкенд результатов RPC.
Аккорд - это задание, которое выполняется только после завершения выполнения всех заданий в группе.
Вычислим сумму выражения 1 + 1 + 2 + 2 + 3 + 3 ... n + n до ста цифр.
Сначала вам нужны две задачи, add()
и tsum()
(sum()
уже является стандартной функцией):
@app.task
def add(x, y):
return x + y
@app.task
def tsum(numbers):
return sum(numbers)
Теперь вы можете использовать хорду для параллельного вычисления каждого шага сложения, а затем получить сумму получившихся чисел:
>>> from celery import chord
>>> from tasks import add, tsum
>>> chord(add.s(i, i)
... for i in range(100))(tsum.s()).get()
9900
Очевидно, что это очень надуманный пример, накладные расходы на обмен сообщениями и синхронизацию делают его намного медленнее, чем его аналог в Python:
>>> sum(i + i for i in range(100))
Шаг синхронизации является дорогостоящим, поэтому следует по возможности избегать использования хордов. Тем не менее, аккорд - это мощный примитив, который необходимо иметь в своем инструментарии, поскольку синхронизация является необходимым шагом для многих параллельных алгоритмов.
Давайте разберем выражение аккорда на части:
>>> callback = tsum.s()
>>> header = [add.s(i, i) for i in range(100)]
>>> result = chord(header)(callback)
>>> result.get()
9900
Помните, что обратный вызов может быть выполнен только после возврата всех заданий в заголовке. Каждый шаг в заголовке выполняется как задача, параллельно, возможно, на разных узлах. Затем применяется обратный вызов с возвращаемым значением каждой задачи в заголовке. Идентификатор задачи, возвращаемый командой chord()
, является идентификатором обратного вызова, поэтому вы можете дождаться его завершения и получить окончательное возвращаемое значение (но не забудьте never have a task wait for other tasks).
Обработка ошибок¶
Что же произойдет, если одна из задач вызовет исключение?
Результат обратного вызова аккорда переходит в состояние отказа, а ошибка устанавливается как исключение ChordError
:
>>> c = chord([add.s(4, 4), raising_task.s(), add.s(8, 8)])
>>> result = c()
>>> result.get()
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "*/celery/result.py", line 120, in get
interval=interval)
File "*/celery/backends/amqp.py", line 150, in wait_for
raise meta['result']
celery.exceptions.ChordError: Dependency 97de6f3f-ea67-4517-a21c-d867c61fcb47
raised ValueError('something something',)
Хотя отслеживание может отличаться в зависимости от используемого бэкенда результатов, вы можете видеть, что описание ошибки включает идентификатор задачи, которая потерпела неудачу, и строковое представление исходного исключения. Вы также можете найти исходный traceback в result.traceback
.
Обратите внимание, что остальные задачи все равно будут выполнены, поэтому третья задача (add.s(8, 8)
) все равно будет выполнена, даже если средняя задача не выполнилась. Также ChordError
показывает только ту задачу, которая не выполнилась первой (по времени): это не соблюдает порядок группы заголовков.
Чтобы выполнить действие при сбое аккорда, можно прикрепить к обратному вызову аккорда обратный вызов (errback):
@app.task
def on_chord_error(request, exc, traceback):
print('Task {0!r} raised error: {1!r}'.format(request.id, exc))
>>> c = (group(add.s(i, i) for i in range(10)) |
... xsum.s().on_error(on_chord_error.s())).delay()
Аккорды могут иметь связанные с ними сигнатуры обратных вызовов и возвратов, что решает некоторые проблемы, связанные с привязкой сигнатур к группам. В этом случае предоставленная подпись будет связана с телом аккорда, которое, как можно ожидать, будет изящно вызывать обратные вызовы только один раз при завершении тела, или обратные вызовы только один раз, если какая-либо задача в заголовке или теле аккорда не выполнится.
Важные замечания¶
Задачи, используемые внутри аккорда, не должны не игнорировать свои результаты. На практике это означает, что вы должны включить result_backend
, чтобы использовать аккорды. Кроме того, если в вашей конфигурации task_ignore_result
установлено значение True
, убедитесь, что отдельные задачи, используемые в аккорде, определены с помощью ignore_result=False
. Это относится как к подклассам Task, так и к декорированным задачам.
Пример подкласса задачи:
class MyTask(Task):
ignore_result = False
Пример оформленного задания:
@app.task(ignore_result=False)
def another_task(project):
do_something()
По умолчанию шаг синхронизации реализуется с помощью повторяющейся задачи, которая каждую секунду опрашивает завершение группы и вызывает сигнатуру, когда она готова.
Пример реализации:
from celery import maybe_signature
@app.task(bind=True)
def unlock_chord(self, group, callback, interval=1, max_retries=None):
if group.ready():
return maybe_signature(callback).delay(group.join())
raise self.retry(countdown=interval, max_retries=max_retries)
Это используется всеми бэкендами результатов, кроме Redis и Memcached: они увеличивают счетчик после каждой задачи в заголовке, а затем применяют обратный вызов, когда счетчик превышает количество задач в наборе.
Подход с использованием Redis и Memcached является гораздо лучшим решением, но его нелегко реализовать в других бэкендах (предложения приветствуются!).
Примечание
Аккорды не работают должным образом с Redis до версии 2.2; для их использования вам потребуется обновление, по крайней мере, до redis-server 2.2.
Примечание
Если вы используете аккорды с бэкендом результатов Redis, а также переопределяете метод Task.after_return()
, вам нужно убедиться, что вы вызываете метод super, иначе обратный вызов аккорда не будет применен.
def after_return(self, *args, **kwargs):
do_something()
super().after_return(*args, **kwargs)
Карта и Стармап¶
map
и starmap
- это встроенные задачи, которые вызывают предусмотренную вызывающую задачу для каждого элемента в последовательности.
Они отличаются от group
тем, что:
отправляется только одно сообщение о задаче.
операция является последовательной.
Например, используя map
:
>>> from proj.tasks import add
>>> ~xsum.map([range(10), range(100)])
[45, 4950]
это то же самое, что и выполнение задачи:
@app.task
def temp():
return [xsum(range(10)), xsum(range(100))]
и используя starmap
:
>>> ~add.starmap(zip(range(10), range(10)))
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
это то же самое, что и выполнение задачи:
@app.task
def temp():
return [add(i, i) for i in range(10)]
И map
, и starmap
являются сигнатурными объектами, поэтому их можно использовать как другие сигнатуры и объединять в группы и т.д., например, для вызова starmap через 10 секунд:
>>> add.starmap(zip(range(10), range(10))).apply_async(countdown=10)
Куски¶
Chunking позволяет разделить итерабельную работу на части, так что если у вас есть один миллион объектов, вы можете создать 10 задач со ста тысячами объектов каждая.
Некоторые могут беспокоиться, что разбиение задач на части приводит к снижению параллелизма, но это редко бывает верно для загруженного кластера, а на практике, поскольку вы избегаете накладных расходов на обмен сообщениями, это может значительно повысить производительность.
Для создания сигнатуры чанков можно использовать app.Task.chunks()
:
>>> add.chunks(zip(range(100), range(100)), 10)
Как и в случае с group
, акт отправки сообщений для чанков будет происходить в текущем процессе при вызове:
>>> from proj.tasks import add
>>> res = add.chunks(zip(range(100), range(100)), 10)()
>>> res.get()
[[0, 2, 4, 6, 8, 10, 12, 14, 16, 18],
[20, 22, 24, 26, 28, 30, 32, 34, 36, 38],
[40, 42, 44, 46, 48, 50, 52, 54, 56, 58],
[60, 62, 64, 66, 68, 70, 72, 74, 76, 78],
[80, 82, 84, 86, 88, 90, 92, 94, 96, 98],
[100, 102, 104, 106, 108, 110, 112, 114, 116, 118],
[120, 122, 124, 126, 128, 130, 132, 134, 136, 138],
[140, 142, 144, 146, 148, 150, 152, 154, 156, 158],
[160, 162, 164, 166, 168, 170, 172, 174, 176, 178],
[180, 182, 184, 186, 188, 190, 192, 194, 196, 198]]
в то время как вызов .apply_async
создаст выделенную задачу, так что отдельные задачи будут применяться в рабочем вместо этого:
>>> add.chunks(zip(range(100), range(100)), 10).apply_async()
Вы также можете преобразовать фрагменты в группу:
>>> group = add.chunks(zip(range(100), range(100)), 10).group()
и вместе с группой изменить отсчет времени выполнения каждого задания на единицу:
>>> group.skew(start=1, stop=10)()
Это означает, что первая задача будет иметь обратный отсчет одной секунды, вторая задача - двух секунд и так далее.