# Copyright (c) 2013 Potential Ventures Ltd
# Copyright (c) 2013 SolarFlare Communications Inc
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of Potential Ventures Ltd,
# SolarFlare Communications Inc nor the
# names of its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL POTENTIAL VENTURES LTD BE LIABLE FOR ANY
# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""A collections of triggers which a testbench can yield."""
import os
import sys
import textwrap
import abc
if "COCOTB_SIM" in os.environ:
import simulator
else:
simulator = None
from cocotb.log import SimLog
from cocotb.result import raise_error, ReturnValue
from cocotb.utils import (
get_sim_steps, get_time_from_sim_steps, with_metaclass,
ParametrizedSingleton, exec_, lazy_property
)
from cocotb import decorators
from cocotb import outcomes
import cocotb
class TriggerException(Exception):
pass
[docs]class Trigger(with_metaclass(abc.ABCMeta)):
"""Base class to derive from."""
# __dict__ is needed here for the `.log` lazy_property below to work.
# The implementation of `_PyObject_GenericGetAttrWithDict` suggests that
# despite its inclusion, __slots__ will overall give speed and memory
# improvements:
# - the `__dict__` is not actually constructed until it's needed, and that
# only happens if the `.log` attribute is used, where performance
# concerns no longer matter.
# - Attribute setting and getting will still go through the slot machinery
# first, as "data descriptors" take priority over dict access
__slots__ = ('primed', '__weakref__', '__dict__')
def __init__(self):
self.primed = False
@lazy_property
def log(self):
return SimLog("cocotb.%s" % (self.__class__.__name__), id(self))
@abc.abstractmethod
def prime(self, callback):
"""Set a callback to be invoked when the trigger fires.
The callback will be invoked with a single argumement, `self`.
Subclasses must override this, but should end by calling the base class
method.
Do not call this directly within coroutines, it is intended to be used
only by the scheduler.
"""
self.primed = True
def unprime(self):
"""Remove the callback, and perform cleanup if necessary.
After being unprimed, a Trigger may be reprimed again in future.
Calling `unprime` multiple times is allowed, subsequent calls should be
a no-op.
Subclasses may override this, but should end by calling the base class
method.
Do not call this directly within coroutines, it is intended to be used
only by the scheduler.
"""
self.primed = False
def __del__(self):
# Ensure if a trigger drops out of scope we remove any pending callbacks
self.unprime()
def __str__(self):
return self.__class__.__name__
@property
def _outcome(self):
"""The result that `yield this_trigger` produces in a coroutine.
The default is to produce the trigger itself, which is done for
ease of use with :class:`~cocotb.triggers.First`.
"""
return outcomes.Value(self)
# Once 2.7 is dropped, this can be run unconditionally
if sys.version_info >= (3, 3):
exec_(textwrap.dedent("""
def __await__(self):
# hand the trigger back to the scheduler trampoline
return (yield self)
"""))
class PythonTrigger(Trigger):
"""Python triggers don't use GPI at all.
For example notification of coroutine completion etc.
"""
class GPITrigger(Trigger):
"""Base Trigger class for GPI triggers.
Consumes simulation time.
"""
__slots__ = ('cbhdl',)
def __init__(self):
Trigger.__init__(self)
# Required to ensure documentation can build
# if simulator is not None:
# self.cbhdl = simulator.create_callback(self)
# else:
self.cbhdl = 0
def unprime(self):
"""Disable a primed trigger, can be reprimed"""
if self.cbhdl != 0:
simulator.deregister_callback(self.cbhdl)
self.cbhdl = 0
Trigger.unprime(self)
def __del__(self):
"""Remove knowledge of the trigger"""
if self.cbhdl != 0:
self.unprime()
Trigger.__del__(self)
[docs]class Timer(GPITrigger):
"""Execution will resume when the specified time period expires.
Consumes simulation time.
"""
def __init__(self, time_ps, units=None):
GPITrigger.__init__(self)
self.sim_steps = get_sim_steps(time_ps, units)
def prime(self, callback):
"""Register for a timed callback"""
if self.cbhdl == 0:
self.cbhdl = simulator.register_timed_callback(self.sim_steps,
callback, self)
if self.cbhdl == 0:
raise_error(self, "Unable set up %s Trigger" % (str(self)))
GPITrigger.prime(self, callback)
def __str__(self):
return self.__class__.__name__ + "(%1.2fps)" % get_time_from_sim_steps(self.sim_steps,units='ps')
# This is needed to make our custom metaclass work with abc.ABCMeta used in the
# `Trigger` base class.
class _ParameterizedSingletonAndABC(ParametrizedSingleton, abc.ABCMeta):
pass
[docs]class ReadOnly(with_metaclass(_ParameterizedSingletonAndABC, GPITrigger)):
"""Execution will resume when the readonly portion of the sim cycles is
reached.
"""
__slots__ = ()
@classmethod
def __singleton_key__(cls):
return None
def __init__(self):
GPITrigger.__init__(self)
def prime(self, callback):
if self.cbhdl == 0:
self.cbhdl = simulator.register_readonly_callback(callback, self)
if self.cbhdl == 0:
raise_error(self, "Unable set up %s Trigger" % (str(self)))
GPITrigger.prime(self, callback)
def __str__(self):
return self.__class__.__name__ + "(readonly)"
class ReadWrite(with_metaclass(_ParameterizedSingletonAndABC, GPITrigger)):
"""Execution will resume when the readwrite portion of the sim cycles is
reached.
"""
__slots__ = ()
@classmethod
def __singleton_key__(cls):
return None
def __init__(self):
GPITrigger.__init__(self)
def prime(self, callback):
if self.cbhdl == 0:
# import pdb
# pdb.set_trace()
self.cbhdl = simulator.register_rwsynch_callback(callback, self)
if self.cbhdl == 0:
raise_error(self, "Unable set up %s Trigger" % (str(self)))
GPITrigger.prime(self, callback)
def __str__(self):
return self.__class__.__name__ + "(readwritesync)"
[docs]class NextTimeStep(with_metaclass(_ParameterizedSingletonAndABC, GPITrigger)):
"""Execution will resume when the next time step is started."""
__slots__ = ()
@classmethod
def __singleton_key__(cls):
return None
def __init__(self):
GPITrigger.__init__(self)
def prime(self, callback):
if self.cbhdl == 0:
self.cbhdl = simulator.register_nextstep_callback(callback, self)
if self.cbhdl == 0:
raise_error(self, "Unable set up %s Trigger" % (str(self)))
GPITrigger.prime(self, callback)
def __str__(self):
return self.__class__.__name__ + "(nexttimestep)"
class _EdgeBase(with_metaclass(_ParameterizedSingletonAndABC, GPITrigger)):
"""Execution will resume when an edge occurs on the provided signal."""
__slots__ = ('signal',)
@classmethod
@property
def _edge_type(self):
"""The edge type, as understood by the C code. Must be set in subclasses."""
raise NotImplementedError
@classmethod
def __singleton_key__(cls, signal):
return signal
def __init__(self, signal):
super(_EdgeBase, self).__init__()
self.signal = signal
def prime(self, callback):
"""Register notification of a value change via a callback"""
if self.cbhdl == 0:
self.cbhdl = simulator.register_value_change_callback(
self.signal._handle, callback, type(self)._edge_type, self
)
if self.cbhdl == 0:
raise_error(self, "Unable set up %s Trigger" % (str(self)))
super(_EdgeBase, self).prime(callback)
def __str__(self):
return self.__class__.__name__ + "(%s)" % self.signal._name
[docs]class RisingEdge(_EdgeBase):
"""Triggers on the rising edge of the provided signal."""
__slots__ = ()
_edge_type = 1
[docs]class FallingEdge(_EdgeBase):
"""Triggers on the falling edge of the provided signal."""
__slots__ = ()
_edge_type = 2
[docs]class Edge(_EdgeBase):
"""Triggers on either edge of the provided signal."""
__slots__ = ()
_edge_type = 3
class _Event(PythonTrigger):
"""Unique instance used by the Event object.
One created for each attempt to wait on the event so that the scheduler
can maintain a dictionary of indexing each individual coroutine.
FIXME: This will leak - need to use peers to ensure everything is removed
"""
def __init__(self, parent):
PythonTrigger.__init__(self)
self.parent = parent
def prime(self, callback):
self._callback = callback
self.parent._prime_trigger(self, callback)
Trigger.prime(self, callback)
def __call__(self):
self._callback(self)
[docs]class Event(object):
"""Event to permit synchronisation between two coroutines."""
def __init__(self, name=""):
self._pending = []
self.name = name
self.fired = False
self.data = None
def _prime_trigger(self, trigger, callback):
self._pending.append(trigger)
[docs] def set(self, data=None):
"""Wake up any coroutines blocked on this event."""
self.fired = True
self.data = data
p = self._pending[:]
self._pending = []
for trigger in p:
trigger()
[docs] def wait(self):
"""This can be yielded to block this coroutine
until another wakes it.
If the event has already been fired, this returns ``NullTrigger``.
To reset the event (and enable the use of ``wait`` again),
:meth:`~cocotb.triggers.Event.clear` should be called.
"""
if self.fired:
return NullTrigger(name="{}.wait()".format(str(self)))
return _Event(self)
[docs] def clear(self):
"""Clear this event that has fired.
Subsequent calls to :meth:`~cocotb.triggers.Event.wait` will block until
:meth:`~cocotb.triggers.Event.set` is called again."""
self.fired = False
def __str__(self):
return self.__class__.__name__ + "(%s)" % self.name
class _Lock(PythonTrigger):
"""Unique instance used by the Lock object.
One created for each attempt to acquire the Lock so that the scheduler
can maintain a dictionary of indexing each individual coroutine.
FIXME: This will leak - need to use peers to ensure everything is removed.
"""
def __init__(self, parent):
PythonTrigger.__init__(self)
self.parent = parent
def prime(self, callback):
self._callback = callback
self.parent._prime_trigger(self, callback)
Trigger.prime(self, callback)
def __call__(self):
self._callback(self)
[docs]class Lock(object):
"""Lock primitive (not re-entrant)."""
def __init__(self, name=""):
self._pending_unprimed = []
self._pending_primed = []
self.name = name
self.locked = False
def _prime_trigger(self, trigger, callback):
self._pending_unprimed.remove(trigger)
if not self.locked:
self.locked = True
callback(trigger)
else:
self._pending_primed.append(trigger)
[docs] def acquire(self):
"""This can be yielded to block until the lock is acquired."""
trig = _Lock(self)
self._pending_unprimed.append(trig)
return trig
[docs] def release(self):
"""Release the lock."""
if not self.locked:
raise_error(self, "Attempt to release an unacquired Lock %s" %
(str(self)))
self.locked = False
# nobody waiting for this lock
if not self._pending_primed:
return
trigger = self._pending_primed.pop(0)
self.locked = True
trigger()
def __str__(self):
return "%s(%s) [%s waiting]" % (str(self.__class__.__name__),
self.name,
len(self._pending_primed))
def __nonzero__(self):
"""Provide boolean of a Lock"""
return self.locked
__bool__ = __nonzero__
class NullTrigger(Trigger):
"""
A trigger that fires instantly, primarily for internal scheduler use.
"""
def __init__(self, name="", outcome=None):
super(NullTrigger, self).__init__()
self._callback = None
self.name = name
self.__outcome = outcome
@property
def _outcome(self):
if self.__outcome is not None:
return self.__outcome
return super(NullTrigger, self)._outcome
def prime(self, callback):
callback(self)
def __str__(self):
return self.__class__.__name__ + "(%s)" % self.name
[docs]class Join(with_metaclass(_ParameterizedSingletonAndABC, PythonTrigger)):
"""Join a coroutine, firing when it exits."""
__slots__ = ('_coroutine',)
@classmethod
def __singleton_key__(cls, coroutine):
return coroutine
def __init__(self, coroutine):
super(Join, self).__init__()
self._coroutine = coroutine
@property
def _outcome(self):
return self._coroutine._outcome
@property
def retval(self):
"""The return value of the joined coroutine.
If the coroutine threw an exception, this attribute will re-raise it.
"""
return self._coroutine.retval
[docs] def prime(self, callback):
if self._coroutine._finished:
callback(self)
else:
super(Join, self).prime(callback)
def __str__(self):
return self.__class__.__name__ + "(%s)" % self._coroutine.__name__
class Waitable(object):
"""
Compatibility layer that emulates `collections.abc.Awaitable`.
This converts a `_wait` abstract method into a suitable `__await__` on
supporting python versions (>=3.3).
"""
__slots__ = ()
@decorators.coroutine
def _wait(self):
"""
Should be implemented by the subclass. Called by `yield self` to
convert the waitable object into a coroutine.
ReturnValue can be used here
"""
raise NotImplementedError
yield
if sys.version_info >= (3, 3):
def __await__(self):
return self._wait().__await__()
class _AggregateWaitable(Waitable):
"""
Base class for Waitables that take mutiple triggers in their constructor
"""
__slots__ = ('triggers',)
def __init__(self, *args):
self.triggers = tuple(args)
# Do some basic type-checking up front, rather than waiting until we
# yield them.
allowed_types = (Trigger, Waitable, decorators.RunningCoroutine)
for trigger in self.triggers:
if not isinstance(trigger, allowed_types):
raise TypeError(
"All triggers must be instances of Trigger! Got: {}"
.format(type(trigger).__name__)
)
@decorators.coroutine
def _wait_callback(trigger, callback):
"""
Wait for a trigger, and call `callback` with the outcome of the yield
"""
try:
ret = outcomes.Value((yield trigger))
except BaseException as exc:
ret = outcomes.Error(exc)
callback(ret)
[docs]class Combine(_AggregateWaitable):
"""
Waits until all the passed triggers have fired.
Like most triggers, this simply returns itself.
"""
__slots__ = ()
@decorators.coroutine
def _wait(self):
waiters = []
e = Event()
triggers = list(self.triggers)
# start a parallel task for each trigger
for t in triggers:
# t=t is needed for the closure to bind correctly
def on_done(ret, t=t):
triggers.remove(t)
if not triggers:
e.set()
ret.get() # re-raise any exception
waiters.append(cocotb.fork(_wait_callback(t, on_done)))
# wait for the last waiter to complete
yield e.wait()
raise ReturnValue(self)
class First(_AggregateWaitable):
"""
Wait for the first of multiple triggers.
Returns the result of the trigger that fired.
.. note::
The event loop is single threaded, so while events may be simultaneous
in simulation time, they can never be simultaneous in real time.
For this reason, the value of ``t_ret is t1`` in the following example
is implementation-defined, and will vary by simulator::
t1 = Timer(10, units='ps')
t2 = Timer(10, units='ps')
t_ret = yield First(t1, t2)
"""
__slots__ = ()
@decorators.coroutine
def _wait(self):
waiters = []
e = Event()
triggers = list(self.triggers)
completed = []
# start a parallel task for each trigger
for t in triggers:
def on_done(ret):
completed.append(ret)
e.set()
waiters.append(cocotb.fork(_wait_callback(t, on_done)))
# wait for a waiter to complete
yield e.wait()
# kill all the other waiters
# TODO: Should this kill the coroutines behind any Join triggers?
# Right now it does not.
for w in waiters:
w.kill()
# get the result from the first task
ret = completed[0]
raise ReturnValue(ret.get())
[docs]class ClockCycles(Waitable):
"""
Execution will resume after *num_cycles* rising edges or *num_cycles* falling edges.
"""
def __init__(self, signal, num_cycles, rising=True):
self.signal = signal
self.num_cycles = num_cycles
if rising is True:
self._type = RisingEdge
else:
self._type = FallingEdge
@decorators.coroutine
def _wait(self):
trigger = self._type(self.signal)
for _ in range(self.num_cycles):
yield trigger
raise ReturnValue(self)