queue — Класс синхронизированной очереди¶
Исходный код: Lib/queue.py.
Модуль queue реализует очереди с несколькими производителями и потребителями. Он особенно полезен в потоковом программировании, когда необходимо безопасно обмениваться информацией между несколькими потоками. Класс Queue в этом модуле реализует всю необходимую семантику блокировки.
Модуль реализует три типа очередей, которые отличаются только порядком получения записей. В очереди FIFO первыми извлекаются первые добавленные задачи. В очереди LIFO первой извлекается самая последняя добавленная запись (работает как стек). В очереди с приоритетом записи сортируются (с помощью модуля heapq), и первой извлекается запись с наименьшим значением.
Внутри эти три типа очередей используют блокировки для временного блокирования конкурирующих потоков; однако они не предназначены для обработки реентерабельности внутри потока.
Кроме того, модуль реализует «простой» тип очереди FIFO, SimpleQueue, специфическая реализация которого обеспечивает дополнительные гарантии в обмен на меньшую функциональность.
Модуль queue определяет следующие классы и исключения:
-
class
queue.Queue(maxsize=0)¶ Конструктор для очереди FIFO. maxsize - целое число, задающее верхний предел количества элементов, которые могут быть помещены в очередь. При достижении этого размера вставка будет блокироваться до тех пор, пока элементы очереди не будут израсходованы. Если maxsize меньше или равно нулю, размер очереди бесконечен.
-
class
queue.LifoQueue(maxsize=0)¶ Конструктор для очереди LIFO. maxsize - целое число, задающее верхний предел количества элементов, которые могут быть помещены в очередь. При достижении этого размера вставка будет блокироваться до тех пор, пока элементы очереди не будут израсходованы. Если maxsize меньше или равно нулю, размер очереди бесконечен.
-
class
queue.PriorityQueue(maxsize=0)¶ Конструктор для приоритетной очереди. maxsize - целое число, задающее верхний предел количества элементов, которые могут быть помещены в очередь. При достижении этого размера вставка будет блокироваться до тех пор, пока элементы очереди не будут израсходованы. Если maxsize меньше или равно нулю, размер очереди бесконечен.
Сначала извлекаются записи с наименьшим значением (запись с наименьшим значением - это та, которую возвращает
sorted(list(entries))[0]). Типичным шаблоном для записей является кортеж в форме:(priority_number, data).Если элементы данных не сравниваются, данные можно обернуть в класс, который игнорирует элемент данных и сравнивает только номер приоритета:
from dataclasses import dataclass, field from typing import Any @dataclass(order=True) class PrioritizedItem: priority: int item: Any=field(compare=False)
-
class
queue.SimpleQueue¶ Конструктор для беспредельной очереди FIFO. Простым очередям не хватает расширенной функциональности, такой как отслеживание задач.
Добавлено в версии 3.7.
-
exception
queue.Empty¶ Исключение, возникающее, когда неблокирующий
get()(илиget_nowait()) вызывается на пустом объектеQueue.
-
exception
queue.Full¶ Исключение, возникающее при вызове неблокирующего
put()(илиput_nowait()) объектаQueue, который переполнен.
Объекты очереди¶
Объекты очереди (Queue, LifoQueue или PriorityQueue) предоставляют публичные методы, описанные ниже.
-
Queue.qsize()¶ Возвращает приблизительный размер очереди. Обратите внимание, qsize() > 0 не гарантирует, что последующая get() не заблокируется, также как qsize() < maxsize не гарантирует, что put() не заблокируется.
-
Queue.empty()¶ Возвращает
True, если очередь пуста,Falseв противном случае. Если empty() возвращаетTrue, это не гарантирует, что последующий вызов put() не заблокируется. Аналогично, если empty() возвращаетFalse, это не гарантирует, что последующий вызов get() не заблокируется.
-
Queue.full()¶ Возвращает
True, если очередь заполнена,Falseв противном случае. Если full() возвращаетTrue, это не гарантирует, что последующий вызов get() не заблокируется. Аналогично, если full() возвращаетFalse, это не гарантирует, что последующий вызов put() не заблокируется.
-
Queue.put(item, block=True, timeout=None)¶ Поместить элемент в очередь. Если опциональный args block равен true и timeout равен
None(по умолчанию), то при необходимости блокируется до тех пор, пока не появится свободный слот. Если timeout - положительное число, то блокируется не более timeout секунд и вызывает исключениеFull, если в течение этого времени свободный слот не был доступен. Иначе (block равно false), помещает элемент в очередь, если свободный слот доступен немедленно, иначе вызывает исключениеFull(timeout в этом случае игнорируется).
-
Queue.put_nowait(item)¶ Эквивалентно
put(item, block=False).
-
Queue.get(block=True, timeout=None)¶ Удалить и вернуть элемент из очереди. Если опциональный args block равен true и timeout равен
None(по умолчанию), то при необходимости блокируется, пока элемент не будет доступен. Если timeout - положительное число, то блокируется не более timeout секунд и вызывает исключениеEmpty, если в течение этого времени элемент не был доступен. В противном случае (block равно false), возвращает элемент, если он доступен немедленно, иначе вызывает исключениеEmpty(timeout в этом случае игнорируется).До версии 3.0 на POSIX-системах и во всех версиях на Windows, если block равен true и timeout равен
None, эта операция переходит в режим непрерывного ожидания на базовой блокировке. Это означает, что не может произойти никаких исключений, в частности, SIGINT не вызоветKeyboardInterrupt.
-
Queue.get_nowait()¶ Эквивалентно
get(False).
Для отслеживания того, были ли поставленные в очередь задачи полностью обработаны потребительскими потоками демона, предлагаются два метода.
-
Queue.task_done()¶ Указывает на то, что ранее поставленная в очередь задача завершена. Используется потоками-потребителями очереди. Для каждого
get(), использованного для получения задачи, последующий вызовtask_done()сообщает очереди, что обработка задачи завершена.Если
join()в настоящее время блокируется, он возобновится, когда все элементы будут обработаны (это означает, что вызовtask_done()был получен для каждого элемента, который былput()в очереди).Вызывает ошибку
ValueError, если вызывается большее количество раз, чем было помещено элементов в очередь.
-
Queue.join()¶ Блокируется до тех пор, пока все элементы в очереди не будут получены и обработаны.
Счетчик незавершенных задач увеличивается всякий раз, когда в очередь добавляется элемент. Счетчик уменьшается всякий раз, когда поток-потребитель вызывает
task_done(), чтобы указать, что элемент был получен и вся работа над ним завершена. Когда счетчик незавершенных задач падает до нуля,join()разблокируется.
Пример ожидания завершения поставленных в очередь задач:
import threading
import queue
q = queue.Queue()
def worker():
while True:
item = q.get()
print(f'Working on {item}')
print(f'Finished {item}')
q.task_done()
# Turn-on the worker thread.
threading.Thread(target=worker, daemon=True).start()
# Send thirty task requests to the worker.
for item in range(30):
q.put(item)
# Block until all tasks are done.
q.join()
print('All work completed')
Объекты SimpleQueue¶
Объекты SimpleQueue предоставляют публичные методы, описанные ниже.
-
SimpleQueue.qsize()¶ Возвращает приблизительный размер очереди. Обратите внимание, что qsize() > 0 не гарантирует, что последующий get() не заблокируется.
-
SimpleQueue.empty()¶ Возвращает
True, если очередь пуста,Falseв противном случае. Если empty() возвращаетFalse, это не гарантирует, что последующий вызов get() не заблокируется.
-
SimpleQueue.put(item, block=True, timeout=None)¶ Поместить элемент в очередь. Метод никогда не блокируется и всегда завершается успешно (за исключением потенциальных низкоуровневых ошибок, таких как невозможность выделения памяти). Необязательные аргументы block и timeout игнорируются и предоставляются только для совместимости с
Queue.put().CPython implementation detail: This method has a C implementation which is reentrant. That is, a
put()orget()call can be interrupted by anotherput()call in the same thread without deadlocking or corrupting internal state inside the queue. This makes it appropriate for use in destructors such as__del__methods orweakrefcallbacks.
-
SimpleQueue.put_nowait(item)¶ Эквивалент
put(item, block=False), предоставляется для совместимости сQueue.put_nowait().
-
SimpleQueue.get(block=True, timeout=None)¶ Удалить и вернуть элемент из очереди. Если опциональный args block равен true и timeout равен
None(по умолчанию), то при необходимости блокируется, пока элемент не будет доступен. Если timeout - положительное число, то блокируется не более timeout секунд и вызывает исключениеEmpty, если в течение этого времени элемент не был доступен. В противном случае (block равно false), возвращает элемент, если он доступен немедленно, иначе вызывает исключениеEmpty(timeout в этом случае игнорируется).
-
SimpleQueue.get_nowait()¶ Эквивалентно
get(False).
См.также
- Класс
multiprocessing.Queue Класс очереди для использования в многопроцессорном (а не многопоточном) контексте.
collections.deque - это альтернативная реализация неограниченных очередей с быстрыми атомарными операциями append() и popleft(), которые не требуют блокировки, а также поддерживают индексацию.