Ускорение Python с помощью параллелизма, многопоточности и asyncio
Что такое параллелизм и распараллеливаемость и как они применимы к Python?
Существует множество причин, по которым ваши приложения могут работать медленно. Иногда это связано с плохой разработкой алгоритмов или неправильным выбором структуры данных. Однако иногда это происходит из-за факторов, не зависящих от нас, таких как аппаратные ограничения или особенности работы сетей. Вот где уместны параллелизм и распараллеливаемость. Они позволяют вашим программам выполнять несколько задач одновременно или тратить как можно меньше времени на ожидание загруженных задач.
Независимо от того, работаете ли вы с внешними веб-ресурсами, выполняете чтение из нескольких файлов и запись в них или вам необходимо многократно использовать функцию, требующую больших вычислений, с разными параметрами, эта статья поможет вам максимально повысить эффективность и скорость вашего кода.
Сначала мы рассмотрим, что такое параллелизм и распараллеливаемость и как они вписываются в область Python, используя стандартные библиотеки, такие как threading, multiprocessing и asyncio. В последней части этой статьи мы сравним реализацию async/await в Python с тем, как они реализованы в других языках.
Вы можете найти все примеры кода из этой статьи в репозитории concurrency-parallelism-and-asyncio на GitHub.
Чтобы разобраться с примерами в этой статье, вы уже должны знать, как работать с HTTP-запросами.
Содержимое
- Цели
- Параллелизм
- Потоки
- Асинхронный
- Параллелизм
- Сочетание Asyncio с многопроцессорной обработкой
- Краткое описание: Когда следует использовать многопроцессорную обработку против асинхронной или потоковой
- Асинхронность/ожидание на других языках
- Краткое описание
Цели
К концу этой статьи вы должны быть в состоянии ответить на следующие вопросы:
- Что такое параллелизм?
- Что такое поток?
- Что это значит, когда что-то не блокируется?
- Что такое цикл обработки событий?
- Что такое обратный вызов?
- Почему метод asyncio всегда немного быстрее, чем метод threading?
- Когда следует использовать многопоточность, а когда - асинхронность?
- Что такое параллелизм?
- В чем разница между параллелизмом и распараллеливаемостью?
- Возможно ли совместить асинхронность с многопроцессорностью?
- Когда следует использовать многопроцессорную обработку по сравнению с асинхронной или потоковой обработкой?
- В чем разница между многопроцессорной обработкой, asyncio и concurrency.futures?
- Как я могу протестировать asyncio с помощью pytest?
Параллелизм
Что такое параллелизм?
Эффективным определением параллелизма является "способность выполнять несколько задач одновременно". Однако это немного вводит в заблуждение, поскольку задачи могут выполняться, а могут и не выполняться в одно и то же время. Вместо этого процесс может запуститься, а затем, дождавшись выполнения определенной инструкции, переключиться на новую задачу и вернуться к ней, как только ожидание закончится. Как только одна задача завершена, она снова переключается на незавершенную задачу, пока все они не будут выполнены. Задачи начинаются асинхронно, выполняются асинхронно и затем завершаются асинхронно.

Если это вас смутило, давайте вместо этого рассмотрим аналогию: допустим, вы хотите создать BLT. Сначала обжарьте бекон на сковороде на средне-слабом огне. Пока бекон готовится, вы можете достать помидоры и листья салата и приступить к их подготовке (помыть и нарезать). Все это время вы продолжаете следить за своим беконом и время от времени переворачивать его.
На этом этапе вы приступили к выполнению задачи, а затем приступили и завершили еще две, и все это в то время, когда вы все еще ожидаете выполнения первой.
В конце концов, вы кладете хлеб в тостер. Пока он поджаривается, вы продолжаете проверять, готов ли бекон. Когда кусочки будут готовы, вы вынимаете их и выкладываете на тарелку. Как только хлеб подрумянится, намажьте его на свой вкус для бутербродов, а затем начинайте выкладывать слоями помидоры, листья салата, а затем, когда он будет готов, бекон. Только после того, как все будет приготовлено и выложено слоями, вы можете положить последний кусочек тоста на свой сэндвич, нарезать его ломтиками (по желанию) и съесть.
Поскольку это требует от вас одновременного выполнения нескольких задач, создание BLT по своей сути является параллельным процессом, даже если вы не уделяете все свое внимание каждой из этих задач одновременно. По сути, в следующем разделе мы будем называть эту форму параллелизма просто "параллелизмом". Далее в этой статье мы рассмотрим ее различия.
По этой причине параллелизм отлично подходит для процессов с интенсивным вводом-выводом - задач, которые включают ожидание веб-запросов или операций чтения/записи файлов.
В Python существует несколько различных способов достижения параллелизма. Первое, что мы рассмотрим, - это библиотека потоковой обработки.
Для наших примеров в этом разделе мы собираемся создать небольшую программу на Python, которая пять раз выбирает случайный музыкальный жанр из API Genrenator для Binary Jazz, выводит жанр на экран и помещает каждый из них в свой собственный файл.
Для работы с потоковой передачей в Python вам понадобится только threading, но для этого примера я также импортировал urllib для работы с HTTP-запросами, time чтобы определить сколько времени занимает выполнение функций, и json как легко преобразовать данные json, возвращаемые из API Genrenator.
Вы можете найти код для этого примера здесь.
Давайте начнем с простой функции:
def write_genre(file_name):
"""
Uses genrenator from binaryjazz.us to write a random genre to the
name of the given file
"""
req = Request("https://binaryjazz.us/wp-json/genrenator/v1/genre/", headers={"User-Agent": "Mozilla/5.0"})
genre = json.load(urlopen(req))
with open(file_name, "w") as new_file:
print(f"Writing '{genre}' to '{file_name}'...")
new_file.write(genre)
Изучая приведенный выше код, мы отправляем запрос к API Genrenator, загружаем его ответ в формате JSON (случайный музыкальный жанр), печатаем его, а затем записываем в файл.
Без заголовка "User-Agent" вы получите значение 304.
Что нас действительно интересует, так это следующий раздел, где собственно и происходит потоковая обработка:
threads = []
for i in range(5):
thread = threading.Thread(
target=write_genre,
args=[f"./threading/new_file{i}.txt"]
)
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
Сначала мы начинаем со списка. Затем мы выполняем итерацию пять раз, каждый раз создавая новый поток. Затем мы запускаем каждый поток, добавляем его в наш список "потоки", а затем в последний раз перебираем наш список, чтобы присоединиться к каждому потоку.
Пояснение: Создавать потоки в Python очень просто.
Чтобы создать новый поток, используйте threading.Thread(). Вы можете передать в него kwarg (аргумент ключевого слова) target со значением любой функции, которую вы хотели бы запустить в этом потоке. Но передайте только имя функции, а не ее значение (для наших целей это означает write_genre, а не write_genre()). Чтобы передать аргументы, введите "kwargs" (который принимает значение ваших kwargs) или "args" (который принимает итерацию, содержащую ваши аргументы - в данном случае список).
Однако
Создание потока - это не то же самое, что запуск потока. Чтобы запустить поток, используйте {the name of your thread}.start(). Запуск потока означает "начало его выполнения".
Наконец, когда мы объединяем потоки с помощью thread.join(), все, что мы делаем, - это проверяем, что поток завершен, прежде чем продолжить работу с нашим кодом.
Потоки
Но что именно представляет собой нить?
Поток - это способ, позволяющий вашему компьютеру разбить единый процесс/программу на множество простых частей, которые выполняются параллельно. Несколько сбивает с толку то, что стандартная реализация потоковой передачи в Python ограничивает возможность выполнения потоков только по одному за раз из-за чего-то, называемого Глобальной блокировкой интерпретатора (GIL). GIL необходим, потому что управление памятью в CPython (реализация Python по умолчанию) не является потокобезопасным. Из-за этого ограничения потоковая обработка в Python является параллельной, но не параллельной. Чтобы обойти это, в Python есть отдельный модуль multiprocessing, не ограниченный GIL, который запускает отдельные процессы, обеспечивая параллельное выполнение вашего кода. Использование модуля multiprocessing практически идентично использованию модуля threading.
Более подробную информацию о GIL и потокобезопасности Python можно найти в Официальных документах Real Python и Python .
Вскоре мы более подробно рассмотрим многопроцессорную обработку в Python.
Прежде чем мы продемонстрируем потенциальное повышение скорости по сравнению с непоточным кодом, я взял на себя смелость также создать непоточную версию той же программы (опять же, доступную на GitHub). Вместо того чтобы создавать новый поток и присоединяться к каждому из них, он вызывает write_genre в цикле for, который повторяется пять раз.
Чтобы сравнить показатели скорости, я также импортировал библиотеку time для определения времени выполнения наших скриптов:
Starting...
Writing "binary indoremix" to "./sync/new_file0.txt"...
Writing "slavic aggro polka fusion" to "./sync/new_file1.txt"...
Writing "israeli new wave" to "./sync/new_file2.txt"...
Writing "byzantine motown" to "./sync/new_file3.txt"...
Writing "dutch hate industrialtune" to "./sync/new_file4.txt"...
Time to complete synchronous read/writes: 1.42 seconds
После запуска скрипта мы видим, что это занимает у моего компьютера около 1,49 секунды (наряду с классическими музыкальными жанрами, такими как "dutch hate industrialtune"). Не так уж плохо.
Теперь давайте запустим версию, использующую многопоточность:
Starting...
Writing "college k-dubstep" to "./threading/new_file2.txt"...
Writing "swiss dirt" to "./threading/new_file0.txt"...
Writing "bop idol alternative" to "./threading/new_file4.txt"...
Writing "ethertrio" to "./threading/new_file1.txt"...
Writing "beach aust shanty français" to "./threading/new_file3.txt"...
Time to complete threading read/writes: 0.77 seconds
Первое, что может броситься вам в глаза, - это то, что функции выполняются не по порядку: 2 - 0 - 4 - 1 - 3
Это происходит из-за асинхронной природы потоковой передачи: пока одна функция ожидает, запускается другая и так далее. Поскольку мы можем продолжать выполнять задачи, ожидая завершения других (либо из-за работы в сети, либо из-за операций ввода-вывода файлов), вы, возможно, также заметили, что мы сокращаем наше время примерно вдвое: на 0,77 секунды. Хотя сейчас это может показаться незначительным, легко представить себе вполне реальный случай создания веб-приложения, которому требуется записывать гораздо больше данных в файл или взаимодействовать с гораздо более сложными веб-сервисами.
Итак, если многопоточность - это так здорово, почему бы нам не закончить статью на этом?
Потому что есть еще лучшие способы одновременного выполнения задач.
Асинхронный режим
Давайте рассмотрим пример с использованием asyncio. Для этого метода мы собираемся установить aiohttp с помощью pip. Это позволит нам отправлять неблокирующие запросы и получать ответы, используя синтаксис async/await, который будет представлен в ближайшее время. Это также имеет дополнительное преимущество в виде функции, которая преобразует ответ в формате JSON без необходимости импорта библиотеки json. Мы также установим и импортируем aiofiles, который позволяет выполнять неблокирующие операции с файлами. Кроме aiohttp и aiofiles, импортируйте asyncio, который поставляется со стандартной библиотекой Python.
" "Неблокирующий" означает, что программа позволяет другим потокам продолжать выполнение во время ожидания. Это противоположно "блокирующему" коду, который полностью останавливает выполнение вашей программы. Обычные синхронные операции ввода-вывода страдают от этого ограничения.
Вы можете найти код для этого примера здесь.
Как только мы настроим импорт, давайте взглянем на асинхронную версию функции write_genre из нашего примера asyncio:
async def write_genre(file_name):
"""
Uses genrenator from binaryjazz.us to write a random genre to the
name of the given file
"""
async with aiohttp.ClientSession() as session:
async with session.get("https://binaryjazz.us/wp-json/genrenator/v1/genre/") as response:
genre = await response.json()
async with aiofiles.open(file_name, "w") as new_file:
print(f'Writing "{genre}" to "{file_name}"...')
await new_file.write(genre)
Для тех, кто не знаком с синтаксисом async/await, который можно найти во многих других современных языках, async объявляет, что функция, for цикл, или with оператор должен использоваться асинхронно. Чтобы вызвать асинхронную функцию, вы должны либо использовать ключевое слово await из другой асинхронной функции, либо вызвать create_task() непосредственно из цикла обработки событий, который можно получить из asyncio.get_event_loop(), т.е. loop = asyncio.get_event_loop().
Дополнительно:
async withпозволяет ожидать асинхронных ответов и файловых операций.async for( здесь не используется) выполняется итерация по асинхронному потоку .
Цикл обработки событий
Циклы обработки событий - это конструкции, присущие асинхронному программированию, которые позволяют выполнять задачи асинхронно. Поскольку вы читаете эту статью, я могу с уверенностью предположить, что вы, вероятно, не слишком хорошо знакомы с этой концепцией. Однако, даже если вы никогда не писали асинхронное приложение, у вас есть опыт работы с циклами обработки событий каждый раз, когда вы пользуетесь компьютером. Независимо от того, прослушивает ли ваш компьютер ввод с клавиатуры, играете ли вы в многопользовательские онлайн-игры или просматриваете Reddit во время фонового копирования файлов, цикл обработки событий является движущей силой, обеспечивающей бесперебойную и эффективную работу. По своей сути, цикл обработки событий - это процесс, который ожидает появления триггеров, а затем выполняет определенные (запрограммированные) действия, как только эти триггеры выполняются. Они часто возвращают какое-либо "обещание" (синтаксис JavaScript) или "будущее" (синтаксис Python), чтобы указать, что задача была добавлена. Как только задача завершена, promise или future возвращает значение, переданное обратно из вызванной функции (при условии, что функция действительно возвращает значение).
Идея выполнения функции в ответ на другую функцию называется "обратным вызовом".
Что касается другого подхода к обратным вызовам и событиям, вот отличный ответ на Stack Overflow.
Вот пошаговое руководство по нашей функции:
Мы используем async with для асинхронного открытия нашей клиентской сессии. Класс aiohttp.ClientSession() позволяет нам отправлять HTTP-запросы и оставаться подключенными к источнику, не блокируя выполнение нашего кода. Затем мы отправляем асинхронный запрос к API Genrenator и ожидаем ответа в формате JSON (случайный музыкальный жанр). В следующей строке мы снова используем async with с библиотекой aiofiles, чтобы асинхронно открыть новый файл для записи нашего нового жанра. Мы печатаем жанр, затем записываем его в файл.
В отличие от обычных скриптов на Python, программирование с помощью asyncio в значительной степени предусматривает использование какой-либо "основной" функции.
*Если только вы не используете устаревший синтаксис "yield" с декоратором @asyncio.coroutine, который будет удален в Python 3.10.
Это связано с тем, что вам нужно использовать ключевое слово "async", чтобы использовать синтаксис "await", а синтаксис "await" - это единственный способ реального запуска других асинхронных функций.
Вот наша основная функция:
async def main():
tasks = []
for i in range(5):
tasks.append(write_genre(f"./async/new_file{i}.txt"))
await asyncio.gather(*tasks)
Как вы можете видеть, мы объявили это с помощью "async". Затем мы создаем пустой список под названием "задачи" для размещения наших асинхронных задач (вызовы Genrenator и наш файловый ввод-вывод). Мы добавляем наши задачи в наш список, но они еще не фактически выполнены. Вызовы на самом деле не выполняются, пока мы не запланируем их с помощью await asyncio.gather(*tasks). Это запускает все задачи из нашего списка и ожидает их завершения, прежде чем продолжить работу с остальной частью нашей программы. Наконец, мы используем asyncio.run(main()) для запуска нашей "основной" функции. Функция .run() является отправной точкой для нашей программы, и она должна обычно вызываться только один раз для каждого процесса.
Для тех, кто не знаком, команда
*перед задачами называется "распаковка аргументов". Как и кажется, она распаковывает наш список в последовательность аргументов для нашей функции. В данном случае наша функция равнаasyncio.gather().
И это все, что нам нужно сделать. Теперь запускаем нашу программу (исходный код которой включает в себя те же функции синхронизации, что и в примерах synchronous и threading)...
Writing "albuquerque fiddlehaus" to "./async/new_file1.txt"...
Writing "euroreggaebop" to "./async/new_file2.txt"...
Writing "shoedisco" to "./async/new_file0.txt"...
Writing "russiagaze" to "./async/new_file4.txt"...
Writing "alternative xylophone" to "./async/new_file3.txt"...
Time to complete asyncio read/writes: 0.71 seconds
...мы видим, что это еще быстрее. И, в целом, метод asyncio всегда будет немного быстрее, чем метод threading. Это происходит потому, что, когда мы используем синтаксис "await", мы, по сути, говорим нашей программе "подождите, я сейчас вернусь", но наша программа отслеживает, сколько времени нам требуется, чтобы завершить то, что мы делаем. Как только мы закончим, наша программа узнает об этом и возобновит работу, как только сможет. Многопоточность в Python допускает асинхронность, но наша программа теоретически может пропускать различные потоки, которые, возможно, еще не готовы, теряя время, если есть потоки, готовые к продолжению работы.
Итак, когда я должен использовать многопоточность, а когда я должен использовать asyncio?
Когда вы пишете новый код, используйте asyncio. Если вам нужно взаимодействовать со старыми библиотеками или с теми, которые не поддерживают asyncio, возможно, вам лучше использовать многопоточность.
Тестирование asyncio с помощью pytest
Оказывается, тестировать асинхронные функции с помощью pytest так же просто, как и синхронные. Просто установите пакет pytest-asyncio с помощью pip, пометьте свои тесты ключевым словом async и примените декоратор, который даст pytest знать, что это асинхронный: @pytest.mark.asyncio. Давайте рассмотрим пример.
Сначала давайте запишем произвольную асинхронную функцию в файл с именем hello_asyncio.py:
import asyncio
async def say_hello(name: str):
""" Sleeps for two seconds, then prints 'Hello, {{ name }}!' """
try:
if type(name) != str:
raise TypeError("'name' must be a string")
if name == "":
raise ValueError("'name' cannot be empty")
except (TypeError, ValueError):
raise
print("Sleeping...")
await asyncio.sleep(2)
print(f"Hello, {name}!")
Функция принимает единственный строковый аргумент: name. Убедившись, что name является строкой длиной больше единицы, наша функция асинхронно переходит в режим ожидания на две секунды, а затем выводит "Hello, {name}!" на консоль.
Разница между
asyncio.sleep()иtime.sleep()заключается в том, чтоasyncio.sleep()не является блокирующим.
Теперь давайте протестируем это с помощью pytest. В том же каталоге, что и hello_asyncio.py, создайте файл с именем test_hello_asyncio.py, затем откройте его в вашем любимом текстовом редакторе.
Давайте начнем с нашего импорта:
import pytest # Note: pytest-asyncio does not require a separate import
from hello_asyncio import say_hello
Затем мы создадим тест с соответствующими входными данными:
@pytest.mark.parametrize("name", [
"Robert Paulson",
"Seven of Nine",
"x Æ a-12"
])
@pytest.mark.asyncio
async def test_say_hello(name):
await say_hello(name)
На что следует обратить внимание:
- Декоратор
@pytest.mark.asyncioпозволяет pytest работать асинхронно - В нашем тесте используется синтаксис
async - Мы
awaitнастраиваем нашу асинхронную функцию так, как если бы мы запускали ее вне теста
Теперь давайте запустим наш тест с подробным вариантом -v:
pytest -v
...
collected 3 items
test_hello_asyncio.py::test_say_hello[Robert Paulson] PASSED [ 33%]
test_hello_asyncio.py::test_say_hello[Seven of Nine] PASSED [ 66%]
test_hello_asyncio.py::test_say_hello[x \xc6 a-12] PASSED [100%]
Выглядит неплохо. Далее мы напишем пару тестов с ошибочными входными данными. Вернемся к test_hello_asyncio.py , давайте создадим класс с именем TestSayHelloThrowsExceptions:
class TestSayHelloThrowsExceptions:
@pytest.mark.parametrize("name", [
"",
])
@pytest.mark.asyncio
async def test_say_hello_value_error(self, name):
with pytest.raises(ValueError):
await say_hello(name)
@pytest.mark.parametrize("name", [
19,
{"name", "Diane"},
[]
])
@pytest.mark.asyncio
async def test_say_hello_type_error(self, name):
with pytest.raises(TypeError):
await say_hello(name)
Опять же, мы оформляем наши тесты с помощью @pytest.mark.asyncio, помечаем наши тесты синтаксисом async, затем вызываем нашу функцию с помощью await.
Запустите тесты еще раз:
pytest -v
...
collected 7 items
test_hello_asyncio.py::test_say_hello[Robert Paulson] PASSED [ 14%]
test_hello_asyncio.py::test_say_hello[Seven of Nine] PASSED [ 28%]
test_hello_asyncio.py::test_say_hello[x \xc6 a-12] PASSED [ 42%]
test_hello_asyncio.py::TestSayHelloThrowsExceptions::test_say_hello_value_error[] PASSED [ 57%]
test_hello_asyncio.py::TestSayHelloThrowsExceptions::test_say_hello_type_error[19] PASSED [ 71%]
test_hello_asyncio.py::TestSayHelloThrowsExceptions::test_say_hello_type_error[name1] PASSED [ 85%]
test_hello_asyncio.py::TestSayHelloThrowsExceptions::test_say_hello_type_error[name2] PASSED [100%]
Без pytest-asyncio
В качестве альтернативы pytest-asyncio вы можете создать приспособление pytest, которое создает цикл обработки событий asyncio:
import asyncio
import pytest
from hello_asyncio import say_hello
@pytest.fixture
def event_loop():
loop = asyncio.get_event_loop()
yield loop
Затем, вместо использования синтаксиса async/await, вы создаете свои тесты так, как создавали бы обычные синхронные тесты:
@pytest.mark.parametrize("name", [
"Robert Paulson",
"Seven of Nine",
"x Æ a-12"
])
def test_say_hello(event_loop, name):
event_loop.run_until_complete(say_hello(name))
class TestSayHelloThrowsExceptions:
@pytest.mark.parametrize("name", [
"",
])
def test_say_hello_value_error(self, event_loop, name):
with pytest.raises(ValueError):
event_loop.run_until_complete(say_hello(name))
@pytest.mark.parametrize("name", [
19,
{"name", "Diane"},
[]
])
def test_say_hello_type_error(self, event_loop, name):
with pytest.raises(TypeError):
event_loop.run_until_complete(say_hello(name))
Если вам интересно, вот более подробное руководство по асинхронному тестированию.
Читать далее
Если вы хотите узнать больше о том, чем отличается реализация потоковой передачи в Python от asyncio, вот отличная статья от Medium.
Для получения еще более подробных примеров и объяснений работы с потоками в Python, вот видео Кори Шейфера, в котором более подробно рассказывается об использовании библиотеки concurrent.futures.
И, наконец, для более глубокого ознакомления с самим asyncio, вот статья из Real Python, полностью посвященная ему.
Бонус: Еще одна библиотека, которая может вас заинтересовать, называется Unsync, особенно если вы хотите легко преобразовать ваш текущий синхронный код в асинхронный код. Чтобы использовать его, вы устанавливаете библиотеку с помощью pip, импортируете ее с помощью from unsync import unsync, затем добавляете к любой текущей синхронной функции @unsync, чтобы сделать ее асинхронной. Чтобы дождаться его и получить возвращаемое значение (что вы можете сделать где угодно - это не обязательно должно быть в асинхронной/ несинхронной функции), просто вызовите .result() после вызова функции.
Параллелизм
Что такое параллелизм?
Параллелизм очень тесно связан с параллелизмом. Фактически, параллелизм - это разновидность параллелизма: в то время как параллельный процесс выполняет несколько задач одновременно, независимо от того, отвлекается на них все внимание или нет, параллельный процесс физически выполняет несколько задач одновременно. Хорошим примером может служить одновременное вождение автомобиля, прослушивание музыки и употребление в пищу батончика, который мы приготовили в предыдущем разделе.

Поскольку они не требуют больших усилий, вы можете выполнять их все сразу, не откладывая в долгий ящик и не отвлекаясь ни на что другое.
Теперь давайте посмотрим, как реализовать это в Python. Мы могли бы использовать библиотеку multiprocessing, но вместо этого давайте воспользуемся библиотекой concurrent.futures - это избавляет от необходимости управлять количеством процессов вручную. Поскольку основное преимущество многопроцессорной обработки заключается в том, что вы выполняете несколько задач с высокой нагрузкой на процессор, мы собираемся вычислить квадраты от 1 миллиона (1000000) до 1 миллиона и 16 (1000016).
Вы можете найти код для этого примера здесь.
Единственный импорт, который нам понадобится, это concurrent.futures:
import concurrent.futures
import time
if __name__ == "__main__":
pow_list = [i for i in range(1000000, 1000016)]
print("Starting...")
start = time.time()
with concurrent.futures.ProcessPoolExecutor() as executor:
futures = [executor.submit(pow, i, i) for i in pow_list]
for f in concurrent.futures.as_completed(futures):
print("okay")
end = time.time()
print(f"Time to complete: {round(end - start, 2)}")
Поскольку я разрабатываю на компьютере с Windows, я использую
if __name__ == "main". Это необходимо, поскольку в Windows отсутствуетforkсистемный вызов , присущий системам Unix. Поскольку Windows не обладает такой возможностью, она запускает новый интерпретатор для каждого процесса, который пытается импортировать основной модуль. Если основной модуль не существует, программа запускается повторно, что приводит к рекурсивному хаосу.
Итак, рассмотрим нашу основную функцию: мы используем list comprehension для создания списка от 1 миллиона до 1 миллиона и 16, мы открываем ProcessPoolExecutor с помощью concurrent.futures, и мы используем list comprehension и ProcessPoolExecutor().submit(), чтобы начать выполнение нашего процессы и помещает их в список под названием "будущее".
Мы могли бы также использовать
ThreadPoolExecutor(), если бы хотели использовать потоки вместо этого - concurrent.futures универсален.
И вот тут-то и проявляется асинхронность: список "результаты" на самом деле не содержит результатов выполнения наших функций. Вместо этого он содержит "фьючерсы", которые похожи на JavaScript-идею "обещаний". Чтобы позволить нашей программе продолжить работу, мы возвращаем эти значения futures, которые представляют собой заполнитель для значения. Если мы попытаемся напечатать значение future, в зависимости от того, завершено его выполнение или нет, мы вернем либо состояние "ожидание", либо "завершено". Как только это будет завершено, мы сможем получить возвращаемое значение (при условии, что оно есть), используя var.result(). В этом случае нашей переменной будет "результат".
Затем мы перебираем наш список вариантов будущего, но вместо того, чтобы выводить наши значения, мы просто выводим "окей". Это объясняется тем, насколько масштабными получаются вычисления в результате.
Как и раньше, я создал скрипт сравнения, который выполняет это синхронно. И, как и раньше, вы можете найти его на GitHub..
Запустив нашу управляющую программу, которая также включает в себя функции синхронизации нашей программы, мы получаем:
Starting...
okay
...
okay
Time to complete: 54.64
Ничего себе. 54,64 секунды - это довольно долго. Давайте посмотрим, насколько лучше работает наша версия с многопроцессорной обработкой:
Starting...
okay
...
okay
Time to complete: 6.24
Наше время было значительно сокращено. Мы потратили примерно 1/9 нашего первоначального времени.
Итак, что бы произошло, если бы мы использовали для этого многопоточность?
Я уверен, вы догадываетесь, что это будет ненамного быстрее, чем выполнять это синхронно. На самом деле, это может быть и медленнее, потому что для запуска новых потоков все равно требуется немного времени и усилий. Но не верьте мне на слово, вот что мы получим, заменив ProcessPoolExecutor() на ThreadPoolExecutor():
Starting...
okay
...
okay
Time to complete: 53.83
Как я упоминал ранее, многопоточность позволяет вашим приложениям сосредоточиться на новых задачах, в то время как другие ожидают. В этом случае мы никогда не сидим сложа руки. С другой стороны, многопроцессорная обработка запускает совершенно новые сервисы, обычно на отдельных ядрах процессора, готовые выполнять все, что вы попросите, полностью в тандеме со всем остальным, что делает ваш скрипт. Вот почему многопроцессорная версия, занимающая примерно 1/9 времени, имеет смысл - в моем процессоре 8 ядер.
Теперь, когда мы поговорили о параллелизме в Python, мы можем, наконец, прояснить термины. Если у вас возникли проблемы с различением этих терминов, вы можете безопасно и точно использовать наши предыдущие определения "параллелизма" и "параллелепипедизма" как "параллельный параллелизм" и "непараллельный параллелизм" соответственно.
Читать далее
В Real Python есть отличная статья на тему параллелизм против параллелизма.
У инженера Мэна есть хорошее видео-сравнение многопоточности и многопроцессорности.
У Кори Шейфера также есть хорошее видео о многопроцессорной обработке в том же духе, что и его видео о потоковой обработке.
Если вы смотрите только одно видео, посмотрите это превосходное выступление Раймонда Хеттингера. Он проделал потрясающую работу, объяснив различия между многопроцессорной обработкой, потоковой передачей и асинхронностью.
Объединение Asyncio с многопроцессорной обработкой
Что делать, если мне нужно объединить множество операций ввода-вывода с тяжелыми вычислениями?
Мы тоже можем это сделать. Допустим, вам нужно просмотреть 100 веб-страниц в поисках определенной информации, а затем сохранить эту информацию в файле на будущее. Мы можем распределить вычислительную мощность между ядрами нашего компьютера, заставив каждый процесс обрабатывать небольшую часть страниц.
Для этого скрипта давайте установим Beautiful Soup, чтобы упростить очистку наших страниц: pip install beautifulsoup4. На этот раз у нас действительно довольно много импорта. Вот они, и вот почему мы их используем:
import asyncio # Gives us async/await
import concurrent.futures # Allows creating new processes
import time
from math import floor # Helps divide up our requests evenly across our CPU cores
from multiprocessing import cpu_count # Returns our number of CPU cores
import aiofiles # For asynchronously performing file I/O operations
import aiohttp # For asynchronously making HTTP requests
from bs4 import BeautifulSoup # For easy webpage scraping
Вы можете найти код для этого примера здесь.
Во-первых, мы собираемся создать асинхронную функцию, которая отправляет запрос в Википедию, чтобы получить обратно случайные страницы. Мы будем очищать каждую полученную страницу от заголовка, используя BeautifulSoup, а затем добавим ее в данный файл; мы будем отделять каждый заголовок символом табуляции. Функция будет принимать два аргумента:
- num_pages - количество страниц, которые нужно запросить и очистить для заголовков
- output_file - Файл для добавления наших заголовков к
async def get_and_scrape_pages(num_pages: int, output_file: str):
"""
Makes {{ num_pages }} requests to Wikipedia to receive {{ num_pages }} random
articles, then scrapes each page for its title and appends it to {{ output_file }},
separating each title with a tab: "\\t"
#### Arguments
---
num_pages: int -
Number of random Wikipedia pages to request and scrape
output_file: str -
File to append titles to
"""
async with \
aiohttp.ClientSession() as client, \
aiofiles.open(output_file, "a+", encoding="utf-8") as f:
for _ in range(num_pages):
async with client.get("https://en.wikipedia.org/wiki/Special:Random") as response:
if response.status > 399:
# I was getting a 429 Too Many Requests at a higher volume of requests
response.raise_for_status()
page = await response.text()
soup = BeautifulSoup(page, features="html.parser")
title = soup.find("h1").text
await f.write(title + "\t")
await f.write("\n")
Мы оба асинхронно открываем aiohttp ClientSession и наш выходной файл. Режим a+ означает добавить к файлу и создать его, если он еще не существует. Кодировка наших строк в utf-8 гарантирует, что мы не получим сообщение об ошибке, если наши заголовки содержат международные символы. Если мы получим ответ об ошибке, мы увеличим его, а не продолжим (при большом количестве запросов я получал 429 запросов, что было слишком много). Мы асинхронно получаем текст из нашего ответа, затем анализируем заголовок и асинхронно добавляем его в наш файл. После того, как мы добавим все наши заголовки, мы добавим новую строку: "\n".
Наша следующая функция - это функция, которую мы будем запускать с каждым новым процессом, чтобы обеспечить его асинхронный запуск:
def start_scraping(num_pages: int, output_file: str, i: int):
""" Starts an async process for requesting and scraping Wikipedia pages """
print(f"Process {i} starting...")
asyncio.run(get_and_scrape_pages(num_pages, output_file))
print(f"Process {i} finished.")
Теперь перейдем к нашей основной функции. Давайте начнем с некоторых констант (и описания нашей функции):
def main():
NUM_PAGES = 100 # Number of pages to scrape altogether
NUM_CORES = cpu_count() # Our number of CPU cores (including logical cores)
OUTPUT_FILE = "./wiki_titles.tsv" # File to append our scraped titles to
PAGES_PER_CORE = floor(NUM_PAGES / NUM_CORES)
PAGES_FOR_FINAL_CORE = PAGES_PER_CORE + NUM_PAGES % PAGES_PER_CORE # For our final core
А теперь логика:
futures = []
with concurrent.futures.ProcessPoolExecutor(NUM_CORES) as executor:
for i in range(NUM_CORES - 1):
new_future = executor.submit(
start_scraping, # Function to perform
# v Arguments v
num_pages=PAGES_PER_CORE,
output_file=OUTPUT_FILE,
i=i
)
futures.append(new_future)
futures.append(
executor.submit(
start_scraping,
PAGES_FOR_FINAL_CORE, OUTPUT_FILE, NUM_CORES-1
)
)
concurrent.futures.wait(futures)
Мы создаем массив для хранения наших фьючерсов, затем мы создаем ProcessPoolExecutor, устанавливая его max_workers равным нашему количеству ядер. Мы выполняем итерацию в диапазоне, равном нашему количеству ядер минус 1, запуская новый процесс с помощью нашей функции start_scraping. Затем мы добавляем его в наш список будущих процессов. Нашему конечному ядру потенциально придется проделать дополнительную работу, поскольку оно очистит количество страниц, равное количеству всех наших других ядер, но дополнительно очистит количество страниц, равное остатку, который мы получили при делении общего количества страниц для очистки на общее количество ядер процессора.
Убедитесь, что ваша основная функция действительно запущена:
if __name__ == "__main__":
start = time.time()
main()
print(f"Time to complete: {round(time.time() - start, 2)} seconds.")
После запуска программы на моем 8-ядерном процессоре (вместе с тестовым кодом):
Эта версия (asyncio с многопроцессорной обработкой):
Time to complete: 5.65 seconds.
Только многопроцессорная обработка:
Time to complete: 8.87 seconds.
Time to complete: 47.92 seconds.
Time to complete: 88.86 seconds.
На самом деле я очень удивлен, увидев, что улучшение asyncio с помощью многопроцессорной обработки по сравнению с простой многопроцессорной обработкой оказалось не таким значительным, как я думал.
Краткое описание: когда использовать многопроцессорную обработку вместо асинхронной или потоковой
- Используйте многопроцессорную обработку, когда вам нужно выполнить много сложных вычислений и вы можете разделить их.
- Используйте asyncio или threading при выполнении операций ввода-вывода - взаимодействии с внешними ресурсами или чтении/записи из/в файлы.
- Многопроцессорная обработка и asyncio могут использоваться вместе, но хорошим практическим правилом является разветвление процесса перед использованием потока / asyncio, а не наоборот - потоки относительно дешевы по сравнению с процессами.
asyn/awayt на других языках
async/await аналогичный синтаксис существует и в других языках, и в некоторых из этих языков его реализация может кардинально отличаться.
.NET: из F# в C
Первым языком программирования (еще в 2007 году), использовавшим синтаксис async, был Microsoft F#. В то время как он точно не использует await для ожидания вызова функции, он использует специальный синтаксис, такой как let! и do!, а также собственные функции Async, включенные в System модуль.
Вы можете узнать больше об асинхронном программировании на F# в Документации Microsoft по F#..
Затем их команда разработчиков C# основалась на этой концепции, и именно здесь родились async/await ключевые слова, с которыми мы теперь знакомы:
using System;
// Allows the "Task" return type
using System.Threading.Tasks;
public class Program
{
// Declare an async function with "async"
private static async Task<string> ReturnHello()
{
return "hello world";
}
// Main can be async -- no problem
public static async Task Main()
{
// await an async string
string result = await ReturnHello();
// Print the string we got asynchronously
Console.WriteLine(result);
}
}
Мы гарантируем, что у нас есть using System.Threading.Tasks, поскольку он включает тип Task, и, как правило, тип Task необходим для ожидания асинхронной функции. Самое замечательное в C# то, что вы можете сделать свою основную функцию асинхронной, просто объявив ее с помощью async, и у вас не возникнет никаких проблем.
Если вам интересно узнать больше о
async/awaitв C#, Документах Microsoft по C# есть хорошая страница, посвященная этому.
JavaScript
Синтаксис async/await, впервые представленный в ES6, по сути, представляет собой абстракцию от обещаний JavaScript (которые похожи на фьючерсы Python). Однако, в отличие от Python, пока вы не ожидаете, вы можете вызывать асинхронную функцию обычным способом, без специальной функции, подобной функции Python asyncio.start():
// Declare a function with async
async function returnHello(){
return "hello world";
}
async function printSomething(){
// await an async string
const result = await returnHello();
// print the string we got asynchronously
console.log(result);
}
// Run our async code
printSomething();
Смотрите MDN для получения дополнительной информации о
async/awaitв JavaScript.
Rust
Rust теперь также позволяет использовать синтаксис async/await, и он работает аналогично Python, C# и JavaScript:
// Allows blocking synchronous code to run async code
use futures::executor::block_on;
// Declare an async function with "async"
async fn return_hello() -> String {
"hello world".to_string()
}
// Code that awaits must also be declared with "async"
async fn print_something(){
// await an async String
let result: String = return_hello().await;
// Print the string we got asynchronously
println!("{0}", result);
}
fn main() {
// Block the current synchronous execution to run our async code
block_on(print_something());
}
Чтобы использовать асинхронные функции, мы должны сначала добавить futures = "0.3" в наш файл Cargo.toml. Затем мы импортируем функцию block_on с помощью use futures::executor::block_on -- block_on, которая необходима для запуска нашей асинхронной функции из нашей синхронной функции main.
Вы можете найти более подробную информацию о
async/awaitв Rust в документации по Rust.
Go
Вместо традиционного синтаксиса async/await, присущего всем предыдущим рассмотренным нами языкам, в Go используются "goroutines" и "channels". Вы можете представить, что канал похож на Python future. В Go вы обычно отправляете канал в качестве аргумента функции, а затем используете go для одновременного запуска функции. Всякий раз, когда вам нужно убедиться, что функция завершила выполнение, вы используете синтаксис <-, который можно рассматривать как более распространенный синтаксис await. Если ваша подпрограмма (функция, которую вы запускаете асинхронно) имеет возвращаемое значение, его можно получить следующим образом.
package main
import "fmt"
// "chan" makes the return value a string channel instead of a string
func returnHello(result chan string){
// Gives our channel a value
result <- "hello world"
}
func main() {
// Creates a string channel
result := make(chan string)
// Starts execution of our goroutine
go returnHello(result)
// Awaits and prints our string
fmt.Println(<- result)
}
Запустите его на игровой площадке Go Playground
Для получения дополнительной информации о параллелизме в Go, смотрите Введение в программирование на Go Калеба Докси.
Ruby
Подобно Python, Ruby также имеет глобальное ограничение на блокировку интерпретатора. Чего в нем нет, так это встроенного в язык параллелизма. Однако существует созданный сообществом gem, который допускает параллелизм в Ruby, и вы можете найти его исходный код на GitHub.
Java
Как и в Ruby, в Java нет встроенного синтаксиса async/await, но у него есть возможности параллелизма с использованием модуля java.util.concurrent. Однако Electronic Arts написала асинхронную библиотеку, которая позволяет использовать await в качестве метода. Это не совсем то же самое, что Python / C# / JavaScript / Rust, но на это стоит обратить внимание, если вы разработчик Java и заинтересованы в такого рода функциональности.
C++
Хотя в C++ также нет синтаксиса async/await, в нем есть возможность использовать futures для одновременного выполнения кода с помощью модуля futures:
#include <iostream>
#include <string>
// Necessary for futures
#include <future>
// No async declaration needed
std::string return_hello() {
return "hello world";
}
int main ()
{
// Declares a string future
std::future<std::string> fut = std::async(return_hello);
// Awaits the result of the future
std::string result = fut.get();
// Prints the string we got asynchronously
std::cout << result << '\n';
}
Нет необходимости объявлять функцию с каким-либо ключевым словом, чтобы указать, может ли она выполняться асинхронно или нет. Вместо этого вы объявляете свое начальное будущее всякий раз, когда вам это нужно, с помощью std::future<{{ function return type }}> и устанавливаете его равным std::async(), включая имя функции, которую вы хотите выполнять асинхронно, а также любые аргументы, которые она принимает, т.е. std::async(do_something, 1, 2, "string"). Чтобы дождаться значения в будущем, используйте для него синтаксис .get().
Документацию по асинхронности в C++ можно найти на странице cplusplus.com.
Краткое описание
Независимо от того, работаете ли вы с асинхронными сетевыми или файловыми операциями, или выполняете множество сложных вычислений, существует несколько различных способов повысить эффективность вашего кода.
Если вы используете Python, вы можете использовать asyncio или threading, чтобы максимально использовать операции ввода-вывода, или модуль multiprocessing для кода, требующего больших затрат процессора.
Также помните, что модуль
concurrent.futuresможно использовать вместоthreadingилиmultiprocessing.
Если вы используете другой язык программирования, скорее всего, для него тоже есть реализация async/await.
Back to TopХотите увидеть больше примеров параллелизма, concurrency и asyncio? Ознакомьтесь со статьей Параллелизм, параллелизаторство и асинхронность в Python на примере.