# Copyright (c) 2013 Potential Ventures Ltd
# Copyright (c) 2013 SolarFlare Communications Inc
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of Potential Ventures Ltd,
# SolarFlare Communications Inc nor the
# names of its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL POTENTIAL VENTURES LTD BE LIABLE FOR ANY
# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Monitors for Intel Avalon interfaces.
See https://www.intel.com/content/dam/www/programmable/us/en/pdfs/literature/manual/mnl_avalon_spec_1_3.pdf
NB Currently we only support a very small subset of functionality.
"""
import warnings
from cocotb.utils import hexdump
from cocotb.decorators import coroutine
from cocotb.monitors import BusMonitor
from cocotb.triggers import RisingEdge, ReadOnly
from cocotb.binary import BinaryValue
class AvalonProtocolError(Exception):
    """Raised when observed bus traffic violates the Avalon protocol.

    Examples in this module include a duplicate start-of-packet, a data
    transfer outside of a packet, or a channel value changing mid-packet.
    """
class AvalonST(BusMonitor):
    """Avalon-ST bus.

    Non-packetized so each valid word is a separate transaction.

    Args:
        entity, name, clock: passed through to :class:`BusMonitor`.
        config (dict, optional): overrides for the entries of
            ``_default_config``. ``None`` (the default) keeps the defaults.
        **kwargs: passed through to :class:`BusMonitor`.
    """

    _signals = ["valid", "data"]
    _optional_signals = ["ready"]

    # ``firstSymbolInHighOrderBits`` is copied onto ``big_endian`` of every
    # sampled data value before its bytes are extracted.
    _default_config = {"firstSymbolInHighOrderBits": True}

    def __init__(self, entity, name, clock, *, config=None, **kwargs):
        BusMonitor.__init__(self, entity, name, clock, **kwargs)

        # Start from a private copy so the class-level defaults are never
        # modified by per-instance overrides.
        self.config = self._default_config.copy()

        # ``None`` sentinel instead of a shared mutable ``{}`` default.
        if config is None:
            config = {}

        for configoption, value in config.items():
            self.config[configoption] = value
            self.log.debug("Setting config option %s to %s", configoption, str(value))

    @coroutine
    def _monitor_recv(self):
        """Watch the pins and reconstruct transactions.

        On every rising clock edge, once the read-only phase is reached, a
        valid word on the bus is handed to ``self._recv`` as a byte buffer.
        """

        # Avoid spurious object creation by recycling
        clkedge = RisingEdge(self.clock)
        rdonly = ReadOnly()

        def valid():
            # A word counts as transferred when ``valid`` is asserted and,
            # if the optional ``ready`` signal exists, ``ready`` is too.
            if hasattr(self.bus, "ready"):
                return self.bus.valid.value and self.bus.ready.value
            return self.bus.valid.value

        # NB could yield on valid here more efficiently?
        while True:
            yield clkedge
            yield rdonly
            if valid():
                vec = self.bus.data.value
                vec.big_endian = self.config["firstSymbolInHighOrderBits"]
                self._recv(vec.buff)
class AvalonSTPkts(BusMonitor):
    """Packetized Avalon-ST bus.

    Args:
        entity, name, clock: see :class:`BusMonitor`
        config (dict, optional): bus configuration options overriding
            ``_default_config``. ``None`` (the default) keeps the defaults.
        report_channel (bool): report channel with data, default is False.
            Setting to True on bus without channel signal will give an error.

    Raises:
        ValueError: if ``report_channel`` is set on a bus without a
            ``channel`` signal.
        AttributeError: if the bus carries more than one data symbol but has
            no ``empty`` signal, if the channel signal is wider than 128 bits,
            or if the configured ``maxChannel`` does not fit the channel
            signal.
    """

    _signals = ["valid", "data", "startofpacket", "endofpacket"]
    _optional_signals = ["error", "channel", "ready", "empty"]

    _default_config = {
        # Width of one symbol; used to count symbols and to turn ``empty``
        # (in symbols) into a number of bits.
        "dataBitsPerSymbol": 8,
        # Selects which end of the last word is trimmed by ``empty`` and is
        # copied onto ``big_endian`` of the sampled value.
        "firstSymbolInHighOrderBits": True,
        # Largest legal channel value; defaults to the maximum the channel
        # signal can encode when that signal exists.
        "maxChannel": 0,
        # Not referenced by the code in this class.
        "readyLatency": 0,
        # Number of consecutive non-valid cycles tolerated inside a packet;
        # 0 disables the check.
        "invalidTimeout": 0,
    }

    def __init__(self, entity, name, clock, *, config=None, report_channel=False, **kwargs):
        BusMonitor.__init__(self, entity, name, clock, **kwargs)

        # Private copy so class-level defaults are never modified.
        self.config = self._default_config.copy()
        self.report_channel = report_channel

        # ``None`` sentinel instead of a shared mutable ``{}`` default.
        if config is None:
            config = {}

        # Set default config maxChannel to max value on channel bus
        if hasattr(self.bus, 'channel'):
            self.config['maxChannel'] = (2 ** len(self.bus.channel)) - 1
        elif report_channel:
            raise ValueError("Channel reporting asked on bus without channel signal")

        for configoption, value in config.items():
            self.config[configoption] = value
            self.log.debug("Setting config option %s to %s",
                           configoption, str(value))

        # True division: a data width that is not a whole multiple of the
        # symbol width yields a fractional count, which still compares > 1.
        num_data_symbols = (len(self.bus.data) /
                            self.config["dataBitsPerSymbol"])
        if (num_data_symbols > 1 and not hasattr(self.bus, 'empty')):
            raise AttributeError(
                "%s has %i data symbols, but contains no object named empty" %
                (self.name, num_data_symbols))

        self.config["useEmpty"] = (num_data_symbols > 1)

        if hasattr(self.bus, 'channel'):
            if len(self.bus.channel) > 128:
                # ``self.name`` is formatted with %s: using %d here would
                # raise TypeError instead of the intended AttributeError.
                raise AttributeError("AvalonST interface specification defines channel width as 1-128. "
                                     "%s channel width is %d" %
                                     (self.name, len(self.bus.channel)))
            maxChannel = (2 ** len(self.bus.channel)) - 1
            if self.config['maxChannel'] > maxChannel:
                raise AttributeError("%s has maxChannel=%d, but can only support a maximum channel of "
                                     "(2**channel_width)-1=%d, channel_width=%d" %
                                     (self.name, self.config['maxChannel'], maxChannel, len(self.bus.channel)))

    @coroutine
    def _monitor_recv(self):
        """Watch the pins and reconstruct transactions.

        Words are accumulated from start-of-packet to end-of-packet and the
        finished packet is passed to ``self._recv``: as bytes, or as a
        ``{"data": ..., "channel": ...}`` dict when ``report_channel`` is set.

        Raises:
            AvalonProtocolError: on a duplicate start-of-packet, a transfer
                outside of a packet, unresolvable data in the last word after
                empty masking, an out-of-range or changing channel value, or
                an in-packet timeout.
        """

        # Avoid spurious object creation by recycling
        clkedge = RisingEdge(self.clock)
        rdonly = ReadOnly()
        pkt = b""
        in_pkt = False
        invalid_cyclecount = 0
        channel = None

        def valid():
            # A word counts as transferred when ``valid`` is asserted and,
            # if the optional ``ready`` signal exists, ``ready`` is too.
            if hasattr(self.bus, 'ready'):
                return self.bus.valid.value and self.bus.ready.value
            return self.bus.valid.value

        while True:
            yield clkedge
            yield rdonly

            if self.in_reset:
                continue

            if valid():
                invalid_cyclecount = 0

                if self.bus.startofpacket.value:
                    # Leftover bytes mean the previous packet never ended.
                    if pkt:
                        raise AvalonProtocolError("Duplicate start-of-packet received on %s" %
                                                  str(self.bus.startofpacket))
                    pkt = b""
                    in_pkt = True

                if not in_pkt:
                    raise AvalonProtocolError("Data transfer outside of "
                                              "packet")

                # Handle empty and X's in empty / data
                vec = BinaryValue()
                if not self.bus.endofpacket.value:
                    vec = self.bus.data.value
                else:
                    value = self.bus.data.value.get_binstr()
                    # Number of bits masked off the last word. Initialised
                    # here so the error message below can always format it,
                    # even when no empty masking was applied.
                    empty = 0
                    if self.config["useEmpty"] and self.bus.empty.value.integer:
                        empty = self.bus.empty.value.integer * self.config["dataBitsPerSymbol"]
                        if self.config["firstSymbolInHighOrderBits"]:
                            # Unused symbols sit in the low-order bits.
                            value = value[:-empty]
                        else:
                            # Unused symbols sit in the high-order bits.
                            value = value[empty:]
                    vec.assign(value)
                    if not vec.is_resolvable:
                        raise AvalonProtocolError("After empty masking value is still bad?  "
                                                  "Had empty {:d}, got value {:s}".format(empty,
                                                                                          self.bus.data.value.get_binstr()))

                vec.big_endian = self.config['firstSymbolInHighOrderBits']
                pkt += vec.buff

                if hasattr(self.bus, 'channel'):
                    if channel is None:
                        # First word of the packet fixes the channel.
                        channel = self.bus.channel.value.integer
                        if channel > self.config["maxChannel"]:
                            raise AvalonProtocolError("Channel value (%d) is greater than maxChannel (%d)" %
                                                      (channel, self.config["maxChannel"]))
                    elif self.bus.channel.value.integer != channel:
                        raise AvalonProtocolError("Channel value changed during packet")

                if self.bus.endofpacket.value:
                    self.log.info("Received a packet of %d bytes", len(pkt))
                    self.log.debug(hexdump(pkt))
                    self.channel = channel
                    if self.report_channel:
                        self._recv({"data": pkt, "channel": channel})
                    else:
                        self._recv(pkt)
                    # Reset per-packet state for the next packet.
                    pkt = b""
                    in_pkt = False
                    channel = None
            else:
                if in_pkt:
                    invalid_cyclecount += 1
                    if self.config["invalidTimeout"]:
                        if invalid_cyclecount >= self.config["invalidTimeout"]:
                            raise AvalonProtocolError(
                                "In-Packet Timeout. Didn't receive any valid data for %d cycles!" %
                                invalid_cyclecount)
class AvalonSTPktsWithChannel(AvalonSTPkts):
    """Packetized AvalonST bus using channel.

    This class is deprecated. Use AvalonSTPkts(..., report_channel=True, ...)
    """

    def __init__(self, entity, name, clock, **kwargs):
        # Build the notice first; the warning itself must still be issued
        # from this frame so that stacklevel=2 points at the caller.
        deprecation_notice = (
            "Use of AvalonSTPktsWithChannel is deprecated\n"
            "\tUse AvalonSTPkts(..., report_channel=True, ...)"
        )
        warnings.warn(deprecation_notice, category=DeprecationWarning, stacklevel=2)

        # Delegate to the parent with channel reporting forced on.
        AvalonSTPkts.__init__(
            self,
            entity,
            name,
            clock,
            report_channel=True,
            **kwargs
        )