Getting Started With Async Features in Python
Оглавление
- Понимание асинхронного программирования
- Программирование родителей: Не так просто, как кажется!
- Использование асинхронных функций Python на практике
- Заключение
Слышали ли вы об асинхронном программировании на Python? Вам интересно узнать больше об асинхронных функциях Python и о том, как вы можете использовать их в своей работе? Возможно, вы даже пытались писать многопоточные программы и сталкивались с некоторыми проблемами. Если вы хотите понять, как использовать асинхронные функции Python, то вы обратились по адресу.
Из этой статьи вы узнаете:
- Что такое синхронная программа
- Что такое асинхронная программа
- Почему вам может понадобиться написать асинхронную программу
- Как использовать асинхронные функции Python
Все примеры кода, приведенные в этой статье, были протестированы на Python 3.7.2. Вы можете ознакомиться с ними, перейдя по ссылке ниже:
Скачать код: Нажмите здесь, чтобы загрузить код, который вы будете использовать для изучения функций асинхронности в Python в этом руководстве.
Понимание асинхронного программирования
Синхронная программа выполняется по одному шагу за раз. Даже с учетом условного ветвления, циклов и вызовов функций вы все равно можете рассматривать код с точки зрения выполнения одного шага за раз. Когда каждый шаг завершен, программа переходит к следующему.
Вот два примера программ, которые работают таким образом:
-
Программы пакетной обработки часто создаются как синхронные программы. Вы получаете некоторые входные данные, обрабатываете их и создаете некоторые выходные данные. Шаги следуют один за другим, пока программа не достигнет желаемого результата. Программе нужно только обращать внимание на шаги и их порядок.
-
Программы командной строки - это небольшие, быстрые процессы, которые запускаются в терминале. Эти скрипты используются для создания чего-либо, преобразования одной вещи во что-то другое, создания отчета или, возможно, вывода некоторых данных. Это может быть выражено в виде последовательности программных шагов, которые выполняются последовательно до тех пор, пока программа не будет завершена.
Асинхронная программа ведет себя по-другому. Она по-прежнему выполняет один шаг за раз. Разница заключается в том, что система может не дожидаться завершения шага выполнения, прежде чем переходить к следующему.
Это означает, что программа перейдет к выполнению последующих шагов, даже если предыдущий шаг еще не завершен и выполняется в другом месте. Это также означает, что программа знает, что делать, когда предыдущий шаг завершит выполнение.
Почему вы хотите написать программу таким образом? Остальная часть этой статьи поможет вам ответить на этот вопрос и предоставит инструменты, необходимые для элегантного решения интересных асинхронных задач.
Создание асинхронного веб-сервера
Основная функция веб-сервера более или менее совпадает с пакетной обработкой. Сервер получает некоторые входные данные, обрабатывает их и создает выходные данные. Написанная как синхронная программа, она создала бы работающий веб-сервер.
Это также был бы абсолютно ужасный веб-сервер.
Почему? В данном случае одна единица работы (ввод, обработка, вывод) не является единственной целью. Реальная цель заключается в том, чтобы как можно быстрее выполнить сотни или даже тысячи единиц работы. Это может происходить в течение длительного периода времени, и несколько рабочих подразделений могут даже прибыть одновременно.
Можно ли улучшить синхронный веб-сервер? Конечно, вы могли бы оптимизировать этапы выполнения, чтобы вся поступающая работа выполнялась как можно быстрее. К сожалению, у этого подхода есть свои ограничения. Результатом может стать веб-сервер, который реагирует недостаточно быстро, не справляется с достаточным объемом работы или даже выходит из строя, когда работы становится слишком много.
Примечание: Существуют и другие ограничения, с которыми вы могли бы столкнуться, если бы попытались оптимизировать описанный выше подход. К ним относятся скорость сети, скорость ввода-вывода файлов, скорость запросов к базе данных и скорость других подключенных служб, и это лишь некоторые из них. Общим для всех них является то, что все они являются функциями ввода-вывода. Все эти элементы работают на порядки медленнее, чем скорость обработки данных центральным процессором.
В синхронной программе, если на этапе выполнения запускается запрос к базе данных, центральный процессор, по сути, простаивает до тех пор, пока запрос к базе данных не будет возвращен. Для программ, ориентированных на пакетную обработку, это не является приоритетом в большинстве случаев. Целью является обработка результатов этой операции ввода-вывода. Часто это может занять больше времени, чем сама операция ввода-вывода. Любые усилия по оптимизации будут направлены на обработку, а не на ввод-вывод.
Методы асинхронного программирования позволяют вашим программам использовать преимущества относительно медленных процессов ввода-вывода, освобождая центральный процессор для выполнения другой работы.
Другие взгляды на программирование
Когда вы начнете пытаться разобраться в асинхронном программировании, вы можете столкнуться с множеством дискуссий о важности блокировки или написания неблокирующего кода. (Лично я изо всех сил старался получить представление об этих концепциях от людей, которых я расспрашивал, и от документации, которую я читал.)
Что такое неблокирующий код? Что такое блокирующий код, если уж на то пошло? Помогли бы вам ответы на эти вопросы создать лучший веб-сервер? Если да, то как бы вы могли это сделать? Давайте выясним!
Написание асинхронных программ требует, чтобы вы по-другому относились к программированию. Хотя вам может быть трудно усвоить этот новый способ мышления, это также интересное упражнение. Это потому, что реальный мир почти полностью асинхронен, как и то, как вы с ним взаимодействуете.
Представьте себе: вы - родитель, который пытается делать несколько дел одновременно. Вам приходится вести бухгалтерию, стирать белье и присматривать за детьми. Каким-то образом вы можете делать все это одновременно, даже не задумываясь об этом! Давайте разберем это по порядку:
-
Балансирование чековой книжки - это синхронная задача. Один шаг следует за другим, пока все не будет выполнено. Ты делаешь всю работу сам.
-
Однако вы можете оторваться от чековой книжки, чтобы заняться стиркой. Вы выгружаете сушилку, перекладываете одежду из стиральной машины в сушилку и загружаете в стиральную машину еще раз.
-
Работа со стиральной машиной и сушилкой выполняется синхронно, но основная часть работы выполняется после запуска стиральной машины и сушилки. Как только вы их запустите, можете уходить и возвращаться к работе с чековой книжкой. На этом этапе задачи по стирке и сушке стали асинхронными. Стиральная и сушильная машины будут работать независимо друг от друга до тех пор, пока не раздастся звуковой сигнал (оповещающий вас о том, что работа требует внимания).
-
Наблюдение за вашими детьми - еще одна асинхронная задача. Как только они освоятся и начнут играть, они смогут делать это по большей части самостоятельно. Ситуация меняется, когда кому-то требуется внимание, например, когда кто-то проголодался или получил травму. Когда кто-то из ваших детей кричит в тревоге, вы реагируете. Дети - это долгосрочная задача с высоким приоритетом. Наблюдение за ними заменяет любые другие дела, которые вы могли бы выполнять, например, с чековой книжкой или стиркой белья.
Эти примеры могут помочь проиллюстрировать концепции блокирующего и неблокирующего кода. Давайте рассмотрим это с точки зрения программирования. В этом примере вы похожи на центральный процессор. Пока вы перемещаете белье, вы (центральный процессор) заняты и не можете выполнять другую работу, например, вести баланс в чековой книжке. Но это нормально, потому что задача выполняется относительно быстро.
С другой стороны, запуск стиральной машины и сушилки не мешает вам выполнять другие задачи. Это асинхронная функция, поскольку вам не нужно ждать ее завершения. Как только она будет запущена, вы сможете вернуться к выполнению других задач. Это называется переключением контекста: контекст того, что вы делаете, изменился, и звуковой сигнал машины сообщит вам когда-нибудь в будущем, когда задача стирки будет выполнена.
Как человек, вы постоянно работаете именно так. Вы, естественно, одновременно выполняете множество задач, часто не задумываясь об этом. Как разработчик, весь фокус в том, как перевести такое поведение в код, который выполняет те же действия.
Программировать родителей: не так просто, как кажется!
Если вы узнаете себя (или своих родителей) в приведенном выше примере, то это здорово! Вы продвинулись в понимании асинхронного программирования. Опять же, вы можете довольно легко переключаться между конкурирующими задачами, выполняя одни и возобновляя другие. Теперь вы попытаетесь запрограммировать такое поведение на виртуальных родителей!
Мысленный эксперимент №1: Синхронный родитель
Как бы вы создали родительскую программу для выполнения вышеуказанных задач полностью синхронно? Поскольку наблюдение за детьми является приоритетной задачей, возможно, ваша программа будет выполнять именно это. Родители присматривают за детьми, ожидая, когда произойдет что-то, что может потребовать их внимания. Однако в этом сценарии больше ничего (например, проверка чековой книжки или стирка белья) не будет сделано.
Теперь вы можете изменить приоритеты задач, как вам заблагорассудится, но в любой момент времени будет выполняться только одна из них. Это результат синхронного, пошагового подхода. Как и в случае с синхронным веб-сервером, описанным выше, это могло бы сработать, но, возможно, это не лучший способ жить. Родители не смогут выполнять какие-либо другие задачи, пока дети не уснут. Все остальные задачи будут выполняться позже, глубокой ночью. (Еще пара недель такого поведения, и многие настоящие родители могут выброситься из окна!)
Мысленный эксперимент №2: Родительский опрос
Если вы использовали опрос, то вы могли бы изменить порядок действий таким образом, чтобы было выполнено несколько задач. При таком подходе родитель периодически отрывался бы от текущей задачи и проверял, не требуют ли внимания какие-либо другие задачи.
Давайте сделаем интервал опроса примерно пятнадцатиминутным. Теперь каждые пятнадцать минут ваш родитель проверяет, не требуется ли внимание стиральной машине, сушилке или детям. Если нет, то родитель может вернуться к работе с чековой книжкой. Однако, если какая-либо из этих задач требует внимания, родитель позаботится об этом, прежде чем вернуться к чековой книжке. Этот цикл продолжается до следующего перерыва в цикле опроса.
Этот подход также работает, поскольку внимание привлекает множество задач. Однако есть несколько проблем:
-
Родитель может потратить много времени на проверку вещей, которые не требуют внимания: Стиральная машина и сушилка еще не готовы, а дети не нуждаются во внимании, если только не произойдет что-то непредвиденное.
-
Родитель может пропустить выполненные задачи, которые требуют внимания: Например, если стиральная машина завершит свой цикл в начале интервала опроса, то на нее не будет обращено никакого внимания в течение пятнадцати минут ! Более того, считается, что присмотр за детьми - это задача наивысшего приоритета. Они не могли вынести и пятнадцати минут невнимания, когда что-то могло пойти совсем не так.
Вы могли бы решить эти проблемы, сократив интервал опроса, но теперь ваш родительский сервер (центральный процессор) будет тратить больше времени на переключение контекста между задачами. Именно в этот момент вы начинаете ощущать снижение отдачи. (Еще раз повторю, пара недель такой жизни и...… Смотрите предыдущий комментарий об окнах и прыжках.)
Мысленный эксперимент №3: Родительский поток
“Если бы я только мог клонировать себя...” Если вы родитель, то у вас, вероятно, возникали похожие мысли! Поскольку вы программируете виртуальных родителей, вы, по сути, можете сделать это с помощью многопоточности. Это механизм, который позволяет нескольким разделам одной программы выполняться одновременно. Каждый раздел кода, выполняющийся независимо, называется потоком, и все потоки совместно используют одно и то же пространство памяти.
Если вы рассматриваете каждую задачу как часть одной программы, то вы можете разделить их и запускать как потоки. Другими словами, вы можете “клонировать” родителя, создавая по одному экземпляру для каждой задачи: присмотра за детьми, контроля за стиральной машиной, сушилкой и балансировки чековой книжки. Все эти “клоны” работают независимо.
Это звучит как довольно хорошее решение, но и здесь есть некоторые проблемы. Одна из них заключается в том, что вам придется явно указывать каждому родительскому экземпляру, что делать в вашей программе. Это может привести к некоторым проблемам, поскольку все экземпляры совместно используют все, что находится в программном пространстве.
Например, предположим, что родитель А следит за сушилкой. Родитель А видит, что одежда сухая, поэтому он берет управление сушилкой на себя и начинает выгружать одежду. В то же время родитель В видит, что стирка закончена, поэтому он берет на себя управление стиральной машиной и начинает снимать одежду. Однако родителю В также необходимо взять на себя управление сушилкой, чтобы положить в нее мокрую одежду. Этого не может произойти, поскольку в данный момент сушилкой управляет родитель А.
Через некоторое время родитель А закончил выгружать одежду. Теперь он хочет взять на себя управление стиральной машиной и начать перекладывать одежду в пустую сушилку. Этого также не может произойти, потому что в данный момент родитель В управляет стиральной машиной!
Эти два родителя сейчас находятся в тупике. Оба имеют контроль над своим собственным ресурсом , а хотят контролировать другой ресурс. Они будут вечно ждать, пока другой родительский экземпляр освободит управление. Как программисту, вам придется написать код, чтобы разрешить эту ситуацию.
Примечание: Многопоточные программы позволяют создавать несколько параллельных путей выполнения, которые используют одно и то же пространство памяти. Это является как преимуществом, так и недостатком. Любая память, совместно используемая потоками, подвержена тому, что один или несколько потоков пытаются использовать одну и ту же общую память одновременно. Это может привести к повреждению данных, считыванию данных в недопустимом состоянии и, в целом, к беспорядочному хранению данных.
В потоковом программировании переключение контекста происходит под управлением системы, а не программиста. Система управляет тем, когда переключать контексты и когда предоставлять потокам доступ к общим данным, тем самым изменяя контекст использования памяти. Все эти проблемы решаемы в многопоточном коде, но их трудно исправить и трудно отлаживать, когда они неверны.
Вот еще одна проблема, которая может возникнуть при работе с потоками. Предположим, что ребенок получил травму и его необходимо срочно доставить в больницу. Родителю C поручено присматривать за детьми, поэтому они сразу же забирают ребенка. В отделении неотложной помощи родителю С необходимо выписать чек на довольно крупную сумму, чтобы покрыть расходы на посещение врача.
Тем временем один из родителей умер дома, работая с чековой книжкой. Они не знали о том, что был выписан чек на крупную сумму, поэтому были очень удивлены, когда на семейном расчетном счете внезапно образовался перерасход средств!
Помните, что эти два родительских экземпляра работают в рамках одной и той же программы. Семейный расчетный счет является общим ресурсом, поэтому вам придется придумать, как родителю, наблюдающему за детьми, сообщить об этом родителю, который ведет баланс чековой книжки. В противном случае вам нужно было бы предоставить какой-либо механизм блокировки, чтобы ресурс чековой книжки мог использоваться только одним родителем одновременно с обновлениями.
Использование асинхронных функций Python на практике
Теперь вы собираетесь использовать некоторые из подходов, описанных в приведенных выше мысленных экспериментах, и превратить их в работающие программы на Python.
Все примеры в этой статье были протестированы с использованием Python 3.7.2. В файле requirements.txt
указано, какие модули вам необходимо установить для запуска всех примеров. Если вы еще не загрузили файл, вы можете сделать это прямо сейчас:
Скачать код: Нажмите здесь, чтобы загрузить код, который вы будете использовать для изучения функций асинхронности в Python в этом руководстве.
Вы также можете захотеть настроить виртуальную среду Python для запуска кода, чтобы не создавать помех для вашей системы Python.
Синхронное программирование
В этом первом примере показан несколько надуманный способ того, как задача извлекает работу из очереди и обрабатывает эту работу. Очередь в Python - это хорошая структура данных в формате FIFO ("первым пришел - первым вышел"). Она предоставляет методы для размещения объектов в очереди и последующего извлечения их в том порядке, в котором они были вставлены.
В этом случае задача состоит в том, чтобы получить число из очереди и подсчитать количество циклов до этого числа. Когда цикл начинается, он выводится на консоль и снова выводит общее количество. Эта программа демонстрирует один из способов для нескольких синхронных задач обрабатывать работу в очереди.
Программа с именем example_1.py
в репозитории полностью представлена ниже:
1import queue
2
3def task(name, work_queue):
4 if work_queue.empty():
5 print(f"Task {name} nothing to do")
6 else:
7 while not work_queue.empty():
8 count = work_queue.get()
9 total = 0
10 print(f"Task {name} running")
11 for x in range(count):
12 total += 1
13 print(f"Task {name} total: {total}")
14
15def main():
16 """
17 This is the main entry point for the program
18 """
19 # Create the queue of work
20 work_queue = queue.Queue()
21
22 # Put some work in the queue
23 for work in [15, 10, 5, 2]:
24 work_queue.put(work)
25
26 # Create some synchronous tasks
27 tasks = [(task, "One", work_queue), (task, "Two", work_queue)]
28
29 # Run the tasks
30 for t, n, q in tasks:
31 t(n, q)
32
33if __name__ == "__main__":
34 main()
Давайте посмотрим, что делает каждая строка:
- Строка 1 импортирует модуль
queue
. Именно здесь программа сохраняет работу, которую необходимо выполнить для задач. - Строки с 3 по 13 определяют
task()
. Эта функция извлекает работу изwork_queue
и обрабатывает ее до тех пор, пока делать больше нечего. - Строка 15 определяет
main()
для выполнения задач программы. - Строка 20 создает
work_queue
. Все задачи используют этот общий ресурс для получения работы. - Строки с 23 по 24 помещают работу в
work_queue
. В данном случае это просто случайный подсчет значений для обрабатываемых задач. - Строка 27 создает список кортежей задач со значениями параметров, которые будут переданы этим задачам.
- В строках с 30 по 31 выполняется итерация по списку кортежей задач, вызывается каждый из них и передаются ранее определенные значения параметров.
- Строка 34 вызывает
main()
для запуска программы.
Задача в этой программе - это просто функция, принимающая строку и очередь в качестве параметров. При выполнении она ищет что-либо в очереди для обработки. Если есть над чем поработать, то он извлекает значения из очереди, запускает for
цикл для подсчета до этого значения и выводит итоговое значение в конце. Он продолжает получать работу из очереди до тех пор, пока в ней ничего не останется, и завершает работу.
Когда эта программа запускается, она выдает результат, который вы видите ниже:
Task One running
Task One total: 15
Task One running
Task One total: 10
Task One running
Task One total: 5
Task One running
Task One total: 2
Task Two nothing to do
Это показывает, что Task One
выполняет всю работу. Цикл while
, который Task One
попадает в task()
, выполняет всю работу в очереди и обрабатывает ее. Когда этот цикл завершается, Task Two
получает возможность запуститься. Однако он обнаруживает, что очередь пуста, поэтому Task Two
выводит инструкцию, в которой говорится, что ему нечего делать, и затем завершает работу. В коде нет ничего, что позволяло бы и Task One
, и Task Two
переключать контексты и работать вместе.
Простой совместный параллелизм
В следующей версии программы эти две задачи будут работать совместно. Добавление инструкции yield
означает, что цикл будет передавать управление в указанной точке, сохраняя при этом свой контекст. Таким образом, задача получения результатов может быть перезапущена позже.
Оператор yield
превращает task()
в генератор. Функция генератора вызывается точно так же, как и любая другая функция в Python, но когда выполняется оператор yield
, управление возвращается вызывающей функции. По сути, это переключение контекста, поскольку управление переходит от функции генератора к вызывающей стороне.
Интересно то, что управление может быть передано обратно функции генератора путем вызова next()
в генераторе. Это переключение контекста обратно на функцию generator, которая запускает выполнение со всеми переменными, которые были определены до yield
, все еще не измененными.
Цикл while
в main()
использует это преимущество при вызове next(t)
. Этот оператор перезапускает задачу с того места, где она ранее выполнялась. Все это означает, что вы контролируете момент переключения контекста: когда оператор yield
выполняется в task()
.
Это форма совместной многозадачности. Программа возвращает управление своим текущим контекстом, чтобы можно было запустить что-то еще. В этом случае это позволяет циклу while
в main()
запускать два экземпляра task()
в качестве функции-генератора. Каждый экземпляр выполняет работу из одной и той же очереди. Это довольно умно, но также требует большой работы, чтобы получить те же результаты, что и в первой программе. Программа example_2.py
демонстрирует этот простой параллелизм и приведена ниже:
1import queue
2
3def task(name, queue):
4 while not queue.empty():
5 count = queue.get()
6 total = 0
7 print(f"Task {name} running")
8 for x in range(count):
9 total += 1
10 yield
11 print(f"Task {name} total: {total}")
12
13def main():
14 """
15 This is the main entry point for the program
16 """
17 # Create the queue of work
18 work_queue = queue.Queue()
19
20 # Put some work in the queue
21 for work in [15, 10, 5, 2]:
22 work_queue.put(work)
23
24 # Create some tasks
25 tasks = [task("One", work_queue), task("Two", work_queue)]
26
27 # Run the tasks
28 done = False
29 while not done:
30 for t in tasks:
31 try:
32 next(t)
33 except StopIteration:
34 tasks.remove(t)
35 if len(tasks) == 0:
36 done = True
37
38if __name__ == "__main__":
39 main()
Вот что происходит в приведенном выше коде:
- Строки с 3 по 11 определяют
task()
как и раньше, но добавлениеyield
в строке 10 превращает функцию в генератор. Здесь происходит переключение контекста и управление передается обратно в циклwhile
вmain()
. - Строка 25 создает список задач, но несколько иным способом, чем вы видели в предыдущем примере кода. В этом случае каждая задача вызывается с параметрами, указанными в переменной списка
tasks
. Это необходимо для того, чтобы функция генератораtask()
была запущена в первый раз. - Строки с 31 по 36 представляют собой изменения в цикле
while
вmain()
, которые позволяютtask()
работать совместно. Здесь управление возвращается к каждому экземпляруtask()
, когда оно завершается, позволяя циклу продолжиться и запустить другую задачу. - Строка 32 возвращает управление
task()
и продолжает свое выполнение после точки, из которой был вызванyield
. - Строка 36 устанавливает переменную
done
. Циклwhile
завершается, когда все задачи выполнены и удалены изtasks
.
Это результат, полученный при запуске этой программы:
Task One running
Task Two running
Task Two total: 10
Task Two running
Task One total: 15
Task One running
Task Two total: 5
Task One total: 2
Вы можете видеть, что и Task One
, и Task Two
выполняются и потребляют работу из очереди. Так и задумано, поскольку обе задачи обрабатывают работу, и каждая из них отвечает за два элемента в очереди. Это интересно, но опять же, для достижения таких результатов требуется немало работы.
Хитрость здесь заключается в использовании инструкции yield
, которая превращает task()
в генератор и выполняет переключение контекста. Программа использует этот контекстный переключатель для управления циклом while
в main()
, позволяя двум экземплярам задачи выполняться совместно.
Обратите внимание, что Task Two
сначала выводит общее значение. Это может навести вас на мысль, что задачи выполняются асинхронно. Однако это все равно синхронная программа. Она структурирована таким образом, что две задачи могут обмениваться контекстами. Причина, по которой Task Two
выводит свою сумму первой, заключается в том, что она считает только до 10, в то время как Task One
считает до 15. Task Two
просто выводит свою сумму первой, поэтому вывод выводится на консоль раньше Task One
.
Примечание: Во всех примерах кода, которые следуют из этого пункта, используется модуль под названием codetiming для определения времени и вывода того, сколько времени потребовалось для выполнения фрагментов кода . Здесь есть отличная статья о RealPython, в которой подробно рассказывается о модуле синхронизации кода и о том, как его использовать.
Этот модуль является частью индекса пакетов Python и создан Гейром Арне Хьелле, который является частью команды Real Python.. Гейр Арне оказал мне большую помощь в написании обзоров и предложений для этой статьи. Если вы пишете код, который должен включать функции синхронизации, стоит обратить внимание на модуль codetiming от Гейра Арне.
Чтобы сделать модуль codetiming доступным для приведенных ниже примеров, вам необходимо установить его. Это можно сделать с помощью pip
с помощью этой команды: pip install codetiming
или с помощью этой команды: pip install -r requirements.txt
. Файл requirements.txt
является частью репозитория примеров кода.
Совместный параллелизм с блокированием Вызовов
Следующая версия программы будет такой же, как и предыдущая, за исключением добавления символа time.sleep(delay)
в тело вашего цикла задач. Это добавляет задержку, основанную на значении, полученном из рабочей очереди, к каждой итерации цикла выполнения задачи. Задержка имитирует эффект блокирующего вызова, возникающего в вашей задаче.
Блокирующий вызов - это код, который останавливает процессор от выполнения каких-либо других действий в течение некоторого периода времени. В приведенных выше мысленных экспериментах, если бы родитель не смог оторваться от составления чековой книжки до тех пор, пока она не была заполнена, это было бы блокирующим вызовом.
time.sleep(delay)
в этом примере происходит то же самое, потому что центральный процессор не может делать ничего другого, кроме как ждать истечения срока задержки.
1import time
2import queue
3from codetiming import Timer
4
5def task(name, queue):
6 timer = Timer(text=f"Task {name} elapsed time: {{:.1f}}")
7 while not queue.empty():
8 delay = queue.get()
9 print(f"Task {name} running")
10 timer.start()
11 time.sleep(delay)
12 timer.stop()
13 yield
14
15def main():
16 """
17 This is the main entry point for the program
18 """
19 # Create the queue of work
20 work_queue = queue.Queue()
21
22 # Put some work in the queue
23 for work in [15, 10, 5, 2]:
24 work_queue.put(work)
25
26 tasks = [task("One", work_queue), task("Two", work_queue)]
27
28 # Run the tasks
29 done = False
30 with Timer(text="\nTotal elapsed time: {:.1f}"):
31 while not done:
32 for t in tasks:
33 try:
34 next(t)
35 except StopIteration:
36 tasks.remove(t)
37 if len(tasks) == 0:
38 done = True
39
40if __name__ == "__main__":
41 main()
Вот что отличается в приведенном выше коде:
- Строка 1 импортирует модуль
time
, чтобы предоставить программе доступ кtime.sleep()
. - Строка 3 импортирует код
Timer
из модуляcodetiming
. - Строка 6 создает экземпляр
Timer
, используемый для измерения времени, затрачиваемого на каждую итерацию цикла выполнения задачи. - Строка 10 запускает экземпляр
timer
- Строка 11 изменяет
task()
наtime.sleep(delay)
, чтобы имитировать задержку ввода-вывода. Это заменяет циклfor
, который выполнял подсчет вexample_1.py
. - Строка 12 останавливает экземпляр
timer
и выводит время, прошедшее с момента вызоваtimer.start()
. - Строка 30 создает
Timer
context manager, который выводит время, затраченное на выполнение всего цикла while .
Когда вы запустите эту программу, вы увидите следующий результат:
Task One running
Task One elapsed time: 15.0
Task Two running
Task Two elapsed time: 10.0
Task One running
Task One elapsed time: 5.0
Task Two running
Task Two elapsed time: 2.0
Total elapsed time: 32.0
Как и прежде, и Task One
, и Task Two
выполняются, принимая работу из очереди и обрабатывая ее. Однако, даже с добавлением задержки, вы можете видеть, что совместный параллелизм ничего вам не дал. Задержка останавливает обработку всей программы, и центральный процессор просто ожидает окончания задержки ввода-вывода.
Это именно то, что подразумевается под блокирующим кодом в документации по асинхронному программированию на Python. Вы заметите, что время, необходимое для запуска всей программы, равно суммарному времени всех задержек. Выполнение задач таким образом не является победой.
Совместный параллелизм С неблокирующими Вызовами
В следующей версии программы были внесены небольшие изменения. В ней используются возможности асинхронного программирования Python с использованием asyncio/await, предоставленные в Python 3.
Модули time
и queue
были заменены на пакет asyncio
. Это дает вашей программе доступ к асинхронному (неблокирующему) режиму ожидания и очереди. Изменение на task()
определяет ее как асинхронную с добавлением префикса async
в строке 4. Это указывает Python на то, что функция будет асинхронной.
Другим важным изменением является удаление операторов time.sleep(delay)
и yield
и замена их на await asyncio.sleep(delay)
. Это создает неблокирующую задержку, которая приведет к обратному переключению контекста на вызывающий объект main()
.
Цикл while
внутри main()
больше не существует. Вместо task_array
есть вызов await asyncio.gather(...)
. Это говорит о asyncio
двух вещах:
- Создайте две задачи на основе
task()
и запустите их. - Дождитесь завершения обеих задач, прежде чем двигаться дальше.
В последней строке программы asyncio.run(main())
выполняется main()
. Это создает так называемый цикл обработки событий ). Именно этот цикл будет запускать main()
, который, в свою очередь, запустит два экземпляра task()
.
Цикл обработки событий лежит в основе асинхронной системы Python. Он запускает весь код, включая main()
. Когда выполняется код задачи, центральный процессор занят работой. При достижении ключевого слова await
происходит переключение контекста, и управление передается обратно в цикл обработки событий. Цикл обработки событий просматривает все задачи, ожидающие события (в данном случае тайм-аута asyncio.sleep(delay)
), и передает управление задаче с готовым событием.
await asyncio.sleep(delay)
не блокирует работу центрального процессора. Вместо ожидания истечения времени ожидания задержки центральный процессор регистрирует событие ожидания в очереди задач цикла обработки событий и выполняет переключение контекста, передавая управление циклу обработки событий. Цикл обработки событий непрерывно отслеживает завершенные события и передает управление обратно задаче, ожидающей этого события. Таким образом, центральный процессор может оставаться занятым, если работа доступна, в то время как цикл обработки событий отслеживает события, которые произойдут в будущем.
Примечание: Асинхронная программа выполняется в одном потоке. Переключение контекста с одного раздела кода на другой, которое может повлиять на данные, полностью зависит от вас. Это означает, что вы можете распределить и завершить доступ ко всем данным в общей памяти перед выполнением переключения контекста. Это упрощает проблему с общей памятью, присущую потоковому коду.
Код example_4.py
указан ниже:
1import asyncio
2from codetiming import Timer
3
4async def task(name, work_queue):
5 timer = Timer(text=f"Task {name} elapsed time: {{:.1f}}")
6 while not work_queue.empty():
7 delay = await work_queue.get()
8 print(f"Task {name} running")
9 timer.start()
10 await asyncio.sleep(delay)
11 timer.stop()
12
13async def main():
14 """
15 This is the main entry point for the program
16 """
17 # Create the queue of work
18 work_queue = asyncio.Queue()
19
20 # Put some work in the queue
21 for work in [15, 10, 5, 2]:
22 await work_queue.put(work)
23
24 # Run the tasks
25 with Timer(text="\nTotal elapsed time: {:.1f}"):
26 await asyncio.gather(
27 asyncio.create_task(task("One", work_queue)),
28 asyncio.create_task(task("Two", work_queue)),
29 )
30
31if __name__ == "__main__":
32 asyncio.run(main())
Вот в чем разница между этой программой и example_3.py
:
- Строка 1 импортирует
asyncio
, чтобы получить доступ к асинхронным функциям Python. Это заменяет импортtime
. - Строка 2 импортирует код
Timer
из модуляcodetiming
. - В строке 4 показано добавление ключевого слова
async
перед определениемtask()
. Это информирует программу о том, чтоtask
может выполняться асинхронно. - Строка 5 создает экземпляр
Timer
, используемый для измерения времени, затрачиваемого на каждую итерацию цикла выполнения задачи. - Строка 9 запускает экземпляр
timer
- Строка 10 заменяет
time.sleep(delay)
на неблокирующуюasyncio.sleep(delay)
, которая также возвращает управление (или переключает контексты) обратно в основной цикл обработки событий. - Строка 11 останавливает экземпляр
timer
и выводит время, прошедшее с момента вызоваtimer.start()
. - Строка 18 создает неблокирующий асинхронный
work_queue
. - Строки с 21 по 22 помещают работу в
work_queue
асинхронным образом, используя ключевое словоawait
. - Строка 25 создает
Timer
контекстный менеджер, который выводит время, затраченное на выполнение всего цикла while. - Строки с 26 по 29 создайте две задачи и соберите их вместе, чтобы программа ожидала завершения обеих задач.
- Строка 32 запускает асинхронную работу программы. Она также запускает внутренний цикл обработки событий.
Когда вы смотрите на выходные данные этой программы, обратите внимание, что и Task One
, и Task Two
запускаются одновременно, а затем ожидайте вызова mockito:
Task One running
Task Two running
Task Two total elapsed time: 10.0
Task Two running
Task One total elapsed time: 15.0
Task One running
Task Two total elapsed time: 5.0
Task One total elapsed time: 2.0
Total elapsed time: 17.0
Это указывает на то, что await asyncio.sleep(delay)
не блокируется и что выполняется другая работа.
В конце выполнения программы вы заметите, что общее время, затраченное на выполнение, по сути, вдвое меньше времени, затраченного на выполнение example_3.py
. В этом преимущество программы, использующей асинхронные функции Python! Каждая задача могла выполняться await asyncio.sleep(delay)
одновременно. Общее время выполнения программы теперь меньше, чем сумма ее частей. Вы отошли от синхронной модели!
Синхронные (блокирующие) HTTP-вызовы
Следующая версия программы - это своего рода шаг вперед, а также шаг назад. Программа выполняет некоторую реальную работу с реальным вводом-выводом, отправляя HTTP-запросы к списку URL-адресов и получая содержимое страницы. Однако он делает это блокирующим (синхронным) образом.
Программа была изменена, чтобы импортировать замечательный requests
модуль для выполнения фактических HTTP-запросов. Кроме того, очередь теперь содержит список URL-адресов, а не цифры. Кроме того, task()
больше не увеличивает счетчик. Вместо этого requests
получает содержимое URL-адреса, извлеченного из очереди, и выводит время, которое потребовалось для этого.
Код example_5.py
указан ниже:
1import queue
2import requests
3from codetiming import Timer
4
5def task(name, work_queue):
6 timer = Timer(text=f"Task {name} elapsed time: {{:.1f}}")
7 with requests.Session() as session:
8 while not work_queue.empty():
9 url = work_queue.get()
10 print(f"Task {name} getting URL: {url}")
11 timer.start()
12 session.get(url)
13 timer.stop()
14 yield
15
16def main():
17 """
18 This is the main entry point for the program
19 """
20 # Create the queue of work
21 work_queue = queue.Queue()
22
23 # Put some work in the queue
24 for url in [
25 "http://google.com",
26 "http://yahoo.com",
27 "http://linkedin.com",
28 "http://apple.com",
29 "http://microsoft.com",
30 "http://facebook.com",
31 "http://twitter.com",
32 ]:
33 work_queue.put(url)
34
35 tasks = [task("One", work_queue), task("Two", work_queue)]
36
37 # Run the tasks
38 done = False
39 with Timer(text="\nTotal elapsed time: {:.1f}"):
40 while not done:
41 for t in tasks:
42 try:
43 next(t)
44 except StopIteration:
45 tasks.remove(t)
46 if len(tasks) == 0:
47 done = True
48
49if __name__ == "__main__":
50 main()
Вот что происходит в этой программе:
- Строка 2 импортирует
requests
, которая предоставляет удобный способ выполнения HTTP-вызовов. - Строка 3 импортирует код
Timer
из модуляcodetiming
. - Строка 6 создает экземпляр
Timer
, используемый для измерения времени, затрачиваемого на каждую итерацию цикла выполнения задачи. - Строка 11 запускает экземпляр
timer
- Строка 12 вводит задержку, аналогичную
example_3.py
. Однако на этот раз он вызываетsession.get(url)
, который возвращает содержимое URL-адреса, полученного изwork_queue
. - Строка 13 останавливает экземпляр
timer
и выводит время, прошедшее с момента вызоваtimer.start()
. - Строки с 23 по 32 помещают список URL-адресов в
work_queue
. - Строка 39 создает
Timer
контекстный менеджер, который будет выводить время, затраченное на выполнение всего цикла while.
Когда вы запустите эту программу, вы увидите следующий результат:
Task One getting URL: http://google.com
Task One total elapsed time: 0.3
Task Two getting URL: http://yahoo.com
Task Two total elapsed time: 0.8
Task One getting URL: http://linkedin.com
Task One total elapsed time: 0.4
Task Two getting URL: http://apple.com
Task Two total elapsed time: 0.3
Task One getting URL: http://microsoft.com
Task One total elapsed time: 0.5
Task Two getting URL: http://facebook.com
Task Two total elapsed time: 0.5
Task One getting URL: http://twitter.com
Task One total elapsed time: 0.4
Total elapsed time: 3.2
Как и в предыдущих версиях программы, yield
превращает task()
в генератор. Также выполняется переключение контекста, позволяющее запускать другой экземпляр задачи.
Каждая задача получает URL-адрес из рабочей очереди, извлекает содержимое страницы и сообщает, сколько времени потребовалось для получения этого содержимого.
Как и прежде, yield
позволяет обеим вашим задачам выполняться совместно. Однако, поскольку эта программа выполняется синхронно, каждый вызов session.get()
блокирует работу центрального процессора до тех пор, пока страница не будет извлечена. Обратите внимание на общее время, затраченное на запуск всей программы в конце. Это будет иметь смысл для следующего примера.
Асинхронные (неблокирующие) HTTP-вызовы
Эта версия программы изменяет предыдущую, чтобы использовать асинхронные функции Python. Он также импортирует модуль aiohttp
, который представляет собой библиотеку для выполнения HTTP-запросов асинхронным способом с использованием asyncio
.
Приведенные здесь задачи были изменены, чтобы удалить вызов yield
, поскольку код для выполнения HTTP-вызова GET
больше не блокируется. Он также выполняет переключение контекста обратно в цикл обработки событий.
Список программ example_6.py
приведен ниже:
1import asyncio
2import aiohttp
3from codetiming import Timer
4
5async def task(name, work_queue):
6 timer = Timer(text=f"Task {name} elapsed time: {{:.1f}}")
7 async with aiohttp.ClientSession() as session:
8 while not work_queue.empty():
9 url = await work_queue.get()
10 print(f"Task {name} getting URL: {url}")
11 timer.start()
12 async with session.get(url) as response:
13 await response.text()
14 timer.stop()
15
16async def main():
17 """
18 This is the main entry point for the program
19 """
20 # Create the queue of work
21 work_queue = asyncio.Queue()
22
23 # Put some work in the queue
24 for url in [
25 "http://google.com",
26 "http://yahoo.com",
27 "http://linkedin.com",
28 "http://apple.com",
29 "http://microsoft.com",
30 "http://facebook.com",
31 "http://twitter.com",
32 ]:
33 await work_queue.put(url)
34
35 # Run the tasks
36 with Timer(text="\nTotal elapsed time: {:.1f}"):
37 await asyncio.gather(
38 asyncio.create_task(task("One", work_queue)),
39 asyncio.create_task(task("Two", work_queue)),
40 )
41
42if __name__ == "__main__":
43 asyncio.run(main())
Вот что происходит в этой программе:
- Строка 2 импортирует библиотеку
aiohttp
, которая предоставляет асинхронный способ выполнения HTTP-вызовов. - Строка 3 импортирует код
Timer
из модуляcodetiming
. - Строка 5 помечает
task()
как асинхронную функцию. - Строка 6 создает экземпляр
Timer
, используемый для измерения времени, затрачиваемого на каждую итерацию цикла выполнения задачи. - Строка 7 создает
aiohttp
менеджер контекста сеанса. - Строка 8 создает
aiohttp
диспетчер контекста ответа. Он также выполняет HTTPGET
вызов URL-адреса, взятого изwork_queue
. - Строка 11 запускает экземпляр
timer
- Строка 12 использует сеанс для асинхронного получения текста, извлеченного из URL-адреса.
- Строка 13 останавливает экземпляр
timer
и выводит время, прошедшее с момента вызоваtimer.start()
. - Строка 39 создает
Timer
контекстный менеджер, который выводит время, затраченное на выполнение всего цикла while.
Когда вы запустите эту программу, вы увидите следующий результат:
Task One getting URL: http://google.com
Task Two getting URL: http://yahoo.com
Task One total elapsed time: 0.3
Task One getting URL: http://linkedin.com
Task One total elapsed time: 0.3
Task One getting URL: http://apple.com
Task One total elapsed time: 0.3
Task One getting URL: http://microsoft.com
Task Two total elapsed time: 0.9
Task Two getting URL: http://facebook.com
Task Two total elapsed time: 0.4
Task Two getting URL: http://twitter.com
Task One total elapsed time: 0.5
Task Two total elapsed time: 0.3
Total elapsed time: 1.7
Посмотрите на общее время, затраченное на получение содержимого каждого URL-адреса, а также на время, затраченное на получение содержимого каждого URL-адреса отдельно. Вы увидите, что продолжительность составляет примерно половину суммарного времени всех HTTP-вызовов GET
. Это связано с тем, что вызовы HTTP GET
выполняются асинхронно. Другими словами, вы эффективно используете возможности центрального процессора, позволяя ему выполнять несколько запросов одновременно.
Поскольку процессор работает очень быстро, в этом примере, вероятно, может быть создано столько задач, сколько существует URL-адресов. В этом случае время выполнения программы будет равно времени самого медленного поиска URL-адреса.
Заключение
В этой статье вы найдете инструменты, необходимые для того, чтобы начать использовать методы асинхронного программирования в своем арсенале. Использование асинхронных функций Python позволяет программно управлять переключением контекста. Это означает, что со многими сложными проблемами, с которыми вы можете столкнуться при многопоточном программировании, легче справиться.
Асинхронное программирование - мощный инструмент, но он полезен не для всех типов программ. Например, если вы пишете программу, которая вычисляет число пи с точностью до миллионного знака после запятой, то асинхронный код вам не поможет. Такие программы привязаны к процессору и не требуют большого объема ввода-вывода. Однако, если вы пытаетесь реализовать сервер или программу, выполняющую ввод-вывод (например, доступ к файлам или сети), то использование асинхронных функций Python может иметь огромное значение.
Подводя итог, можно сказать, что вы узнали:
- Что такое синхронные программы
- Чем отличаются асинхронные программы, но они также мощные и управляемые
- Почему вам может понадобиться писать асинхронные программы
- Как использовать встроенные функции асинхронности в Python
Вы можете получить код для всех примеров программ, используемых в этом руководстве:
Скачать код: Нажмите здесь, чтобы загрузить код, который вы будете использовать для изучения функций асинхронности в Python в этом руководстве.
Теперь, когда вы овладели этими мощными навыками, вы можете поднять свои программы на новый уровень!
Back to Top