# Source code for cocotb_bus.monitors

# Copyright cocotb contributors
# Copyright (c) 2013 Potential Ventures Ltd
# Copyright (c) 2013 SolarFlare Communications Inc
# Licensed under the Revised BSD License, see LICENSE for details.
# SPDX-License-Identifier: BSD-3-Clause

"""Class defining the standard interface for a monitor within a testbench.

The monitor is responsible for watching the pins of the DUT and recreating
the transactions.
"""

from collections import deque

import cocotb
from cocotb.decorators import coroutine
from cocotb.log import SimLog
from cocotb.triggers import Event, Timer, First

from cocotb_bus.bus import Bus


class MonitorStatistics:
    """Plain holder for the counters a Monitor keeps about its activity."""

    def __init__(self) -> None:
        # Number of transactions seen so far; a fresh instance starts at zero.
        self.received_transactions = 0


class Monitor:
    """Base class for Monitor objects.

    Monitors are passive 'listening' objects that monitor pins going in or out
    of a DUT. This class should not be used directly, but should be sub-classed
    and the internal :meth:`_monitor_recv` method should be overridden.
    This :meth:`_monitor_recv` method should capture some behavior of the pins,
    form a transaction, and pass this transaction to the internal
    :meth:`_recv` method. The :meth:`_monitor_recv` method is added to the
    cocotb scheduler during the ``__init__`` phase, so it should not be awaited
    anywhere.

    The primary use of a Monitor is as an interface for a
    :class:`~cocotb.scoreboard.Scoreboard`.

    Args:
        callback (callable): Callback to be called with each recovered
            transaction as the argument. If no callback is registered,
            received transactions are placed on an internal queue instead
            (accessible through ``len(monitor)`` and ``monitor[idx]``).
        event (cocotb.triggers.Event): Event that will be set when a
            transaction is received through the internal :meth:`_recv`
            method. `Event.data` is set to the received transaction.
    """

    def __init__(self, callback=None, event=None):
        self._event = event
        # Private event used by wait_for_recv(); set and immediately cleared
        # on every received transaction.
        self._wait_event = Event()
        # Transactions are queued here only while no callback is registered.
        self._recvQ = deque()
        self._callbacks = []
        self.stats = MonitorStatistics()

        # Sub-classes may already set up logging
        if not hasattr(self, "log"):
            self.log = SimLog("cocotb.monitor.%s" % (type(self).__qualname__))

        if callback is not None:
            self.add_callback(callback)

        # Create an independent coroutine which can receive stuff
        self._thread = cocotb.scheduler.add(self._monitor_recv())

    def kill(self):
        """Kill the monitor coroutine."""
        if self._thread:
            self._thread.kill()
            self._thread = None

    def __len__(self):
        """Return the number of transactions currently held in the queue."""
        return len(self._recvQ)

    def __getitem__(self, idx):
        """Return the queued transaction at position *idx*."""
        return self._recvQ[idx]

    def add_callback(self, callback):
        """Add function as a callback.

        Args:
            callback (callable): The function to call back. Any callable is
                accepted, including :func:`functools.partial` objects and
                instances defining ``__call__``.
        """
        # Not every callable carries ``__qualname__`` (partial objects and
        # callable instances do not), so fall back to repr() rather than
        # raising AttributeError from a mere debug message.
        callback_name = getattr(callback, "__qualname__", None)
        if callback_name is None:
            callback_name = repr(callback)
        self.log.debug("Adding callback of function %s to monitor",
                       callback_name)
        self._callbacks.append(callback)

    @coroutine
    async def wait_for_recv(self, timeout=None):
        """With *timeout*, :meth:`.wait` for transaction to arrive on monitor
        and return its data.

        Args:
            timeout: The timeout value for :class:`~.triggers.Timer`.
                Defaults to ``None``. Any falsy value means "wait without
                a timeout".

        Returns:
            Data of received transaction, or ``None`` if the timeout
            expired first.
        """
        if timeout:
            t = Timer(timeout)
            fired = await First(self._wait_event.wait(), t)
            if fired is t:
                # The timer won the race: nothing was received in time.
                return None
        else:
            await self._wait_event.wait()

        return self._wait_event.data

    # this is not `async` so that we fail in `__init__` on subclasses without it
    def _monitor_recv(self):
        """Actual implementation of the receiver.

        Sub-classes should override this method to implement the actual
        receive routine and call :meth:`_recv` with the recovered transaction.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError("Attempt to use base monitor class without "
                                  "providing a ``_monitor_recv`` method")

    def _recv(self, transaction):
        """Common handling of a received transaction.

        Updates the statistics, then hands *transaction* to every registered
        callback, or queues it when there are none, and finally notifies the
        user-supplied event and anyone blocked in :meth:`wait_for_recv`.
        """
        self.stats.received_transactions += 1

        # either callback based consumer
        for callback in self._callbacks:
            callback(transaction)

        # Or queued with a notification
        if not self._callbacks:
            self._recvQ.append(transaction)

        if self._event is not None:
            self._event.set(data=transaction)

        # If anyone was waiting then let them know
        if self._wait_event is not None:
            self._wait_event.set(data=transaction)
            # Clear straight away so the next wait blocks until a new
            # transaction arrives.
            self._wait_event.clear()
class BusMonitor(Monitor):
    """Monitor specialisation that watches a named bus on an entity.

    Args:
        entity: Object the bus belongs to; its ``_name`` is used for the
            logger name and it is handed to :class:`Bus`.
        name: Name of the bus; also shown by ``str()`` of the monitor.
        clock: Stored as ``self.clock`` for use by sub-classes.
        reset: Optional active-high reset; consulted by :attr:`in_reset`.
        reset_n: Optional active-low reset; consulted by :attr:`in_reset`
            and takes precedence over *reset*.
        callback: Forwarded to :class:`Monitor`.
        event: Forwarded to :class:`Monitor`.
        bus_separator: Forwarded to :class:`Bus`.
        array_idx: Forwarded to :class:`Bus`.
    """

    # Signal name lists handed to Bus; sub-classes are expected to override.
    _signals = []
    _optional_signals = []

    def __init__(self, entity, name, clock, reset=None, reset_n=None,
                 callback=None, event=None,
                 bus_separator="_", array_idx=None):
        # Install the logger before Monitor.__init__ runs, so the base class
        # sees an existing ``log`` attribute and keeps this one.
        self.log = SimLog("cocotb.%s.%s" % (entity._name, name))

        self.entity = entity
        self.name = name
        self.clock = clock
        self._reset = reset
        self._reset_n = reset_n

        self.bus = Bus(
            self.entity,
            self.name,
            self._signals,
            optional_signals=self._optional_signals,
            bus_separator=bus_separator,
            array_idx=array_idx,
        )

        Monitor.__init__(self, callback=callback, event=event)

    @property
    def in_reset(self):
        """Boolean flag showing whether the bus is in reset state or not."""
        active_low = self._reset_n
        if active_low is not None:
            # Active-low: a zero value means reset is asserted.
            return not active_low.value.integer

        active_high = self._reset
        if active_high is None:
            # No reset signal supplied at all.
            return False
        return bool(active_high.value.integer)

    def __str__(self):
        class_name = type(self).__qualname__
        return "%s(%s)" % (class_name, self.name)