# Created on Aug 24, 2014
#
# @author: msnook
import cocotb
from cocotb.triggers import Timer, RisingEdge, Event
from cocotb.drivers import BusDriver
from cocotb.binary import BinaryValue, BinaryRepresentation
from collections import deque
[docs]class AD9361(BusDriver):
"""Driver for the AD9361 RF Transceiver."""
def __init__(self, dut, rx_channels=1, tx_channels=1,
tx_clock_half_period=16276, rx_clock_half_period=16276,
loopback_queue_maxlen=16):
self.dut = dut
self.tx_clock_half_period = tx_clock_half_period
self.rx_clock_half_period = rx_clock_half_period
self.rx_frame_asserted = False
self.tx_frame_asserted = False
self.lbqi = deque()
self.lbqq = deque()
cocotb.fork(self._rx_clock())
self.got_tx = Event("Got tx event")
@cocotb.coroutine
def _rx_clock(self):
t = Timer(self.rx_clock_half_period)
while True:
self.dut.rx_clk_in_p <= 1
self.dut.rx_clk_in_n <= 0
yield t
self.dut.rx_clk_in_p <= 0
self.dut.rx_clk_in_n <= 1
yield t
[docs] def send_data(self, i_data, q_data, i_data2=None, q_data2=None,
binaryRepresentation=BinaryRepresentation.TWOS_COMPLEMENT):
"""Forks the ``rx_data_to_ad9361`` coroutine to send data.
Args:
i_data (int): Data of the I0 channel.
q_data (int): Data of the Q0 channel.
i_data2 (int, optional): Data of the I1 channel.
q_data2 (int, optional): Data of the Q1 channel.
binaryRepresentation (BinaryRepresentation): The representation of the binary value.
Default is :any:`TWOS_COMPLEMENT`.
"""
print(binaryRepresentation)
cocotb.fork(self.rx_data_to_ad9361(i_data, q_data, i_data2, q_data2,
binaryRepresentation))
[docs] @cocotb.coroutine
def rx_data_to_ad9361(self, i_data, q_data, i_data2=None, q_data2=None,
binaryRepresentation=BinaryRepresentation.TWOS_COMPLEMENT):
"""Receive data to AD9361.
This is a coroutine.
Args:
i_data (int): Data of the I0 channel.
q_data (int): Data of the Q0 channel.
i_data2 (int, optional): Data of the I1 channel.
q_data2 (int, optional): Data of the Q1 channel.
binaryRepresentation (BinaryRepresentation): The representation of the binary value.
Default is :any:`TWOS_COMPLEMENT`.
"""
i_bin_val = BinaryValue(n_bits=12, bigEndian=False,
binaryRepresentation=binaryRepresentation)
q_bin_val = BinaryValue(n_bits=12, bigEndian=False,
binaryRepresentation=binaryRepresentation)
index = 0
if i_data2 is None and q_data2 is None:
while True:
yield RisingEdge(self.dut.rx_clk_in_p)
if self.rx_frame_asserted:
self.dut.rx_data_in_p <= i_bin_val[5:0]
self.dut.rx_data_in_n <= ~i_bin_val[5:0]
self.rx_frame_asserted = False
self.dut.rx_frame_in_p <= 0
self.dut.rx_frame_in_n <= 1
else:
if index < len(i_data):
i_bin_val.set_value(i_data[index])
q_bin_val.set_value(q_data[index])
index += 1
else:
return
self.dut.rx_data_in_p <= i_bin_val[11:6]
self.dut.rx_data_in_n <= ~i_bin_val[11:6]
self.rx_frame_asserted = True
self.dut.rx_frame_in_p <= 1
self.dut.rx_frame_in_n <= 0
yield RisingEdge(self.dut.rx_clk_in_n)
if self.rx_frame_asserted:
self.dut.rx_data_in_p <= q_bin_val[11:6]
self.dut.rx_data_in_n <= ~q_bin_val[11:6]
else:
self.dut.rx_data_in_p <= q_bin_val[5:0]
self.dut.rx_data_in_n <= ~q_bin_val[5:0]
else:
I_SEND_HIGH = True
Q_SEND_HIGH = True
channel = 1
while True:
yield RisingEdge(self.dut.rx_clk_in_p)
if I_SEND_HIGH:
self.dut.rx_data_in_p <= i_bin_val[11:6]
self.dut.rx_data_in_n <= ~i_bin_val[11:6]
I_SEND_HIGH = False
if channel == 1:
self.dut.rx_frame_in_p <= 1
self.dut.rx_frame_in_n <= 0
elif channel == 2:
self.dut.rx_frame_in_p <= 0
self.dut.rx_frame_in_n <= 1
else:
self.dut.rx_data_in_p <= i_bin_val[5:0]
self.dut.rx_data_in_n <= ~i_bin_val[5:0]
I_SEND_HIGH = True
yield RisingEdge(self.dut.rx_clk_in_n)
if Q_SEND_HIGH:
self.dut.rx_data_in_p <= q_bin_val[5:0]
self.dut.rx_data_in_n <= ~q_bin_val[5:0]
Q_SEND_HIGH = False
else:
self.dut.rx_data_in_p <= q_bin_val[11:6]
self.dut.rx_data_in_n <= ~q_bin_val[11:6]
Q_SEND_HIGH = True
if index < len(i_data):
if channel == 1:
i_bin_val.set_value(i_data[index])
q_bin_val.set_value(q_data[index])
channel = 2
elif channel == 2:
i_bin_val.set_value(i_data2[index])
q_bin_val.set_value(q_data2[index])
channel = 1
index += 1
else:
return
@cocotb.coroutine
def _tx_data_from_ad9361(self):
i_bin_val = BinaryValue(n_bits=12, bigEndian=False)
q_bin_val = BinaryValue(n_bits=12, bigEndian=False)
while True:
yield RisingEdge(self.dut.tx_clk_out_p)
if self.dut.tx_frame_out_p.value.integer == 1:
q_bin_val[11:6] = self.dut.tx_data_out_p.value.get_binstr()
else:
q_bin_val[5:0] = self.dut.tx_data_out_p.value.get_binstr()
yield RisingEdge(self.dut.tx_clk_out_n)
if self.dut.tx_frame_out_p.value.integer == 1:
i_bin_val[11:6] = self.dut.tx_data_out_p.value.get_binstr()
else:
i_bin_val[5:0] = self.dut.tx_data_out_p.value.get_binstr()
# print("i_data",i_bin_val.get_value())
# print("q_data",q_bin_val.get_value())
self.lbqi.append(i_bin_val)
self.lbqq.append(q_bin_val)
self.got_tx.set([i_bin_val, q_bin_val])
@cocotb.coroutine
def _ad9361_tx_to_rx_loopback(self):
cocotb.fork(self._tx_data_from_ad9361())
i_bin_val = BinaryValue(n_bits=12, bigEndian=False)
q_bin_val = BinaryValue(n_bits=12, bigEndian=False)
while True:
yield RisingEdge(self.dut.rx_clk_in_p)
if self.rx_frame_asserted:
self.dut.rx_data_in_p <= i_bin_val[5:0]
self.dut.rx_data_in_n <= ~i_bin_val[5:0]
self.rx_frame_asserted = False
self.dut.rx_frame_in_p <= 0
self.dut.rx_frame_in_n <= 1
else:
if len(self.lbqi) > 0:
i_bin_val = self.lbqi.popleft()
else:
i_bin_val.set_value(0)
if len(self.lbqq) > 0:
q_bin_val = self.lbqq.popleft()
else:
q_bin_val.set_value(0)
self.dut.rx_data_in_p <= i_bin_val[11:6]
self.dut.rx_data_in_n <= ~i_bin_val[11:6]
self.rx_frame_asserted = True
self.dut.rx_frame_in_p <= 1
self.dut.rx_frame_in_n <= 0
yield RisingEdge(self.dut.rx_clk_in_n)
if self.rx_frame_asserted:
self.dut.rx_data_in_p <= q_bin_val[11:6]
self.dut.rx_data_in_n <= ~q_bin_val[11:6]
else:
self.dut.rx_data_in_p <= q_bin_val[5:0]
self.dut.rx_data_in_n <= ~q_bin_val[5:0]
[docs] def ad9361_tx_to_rx_loopback(self):
"""Create loopback from tx to rx.
Forks a coroutine doing the actual task.
"""
cocotb.fork(self._ad9361_tx_to_rx_loopback())
[docs] def tx_data_from_ad9361(self):
"""Transmit data from AD9361.
Forks a coroutine doing the actual task.
"""
cocotb.fork(self._tx_data_from_ad9361())