Source code for cocotb.triggers

# Copyright (c) 2013 Potential Ventures Ltd
# Copyright (c) 2013 SolarFlare Communications Inc
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#     * Redistributions of source code must retain the above copyright
#       notice, this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above copyright
#       notice, this list of conditions and the following disclaimer in the
#       documentation and/or other materials provided with the distribution.
#     * Neither the name of Potential Ventures Ltd,
#       SolarFlare Communications Inc nor the
#       names of its contributors may be used to endorse or promote products
#       derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL POTENTIAL VENTURES LTD BE LIABLE FOR ANY
# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

"""A collections of triggers which a testbench can await."""

import abc
import warnings
from collections.abc import Awaitable

from cocotb import simulator
from cocotb.log import SimLog
from cocotb.utils import (
    get_sim_steps, get_time_from_sim_steps, ParametrizedSingleton,
    lazy_property, remove_traceback_frames,
)
from cocotb import outcomes
import cocotb


def _pointer_str(obj):
    """
    Get the memory address of *obj* as used in :meth:`object.__repr__`.

    This is equivalent to ``sprintf("%p", id(obj))``, but python does not
    support ``%p``.
    """
    full_repr = object.__repr__(obj)  # gives "<{type} object at {address}>"
    return full_repr.rsplit(' ', 1)[1][:-1]


class TriggerException(Exception):
    pass


[docs]class Trigger(Awaitable): """Base class to derive from.""" # __dict__ is needed here for the `.log` lazy_property below to work. # The implementation of `_PyObject_GenericGetAttrWithDict` suggests that # despite its inclusion, __slots__ will overall give speed and memory # improvements: # - the `__dict__` is not actually constructed until it's needed, and that # only happens if the `.log` attribute is used, where performance # concerns no longer matter. # - Attribute setting and getting will still go through the slot machinery # first, as "data descriptors" take priority over dict access __slots__ = ('primed', '__weakref__', '__dict__') def __init__(self): self.primed = False @lazy_property def log(self): return SimLog("cocotb.%s" % (type(self).__qualname__), id(self))
[docs] @abc.abstractmethod def prime(self, callback): """Set a callback to be invoked when the trigger fires. The callback will be invoked with a single argument, `self`. Sub-classes must override this, but should end by calling the base class method. Do not call this directly within coroutines, it is intended to be used only by the scheduler. """ self.primed = True
[docs] def unprime(self): """Remove the callback, and perform cleanup if necessary. After being un-primed, a Trigger may be re-primed again in the future. Calling `unprime` multiple times is allowed, subsequent calls should be a no-op. Sub-classes may override this, but should end by calling the base class method. Do not call this directly within coroutines, it is intended to be used only by the scheduler. """ self.primed = False
def __del__(self): # Ensure if a trigger drops out of scope we remove any pending callbacks self.unprime() @property def _outcome(self): """The result that `await this_trigger` produces in a coroutine. The default is to produce the trigger itself, which is done for ease of use with :class:`~cocotb.triggers.First`. """ return outcomes.Value(self) def __await__(self): # hand the trigger back to the scheduler trampoline return (yield self)
class PythonTrigger(Trigger): """Python triggers don't use GPI at all. For example: notification of coroutine completion. """
[docs]class GPITrigger(Trigger): """Base Trigger class for GPI triggers. Consumes simulation time. """ __slots__ = ('cbhdl',) def __init__(self): Trigger.__init__(self) # Required to ensure documentation can build # if simulator is not None: # self.cbhdl = simulator.create_callback(self) # else: self.cbhdl = None
[docs] def unprime(self): """Disable a primed trigger, can be re-primed.""" if self.cbhdl is not None: self.cbhdl.deregister() self.cbhdl = None Trigger.unprime(self)
[docs]class Timer(GPITrigger): """Fires after the specified simulation time period has elapsed.""" def __init__(self, time=None, units="step", *, time_ps=None): """ Args: time (numbers.Real or decimal.Decimal): The time value. .. versionchanged:: 1.5.0 Previously this argument was misleadingly called `time_ps`. units (str, optional): One of ``'step'``, ``'fs'``, ``'ps'``, ``'ns'``, ``'us'``, ``'ms'``, ``'sec'``. When *units* is ``'step'``, the timestep is determined by the simulator (see :make:var:`COCOTB_HDL_TIMEPRECISION`). Examples: >>> await Timer(100, units='ps') The time can also be a ``float``: >>> await Timer(100e-9, units='sec') which is particularly convenient when working with frequencies: >>> freq = 10e6 # 10 MHz >>> await Timer(1 / freq, units='sec') Other builtin exact numeric types can be used too: >>> from fractions import Fraction >>> await Timer(Fraction(1, 10), units='ns') >>> from decimal import Decimal >>> await Timer(Decimal('100e-9'), units='sec') These are most useful when using computed durations while avoiding floating point inaccuracies. See Also: :func:`~cocotb.utils.get_sim_steps` Raises: TriggerException: If a negative value is passed for Timer setup. .. versionchanged:: 1.5 Raise an exception when Timer uses a negative value as it is undefined behavior. Warn for 0 as this will cause erratic behavior in some simulators as well. .. versionchanged:: 1.5 Support ``'step'`` as the the *units* argument to mean "simulator time step". .. deprecated:: 1.5 Using None as the the *units* argument is deprecated, use ``'step'`` instead. """ GPITrigger.__init__(self) if time_ps is not None: if time is not None: raise TypeError("Gave argument to both the 'time' and deprecated 'time_ps' parameter") time = time_ps warnings.warn( "The parameter name 'time_ps' has been renamed to 'time'. Please update your invocation.", DeprecationWarning, stacklevel=2) else: if time is None: raise TypeError("Missing required argument 'time'") if time <= 0: if time == 0: warnings.warn("Timer setup with value 0, which might exhibit undefined behavior in some simulators", category=RuntimeWarning, stacklevel=2) else: raise TriggerException("Timer value time_ps must not be negative") if units is None: warnings.warn( 'Using units=None is deprecated, use units="step" instead.', DeprecationWarning, stacklevel=2) units = "step" # don't propagate deprecated value self.sim_steps = get_sim_steps(time, units) def prime(self, callback): """Register for a timed callback.""" if self.cbhdl is None: self.cbhdl = simulator.register_timed_callback(self.sim_steps, callback, self) if self.cbhdl is None: raise TriggerException("Unable set up %s Trigger" % (str(self))) GPITrigger.prime(self, callback) def __repr__(self): return "<{} of {:1.2f}ps at {}>".format( type(self).__qualname__, get_time_from_sim_steps(self.sim_steps, units='ps'), _pointer_str(self) )
# This is needed to make our custom metaclass work with abc.ABCMeta used in the # `Trigger` base class. class _ParameterizedSingletonAndABC(ParametrizedSingleton, abc.ABCMeta): pass
[docs]class ReadOnly(GPITrigger, metaclass=_ParameterizedSingletonAndABC): """Fires when the current simulation timestep moves to the read-only phase. The read-only phase is entered when the current timestep no longer has any further delta steps. This will be a point where all the signal values are stable as there are no more RTL events scheduled for the timestep. The simulator will not allow scheduling of more events in this timestep. Useful for monitors which need to wait for all processes to execute (both RTL and cocotb) to ensure sampled signal values are final. """ __slots__ = () @classmethod def __singleton_key__(cls): return None def __init__(self): GPITrigger.__init__(self) def prime(self, callback): if self.cbhdl is None: self.cbhdl = simulator.register_readonly_callback(callback, self) if self.cbhdl is None: raise TriggerException("Unable set up %s Trigger" % (str(self))) GPITrigger.prime(self, callback) def __repr__(self): return "{}()".format(type(self).__qualname__)
[docs]class ReadWrite(GPITrigger, metaclass=_ParameterizedSingletonAndABC): """Fires when the read-write portion of the simulation cycles is reached.""" __slots__ = () @classmethod def __singleton_key__(cls): return None def __init__(self): GPITrigger.__init__(self) def prime(self, callback): if self.cbhdl is None: # import pdb # pdb.set_trace() self.cbhdl = simulator.register_rwsynch_callback(callback, self) if self.cbhdl is None: raise TriggerException("Unable set up %s Trigger" % (str(self))) GPITrigger.prime(self, callback) def __repr__(self): return "{}()".format(type(self).__qualname__)
[docs]class NextTimeStep(GPITrigger, metaclass=_ParameterizedSingletonAndABC): """Fires when the next time step is started.""" __slots__ = () @classmethod def __singleton_key__(cls): return None def __init__(self): GPITrigger.__init__(self) def prime(self, callback): if self.cbhdl is None: self.cbhdl = simulator.register_nextstep_callback(callback, self) if self.cbhdl is None: raise TriggerException("Unable set up %s Trigger" % (str(self))) GPITrigger.prime(self, callback) def __repr__(self): return "{}()".format(type(self).__qualname__)
class _EdgeBase(GPITrigger, metaclass=_ParameterizedSingletonAndABC): """Internal base class that fires on a given edge of a signal.""" __slots__ = ('signal',) @classmethod @property def _edge_type(self): """The edge type, as understood by the C code. Must be set in sub-classes.""" raise NotImplementedError @classmethod def __singleton_key__(cls, signal): return signal def __init__(self, signal): super(_EdgeBase, self).__init__() self.signal = signal def prime(self, callback): """Register notification of a value change via a callback""" if self.cbhdl is None: self.cbhdl = simulator.register_value_change_callback( self.signal._handle, callback, type(self)._edge_type, self ) if self.cbhdl is None: raise TriggerException("Unable set up %s Trigger" % (str(self))) super(_EdgeBase, self).prime(callback) def __repr__(self): return "{}({!r})".format(type(self).__qualname__, self.signal)
[docs]class RisingEdge(_EdgeBase): """Fires on the rising edge of *signal*, on a transition from ``0`` to ``1``.""" __slots__ = () _edge_type = 1
[docs]class FallingEdge(_EdgeBase): """Fires on the falling edge of *signal*, on a transition from ``1`` to ``0``.""" __slots__ = () _edge_type = 2
[docs]class Edge(_EdgeBase): """Fires on any value change of *signal*.""" __slots__ = () _edge_type = 3
class _Event(PythonTrigger): """Unique instance used by the Event object. One created for each attempt to wait on the event so that the scheduler can maintain a dictionary of indexing each individual coroutine. FIXME: This will leak - need to use peers to ensure everything is removed """ def __init__(self, parent): PythonTrigger.__init__(self) self.parent = parent def prime(self, callback): self._callback = callback self.parent._prime_trigger(self, callback) Trigger.prime(self, callback) def __call__(self): self._callback(self) def __repr__(self): return "<{!r}.wait() at {}>".format(self.parent, _pointer_str(self))
[docs]class Event: """Event to permit synchronization between two coroutines. Awaiting :meth:`wait()` from one coroutine will block the coroutine until :meth:`set()` is called somewhere else. """ def __init__(self, name=None): self._pending = [] self.name = name self.fired = False self.data = None def _prime_trigger(self, trigger, callback): self._pending.append(trigger)
[docs] def set(self, data=None): """Wake up all coroutines blocked on this event.""" self.fired = True self.data = data p = self._pending[:] self._pending = [] for trigger in p: trigger()
[docs] def wait(self): """Get a trigger which fires when another coroutine sets the event. If the event has already been set, the trigger will fire immediately. To reset the event (and enable the use of ``wait`` again), :meth:`clear` should be called. """ if self.fired: return NullTrigger(name="{}.wait()".format(str(self))) return _Event(self)
[docs] def clear(self): """Clear this event that has fired. Subsequent calls to :meth:`~cocotb.triggers.Event.wait` will block until :meth:`~cocotb.triggers.Event.set` is called again.""" self.fired = False
[docs] def is_set(self) -> bool: """ Return true if event has been set """ return self.fired
def __repr__(self): if self.name is None: fmt = "<{0} at {2}>" else: fmt = "<{0} for {1} at {2}>" return fmt.format(type(self).__qualname__, self.name, _pointer_str(self))
class _InternalEvent(PythonTrigger): """Event used internally for triggers that need cross-coroutine synchronization. This Event can only be waited on once, by a single coroutine. Provides transparent __repr__ pass-through to the Trigger using this event, providing a better debugging experience. """ def __init__(self, parent): PythonTrigger.__init__(self) self.parent = parent self._callback = None self.fired = False self.data = None def prime(self, callback): if self._callback is not None: raise RuntimeError("This Trigger may only be awaited once") self._callback = callback Trigger.prime(self, callback) if self.fired: self._callback(self) def set(self, data=None): """Wake up coroutine blocked on this event.""" self.fired = True self.data = data if self._callback is not None: self._callback(self) def is_set(self) -> bool: """Return true if event has been set.""" return self.fired def __await__(self): if self.primed: raise RuntimeError("Only one coroutine may await this Trigger") # hand the trigger back to the scheduler trampoline return (yield self) def __repr__(self): return repr(self.parent) class _Lock(PythonTrigger): """Unique instance used by the Lock object. One created for each attempt to acquire the Lock so that the scheduler can maintain a dictionary of indexing each individual coroutine. FIXME: This will leak - need to use peers to ensure everything is removed. """ def __init__(self, parent): PythonTrigger.__init__(self) self.parent = parent def prime(self, callback): self._callback = callback self.parent._prime_trigger(self, callback) Trigger.prime(self, callback) def __call__(self): self._callback(self) def __repr__(self): return "<{!r}.acquire() at {}>".format(self.parent, _pointer_str(self))
[docs]class Lock: """Lock primitive (not re-entrant). This can be used as:: await lock.acquire() try: # do some stuff finally: lock.release() .. versionchanged:: 1.4 The lock can be used as an asynchronous context manager in an :keyword:`async with` statement:: async with lock: # do some stuff """ def __init__(self, name=None): self._pending_unprimed = [] self._pending_primed = [] self.name = name self.locked = False #: ``True`` if the lock is held. def _prime_trigger(self, trigger, callback): self._pending_unprimed.remove(trigger) if not self.locked: self.locked = True callback(trigger) else: self._pending_primed.append(trigger)
[docs] def acquire(self): """ Produce a trigger which fires when the lock is acquired. """ trig = _Lock(self) self._pending_unprimed.append(trig) return trig
[docs] def release(self): """Release the lock.""" if not self.locked: raise TriggerException("Attempt to release an unacquired Lock %s" % (str(self))) self.locked = False # nobody waiting for this lock if not self._pending_primed: return trigger = self._pending_primed.pop(0) self.locked = True trigger()
def __repr__(self): if self.name is None: fmt = "<{0} [{2} waiting] at {3}>" else: fmt = "<{0} for {1} [{2} waiting] at {3}>" return fmt.format( type(self).__qualname__, self.name, len(self._pending_primed), _pointer_str(self) ) def __bool__(self): """Provide boolean of a Lock""" return self.locked async def __aenter__(self): return await self.acquire() async def __aexit__(self, exc_type, exc, tb): self.release()
class NullTrigger(Trigger): """Fires immediately. Primarily for internal scheduler use. """ def __init__(self, name=None, outcome=None): super(NullTrigger, self).__init__() self._callback = None self.name = name self.__outcome = outcome @property def _outcome(self): if self.__outcome is not None: return self.__outcome return super(NullTrigger, self)._outcome def prime(self, callback): callback(self) def __repr__(self): if self.name is None: fmt = "<{0} at {2}>" else: fmt = "<{0} for {1} at {2}>" return fmt.format(type(self).__qualname__, self.name, _pointer_str(self))
[docs]class Join(PythonTrigger, metaclass=_ParameterizedSingletonAndABC): r"""Fires when a :func:`~cocotb.fork`\ ed coroutine completes. The result of blocking on the trigger can be used to get the coroutine result:: async def coro_inner(): await Timer(1, units='ns') return "Hello world" task = cocotb.fork(coro_inner()) result = await Join(task) assert result == "Hello world" If the coroutine threw an exception, the :keyword:`await` will re-raise it. """ __slots__ = ('_coroutine',) @classmethod def __singleton_key__(cls, coroutine): return coroutine def __init__(self, coroutine): super(Join, self).__init__() self._coroutine = coroutine @property def _outcome(self): return self._coroutine._outcome @property def retval(self): """The return value of the joined coroutine. .. note:: Typically there is no need to use this attribute - the following code samples are equivalent:: forked = cocotb.fork(mycoro()) j = Join(forked) await j result = j.retval :: forked = cocotb.fork(mycoro()) result = await Join(forked) """ return self._coroutine.retval def prime(self, callback): if self._coroutine._finished: callback(self) else: super(Join, self).prime(callback) def __repr__(self): return "{}({!s})".format(type(self).__qualname__, self._coroutine)
[docs]class Waitable(Awaitable): """ Base class for trigger-like objects implemented using coroutines. This converts a `_wait` abstract method into a suitable `__await__`. """ __slots__ = ()
[docs] async def _wait(self): """ Should be implemented by the sub-class. Called by `await self` to convert the waitable object into a coroutine. """ raise NotImplementedError
def __await__(self): return self._wait().__await__()
class _AggregateWaitable(Waitable): """ Base class for Waitables that take mutiple triggers in their constructor """ __slots__ = ('triggers',) def __init__(self, *triggers): self.triggers = tuple(triggers) # Do some basic type-checking up front, rather than waiting until we # await them. allowed_types = (Trigger, Waitable, cocotb.decorators.RunningTask) for trigger in self.triggers: if not isinstance(trigger, allowed_types): raise TypeError( "All triggers must be instances of Trigger! Got: {}" .format(type(trigger).__qualname__) ) def __repr__(self): # no _pointer_str here, since this is not a trigger, so identity # doesn't matter. return "{}({})".format( type(self).__qualname__, ", ".join( repr(Join(t)) if isinstance(t, cocotb.decorators.RunningTask) else repr(t) for t in self.triggers ) ) async def _wait_callback(trigger, callback): """ Wait for a trigger, and call `callback` with the outcome of the await. """ trigger = cocotb.scheduler._trigger_from_any(trigger) try: ret = outcomes.Value(await trigger) except BaseException as exc: # hide this from the traceback ret = outcomes.Error(remove_traceback_frames(exc, ['_wait_callback'])) callback(ret)
[docs]class Combine(_AggregateWaitable): """ Fires when all of *triggers* have fired. Like most triggers, this simply returns itself. This is similar to Verilog's ``join``. """ __slots__ = () async def _wait(self): waiters = [] e = _InternalEvent(self) triggers = list(self.triggers) # start a parallel task for each trigger for t in triggers: # t=t is needed for the closure to bind correctly def on_done(ret, t=t): triggers.remove(t) if not triggers: e.set() ret.get() # re-raise any exception waiters.append(cocotb.fork(_wait_callback(t, on_done))) # wait for the last waiter to complete await e return self
[docs]class First(_AggregateWaitable): """ Fires when the first trigger in *triggers* fires. Returns the result of the trigger that fired. This is similar to Verilog's ``join_any``. .. note:: The event loop is single threaded, so while events may be simultaneous in simulation time, they can never be simultaneous in real time. For this reason, the value of ``t_ret is t1`` in the following example is implementation-defined, and will vary by simulator:: t1 = Timer(10, units='ps') t2 = Timer(10, units='ps') t_ret = await First(t1, t2) .. note:: In the old-style :ref:`generator-based coroutines <yield-syntax>`, ``t = yield [a, b]`` was another spelling of ``t = yield First(a, b)``. This spelling is no longer available when using :keyword:`await`-based coroutines. """ __slots__ = () async def _wait(self): waiters = [] e = _InternalEvent(self) triggers = list(self.triggers) completed = [] # start a parallel task for each trigger for t in triggers: def on_done(ret): completed.append(ret) e.set() waiters.append(cocotb.fork(_wait_callback(t, on_done))) # wait for a waiter to complete await e # kill all the other waiters # TODO: Should this kill the coroutines behind any Join triggers? # Right now it does not. for w in waiters: w.kill() # These lines are the way they are to make tracebacks readable: # - The comment helps the user understand why they are seeing the # traceback, even if it is obvious top cocotb maintainers. # - Using `NullTrigger` here instead of `result = completed[0].get()` # means we avoid inserting an `outcome.get` frame in the traceback first_trigger = NullTrigger(outcome=completed[0]) return await first_trigger # the first of multiple triggers that fired
[docs]class ClockCycles(Waitable): """Fires after *num_cycles* transitions of *signal* from ``0`` to ``1``.""" def __init__(self, signal, num_cycles, rising=True): """ Args: signal: The signal to monitor. num_cycles (int): The number of cycles to count. rising (bool, optional): If ``True``, the default, count rising edges. Otherwise, count falling edges. """ self.signal = signal self.num_cycles = num_cycles if rising is True: self._type = RisingEdge else: self._type = FallingEdge async def _wait(self): trigger = self._type(self.signal) for _ in range(self.num_cycles): await trigger return self def __repr__(self): # no _pointer_str here, since this is not a trigger, so identity # doesn't matter. if self._type is RisingEdge: fmt = "{}({!r}, {!r})" else: fmt = "{}({!r}, {!r}, rising=False)" return fmt.format(type(self).__qualname__, self.signal, self.num_cycles)
[docs]async def with_timeout(trigger, timeout_time, timeout_unit="step"): """ Waits on triggers, throws an exception if it waits longer than the given time. Usage: .. code-block:: python await with_timeout(coro, 100, 'ns') await with_timeout(First(coro, event.wait()), 100, 'ns') Args: trigger (:class:`~cocotb.triggers.Trigger` or :class:`~cocotb.triggers.Waitable` or :class:`~cocotb.decorators.RunningTask`): A single object that could be right of an :keyword:`await` expression in cocotb. timeout_time (numbers.Real or decimal.Decimal): Simulation time duration before timeout occurs. timeout_unit (str, optional): Units of timeout_time, accepts any units that :class:`~cocotb.triggers.Timer` does. Returns: First trigger that completed if timeout did not occur. Raises: :exc:`SimTimeoutError`: If timeout occurs. .. versionadded:: 1.3 .. deprecated:: 1.5 Using None as the the *timeout_unit* argument is deprecated, use ``'step'`` instead. """ if timeout_unit is None: warnings.warn( 'Using timeout_unit=None is deprecated, use timeout_unit="step" instead.', DeprecationWarning, stacklevel=2) timeout_unit="step" # don't propagate deprecated value timeout_timer = cocotb.triggers.Timer(timeout_time, timeout_unit) res = await First(timeout_timer, trigger) if res is timeout_timer: raise cocotb.result.SimTimeoutError else: return res