# Source code for cocotb_bus.monitors.avalon

# Copyright cocotb contributors
# Copyright (c) 2013 Potential Ventures Ltd
# Copyright (c) 2013 SolarFlare Communications Inc
# Licensed under the Revised BSD License, see LICENSE for details.
# SPDX-License-Identifier: BSD-3-Clause

"""Monitors for Intel Avalon interfaces.

See https://www.intel.com/content/dam/www/programmable/us/en/pdfs/literature/manual/mnl_avalon_spec_1_3.pdf

NB Currently we only support a very small subset of functionality.
"""

import warnings

from cocotb.utils import hexdump
from cocotb.triggers import RisingEdge, ReadOnly
from cocotb.binary import BinaryValue

from cocotb_bus.monitors import BusMonitor


class AvalonProtocolError(Exception):
    """Raised when traffic observed on an Avalon bus violates the protocol."""


class AvalonST(BusMonitor):
    """Avalon-ST bus.

    Non-packetized so each valid word is a separate transaction.

    Args:
        entity, name, clock: passed through to :class:`BusMonitor`.
        config (dict, optional): bus configuration options; each entry
            overrides the matching entry of ``_default_config``.
        **kwargs: passed through to :class:`BusMonitor`.
    """

    _signals = ["valid", "data"]
    _optional_signals = ["ready"]

    _default_config = {"firstSymbolInHighOrderBits": True}

    def __init__(self, entity, name, clock, *, config=None, **kwargs):
        BusMonitor.__init__(self, entity, name, clock, **kwargs)

        self.config = self._default_config.copy()

        # ``config`` defaults to None instead of a shared mutable ``{}``.
        overrides = {} if config is None else config
        for configoption, value in overrides.items():
            self.config[configoption] = value
            self.log.debug("Setting config option %s to %s", configoption, str(value))

    async def _monitor_recv(self):
        """Watch the pins and reconstruct transactions.

        On every rising clock edge, once the read-only phase is reached, a
        word is sampled when ``valid`` (and ``ready``, if the bus has it) is
        asserted; the ``buff`` of the data value is handed to ``_recv``.
        """

        # Avoid spurious object creation by recycling
        clkedge = RisingEdge(self.clock)
        rdonly = ReadOnly()

        # Whether the optional ready signal exists does not change per cycle.
        has_ready = hasattr(self.bus, "ready")

        def valid():
            if has_ready:
                return self.bus.valid.value and self.bus.ready.value
            return self.bus.valid.value

        # NB could await on valid here more efficiently?
        while True:
            await clkedge
            await rdonly
            if valid():
                vec = self.bus.data.value
                vec.big_endian = self.config["firstSymbolInHighOrderBits"]
                self._recv(vec.buff)
class AvalonSTPkts(BusMonitor):
    """Packetized Avalon-ST bus.

    Args:
        entity, name, clock: see :class:`BusMonitor`
        config (dict, optional): bus configuration options; each entry
            overrides the matching entry of ``_default_config``.
        report_channel (bool): report channel with data, default is False.
            Setting to True on bus without channel signal will give an error.

    Raises:
        ValueError: if *report_channel* is set but the bus has no
            ``channel`` signal.
        AttributeError: if the data bus holds more than one symbol but the
            bus has no ``empty`` signal, if the ``channel`` signal is wider
            than 128 bits, or if the configured ``maxChannel`` exceeds what
            the ``channel`` signal can encode.
    """

    _signals = ["valid", "data", "startofpacket", "endofpacket"]
    _optional_signals = ["error", "channel", "ready", "empty"]

    _default_config = {
        "dataBitsPerSymbol"             : 8,
        "firstSymbolInHighOrderBits"    : True,
        "maxChannel"                    : 0,
        "readyLatency"                  : 0,
        "invalidTimeout"                : 0,
    }

    def __init__(self, entity, name, clock, *, config=None, report_channel=False, **kwargs):
        BusMonitor.__init__(self, entity, name, clock, **kwargs)

        self.config = self._default_config.copy()
        self.report_channel = report_channel

        has_channel = hasattr(self.bus, 'channel')

        # Set default config maxChannel to max value on channel bus
        if has_channel:
            self.config['maxChannel'] = (2 ** len(self.bus.channel)) - 1
        elif report_channel:
            raise ValueError("Channel reporting asked on bus without channel signal")

        # ``config`` defaults to None instead of a shared mutable ``{}``.
        overrides = {} if config is None else config
        for configoption, value in overrides.items():
            self.config[configoption] = value
            self.log.debug("Setting config option %s to %s", configoption, str(value))

        num_data_symbols = (len(self.bus.data) /
                            self.config["dataBitsPerSymbol"])
        if num_data_symbols > 1 and not hasattr(self.bus, 'empty'):
            raise AttributeError(
                "%s has %i data symbols, but contains no object named empty" %
                (self.name, num_data_symbols))

        # The empty signal is only consulted when a word holds several symbols.
        self.config["useEmpty"] = (num_data_symbols > 1)

        if has_channel:
            channel_width = len(self.bus.channel)
            if channel_width > 128:
                # self.name is formatted with %s, as in the other messages;
                # %d made this raise TypeError instead of AttributeError.
                raise AttributeError("AvalonST interface specification defines channel width as 1-128. "
                                     "%s channel width is %d" %
                                     (self.name, channel_width))
            maxChannel = (2 ** channel_width) - 1
            if self.config['maxChannel'] > maxChannel:
                raise AttributeError("%s has maxChannel=%d, but can only support a maximum channel of "
                                     "(2**channel_width)-1=%d, channel_width=%d" %
                                     (self.name, self.config['maxChannel'], maxChannel, channel_width))

    async def _monitor_recv(self):
        """Watch the pins and reconstruct transactions.

        Accumulates the data of every valid cycle between ``startofpacket``
        and ``endofpacket`` and passes the assembled packet to ``_recv``
        (as ``{"data": ..., "channel": ...}`` when ``report_channel`` is set).
        Cycles seen while ``in_reset`` is true are ignored.

        Raises:
            AvalonProtocolError: on a start-of-packet while packet data is
                already buffered, a transfer outside of a packet, unresolvable
                data on the final word, a channel value above ``maxChannel``
                or changing within a packet, or when ``invalidTimeout`` is
                non-zero and that many consecutive non-valid cycles occur
                inside a packet.
        """

        # Avoid spurious object creation by recycling
        clkedge = RisingEdge(self.clock)
        rdonly = ReadOnly()
        pkt = b""
        in_pkt = False
        invalid_cyclecount = 0
        channel = None

        # Presence of optional signals does not change from cycle to cycle.
        has_ready = hasattr(self.bus, 'ready')
        has_channel = hasattr(self.bus, 'channel')

        def valid():
            if has_ready:
                return self.bus.valid.value and self.bus.ready.value
            return self.bus.valid.value

        while True:
            await clkedge
            await rdonly

            if self.in_reset:
                continue

            if valid():
                invalid_cyclecount = 0

                if self.bus.startofpacket.value:
                    if pkt:
                        raise AvalonProtocolError("Duplicate start-of-packet received on %s" %
                                                  str(self.bus.startofpacket))
                    pkt = b""
                    in_pkt = True

                if not in_pkt:
                    raise AvalonProtocolError("Data transfer outside of "
                                              "packet")

                # Handle empty and X's in empty / data
                vec = BinaryValue()
                if not self.bus.endofpacket.value:
                    vec = self.bus.data.value
                else:
                    value = self.bus.data.value.get_binstr()
                    # Number of bits stripped from the final word; it stays 0
                    # when no masking applies so the error message below can
                    # always be formatted.
                    empty = 0
                    if self.config["useEmpty"] and self.bus.empty.value.integer:
                        empty = self.bus.empty.value.integer * self.config["dataBitsPerSymbol"]
                        if self.config["firstSymbolInHighOrderBits"]:
                            value = value[:-empty]
                        else:
                            value = value[empty:]
                    vec.assign(value)
                    if not vec.is_resolvable:
                        raise AvalonProtocolError("After empty masking value is still bad? "
                                                  "Had empty {:d}, got value {:s}".format(
                                                      empty, self.bus.data.value.get_binstr()))

                vec.big_endian = self.config['firstSymbolInHighOrderBits']
                pkt += vec.buff

                if has_channel:
                    if channel is None:
                        # First word of the packet fixes the channel.
                        channel = self.bus.channel.value.integer
                        if channel > self.config["maxChannel"]:
                            raise AvalonProtocolError("Channel value (%d) is greater than maxChannel (%d)" %
                                                      (channel, self.config["maxChannel"]))
                    elif self.bus.channel.value.integer != channel:
                        raise AvalonProtocolError("Channel value changed during packet")

                if self.bus.endofpacket.value:
                    self.log.info("Received a packet of %d bytes", len(pkt))
                    self.log.debug(hexdump(pkt))
                    # Keep the channel of the most recent packet on the monitor.
                    self.channel = channel
                    if self.report_channel:
                        self._recv({"data": pkt, "channel": channel})
                    else:
                        self._recv(pkt)
                    pkt = b""
                    in_pkt = False
                    channel = None
            elif in_pkt:
                invalid_cyclecount += 1
                timeout = self.config["invalidTimeout"]
                # A timeout of 0 disables the check.
                if timeout and invalid_cyclecount >= timeout:
                    raise AvalonProtocolError(
                        "In-Packet Timeout. Didn't receive any valid data for %d cycles!" %
                        invalid_cyclecount)
class AvalonSTPktsWithChannel(AvalonSTPkts):
    """Packetized AvalonST bus using channel.

    Deprecated: use ``AvalonSTPkts(..., report_channel=True, ...)`` instead.
    Instantiating this class emits a :class:`DeprecationWarning` and then
    initialises :class:`AvalonSTPkts` with ``report_channel=True``.
    """

    def __init__(self, entity, name, clock, **kwargs):
        deprecation_notice = (
            "Use of AvalonSTPktsWithChannel is deprecated\n"
            "\tUse AvalonSTPkts(..., report_channel=True, ...)"
        )
        # stacklevel=2 attributes the warning to the code creating the monitor.
        warnings.warn(deprecation_notice, DeprecationWarning, stacklevel=2)

        AvalonSTPkts.__init__(
            self,
            entity,
            name,
            clock,
            report_channel=True,
            **kwargs
        )