sim: allow visualizing delta cycles in VCD dumps.

This commit adds an option `fs_per_delta=` to `Simulator.write_vcd()`.
Specifying a positive integer value for it causes the simulator to
offset value change times by that many femtoseconds for each delta
cycle after the last timeline advancement.

This option is only suitable for debugging. If the timeline is advanced
by less than the combined duration of expanded delta cycles, an error
similar to the following will be raised:

    vcd.writer.VCDPhaseError: Out of order timestamp: 62490

Typically `fs_per_delta=1` is best, since it allows thousands of delta
cycles to be expanded without risking a VCD phase error, but bigger
values can be used for an exaggerated visual effect.

Also, the VCD writer is changed to use 1 fs as the timebase instead of
1 ps. This change is largely invisible to designers, resulting only in
slightly larger VCD files due to longer timestamps.

Since the `fs_per_delta=` option is per VCD writer, it is possible to
simultaneously dump two VCDs, one with and one without delta cycle
expansion:

    with sim.write_vcd("sim.vcd"), sim.write_vcd("sim.d.vcd", fs_per_delta=1):
        sim.run()
This commit is contained in:
Catherine 2024-03-23 07:07:14 +00:00
parent 0cb71f8c57
commit 36fb9035e4
5 changed files with 45 additions and 34 deletions

View file

@ -84,5 +84,5 @@ class BaseEngine:
def advance(self):
raise NotImplementedError # :nocov:
def write_vcd(self, *, vcd_file, gtkw_file, traces):
def write_vcd(self, *, vcd_file, gtkw_file, traces, fs_per_delta):
raise NotImplementedError # :nocov:

View file

@ -115,8 +115,8 @@ class PyCoroProcess(BaseProcess):
return False # no assignments
elif type(command) is Delay:
# Internal timeline is in 1ps integeral units, intervals are public API and in floating point
interval = int(command.interval * 1e12) if command.interval is not None else None
# Internal timeline is in 1 fs integeral units, intervals are public API and in floating point
interval = int(command.interval * 1e15) if command.interval is not None else None
self.state.wait_interval(self, interval)
return False # no assignments

View file

@ -157,8 +157,8 @@ class Simulator:
raise ValueError("Domain {!r} already has a clock driving it"
.format(domain.name))
# We represent times internally in 1 ps units, but users supply float quantities of seconds
period = int(period * 1e12)
# We represent times internally in 1 fs units, but users supply float quantities of seconds
period = int(period * 1e15)
if phase is None:
# By default, delay the first edge by half period. This causes any synchronous activity
@ -166,7 +166,7 @@ class Simulator:
# viewer.
phase = period // 2
else:
phase = int(phase * 1e12) + period // 2
phase = int(phase * 1e15) + period // 2
self._engine.add_clock_process(domain.clk, phase=phase, period=period)
self._clocked.add(domain)
@ -207,13 +207,13 @@ class Simulator:
If the simulation stops advancing, this function will never return.
"""
# Convert deadline in seconds into internal 1 ps units
deadline = deadline * 1e12
# Convert deadline in seconds into internal 1 fs units
deadline = deadline * 1e15
assert self._engine.now <= deadline
while (self.advance() or run_passive) and self._engine.now < deadline:
pass
def write_vcd(self, vcd_file, gtkw_file=None, *, traces=()):
def write_vcd(self, vcd_file, gtkw_file=None, *, traces=(), fs_per_delta=0):
"""Write waveforms to a Value Change Dump file, optionally populating a GTKWave save file.
This method returns a context manager. It can be used as: ::
@ -238,4 +238,5 @@ class Simulator:
file.close()
raise ValueError("Cannot start writing waveforms after advancing simulation time")
return self._engine.write_vcd(vcd_file=vcd_file, gtkw_file=gtkw_file, traces=traces)
return self._engine.write_vcd(vcd_file=vcd_file, gtkw_file=gtkw_file,
traces=traces, fs_per_delta=fs_per_delta)

View file

@ -34,9 +34,11 @@ class _VCDWriter:
sub = _VCDWriter.eval_field(field.operands[0], signal, value)
return Const(sub, field.shape()).value
else:
raise NotImplementedError
raise NotImplementedError # :nocov:
def __init__(self, design, *, vcd_file, gtkw_file=None, traces=(), fs_per_delta=0):
self.fs_per_delta = fs_per_delta
def __init__(self, design, *, vcd_file, gtkw_file=None, traces=()):
# Although pyvcd is a mandatory dependency, be resilient and import it as needed, so that
# the simulator is still usable if it's not installed for some reason.
import vcd, vcd.gtkw
@ -54,7 +56,7 @@ class _VCDWriter:
self.vcd_memory_vars = {}
self.vcd_file = vcd_file
self.vcd_writer = vcd_file and vcd.VCDWriter(self.vcd_file,
timescale="1 ps", comment="Generated by Amaranth")
timescale="1 fs", comment="Generated by Amaranth")
self.gtkw_signal_names = SignalDict()
self.gtkw_memory_names = {}
@ -410,6 +412,7 @@ class PySimEngine(BaseEngine):
self._design = design
self._processes = _FragmentCompiler(self._state)(self._design.fragment)
self._testbenches = []
self._delta_cycles = 0
self._vcd_writers = []
def add_clock_process(self, clock, *, phase, period):
@ -429,10 +432,12 @@ class PySimEngine(BaseEngine):
for process in self._processes:
process.reset()
def _step_rtl(self, changed):
def _step_rtl(self):
# Performs the two phases of a delta cycle in a loop:
converged = False
while not converged:
changed = set() if self._vcd_writers else None
# 1. eval: run and suspend every non-waiting process once, queueing signal changes
for process in self._processes:
if process.runnable:
@ -442,11 +447,24 @@ class PySimEngine(BaseEngine):
# 2. commit: apply every queued signal change, waking up any waiting processes
converged = self._state.commit(changed)
def _step_tb(self):
changed = set() if self._vcd_writers else None
for vcd_writer in self._vcd_writers:
now_plus_deltas = self._now_plus_deltas(vcd_writer)
for change in changed:
if isinstance(change, _PySignalState):
signal_state = change
vcd_writer.update_signal(now_plus_deltas,
signal_state.signal, signal_state.curr)
elif isinstance(change, _PyMemoryChange):
vcd_writer.update_memory(now_plus_deltas, change.state.memory,
change.addr, change.state.data[change.addr])
else:
assert False # :nocov:
self._delta_cycles += 1
def _step_tb(self):
# Run processes waiting for an interval to expire (mainly `add_clock_process()``)
self._step_rtl(changed)
self._step_rtl()
# Run testbenches waiting for an interval to expire, or for a signal to change state
converged = False
@ -459,19 +477,7 @@ class PySimEngine(BaseEngine):
while testbench.run():
# Testbench has changed simulation state; run processes triggered by that
converged = False
self._step_rtl(changed)
for vcd_writer in self._vcd_writers:
for change in changed:
if isinstance(change, _PySignalState):
signal_state = change
vcd_writer.update_signal(self._timeline.now,
signal_state.signal, signal_state.curr)
elif isinstance(change, _PyMemoryChange):
vcd_writer.update_memory(self._timeline.now, change.state.memory,
change.addr, change.state.data[change.addr])
else:
assert False # :nocov:
self._step_rtl()
def advance(self):
self._step_tb()
@ -482,13 +488,16 @@ class PySimEngine(BaseEngine):
def now(self):
return self._timeline.now
def _now_plus_deltas(self, vcd_writer):
return self._timeline.now + self._delta_cycles * vcd_writer.fs_per_delta
@contextmanager
def write_vcd(self, *, vcd_file, gtkw_file, traces):
def write_vcd(self, *, vcd_file, gtkw_file, traces, fs_per_delta):
vcd_writer = _VCDWriter(self._design,
vcd_file=vcd_file, gtkw_file=gtkw_file, traces=traces)
vcd_file=vcd_file, gtkw_file=gtkw_file, traces=traces, fs_per_delta=fs_per_delta)
try:
self._vcd_writers.append(vcd_writer)
yield
finally:
vcd_writer.close(self._timeline.now)
vcd_writer.close(self._now_plus_deltas(vcd_writer))
self._vcd_writers.remove(vcd_writer)

View file

@ -1188,7 +1188,8 @@ class SimulatorIntegrationTestCase(FHDLTestCase):
sim = Simulator(Module())
sim.add_testbench(testbench_1)
sim.add_testbench(testbench_2)
sim.run()
with sim.write_vcd("test.vcd", fs_per_delta=1):
sim.run()
class SimulatorRegressionTestCase(FHDLTestCase):