Skip to content

Commit

Permalink
Updated bus monitor to include txn parsing
Browse files Browse the repository at this point in the history
Signed-off-by: Anderson Ignacio da Silva <anderson@aignacio.com>
  • Loading branch information
aignacio committed Jun 14, 2024
1 parent 3d088f0 commit 7f55790
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 68 deletions.
170 changes: 112 additions & 58 deletions cocotbext/ahb/ahb_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# License : MIT license <Check LICENSE>
# Author : Anderson I. da Silva (aignacio) <anderson@aignacio.com>
# Date : 27.10.2023
# Last Modified Date: 10.06.2024
# Last Modified Date: 14.06.2024
import cocotb
import logging
import random
Expand Down Expand Up @@ -48,49 +48,100 @@ def __init__(
async def _monitor_recv(self):
"""Watch the pins and reconstruct transactions."""

pending = False
stable_signals = {}
slave_error_prev = 0
first_txn = {}
first_st = {}
first_st["phase"] = "none"
first_st["write_first_cycle"] = True

while True:
curr_hready = copy.deepcopy(self.bus.hready.value)
second_txn = {}
second_st = {}
second_st["phase"] = "none"
second_st["write_first_cycle"] = True

while True:
await FallingEdge(self.clk)

if (self.bus.hready == 1) and self.bus.hresp == AHBResp.ERROR:
if curr_hready != 0:
raise AssertionError(
"AHB PROTOCOL VIOLATION: Previous hready must be low when AHB Resp == Error and hready == 1"
)

# print(f"pending: {pending}")
# Ensure master does not change its qualifiers before hready
if (pending is True) and (self.bus.hready.value == 0):
self._check_signals(stable_signals)
elif (pending is True) and (self.bus.hready.value == 1):
self._check_signals(stable_signals)
pending = False
self._recv(stable_signals)

# Check for new txn
# print(f"hready: {self.bus.hready.value}")
# print(f"check inputs: {self._check_inputs()}")
# print(f"check valid_tx: {self._check_valid_txn()}")
if (
self.bus.hready.value == 0
and self._check_inputs()
and self._check_valid_txn()
):
pending = True
stable_signals = {
"htrans": copy.deepcopy(self.bus.htrans.value),
"hwrite": copy.deepcopy(self.bus.hwrite.value),
"haddr": copy.deepcopy(self.bus.haddr.value),
"hsize": copy.deepcopy(self.bus.hsize.value),
}
if self.bus.hsel_exist:
stable_signals["hsel"] = copy.deepcopy(self.bus.hsel.value)
else:
pending = False
# Check that address phase signals of the second txn are stable while slave is not available
if (second_st["phase"] == "addr") and (self.bus.hready.value == 0):
self._check_signals(second_txn)

if first_st["phase"] == "data":
# Previous cycle we started a txn and slave was ready, but now if it is a write lets
# check the hwdata whether it is stable or not as the slave is not available anymore
if self.bus.hready.value == 0:
# Copy the latest hresp to ensure slave follow 2-cycle response
slave_error_prev = copy.deepcopy(self.bus.hresp.value)

if (first_txn["hwrite"] == 1) and (
first_st["write_first_cycle"] is True
):
first_txn["hwdata"] = copy.deepcopy(self.bus.hwdata.value)
first_st["write_first_cycle"] = False
elif (first_txn["hwrite"] == 1) and (
first_st["write_first_cycle"] is False
):
if self.bus.hwdata.value == first_txn["hwdata"]:
pass
else:
raise AssertionError(
"AHB PROTOCOL VIOLATION: Master.hwdata signal should not change before slave.hready == 1"
)

# Previous cycle we started a txn and slave was ready, and now it is still ready
# As address and data phase were both completed, push the txn to the next method
elif self.bus.hready.value == 1:
# Check whether the slave response follow the AMBA AHB spec
# with 2-cycle delay
if (self.bus.hresp.value != AHBResp.OKAY) and (slave_error_prev == 0):
raise AssertionError(
"AHB PROTOCOL VIOLATION: Slave is not following the 2-cyle error response \
- ARM IHI 0033B.b (ID102715) - Section 5.1.3"
)

first_txn["response"] = copy.deepcopy(self.bus.hresp.value)
first_txn["hrdata"] = copy.deepcopy(self.bus.hrdata.value)
first_txn["hwdata"] = copy.deepcopy(self.bus.hwdata.value)
# print(first_txn)
self._recv(first_txn)

# Restart the txn status
first_st["phase"] = "none"
first_st["write_first_cycle"] = True
slave_error_prev = 0

# Clean second txn
second_st["phase"] = "none"
second_st["write_first_cycle"] = True

if (self._check_valid_txn() is True) and (first_st["phase"] == "none"):
first_st["phase"] = "data" if self.bus.hready.value == 1 else "addr"

first_txn["hsel"] = (
copy.deepcopy(self.bus.hsel.value) if self.bus.hsel_exist else 0
)
first_txn["haddr"] = copy.deepcopy(self.bus.haddr.value)
first_txn["htrans"] = copy.deepcopy(self.bus.htrans.value)
first_txn["hsize"] = copy.deepcopy(self.bus.hsize.value)
first_txn["hwrite"] = copy.deepcopy(self.bus.hwrite.value)
# We only enter in the if below if the last txn did not complete and the master issued a new txn
elif (self._check_valid_txn() is True) and (first_st["phase"] == "data"):
print("GOT A NEW TXN")
second_st["phase"] = "addr"

second_txn["hsel"] = (
copy.deepcopy(self.bus.hsel.value) if self.bus.hsel_exist else 0
)
second_txn["haddr"] = copy.deepcopy(self.bus.haddr.value)
second_txn["htrans"] = copy.deepcopy(self.bus.htrans.value)
second_txn["hsize"] = copy.deepcopy(self.bus.hsize.value)
second_txn["hwrite"] = copy.deepcopy(self.bus.hwrite.value)

if first_st["phase"] == "addr":
self._check_signals(first_txn)

if self.bus.hready.value == 1:
first_st["phase"] = "data"

def _check_inputs(self) -> bool:
"""Check any of the master signals are resolvable (i.e not 'z')"""
Expand All @@ -114,30 +165,33 @@ def _check_inputs(self) -> bool:
return True

def _check_valid_txn(self) -> bool:
htrans_st = (AHBTrans(self.bus.htrans.value) != AHBTrans.IDLE) and (
AHBTrans(self.bus.htrans.value) != AHBTrans.BUSY
)

if self.bus.hsel_exist:
if self.bus.hready_in_exist:
if (
(self.bus.hsel.value == 1)
and (self.bus.hready_in.value == 1)
and htrans_st
):
return True
if self._check_inputs():
htrans_st = (AHBTrans(self.bus.htrans.value) != AHBTrans.IDLE) and (
AHBTrans(self.bus.htrans.value) != AHBTrans.BUSY
)

if self.bus.hsel_exist: # Decoder to slave
if self.bus.hready_in_exist:
if (
(self.bus.hsel.value == 1)
and (self.bus.hready_in.value == 1)
and htrans_st
):
return True
else:
return False
else:
return False
if (self.bus.hsel.value == 1) and htrans_st:
return True
else:
return False
else:
if (self.bus.hsel.value == 1) and htrans_st:
if htrans_st: # In this case it is just a master
return True
else:
return False
else:
if htrans_st:
return True
else:
return False
return False

def _check_signals(self, stable):
"""Check any of the master signals are resolvable (i.e not 'z')"""
Expand Down
4 changes: 2 additions & 2 deletions noxfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# License : MIT license <Check LICENSE>
# Author : Anderson I. da Silva (aignacio) <anderson@aignacio.com>
# Date : 08.10.2023
# Last Modified Date: 10.06.2024
# Last Modified Date: 14.06.2024

import nox

Expand All @@ -13,7 +13,7 @@
def run(session):
session.env["DUT"] = "ahb_template"
session.env["SIM"] = "icarus"
session.env["COCOTB_LOG_LEVEL"] = "debug"
# session.env["COCOTB_LOG_LEVEL"] = "debug"
# session.env['SIM'] = "verilator"
session.env["TIMEPREC"] = "1ps"
session.env["TIMEUNIT"] = "1ns"
Expand Down
19 changes: 14 additions & 5 deletions tests/test_ahb_lite.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# License : MIT license <Check LICENSE>
# Author : Anderson I. da Silva (aignacio) <anderson@aignacio.com>
# Date : 08.10.2023
# Last Modified Date: 29.11.2023
# Last Modified Date: 14.06.2024

import cocotb
import os
Expand All @@ -14,7 +14,7 @@
from cocotb_test.simulator import run
from cocotb.triggers import ClockCycles
from cocotb.clock import Clock
from cocotbext.ahb import AHBBus, AHBLiteMaster, AHBLiteSlave
from cocotbext.ahb import AHBBus, AHBLiteMaster, AHBLiteSlave, AHBMonitor
from cocotb.regression import TestFactory


Expand Down Expand Up @@ -49,12 +49,23 @@ async def setup_dut(dut, cycles):
dut.hresetn.value = 1


def txn_recv(txn):
print(txn)


@cocotb.test()
async def run_test(dut, bp_fn=None, pip_mode=False):
N = 1000

await setup_dut(dut, cfg.RST_CYCLES)

ahb_lite_mon = AHBMonitor(
AHBBus.from_entity(dut), dut.hclk, dut.hresetn, "ahb_monitor", callback=txn_recv
)

# Below is only required bc of flake8 - non-used rule
type(ahb_lite_mon)

ahb_lite_master = AHBLiteMaster(
AHBBus.from_entity(dut), dut.hclk, dut.hresetn, def_val="Z"
)
Expand Down Expand Up @@ -85,9 +96,7 @@ async def run_test(dut, bp_fn=None, pip_mode=False):

txn_type = [pick_random_value([1, 0]) for _ in range(N)]

resp = await ahb_lite_master.custom(
address, value, txn_type, size, pip_mode
)
resp = await ahb_lite_master.custom(address, value, txn_type, size, pip_mode)


if cocotb.SIM_NAME:
Expand Down
39 changes: 36 additions & 3 deletions tests/test_ahb_lite_sram.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# License : MIT license <Check LICENSE>
# Author : Anderson I. da Silva (aignacio) <anderson@aignacio.com>
# Date : 08.10.2023
# Last Modified Date: 29.11.2023
# Last Modified Date: 14.06.2024

import cocotb
import os
Expand All @@ -18,6 +18,8 @@
from cocotbext.ahb import AHBBus, AHBLiteMaster, AHBLiteSlaveRAM, AHBResp, AHBMonitor
from cocotb.regression import TestFactory

recv_txn = []


def rnd_val(bit: int = 0, zero: bool = True):
if zero is True:
Expand Down Expand Up @@ -50,18 +52,26 @@ async def setup_dut(dut, cycles):
dut.hresetn.value = 1


def txn_recv(txn):
# pass
recv_txn.append(txn)
print(txn)


@cocotb.test()
async def run_test(dut, bp_fn=None, pip_mode=False):
mem_size_kib = 16
N = 1000
N = 1

ahb_bus_slave = AHBBus.from_entity(dut)

data_width = ahb_bus_slave.data_width

await setup_dut(dut, cfg.RST_CYCLES)

ahb_lite_mon = AHBMonitor(ahb_bus_slave, dut.hclk, dut.hresetn)
ahb_lite_mon = AHBMonitor(
ahb_bus_slave, dut.hclk, dut.hresetn, "ahb_monitor", callback=txn_recv
)

# Below is only required bc of flake8 - non-used rule
type(ahb_lite_mon)
Expand Down Expand Up @@ -126,6 +136,29 @@ async def run_test(dut, bp_fn=None, pip_mode=False):
print(expect)
assert real == expect, "DUT != Expected"

# Prepare data to compare
txn = []
for hwrite in [1, 0]:
for i_addr, i_value, i_size in zip(address, value, size):
txn_sent = {}
txn_sent["haddr"] = i_addr
txn_sent["value"] = i_value
txn_sent["size"] = i_size
txn_sent["hwrite"] = hwrite
txn.append(txn_sent)

# Compare all sent txns with the monitor
print(f"Sent txn {len(txn)} and monitored {len(recv_txn)}")
# for index, (real, expect) in enumerate(zip( )):
# if real != expect:
# print("------ERROR------")
# print(f"Txn ID: {index}")
# print("DUT")
# print(real)
# print("Expected")
# print(expect)
# assert real == expect, "DUT != Expected"


if cocotb.SIM_NAME:
factory = TestFactory(run_test)
Expand Down

0 comments on commit 7f55790

Please sign in to comment.