Coroutines¶
Testbenches built using cocotb use coroutines. While the coroutine is executing
the simulation is paused. The coroutine uses the yield
keyword to
pass control of execution back to the simulator and simulation time can advance
again.
Typically coroutines yield
a Trigger
object which
indicates to the simulator some event which will cause the coroutine to be woken
when it occurs. For example:
@cocotb.coroutine
def wait_10ns():
cocotb.log.info("About to wait for 10ns")
yield Timer(10000)
cocotb.log.info("Simulation time has advanced by 10 ns")
Coroutines may also yield other coroutines:
@cocotb.coroutine
def wait_100ns():
for i in range(10):
yield wait_10ns()
Coroutines can return a value, so that they can be used by other coroutines.
Before Python 3.3, this requires a ReturnValue
to be raised.
@cocotb.coroutine
def get_signal(clk, signal):
yield RisingEdge(clk)
raise ReturnValue(signal.value)
@cocotb.coroutine
def get_signal_python_33(clk, signal):
# newer versions of Python can use return normally
yield RisingEdge(clk)
return signal.value
@cocotb.coroutine
def check_signal_changes(dut):
first = yield get_signal(dut.clk, dut.signal)
second = yield get_signal(dut.clk, dut.signal)
if first == second:
raise TestFailure("Signal did not change")
Coroutines may also yield a list of triggers and coroutines to indicate that execution should resume if any of them fires:
@cocotb.coroutine
def packet_with_timeout(monitor, timeout):
"""Wait for a packet but time out if nothing arrives"""
yield [Timer(timeout), RisingEdge(dut.ready)]
The trigger that caused execution to resume is passed back to the coroutine, allowing them to distinguish which trigger fired:
@cocotb.coroutine
def packet_with_timeout(monitor, timeout):
"""Wait for a packet but time out if nothing arrives"""
tout_trigger = Timer(timeout)
result = yield [tout_trigger, RisingEdge(dut.ready)]
if result is tout_trigger:
raise TestFailure("Timed out waiting for packet")
Coroutines can be forked for parallel operation within a function of that code and the forked code.
@cocotb.test()
def test_act_during_reset(dut):
"""While reset is active, toggle signals"""
tb = uart_tb(dut)
# "Clock" is a built in class for toggling a clock signal
cocotb.fork(Clock(dut.clk, 1000).start())
# reset_dut is a function -
# part of the user-generated "uart_tb" class
cocotb.fork(tb.reset_dut(dut.rstn, 20000))
yield Timer(10000)
print("Reset is still active: %d" % dut.rstn)
yield Timer(15000)
print("Reset has gone inactive: %d" % dut.rstn)
Coroutines can be joined to end parallel operation within a function.
@cocotb.test()
def test_count_edge_cycles(dut, period=1000, clocks=6):
cocotb.fork(Clock(dut.clk, period).start())
yield RisingEdge(dut.clk)
timer = Timer(period + 10)
task = cocotb.fork(count_edges_cycles(dut.clk, clocks))
count = 0
expect = clocks - 1
while True:
result = yield [timer, task.join()]
if count > expect:
raise TestFailure("Task didn't complete in expected time")
if result is timer:
dut._log.info("Count %d: Task still running" % count)
count += 1
else:
break
Coroutines can be killed before they complete, forcing their completion before they’d naturally end.
@cocotb.test()
def test_different_clocks(dut):
clk_1mhz = Clock(dut.clk, 1.0, units='us')
clk_250mhz = Clock(dut.clk, 4.0, units='ns')
clk_gen = cocotb.fork(clk_1mhz.start())
start_time_ns = get_sim_time(units='ns')
yield Timer(1)
yield RisingEdge(dut.clk)
edge_time_ns = get_sim_time(units='ns')
# NOTE: isclose is a python 3.5+ feature
if not isclose(edge_time_ns, start_time_ns + 1000.0):
raise TestFailure("Expected a period of 1 us")
clk_gen.kill()
clk_gen = cocotb.fork(clk_250mhz.start())
start_time_ns = get_sim_time(units='ns')
yield Timer(1)
yield RisingEdge(dut.clk)
edge_time_ns = get_sim_time(units='ns')
# NOTE: isclose is a python 3.5+ feature
if not isclose(edge_time_ns, start_time_ns + 4.0):
raise TestFailure("Expected a period of 4 ns")