Асинхронный ввод/вывод (asyncio)

Поддержка Python asyncio. Включена поддержка использования Core и ORM с использованием диалектов, совместимых с asyncio.

Добавлено в версии 1.4.

Предупреждение

Пожалуйста, прочитайте Примечания по установке платформы Asyncio (включая Apple M1) для важных указаний по установке для многих платформ, включая Apple M1 Architecture.

Совет

Расширение asyncio в версии SQLAlchemy 1.4.3 теперь можно считать бета-уровнем программного обеспечения. Детали API могут быть изменены, однако на данный момент маловероятно, что будут значительные изменения, несовместимые с обратным развитием.

См.также

Asynchronous IO Support for Core and ORM - первоначальное объявление функции

Интеграция Asyncio - примеры скриптов, иллюстрирующие рабочие примеры использования Core и ORM в рамках расширения asyncio.

Примечания по установке платформы Asyncio (включая Apple M1)

Расширение asyncio требует только Python 3. Оно также зависит от библиотеки greenlet. Эта зависимость установлена по умолчанию на распространенных машинных платформах, включая:

x86_64 aarch64 ppc64le amd64 win32

Для вышеуказанных платформ известно, что greenlet поставляет предварительно собранные файлы колеса. Для других платформ greenlet не устанавливается по умолчанию; текущий список файлов для greenlet можно посмотреть на Greenlet - Download Files. Заметим, что не учтены многие архитектуры, включая Apple M1.

Чтобы установить SQLAlchemy и при этом обеспечить наличие зависимости greenlet независимо от используемой платформы, [asyncio] setuptools extra может быть установлен следующим образом, который будет включать также указание pip установить greenlet:

pip install sqlalchemy[asyncio]

Обратите внимание, что установка greenlet на платформы, не имеющие предварительно собранного файла wheel, означает, что greenlet будет собран из исходных текстов, что требует наличия библиотек разработки Python.

Синопсис - Ядро

Для использования в Core функция create_async_engine() создает экземпляр AsyncEngine, который затем предлагает асинхронную версию традиционного API Engine. Функция AsyncEngine предоставляет AsyncConnection через свои методы AsyncEngine.connect() и AsyncEngine.begin(), которые оба предоставляют асинхронные менеджеры контекста. Затем AsyncConnection может вызывать операторы, используя либо метод AsyncConnection.execute() для передачи буферизованного Result, либо метод AsyncConnection.stream() для передачи потокового серверного AsyncResult:

import asyncio

from sqlalchemy.ext.asyncio import create_async_engine


async def async_main():
    engine = create_async_engine(
        "postgresql+asyncpg://scott:tiger@localhost/test",
        echo=True,
    )

    async with engine.begin() as conn:
        await conn.run_sync(meta.drop_all)
        await conn.run_sync(meta.create_all)

        await conn.execute(
            t1.insert(), [{"name": "some name 1"}, {"name": "some name 2"}]
        )

    async with engine.connect() as conn:
        # select a Result, which will be delivered with buffered
        # results
        result = await conn.execute(select(t1).where(t1.c.name == "some name 1"))

        print(result.fetchall())

    # for AsyncEngine created in function scope, close and
    # clean-up pooled connections
    await engine.dispose()


asyncio.run(async_main())

Выше, метод AsyncConnection.run_sync() может быть использован для вызова специальных функций DDL, таких как MetaData.create_all(), которые не включают ожидающий хук.

Совет

Рекомендуется вызывать метод AsyncEngine.dispose() с помощью await при использовании объекта AsyncEngine в области видимости, которая выйдет из контекста и будет собрана, как показано в функции async_main в приведенном выше примере. Это гарантирует, что все соединения, открытые пулом соединений, будут правильно утилизированы в ожидающем контексте. В отличие от использования блокирующего ввода-вывода, SQLAlchemy не может правильно утилизировать эти соединения в рамках таких методов, как __del__ или финализаторов weakref, поскольку нет возможности вызвать await. Если явно не утилизировать механизм, когда он выпадает из области видимости, это может привести к предупреждениям, выдаваемым в стандартный вывод, напоминающим форму RuntimeError: Event loop is closed в сборке мусора.

AsyncConnection также имеет «потоковый» API через метод AsyncConnection.stream(), который возвращает объект AsyncResult. Этот объект результата использует курсор на стороне сервера и предоставляет API async/await, такой как async iterator:

async with engine.connect() as conn:
    async_result = await conn.stream(select(t1))

    async for row in async_result:
        print("row: %s" % (row,))

Синопсис - ORM

При использовании запросов 2.0 style класс AsyncSession обеспечивает полную функциональность ORM. В режиме использования по умолчанию необходимо соблюдать особую осторожность, чтобы избежать lazy loading или другого доступа к атрибутам с истекшим сроком действия с использованием ORM отношений и атрибутов столбцов; следующий раздел Предотвращение неявного ввода-вывода при использовании AsyncSession подробно описывает это. Пример ниже иллюстрирует полный пример, включая конфигурацию маппера и сессии:

import asyncio

from sqlalchemy import Column
from sqlalchemy import DateTime
from sqlalchemy import ForeignKey
from sqlalchemy import func
from sqlalchemy import Integer
from sqlalchemy import String
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.future import select
from sqlalchemy.orm import declarative_base
from sqlalchemy.orm import relationship
from sqlalchemy.orm import selectinload
from sqlalchemy.orm import sessionmaker

Base = declarative_base()


class A(Base):
    __tablename__ = "a"

    id = Column(Integer, primary_key=True)
    data = Column(String)
    create_date = Column(DateTime, server_default=func.now())
    bs = relationship("B")

    # required in order to access columns with server defaults
    # or SQL expression defaults, subsequent to a flush, without
    # triggering an expired load
    __mapper_args__ = {"eager_defaults": True}


class B(Base):
    __tablename__ = "b"
    id = Column(Integer, primary_key=True)
    a_id = Column(ForeignKey("a.id"))
    data = Column(String)


async def async_main():
    engine = create_async_engine(
        "postgresql+asyncpg://scott:tiger@localhost/test",
        echo=True,
    )

    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.drop_all)
        await conn.run_sync(Base.metadata.create_all)

    # expire_on_commit=False will prevent attributes from being expired
    # after commit.
    async_session = sessionmaker(engine, expire_on_commit=False, class_=AsyncSession)

    async with async_session() as session:
        async with session.begin():
            session.add_all(
                [
                    A(bs=[B(), B()], data="a1"),
                    A(bs=[B()], data="a2"),
                    A(bs=[B(), B()], data="a3"),
                ]
            )

        stmt = select(A).options(selectinload(A.bs))

        result = await session.execute(stmt)

        for a1 in result.scalars():
            print(a1)
            print(f"created at: {a1.create_date}")
            for b1 in a1.bs:
                print(b1)

        result = await session.execute(select(A).order_by(A.id))

        a1 = result.scalars().first()

        a1.data = "new data"

        await session.commit()

        # access attribute subsequent to commit; this is what
        # expire_on_commit=False allows
        print(a1.data)

    # for AsyncEngine created in function scope, close and
    # clean-up pooled connections
    await engine.dispose()


asyncio.run(async_main())

В приведенном выше примере AsyncSession инстанцируется с помощью необязательного помощника sessionmaker и ассоциируется с AsyncEngine против определенного URL базы данных. Затем он используется в асинхронном менеджере контекста Python (т.е. в операторе async with:) таким образом, что он автоматически закрывается в конце блока; это эквивалентно вызову метода AsyncSession.close().

Примечание

AsyncSession использует режим будущего SQLAlchemy, который имеет несколько потенциально разрушительных изменений. Одним из таких изменений является новое поведение по умолчанию cascade_backrefs является False, что может повлиять на то, как связанные объекты сохраняются в базе данных.

Предотвращение неявного ввода-вывода при использовании AsyncSession

Используя традиционный asyncio, приложение должно избегать точек, в которых может произойти обращение к IO-на-атрибутах. Для предотвращения этого принимаются следующие меры:

  • Загрузчик нетерпения selectinload() используется для нетерпеливой загрузки коллекции A.bs в пределах области действия вызова await session.execute():

    stmt = select(A).options(selectinload(A.bs))

    Если бы стратегия загрузчика по умолчанию «lazyload» была оставлена на месте, доступ к атрибуту A.bs вызвал бы исключение asyncio. Существует множество вариантов ORM-загрузчика, которые могут быть настроены на уровне связки по умолчанию или использоваться на основе каждого запроса, что описано в Техники загрузки отношений.

  • AsyncSession настроен с помощью Session.expire_on_commit, установленного в False, так что мы можем получить доступ к атрибутам объекта после вызова AsyncSession.commit(), как в строке в конце, где мы получаем доступ к атрибуту:

    # create AsyncSession with expire_on_commit=False
    async_session = AsyncSession(engine, expire_on_commit=False)
    
    # sessionmaker version
    async_session = sessionmaker(
        engine, expire_on_commit=False, class_=AsyncSession
    )
    
    async with async_session() as session:
        result = await session.execute(select(A).order_by(A.id))
    
        a1 = result.scalars().first()
    
        # commit would normally expire all attributes
        await session.commit()
    
        # access attribute subsequent to commit; this is what
        # expire_on_commit=False allows
        print(a1.data)
  • Значение Column.server_default в столбце created_at по умолчанию не обновляется после INSERT; вместо этого оно обычно expired so that it can be loaded when needed. Аналогичное поведение применимо к столбцу, в котором параметр Column.default присвоен объекту выражения SQL. Чтобы получить доступ к этому значению с помощью asyncio, оно должно быть обновлено в процессе flush, что достигается установкой параметра mapper.eager_defaults на отображении:

    class A(Base):
        # ...
    
        # column with a server_default, or SQL expression default
        create_date = Column(DateTime, server_default=func.now())
    
        # add this so that it can be accessed
        __mapper_args__ = {"eager_defaults": True}

Другие рекомендации включают:

  • Следует избегать методов типа AsyncSession.expire() в пользу AsyncSession.refresh().

  • Избегайте использования опции каскада all, задокументированной в Каскады, в пользу явного перечисления желаемых свойств каскада. Опция каскада all подразумевает, помимо прочего, настройку refresh-expire, что означает, что метод AsyncSession.refresh() уничтожит атрибуты на связанных объектах, но не обязательно обновит эти связанные объекты, если в relationship() не настроена ускоренная загрузка, оставив их в просроченном состоянии. В будущем выпуске может появиться возможность указывать опции ускоренной загрузки при вызове Session.refresh() и/или AsyncSession.refresh().

  • Для столбцов deferred(), если они вообще используются, следует применять соответствующие опции загрузчика в дополнение к опциям для конструкций relationship(), как отмечено выше. См. раздел Отложенная загрузка колонн для получения информации об отложенной загрузке столбцов.

Запуск синхронных методов и функций в asyncio

Deep Alchemy

Этот подход, по сути, является публичным раскрытием механизма, с помощью которого SQLAlchemy обеспечивает интерфейс asyncio в первую очередь. Несмотря на отсутствие технических проблем, в целом этот подход можно считать «спорным», поскольку он противоречит некоторым центральным принципам модели программирования asyncio, которая заключается в том, что любое программное утверждение, которое потенциально может привести к вызову IO, **должно содержать вызов await, чтобы в программе не было явно указано в каждой строке, где может произойти IO. Данный подход не меняет этой общей идеи, за исключением того, что он позволяет освободить серию синхронных инструкций ввода-вывода от этого правила в рамках вызова функции, по сути, объединив их в один awaitable.

В качестве альтернативного средства интеграции традиционной «ленивой загрузки» SQLAlchemy в цикл событий asyncio, предоставляется опциональный метод, известный как AsyncSession.run_sync(), который будет запускать любую функцию Python внутри гринлета, где традиционные концепции синхронного программирования будут переведены на использование await, когда они достигнут драйвера базы данных. Гипотетический подход здесь заключается в том, что асинхронно-ориентированное приложение может упаковать методы, связанные с базой данных, в функции, которые вызываются с помощью AsyncSession.run_sync().

Изменяя приведенный выше пример, если бы мы не использовали selectinload() для коллекции A.bs, мы могли бы выполнить обработку этих обращений к атрибутам в отдельной функции:

import asyncio

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine


def fetch_and_update_objects(session):
    """run traditional sync-style ORM code in a function that will be
    invoked within an awaitable.

    """

    # the session object here is a traditional ORM Session.
    # all features are available here including legacy Query use.

    stmt = select(A)

    result = session.execute(stmt)
    for a1 in result.scalars():
        print(a1)

        # lazy loads
        for b1 in a1.bs:
            print(b1)

    # legacy Query use
    a1 = session.query(A).order_by(A.id).first()

    a1.data = "new data"


async def async_main():
    engine = create_async_engine(
        "postgresql+asyncpg://scott:tiger@localhost/test",
        echo=True,
    )
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.drop_all)
        await conn.run_sync(Base.metadata.create_all)

    async with AsyncSession(engine) as session:
        async with session.begin():
            session.add_all(
                [
                    A(bs=[B(), B()], data="a1"),
                    A(bs=[B()], data="a2"),
                    A(bs=[B(), B()], data="a3"),
                ]
            )

        await session.run_sync(fetch_and_update_objects)

        await session.commit()

    # for AsyncEngine created in function scope, close and
    # clean-up pooled connections
    await engine.dispose()


asyncio.run(async_main())

Приведенный выше подход к запуску определенных функций в рамках «синхронизирующего» бегуна имеет некоторые параллели с приложением, которое запускает приложение SQLAlchemy поверх библиотеки программирования на основе событий, такой как gevent. Различия заключаются в следующем:

  1. В отличие от использования gevent, мы можем продолжать использовать стандартный цикл событий Python asyncio или любой пользовательский цикл событий без необходимости интегрироваться в цикл событий gevent.

  2. Никакого «обезьянничания» не происходит. В приведенном выше примере используется настоящий драйвер asyncio, а лежащий в основе SQLAlchemy пул соединений также использует встроенный в Python asyncio.Queue для пула соединений.

  3. Программа может свободно переключаться между кодом async/await и содержащимися функциями, использующими код sync, практически без ущерба для производительности. Нет ни «исполнителя потока», ни каких-либо дополнительных ожиданий или синхронизации.

  4. Базовые сетевые драйверы также используют чистые концепции Python asyncio, не используются сторонние сетевые библиотеки, как это делают gevent и eventlet.

Использование событий с расширением asyncio

SQLAlchemy event system напрямую не раскрывается расширением asyncio, что означает, что пока не существует «асинхронной» версии обработчика событий SQLAlchemy.

Однако, поскольку расширение asyncio окружает обычный синхронный SQLAlchemy API, обычные обработчики событий в «синхронном» стиле свободно доступны, как это было бы, если бы asyncio не использовался.

Как описано ниже, в настоящее время существует две стратегии регистрации событий в asyncio-facing API:

  • События могут быть зарегистрированы на уровне экземпляра (например, конкретного экземпляра AsyncEngine) путем связывания события с атрибутом sync, который ссылается на проксируемый объект. Например, чтобы зарегистрировать событие PoolEvents.connect() против экземпляра AsyncEngine, используйте его атрибут AsyncEngine.sync_engine в качестве цели. Цели включают:

    AsyncEngine.sync_engine

    AsyncConnection.sync_connection

    AsyncConnection.sync_engine

    AsyncSession.sync_session

  • Чтобы зарегистрировать событие на уровне класса, направленное на все экземпляры одного типа (например, все экземпляры AsyncSession), используйте соответствующий класс в стиле синхронизации. Например, чтобы зарегистрировать событие SessionEvents.before_commit() против класса AsyncSession, используйте класс Session в качестве цели.

При работе в обработчике событий, который находится в контексте asyncio, объекты типа Connection продолжают работать своим обычным «синхронным» способом, не требуя использования await или async; когда сообщения в конечном итоге принимаются адаптером базы данных asyncio, стиль вызова прозрачно адаптируется обратно в стиль вызова asyncio. Для событий, которым передается соединение уровня DBAPI, например PoolEvents.connect(), объект является pep-249 совместимым объектом «соединение», который адаптирует вызовы в стиле синхронизации к драйверу asyncio.

Некоторые примеры обработчиков событий в стиле синхронизации, связанных с конструкциями API, ориентированными на асинхронный режим, показаны ниже:

import asyncio

from sqlalchemy import event, text
from sqlalchemy.engine import Engine
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import Session

## Core events ##

engine = create_async_engine("postgresql+asyncpg://scott:tiger@localhost:5432/test")


# connect event on instance of Engine
@event.listens_for(engine.sync_engine, "connect")
def my_on_connect(dbapi_con, connection_record):
    print("New DBAPI connection:", dbapi_con)
    cursor = dbapi_con.cursor()

    # sync style API use for adapted DBAPI connection / cursor
    cursor.execute("select 'execute from event'")
    print(cursor.fetchone()[0])


# before_execute event on all Engine instances
@event.listens_for(Engine, "before_execute")
def my_before_execute(
    conn,
    clauseelement,
    multiparams,
    params,
    execution_options,
):
    print("before execute!")


## ORM events ##

session = AsyncSession(engine)


# before_commit event on instance of Session
@event.listens_for(session.sync_session, "before_commit")
def my_before_commit(session):
    print("before commit!")

    # sync style API use on Session
    connection = session.connection()

    # sync style API use on Connection
    result = connection.execute(text("select 'execute from event'"))
    print(result.first())


# after_commit event on all Session instances
@event.listens_for(Session, "after_commit")
def my_after_commit(session):
    print("after commit!")


async def go():
    await session.execute(text("select 1"))
    await session.commit()

    await session.close()
    await engine.dispose()


asyncio.run(go())

Приведенный выше пример выводит что-то вроде:

New DBAPI connection: <AdaptedConnection <asyncpg.connection.Connection ...>>
execute from event
before execute!
before commit!
execute from event
after commit!

Использование методов драйвера только с ожиданием в пуле соединений и других событиях

Как говорилось в предыдущем разделе, обработчики событий, такие как обработчики событий PoolEvents, получают соединение «DBAPI» в стиле синхронизации, которое представляет собой объект-обертку, предоставляемый диалектами asyncio SQLAlchemy для адаптации базового соединения «драйвера» asyncio в такое, которое может быть использовано внутренними компонентами SQLAlchemy. Особый случай возникает, когда пользовательская реализация такого обработчика событий должна использовать конечное «драйверное» соединение напрямую, используя только ожидаемые методы этого драйверного соединения. Одним из таких примеров является метод .set_type_codec(), предоставляемый драйвером asyncpg.

Для того, чтобы приспособить этот вариант использования, класс SQLAlchemy AdaptedConnection предоставляет метод AdaptedConnection.run_async(), который позволяет вызывать ожидаемую функцию в «синхронном» контексте обработчика событий или другого внутреннего компонента SQLAlchemy. Этот метод является прямым аналогом метода AsyncConnection.run_sync(), который позволяет методу в стиле sync работать в режиме async.

AdaptedConnection.run_async() должна быть передана функция, которая примет внутреннее соединение «драйвера» как единственный аргумент и вернет awaitable, который будет вызван методом AdaptedConnection.run_async(). Саму переданную функцию не нужно объявлять как async; совершенно нормально, если она будет Python lambda:, так как возвращаемое значение awaitable будет вызвано после возврата:

from sqlalchemy import event
from sqlalchemy.ext.asyncio import create_async_engine

engine = create_async_engine(...)


@event.listens_for(engine.sync_engine, "connect")
def register_custom_types(dbapi_connection, ...):
    dbapi_connection.run_async(
        lambda connection: connection.set_type_codec(
            "MyCustomType", encoder, decoder, ...
        )
    )

Выше, объект, передаваемый в обработчик события register_custom_types, является экземпляром AdaptedConnection, который предоставляет DBAPI-подобный интерфейс к базовому асинхронному объекту соединения на уровне драйвера. Метод AdaptedConnection.run_async() затем предоставляет доступ к ожидающей среде, в которой можно действовать с базовым соединением на уровне драйвера.

Добавлено в версии 1.4.30.

Использование нескольких циклов событий asyncio

Приложение, использующее несколько циклов событий, например, в редком случае сочетания asyncio с многопоточностью, не должно использовать один и тот же AsyncEngine с разными циклами событий при использовании реализации пула по умолчанию.

Если AsyncEngine передается из одного цикла событий в другой, то перед повторным использованием в новом цикле событий следует вызвать метод AsyncEngine.dispose(). Если этого не сделать, может возникнуть ошибка RuntimeError, подобная Task <Task pending ...> got Future attached to a different loop.

Если один и тот же движок должен использоваться совместно разными циклами, его следует настроить на отключение пула с помощью NullPool, не позволяя движку использовать любое соединение более одного раза:

from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.pool import NullPool

engine = create_async_engine(
    "postgresql+asyncpg://user:pass@host/dbname",
    poolclass=NullPool,
)

Использование asyncio scoped session

Шаблон «scoped session», используемый в потоковой SQLAlchemy с объектом scoped_session, также доступен в asyncio, используя адаптированную версию под названием async_scoped_session.

Совет

SQLAlchemy в целом не рекомендует использовать паттерн «scoped» для новых разработок, поскольку он опирается на изменяемое глобальное состояние, которое также должно быть явно разрушено по завершении работы потока или задачи. В частности, при использовании asyncio, вероятно, лучше передавать AsyncSession непосредственно в ожидающие функции, которым это необходимо.

При использовании async_scoped_session, поскольку в контексте asyncio не существует концепции «thread-local», конструктору необходимо предоставить параметр «scopefunc». Пример ниже иллюстрирует использование функции asyncio.current_task() для этой цели:

from asyncio import current_task

from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.asyncio import async_scoped_session
from sqlalchemy.ext.asyncio import AsyncSession

async_session_factory = sessionmaker(some_async_engine, class_=AsyncSession)
AsyncScopedSession = async_scoped_session(async_session_factory, scopefunc=current_task)

some_async_session = AsyncScopedSession()

Предупреждение

Функция «scopefunc», используемая async_scoped_session, вызывается произвольное количество раз в рамках задачи, по одному разу при каждом обращении к базовому AsyncSession. Поэтому функция должна быть эдемпотентной и легковесной, и не должна пытаться создавать или изменять какое-либо состояние, например, устанавливать обратные вызовы и т.д.

Предупреждение

Использование current_task() для «ключа» в области видимости требует, чтобы метод async_scoped_session.remove() вызывался из внешнего awaitable, чтобы гарантировать удаление ключа из реестра после завершения задачи, иначе хэндл задачи, а также AsyncSession останутся в памяти, по сути, создавая утечку памяти. Смотрите следующий пример, который иллюстрирует правильное использование async_scoped_session.remove().

async_scoped_session включает поведение прокси, аналогичное поведению scoped_session, что означает, что с ним можно обращаться как с AsyncSession непосредственно, помня о необходимости использования обычных ключевых слов await, в том числе для метода async_scoped_session.remove():

async def some_function(some_async_session, some_object):
    # use the AsyncSession directly
    some_async_session.add(some_object)

    # use the AsyncSession via the context-local proxy
    await AsyncScopedSession.commit()

    # "remove" the current proxied AsyncSession for the local context
    await AsyncScopedSession.remove()

Добавлено в версии 1.4.19.

Использование инспектора для проверки объектов схемы

SQLAlchemy пока не предлагает asyncio версию Inspector (представленную в Мелкозернистое отражение с инспектором), однако существующий интерфейс можно использовать в контексте asyncio, используя метод AsyncConnection.run_sync() из AsyncConnection:

import asyncio

from sqlalchemy import inspect
from sqlalchemy.ext.asyncio import create_async_engine

engine = create_async_engine("postgresql+asyncpg://scott:tiger@localhost/test")


def use_inspector(conn):
    inspector = inspect(conn)
    # use the inspector
    print(inspector.get_view_names())
    # return any value to the caller
    return inspector.get_table_names()


async def async_main():
    async with engine.connect() as conn:
        tables = await conn.run_sync(use_inspector)


asyncio.run(async_main())

Документация API двигателя

Документация API набора результатов

Объект AsyncResult является асинхронной версией объекта Result. Он возвращается только при использовании методов AsyncConnection.stream() или AsyncSession.stream(), которые возвращают объект результата, находящийся поверх активного курсора базы данных.

Документация ORM Session API

Back to Top