Coroutines, Triggers, Tasks, and Concurrency#
This document covers in greater depth the features that cocotb provides to make writing complex testbenches possible, including the following topics:
Running independent concurrent tasks with
cocotb.start_soon().Waiting for multiple things to finish with
gather()andselect().Structured concurrency with
TaskManager.Inter-task synchronization with
Lock.
What it will not cover is any simulation-specific or testing-specific features such as getting values from a signal, or passing and failing tests.
Now before we can get into concurrency, we need to cover the building blocks of cocotb’s concurrency system, the first being coroutines.
Coroutines#
What is a coroutine?#
A coroutine is a special type of function that can be paused and resumed at certain points in its execution. When a coroutine is paused, it yields control to another coroutine, allowing other code to run. When a coroutine is resumed, it continues executing from where it left off.
If this sounds a lot like a Python generator, that’s because it is. In fact, Python’s built-in coroutines are built on top of generators.
How do I define a coroutine?#
The main difference between regular Python functions or generators and coroutine functions
is that coroutine functions are defined with async def instead of just def.
async def my_coroutine():
# This is a coroutine function because it is defined with `async def` instead of just `def`.
# Even if there's no `await` expression in the body of the function.
print("Hello, World!")
What makes coroutine functions useful is that they can contain await expressions.
This is similar to yield expressions in generators:
execution is paused at the point of the await expression until the awaited object results in a value,
then execution of the coroutine resumes from the point of the await expression.
Why do I need coroutines?#
Looking at the example above, you might be asking why we need coroutines at all.
It looks like it could be a regular def function and non- awaited function call.
There are two main reasons why we need coroutines:
We can
awaitthings that aren’t coroutines.We can run multiple coroutines concurrently.
In terms of cocotb, the first point is necessary to support awaiting triggers,
which are not coroutines but awaitable objects that represent events in the simulation.
@cocotb.test
async def my_test(dut):
await RisingEdge(dut.clk) # awaitable...
assert not inspect.iscoroutine(RisingEdge(dut.clk)) # but not a coroutine
We will cover cocotb’s concurrency features in later sections.
Using Coroutines#
Coroutine functions support all the same features that regular Python functions support.
They can take arguments, return values, and raise exceptions.
The only difference is that they can use await expressions while regular functions cannot.
Note
That is not a license to change all your functions to coroutine functions - there is overhead associated with creating coroutines.
The other main difference is that coroutine functions, like generator functions, do not run the function body immediately upon calling;
nor does calling the function (without the await) block and return the function result.
Instead, calling a coroutine function returns a coroutine object, which is an awaitable.
The coroutine function’s body starts only after that coroutine object is awaited.
async def my_coroutine(arg1, arg2):
local_var = arg1 + arg2
if local_var > 10:
raise ValueError("Too big!")
return local_var
@cocotb.test
async def my_test(dut):
ret = await my_coroutine(3, 4) # ret == 7
try:
await my_coroutine(5, 6)
except ValueError as e:
assert str(e) == "Too big!"
Looking at that example, notice the await keyword appears between the assignment and the function call.
This is because await is an expression that takes an awaitable object.
The result of the expression is the result of the awaitable.
For coroutine functions, this means the function return value or an exception if one is raised.
To best visualize this, consider the following equivalent code below.
@cocotb.test
async def my_test(dut):
coro = my_coroutine(3, 4)
ret = (await coro)
Triggers#
We mentioned earlier that one of the main reasons we need coroutines is to await things that aren’t other coroutines.
In cocotb, that thing is triggers <trigger>; they are another building block of cocotb’s concurrency system.
cocotb provides triggers that represent various events in the simulation, such as a signal changing value or a certain amount of time passing.
It also provides some triggers that represent user-defined events in Python, such as an Event.
For example, a RisingEdge(dut.clk) trigger represents the next change in value on the dut.clk signal to a 1.
When you await a trigger, the current coroutine is paused until the event represented by the trigger occurs in the simulation,
at which point the coroutine is resumed.
@cocotb.test(timeout_time=10, timeout_unit="ns")
async def my_test(dut):
print(get_sim_time("ns")) # 0 ns, the start of the test.
await RisingEdge(dut.clk) # This coroutine is paused until the next rising edge of dut.clk.
print(get_sim_time("ns")) # 10 ns, the time of the rising edge of dut.clk.
A gotcha to watch out for is that if an event represented by the trigger will never occur,
then the coroutine will be paused indefinitely and your test will hang.
This is a good argument for specifying timeout_time and timeout_unit arguments to @cocotb.test as seen above.
The only common API that all triggers share is that they are awaitable objects. We will be covering several particular triggers relating to concurrency later in this document, so here is a table of the most commonly used simulation triggers which we won’t be covering in detail in this document.
Simulator Triggers |
|
Resume after a single-bit signal changes to a
1Resume after a single-bit signal changes to a
0Resume after any edge on a signal
Resume after the specified time
Resume when the simulation timestep moves to the read-only phase
Resume when the simulation timestep moves to the read-write phase
Resume when the simulation timestep moves to the next time step
|
|
Concurrency#
As testbenches get more complex, you will very quickly run into the need to do multiple things at the same time:
drive inputs while monitoring outputs
check expected results while generating stimulus
wait indefinitely for potential error conditions while doing everything else
This is where concurrency comes in. The following is a theoretical introduction to concurrency in general and as it pertains to cocotb. Feel free to skip this section if you are already familiar with concurrency and just want to get to the practical details of how to use it in cocotb.
What is concurrency?#
Concurrency is best defined as “When multiple routines can start executing simultaneously.”
This definition has the advantage of not saying “running simultaneously,” which could be confusing. This means that you can start running routine A but before A finishes running, routine B can start running. Thus A and B are executing simultaneously.
Concurrency vs parallelism#
Simultaneously is not the same as “in parallel.” Parallelism is a subset of concurrency. There are forms of concurrency where two routines are executing on different execution threads of your CPU “in parallel”; and there are other forms of concurrency where only one routine is running while the others lie idle. This is still concurrency because all of these routines have started before any have finished.
Tasks vs threads#
cocotb’s concurrency system is of the non-parallel form. This means that only one routine is running at a time while the others lie idle. Additionally, cocotb’s concurrency system is cooperative, meaning that the currently running routine must explicitly yield control before another routine can resume.
To best understand what we are dealing with, let’s compare cocotb’s core unit of concurrency, the task, to the more familiar concept of threads.
Threads are a form of parallel concurrency where multiple routines can run simultaneously on different CPU threads. Additionally, threads are preemptive, meaning that the operating system can interrupt a running thread to allow another thread to run. This means when you have multiple threads, you have to worry about things like race conditions caused by parallel threads contending for shared resources, or threads being preemptively scheduled off in the middle of an operation on a shared resource leaving it in an inconsistent state.
cocotb’s form of concurrency is free of these issues. But cocotb’s tasks are not free of all issues caused by concurrency.
It is still possible to deadlock your test. Additionally, the scheduling order of tasks is not guaranteed to be predictable, so you must be careful to not make assumptions about the order in which tasks will run. Finally, because cocotb’s tasks must explicitly yield control to allow other tasks to run, you must be careful not to write a task that never yields control, as this will starve other tasks of execution time and your testbench will hang.
Symmetric vs asymmetric concurrency#
Finally, the last bit of theory we need to cover is the difference between symmetric and asymmetric concurrency.
Asymmetric concurrency is when tasks have an asymmetric relationship with other tasks, typically this is a parent-child relationship. In this type of concurrency multiple child tasks are spawned by a parent task, the parent task is usually blocked upon the completion of its child tasks, and the parent task has the right to stop the execution of any of its child tasks at any time.
An example of this would be when your test needs to wait for multiple tasks, which drive different inputs of your design, to complete before it can check the results of the test.
Symmetric concurrency is the opposite, where tasks have no special relationship to each other. Symmetric tasks run independently from each other and cannot block each other. They control their own lifetime.
An example of this would be a task which spins forever in a while True: ... loop,
waiting for a specific error condition to occur,
while the rest of the test is running.
Structured Concurrency#
Concurrency is hard to get right, so providing tools to prevent common mistakes is important. cocotb follows the lead of other leading concurrency frameworks such as trio to provide what is called “structured concurrency.” The primary goal is to make the lifetimes of concurrent tasks easier to manage and reason about. This prevents the common mistake of “leaking” tasks, where concurrent tasks are still running in the background when they shouldn’t be. Structured concurrency provides these guarantees lexically, so a reader can easily see the lifetime of a concurrent task just by looking at the code.
Compatibility with other concurrency frameworks#
There are many other concurrency frameworks available in Python; asyncio and trio are the most popular ones. While they all leverage Python’s built-in coroutines, they are not compatible with cocotb’s concurrency system. This is because cocotb’s tasks and triggers are coupled to cocotb’s event loop which is fundamentally different than asyncio’s or trio’s. Likewise, other concurrency frameworks have their own equivalents to tasks and triggers that are coupled to their own event loops.
You cannot use other concurrency frameworks’ APIs or objects in a cocotb test, nor can you use cocotb’s tasks and triggers in other concurrency frameworks.
cocotb.start_soon()#
A typical concurrency use case in cocotb is to run a task “in the background” or “fire-and-forget,” meaning that you want to start a task that runs concurrently and independently from the rest of the test and let it decide on its own when to stop running (if ever) without any intervention from the rest of the test. Examples of this include:
The main execution thread of a Driver or Monitor.
A concurrent assertion checker.
cocotb provides cocotb.start_soon() for this purpose.
This function takes a coroutine,
wraps it into a Task,
and runs that task concurrently and independently from the rest of the test.
The Task object is then returned to the caller.
Note
The reason it is called “start_soon” is that the coroutine is only scheduled to start running;
it will not start running until some time after the current task yields control.
async def assert_no_valid_gaps(dut):
while True:
await RisingEdge(dut.clk)
assert dut.ready.value == 1 and dut.valid.value == 1
@cocotb.test()
async def my_test(dut):
cocotb.start_soon(assert_no_valid_gaps(dut))
# rest of the test
Notice the body of the assert_no_valid_gaps task is an infinite loop.
This is not a problem as long as the task yields control to allow other tasks to run.
It accomplishes this by awaiting the rising edge of the clock in each iteration of the loop.
Also notice we are not waiting for the task to end anywhere in our test. We start it and forget about it.
Next, let’s make a Driver that drives random data into an AXI-Stream-like interface continuously, as long as the ready signal is asserted.
The Driver’s main execution thread (MyDriver._run) is started with cocotb.start_soon().
class MyDriver:
def __init__(self, intf):
self._intf = intf
self._task = None
async def _run(self):
while True:
self._intf.valid = 0
self._intf.data = 0
await RisingEdge(self._intf.clk)
if self._intf.ready.value == 1:
self._intf.valid = 1
self._intf.data = random.randint(0, 255)
def start(self):
self._task = cocotb.start_soon(self._run())
Note that the cocotb.start_soon() function returns the Task object that it creates.
We save that in the self._task attribute of our Driver and will use that in a later section of this document to stop the Driver’s execution.
Mapping this back to our earlier discussion of symmetric vs asymmetric concurrency, both of these are examples of symmetric concurrency. There is no established relationship between the concurrent assert or the Driver’s main thread and any other particular task.
Tasks#
The Task is the final basic building block of cocotb’s concurrency system.
All concurrency-oriented APIs in cocotb revolve around Tasks in some way,
so understanding what they are and why they are necessary is essential to using concurrency in cocotb effectively.
Why Tasks?#
Python coroutines only support being awaited,
which blocks the current task until the coroutine finishes and results in the coroutine’s return value.
This is a problem when we want to run multiple coroutines concurrently,
because we don’t want to block the current task until the other coroutine finishes.
Tasks solve this problem while also adding some additional features on top of regular coroutines, including:
Checking the status of a Task.
Getting the result of a Task.
Stopping a Task’s execution.
Using Tasks#
Just like regular Python coroutines, awaiting a Task will block the current task until the task finishes and results in the Task’s return value.
We can check whether a task has completed for any reason by calling the done() method.
We can get the result of a task with the result() method;
however, this will raise an exception if the task is not done yet, if the task raised an exception during its execution, or was cancelled.
async def my_coroutine():
await Timer(10, unit="ns")
return 42
@cocotb.test()
async def my_test(dut):
task = cocotb.start_soon(my_coroutine())
# Check if the task is still running.
assert not task.done()
# Wait for the task to finish and get its result.
result = await task
assert result == 42
# The task is now done.
assert task.done()
# We can await it again, unlike Python coroutines,
# which will finish immediately if the task is already done.
# This often happens if there are multiple tasks waiting for the same task to finish.
await task
# Now that it's done, we can get the result without awaiting it.
assert task.result() == 42
Handling Exceptions#
Just like regular Python coroutines, Tasks can raise exceptions.
If an exception is raised in a Task, that exception is stored in the task and can be retrieved with the exception() method.
If you await a task that has an exception, or call its result() method, that exception will be re-raised.
async def my_coroutine():
await Timer(10, unit="ns")
raise ValueError("Something went wrong!")
@cocotb.test()
async def my_test(dut):
task = cocotb.start_soon(my_coroutine())
# Wait for the task to finish and catch the exception.
try:
await task
except ValueError as e:
assert str(e) == "Something went wrong!"
# The task is now done.
assert task.done()
# We can get the exception without awaiting it.
assert isinstance(task.exception(), ValueError)
Cancellation and clean-up#
Tasks can also be cancelled.
This is done by calling the cancel() method on the task you want to cancel.
However, cancelling a task does not necessarily stop its execution immediately; the cancellation is scheduled.
This is because when you cancel a Task, a CancelledError exception is raised inside the Task.
We are rescheduling the task to run with that exception raised.
That CancelledError exception will become the Task’s result.
One final note is that calling result() or exception() on a cancelled task will raise that same CancelledError exception.
async def my_coroutine():
while True:
await Timer(10, unit="ns")
@cocotb.test()
async def my_test(dut):
task = cocotb.start_soon(my_coroutine())
# Cancel the task after 50 ns.
await Timer(50, unit="ns")
task.cancel()
try:
await task
except asyncio.CancelledError:
print("Task was cancelled!")
Now we can add some more functionality to our Driver example by adding a MyDriver.stop method that cancels the Driver’s main task to stop the Driver’s execution.
We are also taking advantage of the fact that cancellation raises a CancelledError exception to clean up the interface when the Driver is stopped.
class MyDriver:
def __init__(self, intf):
self._intf = intf
self._task = None
async def _run(self):
try:
while True:
self._intf.valid = 0
self._intf.data = 0
await RisingEdge(self._intf.clk)
if self._intf.ready.value == 1:
self._intf.valid = 1
self._intf.data = random.randint(0, 255)
finally:
# Clean up the interface when the task is cancelled.
# This is triggered by the CancelledError exception being raised.
self._intf.valid = 0
self._intf.data = 0
def start(self):
assert self._task is None, "Driver is already running!"
self._task = cocotb.start_soon(self._run())
def stop(self):
assert self._task is not None, "Driver is not running!"
# Cancel the task to stop the Driver's execution.
self._task.cancel()
self._task = None
gather() and select()#
Another common concurrency use case you may run into is needing to wait for multiple tasks to finish before proceeding. For specific examples:
Waiting for multiple simultaneously executing stimulus generators to finish before finishing the test.
Waiting for multiple testbench components to quiesce during test end.
Timing out an operation.
cocotb provides two functions for this purpose, gather() and select().
Both of these functions take a variable number of awaitable objects and wait for all (gather()) or any (select()) of them to finish before proceeding.
gather() returns a list of results corresponding to each of the awaitable objects <awaitable> passed in,
while select() returns a tuple of the result of the first awaitable to return and the index in the argument list it corresponds to.
This is accomplished by creating an “internal waiter Task” that awaits each of the awaitable objects <awaitable>.
These functions provide value by guaranteeing that no internal waiter task will be left running unintentionally after they finish.
If any of the awaitable objects <awaitable> raise an exception, that exception is propagated to the caller of gather() or select().
After an exception is raised, all unfinished internal waiter Tasks are cancelled.
Likewise, as soon as the first awaitable in a select() returns, all unfinished internal waiter Tasks are cancelled.
These guarantees are what is meant by “structured concurrency.”
These two functions compose with each other easily to create more complex waiting conditions.
@cocotb.test()
async def my_test(arbitrator):
... # set up test
# 10k transactions on each sequencer running simultaneously
await gather(
sequencer_a.run(10000),
sequencer_b.run(10000),
)
res, i = await select(
# Wait for scoreboard and the design to quiesce
gather(
scoreboard.wait_for_quiescence(),
RisingEdge(dut.empty),
),
# Timeout after 100 us
Timer(100, unit="us"),
)
if i == 0:
print("Design quiesced!")
else:
raise TimeoutError("Design did not quiesce within 100 us!")
... # check results of the test
Note
If you pass a Task object to gather() or select() and an exception occurs
or select() returns before that task finishes,
the passed in task will not be cancelled, only the internal waiter Task.
Note
You may be familiar with First and Combine which can also be used to wait for multiple things,
but they are limited to waiting for triggers only, not coroutines or other awaitables <awaitable>.
This often forces the user to use cocotb.start_soon() and manage task lifetimes themselves
which is verbose and rarely done correctly leading to tasks “leaking”.
They also do not return useful results like gather() and select() do.
For those reasons they are no longer recommended except for passing to functions or objects where specifically triggers are expected.
TaskManager#
The TaskManager class is another way to run multiple async routines concurrently and wait for them all to complete.
It properly manages the lifetime of its “children” and handles exceptions and cancellations gracefully.
Unlike gather() which takes all awaitables and coroutines at once,
TaskManager allows adding new awaitables and coroutines dynamically,
and provides options to control exception handling behavior on a per-Task basis,
making it much more flexible.
Basic Usage#
To use TaskManager, first construct it and use it as an asynchronous context manager with the async with statement.
Inside the context block you can use the @fork decorator method to start Tasks concurrently.
When control reaches the end of the context block
the TaskManager blocks the encompassing Task until all child Tasks complete.
from cocotb.triggers import TaskManager
# Drive two interfaces concurrently until both complete.
async with TaskManager() as tm:
@tm.fork
async def drive_interface1(): ...
@tm.fork
async def drive_interface2(): ...
# Control returns here when all drive Tasks have completed
In addition to the @fork method for starting coroutine functions concurrently,
start_soon() can be used for awaiting arbitrary awaitables concurrently.
# Wait for operation to complete or timeout after 1 us
async with TaskManager() as tm:
tm.start_soon(RisingEdge(cocotb.top.operation_complete))
@tm.fork
async def watchdog():
await Timer(1, "us")
raise TimeoutError("Operation did not complete in time")
Inspecting Child Task Results#
You can inspect the result of child tasks by storing the Task objects returned by the start_soon() method.
When decorating a coroutine function with @fork,
the name of the function will become the returned Task object.
async with TaskManager() as tm:
task1 = tm.start_soon(RisingEdge(cocotb.top.signal_a))
# task2 will become the task object after wrapping the coroutine function with @fork
@tm.fork
async def task2():
return 42
assert task1.done()
assert task1.result() is RisingEdge(cocotb.top.signal_a)
assert task2.done()
assert task2.result() == 42
Note
After exiting the context block and the TaskManager has begun finishing,
no further calls to start_soon() or @fork are permitted.
Attempting to do so will raise a RuntimeError.
Handling Exceptions and continue_on_error#
TaskManager gracefully handles exceptions raised in child Tasks or in the context block itself.
It ensures that no child Task is left running unintentionally by the time the context block exits.
The behavior of TaskManager when a child Task raises an exception is controlled by the continue_on_error parameter.
The constructor for TaskManager accepts an optional parameter default_continue_on_error which is used as the default for all child tasks;
it defaults to False.
The TaskManager-wide default can be overridden on a per-Task basis using the continue_on_error parameter to the @fork or start_soon() methods.
async with TaskManager(default_continue_on_error=True) as tm:
@tm.fork(continue_on_error=False)
async def task1(): ...
tm.start_soon(some_coroutine(), continue_on_error=True)
If a child Task raises an exception,
one of two behaviors will occur depending on the value of continue_on_error for that Task.
If the continue_on_error parameter is False, all other child Tasks are cancelled and the TaskManager will begin shutting down.
If the continue_on_error parameter is True, the exception is captured and other child Tasks are allowed to continue running.
After all child Tasks have finished,
all exceptions, besides CancelledError, are gathered into an ExceptionGroup,
or a BaseExceptionGroup, if at least one of the exceptions is a BaseException,
and raised in the enclosing scope.
You can catch the ExceptionGroup to handle errors from child Tasks
by either catching the ExceptionGroup as you would typically;
or, if you are running Python 3.11 or later,
using the new except* syntax
to catch specific exception types from the group.
This new syntax will run the except clause for each matching exception in the group.
try:
async with TaskManager(default_continue_on_error=True) as tm:
@tm.fork
async def task1():
...
raise ValueError("An error occurred in task1")
@tm.fork
async def task2():
...
raise ValueError("An error occurred in task2")
except* ValueError as e:
# This will print both ValueErrors from task1 and task2
cocotb.log.info(f"Caught ValueError from TaskManager: {e}")
Note
After a Task fails and the TaskManager begins cancelling,
no further calls to start_soon() or @fork are permitted.
Attempting to do so will raise a RuntimeError.
Failures Within the Context Block#
You are permitted to add any await statement to the body of the context block.
This means that it is possible for child tasks to start running, and then end with an exception, before the context block has finished.
In this case, a CancelledError will be raised from the current await expression in the context block,
allowing the user to perform any necessary cleanup.
This CancelledError will be squashed when the context block exits,
and TaskManager continues shutting down as it normally would.
async with TaskManager() as tm:
@tm.fork
async def task1():
raise ValueError("An error occurred in task1")
try:
await Timer(10, unit="ns") # During this await, task1 will fail
except CancelledError:
cocotb.log.info(
"The rest of the context block will be skipped due to task1 failing"
)
raise # DON'T FORGET THIS
... # This code will be skipped
Warning
Just like with Task, if a TaskManager context block is cancelled
and squashes the resulting asyncio.CancelledError, the test will be forcibly failed immediately.
Always remember to re-raise the asyncio.CancelledError if you catch it.
A context block can also fail with an exception like a child Task could.
In this case, if the context_continue_on_error parameter to the constructor is False, all child Tasks are cancelled;
if it is set to True, other child Tasks are allowed to continue running.
In either case, after all child Tasks have finished,
all exceptions, besides CancelledError, are gathered into an ExceptionGroup,
or a BaseExceptionGroup, if at least one of the exceptions is a BaseException,
and raised in the enclosing scope.
try:
async with TaskManager(context_continue_on_error=True) as tm:
@tm.fork
async def task1():
...
return 42
raise ValueError("An error occurred in the context block")
except* ValueError as e:
# This will print the ValueError from the context block
cocotb.log.info(f"Caught ValueError from TaskManager: {e}")
assert task1.result() == 42 # task1 was allowed to continue running until completion
Nesting TaskManager#
TaskManagers can be arbitrarily nested.
When any child Task fails, the entire tree of child Tasks will eventually be cancelled.
async with TaskManager() as tm_outer:
@tm_outer.fork
async def outer_task():
...
raise RuntimeError("An error occurred in outer_task")
async with TaskManager() as tm_inner:
# This inner task will be cancelled when outer_task fails
@tm_inner.fork
async def another_task(): ...
assert outer_task.exception() is RuntimeError
assert another_task.cancelled()
Note
Prefer using gather() and select() when possible as they are more readable for the cases they can cover.
Synchronization and Inter-Task Communication#
What we’ve covered in the past few sections mostly deals with asymmetric concurrency: parent tasks spawning multiple independent child tasks and waiting for them to finish. However, it is common for multiple independent tasks to need to communicate or coordinate with each other. For example:
A Driver running concurrently needs stimulus that’s generated by another task or the test.
Multiple tasks need to share access to a shared resource like a register bus.
The test needs to know when a stimulus generator has finished or a scoreboard has quiesced.
cocotb provides several APIs for inter-task communication and synchronization, including:
Eventfor notifying waiters that some event has occurred.Queuefor passing messages between tasks.Lockfor ensuring exclusive access to a shared resource.
Let’s go through each of these in turn.
Event#
An Event allows one or more tasks to wait for an event to occur.
Waiting is done by awaiting Event.wait(), which blocks until the Event is set.
The Event is set by calling Event.set(), which wakes up all tasks that are waiting on that Event.
Events are stateful;
after an Event is set, any future calls to Event.wait() will not block.
You can check to see if the Event is in the “set” state with Event.is_set()
and put the Event back into the “unset” state with Event.clear().
from cocotb.triggers import Event, with_timeout
class Checker:
"""Checks expected vs actual in order of arrival. Assumes expecteds arrive before actuals."""
def __init__(self):
self._quiesced = Event()
self._expected_queue = []
def add_expected(self, expected):
self._expected_queue.append(expected)
self._quiesced.clear() # Not quiesced if there's an expected waiting to be checked.
def add_actual(self, actual):
expected = self._expected_queue.pop(0)
assert actual == expected
if not self._expected_queue:
self._quiesced.set() # Quiesced if there are no more expecteds waiting to be checked.
@cocotb.test
async def my_test(dut):
checker = Checker()
... # set up drivers and run stimulus
# Wait for all expecteds to be checked.
await with_timeout(
checker._quiesced.wait(),
timeout_time=100,
timeout_unit="us",
)
Queue#
A Queue is a first-in-first-out (FIFO) queue with built-in synchronization,
somewhat similar to a Mailbox in UVM,
and very similar to asyncio.Queue in the Python standard library.
It supports multiple producers and consumers,
and allows producers and consumers to wait for items to be added or removed from the queue as necessary.
There are two interfaces to a Queue, a blocking interface with put() and get(),
and a non-blocking interface with put_nowait() and get_nowait().
Each Queue has a maximum size, which is infinite by default.
If a producer tries to add an item to a full queue, it will block until there is space in the queue (put()) or raise an exception (put_nowait()).
If a consumer tries to remove an item from an empty queue, it will block until there is an item in the queue (get()) or raise an exception (get_nowait()).
A common use case might be a Driver, which runs in an independent task, that needs stimulus from the test or some other task.
Going back to our earlier Driver example, we can modify it to take stimulus from a Queue instead of generating it randomly.
from cocotb.queue import Queue
class MyDriver:
def __init__(self, intf):
self._intf = intf
self._task = None
self._queue = Queue()
async def _run(self):
while True:
# Default values when there's no stimulus to send.
self._intf.valid = 0
self._intf.data = 0
# Blocks the Driver in the idle state until there is stimulus to send.
data = await self._queue.get()
# Synchronize.
await RisingEdge(self._intf.clk)
# Write the data.
self._intf.valid = 1
self._intf.data = data
# Wait for ready to go high to ensure the transaction is accepted.
while self._intf.ready.value != 1:
await RisingEdge(self._intf.clk)
def add_transaction(self, data):
# Add stimulus to the queue without blocking.
# Since the Queue is unbounded, this will never raise an exception.
self._queue.put_nowait(data)
... # start and stop methods omitted for brevity
@cocotb.test
async def my_test(dut):
driver = MyDriver(dut.input)
Lock#
The final synchronization primitive that cocotb provides is Lock.
This is a mutual exclusion lock similar to asyncio.Lock in the Python standard library.
A Lock has two states, “locked” and “unlocked.”
When a Lock is locked, any task that tries to acquire the lock will block until the lock is unlocked.
When a Lock is unlocked, any task can acquire the lock, which will change its state to locked.
The methods Lock.acquire() and release() are used to acquire and release the lock, respectively.
However, it is recommended to use Lock as an asynchronous context manager with async with,
which will automatically acquire the lock at the beginning of the block and release it at the end of the block, even if an exception is raised.
A common use case is to gate access to a shared resource like a register bus.
You may have multiple tasks that need to read registers for checking purposes,
or poll a register for a condition to be met,
or to write registers for configuration purposes.
To prevent collisions, a Lock should be used.
Below we wrap a register bus interface in a class that uses a Lock to ensure only one transaction can be happening at a time.
from enum import IntEnum
from cocotb.triggers import RisingEdge, Lock
class RequestType(IntEnum):
IDLE = 0
READ = 1
WRITE = 2
class RegisterBus:
def __init__(self, intf):
self._intf = intf
# The mutual exclusion lock.
self._lock = Lock()
async def read(self, addr):
# Acquire the lock.
async with self._lock:
# Perform the read transaction on the interface.
self._intf.addr = addr
self._intf.request = RequestType.READ
await RisingEdge(self._intf.clk)
self._intf.request = RequestType.IDLE
while self._intf.complete.value != 1:
await RisingEdge(self._intf.clk)
return self._intf.data.value
# Lock automatically released.
async def write(self, addr, data):
# Acquire the lock.
async with self._lock:
# Perform the write transaction on the interface.
self._intf.addr = addr
self._intf.data = data
self._intf.request = RequestType.WRITE
await RisingEdge(self._intf.clk)
self._intf.request = RequestType.IDLE
while self._intf.complete.value != 1:
await RisingEdge(self._intf.clk)
# Lock automatically released.
Async generators#
One final coroutine feature that Python provides that is worth mentioning is the async generator.
They are conceptually similar to regular Python generators,
and use the same yield syntax to send values back to the caller,
but they also support awaiting in their body like regular Python coroutines.
Iteration over async generators is done with async for:
async def ten_samples_of(clk, signal):
for i in range(10):
await RisingEdge(clk)
yield signal.value # this means "send back to the for loop"
@cocotb.test()
async def test_samples_are_even(dut):
async for sample in ten_samples_of(dut.clk, dut.signal):
assert sample % 2 == 0
More details on this type of generator can be found in PEP 525.