Examples

FIFO (examples/fifo)

This example tests a simple FIFO block.

DUT

The FIFO is a 16 x 8-bit memory with a single 8-bit data input interface (data_in) and a single 8-bit data output interface (data_out). There are read (rd) and write (wr) strobe signals. The FIFO reports its status using the following output bits:

  • fifo_full - full indicator,

  • fifo_empty - empty indicator,

  • fifo_threshold - threshold indicator (25% full),

  • fifo_overflow - fifo overflow indicator (attempt to write data when FIFO full),

  • fifo_underflow - fifo underflow indicator (attempt to read data when FIFO empty).

Testbench

The test environment randomly performs read/write operations and checks data consistency. The functional coverage checks whether a read/write operation has been executed in every possible FIFO state.

The FIFO status is represented in an instance of the class FifoStatus. This object contains the method update(), which reads the status of the DUT.

class FifoStatus():

    def __init__(self, dut):
        self.dut = dut

    @cocotb.coroutine
    def update(self):
        yield ReadOnly()
        self.empty = (self.dut.fifo_empty == 1)
        self.full = (self.dut.fifo_full == 1)
        self.threshold = (self.dut.fifo_threshold == 1)
        self.overflow = (self.dut.fifo_overflow == 1)
        self.underflow = (self.dut.fifo_underflow == 1)

The main data processing routine is defined in the function process_data(). This function returns the read or written data together with a success flag that tells whether the operation completed successfully (which depends on the FIFO status). The functional coverage is sampled each time this function is called.

FIFO_Coverage = coverage_section (
  CoverPoint("top.rw", vname="rw", bins = [True, False]),
  CoverPoint("top.fifo_empty", xf = lambda data, rw, status : status.empty, bins = [True, False]),
  CoverPoint("top.fifo_full", xf = lambda data, rw, status : status.full, bins = [True, False]),
  CoverPoint("top.fifo_threshold", xf = lambda data, rw, status : status.threshold, bins = [True, False]),
  CoverPoint("top.fifo_overflow", xf = lambda data, rw, status : status.overflow, bins = [True, False]),
  CoverPoint("top.fifo_underflow", xf = lambda data, rw, status : status.underflow, bins = [True, False]),
  CoverCross("top.rwXempty", items = ["top.rw", "top.fifo_empty"]),
  CoverCross("top.rwXfull", items = ["top.rw", "top.fifo_full"]),
  CoverCross("top.rwXthreshold", items = ["top.rw", "top.fifo_threshold"]),
  CoverCross("top.rwXoverflow", items = ["top.rw", "top.fifo_overflow"]),
  CoverCross("top.rwXunderflow", items = ["top.rw", "top.fifo_underflow"])
)

@FIFO_Coverage
@cocotb.coroutine
def process_data(data, rw, status):
    success = True
    if rw: #read
        yield RisingEdge(dut.clk)
        #even if fifo empty, try to access in order to reach underflow status
        if (status.empty):
            success = False
        else:
            data = int(dut.data_out)
        dut.rd <= 1
        yield RisingEdge(dut.clk)
        dut.rd <= 0
    elif not rw: #write
        yield RisingEdge(dut.clk)
        dut.data_in <= data
        dut.wr <= 1
        yield RisingEdge(dut.clk)
        dut.wr <= 0
        #if FIFO full, data was not written (overflow status)
        if status.full:
            success = False
    return data, success

A simple FIFO model is implemented as a double-ended queue. At each successful write to the FIFO, the data is also written to the FIFO model. At each successful read from the FIFO, the data is checked for consistency against the FIFO model (and the corresponding entry is removed from the queue).

fifo_model = deque() #simple scoreboarding - FIFO model as double-ended queue
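
Because new data is placed with appendleft() and read data is removed with pop(), the oldest entry always comes out first. A minimal standalone sketch of this ordering (illustration only, not part of the example sources):

from collections import deque

model = deque()
model.appendleft(0xA5)        # first (oldest) entry
model.appendleft(0x3C)        # second entry
assert model.pop() == 0xA5    # pop() removes from the opposite end - FIFO order
assert model.pop() == 0x3C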

The main loop performs random operations in the following order:

  • randomize the type of transaction and data,

  • update the FIFO status,

  • process the data to/from the FIFO,

  • depending on data processing status, check data consistency or update FIFO model content.

for _ in range(100): # is that enough repetitions to ensure the coverage goal? Check it out!
    rw = random.choice([True, False])
    data = random.randint(0,255) if not rw else None

    #call coroutines
    yield status.update() #check FIFO state
    # process data and check whether the operation succeeded
    data, success = yield process_data(data, rw, status)

    if rw: #read
        if success:
            #if successful read, check read data with the model
            assert(data == fifo_model.pop())
            log.info("Data read from fifo: %X", data)
        else:
            log.info("Data NOT read, fifo EMPTY!")
    else: #write
        if success:
            #if successful write, append written data to the model
            fifo_model.appendleft(data)
            log.info("Data written to fifo: %X", data)
        else:
            log.info("Data NOT written, fifo FULL!")

Packet Switch (examples/pkt_switch)

This example tests a simple packet switch. The switch routes an incoming packet to one or both of the two outgoing interfaces, depending on configuration.

DUT

The packet switch has a single data input interface (datain_data) and two data output interfaces (dataout0_data, dataout1_data). There is a data valid strobe associated with each interface (datain_valid, dataout0_valid, dataout1_valid). Depending on the configuration, a packet received on the input interface is passed to the first, the second or both output interfaces.

There is also a configuration interface which allows for accessing the configuration registers (write-only). A register write is performed when the write strobe (ctrl_wr) is high: the value of ctrl_data is written to the register at address ctrl_addr.

There are the following configuration registers:

Address   Function
000       settings:
            • bit 0 - enable address-based filtering
            • bit 1 - enable length-based filtering
            • bit 2 - transmit packet on both interfaces
            • bits 3-7 - UNUSED
010       address for address-based filtering
011       address-based filtering mask
100       lower size limit for length-based filtering
101       upper size limit for length-based filtering
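
For illustration, enabling address-based filtering could then be coded roughly as below. The ctrl_write helper, its single-cycle timing and the exact register values are assumptions, not code taken from the example sources:

@cocotb.coroutine
def ctrl_write(addr, data):
    # single write on the configuration interface (assumed timing)
    dut.ctrl_addr <= addr
    dut.ctrl_data <= data
    dut.ctrl_wr <= 1
    yield RisingEdge(dut.clk)
    dut.ctrl_wr <= 0

@cocotb.coroutine
def enable_addr_filtering(addr, mask):
    yield ctrl_write(0b010, addr)  # filtering address
    yield ctrl_write(0b011, mask)  # filtering mask
    yield ctrl_write(0b000, 0x01)  # settings: bit 0 - enable address-based filtering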

If the packet is not filtered, or the option to transmit the packet on both interfaces is enabled, it is transmitted on interface 0. If the packet is filtered, or the option to transmit the packet on both interfaces is enabled, it is transmitted on interface 1.

The packet structure is as follows:

Byte    Field
0       Address (0x00 - 0xFF)
1       Length (0x03 - 0x20)
2-31    Payload

The packet bytes are transmitted starting from byte 0. The packet is transmitted continuously, so the data valid strobe must not be deasserted in the middle of a packet. The 1 -> 0 transition of the data valid strobe denotes the end of the packet.
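
Under this protocol, a single packet transfer on the input interface can be sketched as follows. This is only an illustration of the handshake, not the actual PacketIFDriver implementation:

@cocotb.coroutine
def send_packet(dut, pkt_bytes):
    # drive one byte per clock, keeping the valid strobe high for the whole packet
    for byte in pkt_bytes:
        dut.datain_data <= byte
        dut.datain_valid <= 1
        yield RisingEdge(dut.clk)
    dut.datain_valid <= 0  # the 1 -> 0 transition marks the end of the packet
    yield RisingEdge(dut.clk)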

A packet matches the address-based filter when the packet address bits selected by the mask (register address 011) are equal to the corresponding bits of the filtering address (register address 010). A packet matches the length-based filter when its length is greater than or equal to the lower size limit (register address 100) and less than or equal to the upper size limit (register address 101).

Testbench

The test environment randomly transfers packets using different configurations and checks the data consistency.

A packet is represented by the Packet class. The fields Address (addr) and Length (len) are randomized. The Payload (payload) content is generated in the post_randomize() method.

class Packet(Randomized):
    def __init__(self, data = [0, 3, 0]):
        Randomized.__init__(self)
        self.addr = data[0]       # byte 0 - address
        self.len = len(data)      # packet length in bytes
        self.payload = data[2:]   # remaining bytes - payload

        self.add_rand("addr", list(range(256)))
        self.add_rand("len", list(range(3,32)))

    def post_randomize(self):
        # regenerate the payload so that its size matches the randomized length
        self.payload = [np.random.randint(256) for _ in range(self.len-2)]
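
A typical use of this class in the test (a short sketch; the randomize() call comes from the Randomized base class):

pkt = Packet()
pkt.randomize()                          # draws addr and len from the declared ranges
assert len(pkt.payload) == pkt.len - 2   # payload was rebuilt in post_randomize()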

There are a driver (PacketIFDriver) and a monitor (PacketIFMonitor) implemented for the packet interface. The driver (driver) is connected to the datain interface, while two monitors are connected to the dataout0 (monitor0) and dataout1 (monitor1) interfaces.

driver = PacketIFDriver(dut, name="datain", clock=dut.clk)
monitor0 = PacketIFMonitor(dut, name="dataout0", clock=dut.clk)
monitor1 = PacketIFMonitor(dut, name="dataout1", clock=dut.clk)

For scoreboarding, a queue of expected packets is implemented for each output interface. The monitor callbacks check whether each received transaction was expected (for each interface separately).

expected_data0 = [] # queue of expected packets at interface 0
expected_data1 = [] # queue of expected packets at interface 1

def scoreboarding(pkt, queue_expected):
    # compare the received packet with the oldest expected one, then remove it
    assert pkt.addr == queue_expected[0].addr
    assert pkt.len == queue_expected[0].len
    assert pkt.payload == queue_expected[0].payload
    queue_expected.pop(0)

monitor0.add_callback(lambda _ : scoreboarding(_, expected_data0))
monitor1.add_callback(lambda _ : scoreboarding(_, expected_data1))

The functional coverage is sampled when the logging function is called. The following features are covered:

  • length of the packet,

  • type of filtering (disabled, address filtering, length filtering or transmit on both interfaces),

  • address filtering (bitwise AND of the address and the mask) - to reduce the coverage space, this check is performed only on the lower 4 bits,

  • length filtering (lower and upper limits),

  • cross of the packet length with the filtering limits.

@CoverPoint(
  "top.packet_length",
  xf = lambda pkt, event, addr, mask, ll, ul: pkt.len,    # packet length
  bins = list(range(3,32))                                # may be 3 ... 31 bytes
)
@CoverPoint("top.event", vname="event", bins = ["DIS", "TB", "AF", "LF"])
@CoverPoint(
  "top.filt_addr",
  xf = lambda pkt, event, addr, mask, ll, ul:         # filtering based on particular bits in header
    (addr & mask & 0x0F) if event == "AF" else None,  # check only if event is "address filtering"
  bins = list(range(16)),                             # check only 4 LSBs if all options tested
)
@CoverPoint(
  "top.filt_len_eq",
  xf = lambda pkt, event, addr, mask, ll, ul: ll == ul,  # filtering of a single packet length
  bins = [True, False]
)
@CoverPoint(
  "top.filt_len_ll",
  vname = "ll",                    # lower limit of packet length
  bins = list(range(3,32))         # 3 ... 31
)
@CoverPoint(
  "top.filt_len_ul",
  vname = "ul",                    # upper limit of packet length
  bins = list(range(3,32))         # 3 ... 31
)
@CoverCross(
  "top.filt_len_ll_x_packet_length",
  items = ["top.packet_length", "top.filt_len_ll"]
)
@CoverCross(
  "top.filt_len_ul_x_packet_length",
  items = ["top.packet_length", "top.filt_len_ul"]
)
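# a minimal sketch of the decorated logging function (assumption: the actual
# log message is not shown in this document); coverage is sampled at each call
def log_sequence(pkt, event, addr, mask, ll, ul):
    log.info("Packet processed: addr=%02X, len=%d, event=%s", pkt.addr, pkt.len, event)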

The main loop performs random operations in the following order:

  • randomize the type of transaction, data and configuration,

  • configure the DUT and update the scoreboard queues,

  • request the driver to send the packet,

  • log the performed transaction (functional coverage is sampled here).

The scoreboarding is done concurrently with the main loop operations.

for _ in range(1000): # is that enough repetitions to ensure the coverage goal? Check it out!

    event = np.random.choice(["DIS", "TB", "AF", "LF"])
    # DIS - disable filtering : expect all packets on interface 0
    # TB  - transmit both : expect all packets on interface 0 and 1
    # AF  - address filtering : expect filtered packets on interface 1, others on 0
    # LF  - length filtering : expect filtered packets on interface 1, others on 0

    # randomize test data
    pkt = Packet()
    pkt.randomize()
    addr = np.random.randint(256)               # 0x00 .. 0xFF
    mask = np.random.randint(256)               # 0x00 .. 0xFF
    low_limit = np.random.randint(3,32)         # 3 ... 31
    up_limit = np.random.randint(low_limit,32)  # low_limit ... 31

    # expect the packet on the particular interface
    if event == "DIS":
        yield disable_filtering()
        expected_data0.append(pkt)
    elif event == "TB":
        yield enable_transmit_both()
        expected_data0.append(pkt)
        expected_data1.append(pkt)
    elif event == "AF":
        yield enable_addr_filtering(addr, mask)
        if ((pkt.addr & mask) == (addr & mask)):
            expected_data1.append(pkt)
        else:
            expected_data0.append(pkt)
    elif event == "LF":
        yield enable_len_filtering(low_limit, up_limit)
        if (low_limit <= pkt.len <= up_limit):
            expected_data1.append(pkt)
        else:
            expected_data0.append(pkt)

    # send the packet and give the DUT time to process it
    yield driver.send(pkt)
    yield RisingEdge(dut.clk)
    yield RisingEdge(dut.clk)

    # LOG the action
    log_sequence(pkt, event, addr, mask, low_limit, up_limit)