queue — Класс синхронизированной очереди

Исходный код: Lib/queue.py.


Модуль queue реализует очереди с несколькими производителями и потребителями. Он особенно полезен в потоковом программировании, когда необходимо безопасно обмениваться информацией между несколькими потоками. Класс Queue в этом модуле реализует всю необходимую семантику блокировки.

Модуль реализует три типа очередей, которые отличаются только порядком получения записей. В очереди FIFO первыми извлекаются первые добавленные задачи. В очереди LIFO первой извлекается самая последняя добавленная запись (работает как стек). В очереди с приоритетом записи сортируются (с помощью модуля heapq), и первой извлекается запись с наименьшим значением.

Внутри эти три типа очередей используют блокировки для временного блокирования конкурирующих потоков; однако они не предназначены для обработки реентерабельности внутри потока.

Кроме того, модуль реализует «простой» тип очереди FIFO, SimpleQueue, специфическая реализация которого обеспечивает дополнительные гарантии в обмен на меньшую функциональность.

Модуль queue определяет следующие классы и исключения:

class queue.Queue(maxsize=0)

Конструктор для очереди FIFO. maxsize - целое число, задающее верхний предел количества элементов, которые могут быть помещены в очередь. При достижении этого размера вставка будет блокироваться до тех пор, пока элементы очереди не будут израсходованы. Если maxsize меньше или равно нулю, размер очереди бесконечен.

class queue.LifoQueue(maxsize=0)

Конструктор для очереди LIFO. maxsize - целое число, задающее верхний предел количества элементов, которые могут быть помещены в очередь. При достижении этого размера вставка будет блокироваться до тех пор, пока элементы очереди не будут израсходованы. Если maxsize меньше или равно нулю, размер очереди бесконечен.

class queue.PriorityQueue(maxsize=0)

Конструктор для приоритетной очереди. maxsize - целое число, задающее верхний предел количества элементов, которые могут быть помещены в очередь. При достижении этого размера вставка будет блокироваться до тех пор, пока элементы очереди не будут израсходованы. Если maxsize меньше или равно нулю, размер очереди бесконечен.

Сначала извлекаются записи с наименьшим значением (запись с наименьшим значением - это та, которую возвращает sorted(list(entries))[0]). Типичным шаблоном для записей является кортеж в форме: (priority_number, data).

Если элементы данных не сравниваются, данные можно обернуть в класс, который игнорирует элемент данных и сравнивает только номер приоритета:

from dataclasses import dataclass, field
from typing import Any

@dataclass(order=True)
class PrioritizedItem:
    priority: int
    item: Any=field(compare=False)
class queue.SimpleQueue

Конструктор для беспредельной очереди FIFO. Простым очередям не хватает расширенной функциональности, такой как отслеживание задач.

Добавлено в версии 3.7.

exception queue.Empty

Исключение, возникающее, когда неблокирующий get() (или get_nowait()) вызывается на пустом объекте Queue.

exception queue.Full

Исключение, возникающее при вызове неблокирующего put() (или put_nowait()) объекта Queue, который переполнен.

Объекты очереди

Объекты очереди (Queue, LifoQueue или PriorityQueue) предоставляют публичные методы, описанные ниже.

Queue.qsize()

Возвращает приблизительный размер очереди. Обратите внимание, qsize() > 0 не гарантирует, что последующая get() не заблокируется, также как qsize() < maxsize не гарантирует, что put() не заблокируется.

Queue.empty()

Возвращает True, если очередь пуста, False в противном случае. Если empty() возвращает True, это не гарантирует, что последующий вызов put() не заблокируется. Аналогично, если empty() возвращает False, это не гарантирует, что последующий вызов get() не заблокируется.

Queue.full()

Возвращает True, если очередь заполнена, False в противном случае. Если full() возвращает True, это не гарантирует, что последующий вызов get() не заблокируется. Аналогично, если full() возвращает False, это не гарантирует, что последующий вызов put() не заблокируется.

Queue.put(item, block=True, timeout=None)

Поместить элемент в очередь. Если опциональный args block равен true и timeout равен None (по умолчанию), то при необходимости блокируется до тех пор, пока не появится свободный слот. Если timeout - положительное число, то блокируется не более timeout секунд и вызывает исключение Full, если в течение этого времени свободный слот не был доступен. Иначе (block равно false), помещает элемент в очередь, если свободный слот доступен немедленно, иначе вызывает исключение Full (timeout в этом случае игнорируется).

Queue.put_nowait(item)

Эквивалентно put(item, block=False).

Queue.get(block=True, timeout=None)

Удалить и вернуть элемент из очереди. Если опциональный args block равен true и timeout равен None (по умолчанию), то при необходимости блокируется, пока элемент не будет доступен. Если timeout - положительное число, то блокируется не более timeout секунд и вызывает исключение Empty, если в течение этого времени элемент не был доступен. В противном случае (block равно false), возвращает элемент, если он доступен немедленно, иначе вызывает исключение Empty (timeout в этом случае игнорируется).

До версии 3.0 на POSIX-системах и во всех версиях на Windows, если block равен true и timeout равен None, эта операция переходит в режим непрерывного ожидания на базовой блокировке. Это означает, что не может произойти никаких исключений, в частности, SIGINT не вызовет KeyboardInterrupt.

Queue.get_nowait()

Эквивалентно get(False).

Для отслеживания того, были ли поставленные в очередь задачи полностью обработаны потребительскими потоками демона, предлагаются два метода.

Queue.task_done()

Указывает на то, что ранее поставленная в очередь задача завершена. Используется потоками-потребителями очереди. Для каждого get(), использованного для получения задачи, последующий вызов task_done() сообщает очереди, что обработка задачи завершена.

Если join() в настоящее время блокируется, он возобновится, когда все элементы будут обработаны (это означает, что вызов task_done() был получен для каждого элемента, который был put() в очереди).

Вызывает ошибку ValueError, если вызывается большее количество раз, чем было помещено элементов в очередь.

Queue.join()

Блокируется до тех пор, пока все элементы в очереди не будут получены и обработаны.

Счетчик незавершенных задач увеличивается всякий раз, когда в очередь добавляется элемент. Счетчик уменьшается всякий раз, когда поток-потребитель вызывает task_done(), чтобы указать, что элемент был получен и вся работа над ним завершена. Когда счетчик незавершенных задач падает до нуля, join() разблокируется.

Пример ожидания завершения поставленных в очередь задач:

import threading
import queue

q = queue.Queue()

def worker():
    while True:
        item = q.get()
        print(f'Working on {item}')
        print(f'Finished {item}')
        q.task_done()

# Turn-on the worker thread.
threading.Thread(target=worker, daemon=True).start()

# Send thirty task requests to the worker.
for item in range(30):
    q.put(item)

# Block until all tasks are done.
q.join()
print('All work completed')

Объекты SimpleQueue

Объекты SimpleQueue предоставляют публичные методы, описанные ниже.

SimpleQueue.qsize()

Возвращает приблизительный размер очереди. Обратите внимание, что qsize() > 0 не гарантирует, что последующий get() не заблокируется.

SimpleQueue.empty()

Возвращает True, если очередь пуста, False в противном случае. Если empty() возвращает False, это не гарантирует, что последующий вызов get() не заблокируется.

SimpleQueue.put(item, block=True, timeout=None)

Поместить элемент в очередь. Метод никогда не блокируется и всегда завершается успешно (за исключением потенциальных низкоуровневых ошибок, таких как невозможность выделения памяти). Необязательные аргументы block и timeout игнорируются и предоставляются только для совместимости с Queue.put().

CPython implementation detail: This method has a C implementation which is reentrant. That is, a put() or get() call can be interrupted by another put() call in the same thread without deadlocking or corrupting internal state inside the queue. This makes it appropriate for use in destructors such as __del__ methods or weakref callbacks.

SimpleQueue.put_nowait(item)

Эквивалент put(item, block=False), предоставляется для совместимости с Queue.put_nowait().

SimpleQueue.get(block=True, timeout=None)

Удалить и вернуть элемент из очереди. Если опциональный args block равен true и timeout равен None (по умолчанию), то при необходимости блокируется, пока элемент не будет доступен. Если timeout - положительное число, то блокируется не более timeout секунд и вызывает исключение Empty, если в течение этого времени элемент не был доступен. В противном случае (block равно false), возвращает элемент, если он доступен немедленно, иначе вызывает исключение Empty (timeout в этом случае игнорируется).

SimpleQueue.get_nowait()

Эквивалентно get(False).

См.также

Класс multiprocessing.Queue

Класс очереди для использования в многопроцессорном (а не многопоточном) контексте.

collections.deque - это альтернативная реализация неограниченных очередей с быстрыми атомарными операциями append() и popleft(), которые не требуют блокировки, а также поддерживают индексацию.

Back to Top