diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index 5f20f90..0539670 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -5,6 +5,7 @@ on: branches: - master - 'stable/**' + - bus_monitor_impr pull_request: branches: - master diff --git a/README.md b/README.md index 3b5d29c..8760ea9 100644 --- a/README.md +++ b/README.md @@ -29,6 +29,10 @@ The repository contains a small script that starts a container fetched from Dock ```bash $ cd cocotbext-ahb/ $ ./ship.sh +# To run all tests +$ nox +# To run a specific test +$ nox -s run -- -k "test_ahb_lite.py" ``` Once the container is up and running, to run the tests through [nox](https://nox.thea.codes/en/stable/) and [pytest](https://docs.pytest.org/), run the following: ```bash diff --git a/cocotbext/ahb/ahb_monitor.py b/cocotbext/ahb/ahb_monitor.py index 387b689..4b1e4c2 100644 --- a/cocotbext/ahb/ahb_monitor.py +++ b/cocotbext/ahb/ahb_monitor.py @@ -126,7 +126,6 @@ async def _monitor_recv(self): first_txn["hwrite"] = copy.deepcopy(self.bus.hwrite.value) # We only enter in the if below if the last txn did not complete and the master issued a new txn elif (self._check_valid_txn() is True) and (first_st["phase"] == "data"): - print("GOT A NEW TXN") second_st["phase"] = "addr" second_txn["hsel"] = ( diff --git a/tests/test_ahb_lite_monitor_scoreboard.py b/tests/test_ahb_lite_monitor_scoreboard.py new file mode 100644 index 0000000..93f9f20 --- /dev/null +++ b/tests/test_ahb_lite_monitor_scoreboard.py @@ -0,0 +1,205 @@ +# -*- coding: utf-8 -*- +# File : test_ahb_lite_monitor_scoreboard.py +# License : MIT license +# Author : Anderson I. da Silva (aignacio) +# Date : 08.10.2023 +# Last Modified Date: 14.06.2024 + +import cocotb +import os +import random +import math +import pytest + +from const import cfg +from cocotb_test.simulator import run +from cocotb.triggers import ClockCycles +from cocotb.clock import Clock +from cocotbext.ahb import AHBBus, AHBLiteMaster, AHBLiteSlaveRAM, AHBResp, AHBMonitor +from cocotb.regression import TestFactory + +recv_txn = [] + + +def rnd_val(bit: int = 0, zero: bool = True): + if zero is True: + return random.randint(0, (2**bit) - 1) + else: + return random.randint(1, (2**bit) - 1) + + +def pick_random_value(input_list): + if input_list: + return random.choice(input_list) + else: + return None # Return None if the list is empty + + +def slave_back_pressure_generator(): + while True: + yield pick_random_value([False, True]) + + +def slave_no_back_pressure_generator(): + while True: + yield True + + +async def setup_dut(dut, cycles): + cocotb.start_soon(Clock(dut.hclk, *cfg.CLK_100MHz).start()) + dut.hresetn.value = 0 + await ClockCycles(dut.hclk, cycles) + dut.hresetn.value = 1 + + +def get_random_txn(mem_size_kib, data_width, N): + # Generate a list of unique addresses with the double of memory size + # to create error responses + address = random.sample(range(0, 2 * mem_size_kib * 1024, 8), N) + # Generate a list of random 32-bit values + value = [rnd_val(data_width) for _ in range(N)] + # Generate a list of random sizes + if data_width == 32: + size = [pick_random_value([1, 2, 4]) for _ in range(N)] + else: + size = [pick_random_value([1, 2, 4, 8]) for _ in range(N)] + + # Create the comparison list with expected results + expected = [] + for addr, val, sz in zip(address, value, size): + resp, data = 0, 0 + if addr >= mem_size_kib * 1024: + resp = AHBResp.ERROR + else: + resp = AHBResp.OKAY + if sz == 1: + data = val & 0xFF + elif sz == 2: + data = val & 0xFFFF + elif sz == 4: + data = val & 0xFFFFFFFF + elif sz == 8: + data = val & 0xFFFFFFFFFFFFFFFF + expected.append({"resp": resp, "data": hex(data)}) + + return address, value, size, expected + + +def txn_recv(txn): + # pass + recv_txn.append(txn) + print(txn) + + +@cocotb.test() +async def run_test(dut, bp_fn=None, pip_mode=False): + mem_size_kib = 16 + N = 10 + + ahb_bus_slave = AHBBus.from_entity(dut) + + data_width = ahb_bus_slave.data_width + + await setup_dut(dut, cfg.RST_CYCLES) + + ahb_lite_mon = AHBMonitor( + ahb_bus_slave, dut.hclk, dut.hresetn, "ahb_monitor", callback=txn_recv + ) + + # Below is only required bc of flake8 - non-used rule + type(ahb_lite_mon) + + ahb_lite_sram = AHBLiteSlaveRAM( + AHBBus.from_entity(dut), + dut.hclk, + dut.hresetn, + def_val=0, + bp=bp_fn, + mem_size=mem_size_kib * 1024, + ) + + # Below is only required bc of flake8 - non-used rule + type(ahb_lite_sram) + + ahb_lite_master = AHBLiteMaster( + AHBBus.from_entity(dut), dut.hclk, dut.hresetn, def_val="Z" + ) + + address, value, size, expected = get_random_txn(mem_size_kib, data_width, N) + + # Perform the writes and reads + resp = await ahb_lite_master.write(address, value, size, pip=pip_mode) + resp = await ahb_lite_master.read(address, size, pip=pip_mode) + + type(resp) + + # Compare all txns + for index, (real, expect) in enumerate(zip(resp, expected)): + if real != expect: + print("------ERROR------") + print(f"Txn ID: {index}") + print("DUT") + print(real) + print("Expected") + print(expect) + assert real == expect, "DUT != Expected" + + # Prepare data to compare + txn = [] + for hwrite in [1, 0]: + for i_addr, i_value, i_size in zip(address, value, size): + txn_sent = {} + txn_sent["haddr"] = i_addr + txn_sent["value"] = i_value + txn_sent["size"] = i_size + txn_sent["hwrite"] = hwrite + txn.append(txn_sent) + + # Compare all sent txns with the monitor + print(f"Sent txn {len(txn)} and monitored {len(recv_txn)}") + # for index, (real, expect) in enumerate(zip( )): + # if real != expect: + # print("------ERROR------") + # print(f"Txn ID: {index}") + # print("DUT") + # print(real) + # print("Expected") + # print(expect) + # assert real == expect, "DUT != Expected" + + +if cocotb.SIM_NAME: + factory = TestFactory(run_test) + factory.add_option( + "bp_fn", [slave_back_pressure_generator(), slave_no_back_pressure_generator()] + ) + factory.add_option("pip_mode", [False, True]) + factory.generate_tests() + + +@pytest.mark.parametrize("data_width", [{"DATA_WIDTH": "32"}, {"DATA_WIDTH": "64"}]) +def test_ahb_lite_sram_monitor_scoreboard(data_width): + """ + Test AHB lite SRAM to check monitor txns + + Test ID: 8 + """ + module = os.path.splitext(os.path.basename(__file__))[0] + SIM_BUILD = os.path.join( + cfg.TESTS_DIR, + f"../run_dir/sim_build_{cfg.SIMULATOR}_{module}_data_width_{data_width['DATA_WIDTH']}_bits", + ) + extra_args_sim = cfg.EXTRA_ARGS + + run( + python_search=[cfg.TESTS_DIR], + verilog_sources=cfg.VERILOG_SOURCES, + toplevel=cfg.TOPLEVEL, + module=module, + parameters=data_width, + sim_build=SIM_BUILD, + extra_args=extra_args_sim, + extra_env=cfg.EXTRA_ENV, + timescale=cfg.TIMESCALE, + waves=1, + )