# Copyright cocotb contributors
# Licensed under the Revised BSD License, see LICENSE for details.
# SPDX-License-Identifier: BSD-3-Clause
import asyncio.queues
import collections
import heapq
from typing import Generic, TypeVar
import cocotb
from cocotb.triggers import Event, _pointer_str
[docs]
class QueueFull(asyncio.queues.QueueFull):
"""Raised when the Queue.put_nowait() method is called on a full Queue."""
[docs]
class QueueEmpty(asyncio.queues.QueueEmpty):
"""Raised when the Queue.get_nowait() method is called on a empty Queue."""
T = TypeVar("T")
[docs]
class Queue(Generic[T]):
"""A queue, useful for coordinating producer and consumer coroutines.
If *maxsize* is less than or equal to 0, the queue size is infinite. If it
is an integer greater than 0, then :meth:`put` will block when the queue
reaches *maxsize*, until an item is removed by :meth:`get`.
"""
def __init__(self, maxsize: int = 0):
self._maxsize = maxsize
self._getters = collections.deque()
self._putters = collections.deque()
self._init(maxsize)
def _init(self, maxsize):
self._queue = collections.deque()
def _put(self, item):
self._queue.append(item)
def _get(self):
return self._queue.popleft()
def _wakeup_next(self, waiters):
while waiters:
event, task = waiters.popleft()
if not task.done():
event.set()
break
def __repr__(self):
return f"<{type(self).__name__} {self._format()} at {_pointer_str(self)}>"
def __str__(self):
return f"<{type(self).__name__} {self._format()}>"
def __class_getitem__(cls, type):
return cls
def _format(self):
result = f"maxsize={repr(self._maxsize)}"
if getattr(self, "_queue", None):
result += f" _queue={repr(list(self._queue))}"
if self._getters:
result += f" _getters[{len(self._getters)}]"
if self._putters:
result += f" _putters[{len(self._putters)}]"
return result
[docs]
def qsize(self) -> int:
"""Number of items in the queue."""
return len(self._queue)
@property
def maxsize(self) -> int:
"""Number of items allowed in the queue."""
return self._maxsize
[docs]
def empty(self) -> bool:
"""Return ``True`` if the queue is empty, ``False`` otherwise."""
return not self._queue
[docs]
def full(self) -> bool:
"""Return ``True`` if there are :meth:`maxsize` items in the queue.
.. note::
If the Queue was initialized with ``maxsize=0`` (the default), then
:meth:`full` is never ``True``.
"""
if self._maxsize <= 0:
return False
else:
return self.qsize() >= self._maxsize
[docs]
async def put(self, item: T) -> None:
"""Put an *item* into the queue.
If the queue is full, wait until a free
slot is available before adding the item.
"""
while self.full():
event = Event(f"{type(self).__name__} put")
self._putters.append((event, cocotb._scheduler._current_task))
await event.wait()
self.put_nowait(item)
[docs]
def put_nowait(self, item: T) -> None:
"""Put an *item* into the queue without blocking.
If no free slot is immediately available, raise :exc:`asyncio.QueueFull`.
"""
if self.full():
raise QueueFull()
self._put(item)
self._wakeup_next(self._getters)
[docs]
async def get(self) -> T:
"""Remove and return an item from the queue.
If the queue is empty, wait until an item is available.
"""
while self.empty():
event = Event(f"{type(self).__name__} get")
self._getters.append((event, cocotb._scheduler._current_task))
await event.wait()
return self.get_nowait()
[docs]
def get_nowait(self) -> T:
"""Remove and return an item from the queue.
Return an item if one is immediately available, else raise
:exc:`asyncio.QueueEmpty`.
"""
if self.empty():
raise QueueEmpty()
item = self._get()
self._wakeup_next(self._putters)
return item
[docs]
class PriorityQueue(Queue):
r"""A subclass of :class:`Queue`; retrieves entries in priority order (smallest item first).
Entries are typically tuples of the form ``(priority number, data)``.
"""
def _init(self, maxsize):
self._queue = []
def _put(self, item):
heapq.heappush(self._queue, item)
def _get(self):
return heapq.heappop(self._queue)
[docs]
class LifoQueue(Queue):
"""A subclass of :class:`Queue`; retrieves most recently added entries first."""
def _init(self, maxsize):
self._queue = collections.deque()
def _put(self, item):
self._queue.append(item)
def _get(self):
return self._queue.pop()