Source code for cocotb.triggers

# Copyright (c) 2013 Potential Ventures Ltd
# Copyright (c) 2013 SolarFlare Communications Inc
# All rights reserved.
# 
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#     * Redistributions of source code must retain the above copyright
#       notice, this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above copyright
#       notice, this list of conditions and the following disclaimer in the
#       documentation and/or other materials provided with the distribution.
#     * Neither the name of Potential Ventures Ltd,
#       SolarFlare Communications Inc nor the
#       names of its contributors may be used to endorse or promote products
#       derived from this software without specific prior written permission.
# 
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL POTENTIAL VENTURES LTD BE LIABLE FOR ANY
# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

"""A collections of triggers which a testbench can yield."""

import os
import weakref

# For autodocumentation don't need the extension modules
if "SPHINX_BUILD" in os.environ:
    simulator = None
else:
    import simulator
from cocotb.log import SimLog
from cocotb.result import raise_error
from cocotb.utils import (
    get_sim_steps, get_time_from_sim_steps, with_metaclass,
    ParametrizedSingleton
)

class TriggerException(Exception):
    """Error raised by this module when a trigger is constructed incorrectly."""

class Trigger(object):
    """Common ancestor of every trigger type.

    Tracks whether the trigger is currently primed and owns a per-instance
    logger named after the concrete class.
    """

    def __init__(self):
        # Not armed until prime() is called.
        self.primed = False
        # Subclasses that watch a signal overwrite this.
        self.signal = None
        logger_name = "cocotb.%s" % (self.__class__.__name__)
        self.log = SimLog(logger_name, id(self))

    def prime(self, *args):
        """Mark the trigger as armed; any positional arguments are ignored."""
        self.primed = True

    def unprime(self):
        """Mark the trigger as no longer armed.

        Subclasses extend this to remove any pending callbacks.
        """
        self.primed = False

    def __del__(self):
        """Unprime on destruction so that no pending callback outlives the
        trigger once it drops out of scope."""
        self.unprime()

    def __str__(self):
        """Return the name of the concrete trigger class."""
        cls = self.__class__
        return cls.__name__


class PythonTrigger(Trigger):
    """Base class for triggers that do not use GPI at all.

    Used, for example, for notification of coroutine completion.

    TODO:
        Still need to implement unprime.
    """


class GPITrigger(Trigger):
    """Base class for triggers backed by a GPI (simulator) callback.

    Consumes simulation time.
    """

    def __init__(self):
        Trigger.__init__(self)
        # Simulator callback handle; 0 means "no callback registered".
        self.cbhdl = 0

    def unprime(self):
        """Disable a primed trigger; it can be primed again afterwards."""
        handle = self.cbhdl
        if handle != 0:
            # Only talk to the simulator when a callback is outstanding.
            simulator.deregister_callback(handle)
        self.cbhdl = 0
        Trigger.unprime(self)

    def __del__(self):
        """Deregister any outstanding callback before the object goes away."""
        still_registered = self.cbhdl != 0
        if still_registered:
            self.unprime()
        Trigger.__del__(self)


class Timer(GPITrigger):
    """Resume execution once the specified time period has expired.

    Consumes simulation time.
    """

    def __init__(self, time_ps, units=None):
        """Store the delay converted to simulator steps.

        ``time_ps`` and ``units`` are handed unchanged to ``get_sim_steps``.
        """
        GPITrigger.__init__(self)
        self.sim_steps = get_sim_steps(time_ps, units)

    def prime(self, callback):
        """Register for a timed callback."""
        if self.cbhdl == 0:
            handle = simulator.register_timed_callback(
                self.sim_steps, callback, self)
            self.cbhdl = handle
            # A zero handle means the simulator refused the registration.
            if handle == 0:
                raise_error(self, "Unable set up %s Trigger" % (str(self)))
        Trigger.prime(self)

    def __str__(self):
        """Return the class name followed by the delay in picoseconds."""
        delay_ps = get_time_from_sim_steps(self.sim_steps, units='ps')
        return self.__class__.__name__ + "(%1.2fps)" % delay_ps
class ReadOnly(with_metaclass(ParametrizedSingleton, GPITrigger)):
    """Resume execution when the readonly portion of the sim cycle is
    reached.
    """

    @classmethod
    def __singleton_key__(cls):
        # The trigger takes no parameters, so every construction maps to the
        # same key (presumably yielding one shared instance -- confirm
        # against ParametrizedSingleton).
        return None

    def __init__(self):
        GPITrigger.__init__(self)

    def prime(self, callback):
        """Register a readonly-phase callback unless one is already
        outstanding."""
        if self.cbhdl == 0:
            handle = simulator.register_readonly_callback(callback, self)
            self.cbhdl = handle
            # A zero handle means the simulator refused the registration.
            if handle == 0:
                raise_error(self, "Unable set up %s Trigger" % (str(self)))
        Trigger.prime(self)

    def __str__(self):
        suffix = "(readonly)"
        return self.__class__.__name__ + suffix
class ReadWrite(with_metaclass(ParametrizedSingleton, GPITrigger)):
    """Resume execution when the readwrite portion of the sim cycle is
    reached.
    """

    @classmethod
    def __singleton_key__(cls):
        # The trigger takes no parameters, so every construction maps to the
        # same key (presumably yielding one shared instance -- confirm
        # against ParametrizedSingleton).
        return None

    def __init__(self):
        GPITrigger.__init__(self)

    def prime(self, callback):
        """Register a read-write synchronisation callback unless one is
        already outstanding."""
        if self.cbhdl == 0:
            handle = simulator.register_rwsynch_callback(callback, self)
            self.cbhdl = handle
            # A zero handle means the simulator refused the registration.
            if handle == 0:
                raise_error(self, "Unable set up %s Trigger" % (str(self)))
        Trigger.prime(self)

    def __str__(self):
        suffix = "(readwritesync)"
        return self.__class__.__name__ + suffix
class NextTimeStep(with_metaclass(ParametrizedSingleton, GPITrigger)):
    """Resume execution when the next time step is started."""

    @classmethod
    def __singleton_key__(cls):
        # The trigger takes no parameters, so every construction maps to the
        # same key (presumably yielding one shared instance -- confirm
        # against ParametrizedSingleton).
        return None

    def __init__(self):
        GPITrigger.__init__(self)

    def prime(self, callback):
        """Register a next-time-step callback unless one is already
        outstanding."""
        if self.cbhdl == 0:
            handle = simulator.register_nextstep_callback(callback, self)
            self.cbhdl = handle
            # A zero handle means the simulator refused the registration.
            if handle == 0:
                raise_error(self, "Unable set up %s Trigger" % (str(self)))
        Trigger.prime(self)

    def __str__(self):
        suffix = "(nexttimestep)"
        return self.__class__.__name__ + suffix
class _EdgeBase(with_metaclass(ParametrizedSingleton, GPITrigger)):
    """Execution will resume when an edge occurs on the provided signal.

    This is an abstract base: concrete subclasses must set ``_edge_type``.
    """

    # The edge type, as understood by the C code. Must be set in subclasses.
    #
    # ``None`` marks the abstract base. A plain sentinel is used instead of
    # stacking ``@classmethod`` on ``@property``: that combination only
    # evaluates the property on Python 3.9-3.12, and on every other version
    # class-level access silently yields a non-callable bound-method object
    # rather than raising ``NotImplementedError``.
    _edge_type = None

    @classmethod
    def __singleton_key__(cls, signal):
        # Instances are keyed by the signal they watch.
        return signal

    def __init__(self, signal):
        super(_EdgeBase, self).__init__()
        self.signal = signal

    def prime(self, callback):
        """Register notification of a value change via a callback.

        Raises:
            NotImplementedError: if the concrete class did not set
                ``_edge_type``.
        """
        if self.cbhdl == 0:
            edge_type = type(self)._edge_type
            if edge_type is None:
                # Fail loudly here rather than pass garbage to the simulator.
                raise NotImplementedError(
                    "%s must define _edge_type" % type(self).__name__
                )
            self.cbhdl = simulator.register_value_change_callback(
                self.signal._handle, callback, edge_type, self
            )
            # A zero handle means the simulator refused the registration.
            if self.cbhdl == 0:
                raise_error(self, "Unable set up %s Trigger" % (str(self)))
        super(_EdgeBase, self).prime()

    def __str__(self):
        return self.__class__.__name__ + "(%s)" % self.signal._name
class RisingEdge(_EdgeBase):
    """Fire when the provided signal has a rising edge."""

    # Edge-type code handed to the simulator on registration.
    _edge_type = 1
class FallingEdge(_EdgeBase):
    """Fire when the provided signal has a falling edge."""

    # Edge-type code handed to the simulator on registration.
    _edge_type = 2
class Edge(_EdgeBase):
    """Fire when the provided signal has an edge in either direction."""

    # Edge-type code handed to the simulator on registration.
    _edge_type = 3
class ClockCycles(GPITrigger):
    """Execution will resume after *num_cycles* rising edges or
    *num_cycles* falling edges.
    """

    def __init__(self, signal, num_cycles, rising=True):
        """Remember the signal, the number of edges to wait for and the
        edge direction.

        Args:
            signal: handle whose ``_handle``, ``value`` and ``_name``
                attributes are used by this trigger.
            num_cycles: number of matching edges to wait for.
            rising: ``True`` counts rising edges; any other value counts
                falling edges.
        """
        super(ClockCycles, self).__init__()
        self.signal = signal
        self.num_cycles = num_cycles
        # Edge-type code handed to the simulator: 1 = rising, 2 = falling
        # (the same codes RisingEdge / FallingEdge use).
        if rising is True:
            self._rising = 1
        else:
            self._rising = 2

    def prime(self, callback):
        """Start counting edges; *callback* is invoked with this trigger once
        ``num_cycles`` matching edges have been seen.
        """
        self._callback = callback
        # Level the signal sits at right after the requested edge. The
        # previous code only counted callbacks where the signal was truthy,
        # so with rising=False the count never decreased and the trigger
        # never fired.
        level_after_edge = (self._rising == 1)

        def _register():
            # (Re-)arm the value-change callback for the next edge.
            self.cbhdl = simulator.register_value_change_callback(
                self.signal._handle, _check, self._rising, self)
            if self.cbhdl == 0:
                raise_error(self, "Unable set up %s Trigger" % (str(self)))

        def _check(obj):
            # Drop the callback that just fired before deciding what to do.
            self.unprime()

            if bool(self.signal.value) == level_after_edge:
                self.num_cycles -= 1

                if self.num_cycles <= 0:
                    self._callback(self)
                    return

            _register()

        _register()
        Trigger.prime(self)

    def __str__(self):
        return self.__class__.__name__ + "(%s)" % self.signal._name
class Combine(PythonTrigger):
    """Combines multiple triggers together.

    Coroutine will continue when all triggers have fired.
    """

    def __init__(self, *args):
        """Store the triggers to wait on.

        Args:
            *args: the :class:`Trigger` instances to combine.

        Raises:
            TriggerException: if any argument is not a :class:`Trigger`.
        """
        PythonTrigger.__init__(self)
        self._triggers = args
        # Validate directly. The former blanket ``except Exception`` wrapper
        # caught this very exception and replaced its message with a generic
        # one, hiding which argument was wrong.
        for trigger in self._triggers:
            if not isinstance(trigger, Trigger):
                raise TriggerException("All combined triggers must be "
                                       "instances of Trigger! Got: %s" %
                                       trigger.__class__.__name__)

    def prime(self, callback):
        """Prime every combined trigger; *callback* is invoked with this
        object once all of them have fired.
        """
        self._callback = callback
        self._fired = []
        for trigger in self._triggers:
            trigger.prime(self._check_all_fired)
        Trigger.prime(self)

    def _check_all_fired(self, trigger):
        """Record that *trigger* fired and resume once all have fired."""
        # Ignore repeat notifications so the callback runs at most once per
        # completed set.
        if any(trigger is seen for seen in self._fired):
            return
        self._fired.append(trigger)
        # Compare by identity and independently of firing order. The former
        # ``self._fired == self._triggers`` compared a list with the tuple
        # from ``*args``, which is never equal, so this never fired.
        all_fired = all(
            any(expected is seen for seen in self._fired)
            for expected in self._triggers
        )
        if all_fired:
            self._callback(self)

    def unprime(self):
        """Unprime every combined trigger and this trigger itself."""
        for trigger in self._triggers:
            trigger.unprime()
        # Also reset our own ``primed`` flag, which was previously left set.
        Trigger.unprime(self)


class _Event(PythonTrigger):
    """Unique instance used by the Event object.

    One created for each attempt to wait on the event so that the scheduler
    can maintain a dictionary of indexing each individual coroutine.

    FIXME: This will leak - need to use peers to ensure everything is removed
    """

    def __init__(self, parent):
        PythonTrigger.__init__(self)
        # The object whose prime() this waiter registers with.
        self.parent = parent

    def prime(self, callback):
        """Remember *callback* and register this waiter with the parent."""
        self._callback = callback
        self.parent.prime(callback, self)
        Trigger.prime(self)

    def __call__(self):
        """Invoke the stored callback, passing this waiter."""
        self._callback(self)
class Event(PythonTrigger):
    """Event to permit synchronisation between two coroutines."""

    def __init__(self, name=""):
        PythonTrigger.__init__(self)
        self.name = name
        # Set by set(), reset by clear().
        self.fired = False
        # Payload supplied with the most recent set().
        self.data = None
        # Waiter triggers that have been primed and await set().
        self._pending = []

    def prime(self, callback, trigger):
        """Queue *trigger* to be called on the next :meth:`set`."""
        self._pending.append(trigger)
        Trigger.prime(self)

    def set(self, data=None):
        """Wake up any coroutines blocked on this event."""
        self.fired = True
        self.data = data

        # Detach the queue before calling the waiters so that anything
        # primed while they run lands in a fresh list.
        waiters = list(self._pending)
        self._pending = []

        for waiter in waiters:
            waiter()

    def wait(self):
        """This can be yielded to block this coroutine until another wakes it.

        If the event has already been fired, this returns ``NullTrigger``.
        To reset the event (and enable the use of ``wait`` again),
        :meth:`~cocotb.triggers.Event.clear` should be called.
        """
        if not self.fired:
            return _Event(self)
        return NullTrigger()

    def clear(self):
        """Clear this event that has fired.

        Subsequent calls to :meth:`~cocotb.triggers.Event.wait` will block
        until :meth:`~cocotb.triggers.Event.set` is called again.
        """
        self.fired = False

    def __str__(self):
        class_name = self.__class__.__name__
        return class_name + "(%s)" % self.name
class _Lock(PythonTrigger):
    """Unique instance used by the Lock object.

    One created for each attempt to acquire the Lock so that the scheduler
    can maintain a dictionary of indexing each individual coroutine.

    FIXME: This will leak - need to use peers to ensure everything is removed.
    """

    def __init__(self, parent):
        PythonTrigger.__init__(self)
        # The object whose prime() this waiter registers with.
        self.parent = parent

    def prime(self, callback):
        """Remember *callback* and register this waiter with the parent."""
        owner = self.parent
        self._callback = callback
        owner.prime(callback, self)
        Trigger.prime(self)

    def __call__(self):
        """Invoke the stored callback, passing this waiter."""
        resume = self._callback
        resume(self)
class Lock(PythonTrigger):
    """Lock primitive (not re-entrant)."""

    def __init__(self, name=""):
        PythonTrigger.__init__(self)
        self.name = name
        self.locked = False
        # Waiters handed out by acquire() but not yet primed.
        self._pending_unprimed = []
        # Primed waiters queued behind the current holder, oldest first.
        self._pending_primed = []

    def prime(self, callback, trigger):
        """Called when a waiter is primed: grant the lock to it at once if
        the lock is free, otherwise queue it."""
        Trigger.prime(self)

        self._pending_unprimed.remove(trigger)

        if self.locked:
            self._pending_primed.append(trigger)
            return

        self.locked = True
        callback(trigger)

    def acquire(self):
        """This can be yielded to block until the lock is acquired."""
        waiter = _Lock(self)
        self._pending_unprimed.append(waiter)
        return waiter

    def release(self):
        """Release the lock and hand it to the oldest queued waiter, if any.

        Reports an error through ``raise_error`` when the lock is not held.
        """
        if not self.locked:
            raise_error(self, "Attempt to release an unacquired Lock %s" %
                        (str(self)))

        self.locked = False

        if self._pending_primed:
            # Somebody is waiting: the lock passes straight to them.
            next_waiter = self._pending_primed.pop(0)
            self.locked = True
            next_waiter()

    def __str__(self):
        waiting = len(self._pending_primed)
        return "%s(%s) [%s waiting]" % (str(self.__class__.__name__),
                                        self.name,
                                        waiting)

    def __bool__(self):
        """Provide boolean of a Lock"""
        return self.locked

    # Python 2 spelling of the truth-value hook.
    __nonzero__ = __bool__
class NullTrigger(Trigger):
    """Trigger for internal interfacing use.

    Calls the callback as soon as it is primed and then removes itself from
    the scheduler.
    """

    def __init__(self, name=""):
        Trigger.__init__(self)
        self.name = name
        self._callback = None

    def prime(self, callback):
        """Invoke *callback* immediately, passing this trigger."""
        callback(self)
class Join(with_metaclass(ParametrizedSingleton, PythonTrigger)):
    """Join a coroutine, firing when it exits."""

    @classmethod
    def __singleton_key__(cls, coroutine):
        # Instances are keyed by the coroutine being joined.
        return coroutine

    def __init__(self, coroutine):
        super(Join, self).__init__()
        self.pass_retval = True
        self._coroutine = coroutine

    @property
    def retval(self):
        """The ``retval`` attribute of the joined coroutine."""
        joined = self._coroutine
        return joined.retval

    def prime(self, callback):
        """Fire at once if the coroutine has already finished; otherwise
        prime as usual."""
        if not self._coroutine._finished:
            super(Join, self).prime(callback)
            return
        callback(self)

    def __str__(self):
        class_name = self.__class__.__name__
        return class_name + "(%s)" % self._coroutine.__name__