# Created on Aug 24, 2014
#
# @author: msnook
import cocotb
from cocotb.triggers import Timer, RisingEdge, ReadOnly, Lock, Event
from cocotb.bus import Bus
from cocotb.result import ReturnValue
from cocotb.drivers import BusDriver
from cocotb.binary import BinaryValue, BinaryRepresentation
from collections import deque
[docs]class AD9361(BusDriver):
"""Driver for the AD9361 RF Transceiver."""
def __init__(self, dut, rx_channels=1, tx_channels=1,
tx_clock_half_period=16276, rx_clock_half_period=16276,
loopback_queue_maxlen=16):
self.dut = dut
self.tx_clock_half_period = tx_clock_half_period
self.rx_clock_half_period = rx_clock_half_period
self.rx_frame_asserted = False
self.tx_frame_asserted = False
self.lbqi = deque()
self.lbqq = deque()
cocotb.fork(self._rx_clock())
self.got_tx = Event("Got tx event")
@cocotb.coroutine
def _rx_clock(self):
t = Timer(self.rx_clock_half_period)
while True:
self.dut.rx_clk_in_p <= 1
self.dut.rx_clk_in_n <= 0
yield t
self.dut.rx_clk_in_p <= 0
self.dut.rx_clk_in_n <= 1
yield t
[docs] def send_data(self, i_data, q_data, i_data2=None, q_data2=None,
binaryRepresentation=BinaryRepresentation.TWOS_COMPLEMENT):
"""Forks the ``rx_data_to_ad9361`` coroutine to send data.
Args:
i_data (int): Data of the I0 channel.
q_data (int): Data of the Q0 channel.
i_data2 (int, optional): Data of the I1 channel.
q_data2 (int, optional): Data of the Q1 channel.
binaryRepresentation (BinaryRepresentation): The representation of the binary value.
Default is :any:`TWOS_COMPLEMENT`.
"""
print(binaryRepresentation)
cocotb.fork(self.rx_data_to_ad9361(i_data, q_data, i_data2, q_data2,
binaryRepresentation))
[docs] @cocotb.coroutine
def rx_data_to_ad9361(self, i_data, q_data, i_data2=None, q_data2=None,
binaryRepresentation=BinaryRepresentation.TWOS_COMPLEMENT):
"""Receive data to AD9361.
This is a coroutine.
Args:
i_data (int): Data of the I0 channel.
q_data (int): Data of the Q0 channel.
i_data2 (int, optional): Data of the I1 channel.
q_data2 (int, optional): Data of the Q1 channel.
binaryRepresentation (BinaryRepresentation): The representation of the binary value.
Default is :any:`TWOS_COMPLEMENT`.
"""
i_bin_val = BinaryValue(n_bits=12, bigEndian=False,
binaryRepresentation=binaryRepresentation)
q_bin_val = BinaryValue(n_bits=12, bigEndian=False,
binaryRepresentation=binaryRepresentation)
index = 0
if i_data2 is None and q_data2 is None:
while True:
yield RisingEdge(self.dut.rx_clk_in_p)
if self.rx_frame_asserted:
self.dut.rx_data_in_p <= i_bin_val[5:0]
self.dut.rx_data_in_n <= ~i_bin_val[5:0]
self.rx_frame_asserted = False
self.dut.rx_frame_in_p <= 0
self.dut.rx_frame_in_n <= 1
else:
if index < len(i_data):
i_bin_val.set_value(i_data[index])
q_bin_val.set_value(q_data[index])
index += 1
else:
return
self.dut.rx_data_in_p <= i_bin_val[11:6]
self.dut.rx_data_in_n <= ~i_bin_val[11:6]
self.rx_frame_asserted = True
self.dut.rx_frame_in_p <= 1
self.dut.rx_frame_in_n <= 0
yield RisingEdge(self.dut.rx_clk_in_n)
if self.rx_frame_asserted:
self.dut.rx_data_in_p <= q_bin_val[11:6]
self.dut.rx_data_in_n <= ~q_bin_val[11:6]
else:
self.dut.rx_data_in_p <= q_bin_val[5:0]
self.dut.rx_data_in_n <= ~q_bin_val[5:0]
else:
I_SEND_HIGH = True
Q_SEND_HIGH = True
channel = 1
while True:
yield RisingEdge(self.dut.rx_clk_in_p)
if I_SEND_HIGH:
self.dut.rx_data_in_p <= i_bin_val[11:6]
self.dut.rx_data_in_n <= ~i_bin_val[11:6]
I_SEND_HIGH = False
if channel == 1:
self.dut.rx_frame_in_p <= 1
self.dut.rx_frame_in_n <= 0
elif channel == 2:
self.dut.rx_frame_in_p <= 0
self.dut.rx_frame_in_n <= 1
else:
self.dut.rx_data_in_p <= i_bin_val[5:0]
self.dut.rx_data_in_n <= ~i_bin_val[5:0]
I_SEND_HIGH = True
yield RisingEdge(self.dut.rx_clk_in_n)
if Q_SEND_HIGH:
self.dut.rx_data_in_p <= q_bin_val[5:0]
self.dut.rx_data_in_n <= ~q_bin_val[5:0]
Q_SEND_HIGH = False
else:
self.dut.rx_data_in_p <= q_bin_val[11:6]
self.dut.rx_data_in_n <= ~q_bin_val[11:6]
Q_SEND_HIGH = True
if index < len(i_data):
if channel == 1:
i_bin_val.set_value(i_data[index])
q_bin_val.set_value(q_data[index])
channel = 2
elif channel == 2:
i_bin_val.set_value(i_data2[index])
q_bin_val.set_value(q_data2[index])
channel = 1
index += 1
else:
return
@cocotb.coroutine
def _tx_data_from_ad9361(self):
i_bin_val = BinaryValue(n_bits=12, bigEndian=False)
q_bin_val = BinaryValue(n_bits=12, bigEndian=False)
while True:
yield RisingEdge(self.dut.tx_clk_out_p)
if self.dut.tx_frame_out_p.value.integer == 1:
q_bin_val[11:6] = self.dut.tx_data_out_p.value.get_binstr()
else:
q_bin_val[5:0] = self.dut.tx_data_out_p.value.get_binstr()
yield RisingEdge(self.dut.tx_clk_out_n)
if self.dut.tx_frame_out_p.value.integer == 1:
i_bin_val[11:6] = self.dut.tx_data_out_p.value.get_binstr()
else:
i_bin_val[5:0] = self.dut.tx_data_out_p.value.get_binstr()
# print("i_data",i_bin_val.get_value())
# print("q_data",q_bin_val.get_value())
self.lbqi.append(i_bin_val)
self.lbqq.append(q_bin_val)
self.got_tx.set([i_bin_val, q_bin_val])
@cocotb.coroutine
def _ad9361_tx_to_rx_loopback(self):
cocotb.fork(self._tx_data_from_ad9361())
i_bin_val = BinaryValue(n_bits=12, bigEndian=False)
q_bin_val = BinaryValue(n_bits=12, bigEndian=False)
while True:
yield RisingEdge(self.dut.rx_clk_in_p)
if self.rx_frame_asserted:
self.dut.rx_data_in_p <= i_bin_val[5:0]
self.dut.rx_data_in_n <= ~i_bin_val[5:0]
self.rx_frame_asserted = False
self.dut.rx_frame_in_p <= 0
self.dut.rx_frame_in_n <= 1
else:
if len(self.lbqi) > 0:
i_bin_val = self.lbqi.popleft()
else:
i_bin_val.set_value(0)
if len(self.lbqq) > 0:
q_bin_val = self.lbqq.popleft()
else:
q_bin_val.set_value(0)
self.dut.rx_data_in_p <= i_bin_val[11:6]
self.dut.rx_data_in_n <= ~i_bin_val[11:6]
self.rx_frame_asserted = True
self.dut.rx_frame_in_p <= 1
self.dut.rx_frame_in_n <= 0
yield RisingEdge(self.dut.rx_clk_in_n)
if self.rx_frame_asserted:
self.dut.rx_data_in_p <= q_bin_val[11:6]
self.dut.rx_data_in_n <= ~q_bin_val[11:6]
else:
self.dut.rx_data_in_p <= q_bin_val[5:0]
self.dut.rx_data_in_n <= ~q_bin_val[5:0]
[docs] def ad9361_tx_to_rx_loopback(self):
"""Create loopback from tx to rx.
Forks a coroutine doing the actual task.
"""
cocotb.fork(self._ad9361_tx_to_rx_loopback())
[docs] def tx_data_from_ad9361(self):
"""Transmit data from AD9361.
Forks a coroutine doing the actual task.
"""
cocotb.fork(self._tx_data_from_ad9361())