Source code for cocotb.wavedrom

# Copyright cocotb contributors
# Copyright (c) 2014 Potential Ventures Ltd
# Licensed under the Revised BSD License, see LICENSE for details.
# SPDX-License-Identifier: BSD-3-Clause

import json
from collections import OrderedDict, defaultdict
from collections.abc import Mapping

import cocotb
from cocotb.handle import SimHandleBase
from cocotb.triggers import ReadOnly, RisingEdge

try:
    from cocotb_bus.bus import Bus
except ImportError:
    Bus = None


[docs]class Wavedrom: """Base class for a WaveDrom compatible tracer.""" def __init__(self, obj, name=""): self._hdls = OrderedDict() self._name = name if isinstance(obj, Mapping): self._hdls.update(obj) elif isinstance(obj, SimHandleBase): name = obj._name.split(".")[-1] self._hdls[name] = obj self._name = name elif Bus is not None and isinstance(obj, Bus): self._hdls.update(obj._signals) self._name = obj._name else: raise TypeError( "Cannot use {} with {} objects".format( type(self).__qualname__, type(obj).__name__ ) ) from None self.clear()
[docs] def sample(self): """Record a sample of the signal value at this point in time.""" def _lastval(samples): for x in range(len(samples) - 1, -1, -1): if samples[x] not in "=.|": return samples[x] return None for name, hdl in self._hdls.items(): val = hdl.value valstr = val.binstr.lower() # Decide what character to use to represent this signal if len(valstr) == 1: char = valstr elif "x" in valstr: char = "x" elif "u" in valstr: char = "u" elif "z" in valstr: char = "z" else: if ( len(self._data[name]) and self._data[name][-1] == int(val) and self._samples[name][-1] in "=." ): char = "." else: char = "=" self._data[name].append(int(val)) # Detect if this is unchanged if len(valstr) == 1 and char == _lastval(self._samples[name]): char = "." self._samples[name].append(char)
[docs] def clear(self): """Delete all sampled data.""" self._samples = defaultdict(list) self._data = defaultdict(list)
def gap(self): for name, hdl in self._hdls.items(): self._samples[name].append("|")
[docs] def get(self, add_clock=True): """Return the samples as a list suitable for use with WaveDrom.""" siglist = [] traces = [] for name in self._hdls.keys(): samples = self._samples[name] traces.append({"name": name, "wave": "".join(samples)}) if name in self._data: traces[-1]["data"] = " ".join([repr(s) for s in self._data[name]]) if len(traces) > 1: traces.insert(0, self._name) siglist.append(traces) else: siglist.append(traces[0]) if add_clock: tracelen = len(traces[-1]["wave"]) siglist.insert(0, {"name": "clk", "wave": "p" + "." * (tracelen - 1)}) return siglist
[docs]class trace: """Context manager to enable tracing of signals. Arguments are an arbitrary number of signals or buses to trace. We also require a clock to sample on, passed in as a keyword argument. Usage:: with trace(sig1, sig2, a_bus, clk=clk) as waves: # Stuff happens, we trace it # Dump to JSON format compatible with WaveDrom j = waves.dumpj() """ def __init__(self, *args, clk=None): self._clock = clk self._signals = [] for arg in args: self._signals.append(Wavedrom(arg)) self._coro = None self._clocks = 0 self._enabled = False if self._clock is None: raise ValueError("Trace requires a clock to sample") async def _monitor(self): self._clocks = 0 while True: await RisingEdge(self._clock) await ReadOnly() if not self._enabled: continue self._clocks += 1 for sig in self._signals: sig.sample() def insert_gap(self): self._clocks += 1 for sig in self._signals: sig.gap() def disable(self): self._enabled = False def enable(self): self._enabled = True def __enter__(self): for sig in self._signals: sig.clear() self.enable() self._coro = cocotb.start_soon(self._monitor()) return self def __exit__(self, exc_type, exc_val, exc_tb): self._coro.kill() for sig in self._signals: sig.clear() self.disable() return None def write(self, filename, **kwargs): with open(filename, "w") as f: f.write(self.dumpj(**kwargs)) def dumpj(self, header="", footer="", config=""): trace = {"signal": []} trace["signal"].append( {"name": "clock", "wave": "p" + "." * (self._clocks - 1)} ) for sig in self._signals: trace["signal"].extend(sig.get(add_clock=False)) if header: if isinstance(header, dict): trace["head"] = header else: trace["head"] = {"text": header} if footer: if isinstance(footer, dict): trace["foot"] = footer else: trace["foot"] = {"text": footer} if config: trace["config"] = config return json.dumps(trace, indent=4, sort_keys=False)