Соединения / Двигатели

Как настроить ведение журнала?

См. Настройка ведения журнала.

Как объединить подключения к базе данных? Объединены ли мои соединения в пул?

SQLAlchemy в большинстве случаев автоматически выполняет пул соединений на уровне приложения. За исключением SQLite, объект Engine ссылается на QueuePool как источник соединения.

Для более подробной информации смотрите Конфигурация двигателя и Объединение соединений.

Как передать пользовательские аргументы подключения в API моей базы данных?

Вызов create_engine() принимает дополнительные аргументы либо непосредственно через ключевое слово-аргумент connect_args:

e = create_engine(
    "mysql://scott:tiger@localhost/test", connect_args={"encoding": "utf8"}
)

Или для основных строковых и целочисленных аргументов, они обычно могут быть указаны в строке запроса URL:

e = create_engine("mysql://scott:tiger@localhost/test?encoding=utf8")

«Сервер MySQL исчез»

Основная причина этой ошибки заключается в том, что соединение MySQL вышло по таймеру и было закрыто сервером. Сервер MySQL закрывает соединения, которые простаивают в течение периода времени, который по умолчанию составляет восемь часов. Чтобы учесть это, необходимо включить параметр create_engine.pool_recycle, который гарантирует, что соединение, которое старше заданного количества секунд, будет удалено и заменено новым соединением при следующей проверке.

В более общем случае для обеспечения перезагрузки базы данных и других временных потерь связи из-за сетевых проблем, соединения, находящиеся в пуле, могут быть повторно использованы в ответ на более общие методы обнаружения разъединения. В разделе Работа с разъединениями приведена информация о «пессимистических» (например, предварительный пинг) и «оптимистических» (например, плавное восстановление) методах. В современной SQLAlchemy предпочтение отдается «пессимистическому» подходу.

«Команды рассинхронизированы; вы не можете выполнить эту команду сейчас» / «Этот объект результата не возвращает строки. Он был закрыт автоматически»

Драйверы MySQL имеют довольно широкий класс режимов отказа, при которых состояние соединения с сервером находится в недопустимом состоянии. Обычно, когда соединение используется снова, появляется одно из этих двух сообщений об ошибке. Причина в том, что состояние сервера было изменено на такое, которого клиентская библиотека не ожидает, так что когда клиентская библиотека выдает новый оператор на соединение, сервер отвечает не так, как ожидалось.

В SQLAlchemy, поскольку соединения баз данных объединяются в пул, проблема рассинхронизации сообщений на соединении становится более важной, поскольку при сбое операции, если само соединение находится в непригодном состоянии, если оно вернется в пул соединений, то при повторной проверке оно будет работать неправильно. Смягчение этой проблемы заключается в том, что соединение инвалидируется, когда происходит такой сбой, так что базовое соединение базы данных с MySQL отбрасывается. Это аннулирование происходит автоматически для многих известных режимов отказа, а также может быть вызвано явно с помощью метода Connection.invalidate().

Существует также второй класс режимов отказа в этой категории, когда менеджер контекста, такой как with session.begin_nested():, хочет «откатить» транзакцию при возникновении ошибки; однако в некоторых режимах отказа соединения сам откат (который также может быть операцией RELEASE SAVEPOINT) также терпит неудачу, что приводит к искажению трассировки стека.

Первоначально причина этой ошибки была довольно проста: она означала, что многопоточная программа вызывала команды на одном соединении из более чем одного потока. Это относилось к оригинальному драйверу «MySQLdb» native-C, который был практически единственным используемым драйвером. Однако с появлением драйверов на чистом Python, таких как PyMySQL и MySQL-connector-Python, а также с увеличением использования таких инструментов, как gevent/eventlet, многопроцессорной обработки (часто с Celery) и других, появилась целая серия факторов, которые, как известно, могут вызвать эту проблему, некоторые из которых были улучшены в версиях SQLAlchemy, но другие неизбежны:

  • Разделение соединения между потоками - Это первоначальная причина возникновения подобных ошибок. Программа использовала одно и то же соединение в двух или более потоках одновременно, в результате чего несколько наборов сообщений смешивались в соединении, переводя сессию на стороне сервера в состояние, которое клиент уже не знает, как интерпретировать. Однако сегодня более вероятны другие причины.

  • Обмен файловым хэндлом соединения между процессами - Обычно это происходит, когда программа использует os.fork() для порождения нового процесса, и TCP-соединение, которое присутствует в родительском процессе, становится общим для одного или нескольких дочерних процессов. Поскольку несколько процессов теперь посылают сообщения по сути на один и тот же файл-хэндл, сервер получает чередующиеся сообщения и нарушает состояние соединения.

    Этот сценарий может произойти очень легко, если программа использует модуль «multiprocessing» Python и использует Engine, который был создан в родительском процессе. Обычно «многопроцессорность» используется при использовании таких инструментов, как Celery. Правильный подход должен заключаться либо в том, что при первом запуске дочернего процесса создается новый Engine, отбрасывая все Engine, которые были созданы в родительском процессе; либо Engine, унаследованный от родительского процесса, может утилизировать свой внутренний пул соединений путем вызова Engine.dispose().

  • Greenlet Monkeypatching w/ Exits - При использовании таких библиотек, как gevent или eventlet, которые обезьянничают сетевой API Python, такие библиотеки, как PyMySQL, теперь работают в асинхронном режиме, даже если они не были разработаны явно для этой модели. Распространенной проблемой является прерывание работы зеленого потока, часто из-за логики таймаута в приложении. Это приводит к возникновению исключения GreenletExit, и чисто-Python MySQL драйвер прерывается от своей работы, которая могла заключаться в получении ответа от сервера или подготовке к иному восстановлению состояния соединения. Когда исключение прерывает всю эту работу, разговор между клиентом и сервером рассинхронизируется, и последующее использование соединения может завершиться неудачей. SQLAlchemy начиная с версии 1.1.0 знает, как защититься от этого, поскольку если операция с базой данных прерывается так называемым «исключением выхода», которое включает GreenletExit и любой другой подкласс Python BaseException, не являющийся также подклассом Exception, соединение аннулируется.

  • Откат / освобождение SAVEPOINT не удается - Некоторые классы ошибок приводят к тому, что соединение становится непригодным для использования в контексте транзакции, а также при работе в блоке «SAVEPOINT». В этих случаях сбой соединения приводит к тому, что любой SAVEPOINT больше не существует, но когда SQLAlchemy или приложение пытается «откатить» эту точку сохранения, операция «RELEASE SAVEPOINT» терпит неудачу, обычно с сообщением типа «точка сохранения не существует». В этом случае в Python 3 будет выведена цепочка исключений, где также будет показана конечная «причина» ошибки. В Python 2 «цепочка» исключений отсутствует, однако последние версии SQLAlchemy пытаются вывести предупреждение, иллюстрирующее первоначальную причину ошибки, но при этом выдают непосредственную ошибку, которая является ошибкой ROLLBACK.

Как автоматически «повторить» выполнение заявления?

В разделе документации Работа с разъединениями обсуждаются стратегии, доступные для пула соединений, которые были отключены с момента последней проверки конкретного соединения. Наиболее современной функцией в этом отношении является параметр create_engine.pre_ping, который позволяет выполнить «пинг» соединения базы данных, когда оно извлекается из пула, переподключаясь, если текущее соединение было отключено.

Важно отметить, что этот «пинг» испускается только до того, как соединение фактически используется для операции. Как только соединение передано вызывающей стороне, согласно спецификации Python DBAPI оно становится объектом операции autobegin, что означает, что при первом использовании оно автоматически начинает новую транзакцию, которая остается в силе для последующих операторов, пока не будет вызван метод DBAPI-уровня connection.commit() или connection.rollback().

При современном использовании SQLAlchemy серия SQL-запросов всегда вызывается в этом транзакционном состоянии, при условии, что DBAPI autocommit mode не включено (подробнее об этом в следующем разделе), что означает, что ни один отдельный запрос не фиксируется автоматически; если операция завершится неудачно, эффекты всех запросов в текущей транзакции будут потеряны.

Последствия, которые это имеет для понятия «повторной попытки» утверждения, заключаются в том, что в стандартном случае, когда соединение потеряно, вся транзакция потеряна. Нет никакого полезного способа, чтобы база данных могла «переподключиться и повторить попытку» и продолжить с того места, где она остановилась, поскольку данные уже потеряны. По этой причине SQLAlchemy не имеет прозрачной функции «переподключения», которая работает в середине транзакции, для случая, когда соединение с базой данных отключилось во время использования. Канонический подход к решению проблемы разрыва соединения в середине операции заключается в повторении всей операции с начала транзакции, часто с помощью пользовательского декоратора Python, который будет «повторять» определенную функцию несколько раз до тех пор, пока она не будет успешной, или в другой архитектуре приложения таким образом, чтобы оно было устойчиво к разрыву транзакций, который приводит к сбою операций.

Существует также понятие расширений, которые могут отслеживать все утверждения, выполнявшиеся в рамках транзакции, и затем воспроизводить их все в новой транзакции, чтобы приблизить операцию «повторной попытки». Система event system в SQLAlchemy позволяет построить такую систему, однако этот подход также не совсем полезен, поскольку невозможно гарантировать, что эти операторы DML будут работать с одним и тем же состоянием, так как после завершения транзакции состояние базы данных в новой транзакции может быть совершенно другим. Архитектура «повторной попытки» в явном виде в приложении в точках начала и фиксации транзакционных операций остается лучшим подходом, поскольку транзакционные методы на уровне приложения лучше всего знают, как повторно выполнить свои шаги.

В противном случае, если SQLAlchemy предоставит функцию, которая прозрачно и беззвучно «переподключит» соединение в середине транзакции, это приведет к тому, что данные будут беззвучно потеряны. Пытаясь скрыть проблему, SQLAlchemy значительно ухудшит ситуацию.

Однако, если мы **не используем транзакции, то у нас есть больше возможностей, как описано в следующем разделе.

Использование автокоммита DBAPI позволяет использовать версию прозрачного восстановления только для чтения

В предыдущем разделе было приведено обоснование отсутствия механизма прозрачного переподключения, и предполагалось, что приложение на самом деле использует транзакции на уровне DBAPI. Поскольку большинство DBAPI сейчас предлагают native «autocommit» settings, мы можем использовать эти возможности для обеспечения ограниченной формы прозрачного повторного соединения для операций только чтение, только автокоммит. Прозрачный повторный запрос может быть применен к методу cursor.execute() в DBAPI, однако его все еще небезопасно применять к методу cursor.executemany() в DBAPI, поскольку запрос мог израсходовать любую часть переданных аргументов.

Предупреждение

Следующий рецепт не должен использоваться для операций записи данных. Пользователи должны внимательно прочитать и понять, как работает рецепт, и очень тщательно протестировать режимы отказа на специально предназначенном для этого драйвере DBAPI, прежде чем использовать этот рецепт в производственных целях. Механизм повторных попыток не гарантирует предотвращения ошибок разъединения во всех случаях.

К методу cursor.execute() уровня DBAPI можно применить простой механизм повторных попыток, используя крючки DialectEvents.do_execute() и DialectEvents.do_execute_no_params(), который сможет перехватывать разрывы соединений во время выполнения операторов. Он не будет перехватывать разрывы соединения во время операций выборки набора результатов, для тех DBAPI, которые не полностью буферизируют наборы результатов. Рецепт требует, чтобы база данных поддерживала автокоммит на уровне DBAPI, и не гарантируется для конкретных бэкендов. Представлена единственная функция reconnecting_engine(), которая применяет крючки событий к данному объекту Engine, возвращая версию always-autocommit, которая включает автокоммит на уровне DBAPI. Соединение будет прозрачно переподключаться для выполнения операторов с одним параметром или без параметров:

import time

from sqlalchemy import event


def reconnecting_engine(engine, num_retries, retry_interval):
    def _run_with_retries(fn, context, cursor_obj, statement, *arg, **kw):
        for retry in range(num_retries + 1):
            try:
                fn(cursor_obj, statement, context=context, *arg)
            except engine.dialect.dbapi.Error as raw_dbapi_err:
                connection = context.root_connection
                if engine.dialect.is_disconnect(raw_dbapi_err, connection, cursor_obj):
                    if retry > num_retries:
                        raise
                    engine.logger.error(
                        "disconnection error, retrying operation",
                        exc_info=True,
                    )
                    connection.invalidate()

                    # use SQLAlchemy 2.0 API if available
                    if hasattr(connection, "rollback"):
                        connection.rollback()
                    else:
                        trans = connection.get_transaction()
                        if trans:
                            trans.rollback()

                    time.sleep(retry_interval)
                    context.cursor = cursor_obj = connection.connection.cursor()
                else:
                    raise
            else:
                return True

    e = engine.execution_options(isolation_level="AUTOCOMMIT")

    @event.listens_for(e, "do_execute_no_params")
    def do_execute_no_params(cursor_obj, statement, context):
        return _run_with_retries(
            context.dialect.do_execute_no_params, context, cursor_obj, statement
        )

    @event.listens_for(e, "do_execute")
    def do_execute(cursor_obj, statement, parameters, context):
        return _run_with_retries(
            context.dialect.do_execute, context, cursor_obj, statement, parameters
        )

    return e

Учитывая вышеприведенный рецепт, переподключение в середине транзакции можно продемонстрировать с помощью следующего сценария доказательства концепции. После запуска он будет выдавать в базу данных каждые пять секунд сообщение SELECT 1:

from sqlalchemy import create_engine
from sqlalchemy import select

if __name__ == "__main__":

    engine = create_engine("mysql://scott:tiger@localhost/test", echo_pool=True)

    def do_a_thing(engine):
        with engine.begin() as conn:
            while True:
                print("ping: %s" % conn.execute(select([1])).scalar())
                time.sleep(5)

    e = reconnecting_engine(
        create_engine("mysql://scott:tiger@localhost/test", echo_pool=True),
        num_retries=5,
        retry_interval=2,
    )

    do_a_thing(e)

Перезапустите базу данных во время выполнения сценария, чтобы продемонстрировать операцию прозрачного переподключения:

$ python reconnect_test.py
ping: 1
ping: 1
disconnection error, retrying operation
Traceback (most recent call last):
  ...
MySQLdb._exceptions.OperationalError: (2006, 'MySQL server has gone away')
2020-10-19 16:16:22,624 INFO sqlalchemy.pool.impl.QueuePool Invalidate connection <_mysql.connection open to 'localhost' at 0xf59240>
ping: 1
ping: 1
...

Приведенный выше рецепт протестирован для SQLAlchemy 1.4.

Почему SQLAlchemy выдает так много ROLLBACK?

В настоящее время SQLAlchemy предполагает, что соединения DBAPI находятся в режиме «без автокоммита» - это стандартное поведение API баз данных Python, то есть предполагается, что транзакция всегда находится в процессе. Пул соединений выдает connection.rollback() при возврате соединения. Это делается для того, чтобы все транзакционные ресурсы, оставшиеся на соединении, были освобождены. Для таких баз данных, как PostgreSQL или MSSQL, где ресурсы таблиц агрессивно блокируются, это очень важно, чтобы строки и таблицы не оставались заблокированными в соединениях, которые больше не используются. В противном случае приложение может зависнуть. Однако это относится не только к блокировкам и одинаково важно для любой базы данных с любым видом изоляции транзакций, включая MySQL с InnoDB. Любое соединение, которое все еще находится внутри старой транзакции, вернет устаревшие данные, если эти данные уже были запрошены на этом соединении в пределах изоляции. О том, почему вы можете видеть несвежие данные даже в MySQL, читайте на сайте https://dev.mysql.com/doc/refman/5.1/en/innodb-transaction-model.html.

Я нахожусь на MyISAM - как мне его отключить?

Поведение пула соединений при возврате соединения можно настроить с помощью reset_on_return:

from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

engine = create_engine(
    "mysql://scott:tiger@localhost/myisam_database",
    pool=QueuePool(reset_on_return=False),
)

Я работаю на SQL Server - как мне превратить эти ROLLBACK в COMMIT?

reset_on_return принимает значения commit, rollback в дополнение к True, False и None. Установка значения commit вызовет COMMIT при возврате любого соединения в пул:

engine = create_engine(
    "mssql://scott:tiger@mydsn", pool=QueuePool(reset_on_return="commit")
)

Я использую несколько соединений с базой данных SQLite (обычно для проверки работы транзакций), и моя тестовая программа не работает!

Если используется база данных SQLite :memory: или версия SQLAlchemy до версии 0.7, пулом соединений по умолчанию является пул SingletonThreadPool, который поддерживает ровно одно соединение SQLite на поток. Поэтому два соединения, используемые в одном потоке, на самом деле будут одним и тем же соединением SQLite. Убедитесь, что вы не используете базу данных :memory: и используйте NullPool, который является пулом по умолчанию для баз данных без памяти в текущих версиях SQLAlchemy.

См.также

pysqlite_threading_pooling - информация о поведении PySQLite.

Как получить необработанное соединение DBAPI при использовании Engine?

При использовании обычного соединения на уровне движка SA вы можете получить проксированную на пул версию соединения DBAPI через атрибут Connection.connection на Connection, а для действительно реального соединения DBAPI вы можете вызвать атрибут _ConnectionFairy.dbapi_connection на нем. В обычных драйверах синхронизации обычно нет необходимости обращаться к не пул-проксированному DBAPI-соединению, так как все методы проксируются через:

engine = create_engine(...)
conn = engine.connect()

# pep-249 style ConnectionFairy connection pool proxy object
connection_fairy = conn.connection

# typically to run statements one would get a cursor() from this
# object
cursor_obj = connection_fairy.cursor()
# ... work with cursor_obj

# to bypass "connection_fairy", such as to set attributes on the
# unproxied pep-249 DBAPI connection, use .dbapi_connection
raw_dbapi_connection = connection_fairy.dbapi_connection

# the same thing is available as .driver_connection (more on this
# in the next section)
also_raw_dbapi_connection = connection_fairy.driver_connection

Изменено в версии 1.4.24: Добавлен атрибут _ConnectionFairy.dbapi_connection, который заменяет предыдущий атрибут _ConnectionFairy.connection, который все еще остается доступным; этот атрибут всегда предоставляет объект соединения синхронного стиля pep-249. Также добавлен атрибут _ConnectionFairy.driver_connection, который всегда будет ссылаться на реальное соединение на уровне драйвера, независимо от того, какой API он представляет.

Доступ к базовому соединению для драйвера asyncio

Когда используется драйвер asyncio, в приведенной выше схеме есть два изменения. Первое заключается в том, что при использовании AsyncConnection доступ к _ConnectionFairy должен осуществляться с помощью метода awaitable AsyncConnection.get_raw_connection(). Возвращаемый _ConnectionFairy в этом случае сохраняет схему использования pep-249 в стиле sync, а атрибут _ConnectionFairy.dbapi_connection ссылается на объект соединения, адаптированный SQLAlchemy, который адаптирует соединение asyncio к API pep-249 в стиле sync, другими словами, при использовании драйвера asyncio происходит два уровня проксирования. Фактическое соединение asyncio доступно из атрибута driver_connection. Пересказ предыдущего примера в терминах asyncio выглядит следующим образом:

async def main():
    engine = create_async_engine(...)
    conn = await engine.connect()

    # pep-249 style ConnectionFairy connection pool proxy object
    # presents a sync interface
    connection_fairy = await conn.get_raw_connection()

    # beneath that proxy is a second proxy which adapts the
    # asyncio driver into a pep-249 connection object, accessible
    # via .dbapi_connection as is the same with a sync API
    sqla_sync_conn = connection_fairy.dbapi_connection

    # the really-real innermost driver connection is available
    # from the .driver_connection attribute
    raw_asyncio_connection = connection_fairy.driver_connection

    # work with raw asyncio connection
    result = await raw_asyncio_connection.execute(...)

Изменено в версии 1.4.24: Добавлены атрибуты _ConnectionFairy.dbapi_connection и _ConnectionFairy.driver_connection для обеспечения доступа к соединениям pep-249, уровням адаптации pep-249 и базовым соединениям драйверов с использованием согласованного интерфейса.

При использовании драйверов asyncio вышеуказанное соединение «DBAPI» на самом деле является адаптированной к SQLAlchemy формой соединения, которая представляет синхронный API в стиле pep-249. Чтобы получить доступ к реальному соединению драйвера asyncio, которое будет представлять оригинальный asyncio API используемого драйвера, к нему можно получить доступ через атрибут _ConnectionFairy.driver_connection в _ConnectionFairy. Для стандартного драйвера pep-249, _ConnectionFairy.dbapi_connection и _ConnectionFairy.driver_connection являются синонимами.

Вы должны убедиться, что вернули все настройки уровня изоляции или другие специфические для работы настройки соединения в нормальное состояние, прежде чем возвращать его в пул.

В качестве альтернативы возврату настроек, вы можете вызвать метод Connection.detach() на Connection или проксированном соединении, который деассоциирует соединение из пула таким образом, что оно будет закрыто и отброшено при вызове Connection.close():

conn = engine.connect()
conn.detach()  # detaches the DBAPI connection from the connection pool
conn.connection.<go nuts>
conn.close()  # connection is closed for real, the pool replaces it with a new connection

Как использовать движки / соединения / сессии с многопроцессорной обработкой Python или os.fork()?

Об этом говорится в разделе Использование пулов соединений с многопроцессорной обработкой или os.fork().

Back to Top