Петля событий

Исходный код: Lib/asyncio/events.py, Lib/asyncio/base_events.py


Предисловие

Цикл событий является ядром каждого приложения asyncio. Циклы событий запускают асинхронные задачи и обратные вызовы, выполняют операции сетевого ввода-вывода и запускают подпроцессы.

Разработчики приложений обычно используют высокоуровневые функции asyncio, такие как asyncio.run(), и редко обращаются к объекту цикла или вызывают его методы. Этот раздел предназначен в основном для авторов кода нижнего уровня, библиотек и фреймворков, которым необходим более тонкий контроль над поведением цикла событий.

Получение цикла событий

Для получения, установки или создания цикла событий можно использовать следующие низкоуровневые функции:

asyncio.get_running_loop()

Возвращает запущенный цикл событий в текущем потоке ОС.

Если нет запущенного цикла событий, то выдается сообщение RuntimeError. Эта функция может быть вызвана только из корутины или обратного вызова.

Добавлено в версии 3.7.

asyncio.get_event_loop()

Получение текущего цикла событий.

Если в текущем потоке ОС не установлен текущий цикл событий, поток ОС является основным, и set_event_loop() еще не был вызван, asyncio создаст новый цикл событий и установит его в качестве текущего.

Поскольку эта функция имеет довольно сложное поведение (особенно когда используются пользовательские политики цикла событий), использование функции get_running_loop() предпочтительнее, чем get_event_loop() в coroutines и callbacks.

Рассмотрите также возможность использования функции asyncio.run() вместо использования функций более низкого уровня для ручного создания и закрытия цикла событий.

Не рекомендуется, начиная с версии 3.10: Предупреждение об устаревании выдается, если нет запущенного цикла событий. В будущих выпусках Python эта функция будет псевдонимом get_running_loop().

asyncio.set_event_loop(loop)

Установите loop в качестве текущего цикла событий для текущего потока ОС.

asyncio.new_event_loop()

Создает и возвращает новый объект цикла событий.

Обратите внимание, что поведение функций get_event_loop(), set_event_loop() и new_event_loop() может быть изменено с помощью setting a custom event loop policy.

Содержание

Эта страница документации содержит следующие разделы:

Методы циклов событий

Циклы событий имеют низкоуровневые API для следующего:

Запуск и остановка цикла

loop.run_until_complete(future)

Выполняется до тех пор, пока будущее (экземпляр Future) не завершится.

Если аргументом является coroutine object, он неявно запланирован на выполнение как asyncio.Task.

Вернуть результат Future или поднять исключение.

loop.run_forever()

Выполняйте цикл событий до тех пор, пока не будет вызвано stop().

Если stop() будет вызван до вызова run_forever(), цикл опросит селектор ввода/вывода один раз с таймаутом, равным нулю, выполнит все обратные вызовы, запланированные в ответ на события ввода/вывода (и те, которые уже были запланированы), а затем выйдет.

Если stop() вызывается во время выполнения run_forever(), то цикл выполнит текущую партию обратных вызовов и затем выйдет. Обратите внимание, что в этом случае новые обратные вызовы, запланированные callbacks, не будут запущены; вместо этого они будут запущены при следующем вызове run_forever() или run_until_complete().

loop.stop()

Остановите цикл событий.

loop.is_running()

Возвращает True, если цикл событий в данный момент запущен.

loop.is_closed()

Верните True, если цикл событий был закрыт.

loop.close()

Закройте цикл событий.

Цикл не должен быть запущен, когда вызывается эта функция. Все ожидающие обратные вызовы будут отброшены.

Этот метод очищает все очереди и выключает исполнителя, но не ждет его завершения.

Этот метод является идемпотентным и необратимым. Никакие другие методы не должны вызываться после закрытия цикла событий.

coroutine loop.shutdown_asyncgens()

Запланируйте закрытие всех открытых в данный момент объектов asynchronous generator вызовом aclose(). После вызова этого метода цикл событий выдаст предупреждение, если будет итерирован новый асинхронный генератор. Это следует использовать для надежного завершения работы всех запланированных асинхронных генераторов.

Обратите внимание, что нет необходимости вызывать эту функцию, когда используется asyncio.run().

Пример:

try:
    loop.run_forever()
finally:
    loop.run_until_complete(loop.shutdown_asyncgens())
    loop.close()

Добавлено в версии 3.6.

coroutine loop.shutdown_default_executor()

Запланируйте закрытие исполнителя по умолчанию и дождитесь, пока он соединит все потоки в ThreadPoolExecutor. После вызова этого метода будет вызвана ошибка RuntimeError, если loop.run_in_executor() будет вызван при использовании исполнителя по умолчанию.

Обратите внимание, что нет необходимости вызывать эту функцию, когда используется asyncio.run().

Добавлено в версии 3.9.

Планирование обратных вызовов

loop.call_soon(callback, *args, context=None)

Запланируйте вызов callback callback с аргументами args на следующей итерации цикла событий.

Обратные вызовы вызываются в том порядке, в котором они были зарегистрированы. Каждый обратный вызов будет вызван ровно один раз.

Необязательный аргумент context, содержащий только ключевое слово, позволяет указать пользовательский contextvars.Context для запуска callback. Текущий контекст используется, если не указан контекст.

Возвращается экземпляр asyncio.Handle, который может быть использован позже для отмены обратного вызова.

Этот метод не является потокобезопасным.

loop.call_soon_threadsafe(callback, *args, context=None)

Потокобезопасный вариант call_soon(). Должен использоваться для планирования обратных вызовов из другого потока.

Вызывает RuntimeError, если вызывается в цикле, который был закрыт. Это может произойти на вторичном потоке, когда основное приложение завершает работу.

См. раздел concurrency and multithreading в документации.

Изменено в версии 3.7: Добавлен параметр context только для ключевого слова. См. раздел PEP 567 для более подробной информации.

Примечание

Большинство функций планирования asyncio не позволяют передавать аргументы в виде ключевых слов. Для этого используйте functools.partial():

# will schedule "print("Hello", flush=True)"
loop.call_soon(
    functools.partial(print, "Hello", flush=True))

Использование частичных объектов обычно удобнее, чем использование лямбд, так как asyncio может лучше отображать частичные объекты в сообщениях об отладке и ошибках.

Планирование отложенных обратных вызовов

Цикл событий предоставляет механизмы для планирования функций обратного вызова, которые должны быть вызваны в определенный момент в будущем. Событийный цикл использует монотонные часы для отслеживания времени.

loop.call_later(delay, callback, *args, context=None)

Планирование обратного вызова, который будет вызван через заданное задержку количество секунд (может быть либо int, либо float).

Возвращается экземпляр asyncio.TimerHandle, который можно использовать для отмены обратного вызова.

обратный вызов будет вызван ровно один раз. Если два обратных вызова запланированы на одно и то же время, порядок их вызова не определен.

Необязательные позиционные args будут переданы обратному вызову при его вызове. Если вы хотите, чтобы обратный вызов был вызван с аргументами в виде ключевых слов, используйте functools.partial().

Необязательный аргумент context, содержащий только ключевое слово, позволяет указать пользовательский contextvars.Context для запуска callback. Текущий контекст используется, если не указан контекст.

Изменено в версии 3.7: Добавлен параметр context только для ключевого слова. См. раздел PEP 567 для более подробной информации.

Изменено в версии 3.8: В Python 3.7 и более ранних версиях с реализацией цикла событий по умолчанию, задержка не могла превышать одного дня. Это было исправлено в Python 3.8.

loop.call_at(when, callback, *args, context=None)

Запланировать обратный вызов для вызова в заданную абсолютную временную метку когда (int или float), используя ту же временную ссылку, что и loop.time().

Поведение этого метода такое же, как и call_later().

Возвращается экземпляр asyncio.TimerHandle, который можно использовать для отмены обратного вызова.

Изменено в версии 3.7: Добавлен параметр context только для ключевого слова. См. раздел PEP 567 для более подробной информации.

Изменено в версии 3.8: В Python 3.7 и более ранних версиях с реализацией цикла событий по умолчанию разница между when и текущим временем не могла превышать одного дня. Это было исправлено в Python 3.8.

loop.time()

Возвращает текущее время в виде значения float в соответствии с внутренними монотонными часами цикла событий.

Примечание

Изменено в версии 3.8: В Python 3.7 и более ранних версиях таймауты (относительный delay или абсолютный when) не должны превышать одного дня. Это было исправлено в Python 3.8.

См.также

Функция asyncio.sleep().

Создание будущего и задач

loop.create_future()

Создайте объект asyncio.Future, присоединенный к циклу событий.

Это предпочтительный способ создания фьючерсов в asyncio. Это позволяет сторонним циклам событий предоставлять альтернативные реализации объекта Future (с лучшей производительностью или инструментарием).

Добавлено в версии 3.5.2.

loop.create_task(coro, *, name=None)

Запланировать выполнение coroutine coro. Возвращает объект Task.

Циклы событий сторонних производителей могут использовать свой собственный подкласс Task для обеспечения совместимости. В данном случае тип результата является подклассом Task.

Если указан аргумент name, а не None, то он устанавливается как имя задачи с помощью Task.set_name().

Изменено в версии 3.8: Добавлен параметр name.

loop.set_task_factory(factory)

Установите фабрику задач, которая будет использоваться loop.create_task().

Если factory равно None, то будет установлена фабрика задач по умолчанию. В противном случае factory должна быть callable с сигнатурой, соответствующей (loop, coro), где loop - ссылка на активный цикл событий, а coro - объект coroutine. Вызываемый объект должен возвращать asyncio.Future-совместимый объект.

loop.get_task_factory()

Возвращает фабрику задач или None, если используется фабрика по умолчанию.

Открытие сетевых подключений

coroutine loop.create_connection(protocol_factory, host=None, port=None, *, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None, happy_eyeballs_delay=None, interleave=None)

Открыть потоковое транспортное соединение с заданным адресом, указанным host и port.

Семейство сокетов может быть либо AF_INET, либо AF_INET6 в зависимости от host (или аргумента family, если он указан).

Тип сокета будет SOCK_STREAM.

protocol_factory должна быть вызываемой переменной, возвращающей реализацию asyncio protocol.

Этот метод попытается установить соединение в фоновом режиме. В случае успеха он возвращает пару (transport, protocol).

Хронологический обзор основной операции выглядит следующим образом:

  1. Соединение устанавливается и для него создается transport.

  2. protocol_factory вызывается без аргументов и, как ожидается, вернет экземпляр protocol.

  3. Экземпляр протокола соединяется с транспортом путем вызова его метода connection_made().

  4. В случае успеха возвращается кортеж (transport, protocol).

Созданный транспорт представляет собой зависящий от реализации двунаправленный поток.

Другие аргументы:

  • ssl: если задано и не false, создается транспорт SSL/TLS (по умолчанию создается обычный TCP-транспорт). Если ssl является объектом ssl.SSLContext, этот контекст используется для создания транспорта; если ssl является True, используется контекст по умолчанию, возвращаемый из ssl.create_default_context().

  • server_hostname задает или переопределяет имя хоста, с которым будет сравниваться сертификат целевого сервера. Должно передаваться только в том случае, если ssl не None. По умолчанию используется значение аргумента host. Если host пуст, то по умолчанию не используется, и вы должны передать значение для server_hostname. Если server_hostname - пустая строка, сопоставление имен хостов отключено (что представляет собой серьезный риск для безопасности, позволяя проводить потенциальные атаки типа «человек посередине»).

  • family, proto, flags - это необязательные семейство адресов, протокол и флаги, которые будут переданы в getaddrinfo() для разрешения хоста. Если заданы, то все они должны быть целыми числами из соответствующих констант модуля socket.

  • happy_eyeballs_delay, если задано, включает Happy Eyeballs для этого соединения. Это должно быть число с плавающей точкой, представляющее собой количество времени в секундах, которое нужно ждать завершения попытки соединения, прежде чем начинать следующую попытку параллельно. Это «Задержка попытки подключения», как определено в RFC 8305. Разумным значением по умолчанию, рекомендованным RFC, является 0.25 (250 миллисекунд).

  • interleave управляет упорядочиванием адресов, когда имя хоста разрешается в несколько IP-адресов. Если 0 или не указано, переупорядочивание не производится, и адреса пробуются в порядке, возвращаемом getaddrinfo(). Если указано целое положительное число, адреса чередуются по семейству адресов, и данное целое число интерпретируется как «First Address Family Count», как определено в RFC 8305. По умолчанию используется значение 0, если не указана happy_eyeballs_delay, и 1, если указана.

  • sock, если дан, должен быть существующим, уже подключенным socket.socket объектом, который будет использоваться транспортом. Если sock задан, то ни один из host, port, family, proto, flags, happy_eyeballs_delay, interleave и local_addr не должен быть указан.

  • local_addr, если задан, является кортежем (local_host, local_port), используемым для локальной привязки сокета. Параметры local_host и local_port ищутся с помощью getaddrinfo(), аналогично host и port.

  • ssl_handshake_timeout - это (для TLS-соединения) время в секундах, в течение которого следует ждать завершения TLS-квитирования перед прерыванием соединения. 60.0 секунд, если None (по умолчанию).

Изменено в версии 3.5: Добавлена поддержка SSL/TLS в ProactorEventLoop.

Изменено в версии 3.6: Опция сокета TCP_NODELAY устанавливается по умолчанию для всех TCP-соединений.

Изменено в версии 3.7: Добавлен параметр ssl_handshake_timeout.

Изменено в версии 3.8: Добавлены параметры happy_eyeballs_delay и interleave.

Алгоритм счастливых глазных яблок: Успех с хостами с двойным стеком. Когда путь и протокол IPv4 сервера работают, а путь и протокол IPv6 сервера не работают, клиентское приложение с двойным стеком испытывает значительную задержку соединения по сравнению с клиентом, использующим только IPv4. Это нежелательно, поскольку приводит к ухудшению качества работы клиента с двойным стеком. Данный документ определяет требования к алгоритмам, которые уменьшают эту видимую пользователем задержку, и предоставляет алгоритм.

Дополнительная информация: https://tools.ietf.org/html/rfc6555

См.также

Функция open_connection() является высокоуровневым альтернативным API. Она возвращает пару (StreamReader, StreamWriter), которые можно использовать непосредственно в коде async/await.

coroutine loop.create_datagram_endpoint(protocol_factory, local_addr=None, remote_addr=None, *, family=0, proto=0, flags=0, reuse_address=None, reuse_port=None, allow_broadcast=None, sock=None)

Примечание

Параметр reuse_address больше не поддерживается, так как использование SO_REUSEADDR представляет значительную проблему безопасности для UDP. Явная передача reuse_address=True вызовет исключение.

Когда несколько процессов с разными UID назначают сокеты на одинаковый адрес сокета UDP с помощью SO_REUSEADDR, входящие пакеты могут случайным образом распределяться между сокетами.

Для поддерживаемых платформ reuse_port может быть использован в качестве замены для аналогичной функциональности. При использовании reuse_port вместо него используется SO_REUSEPORT, что специально предотвращает назначение сокетов на один и тот же адрес сокета процессам с разными UID.

Создайте дейтаграммное соединение.

Семейство сокетов может быть либо AF_INET, либо AF_INET6, либо AF_UNIX, в зависимости от host (или аргумента family, если он указан).

Тип сокета будет SOCK_DGRAM.

protocol_factory должна быть вызываемой переменной, возвращающей реализацию protocol.

В случае успеха возвращается кортеж (transport, protocol).

Другие аргументы:

  • local_addr, если задан, является кортежем (local_host, local_port), используемым для локальной привязки сокета. Параметры local_host и local_port ищутся с помощью getaddrinfo().

  • remote_addr, если задан, является кортежем (remote_host, remote_port), используемым для подключения сокета к удаленному адресу. Параметры remote_host и remote_port ищутся с помощью getaddrinfo().

  • family, proto, flags - это необязательные семейство адресов, протокол и флаги, которые будут переданы в getaddrinfo() для разрешения хоста. Если заданы, то все они должны быть целыми числами из соответствующих констант модуля socket.

  • reuse_port указывает ядру разрешить этой конечной точке быть привязанной к тому же порту, к которому привязаны другие существующие конечные точки, если все они устанавливают этот флаг при создании. Эта опция не поддерживается в Windows и некоторых Unix. Если константа SO_REUSEPORT не определена, то эта возможность не поддерживается.

  • allow_broadcast указывает ядру разрешить этой конечной точке посылать сообщения на широковещательный адрес.

  • В качестве опции можно указать sock, чтобы использовать уже существующий, подключенный объект socket.socket, который будет использоваться транспортом. Если указано, local_addr и remote_addr должны быть опущены (должно быть None).

См. примеры UDP echo client protocol и UDP echo server protocol.

Изменено в версии 3.4.4: Добавлены параметры family, proto, flags, reuse_address, reuse_port, *allow_broadcast и sock.

Изменено в версии 3.8.1: Параметр reuse_address больше не поддерживается по соображениям безопасности.

Изменено в версии 3.8: Добавлена поддержка Windows.

coroutine loop.create_unix_connection(protocol_factory, path=None, *, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None)

Создайте соединение Unix.

Семейство сокетов будет AF_UNIX; тип сокета будет SOCK_STREAM.

В случае успеха возвращается кортеж (transport, protocol).

path является именем сокета домена Unix и является обязательным, если не указан параметр sock. Поддерживаются абстрактные сокеты Unix, пути str, bytes и Path.

Информацию об аргументах этого метода см. в документации метода loop.create_connection().

Availability: Unix.

Изменено в версии 3.7: Добавлен параметр ssl_handshake_timeout. Параметр path теперь может быть path-like object.

Создание сетевых серверов

coroutine loop.create_server(protocol_factory, host=None, port=None, *, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, start_serving=True)

Создайте TCP-сервер (тип сокета SOCK_STREAM), прослушивающий порт адреса хоста.

Возвращает объект Server.

Аргументы:

  • protocol_factory должна быть вызываемой переменной, возвращающей реализацию protocol.

  • Параметр host может быть установлен в несколько типов, которые определяют, где будет прослушиваться сервер:

    • Если host - строка, TCP-сервер привязывается к одному сетевому интерфейсу, указанному host.

    • Если host является последовательностью строк, то TCP-сервер привязывается ко всем сетевым интерфейсам, указанным в последовательности.

    • Если host - пустая строка или None, предполагаются все интерфейсы, и будет возвращен список из нескольких сокетов (скорее всего, один для IPv4 и другой для IPv6).

  • Параметр port может быть задан для указания порта, который должен прослушивать сервер. Если 0 или None (по умолчанию), будет выбран случайный неиспользуемый порт (обратите внимание, что если host разрешается в несколько сетевых интерфейсов, для каждого интерфейса будет выбран свой случайный порт).

  • family может быть установлено в socket.AF_INET или AF_INET6, чтобы заставить сокет использовать IPv4 или IPv6. Если значение не задано, семейство будет определяться по имени хоста (по умолчанию AF_UNSPEC).

  • flags - это битовая маска для getaddrinfo().

  • Опционально можно указать sock, чтобы использовать уже существующий объект сокета. Если указано, host и port не должны быть указаны.

  • backlog - максимальное количество соединений в очереди, переданных в listen() (по умолчанию 100).

  • ssl может быть установлен в экземпляр SSLContext, чтобы включить TLS для принимаемых соединений.

  • reuse_address указывает ядру повторно использовать локальный сокет в состоянии TIME_WAIT, не дожидаясь истечения его естественного таймаута. Если не указано, автоматически устанавливается в True на Unix.

  • reuse_port указывает ядру разрешить этой конечной точке быть привязанной к тому же порту, к которому привязаны другие существующие конечные точки, если все они устанавливают этот флаг при создании. Эта опция не поддерживается в Windows.

  • ssl_handshake_timeout - это (для TLS-сервера) время в секундах, в течение которого следует ждать завершения TLS-квитирования перед прерыванием соединения. 60.0 секунд, если None (по умолчанию).

  • start_serving, установленное в True (по умолчанию), заставляет созданный сервер немедленно начать принимать соединения. Если установлено значение False, пользователь должен ожидать Server.start_serving() или Server.serve_forever(), чтобы сервер начал принимать соединения.

Изменено в версии 3.5: Добавлена поддержка SSL/TLS в ProactorEventLoop.

Изменено в версии 3.5.1: Параметр host может быть последовательностью строк.

Изменено в версии 3.6: Добавлены параметры ssl_handshake_timeout и start_serving. Параметр сокета TCP_NODELAY установлен по умолчанию для всех TCP-соединений.

См.также

Функция start_server() - это альтернативный API более высокого уровня, который возвращает пару StreamReader и StreamWriter, которые можно использовать в коде async/await.

coroutine loop.create_unix_server(protocol_factory, path=None, *, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, start_serving=True)

Аналогичен loop.create_server(), но работает с семейством сокетов AF_UNIX.

path - это имя сокета домена Unix, и оно обязательно, если не указан аргумент sock. Поддерживаются абстрактные сокеты Unix, пути str, bytes и Path.

Информацию об аргументах этого метода см. в документации метода loop.create_server().

Availability: Unix.

Изменено в версии 3.7: Добавлены параметры ssl_handshake_timeout и start_serving. Параметр path теперь может быть объектом Path.

coroutine loop.connect_accepted_socket(protocol_factory, sock, *, ssl=None, ssl_handshake_timeout=None)

Завернуть уже принятое соединение в пару транспорт/протокол.

Этот метод может быть использован серверами, которые принимают соединения вне asyncio, но используют asyncio для их обработки.

Параметры:

  • protocol_factory должна быть вызываемой переменной, возвращающей реализацию protocol.

  • sock - это уже существующий объект сокета, возвращенный из socket.accept.

  • ssl может быть установлен в значение SSLContext, чтобы включить SSL для принимаемых соединений.

  • ssl_handshake_timeout - это (для SSL-соединения) время в секундах для ожидания завершения SSL-квитирования перед прерыванием соединения. 60.0 секунд, если None (по умолчанию).

Возвращает пару (transport, protocol).

Добавлено в версии 3.5.3.

Изменено в версии 3.7: Добавлен параметр ssl_handshake_timeout.

Передача файлов

coroutine loop.sendfile(transport, file, offset=0, count=None, *, fallback=True)

Отправить файл по транспорту. Вернуть общее количество отправленных байтов.

Метод использует высокопроизводительные os.sendfile(), если они доступны.

file должен быть обычным файловым объектом, открытым в двоичном режиме.

offset указывает, с какого места начинать чтение файла. Если указано, count - это общее количество байт, которое необходимо передать, в отличие от передачи файла до достижения EOF. Позиция файла всегда обновляется, даже если этот метод приводит к ошибке, а для получения фактического количества переданных байт можно использовать file.tell().

fallback, установленный в True, заставляет asyncio вручную прочитать и отправить файл, когда платформа не поддерживает системный вызов sendfile (например, Windows или SSL socket на Unix).

Вызывает SendfileNotAvailableError, если система не поддерживает системный вызов sendfile и fallback равен False.

Добавлено в версии 3.7.

Обновление TLS

coroutine loop.start_tls(transport, protocol, sslcontext, *, server_side=False, server_hostname=None, ssl_handshake_timeout=None)

Обновление существующего соединения на основе транспорта до TLS.

Возвращает новый экземпляр транспорта, который протокол должен начать использовать сразу после await. Экземпляр транспорта, переданный в метод start_tls, никогда больше не должен использоваться.

Параметры:

  • экземпляры transport и protocol, которые возвращают методы create_server() и create_connection().

  • sslcontext: сконфигурированный экземпляр SSLContext.

  • server_side передайте True, когда обновляется соединение на стороне сервера (например, созданное create_server()).

  • server_hostname: устанавливает или переопределяет имя хоста, с которым будет сравниваться сертификат целевого сервера.

  • ssl_handshake_timeout - это (для TLS-соединения) время в секундах, в течение которого следует ждать завершения TLS-квитирования перед прерыванием соединения. 60.0 секунд, если None (по умолчанию).

Добавлено в версии 3.7.

Наблюдение за дескрипторами файлов

loop.add_reader(fd, callback, *args)

Начать мониторинг дескриптора файла fd на доступность для чтения и вызвать callback с указанными аргументами, как только fd станет доступен для чтения.

loop.remove_reader(fd)

Прекратить мониторинг файлового дескриптора fd на доступность для чтения.

loop.add_writer(fd, callback, *args)

Начать мониторинг файлового дескриптора fd на предмет доступности для записи и вызвать callback с указанными аргументами, как только fd станет доступен для записи.

Используйте functools.partial() to pass keyword arguments для обратного вызова.

loop.remove_writer(fd)

Прекратить мониторинг файлового дескриптора fd на доступность записи.

См. также раздел Platform Support о некоторых ограничениях этих методов.

Работа с объектами сокетов напрямую

В целом, реализации протоколов, использующие транспортные API, такие как loop.create_connection() и loop.create_server(), быстрее, чем реализации, работающие с сокетами напрямую. Однако есть некоторые случаи использования, когда производительность не критична, а работа с объектами socket напрямую более удобна.

coroutine loop.sock_recv(sock, nbytes)

Получение до nbytes из sock. Асинхронная версия socket.recv().

Возвращает полученные данные в виде объекта bytes.

sock должен быть неблокирующим сокетом.

Изменено в версии 3.7: Хотя этот метод всегда документировался как метод coroutine, релизы до Python 3.7 возвращали Future. Начиная с Python 3.7 этот метод является методом async def.

coroutine loop.sock_recv_into(sock, buf)

Получение данных из sock в буфер buf. Моделируется после блокирующего метода socket.recv_into().

Возвращает количество байтов, записанных в буфер.

sock должен быть неблокирующим сокетом.

Добавлено в версии 3.7.

coroutine loop.sock_sendall(sock, data)

Отправить данные в сокет sock. Асинхронная версия socket.sendall().

Этот метод продолжает отправку на сокет до тех пор, пока не будут отправлены все данные в data или не произойдет ошибка. При успехе возвращается None. При ошибке возникает исключение. Кроме того, не существует способа определить, сколько данных, если таковые имеются, было успешно обработано принимающей стороной соединения.

sock должен быть неблокирующим сокетом.

Изменено в версии 3.7: Несмотря на то, что метод всегда документировался как метод coroutine, до Python 3.7 он возвращал значение Future. Начиная с Python 3.7, это метод async def.

coroutine loop.sock_connect(sock, address)

Подключите sock к удаленному сокету по адресу address.

Асинхронная версия socket.connect().

sock должен быть неблокирующим сокетом.

Изменено в версии 3.5.2: address больше не нужно разрешать. sock_connect попытается проверить, разрешен ли уже адрес вызовом socket.inet_pton(). Если нет, то loop.getaddrinfo() будет использован для разрешения адреса.

coroutine loop.sock_accept(sock)

Принять соединение. Моделируется после блокирующего метода socket.accept().

Сокет должен быть привязан к адресу и прослушивать соединения. Возвращаемое значение представляет собой пару (conn, address), где conn - это новый объект сокета, используемый для отправки и получения данных по соединению, а address - это адрес, привязанный к сокету на другом конце соединения.

sock должен быть неблокирующим сокетом.

Изменено в версии 3.7: Несмотря на то, что метод всегда документировался как метод coroutine, до Python 3.7 он возвращал Future. Начиная с Python 3.7, это метод async def.

См.также

loop.create_server() и start_server().

coroutine loop.sock_sendfile(sock, file, offset=0, count=None, *, fallback=True)

Отправьте файл, используя высокопроизводительный os.sendfile, если это возможно. Верните общее количество отправленных байтов.

Асинхронная версия socket.sendfile().

sock должен быть неблокирующим socket.SOCK_STREAM socket.

file должен быть обычным файловым объектом, открытым в двоичном режиме.

offset указывает, с какого места начинать чтение файла. Если указано, count - это общее количество байт, которое необходимо передать, в отличие от передачи файла до достижения EOF. Позиция файла всегда обновляется, даже если этот метод приводит к ошибке, а для получения фактического количества переданных байт можно использовать file.tell().

fallback, если установлено значение True, заставляет asyncio вручную читать и отправлять файл, когда платформа не поддерживает системный вызов sendfile (например, Windows или SSL socket на Unix).

Вызывает SendfileNotAvailableError, если система не поддерживает sendfile syscall и fallback равен False.

sock должен быть неблокирующим сокетом.

Добавлено в версии 3.7.

DNS

coroutine loop.getaddrinfo(host, port, *, family=0, type=0, proto=0, flags=0)

Асинхронная версия socket.getaddrinfo().

coroutine loop.getnameinfo(sockaddr, flags=0)

Асинхронная версия socket.getnameinfo().

Изменено в версии 3.7: Оба метода getaddrinfo и getnameinfo всегда документировались как возвращающие coroutine, но до Python 3.7 они фактически возвращали объекты asyncio.Future. Начиная с Python 3.7 оба метода являются корутинами.

Работа с трубами

coroutine loop.connect_read_pipe(protocol_factory, pipe)

Зарегистрируйте конец чтения трубы в цикле событий.

protocol_factory должна быть вызываемой переменной, возвращающей реализацию asyncio protocol.

труба - это file-like object.

Возвращает пару (transport, protocol), где transport поддерживает интерфейс ReadTransport, а protocol является объектом, инстанцированным protocol_factory.

При использовании цикла событий SelectorEventLoop труба переводится в неблокирующий режим.

coroutine loop.connect_write_pipe(protocol_factory, pipe)

Зарегистрируйте конец записи трубы в цикле событий.

protocol_factory должна быть вызываемой переменной, возвращающей реализацию asyncio protocol.

труба - это file-like object.

Возвращает пару (transport, protocol), где transport поддерживает интерфейс WriteTransport, а protocol является объектом, инстанцированным protocol_factory.

При использовании цикла событий SelectorEventLoop труба переводится в неблокирующий режим.

Примечание

SelectorEventLoop не поддерживает вышеуказанные методы в Windows. Используйте ProactorEventLoop вместо этого для Windows.

См.также

Методы loop.subprocess_exec() и loop.subprocess_shell().

Сигналы Unix

loop.add_signal_handler(signum, callback, *args)

Установите callback в качестве обработчика сигнала signum.

Обратный вызов будет вызван loop, вместе с другими очередными обратными вызовами и выполняемыми coroutines этого цикла событий. В отличие от обработчиков сигналов, зарегистрированных с помощью signal.signal(), обратный вызов, зарегистрированный с помощью этой функции, может взаимодействовать с циклом событий.

Вызов ValueError, если номер сигнала недопустим или не поддается регистрации. Вызов RuntimeError, если возникла проблема с настройкой обработчика.

Используйте functools.partial() to pass keyword arguments для обратного вызова.

Как и signal.signal(), эта функция должна быть вызвана в главном потоке.

loop.remove_signal_handler(sig)

Удалите обработчик сигнала sig.

Возвращает True, если обработчик сигнала был удален, или False, если для данного сигнала не был установлен обработчик.

Availability: Unix.

См.также

Модуль signal.

Выполнение кода в пулах потоков или процессов

awaitable loop.run_in_executor(executor, func, *args)

Организуйте вызов func в указанном исполнителе.

Аргумент executor должен быть экземпляром concurrent.futures.Executor. Если executor равен None, то используется исполнитель по умолчанию.

Пример:

import asyncio
import concurrent.futures

def blocking_io():
    # File operations (such as logging) can block the
    # event loop: run them in a thread pool.
    with open('/dev/urandom', 'rb') as f:
        return f.read(100)

def cpu_bound():
    # CPU-bound operations will block the event loop:
    # in general it is preferable to run them in a
    # process pool.
    return sum(i * i for i in range(10 ** 7))

async def main():
    loop = asyncio.get_running_loop()

    ## Options:

    # 1. Run in the default loop's executor:
    result = await loop.run_in_executor(
        None, blocking_io)
    print('default thread pool', result)

    # 2. Run in a custom thread pool:
    with concurrent.futures.ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(
            pool, blocking_io)
        print('custom thread pool', result)

    # 3. Run in a custom process pool:
    with concurrent.futures.ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(
            pool, cpu_bound)
        print('custom process pool', result)

asyncio.run(main())

Этот метод возвращает объект asyncio.Future.

Используйте functools.partial() to pass keyword arguments для функции.

Изменено в версии 3.5.3: loop.run_in_executor() больше не настраивает max_workers создаваемого им исполнителя пула потоков, вместо этого оставляя за исполнителем пула потоков (ThreadPoolExecutor) право устанавливать значение по умолчанию.

loop.set_default_executor(executor)

Установите executor в качестве исполнителя по умолчанию, используемого run_in_executor(). executor должен быть экземпляром ThreadPoolExecutor.

Не рекомендуется, начиная с версии 3.8: Использование исполнителя, который не является экземпляром ThreadPoolExecutor, устарело и вызовет ошибку в Python 3.9.

исполнитель должен быть экземпляром concurrent.futures.ThreadPoolExecutor.

API обработки ошибок

Позволяет настроить способ обработки исключений в цикле событий.

loop.set_exception_handler(handler)

Установите handler в качестве нового обработчика исключений цикла событий.

Если handler равен None, будет установлен обработчик исключений по умолчанию. В противном случае, handler должен быть вызываемым объектом с сигнатурой, соответствующей (loop, context), где loop - ссылка на активный цикл событий, а context - объект dict, содержащий детали исключения (подробности о контексте см. в документации call_exception_handler()).

loop.get_exception_handler()

Возвращает текущий обработчик исключений, или None, если пользовательский обработчик исключений не был установлен.

Добавлено в версии 3.5.2.

loop.default_exception_handler(context)

Обработчик исключений по умолчанию.

Вызывается, когда происходит исключение и не установлен обработчик исключений. Это может быть вызвано пользовательским обработчиком исключений, который хочет отступить от поведения обработчика по умолчанию.

Параметр context имеет то же значение, что и в call_exception_handler().

loop.call_exception_handler(context)

Вызовите обработчик исключения текущего цикла событий.

context - это объект dict, содержащий следующие ключи (новые ключи могут быть введены в будущих версиях Python):

  • „message“: Сообщение об ошибке;

  • „exception“ (необязательно): Объект исключения;

  • „future“ (необязательно): asyncio.Future экземпляр;

  • „task“ (необязательно): asyncio.Task экземпляр;

  • „handle“ (необязательно): asyncio.Handle экземпляр;

  • „протокол“ (необязательно): Protocol экземпляр;

  • „transport“ (необязательно): Transport экземпляр;

  • „socket“ (необязательно): socket.socket экземпляр;

  • „asyncgen“ (необязательно): Асинхронный генератор, который вызвал

    исключение.

Примечание

Этот метод не должен перегружаться в подклассах циклов событий. Для пользовательской обработки исключений используйте метод set_exception_handler().

Включение режима отладки

loop.get_debug()

Получение режима отладки (bool) цикла событий.

Значение по умолчанию True, если переменная окружения PYTHONASYNCIODEBUG установлена в непустую строку, False в противном случае.

loop.set_debug(enabled: bool)

Установите режим отладки цикла событий.

Изменено в версии 3.7: Новый Python Development Mode теперь также может быть использован для включения режима отладки.

См.также

debug mode of asyncio.

Запуск подпроцессов

Методы, описанные в этом подразделе, являются низкоуровневыми. В обычном коде async/await вместо них используйте высокоуровневые функции удобства asyncio.create_subprocess_shell() и asyncio.create_subprocess_exec().

Примечание

В Windows стандартный цикл событий ProactorEventLoop поддерживает подпроцессы, а SelectorEventLoop - нет. Подробности см. в разделе Subprocess Support on Windows.

coroutine loop.subprocess_exec(protocol_factory, *args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, **kwargs)

Создание подпроцесса из одного или нескольких строковых аргументов, указанных args.

args должен быть списком строк, представленных:

Первая строка задает исполняемый файл программы, а остальные строки - аргументы. Вместе строковые аргументы образуют argv программы.

Это похоже на класс стандартной библиотеки subprocess.Popen, вызываемый с помощью shell=False и списка строк, передаваемых в качестве первого аргумента; однако, где Popen принимает один аргумент, который является списком строк, subprocess_exec принимает несколько строковых аргументов.

Фабрика protocol_factory должна быть вызываемой переменной, возвращающей подкласс класса asyncio.SubprocessProtocol.

Другие параметры:

  • stdin может быть любым из них:

    • файлоподобный объект, представляющий трубу для подключения к стандартному потоку ввода подпроцесса с помощью connect_write_pipe().

    • константа subprocess.PIPE (по умолчанию), которая создаст новую трубу и соединит ее,

    • значение None, которое заставит подпроцесс наследовать дескриптор файла от этого процесса

    • константа subprocess.DEVNULL, которая указывает, что будет использоваться специальный файл os.devnull

  • stdout может быть любым из них:

    • файлоподобный объект, представляющий трубу для подключения к стандартному потоку вывода подпроцесса с помощью connect_write_pipe()

    • константа subprocess.PIPE (по умолчанию), которая создаст новую трубу и соединит ее,

    • значение None, которое заставит подпроцесс наследовать дескриптор файла от этого процесса

    • константа subprocess.DEVNULL, которая указывает, что будет использоваться специальный файл os.devnull

  • stderr может быть любым из них:

    • файлоподобный объект, представляющий трубу для подключения к стандартному потоку ошибок подпроцесса с помощью connect_write_pipe().

    • константа subprocess.PIPE (по умолчанию), которая создаст новую трубу и соединит ее,

    • значение None, которое заставит подпроцесс наследовать дескриптор файла от этого процесса

    • константа subprocess.DEVNULL, которая указывает, что будет использоваться специальный файл os.devnull

    • константа subprocess.STDOUT, которая соединит стандартный поток ошибок со стандартным потоком вывода процесса

  • Все остальные ключевые аргументы передаются в subprocess.Popen без интерпретации, за исключением bufsize, universal_newlines, shell, text, encoding и errors, которые вообще не следует указывать.

    API подпроцесса asyncio не поддерживает декодирование потоков в текст. bytes.decode() можно использовать для преобразования байтов, возвращаемых из потока, в текст.

Документацию по другим аргументам смотрите в конструкторе класса subprocess.Popen.

Возвращает пару (transport, protocol), где transport соответствует базовому классу asyncio.SubprocessTransport, а protocol является объектом, инстанцированным protocol_factory.

coroutine loop.subprocess_shell(protocol_factory, cmd, *, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, **kwargs)

Создайте подпроцесс из cmd, который может быть строкой str или bytes, закодированной в filesystem encoding, используя синтаксис «shell» платформы.

Это похоже на класс стандартной библиотеки subprocess.Popen, вызываемый с помощью shell=True.

Фабрика protocol_factory должна быть вызываемой переменной, возвращающей подкласс класса SubprocessProtocol.

Более подробную информацию об остальных аргументах смотрите в subprocess_exec().

Возвращает пару (transport, protocol), где transport соответствует базовому классу SubprocessTransport, а protocol является объектом, инстанцированным protocol_factory.

Примечание

Приложение несет ответственность за то, чтобы все пробельные и специальные символы были заключены в кавычки должным образом, чтобы избежать уязвимостей shell injection. Функция shlex.quote() может быть использована для правильной экранировки пробелов и специальных символов в строках, которые будут использоваться для построения команд оболочки.

Ручки обратного вызова

class asyncio.Handle

Объект обертки обратного вызова, возвращаемый loop.call_soon(), loop.call_soon_threadsafe().

cancel()

Отменить обратный вызов. Если обратный вызов уже был отменен или выполнен, этот метод не имеет эффекта.

cancelled()

Верните True, если обратный вызов был отменен.

Добавлено в версии 3.7.

class asyncio.TimerHandle

Объект обертки обратного вызова, возвращаемый loop.call_later(), и loop.call_at().

Этот класс является подклассом класса Handle.

when()

Возвращает запланированное время обратного вызова в виде float секунд.

Время - это абсолютная временная метка, использующая ту же временную ссылку, что и loop.time().

Добавлено в версии 3.7.

Объекты сервера

Объекты сервера создаются функциями loop.create_server(), loop.create_unix_server(), start_server() и start_unix_server().

Не инстанцируйте класс напрямую.

class asyncio.Server

Объекты Server являются асинхронными менеджерами контекста. При использовании в операторе async with гарантируется, что объект Server закрыт и не принимает новых соединений, когда оператор async with завершен:

srv = await loop.create_server(...)

async with srv:
    # some code

# At this point, srv is closed and no longer accepts new connections.

Изменено в версии 3.7: Объект Server - это асинхронный менеджер контекста, начиная с Python 3.7.

close()

Прекратите обслуживание: закройте прослушивающие сокеты и установите атрибут sockets в None.

Сокеты, представляющие существующие входящие клиентские соединения, остаются открытыми.

Сервер закрывается асинхронно, используйте корутину wait_closed() для ожидания закрытия сервера.

get_loop()

Возвращает цикл событий, связанный с объектом сервера.

Добавлено в версии 3.7.

coroutine start_serving()

Начните принимать связи.

Этот метод является идемпотентным, поэтому его можно вызывать, когда сервер уже обслуживается.

Ключевое слово start_serving только для параметров loop.create_server() и asyncio.start_server() позволяет создать объект Server, который изначально не принимает соединения. В этом случае Server.start_serving() или Server.serve_forever() можно использовать для того, чтобы сервер начал принимать соединения.

Добавлено в версии 3.7.

coroutine serve_forever()

Начинает принимать соединения до тех пор, пока не будет отменена эта корутина. Отмена задачи serve_forever приводит к закрытию сервера.

Этот метод может быть вызван, если сервер уже принимает соединения. На один объект Server может существовать только одна задача serve_forever.

Пример:

async def client_connected(reader, writer):
    # Communicate with the client with
    # reader/writer streams.  For example:
    await reader.readline()

async def main(host, port):
    srv = await asyncio.start_server(
        client_connected, host, port)
    await srv.serve_forever()

asyncio.run(main('127.0.0.1', 0))

Добавлено в версии 3.7.

is_serving()

Возвращает True, если сервер принимает новые соединения.

Добавлено в версии 3.7.

coroutine wait_closed()

Дождитесь завершения метода close().

sockets

Список объектов socket.socket, которые прослушивает сервер.

Изменено в версии 3.7: До версии Python 3.7 Server.sockets возвращала внутренний список серверных сокетов напрямую. В версии 3.7 возвращается копия этого списка.

Реализация циклов событий

asyncio поставляется с двумя различными реализациями циклов событий: SelectorEventLoop и ProactorEventLoop.

По умолчанию asyncio настроен на использование SelectorEventLoop на Unix и ProactorEventLoop на Windows.

class asyncio.SelectorEventLoop

Цикл событий, основанный на модуле selectors.

Использует наиболее эффективный селектор, доступный для данной платформы. Также можно вручную настроить точную реализацию селектора, который будет использоваться:

import asyncio
import selectors

selector = selectors.SelectSelector()
loop = asyncio.SelectorEventLoop(selector)
asyncio.set_event_loop(loop)

Availability: Unix, Windows.

class asyncio.ProactorEventLoop

Цикл событий для Windows, использующий «Порты завершения ввода/вывода» (IOCP).

Availability: Windows.

class asyncio.AbstractEventLoop

Абстрактный базовый класс для циклов событий, совместимых с asyncio.

В разделе Event Loop Methods перечислены все методы, которые должна была определить альтернативная реализация AbstractEventLoop.

Примеры

Обратите внимание, что все примеры в этом разделе целенаправленно показывают, как использовать низкоуровневые API циклов событий, такие как loop.run_forever() и loop.call_soon(). Современные приложения asyncio редко нуждаются в таком написании; лучше использовать высокоуровневые функции, такие как asyncio.run().

Hello World с помощью call_soon()

Пример использования метода loop.call_soon() для планирования обратного вызова. Обратный вызов отображает "Hello World", а затем останавливает цикл событий:

import asyncio

def hello_world(loop):
    """A callback to print 'Hello World' and stop the event loop"""
    print('Hello World')
    loop.stop()

loop = asyncio.get_event_loop()

# Schedule a call to hello_world()
loop.call_soon(hello_world, loop)

# Blocking call interrupted by loop.stop()
try:
    loop.run_forever()
finally:
    loop.close()

См.также

Аналогичный пример Hello World, созданный с помощью coroutine и функции run().

Отображение текущей даты с помощью call_later()

Пример обратного вызова, отображающего текущую дату каждую секунду. Обратный вызов использует метод loop.call_later(), чтобы переназначить себя через 5 секунд, а затем останавливает цикл событий:

import asyncio
import datetime

def display_date(end_time, loop):
    print(datetime.datetime.now())
    if (loop.time() + 1.0) < end_time:
        loop.call_later(1, display_date, end_time, loop)
    else:
        loop.stop()

loop = asyncio.get_event_loop()

# Schedule the first call to display_date()
end_time = loop.time() + 5.0
loop.call_soon(display_date, end_time, loop)

# Blocking call interrupted by loop.stop()
try:
    loop.run_forever()
finally:
    loop.close()

См.также

Аналогичный пример current date, созданный с помощью coroutine и функции run().

Наблюдение за дескриптором файла на предмет событий чтения

Подождите, пока файловый дескриптор не получит некоторые данные с помощью метода loop.add_reader(), а затем закройте цикл событий:

import asyncio
from socket import socketpair

# Create a pair of connected file descriptors
rsock, wsock = socketpair()

loop = asyncio.get_event_loop()

def reader():
    data = rsock.recv(100)
    print("Received:", data.decode())

    # We are done: unregister the file descriptor
    loop.remove_reader(rsock)

    # Stop the event loop
    loop.stop()

# Register the file descriptor for read event
loop.add_reader(rsock, reader)

# Simulate the reception of data from the network
loop.call_soon(wsock.send, 'abc'.encode())

try:
    # Run the event loop
    loop.run_forever()
finally:
    # We are done. Close sockets and the event loop.
    rsock.close()
    wsock.close()
    loop.close()

См.также

Установка обработчиков сигналов для SIGINT и SIGTERM

(Этот пример signals работает только на Unix).

Зарегистрируйте обработчики для сигналов SIGINT и SIGTERM с помощью метода loop.add_signal_handler():

import asyncio
import functools
import os
import signal

def ask_exit(signame, loop):
    print("got signal %s: exit" % signame)
    loop.stop()

async def main():
    loop = asyncio.get_running_loop()

    for signame in {'SIGINT', 'SIGTERM'}:
        loop.add_signal_handler(
            getattr(signal, signame),
            functools.partial(ask_exit, signame, loop))

    await asyncio.sleep(3600)

print("Event loop running for 1 hour, press Ctrl+C to interrupt.")
print(f"pid {os.getpid()}: send SIGINT or SIGTERM to exit.")

asyncio.run(main())
Back to Top