map() против submit() с помощью ProcessPoolExecutor в Python

Используйте map() при преобразовании цикла for для использования процессов и используйте submit(), когда вам нужно больше контролировать асинхронность задачи при использовании ProcessPoolExecutor в Python.

В этом руководстве вы узнаете о разнице между map() и submit() при выполнении задач с помощью ProcessPoolExecutor в Python.

Давайте начнем.

Оглавление

Используйте map() для выполнения задач с помощью ProcessPoolExecutor

Используйте map(), чтобы преобразовать цикл for в use processes.

Возможно, наиболее распространенным шаблоном при использовании ProcessPoolExecutor является преобразование цикла for, который выполняет функцию для каждого элемента в коллекции, для использования процессов.

Предполагается, что функция не имеет побочных эффектов, то есть она не обращается к каким-либо данным за пределами функции и не изменяет предоставленные ей данные. Она принимает данные и выдает результат.

Эти типы циклов for могут быть явно записаны в Python, например:

...
# apply a function to each element in a collection
for item in mylist:
    result = task(item)

Лучше всего использовать встроенную функцию map(), которая применяет эту функцию к каждому элементу в iterable для вас.

...
# apply the function to each element in the collection
results = map(task, mylist)

Встроенная функция map() не выполняет функцию task() для каждого элемента, пока мы не повторим результаты, так называемая ленивая оценка:

...
# iterate the results from map
for result in results:
    print(result)

Таким образом, обычно эту операцию в идиоме for-loop можно увидеть следующим образом:

...
# iterate the results from map
for result in map(task, mylist):
    print(result)

Мы можем выполнить ту же операцию, используя пул процессов, за исключением того, что каждый вызов функции с элементом в iterable является задачей, которая выполняется асинхронно с использованием процессов.

Например:

...
# iterate the results from map
for result in executor.map(task, mylist):
    print(result)

Подобно встроенной функции map(), ProcessPoolExecutor map() функция возвращает итерацию по результатам, возвращенным целевой функцией, примененной к предоставленному итерационному набору элементов.

Хотя задачи выполняются асинхронно, результаты повторяются в порядке повторяемости, предоставляемой функции map() , аналогично встроенной функции функция map().

Таким образом, мы можем рассматривать ProcessPoolExecutor версию функции map() как асинхронную версию встроенногов функции map() и идеально подходит, если вы хотите обновить свой цикл for для использования процессов.

В приведенном ниже примере показано использование функции map() с задачей, которая будет находиться в режиме ожидания случайный промежуток времени менее одной секунды и вернет указанное значение.

# SuperFastPython.com
# example of the map and wait pattern for the ProcessPoolExecutor
from time import sleep
from random import random
from concurrent.futures import ProcessPoolExecutor
 
# custom task that will sleep for a variable amount of time
def task(name):
    # sleep for less than a second
    sleep(random())
    return name
 
# entry point
if __name__ == '__main__':
    # start the process pool
    with ProcessPoolExecutor(10) as executor:
        # execute tasks concurrently and process results in order
        for result in executor.map(task, range(10)):
            # retrieve the result
            print(result)

Запустив пример, мы видим, что результаты отображаются в том порядке, в котором задачи были созданы и отправлены в пул процессов.

0
1
2
3
4
5
6
7
8
9

Подобно встроенной функции map(), ProcessPoolExecutor map() функция может выполнять более одной итерации. Это означает, что ваша функция может принимать более одного аргумента.

...
# example of calling map with more than one iterable
for result in executor.map(task, mylist1, mylist2):
    print(result)

В отличие от встроенной функции map(), задачи отправляются в пул процессов сразу после вызова map() вместо этого быть выполненным в ленивой манере по мере запроса результатов.

Другими словами, задачи будут выполняться и завершаться в свое время независимо от того, выполняем ли мы итерацию результатов, возвращаемых вызовом map().

...
# example of calling map and not iterating the results
_ = executor.map(task, mylist)

Теперь, когда мы познакомились с функцией map() , давайте взглянем на функцию submit() .

Запускайте циклы, используя все процессоры, чтобы узнать, как это делается.

Используйте submit() для выполнения задач с помощью ProcessPoolExecutor

Используйте submit(), если вам нужен больший контроль над асинхронными задачами.

Функция submit() примет имя целевой целевой функции, которую вы хотите выполнить асинхронно, а также любые аргументы функции. Затем он вернет Будущий объект.

...
# submit a task to the process pool and get a future object
future = executor.submit(task, arg1, arg2)

Объект Future можно сохранить и использовать для запроса статуса асинхронной задачи, например, выполняется ли она (), сделано(), или было отменено().

...
# check if a task is running
if future.running():
    # do something...

Его также можно использовать для получения результата() из задачи после ее завершения или исключения() , если таковое имеется был поднят во время выполнения задания.

...
# get the result from a task via it's future object
result = future.result()

Объект Будущее также можно использовать для отмены() задачи до ее запуска и добавления функции обратного вызова с помощью функция add_done_callback(), которая будет выполнена после завершения задачи.

...
# cancel the task if has not yet started running
if future.cancel():
    print('Task was cancelled')

Обычной практикой является передача множества задач в пул процессов и сохранение Будущих объектов в коллекции.

Например, обычно используется понимание списка.

...
# create many tasks and store the future objects in a list
futures = [executor.submit(work) for _ in range(100)]

Мы можем перебирать список Будущих объектов, чтобы получить результаты в том порядке, в котором были отправлены задачи, например:

...
# get results from tasks in the order they were submitted
for future in futures:
    # get the result
    result = future.result()

Напомним, что вызов функции result() в Future не будет возвращен до тех пор, пока задача не будет выполнена.

Коллекция объектов Future затем может быть передана служебным функциям, предоставляемым модулем concurrent.futures, таким как ожидание() и как_ завершенные().

Функция модуля wait() принимает коллекцию Будущих объектов и по умолчанию возвращает все выполненные задачи, хотя может должен быть настроен на возврат, когда какая-либо задача вызывает исключение или завершена.

...
# wait for all tasks to be done
wait(futures)

Модульная функция as_completed() принимает коллекцию Будущих объектов и возвращает Будущих объекты в том порядке, в котором задачи выполняются по мере их выполнения. Это вместо того порядка, в котором они были отправлены в пул процессов, что позволяет вашей программе быть более отзывчивой.

...
# respond to tasks as they are completed
for future in as_completed(futures):
    # get the result
    result = future.result()

Обработка Будущих объектов в порядке их завершения может быть наиболее распространенным способом использования функции submit() с ProcessPoolExecutor - исполнитель.

Приведенный ниже пример демонстрирует эту схему, представляя задания в порядке от 0 до 9 и показывая результаты в том порядке, в котором они были выполнены.

# SuperFastPython.com
# example of the submit and use as completed pattern for the ProcessPoolExecutor
from time import sleep
from random import random
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import as_completed
 
# custom task that will sleep for a variable amount of time
def task(name):
    # sleep for less than a second
    sleep(random())
    return name
 
# entry point
if __name__ == '__main__':
    # start the process pool
    with ProcessPoolExecutor(10) as executor:
        # submit tasks and collect futures
        futures = [executor.submit(task, i) for i in range(10)]
        # process task results as they are available
        for future in as_completed(futures):
            # retrieve the result
            print(future.result())

Запустив пример, мы видим, что результаты извлекаются и печатаются в порядке выполнения задач, а не в порядке отправки задач в пул процессов.

5
9
6
1
0
7
3
8
4
2

Теперь, когда мы знакомы с использованием submit() для выполнения задач в ProcessPoolExecutor, давайте рассмотрим сравнение между map() и отправить().

map() и submit() с помощью ProcessPoolExecutor

Давайте сравним функции map() и submit() для ProcessPoolExecutor.

Обе функции map() и submit() схожи в том, что они позволяют выполнять задачи асинхронно с использованием процессов.

Функция map() проще:

  • Это многопроцессорная версия встроенной функции map().
  • Предполагается, что вы хотите вызывать одну и ту же функцию много раз с разными значениями.
  • Он принимает только повторяющиеся значения в качестве аргументов целевой функции.
  • Это позволяет вам только повторять результаты из целевой функции.

Чтобы ваш код был проще и понятнее для чтения, вам следует сначала попробовать использовать map(), прежде чем пытаться использовать submit() функция.

Простота функции map() означает, что она также ограничена:

  • Это не позволяет контролировать порядок использования результатов задач.
  • Это не позволяет проверять статус задач.
  • Это не позволяет вам отменять задачи до того, как они начнут выполняться.
  • Это не позволяет вам контролировать, как обрабатывать исключение, вызванное функцией задачи.

Если функция map() является слишком строгой, вы можете рассмотреть возможность использования функции submit().

Функция submit() предоставляет больше возможностей для контроля:

  • Предполагается, что вы хотите отправлять по одной задаче за раз.
  • Это позволяет использовать разные целевые функции с переменным числом аргументов для каждой задачи.
  • Это позволяет вам проверять статус каждой задачи.
  • Это позволяет отменить задачу до того, как она начнет выполняться.
  • Это позволяет автоматически вызывать функции обратного вызова по завершении задач.
  • Это позволяет вам обрабатывать исключение, вызванное целевой функцией задачи.
  • Это позволяет вам контролировать, когда вы хотели бы получить результат выполнения задачи, если вообще хотите.
  • Его можно использовать с функциями модуля, такими как wait() и as_completed() для работы с задачами в группах.

Дополнительный элемент управления, предоставляемый при использовании функции submit(), сопряжен с дополнительной сложностью:

  • Для этого требуется, чтобы вы управляли Будущим объектом для каждой задачи.
  • Для этого требуется, чтобы вы явно извлекали результат для каждой задачи.
  • Требуется дополнительный код, если вам нужно применить одну и ту же функцию с разными аргументами.

Теперь, когда мы сравнили и противопоставили функции map() и submit() на ProcessPoolExecutor, какой из них вы должны использовать?

Используйте map(), если:

  • Вы уже используете встроенную функцию map().
  • Вы вызываете (почти) чистую функцию в цикле for для каждого элемента в iterable.

Используйте функцию submit(), если:

  • Вам необходимо проверять статус задач во время их выполнения.
  • Вам нужен контроль над порядком обработки результатов выполнения задач.
  • Вам необходимо условно отменить выполнение заданий.
  • Вы можете упростить свой код, используя функции обратного вызова, вызываемые по завершении задач.

Читать далее

В этом разделе представлены дополнительные ресурсы, которые могут оказаться вам полезными.

Книги

Я также рекомендую отдельные главы из следующих книг:

  • Эффективный Python, Бретт Слаткин, 2019.
    • Смотрите Главу 7: Параллелизм и распараллеливаемость
  • Python в двух словах, Алекс Мартелли и др., 2017.
    • Смотрите: Глава 14: Потоки и процессы

Направляющие

API-интерфейсы

Ссылки

Выводы

Теперь вы знаете, когда следует использовать map() и submit() для выполнения задач с помощью ProcessPoolExecutor - исполнитель.

Back to Top