asyncutils.queues#
Non-inheriting extensions of asyncio.Queue with more methods and password protection, and a PotentQueueBase ABC.
Attributes#
Instance of |
|
Instance of |
|
Instance of |
|
Instance of |
Classes#
A base class for queues with much more methods, async- and sync-compatible. |
|
A base class for queues with much more methods, async- and sync-compatible. |
|
A priority queue, where the priority of each item is determined by comparing it to other items, meaning each item should at least implement |
|
A base class for queues with much more methods, async- and sync-compatible. |
|
Functions#
Module Contents#
- class asyncutils.queues.PotentQueueBase[T][source]#
Bases:
asyncio.queues.Queue[T],asyncutils.mixins.LoopBoundMixin,abc.ABCA base class for queues with much more methods, async- and sync-compatible.
- __aiter__() _collections_abc.AsyncGenerator[T]#
Equivalent to
drain_persistent().
- __iter__() _collections_abc.Generator[T]#
Equivalent to
drain_until_empty().
- abstractmethod _get() T[source]#
Get an item from the queue if not empty; called in
get()andget_nowait().
- abstractmethod _init(maxsize: int) None[source]#
Initialize the queue given
maxsize; called in__init__().
- abstractmethod _put(item: T) None[source]#
Put an item into the queue if not empty; called in
put()andput_nowait().
- drain_persistent(max_items: int | None = ..., timeout: float | None = ...) _collections_abc.AsyncGenerator[T][source]#
An async generator that gets items from the queue once available and yields them.
- drain_retlist(max_items: int | None = ...) list[T][source]#
Empty the queue and return a list of the items within.
- drain_until_empty(max_items: int | None = ...) _collections_abc.Generator[T][source]#
A synchronous generator that gets items from the queue until it is emptied and returns.
- enumerate(*, lifo: Literal[False] = ...) SmartQueue[tuple[int, T]][source]#
- enumerate(*, lifo: Literal[True]) SmartLifoQueue[tuple[int, T]]
Return a queue containing the items from enumerate applied on this queue and empty it in the process.
- enumerate_nowait() _collections_abc.Generator[tuple[int, T], None, None][source]#
queue.enumerate_nowait()is equivalent toawait iters.to_list(queue.drain_persistent()).
- async extend(it: asyncutils._internal.types.SupportsIteration[T], timeout: float | None = ...) None[source]#
Add the items from
itinto the queue withintimeout.
- filter(pred: _collections_abc.Callable[[T], bool] = ..., *, lifo: Literal[False] = ...) SmartQueue[T][source]#
- filter(pred: _collections_abc.Callable[[T], bool] = ..., *, lifo: Literal[True]) SmartLifoQueue[T]
Return a new queue from which getters can get the items in this queue that satisfy the predicate; items remaining in the original queue did not satisfy the predicate.
- filter_nowait(pred: _collections_abc.Callable[[T], bool] = ..., /) tuple[list[T], int][source]#
Filter items in the queue by a predicate and return a list of removed items and an integer; the items in the returned list after the index corresponding to that integer were items rejected from the queue due to the queue being full.
- map[R](f: _collections_abc.Callable[[T], _collections_abc.Awaitable[R]], stop_when: asyncio.futures.Future[None] | None = ..., *, lifo: Literal[False] = ...) SmartQueue[R][source]#
- map(f: _collections_abc.Callable[[T], _collections_abc.Awaitable[R]], stop_when: asyncio.futures.Future[None] | None = ..., *, lifo: Literal[True]) SmartLifoQueue[R]
Return a queue that contains items from this queue with the function applied on each of them, emptying this queue in the process (transformation analogous to
builtins.map).
- map_nowait[R](f: _collections_abc.Callable[[T], _collections_abc.Coroutine[Any, Any, R]], /) list[R][source]#
Return a list containing the return values of the function applied on the items in the queue, emptying the queue.
- poppush_nowait(item: T, raising: bool = ...) T[source]#
Pop an item from the queue and push into the other end immediately.
- push(item: T) bool[source]#
Put an item into the queue immediately, popping if necessary; returns success.
- pushpop_nowait(item: T, raising: bool = ...) T[source]#
Push an item into the queue and pop from the other end immediately.
- shutdown(immediate: bool = ...) None[source]#
Shut down the queue. If
immediateisTrue, pending gets raise immediately even if the queue is not empty.
- async smart_get(*, timeout: float | None = ..., default: T = ...) T[source]#
Call get_nowait if an item is immediately available, waiting for a item with
timeoutotherwise; if the timeout expires anddefaultis provided, return it.
- async smart_put(item: T, *, timeout: float | None = ..., raising: bool = ...) bool | None[source]#
Call put_nowait if a slot is immediately available, waiting for a slot with
timeoutotherwise; if the timeout expires andraisingis True, throwTimeoutError.
- starmap[R, *Ts](f: _collections_abc.Callable[[*Ts], _collections_abc.Awaitable[R]], stop_when: asyncio.futures.Future[None] | None = ..., *, lifo: Literal[False] = ...) SmartQueue[R][source]#
- starmap(f: _collections_abc.Callable[[*Ts], _collections_abc.Awaitable[R]], stop_when: asyncio.futures.Future[None] | None = ..., *, lifo: Literal[True]) SmartLifoQueue[R]
Return a queue that contains items from this queue with the function applied on each of them starred, emptying this queue in the process (transformation analogous to
itertools.starmap).
- starmap_nowait[R](f: _collections_abc.Callable[Ellipsis, _collections_abc.Coroutine[Any, Any, R]], /) list[R][source]#
Return a list containing the return values of the function applied on the items in the queue, starred,, emptying the queue.
- sync_get(*, timeout: float | None = ..., default: T = ...) T[source]#
Get an item from the queue synchronously. Above remark applies.
- sync_put(item: T, *, timeout: float | None = ...) bool | None[source]#
Put an item into the queue synchronously. If this functionality is needed, you should likely be using a synchronous queue.
- transaction() contextlib.AbstractContextManager[Self, None][source]#
- Return an async context manager which begins a transaction on entry.If an error occurs within the context, the original items in the queue are restored and the error reraised, unless the error is critical anddeemed to require immediate exit; otherwise, the transaction completes successfully and changes are committed on exit.
- property can_get_now: bool#
Whether items can be get from the queue without blocking at this instant.
- property can_put_now: bool#
Whether items can be put into the queue without blocking at this instant.
- class asyncutils.queues.SmartLifoQueue[T][source]#
Bases:
PotentQueueBase[T]A base class for queues with much more methods, async- and sync-compatible.
- _put(item: T) None[source]#
Put an item into the queue if not empty; called in
put()andput_nowait().
- class asyncutils.queues.SmartPriorityQueue[T](maxsize: int = ..., *, init_items: asyncutils._internal.types.SupportsIteration[T])[source]#
- class asyncutils.queues.SmartPriorityQueue(maxsize: int = ...)
Bases:
PotentQueueBase[T]A priority queue, where the priority of each item is determined by comparing it to other items, meaning each item should at least implement
__lt__().- _put(item: T) None[source]#
Put an item into the queue if not empty; called in
put()andput_nowait().
- peek() T[source]#
Look at the item that would be returned by
get()orget_nowait()without actually getting it.
- async start(maxsize: int, init_items: asyncutils._internal.types.SupportsIteration[T]) None[source]#
- class asyncutils.queues.SmartQueue[T][source]#
Bases:
PotentQueueBase[T]A base class for queues with much more methods, async- and sync-compatible.
- _put(item: T) None[source]#
Put an item into the queue if not empty; called in
put()andput_nowait().
- class asyncutils.queues.UserPriorityQueue[T](maxsize: int = ..., *, init_priority: int = ..., init_items: asyncutils._internal.types.SupportsIteration[T])[source]#
- class asyncutils.queues.UserPriorityQueue(maxsize: int = ..., *, init_priority: int = ...)
Bases:
SmartPriorityQueue[tuple[int,int,T]]A priority queue, where you put in items with an integer priority and the items are retrieved in ascending order of priority, with earlieritems taking precedence in case of ties.Theput()andput_nowait()methods of this class take an additionalpriorityparameter, representing the priority of the item.
- asyncutils.queues.password_queue[T, R](password_put: R, *, maxsize: int = ..., protect_get: Literal[False] = ..., protect_put: Literal[True] = ..., can_change_put: bool = ..., priority: bool = ..., lifo: bool = ..., puttyp: type[R] = ..., init_items: asyncutils._internal.types.SupportsIteration[T], strict: bool = ...) asyncutils._internal.types.P[R, T][source]#
- asyncutils.queues.password_queue(*, maxsize: int = ..., protect_get: Literal[False] = ..., protect_put: Literal[True] = ..., can_change_put: bool = ..., priority: bool = ..., lifo: bool = ..., put_from: str = ..., init_items: asyncutils._internal.types.SupportsIteration[T], strict: bool = ...) asyncutils._internal.types.P[Any, T]
- asyncutils.queues.password_queue(*, maxsize: int = ..., protect_get: Literal[False] = ..., protect_put: Literal[True] = ..., can_change_put: bool = ..., priority: bool = ..., lifo: bool = ..., put_from: str = ..., puttyp: type[R], init_items: asyncutils._internal.types.SupportsIteration[T], strict: bool = ...) asyncutils._internal.types.P[R, T]
- asyncutils.queues.password_queue(*, password_get: R, maxsize: int = ..., protect_get: Literal[True], protect_put: Literal[False], can_change_get: bool = ..., priority: bool = ..., lifo: bool = ..., get_from: str = ..., gettyp: type[R] = ..., init_items: asyncutils._internal.types.SupportsIteration[T], strict: bool = ...) asyncutils._internal.types.G[R, T]
- asyncutils.queues.password_queue(*, maxsize: int = ..., protect_get: Literal[True], protect_put: Literal[False], can_change_get: bool = ..., priority: bool = ..., lifo: bool = ..., get_from: str = ..., gettyp: type[R], init_items: asyncutils._internal.types.SupportsIteration[T], strict: bool = ...) asyncutils._internal.types.G[R, T]
- asyncutils.queues.password_queue(*, maxsize: int = ..., protect_get: Literal[True], protect_put: Literal[False], can_change_get: bool = ..., priority: bool = ..., lifo: bool = ..., get_from: str = ..., init_items: asyncutils._internal.types.SupportsIteration[T], strict: bool = ...) asyncutils._internal.types.G[Any, T]
- asyncutils.queues.password_queue(password_put: V, password_get: R, maxsize: int = ..., *, protect_get: Literal[True], protect_put: Literal[True] = ..., can_change_get: bool = ..., can_change_put: bool = ..., priority: bool = ..., lifo: bool = ..., gettyp: type[R] = ..., puttyp: type[V] = ..., init_items: asyncutils._internal.types.SupportsIteration[T], strict: bool = ...) asyncutils._internal.types.B[R, V, T]
- asyncutils.queues.password_queue(password_put: V, *, maxsize: int = ..., protect_get: Literal[True], protect_put: Literal[True] = ..., can_change_get: bool = ..., can_change_put: bool = ..., priority: bool = ..., lifo: bool = ..., get_from: str = ..., gettyp: type[R], puttyp: type[V] = ..., init_items: asyncutils._internal.types.SupportsIteration[T], strict: bool = ...) asyncutils._internal.types.B[R, V, T]
- asyncutils.queues.password_queue(password_put: V, *, maxsize: int = ..., protect_get: Literal[True], protect_put: Literal[True] = ..., can_change_get: bool = ..., can_change_put: bool = ..., priority: bool = ..., lifo: bool = ..., get_from: str = ..., puttyp: type[V] = ..., init_items: asyncutils._internal.types.SupportsIteration[T], strict: bool = ...) asyncutils._internal.types.B[Any, V, T]
- asyncutils.queues.password_queue(*, password_get: R, maxsize: int = ..., protect_get: Literal[True], protect_put: Literal[True] = ..., can_change_get: bool = ..., can_change_put: bool = ..., priority: bool = ..., lifo: bool = ..., put_from: str = ..., gettyp: type[R] = ..., puttyp: type[V], init_items: asyncutils._internal.types.SupportsIteration[T], strict: bool = ...) asyncutils._internal.types.B[R, V, T]
- asyncutils.queues.password_queue(*, password_get: R, maxsize: int = ..., protect_get: Literal[True], protect_put: Literal[True] = ..., can_change_get: bool = ..., can_change_put: bool = ..., priority: bool = ..., lifo: bool = ..., put_from: str = ..., gettyp: type[R] = ..., init_items: asyncutils._internal.types.SupportsIteration[T], strict: bool = ...) asyncutils._internal.types.B[R, Any, T]
- asyncutils.queues.password_queue(*, maxsize: int = ..., protect_get: Literal[True], protect_put: Literal[True] = ..., can_change_get: bool = ..., can_change_put: bool = ..., priority: bool = ..., lifo: bool = ..., get_from: str = ..., put_from: str = ..., gettyp: type[R], puttyp: type[V], init_items: asyncutils._internal.types.SupportsIteration[T], strict: bool = ...) asyncutils._internal.types.B[R, V, T]
- asyncutils.queues.password_queue(*, maxsize: int = ..., protect_get: Literal[True], protect_put: Literal[True] = ..., can_change_get: bool = ..., can_change_put: bool = ..., priority: bool = ..., lifo: bool = ..., get_from: str = ..., put_from: str = ..., gettyp: type[R], init_items: asyncutils._internal.types.SupportsIteration[T], strict: bool = ...) asyncutils._internal.types.B[R, Any, T]
- asyncutils.queues.password_queue(*, maxsize: int = ..., protect_get: Literal[True], protect_put: Literal[True] = ..., can_change_get: bool = ..., can_change_put: bool = ..., priority: bool = ..., lifo: bool = ..., get_from: str = ..., put_from: str = ..., puttyp: type[V], init_items: asyncutils._internal.types.SupportsIteration[T], strict: bool = ...) asyncutils._internal.types.B[Any, V, T]
- asyncutils.queues.password_queue(*, maxsize: int = ..., protect_get: Literal[True], protect_put: Literal[True] = ..., can_change_get: bool = ..., can_change_put: bool = ..., priority: bool = ..., lifo: bool = ..., get_from: str = ..., put_from: str = ..., init_items: asyncutils._internal.types.SupportsIteration[T], strict: bool = ...) asyncutils._internal.types.B[Any, Any, T]
- asyncutils.queues.password_queue(password_put: R, *, maxsize: int = ..., protect_get: Literal[False] = ..., protect_put: Literal[True] = ..., can_change_put: bool = ..., priority: bool = ..., lifo: bool = ..., puttyp: type[R] = ..., strict: bool = ...) asyncutils._internal.types.P[R, Any]
- asyncutils.queues.password_queue(*, maxsize: int = ..., protect_get: Literal[False] = ..., protect_put: Literal[True] = ..., can_change_put: bool = ..., priority: bool = ..., lifo: bool = ..., put_from: str = ..., puttyp: type[R], strict: bool = ...) asyncutils._internal.types.P[R, Any]
- asyncutils.queues.password_queue(*, maxsize: int = ..., protect_get: Literal[False] = ..., protect_put: Literal[True] = ..., can_change_put: bool = ..., priority: bool = ..., lifo: bool = ..., put_from: str = ..., strict: bool = ...) asyncutils._internal.types.P[Any, Any]
- asyncutils.queues.password_queue(*, password_get: R, maxsize: int = ..., protect_get: Literal[True], protect_put: Literal[False], can_change_get: bool = ..., priority: bool = ..., lifo: bool = ..., gettyp: type[R] = ..., strict: bool = ...) asyncutils._internal.types.G[R, Any]
- asyncutils.queues.password_queue(*, maxsize: int = ..., protect_get: Literal[True], protect_put: Literal[False], can_change_get: bool = ..., priority: bool = ..., lifo: bool = ..., gettyp: type[R], strict: bool = ...) asyncutils._internal.types.G[R, Any]
- asyncutils.queues.password_queue(*, maxsize: int = ..., protect_get: Literal[True], protect_put: Literal[False], can_change_get: bool = ..., priority: bool = ..., lifo: bool = ..., get_from: str = ..., strict: bool = ...) asyncutils._internal.types.G[Any, Any]
- asyncutils.queues.password_queue(password_put: V, password_get: R, maxsize: int = ..., *, protect_get: Literal[True], protect_put: Literal[True] = ..., can_change_get: bool = ..., can_change_put: bool = ..., priority: bool = ..., lifo: bool = ..., gettyp: type[R] = ..., puttyp: type[V] = ..., strict: bool = ...) asyncutils._internal.types.B[R, V, Any]
- asyncutils.queues.password_queue(password_put: V, *, maxsize: int = ..., protect_get: Literal[True], protect_put: Literal[True] = ..., can_change_get: bool = ..., can_change_put: bool = ..., priority: bool = ..., lifo: bool = ..., get_from: str = ..., gettyp: type[R], puttyp: type[V] = ..., strict: bool = ...) asyncutils._internal.types.B[R, V, Any]
- asyncutils.queues.password_queue(password_put: V, *, maxsize: int = ..., protect_get: Literal[True], protect_put: Literal[True] = ..., can_change_get: bool = ..., can_change_put: bool = ..., priority: bool = ..., lifo: bool = ..., get_from: str = ..., puttyp: type[V] = ..., strict: bool = ...) asyncutils._internal.types.B[Any, V, Any]
- asyncutils.queues.password_queue(*, password_get: R, maxsize: int = ..., protect_get: Literal[True], protect_put: Literal[True] = ..., can_change_get: bool = ..., can_change_put: bool = ..., priority: bool = ..., lifo: bool = ..., put_from: str = ..., gettyp: type[R] = ..., puttyp: type[V], strict: bool = ...) asyncutils._internal.types.B[R, V, Any]
- asyncutils.queues.password_queue(*, password_get: R, maxsize: int = ..., protect_get: Literal[True], protect_put: Literal[True] = ..., can_change_get: bool = ..., can_change_put: bool = ..., priority: bool = ..., lifo: bool = ..., put_from: str = ..., gettyp: type[R] = ..., strict: bool = ...) asyncutils._internal.types.B[R, Any, Any]
- asyncutils.queues.password_queue(*, maxsize: int = ..., protect_get: Literal[True], protect_put: Literal[True] = ..., can_change_get: bool = ..., can_change_put: bool = ..., priority: bool = ..., lifo: bool = ..., get_from: str = ..., put_from: str = ..., gettyp: type[R], puttyp: type[V], strict: bool = ...) asyncutils._internal.types.B[R, V, Any]
- asyncutils.queues.password_queue(*, maxsize: int = ..., protect_get: Literal[True], protect_put: Literal[True] = ..., can_change_get: bool = ..., can_change_put: bool = ..., priority: bool = ..., lifo: bool = ..., get_from: str = ..., put_from: str = ..., gettyp: type[R], strict: bool = ...) asyncutils._internal.types.B[R, Any, Any]
- asyncutils.queues.password_queue(*, maxsize: int = ..., protect_get: Literal[True], protect_put: Literal[True] = ..., can_change_get: bool = ..., can_change_put: bool = ..., priority: bool = ..., lifo: bool = ..., get_from: str = ..., put_from: str = ..., puttyp: type[V], strict: bool = ...) asyncutils._internal.types.B[Any, V, Any]
- asyncutils.queues.password_queue(*, maxsize: int = ..., protect_get: Literal[True], protect_put: Literal[True] = ..., can_change_get: bool = ..., can_change_put: bool = ..., priority: bool = ..., lifo: bool = ..., get_from: str = ..., put_from: str = ..., strict: bool = ...) asyncutils._internal.types.B[Any, Any, Any]
- Return a password-protected queue, the type of which does not inherit from
asyncio.Queuebut has the same interface, withmaximum sizemaxsize.priorityandlifoparameters determine if the queue is a priority queue and last-in-first-out.Ifprotect_getisTrue, get and get_nowait will require a password, specified bypassword_getor retrieved from a variable in thecaller’s scope with nameget_from(default :const`context.PASSWORD_QUEUE_DEFAULT_GET_FROM`).Ifprotect_putisTrue, put and put_nowait will require a password, specified bypassword_putor retrieved from a variable in thecaller’s scope with nameput_from(default :const`context.PASSWORD_QUEUE_DEFAULT_PUT_FROM`).Ifinit_itemsis specified, the items in that (async) iterable will be arranged to enter the queue.The excessive amount of overloads here cannot be helped due to accurate typing needs. When we drop support for Python 3.12, we will usedefault values in the type parameters here to cut this number in half.Danger
This function is not for cryptographic purposes, because no hashing of the password is performed! Attackers may obtain sensitive information, namely the passwords used by the queue, from the memory address of the returned object alone, or worse still, access and modify the internal stack/queue storing the items directly.
- asyncutils.queues.ignore_qempty: Final[asyncutils.exceptions.IgnoreErrors]#
Instance of
exceptions.IgnoreErrorsthat suppressesQueueShutDownandQueueEmpty.
- asyncutils.queues.ignore_qerrs: Final[asyncutils.exceptions.IgnoreErrors]#
Instance of
exceptions.IgnoreErrorsthat suppresses all asyncio queue-related errors.
- asyncutils.queues.ignore_qfull: Final[asyncutils.exceptions.IgnoreErrors]#
Instance of
exceptions.IgnoreErrorsthat suppressesQueueShutDownandQueueFull.
- asyncutils.queues.ignore_qshutdown: Final[asyncutils.exceptions.IgnoreErrors]#
Instance of
exceptions.IgnoreErrorsthat suppressesQueueShutDown.