asyncutils.queues

Contents

asyncutils.queues#

Non-inheriting extensions of asyncio.Queue with more methods and password protection, and a PotentQueueBase ABC.

Attributes#

ignore_qempty

Instance of exceptions.IgnoreErrors that suppresses QueueShutDown and QueueEmpty.

ignore_qerrs

Instance of exceptions.IgnoreErrors that suppresses all asyncio queue-related errors.

ignore_qfull

Instance of exceptions.IgnoreErrors that suppresses QueueShutDown and QueueFull.

ignore_qshutdown

Instance of exceptions.IgnoreErrors that suppresses QueueShutDown.

Classes#

PotentQueueBase

A base class for queues with much more methods, async- and sync-compatible.

SmartLifoQueue

A base class for queues with much more methods, async- and sync-compatible.

SmartPriorityQueue

A priority queue, where the priority of each item is determined by comparing it to other items, meaning each item should at least implement __lt__().

SmartQueue

A base class for queues with much more methods, async- and sync-compatible.

UserPriorityQueue

Functions#

Module Contents#

class asyncutils.queues.PotentQueueBase[T][source]#

Bases: asyncio.queues.Queue[T], asyncutils.mixins.LoopBoundMixin, abc.ABC

A base class for queues with much more methods, async- and sync-compatible.

__aiter__() _collections_abc.AsyncGenerator[T]#

Equivalent to drain_persistent().

__bool__() bool[source]#

Whether there are items in the queue.

__iter__() _collections_abc.Generator[T]#

Equivalent to drain_until_empty().

abstractmethod _get() T[source]#

Get an item from the queue if not empty; called in get() and get_nowait().

abstractmethod _init(maxsize: int) None[source]#

Initialize the queue given maxsize; called in __init__().

abstractmethod _put(item: T) None[source]#

Put an item into the queue if not empty; called in put() and put_nowait().

clear() None[source]#

Clear all the entries from the queue.

drain_persistent(max_items: int | None = ..., timeout: float | None = ...) _collections_abc.AsyncGenerator[T][source]#

An async generator that gets items from the queue once available and yields them.

drain_retlist(max_items: int | None = ...) list[T][source]#

Empty the queue and return a list of the items within.

drain_until_empty(max_items: int | None = ...) _collections_abc.Generator[T][source]#

A synchronous generator that gets items from the queue until it is emptied and returns.

empty() bool[source]#

Whether the queue is empty.

enumerate(*, lifo: Literal[False] = ...) SmartQueue[tuple[int, T]][source]#
enumerate(*, lifo: Literal[True]) SmartLifoQueue[tuple[int, T]]

Return a queue containing the items from enumerate applied on this queue and empty it in the process.

enumerate_nowait() _collections_abc.Generator[tuple[int, T], None, None][source]#

queue.enumerate_nowait() is equivalent to await iters.to_list(queue.drain_persistent()).

async extend(it: asyncutils._internal.types.SupportsIteration[T], timeout: float | None = ...) None[source]#

Add the items from it into the queue within timeout.

filter(pred: _collections_abc.Callable[[T], bool] = ..., *, lifo: Literal[False] = ...) SmartQueue[T][source]#
filter(pred: _collections_abc.Callable[[T], bool] = ..., *, lifo: Literal[True]) SmartLifoQueue[T]

Return a new queue from which getters can get the items in this queue that satisfy the predicate; items remaining in the original queue did not satisfy the predicate.

filter_nowait(pred: _collections_abc.Callable[[T], bool] = ..., /) tuple[list[T], int][source]#

Filter items in the queue by a predicate and return a list of removed items and an integer; the items in the returned list after the index corresponding to that integer were items rejected from the queue due to the queue being full.

map[R](f: _collections_abc.Callable[[T], _collections_abc.Awaitable[R]], stop_when: asyncio.futures.Future[None] | None = ..., *, lifo: Literal[False] = ...) SmartQueue[R][source]#
map(f: _collections_abc.Callable[[T], _collections_abc.Awaitable[R]], stop_when: asyncio.futures.Future[None] | None = ..., *, lifo: Literal[True]) SmartLifoQueue[R]

Return a queue that contains items from this queue with the function applied on each of them, emptying this queue in the process (transformation analogous to builtins.map).

map_nowait[R](f: _collections_abc.Callable[[T], _collections_abc.Coroutine[Any, Any, R]], /) list[R][source]#

Return a list containing the return values of the function applied on the items in the queue, emptying the queue.

abstractmethod peek_all() list[T][source]#

Return a list of all the items in the queue.

async poppush(item: T) T[source]#

Similar.

poppush_nowait(item: T, raising: bool = ...) T[source]#

Pop an item from the queue and push into the other end immediately.

push(item: T) bool[source]#

Put an item into the queue immediately, popping if necessary; returns success.

async pushpop(item: T) T[source]#

The above, but done asynchronously and not immediately.

pushpop_nowait(item: T, raising: bool = ...) T[source]#

Push an item into the queue and pop from the other end immediately.

abstractmethod qsize() int[source]#

Return the size of the queue as an integer.

reset() None[source]#

Revert the queue to an empty state.

shutdown(immediate: bool = ...) None[source]#

Shut down the queue. If immediate is True, pending gets raise immediately even if the queue is not empty.

async smart_get(*, timeout: float | None = ..., default: T = ...) T[source]#

Call get_nowait if an item is immediately available, waiting for a item with timeout otherwise; if the timeout expires and default is provided, return it.

async smart_put(item: T, *, timeout: float | None = ..., raising: bool = ...) bool | None[source]#

Call put_nowait if a slot is immediately available, waiting for a slot with timeout otherwise; if the timeout expires and raising is True, throw TimeoutError.

starmap[R, *Ts](f: _collections_abc.Callable[[*Ts], _collections_abc.Awaitable[R]], stop_when: asyncio.futures.Future[None] | None = ..., *, lifo: Literal[False] = ...) SmartQueue[R][source]#
starmap(f: _collections_abc.Callable[[*Ts], _collections_abc.Awaitable[R]], stop_when: asyncio.futures.Future[None] | None = ..., *, lifo: Literal[True]) SmartLifoQueue[R]

Return a queue that contains items from this queue with the function applied on each of them starred, emptying this queue in the process (transformation analogous to itertools.starmap).

starmap_nowait[R](f: _collections_abc.Callable[Ellipsis, _collections_abc.Coroutine[Any, Any, R]], /) list[R][source]#

Return a list containing the return values of the function applied on the items in the queue, starred,, emptying the queue.

sync_get(*, timeout: float | None = ..., default: T = ...) T[source]#

Get an item from the queue synchronously. Above remark applies.

sync_put(item: T, *, timeout: float | None = ...) bool | None[source]#

Put an item into the queue synchronously. If this functionality is needed, you should likely be using a synchronous queue.

transaction() contextlib.AbstractContextManager[Self, None][source]#
Return an async context manager which begins a transaction on entry.
If an error occurs within the context, the original items in the queue are restored and the error reraised, unless the error is critical and
deemed to require immediate exit; otherwise, the transaction completes successfully and changes are committed on exit.
property can_get_now: bool#

Whether items can be get from the queue without blocking at this instant.

property can_put_now: bool#

Whether items can be put into the queue without blocking at this instant.

property capacity: int | float#

The capacity of the queue. Can be float('inf').

property fully_functional: bool#

queue.fully_functional == queue.can_put_now and queue.can_get_now.

property is_shutdown: bool#

Whether the queue is shutting down or has been shutdown.

property remaining_capacity: int | float#

The remaining number of slots in the queue. Can be float('inf').

property utilization_rate: float#

The number of items the queue divided by its capacity.

class asyncutils.queues.SmartLifoQueue[T][source]#

Bases: PotentQueueBase[T]

A base class for queues with much more methods, async- and sync-compatible.

_get() T[source]#

Get an item from the queue if not empty; called in get() and get_nowait().

_init(maxsize: int) None[source]#

Initialize the queue given maxsize; called in __init__().

_put(item: T) None[source]#

Put an item into the queue if not empty; called in put() and put_nowait().

peek(i: int = ..., /) T[source]#

Look at the item at index i, defaulting to the item most recently put in (that would be returned by get() or get_nowait()).

peek_all() list[T][source]#

Return a list of all the items in the queue.

qsize() int[source]#

Return the size of the queue as an integer.

class asyncutils.queues.SmartPriorityQueue[T](maxsize: int = ..., *, init_items: asyncutils._internal.types.SupportsIteration[T])[source]#
class asyncutils.queues.SmartPriorityQueue(maxsize: int = ...)

Bases: PotentQueueBase[T]

A priority queue, where the priority of each item is determined by comparing it to other items, meaning each item should at least implement __lt__().

_get() T[source]#

Get an item from the queue if not empty; called in get() and get_nowait().

_init(maxsize: int) None[source]#

Initialize the queue given maxsize; called in __init__().

_put(item: T) None[source]#

Put an item into the queue if not empty; called in put() and put_nowait().

peek() T[source]#

Look at the item that would be returned by get() or get_nowait() without actually getting it.

peek_all() list[T][source]#

Return a list of all the items in the queue.

qsize() int[source]#

Return the size of the queue as an integer.

async start(maxsize: int, init_items: asyncutils._internal.types.SupportsIteration[T]) None[source]#
class asyncutils.queues.SmartQueue[T][source]#

Bases: PotentQueueBase[T]

A base class for queues with much more methods, async- and sync-compatible.

_get() T[source]#

Get an item from the queue if not empty; called in get() and get_nowait().

_init(maxsize: int) None[source]#

Initialize the queue given maxsize; called in __init__().

_put(item: T) None[source]#

Put an item into the queue if not empty; called in put() and put_nowait().

peek() T[source]#

Look at the item that would be returned by get() or get_nowait() without actually getting it.

peek_all() list[T][source]#

Return a list of all the items in the queue.

qsize() int[source]#

Return the size of the queue as an integer.

rotate(n: int = ..., /) None[source]#

Rotate the items in the queue by n indices synchronously, which can be negative.

class asyncutils.queues.UserPriorityQueue[T](maxsize: int = ..., *, init_priority: int = ..., init_items: asyncutils._internal.types.SupportsIteration[T])[source]#
class asyncutils.queues.UserPriorityQueue(maxsize: int = ..., *, init_priority: int = ...)

Bases: SmartPriorityQueue[tuple[int, int, T]]

A priority queue, where you put in items with an integer priority and the items are retrieved in ascending order of priority, with earlier
items taking precedence in case of ties.
The put() and put_nowait() methods of this class take an additional priority parameter, representing the priority of the item.
classmethod from_iter_of_tuples(items: asyncutils._internal.types.SupportsIteration[tuple[int, int, T]], maxsize: int = ...) Self[source]#

Build a queue from the (async) iterable of tuples (priority, tiebreak, item).

async get() T[source]#
get_nowait() T[source]#
async put(item: T, priority: int = ...) None[source]#
put_nowait(item: T, priority: int = ...) None[source]#
asyncutils.queues.password_queue[T, R](password_put: R, *, maxsize: int = ..., protect_get: Literal[False] = ..., protect_put: Literal[True] = ..., can_change_put: bool = ..., priority: bool = ..., lifo: bool = ..., puttyp: type[R] = ..., init_items: asyncutils._internal.types.SupportsIteration[T], strict: bool = ...) asyncutils._internal.types.P[R, T][source]#
asyncutils.queues.password_queue(*, maxsize: int = ..., protect_get: Literal[False] = ..., protect_put: Literal[True] = ..., can_change_put: bool = ..., priority: bool = ..., lifo: bool = ..., put_from: str = ..., init_items: asyncutils._internal.types.SupportsIteration[T], strict: bool = ...) asyncutils._internal.types.P[Any, T]
asyncutils.queues.password_queue(*, maxsize: int = ..., protect_get: Literal[False] = ..., protect_put: Literal[True] = ..., can_change_put: bool = ..., priority: bool = ..., lifo: bool = ..., put_from: str = ..., puttyp: type[R], init_items: asyncutils._internal.types.SupportsIteration[T], strict: bool = ...) asyncutils._internal.types.P[R, T]
asyncutils.queues.password_queue(*, password_get: R, maxsize: int = ..., protect_get: Literal[True], protect_put: Literal[False], can_change_get: bool = ..., priority: bool = ..., lifo: bool = ..., get_from: str = ..., gettyp: type[R] = ..., init_items: asyncutils._internal.types.SupportsIteration[T], strict: bool = ...) asyncutils._internal.types.G[R, T]
asyncutils.queues.password_queue(*, maxsize: int = ..., protect_get: Literal[True], protect_put: Literal[False], can_change_get: bool = ..., priority: bool = ..., lifo: bool = ..., get_from: str = ..., gettyp: type[R], init_items: asyncutils._internal.types.SupportsIteration[T], strict: bool = ...) asyncutils._internal.types.G[R, T]
asyncutils.queues.password_queue(*, maxsize: int = ..., protect_get: Literal[True], protect_put: Literal[False], can_change_get: bool = ..., priority: bool = ..., lifo: bool = ..., get_from: str = ..., init_items: asyncutils._internal.types.SupportsIteration[T], strict: bool = ...) asyncutils._internal.types.G[Any, T]
asyncutils.queues.password_queue(password_put: V, password_get: R, maxsize: int = ..., *, protect_get: Literal[True], protect_put: Literal[True] = ..., can_change_get: bool = ..., can_change_put: bool = ..., priority: bool = ..., lifo: bool = ..., gettyp: type[R] = ..., puttyp: type[V] = ..., init_items: asyncutils._internal.types.SupportsIteration[T], strict: bool = ...) asyncutils._internal.types.B[R, V, T]
asyncutils.queues.password_queue(password_put: V, *, maxsize: int = ..., protect_get: Literal[True], protect_put: Literal[True] = ..., can_change_get: bool = ..., can_change_put: bool = ..., priority: bool = ..., lifo: bool = ..., get_from: str = ..., gettyp: type[R], puttyp: type[V] = ..., init_items: asyncutils._internal.types.SupportsIteration[T], strict: bool = ...) asyncutils._internal.types.B[R, V, T]
asyncutils.queues.password_queue(password_put: V, *, maxsize: int = ..., protect_get: Literal[True], protect_put: Literal[True] = ..., can_change_get: bool = ..., can_change_put: bool = ..., priority: bool = ..., lifo: bool = ..., get_from: str = ..., puttyp: type[V] = ..., init_items: asyncutils._internal.types.SupportsIteration[T], strict: bool = ...) asyncutils._internal.types.B[Any, V, T]
asyncutils.queues.password_queue(*, password_get: R, maxsize: int = ..., protect_get: Literal[True], protect_put: Literal[True] = ..., can_change_get: bool = ..., can_change_put: bool = ..., priority: bool = ..., lifo: bool = ..., put_from: str = ..., gettyp: type[R] = ..., puttyp: type[V], init_items: asyncutils._internal.types.SupportsIteration[T], strict: bool = ...) asyncutils._internal.types.B[R, V, T]
asyncutils.queues.password_queue(*, password_get: R, maxsize: int = ..., protect_get: Literal[True], protect_put: Literal[True] = ..., can_change_get: bool = ..., can_change_put: bool = ..., priority: bool = ..., lifo: bool = ..., put_from: str = ..., gettyp: type[R] = ..., init_items: asyncutils._internal.types.SupportsIteration[T], strict: bool = ...) asyncutils._internal.types.B[R, Any, T]
asyncutils.queues.password_queue(*, maxsize: int = ..., protect_get: Literal[True], protect_put: Literal[True] = ..., can_change_get: bool = ..., can_change_put: bool = ..., priority: bool = ..., lifo: bool = ..., get_from: str = ..., put_from: str = ..., gettyp: type[R], puttyp: type[V], init_items: asyncutils._internal.types.SupportsIteration[T], strict: bool = ...) asyncutils._internal.types.B[R, V, T]
asyncutils.queues.password_queue(*, maxsize: int = ..., protect_get: Literal[True], protect_put: Literal[True] = ..., can_change_get: bool = ..., can_change_put: bool = ..., priority: bool = ..., lifo: bool = ..., get_from: str = ..., put_from: str = ..., gettyp: type[R], init_items: asyncutils._internal.types.SupportsIteration[T], strict: bool = ...) asyncutils._internal.types.B[R, Any, T]
asyncutils.queues.password_queue(*, maxsize: int = ..., protect_get: Literal[True], protect_put: Literal[True] = ..., can_change_get: bool = ..., can_change_put: bool = ..., priority: bool = ..., lifo: bool = ..., get_from: str = ..., put_from: str = ..., puttyp: type[V], init_items: asyncutils._internal.types.SupportsIteration[T], strict: bool = ...) asyncutils._internal.types.B[Any, V, T]
asyncutils.queues.password_queue(*, maxsize: int = ..., protect_get: Literal[True], protect_put: Literal[True] = ..., can_change_get: bool = ..., can_change_put: bool = ..., priority: bool = ..., lifo: bool = ..., get_from: str = ..., put_from: str = ..., init_items: asyncutils._internal.types.SupportsIteration[T], strict: bool = ...) asyncutils._internal.types.B[Any, Any, T]
asyncutils.queues.password_queue(password_put: R, *, maxsize: int = ..., protect_get: Literal[False] = ..., protect_put: Literal[True] = ..., can_change_put: bool = ..., priority: bool = ..., lifo: bool = ..., puttyp: type[R] = ..., strict: bool = ...) asyncutils._internal.types.P[R, Any]
asyncutils.queues.password_queue(*, maxsize: int = ..., protect_get: Literal[False] = ..., protect_put: Literal[True] = ..., can_change_put: bool = ..., priority: bool = ..., lifo: bool = ..., put_from: str = ..., puttyp: type[R], strict: bool = ...) asyncutils._internal.types.P[R, Any]
asyncutils.queues.password_queue(*, maxsize: int = ..., protect_get: Literal[False] = ..., protect_put: Literal[True] = ..., can_change_put: bool = ..., priority: bool = ..., lifo: bool = ..., put_from: str = ..., strict: bool = ...) asyncutils._internal.types.P[Any, Any]
asyncutils.queues.password_queue(*, password_get: R, maxsize: int = ..., protect_get: Literal[True], protect_put: Literal[False], can_change_get: bool = ..., priority: bool = ..., lifo: bool = ..., gettyp: type[R] = ..., strict: bool = ...) asyncutils._internal.types.G[R, Any]
asyncutils.queues.password_queue(*, maxsize: int = ..., protect_get: Literal[True], protect_put: Literal[False], can_change_get: bool = ..., priority: bool = ..., lifo: bool = ..., gettyp: type[R], strict: bool = ...) asyncutils._internal.types.G[R, Any]
asyncutils.queues.password_queue(*, maxsize: int = ..., protect_get: Literal[True], protect_put: Literal[False], can_change_get: bool = ..., priority: bool = ..., lifo: bool = ..., get_from: str = ..., strict: bool = ...) asyncutils._internal.types.G[Any, Any]
asyncutils.queues.password_queue(password_put: V, password_get: R, maxsize: int = ..., *, protect_get: Literal[True], protect_put: Literal[True] = ..., can_change_get: bool = ..., can_change_put: bool = ..., priority: bool = ..., lifo: bool = ..., gettyp: type[R] = ..., puttyp: type[V] = ..., strict: bool = ...) asyncutils._internal.types.B[R, V, Any]
asyncutils.queues.password_queue(password_put: V, *, maxsize: int = ..., protect_get: Literal[True], protect_put: Literal[True] = ..., can_change_get: bool = ..., can_change_put: bool = ..., priority: bool = ..., lifo: bool = ..., get_from: str = ..., gettyp: type[R], puttyp: type[V] = ..., strict: bool = ...) asyncutils._internal.types.B[R, V, Any]
asyncutils.queues.password_queue(password_put: V, *, maxsize: int = ..., protect_get: Literal[True], protect_put: Literal[True] = ..., can_change_get: bool = ..., can_change_put: bool = ..., priority: bool = ..., lifo: bool = ..., get_from: str = ..., puttyp: type[V] = ..., strict: bool = ...) asyncutils._internal.types.B[Any, V, Any]
asyncutils.queues.password_queue(*, password_get: R, maxsize: int = ..., protect_get: Literal[True], protect_put: Literal[True] = ..., can_change_get: bool = ..., can_change_put: bool = ..., priority: bool = ..., lifo: bool = ..., put_from: str = ..., gettyp: type[R] = ..., puttyp: type[V], strict: bool = ...) asyncutils._internal.types.B[R, V, Any]
asyncutils.queues.password_queue(*, password_get: R, maxsize: int = ..., protect_get: Literal[True], protect_put: Literal[True] = ..., can_change_get: bool = ..., can_change_put: bool = ..., priority: bool = ..., lifo: bool = ..., put_from: str = ..., gettyp: type[R] = ..., strict: bool = ...) asyncutils._internal.types.B[R, Any, Any]
asyncutils.queues.password_queue(*, maxsize: int = ..., protect_get: Literal[True], protect_put: Literal[True] = ..., can_change_get: bool = ..., can_change_put: bool = ..., priority: bool = ..., lifo: bool = ..., get_from: str = ..., put_from: str = ..., gettyp: type[R], puttyp: type[V], strict: bool = ...) asyncutils._internal.types.B[R, V, Any]
asyncutils.queues.password_queue(*, maxsize: int = ..., protect_get: Literal[True], protect_put: Literal[True] = ..., can_change_get: bool = ..., can_change_put: bool = ..., priority: bool = ..., lifo: bool = ..., get_from: str = ..., put_from: str = ..., gettyp: type[R], strict: bool = ...) asyncutils._internal.types.B[R, Any, Any]
asyncutils.queues.password_queue(*, maxsize: int = ..., protect_get: Literal[True], protect_put: Literal[True] = ..., can_change_get: bool = ..., can_change_put: bool = ..., priority: bool = ..., lifo: bool = ..., get_from: str = ..., put_from: str = ..., puttyp: type[V], strict: bool = ...) asyncutils._internal.types.B[Any, V, Any]
asyncutils.queues.password_queue(*, maxsize: int = ..., protect_get: Literal[True], protect_put: Literal[True] = ..., can_change_get: bool = ..., can_change_put: bool = ..., priority: bool = ..., lifo: bool = ..., get_from: str = ..., put_from: str = ..., strict: bool = ...) asyncutils._internal.types.B[Any, Any, Any]
Return a password-protected queue, the type of which does not inherit from asyncio.Queue but has the same interface, with
maximum size maxsize. priority and lifo parameters determine if the queue is a priority queue and last-in-first-out.
If protect_get is True, get and get_nowait will require a password, specified by password_get or retrieved from a variable in the
caller’s scope with name get_from (default :const`context.PASSWORD_QUEUE_DEFAULT_GET_FROM`).
If protect_put is True, put and put_nowait will require a password, specified by password_put or retrieved from a variable in the
caller’s scope with name put_from (default :const`context.PASSWORD_QUEUE_DEFAULT_PUT_FROM`).
If init_items is specified, the items in that (async) iterable will be arranged to enter the queue.
The excessive amount of overloads here cannot be helped due to accurate typing needs. When we drop support for Python 3.12, we will use
default values in the type parameters here to cut this number in half.

Danger

This function is not for cryptographic purposes, because no hashing of the password is performed! Attackers may obtain sensitive information, namely the passwords used by the queue, from the memory address of the returned object alone, or worse still, access and modify the internal stack/queue storing the items directly.

asyncutils.queues.ignore_qempty: Final[asyncutils.exceptions.IgnoreErrors]#

Instance of exceptions.IgnoreErrors that suppresses QueueShutDown and QueueEmpty.

asyncutils.queues.ignore_qerrs: Final[asyncutils.exceptions.IgnoreErrors]#

Instance of exceptions.IgnoreErrors that suppresses all asyncio queue-related errors.

asyncutils.queues.ignore_qfull: Final[asyncutils.exceptions.IgnoreErrors]#

Instance of exceptions.IgnoreErrors that suppresses QueueShutDown and QueueFull.

asyncutils.queues.ignore_qshutdown: Final[asyncutils.exceptions.IgnoreErrors]#

Instance of exceptions.IgnoreErrors that suppresses QueueShutDown.