Параллелизм, конкурентность и AsyncIO в Python — на примере
Конкурентность против параллелизма
Конкурентность и параллелизм - похожие термины, но это не одно и то же.
Конкуренция - это возможность одновременного выполнения нескольких задач на ЦП. Задачи могут запускаться, выполняться и завершаться в перекрывающиеся периоды времени. В случае одного процессора несколько задач выполняются с помощью переключения контекстов, когда состояние процесса сохраняется, чтобы его можно было вызвать и выполнить позже.
Параллелизм - это возможность одновременного выполнения нескольких задач на нескольких ядрах процессора.
Хотя они могут увеличить скорость работы вашего приложения, параллелизм и параллельность не следует использовать повсеместно. Вариант использования зависит от того, является ли задача процессорозависимой или IO-зависимой.
Задачи, которые ограничиваются центральным процессором, привязаны к процессору. Например, математические вычисления привязаны к процессору, поскольку вычислительная мощность увеличивается с ростом числа процессоров компьютера. Параллелизм предназначен для задач, ограниченных центральным процессором. Теоретически, если задача разделена на n подзадач, каждая из этих n задач может выполняться параллельно, чтобы эффективно сократить время до 1/n исходной непараллельной задачи. Параллельность предпочтительна для задач, связанных с IO, поскольку вы можете заниматься чем-то другим, пока происходит выборка ресурсов IO.
Лучшим примером задач с привязкой к процессору является наука о данных. Специалисты по изучению данных работают с огромными массивами данных. Для предварительной обработки данных они могут разделить их на несколько пакетов и запускать их параллельно, эффективно сокращая общее время обработки. Увеличение количества ядер приводит к ускорению обработки данных.
Web scraping является IO-bound. Поскольку эта задача мало влияет на процессор, так как большая часть времени тратится на чтение из сети и запись в сеть. Другие распространенные IO-bound задачи включают вызовы баз данных, чтение и запись файлов на диск. Веб-приложения, такие как Django и Flask, являются IO-bound приложениями.
Если вам интересно узнать больше о различиях между потоками, мультипроцессингом и async в Python, ознакомьтесь со статьей Speeding Up Python with Concurrency, Parallelism, and asyncio.
Сценарий
Давайте рассмотрим, как ускорить выполнение следующих задач:
# tasks.py
import os
from multiprocessing import current_process
from threading import current_thread
import requests
def make_request(num):
# io-bound
pid = os.getpid()
thread_name = current_thread().name
process_name = current_process().name
print(f"{pid} - {process_name} - {thread_name}")
requests.get("https://httpbin.org/ip")
async def make_request_async(num, client):
# io-bound
pid = os.getpid()
thread_name = current_thread().name
process_name = current_process().name
print(f"{pid} - {process_name} - {thread_name}")
await client.get("https://httpbin.org/ip")
def get_prime_numbers(num):
# cpu-bound
pid = os.getpid()
thread_name = current_thread().name
process_name = current_process().name
print(f"{pid} - {process_name} - {thread_name}")
numbers = []
prime = [True for i in range(num + 1)]
p = 2
while p * p <= num:
if prime[p]:
for i in range(p * 2, num + 1, p):
prime[i] = False
p += 1
prime[0] = False
prime[1] = False
for p in range(num + 1):
if prime[p]:
numbers.append(p)
return numbers
Все примеры кода в этом руководстве можно найти в репозитории parallel-concurrent-examples-python.
Примечания:
make_request
делает HTTP запрос на https://httpbin.org/ip X количество раз.make_request_async
делает тот же самый HTTP запрос асинхронно с HTTPX.get_prime_numbers
вычисляет простые числа методом Решето Эратосфена от двух до заданного предела.
Для ускорения выполнения вышеуказанных задач мы будем использовать следующие библиотеки из стандартной библиотеки:
- поточность для одновременного выполнения задач
- multiprocessing для параллельного выполнения задач
- concurrent.futures для одновременного и параллельного выполнения задач из одного интерфейса
- asyncio для параллельного выполнения задач с корутинами, управляемыми интерпретатором Python
Library | Class/Method | Processing Type |
---|---|---|
threading | Thread | concurrent |
concurrent.futures | ThreadPoolExecutor | concurrent |
asyncio | gather | concurrent (via coroutines) |
multiprocessing | Pool | parallel |
concurrent.futures | ProcessPoolExecutor | parallel |
IO-bound Operation
Опять же, связанные с IO задачи тратят больше времени на IO, чем на CPU.
Поскольку веб-скрейпинг связан с IO, мы должны использовать потоки для ускорения обработки, поскольку получение HTML (IO) происходит медленнее, чем его разбор (CPU).
Сценарий: Как ускорить скрипт веб-скрейпинга и краулинга на базе Python?
Пример синхронизации
Начнем с эталона.
# io-bound_sync.py
import time
from tasks import make_request
def main():
for num in range(1, 101):
make_request(num)
if __name__ == "__main__":
start_time = time.perf_counter()
main()
end_time = time.perf_counter()
print(f"Elapsed run time: {end_time - start_time} seconds.")
Здесь мы сделали 100 HTTP-запросов с помощью функции make_request
. Поскольку запросы происходят синхронно, каждая задача выполняется последовательно.
Elapsed run time: 15.710984757 seconds.
Итак, это примерно 0,16 секунды на один запрос.
Пример потоковой обработки
# io-bound_concurrent_1.py
import threading
import time
from tasks import make_request
def main():
tasks = []
for num in range(1, 101):
tasks.append(threading.Thread(target=make_request, args=(num,)))
tasks[-1].start()
for task in tasks:
task.join()
if __name__ == "__main__":
start_time = time.perf_counter()
main()
end_time = time.perf_counter()
print(f"Elapsed run time: {end_time - start_time} seconds.")
Здесь одна и та же функция make_request
вызывается 100 раз. На этот раз для создания потока для каждого запроса используется библиотека threading
.
Elapsed run time: 1.020112515 seconds.
Общее время уменьшается с ~16с до ~1с.
Поскольку мы используем отдельные потоки для каждого запроса, вы можете задаться вопросом, почему все это не заняло ~0,16 с для завершения. Это дополнительное время - накладные расходы на управление потоками. Глобальная блокировка интерпретатора (GIL) в Python гарантирует, что только один поток одновременно использует байткод Python.
concurrent.futures Пример
# io-bound_concurrent_2.py
import time
from concurrent.futures import ThreadPoolExecutor, wait
from tasks import make_request
def main():
futures = []
with ThreadPoolExecutor() as executor:
for num in range(1, 101):
futures.append(executor.submit(make_request, num))
wait(futures)
if __name__ == "__main__":
start_time = time.perf_counter()
main()
end_time = time.perf_counter()
print(f"Elapsed run time: {end_time - start_time} seconds.")
Здесь мы использовали concurrent.futures.ThreadPoolExecutor
для достижения многопоточности. После создания всех фьючерсов/обещаний мы использовали wait
, чтобы дождаться их завершения.
Elapsed run time: 1.340592231 seconds
concurrent.futures.ThreadPoolExecutor
фактически является абстракцией вокруг библиотеки multithreading
, что упрощает ее использование. В предыдущем примере мы назначили каждый запрос на поток, и в общей сложности было использовано 100 потоков. Но в ThreadPoolExecutor
по умолчанию количество рабочих потоков равно min(32, os.cpu_count() + 4)
. ThreadPoolExecutor существует для того, чтобы облегчить процесс достижения многопоточности. Если вы хотите получить больший контроль над многопоточностью, используйте вместо этого библиотеку multithreading
.
Пример AsyncIO
# io-bound_concurrent_3.py
import asyncio
import time
import httpx
from tasks import make_request_async
async def main():
async with httpx.AsyncClient() as client:
return await asyncio.gather(
*[make_request_async(num, client) for num in range(1, 101)]
)
if __name__ == "__main__":
start_time = time.perf_counter()
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
end_time = time.perf_counter()
elapsed_time = end_time - start_time
print(f"Elapsed run time: {elapsed_time} seconds")
httpx
используется здесь, посколькуrequests
не поддерживает асинхронные операции.
Здесь мы использовали asyncio
для достижения параллелизма.
Elapsed run time: 0.553961068 seconds
asyncio
быстрее, чем другие методы, потому что threading
использует потоки ОС (операционной системы). Таким образом, потоки управляются ОС, и переключение потоков происходит с ее помощью. asyncio
использует корутины, которые определяются интерпретатором Python. С помощью coroutines программа решает, когда оптимально переключать задачи. За это отвечает even_loop
в asyncio.
Операция с ограничением вычислительной мощности
Сценарий: Как ускорить простой скрипт обработки данных?
Пример синхронизации
Опять же, давайте начнем с эталона.
# cpu-bound_sync.py
import time
from tasks import get_prime_numbers
def main():
for num in range(1000, 16000):
get_prime_numbers(num)
if __name__ == "__main__":
start_time = time.perf_counter()
main()
end_time = time.perf_counter()
print(f"Elapsed run time: {end_time - start_time} seconds.")
Здесь мы выполнили функцию get_prime_numbers
для чисел от 1000 до 16000.
Elapsed run time: 17.863046316 seconds.
Пример многопроцессорной обработки
# cpu-bound_parallel_1.py
import time
from multiprocessing import Pool, cpu_count
from tasks import get_prime_numbers
def main():
with Pool(cpu_count() - 1) as p:
p.starmap(get_prime_numbers, zip(range(1000, 16000)))
p.close()
p.join()
if __name__ == "__main__":
start_time = time.perf_counter()
main()
end_time = time.perf_counter()
print(f"Elapsed run time: {end_time - start_time} seconds.")
Здесь мы использовали multiprocessing
для вычисления простых чисел.
Elapsed run time: 2.9848740599999997 seconds.
concurrent.futures Пример
# cpu-bound_parallel_2.py
import time
from concurrent.futures import ProcessPoolExecutor, wait
from multiprocessing import cpu_count
from tasks import get_prime_numbers
def main():
futures = []
with ProcessPoolExecutor(cpu_count() - 1) as executor:
for num in range(1000, 16000):
futures.append(executor.submit(get_prime_numbers, num))
wait(futures)
if __name__ == "__main__":
start_time = time.perf_counter()
main()
end_time = time.perf_counter()
print(f"Elapsed run time: {end_time - start_time} seconds.")
Здесь мы добились многопроцессорности с помощью concurrent.futures.ProcessPoolExecutor
. Как только задания добавляются во фьючерсы, wait(futures)
ожидает их завершения.
Elapsed run time: 4.452427557 seconds.
concurrent.futures.ProcessPoolExecutor
является оберткой вокруг multiprocessing.Pool
. Она имеет те же ограничения, что и ThreadPoolExecutor
. Если вам нужен больший контроль над многопроцессорностью, используйте multiprocessing.Pool
. concurrent.futures
предоставляет абстракцию как над мультипроцессингом, так и над потоками, что позволяет легко переключаться между ними.
Заключение
Стоит отметить, что использование многопроцессорной обработки для выполнения функции make_request
будет намного медленнее, чем потоковый вариант, так как процессы должны будут ждать ввода-вывода. Однако многопроцессорный подход будет быстрее, чем синхронный.
Аналогично, использование параллелизма для задач, привязанных к процессору, не стоит усилий по сравнению с параллелизмом.
При этом использование параллелизма или параллельности для выполнения скриптов усложняет работу. Ваш код будет труднее читать, тестировать и отлаживать, поэтому используйте их только в случае крайней необходимости для долго выполняющихся сценариев.
concurrent.futures
- это то, с чего я обычно начинаю, поскольку-
- Легко переключаться туда-сюда между параллелизмом и параллельностью
- Зависимым библиотекам не нужно поддерживать asyncio (
requests
противhttpx
) - Чище и легче читать по сравнению с другими подходами
Возьмите код из репозитория parallel-concurrent-examples-python на GitHub.
Back to Top