sim: write process commands to VCD file.
If delta cycles are expanded (i.e. if the `fs_per_delta` argument to `Simulator.write_vcd` is not zero), then create a string typed variable for each testbench in the simulation, which reflects the current command being executed by that testbench. To make all commands visible, insert a (visual) delta cycle after each executed command, and ensure that there is a change/crossing point in the waveform display each time a command is executed, even if several identical ones in a row. If delta cycles are not expanded, the behavior is unchanged.
This commit is contained in:
parent
36fb9035e4
commit
11f7b887ad
|
@ -12,12 +12,14 @@ __all__ = ["PyCoroProcess"]
|
|||
|
||||
|
||||
class PyCoroProcess(BaseProcess):
|
||||
def __init__(self, state, domains, constructor, *, default_cmd=None, testbench=False):
|
||||
def __init__(self, state, domains, constructor, *, default_cmd=None, testbench=False,
|
||||
on_command=None):
|
||||
self.state = state
|
||||
self.domains = domains
|
||||
self.constructor = constructor
|
||||
self.default_cmd = default_cmd
|
||||
self.testbench = testbench
|
||||
self.on_command = on_command
|
||||
|
||||
self.reset()
|
||||
|
||||
|
@ -79,6 +81,9 @@ class PyCoroProcess(BaseProcess):
|
|||
response = None
|
||||
exception = None
|
||||
|
||||
if self.on_command is not None:
|
||||
self.on_command(self, command)
|
||||
|
||||
if isinstance(command, ValueCastable):
|
||||
command = Value.cast(command)
|
||||
if isinstance(command, Value):
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
from contextlib import contextmanager
|
||||
import itertools
|
||||
import re
|
||||
import os.path
|
||||
|
||||
from ..hdl import *
|
||||
from ..hdl._repr import *
|
||||
|
@ -36,7 +37,7 @@ class _VCDWriter:
|
|||
else:
|
||||
raise NotImplementedError # :nocov:
|
||||
|
||||
def __init__(self, design, *, vcd_file, gtkw_file=None, traces=(), fs_per_delta=0):
|
||||
def __init__(self, design, *, vcd_file, gtkw_file=None, traces=(), fs_per_delta=0, processes=()):
|
||||
self.fs_per_delta = fs_per_delta
|
||||
|
||||
# Although pyvcd is a mandatory dependency, be resilient and import it as needed, so that
|
||||
|
@ -165,6 +166,26 @@ class _VCDWriter:
|
|||
gtkw_names.append(gtkw_name)
|
||||
|
||||
|
||||
self.vcd_process_vars = {}
|
||||
if fs_per_delta == 0:
|
||||
return # Not useful without delta cycle expansion.
|
||||
for index, process in enumerate(processes):
|
||||
func_name = process.constructor.__name__
|
||||
func_file = os.path.basename(process.constructor.__code__.co_filename)
|
||||
func_line = process.constructor.__code__.co_firstlineno
|
||||
for name in (
|
||||
f"{process.constructor.__name__}",
|
||||
f"{process.constructor.__name__}!{func_file};{func_line}",
|
||||
f"{process.constructor.__name__}#{index}",
|
||||
):
|
||||
try:
|
||||
self.vcd_process_vars[process] = self.vcd_writer.register_var(
|
||||
scope=("debug", "proc"), name=name, var_type="string", size=None,
|
||||
init="(init)")
|
||||
break
|
||||
except KeyError:
|
||||
pass # try another name
|
||||
|
||||
def update_signal(self, timestamp, signal, value):
|
||||
for (vcd_var, repr) in self.vcd_signal_vars.get(signal, ()):
|
||||
var_value = self.eval_field(repr.value, signal, value)
|
||||
|
@ -176,6 +197,16 @@ class _VCDWriter:
|
|||
vcd_var = self.vcd_memory_vars[memory._identity][addr]
|
||||
self.vcd_writer.change(vcd_var, timestamp, value)
|
||||
|
||||
def update_process(self, timestamp, process, command):
|
||||
try:
|
||||
vcd_var = self.vcd_process_vars[process]
|
||||
except KeyError:
|
||||
return
|
||||
# Ensure that the waveform viewer displays a change point even if the previous command is
|
||||
# the same as the next one.
|
||||
self.vcd_writer.change(vcd_var, timestamp, "")
|
||||
self.vcd_writer.change(vcd_var, timestamp, repr(command))
|
||||
|
||||
def close(self, timestamp):
|
||||
if self.vcd_writer is not None:
|
||||
self.vcd_writer.close(timestamp)
|
||||
|
@ -425,7 +456,7 @@ class PySimEngine(BaseEngine):
|
|||
|
||||
def add_testbench_process(self, process):
|
||||
self._testbenches.append(PyCoroProcess(self._state, self._design.fragment.domains, process,
|
||||
testbench=True))
|
||||
testbench=True, on_command=self._debug_process))
|
||||
|
||||
def reset(self):
|
||||
self._state.reset()
|
||||
|
@ -462,6 +493,13 @@ class PySimEngine(BaseEngine):
|
|||
|
||||
self._delta_cycles += 1
|
||||
|
||||
def _debug_process(self, process, command):
|
||||
for vcd_writer in self._vcd_writers:
|
||||
now_plus_deltas = self._now_plus_deltas(vcd_writer)
|
||||
vcd_writer.update_process(now_plus_deltas, process, command)
|
||||
|
||||
self._delta_cycles += 1
|
||||
|
||||
def _step_tb(self):
|
||||
# Run processes waiting for an interval to expire (mainly `add_clock_process()``)
|
||||
self._step_rtl()
|
||||
|
@ -494,7 +532,8 @@ class PySimEngine(BaseEngine):
|
|||
@contextmanager
|
||||
def write_vcd(self, *, vcd_file, gtkw_file, traces, fs_per_delta):
|
||||
vcd_writer = _VCDWriter(self._design,
|
||||
vcd_file=vcd_file, gtkw_file=gtkw_file, traces=traces, fs_per_delta=fs_per_delta)
|
||||
vcd_file=vcd_file, gtkw_file=gtkw_file, traces=traces, fs_per_delta=fs_per_delta,
|
||||
processes=self._testbenches)
|
||||
try:
|
||||
self._vcd_writers.append(vcd_writer)
|
||||
yield
|
||||
|
|
|
@ -1191,6 +1191,15 @@ class SimulatorIntegrationTestCase(FHDLTestCase):
|
|||
with sim.write_vcd("test.vcd", fs_per_delta=1):
|
||||
sim.run()
|
||||
|
||||
def test_process_name_collision(self):
|
||||
def testbench():
|
||||
yield Passive()
|
||||
sim = Simulator(Module())
|
||||
sim.add_testbench(testbench)
|
||||
sim.add_testbench(testbench)
|
||||
with sim.write_vcd("test.vcd", fs_per_delta=1):
|
||||
sim.run()
|
||||
|
||||
|
||||
class SimulatorRegressionTestCase(FHDLTestCase):
|
||||
def test_bug_325(self):
|
||||
|
|
Loading…
Reference in a new issue