Coroutines, Tasks and Triggers#
cocotb testbenches use Python’s coroutines. Tasks are cocotb objects that wrap coroutines and are used to schedule concurrent execution of the coroutines.
While active tasks are executing, the simulation is paused.
The coroutine uses the await keyword to
block on another coroutine’s execution or pass control of execution back to the
simulator, allowing simulation time to advance.
Typically coroutines await a Trigger object which
pauses the task, and indicates to the simulator some event which will cause the task to resume execution.
In the following example, Timer is such a trigger:
async def wait_10ns():
cocotb.log.info("About to wait for 10 ns")
await Timer(10, unit="ns")
cocotb.log.info("Simulation time has advanced by 10 ns")
Coroutines may also await on other coroutines:
async def wait_100ns():
for i in range(10):
await wait_10ns()
Coroutines can return a value, so that they can be used by other coroutines.
async def get_signal(clk, signal):
await RisingEdge(clk)
return signal.value
async def check_signal_changes(dut):
first = await get_signal(dut.clk, dut.signal)
second = await get_signal(dut.clk, dut.signal)
assert first != second, "Signal did not change"
Triggers and Concurrency Utilities#
Below is a table of triggers and concurrency utilities that are useful for writing testbenches and models.
Signal Edges |
|
Resume after a rising edge on a single-bit signal
Resume after a falling edge on a single-bit signal
Resume after any edge on a signal, but prefer using
signal.value_changeResume after the specified number of transitions of a signal
(deprecated, use
signal.value_change instead) |
|
Timing |
|
Resume after the specified time
Resume when the simulation timestep moves to the read-only phase
Resume when the simulation timestep moves to the read-write phase
Resume when the simulation timestep moves to the next time step
|
|
Concurrency |
|
Resume after all given Tasks or Triggers
Resume after any given Tasks or Triggers
Resume after all given Triggers
Resume after any given Triggers
Resume when a Task completes
Await on all given Tasks or Triggers concurrently and block until a condition is met
Resume “soon”
|
|
Synchronization |
|
A way to signal an event across Tasks
A mutual exclusion lock
Resume latest at the specified timeout time
|
|
Footnotes
Concurrent Execution#
Coroutines can be scheduled for concurrent execution with start_soon().
These concurrently running coroutines are called Tasks.
start_soon() schedules the coroutine for future execution,
some time after the current Task yields control.
@cocotb.test()
async def test_act_during_reset(dut):
"""While reset is active, toggle signals"""
tb = uart_tb(dut)
# "Clock" is a built in class for toggling a clock signal
Clock(dut.clk, 1, unit="ns").start()
# reset_dut is a function -
# part of the user-generated "uart_tb" class
# run reset_dut immediately before continuing
await tb.reset_dut(dut.rstn, 20)
await Timer(10, unit="ns")
print("Reset is still active: %d" % dut.rstn)
await Timer(15, unit="ns")
print("Reset has gone inactive: %d" % dut.rstn)
Other tasks can be used in an await statement to suspend the current task until the other task finishes.
@cocotb.test()
async def test_count_edge_cycles(dut, period_ns=1, clocks=6):
Clock(dut.clk, period_ns, unit="ns").start()
await RisingEdge(dut.clk)
timer = Timer(period_ns + 10, "ns")
task = cocotb.start_soon(count_edges_cycles(dut.clk, clocks))
count = 0
expect = clocks - 1
while True:
result = await First(timer, task)
assert count <= expect, "Task didn't complete in expected time"
if result is timer:
cocotb.log.info("Count %d: Task still running", count)
count += 1
else:
break
Tasks can be killed before they complete, forcing their completion before they would naturally end.
@cocotb.test()
async def test_different_clocks(dut):
clk_1mhz = Clock(dut.clk, 1.0, unit="us")
clk_250mhz = Clock(dut.clk, 4.0, unit="ns")
clk_1mhz.start()
start_time_ns = get_sim_time(unit="ns")
await Timer(1, unit="ns")
await RisingEdge(dut.clk)
edge_time_ns = get_sim_time(unit="ns")
assert isclose(edge_time_ns, start_time_ns + 1000.0), "Expected a period of 1 us"
clk_1mhz.stop() # stop 1MHz clock here
clk_250mhz.start()
start_time_ns = get_sim_time(unit="ns")
await Timer(1, unit="ns")
await RisingEdge(dut.clk)
edge_time_ns = get_sim_time(unit="ns")
assert isclose(edge_time_ns, start_time_ns + 4.0), "Expected a period of 4 ns"
Changed in version 1.4: The cocotb.coroutine decorator is no longer necessary for async def coroutines.
async def coroutines can be used, without the @cocotb.coroutine decorator, wherever decorated coroutines are accepted,
including yield statements and cocotb.fork (since replaced with start_soon()).
Changed in version 1.6: Added cocotb.start() and cocotb.start_soon() scheduling functions.
Changed in version 1.7: Deprecated cocotb.fork.
Changed in version 2.0: Removed cocotb.fork.
Changed in version 2.0: Removed cocotb.coroutine.
Removed in version 2.0: Removed references to the deprecated cocotb.start().
Waiting For Multiple Events Simultaneously#
Occasionally you’ll need to wait for either one of many Tasks or Triggers to fire,
or a collection of Tasks or Triggers to fire.
This is what First and Combine provide, respectively.
Waiting For One Of Multiple Events#
First is like awaiting multiple Triggers or Tasks at the same time,
and resumes after one of the Triggers or Tasks fires.
It returns the result of awaiting the Task or Trigger that fired first.
Below we see it used to implement a timeout.
@cocotb.test
async def test_quiesce_or_timeout(dut):
# generate stimulus and drive it to the design
for trans in generate_transactions():
await drive(dut.intf, trans)
# wait for the design to quiesce or timeout
timeout = Timer(10, "us")
quiesce_task = cocotb.start_soon(quiesce())
result = await First(timeout, quiesce_task)
assert result is not timeout, "Design has hung!"
Fortunately for users timeouts are a common operation and cocotb provides with_timeout().
The second section in the above code using it would be await with_timeout(quiesce(), 10, "us").
Note
First does not cancel Tasks that did not complete after it returns.
This means that Tasks passed to it are still running.
You may need to cancel those Tasks with Task.cancel().
Determining Which Task Finishes First#
First can be used to determine which of multiple Tasks complete first using the following idiom.
@cocotb.test
async def test_which_finished_first(dut):
task_A = cocotb.start_soon(drive_A())
task_B = cocotb.start_soon(drive_B())
# Pass Task.complete rather than the Task directly.
result = await First(task_A.complete, task_B.complete)
# Compare the result against the Task's "complete" object.
if result is task_A.complete:
cocotb.log.info("Input A finished first")
else:
cocotb.log.info("Input B finished first")
Waiting For Multiple Events#
Combine is like awaiting multiple Triggers or Tasks at the same time,
but it resumes after all the listed Triggers or Tasks fire.
Using the example from the previous section, we can use it to wait until both the driving and quiesce are done.
@cocotb.test
async def test_wait_for_both(dut):
# generate stimulus and drive it to the design
async def drive_transactions():
for trans in generate_transactions():
await drive(dut.intf, trans)
# wait for both the driving and quiescing to complete before continuing
await Combine(
cocotb.start_soon(drive_transactions()),
cocotb.start_soon(quiesce())
)
And of course, the sky is the limit when you compose the two.
@cocotb.test
async def test_wait_for_both_with_timeout(dut):
# wait for both the driving and quiescing to complete before continuing
# but timeout if *either* the driving or settling take too long
await Combine(
cocotb.start_soon(with_timeout(drive_transactions(), 1, "us")),
cocotb.start_soon(with_timeout(quiesce(), 10, "us")),
)
Note
Combine does not cancel Tasks that did not complete if it fails with an exception.
This means that Tasks passed to it are still running.
You may need to cancel those Tasks with Task.cancel().
TaskManager#
The TaskManager class is another way to run multiple async routines concurrently and wait for them all to complete.
It properly manages the lifetime of its “children” and handles exceptions and cancellations gracefully.
Unlike gather() which takes all awaitables and coroutines at once,
TaskManager allows adding new awaitables and coroutines dynamically,
and provides options to control exception handling behavior on a per-Task basis,
making it much more flexible.
Basic Usage#
To use TaskManager, first construct it and use it as an asynchronous context manager with the async with statement.
Inside of the context block you can use the @fork decorator method to start Tasks concurrently.
When control reaches the end of the context block
the TaskManager blocks the encompassing Task until all child Tasks complete.
from cocotb.triggers import TaskManager
# Drive two interfaces concurrently until both complete.
async with TaskManager() as tm:
@tm.fork
async def drive_interface1(): ...
@tm.fork
async def drive_interface2(): ...
# Control returns here when all drive Tasks have completed
In addition to the @fork method for starting coroutine functions concurrently,
start_soon() can be used for awaiting arbitrary awaitables concurrently.
# Wait for operation to complete or timeout after 1 us
async with TaskManager() as tm:
tm.start_soon(RisingEdge(cocotb.top.operation_complete))
@tm.fork
async def watchdog():
await Timer(1, "us")
raise TimeoutError("Operation did not complete in time")
Inspecting Child Task Results#
You can inspect the result of child classes by storing the Task objects returned by the start_soon() method.
When decoratoring a coroutine function with @fork,
the name of the function will become the returned Task object.
async with TaskManager() as tm:
task1 = tm.start_soon(RisingEdge(cocotb.top.signal_a))
# task2 will become the Task object after wrapping the coroutine function with @fork
@tm.fork
async def task2():
return 42
assert task1.done()
assert task1.result() is RisingEdge(cocotb.top.signal_a)
assert task2.done()
assert task2.result() == 42
Note
After exiting the context block and the TaskManager has begun finishing,
no further calls to start_soon() or @fork are permitted.
Attempting to do so will raise a RuntimeError.
Handling Exceptions and continue_on_error#
TaskManager gracefully handles exceptions raised in child Tasks or in the context block itself.
It ensures that no child Task is left running unintentionally by the time the context block exits.
The behavior of TaskManager when a child Task raises an exception is controlled by the continue_on_error parameter.
The constructor for TaskManager accepts an optional parameter default_continue_on_error which is used as the default for all children Tasks;
it defaults to False.
The TaskManager-wide default can be overridden on a per-Task basis using the continue_on_error parameter to the @fork or start_soon() methods.
async with TaskManager(default_continue_on_error=True) as tm:
@tm.fork(continue_on_error=False)
async def task1(): ...
tm.start_soon(some_coroutine(), continue_on_error=True)
If a child Task raises an exception,
one of two behaviors will occur depending on the value of continue_on_error for that Task.
If the continue_on_error parameter is False, all other child Tasks are cancelled and the TaskManager will begin shutting down.
If the continue_on_error parameter is True, the exception is captured and other child Tasks are allowed to continue running.
After all child Tasks have finished,
all exceptions, besides CancelledError, are gathered into an ExceptionGroup,
or a BaseExceptionGroup, if at least one of the exceptions is a BaseException,
and raised in the enclosing scope.
You can catch the ExceptionGroup to handle errors from child Tasks
by either catching the ExceptionGroup as you would typically;
or, if you are running Python 3.11 or later,
using the new except* syntax
to catch specific exception types from the group.
This new syntax will run the except clause for each matching exception in the group.
try:
async with TaskManager(default_continue_on_error=True) as tm:
@tm.fork
async def task1():
...
raise ValueError("An error occurred in task1")
@tm.fork
async def task2():
...
raise ValueError("An error occurred in task2")
except* ValueError as e:
# This will print both ValueErrors from task1 and task2
cocotb.log.info(f"Caught ValueError from TaskManager: {e}")
Note
After a Task fails and the TaskManager begins cancelling,
no further calls to start_soon() or @fork are permitted.
Failures Within the Context Block#
You are permitted to add any await statement to the body of the context block.
This means that it is possible for child tasks to start running, and then end with an exception, before the context block has finished.
In this case, a CancelledError will be raised from the current await expression in the context block,
allowing the user to perform any necessary cleanup.
This CancelledError will be squashed when the context block exits,
and TaskManager continues shutting down as it normally would.
async with TaskManager() as tm:
@tm.fork
async def task1():
raise ValueError("An error occurred in task1")
try:
await Timer(10) # During this await, task1 will fail
except CancelledError:
cocotb.log.info(
"The rest of the context block will be skipped due to task1 failing"
)
raise # DON'T FORGET THIS
... # This code will be skipped
Warning
Just like with Task, if a TaskManager context block is cancelled
and squashes the resulting asyncio.CancelledError, the test will be forcibly failed immediately.
Always remember to re-raise the asyncio.CancelledError if you catch it.
A context block can also fail with an exception like a child Task could.
In this case, if the context_continue_on_error parameter to the constructor is False, all child Tasks are cancelled;
if it is set to True, other child Tasks are allowed to continue running.
In either case, after all child Tasks have finished,
all exceptions, besides CancelledError, are gathered into an ExceptionGroup,
or a BaseExceptionGroup, if at least one of the exceptions is a BaseException,
and raised in the enclosing scope.
try:
async with TaskManager(context_continue_on_error=True) as tm:
@tm.fork
async def task1():
...
return 42
raise ValueError("An error occurred in the context block")
except* ValueError as e:
# This will print the ValueError from the context block
cocotb.log.info(f"Caught ValueError from TaskManager: {e}")
assert task1.result() == 42 # task1 was allowed to continue running until completion
Nesting TaskManager#
TaskManagers can be arbitrarily nested.
When any child Task fails, the entire tree of child Tasks will eventually be cancelled.
async with TaskManager() as tm_outer:
@tm_outer.fork
async def outer_task():
...
raise RuntimeError("An error occurred in outer_task")
async with TaskManager() as tm_inner:
# This inner task will be cancelled when outer_task fails
@tm_inner.fork
async def another_task(): ...
assert outer_task.exception() is RuntimeError
assert another_task.cancelled()
Async generators#
Starting with Python 3.6, a yield statement within an async function
has a new meaning which matches the typical meaning of yield within regular Python code.
It can be used to create a special type of generator function that can be iterated with async for:
async def ten_samples_of(clk, signal):
for i in range(10):
await RisingEdge(clk)
yield signal.value # this means "send back to the for loop"
@cocotb.test()
async def test_samples_are_even(dut):
async for sample in ten_samples_of(dut.clk, dut.signal):
assert sample % 2 == 0
More details on this type of generator can be found in PEP 525.
Generator-based coroutines#
Changed in version 2.0: This style, which used the cocotb.coroutine decorator and the yield syntax, was removed.