Source code for cocotb.queue

# Copyright cocotb contributors
# Licensed under the Revised BSD License, see LICENSE for details.
# SPDX-License-Identifier: BSD-3-Clause
import asyncio.queues
import collections
import heapq
from typing import Generic, TypeVar

import cocotb
from cocotb.triggers import Event, _pointer_str


[docs] class QueueFull(asyncio.queues.QueueFull): """Raised when the Queue.put_nowait() method is called on a full Queue."""
[docs] class QueueEmpty(asyncio.queues.QueueEmpty): """Raised when the Queue.get_nowait() method is called on a empty Queue."""
T = TypeVar("T")
[docs] class Queue(Generic[T]): """A queue, useful for coordinating producer and consumer coroutines. If *maxsize* is less than or equal to 0, the queue size is infinite. If it is an integer greater than 0, then :meth:`put` will block when the queue reaches *maxsize*, until an item is removed by :meth:`get`. """ def __init__(self, maxsize: int = 0): self._maxsize = maxsize self._getters = collections.deque() self._putters = collections.deque() self._init(maxsize) def _init(self, maxsize): self._queue = collections.deque() def _put(self, item): self._queue.append(item) def _get(self): return self._queue.popleft() def _wakeup_next(self, waiters): while waiters: event, task = waiters.popleft() if not task.done(): event.set() break def __repr__(self): return f"<{type(self).__name__} {self._format()} at {_pointer_str(self)}>" def __str__(self): return f"<{type(self).__name__} {self._format()}>" def __class_getitem__(cls, type): return cls def _format(self): result = f"maxsize={repr(self._maxsize)}" if getattr(self, "_queue", None): result += f" _queue={repr(list(self._queue))}" if self._getters: result += f" _getters[{len(self._getters)}]" if self._putters: result += f" _putters[{len(self._putters)}]" return result
[docs] def qsize(self) -> int: """Number of items in the queue.""" return len(self._queue)
@property def maxsize(self) -> int: """Number of items allowed in the queue.""" return self._maxsize
[docs] def empty(self) -> bool: """Return ``True`` if the queue is empty, ``False`` otherwise.""" return not self._queue
[docs] def full(self) -> bool: """Return ``True`` if there are :meth:`maxsize` items in the queue. .. note:: If the Queue was initialized with ``maxsize=0`` (the default), then :meth:`full` is never ``True``. """ if self._maxsize <= 0: return False else: return self.qsize() >= self._maxsize
[docs] async def put(self, item: T) -> None: """Put an *item* into the queue. If the queue is full, wait until a free slot is available before adding the item. """ while self.full(): event = Event(f"{type(self).__name__} put") self._putters.append((event, cocotb._scheduler._current_task)) await event.wait() self.put_nowait(item)
[docs] def put_nowait(self, item: T) -> None: """Put an *item* into the queue without blocking. If no free slot is immediately available, raise :exc:`asyncio.QueueFull`. """ if self.full(): raise QueueFull() self._put(item) self._wakeup_next(self._getters)
[docs] async def get(self) -> T: """Remove and return an item from the queue. If the queue is empty, wait until an item is available. """ while self.empty(): event = Event(f"{type(self).__name__} get") self._getters.append((event, cocotb._scheduler._current_task)) await event.wait() return self.get_nowait()
[docs] def get_nowait(self) -> T: """Remove and return an item from the queue. Return an item if one is immediately available, else raise :exc:`asyncio.QueueEmpty`. """ if self.empty(): raise QueueEmpty() item = self._get() self._wakeup_next(self._putters) return item
[docs] class PriorityQueue(Queue): r"""A subclass of :class:`Queue`; retrieves entries in priority order (smallest item first). Entries are typically tuples of the form ``(priority number, data)``. """ def _init(self, maxsize): self._queue = [] def _put(self, item): heapq.heappush(self._queue, item) def _get(self): return heapq.heappop(self._queue)
[docs] class LifoQueue(Queue): """A subclass of :class:`Queue`; retrieves most recently added entries first.""" def _init(self, maxsize): self._queue = collections.deque() def _put(self, item): self._queue.append(item) def _get(self): return self._queue.pop()