Triggers

Triggers are used to indicate when the cocotb scheduler should resume coroutine execution. To use a trigger, a coroutine should await it. This will cause execution of the current coroutine to pause. When the trigger fires, execution of the paused coroutine will resume:

async def coro():
    print("Some time before the edge")
    await RisingEdge(clk)
    print("Immediately after the edge")

Simulator Triggers

Signals

class cocotb.triggers.Edge(signal)[source]

Fires on any value change of signal.

class cocotb.triggers.RisingEdge(signal)[source]

Fires on the rising edge of signal, on a transition from 0 to 1.

class cocotb.triggers.FallingEdge(signal)[source]

Fires on the falling edge of signal, on a transition from 1 to 0.

class cocotb.triggers.ClockCycles(signal, num_cycles, rising=True)[source]

Fires after num_cycles transitions of signal from 0 to 1 (or from 1 to 0 if rising is False).

Parameters
  • signal – The signal to monitor.

  • num_cycles (int) – The number of cycles to count.

  • rising (bool, optional) – If True, the default, count rising edges. Otherwise, count falling edges.
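The counting rule can be modelled in plain Python. The sketch below is not cocotb's implementation: clock_cycles_fire_index is a hypothetical helper that scans a list of sampled 0/1 values and reports the index at which the num_cycles-th counted edge completes.

```python
from typing import Optional, Sequence


def clock_cycles_fire_index(
    samples: Sequence[int], num_cycles: int, rising: bool = True
) -> Optional[int]:
    """Return the index of the sample that completes the num_cycles-th edge.

    Hypothetical model only: a rising edge is a 0 -> 1 step between
    consecutive samples, a falling edge is a 1 -> 0 step.
    """
    before, after = (0, 1) if rising else (1, 0)
    seen = 0
    for i in range(1, len(samples)):
        if samples[i - 1] == before and samples[i] == after:
            seen += 1
            if seen == num_cycles:
                return i
    return None  # not enough edges in the sampled window


clk = [0, 1, 0, 1, 0, 1, 0]
third_rising = clock_cycles_fire_index(clk, 3)
second_falling = clock_cycles_fire_index(clk, 2, rising=False)
```

In a real testbench the equivalent wait would be await ClockCycles(clk, 3) or await ClockCycles(clk, 2, rising=False).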

Timing

class cocotb.triggers.Timer(time: Optional[Union[numbers.Real, decimal.Decimal]] = None, units: str = 'step', *, round_mode: Optional[str] = None, time_ps: Optional[Union[numbers.Real, decimal.Decimal]] = None)[source]

Fire after the specified simulation time period has elapsed.

Parameters
  • time

    The time value.

    Changed in version 1.5.0: Previously this argument was misleadingly called time_ps.

  • units – One of 'step', 'fs', 'ps', 'ns', 'us', 'ms', 'sec'. When units is 'step', the timestep is determined by the simulator (see COCOTB_HDL_TIMEPRECISION).

Examples

>>> await Timer(100, units='ps')

The time can also be a float:

>>> await Timer(100e-9, units='sec')

which is particularly convenient when working with frequencies:

>>> freq = 10e6  # 10 MHz
>>> await Timer(1 / freq, units='sec')

Other builtin exact numeric types can be used too:

>>> from fractions import Fraction
>>> await Timer(Fraction(1, 10), units='ns')
>>> from decimal import Decimal
>>> await Timer(Decimal('100e-9'), units='sec')

These are most useful when using computed durations while avoiding floating point inaccuracies.
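To illustrate the inaccuracy being avoided, the following standard-library sketch adds a 0.1 ns delay and a 0.2 ns delay three ways. It does not call cocotb; the variable names are illustrative, and any of the exact results could be passed as the time argument of Timer.

```python
from decimal import Decimal
from fractions import Fraction

# A 0.1 ns delay followed by a 0.2 ns delay, computed three ways.
float_total = 0.1 + 0.2
fraction_total = Fraction(1, 10) + Fraction(2, 10)
decimal_total = Decimal("0.1") + Decimal("0.2")

# Binary floats cannot represent 0.1 or 0.2 exactly, so the sum drifts.
float_matches = float_total == 0.3
# Fraction and Decimal keep the decimal values exact.
fraction_matches = fraction_total == Fraction(3, 10)
decimal_matches = decimal_total == Decimal("0.3")
```

Here float_matches is False while the other two comparisons are True.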

See also

get_sim_steps()

Raises

TriggerException – If a negative value is passed for Timer setup.

Changed in version 1.5: Raise an exception when Timer uses a negative value as it is undefined behavior. Warn for 0 as this will cause erratic behavior in some simulators as well.

Changed in version 1.5: Support 'step' as the units argument to mean “simulator time step”.

Deprecated since version 1.5: Using None as the units argument is deprecated, use 'step' instead.

Changed in version 1.6: Support rounding modes.

class cocotb.triggers.ReadOnly[source]

Fires when the current simulation timestep moves to the read-only phase.

The read-only phase is entered when the current timestep no longer has any further delta steps. This will be a point where all the signal values are stable as there are no more RTL events scheduled for the timestep. The simulator will not allow scheduling of more events in this timestep. This is useful for monitors that need to wait for all processes to execute (both RTL and cocotb) to ensure sampled signal values are final.
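A monitor of this kind might look like the sketch below. The helper name sample_when_stable and its arguments are hypothetical, and the cocotb import sits inside the function only so that the snippet can be loaded outside a simulator.

```python
async def sample_when_stable(clk, signal):
    """Wait for a rising edge of clk, then sample signal once it has settled.

    Hypothetical helper: clk and signal are assumed to be cocotb signal
    handles taken from the design under test.
    """
    # Imported here so the sketch can be loaded without a running simulator.
    from cocotb.triggers import ReadOnly, RisingEdge

    await RisingEdge(clk)
    await ReadOnly()  # no further delta steps remain in this timestep
    # Only read here; no new events should be scheduled in this phase.
    return signal.value
```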

class cocotb.triggers.ReadWrite[source]

Fires when the read-write portion of the simulation cycle is reached.

class cocotb.triggers.NextTimeStep[source]

Fires when the next time step is started.

Python Triggers

class cocotb.triggers.Combine(*triggers)[source]

Fires when all of triggers have fired.

Like most triggers, this simply returns itself.

This is similar to Verilog’s join.

class cocotb.triggers.First(*triggers)[source]

Fires when the first trigger in triggers fires.

Returns the result of the trigger that fired.

This is similar to Verilog’s join_any.

Note

The event loop is single-threaded, so while events may be simultaneous in simulation time, they can never be simultaneous in real time. For this reason, whether t_ret is t1 evaluates to True in the following example is implementation-defined and will vary by simulator:

t1 = Timer(10, units='ps')
t2 = Timer(10, units='ps')
t_ret = await First(t1, t2)

Note

In the old-style generator-based coroutines, t = yield [a, b] was another spelling of t = yield First(a, b). This spelling is no longer available when using await-based coroutines.
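Because First returns the result of whichever trigger fired, and most triggers return themselves, an identity check can tell the caller which one won. The sketch below assumes that behaviour; edge_before_timeout is a hypothetical helper and signal is assumed to be a cocotb signal handle.

```python
async def edge_before_timeout(signal, timeout_ns):
    """Return True if signal rises before timeout_ns elapses, else False.

    Hypothetical helper illustrating First; not part of cocotb.
    """
    # Imported here so the sketch can be loaded without a running simulator.
    from cocotb.triggers import First, RisingEdge, Timer

    edge = RisingEdge(signal)
    timeout = Timer(timeout_ns, units="ns")
    fired = await First(edge, timeout)
    # Compare by identity against the trigger objects that were passed in.
    return fired is edge
```

If both triggers fire in the same timestep, the result is subject to the ordering caveat in the note above.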

class cocotb.triggers.Join(coroutine)[source]

Fires when a task completes.

Awaiting the trigger returns the result of the coroutine:

async def coro_inner():
    await Timer(1, units='ns')
    return "Hello world"

task = cocotb.start_soon(coro_inner())
result = await Join(task)
assert result == "Hello world"

If the coroutine threw an exception, the await will re-raise it.

property retval

The return value of the joined coroutine.

Note

Typically there is no need to use this attribute - the following code samples are equivalent:

forked = cocotb.start_soon(mycoro())
j = Join(forked)
await j
result = j.retval

forked = cocotb.start_soon(mycoro())
result = await Join(forked)

Synchronization

These are not Triggers themselves, but contain methods that can be used as triggers. These are used to synchronize coroutines with each other.

class cocotb.triggers.Event(name=None)[source]

Event to permit synchronization between two coroutines.

Awaiting wait() from one coroutine will block the coroutine until set() is called somewhere else.

set(data=None)[source]

Wake up all coroutines blocked on this event.

wait()[source]

Get a trigger which fires when another coroutine sets the event.

If the event has already been set, the trigger will fire immediately.

To reset the event (and enable the use of wait again), clear() should be called.

clear()[source]

Clear this event that has fired.

Subsequent calls to wait() will block until set() is called again.

is_set() → bool[source]

Return True if the event has been set.
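The wait/set/clear cycle can be sketched with two small coroutines. Both helper names are hypothetical; the event argument is assumed to be a cocotb.triggers.Event, although wait_and_rearm only relies on the wait() and clear() methods described above.

```python
async def notify_after_delay(event, delay_ns=10):
    """Set the event after delay_ns of simulation time (hypothetical helper)."""
    # Imported here so the sketch can be loaded without a running simulator.
    from cocotb.triggers import Timer

    await Timer(delay_ns, units="ns")
    event.set()  # wakes every coroutine blocked on event.wait()


async def wait_and_rearm(event):
    """Block until the event is set, then clear it for the next use."""
    await event.wait()  # returns immediately if the event is already set
    event.clear()       # later wait() calls block until set() is called again
```

In a test, one would typically create the Event, start notify_after_delay with cocotb.start_soon, and then await wait_and_rearm from the test coroutine.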

class cocotb.triggers.Lock(name=None)[source]

Lock primitive (not re-entrant).

This can be used as:

await lock.acquire()
try:
    # do some stuff
finally:
    lock.release()

Changed in version 1.4: The lock can be used as an asynchronous context manager in an async with statement:

async with lock:
    # do some stuff

locked

True if the lock is held.

acquire()[source]

Produce a trigger which fires when the lock is acquired.

release()[source]

Release the lock.

async cocotb.triggers.with_timeout(trigger, timeout_time, timeout_unit='step')[source]

Waits on triggers or coroutines and throws an exception if the wait exceeds the given time.

When an unstarted coroutine is passed, the callee coroutine is started, the caller blocks until the callee completes, and the callee’s result is returned to the caller. If timeout occurs, the callee is killed and SimTimeoutError is raised.

When a task is passed, the caller blocks until the callee completes and the callee’s result is returned to the caller. If timeout occurs, the callee continues to run and SimTimeoutError is raised.

If a Trigger or Waitable is passed, the caller blocks until the trigger fires, and the trigger is returned to the caller. If timeout occurs, the trigger is cancelled and SimTimeoutError is raised.

Usage:

await with_timeout(coro, 100, 'ns')
await with_timeout(First(coro, event.wait()), 100, 'ns')

Returns

First trigger that completed if timeout did not occur.

Raises

SimTimeoutError – If timeout occurs.

New in version 1.3.

Deprecated since version 1.5: Using None as the timeout_unit argument is deprecated, use 'step' instead.

Changed in version 1.7.0: Support passing coroutines.
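A caller that wants to handle the timeout instead of failing the test can catch the exception. The sketch below is one possible pattern: wait_for_rise is a hypothetical helper, and the import path cocotb.result.SimTimeoutError is an assumption based on cocotb 1.x.

```python
async def wait_for_rise(signal, timeout_ns=100):
    """Return True if signal rises within timeout_ns, else False.

    Hypothetical helper; signal is assumed to be a cocotb signal handle.
    """
    # Imported here so the sketch can be loaded without a running simulator.
    # Assumption: SimTimeoutError is importable from cocotb.result (cocotb 1.x).
    from cocotb.result import SimTimeoutError
    from cocotb.triggers import RisingEdge, with_timeout

    try:
        await with_timeout(RisingEdge(signal), timeout_ns, "ns")
    except SimTimeoutError:
        return False
    return True
```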