426 lines
16 KiB
Python
426 lines
16 KiB
Python
from contextlib import contextmanager
|
|
import itertools
|
|
import inspect
|
|
from vcd import VCDWriter
|
|
from vcd.gtkw import GTKWSave
|
|
|
|
from .._utils import deprecated
|
|
from ..hdl import *
|
|
from ..hdl.ast import SignalDict
|
|
from ._cmds import *
|
|
from ._core import *
|
|
from ._pyrtl import _FragmentCompiler
|
|
from ._pycoro import PyCoroProcess
|
|
|
|
|
|
__all__ = ["Settle", "Delay", "Tick", "Passive", "Active", "Simulator"]
|
|
|
|
|
|
class _NameExtractor:
|
|
def __init__(self):
|
|
self.names = SignalDict()
|
|
|
|
def __call__(self, fragment, *, hierarchy=("top",)):
|
|
def add_signal_name(signal):
|
|
hierarchical_signal_name = (*hierarchy, signal.name)
|
|
if signal not in self.names:
|
|
self.names[signal] = {hierarchical_signal_name}
|
|
else:
|
|
self.names[signal].add(hierarchical_signal_name)
|
|
|
|
for domain_name, domain_signals in fragment.drivers.items():
|
|
if domain_name is not None:
|
|
domain = fragment.domains[domain_name]
|
|
add_signal_name(domain.clk)
|
|
if domain.rst is not None:
|
|
add_signal_name(domain.rst)
|
|
|
|
for statement in fragment.statements:
|
|
for signal in statement._lhs_signals() | statement._rhs_signals():
|
|
if not isinstance(signal, (ClockSignal, ResetSignal)):
|
|
add_signal_name(signal)
|
|
|
|
for subfragment_index, (subfragment, subfragment_name) in enumerate(fragment.subfragments):
|
|
if subfragment_name is None:
|
|
subfragment_name = "U${}".format(subfragment_index)
|
|
self(subfragment, hierarchy=(*hierarchy, subfragment_name))
|
|
|
|
return self.names
|
|
|
|
|
|
class _WaveformWriter:
|
|
def update(self, timestamp, signal, value):
|
|
raise NotImplementedError # :nocov:
|
|
|
|
def close(self, timestamp):
|
|
raise NotImplementedError # :nocov:
|
|
|
|
|
|
class _VCDWaveformWriter(_WaveformWriter):
|
|
@staticmethod
|
|
def timestamp_to_vcd(timestamp):
|
|
return timestamp * (10 ** 10) # 1/(100 ps)
|
|
|
|
@staticmethod
|
|
def decode_to_vcd(signal, value):
|
|
return signal.decoder(value).expandtabs().replace(" ", "_")
|
|
|
|
def __init__(self, fragment, *, vcd_file, gtkw_file=None, traces=()):
|
|
if isinstance(vcd_file, str):
|
|
vcd_file = open(vcd_file, "wt")
|
|
if isinstance(gtkw_file, str):
|
|
gtkw_file = open(gtkw_file, "wt")
|
|
|
|
self.vcd_vars = SignalDict()
|
|
self.vcd_file = vcd_file
|
|
self.vcd_writer = vcd_file and VCDWriter(self.vcd_file,
|
|
timescale="100 ps", comment="Generated by nMigen")
|
|
|
|
self.gtkw_names = SignalDict()
|
|
self.gtkw_file = gtkw_file
|
|
self.gtkw_save = gtkw_file and GTKWSave(self.gtkw_file)
|
|
|
|
self.traces = []
|
|
|
|
signal_names = _NameExtractor()(fragment)
|
|
|
|
trace_names = SignalDict()
|
|
for trace in traces:
|
|
if trace not in signal_names:
|
|
trace_names[trace] = {("top", trace.name)}
|
|
self.traces.append(trace)
|
|
|
|
if self.vcd_writer is None:
|
|
return
|
|
|
|
for signal, names in itertools.chain(signal_names.items(), trace_names.items()):
|
|
if signal.decoder:
|
|
var_type = "string"
|
|
var_size = 1
|
|
var_init = self.decode_to_vcd(signal, signal.reset)
|
|
else:
|
|
var_type = "wire"
|
|
var_size = signal.width
|
|
var_init = signal.reset
|
|
|
|
for (*var_scope, var_name) in names:
|
|
suffix = None
|
|
while True:
|
|
try:
|
|
if suffix is None:
|
|
var_name_suffix = var_name
|
|
else:
|
|
var_name_suffix = "{}${}".format(var_name, suffix)
|
|
if signal not in self.vcd_vars:
|
|
vcd_var = self.vcd_writer.register_var(
|
|
scope=var_scope, name=var_name_suffix,
|
|
var_type=var_type, size=var_size, init=var_init)
|
|
self.vcd_vars[signal] = vcd_var
|
|
else:
|
|
self.vcd_writer.register_alias(
|
|
scope=var_scope, name=var_name_suffix,
|
|
var=self.vcd_vars[signal])
|
|
break
|
|
except KeyError:
|
|
suffix = (suffix or 0) + 1
|
|
|
|
if signal not in self.gtkw_names:
|
|
self.gtkw_names[signal] = (*var_scope, var_name_suffix)
|
|
|
|
def update(self, timestamp, signal, value):
|
|
vcd_var = self.vcd_vars.get(signal)
|
|
if vcd_var is None:
|
|
return
|
|
|
|
vcd_timestamp = self.timestamp_to_vcd(timestamp)
|
|
if signal.decoder:
|
|
var_value = self.decode_to_vcd(signal, value)
|
|
else:
|
|
var_value = value
|
|
self.vcd_writer.change(vcd_var, vcd_timestamp, var_value)
|
|
|
|
def close(self, timestamp):
|
|
if self.vcd_writer is not None:
|
|
self.vcd_writer.close(self.timestamp_to_vcd(timestamp))
|
|
|
|
if self.gtkw_save is not None:
|
|
self.gtkw_save.dumpfile(self.vcd_file.name)
|
|
self.gtkw_save.dumpfile_size(self.vcd_file.tell())
|
|
|
|
self.gtkw_save.treeopen("top")
|
|
for signal in self.traces:
|
|
if len(signal) > 1 and not signal.decoder:
|
|
suffix = "[{}:0]".format(len(signal) - 1)
|
|
else:
|
|
suffix = ""
|
|
self.gtkw_save.trace(".".join(self.gtkw_names[signal]) + suffix)
|
|
|
|
if self.vcd_file is not None:
|
|
self.vcd_file.close()
|
|
if self.gtkw_file is not None:
|
|
self.gtkw_file.close()
|
|
|
|
|
|
class _SignalState:
|
|
__slots__ = ("signal", "curr", "next", "waiters", "pending")
|
|
|
|
def __init__(self, signal, pending):
|
|
self.signal = signal
|
|
self.pending = pending
|
|
self.waiters = dict()
|
|
self.curr = self.next = signal.reset
|
|
|
|
def set(self, value):
|
|
if self.next == value:
|
|
return
|
|
self.next = value
|
|
self.pending.add(self)
|
|
|
|
def commit(self):
|
|
if self.curr == self.next:
|
|
return False
|
|
self.curr = self.next
|
|
|
|
awoken_any = False
|
|
for process, trigger in self.waiters.items():
|
|
if trigger is None or trigger == self.curr:
|
|
process.runnable = awoken_any = True
|
|
return awoken_any
|
|
|
|
|
|
class _SimulatorState:
|
|
def __init__(self):
|
|
self.timeline = Timeline()
|
|
self.signals = SignalDict()
|
|
self.slots = []
|
|
self.pending = set()
|
|
|
|
def reset(self):
|
|
self.timeline.reset()
|
|
for signal, index in self.signals.items():
|
|
self.slots[index].curr = self.slots[index].next = signal.reset
|
|
self.pending.clear()
|
|
|
|
def get_signal(self, signal):
|
|
try:
|
|
return self.signals[signal]
|
|
except KeyError:
|
|
index = len(self.slots)
|
|
self.slots.append(_SignalState(signal, self.pending))
|
|
self.signals[signal] = index
|
|
return index
|
|
|
|
def add_trigger(self, process, signal, *, trigger=None):
|
|
index = self.get_signal(signal)
|
|
assert (process not in self.slots[index].waiters or
|
|
self.slots[index].waiters[process] == trigger)
|
|
self.slots[index].waiters[process] = trigger
|
|
|
|
def remove_trigger(self, process, signal):
|
|
index = self.get_signal(signal)
|
|
assert process in self.slots[index].waiters
|
|
del self.slots[index].waiters[process]
|
|
|
|
def commit(self):
|
|
converged = True
|
|
for signal_state in self.pending:
|
|
if signal_state.commit():
|
|
converged = False
|
|
self.pending.clear()
|
|
return converged
|
|
|
|
|
|
class Simulator:
|
|
def __init__(self, fragment):
|
|
self._state = _SimulatorState()
|
|
self._fragment = Fragment.get(fragment, platform=None).prepare()
|
|
self._processes = _FragmentCompiler(self._state)(self._fragment)
|
|
self._clocked = set()
|
|
self._waveform_writers = []
|
|
|
|
def _check_process(self, process):
|
|
if not (inspect.isgeneratorfunction(process) or inspect.iscoroutinefunction(process)):
|
|
raise TypeError("Cannot add a process {!r} because it is not a generator function"
|
|
.format(process))
|
|
return process
|
|
|
|
def _add_coroutine_process(self, process, *, default_cmd):
|
|
self._processes.add(PyCoroProcess(self._state, self._fragment.domains, process,
|
|
default_cmd=default_cmd))
|
|
|
|
def add_process(self, process):
|
|
process = self._check_process(process)
|
|
def wrapper():
|
|
# Only start a bench process after comb settling, so that the reset values are correct.
|
|
yield Settle()
|
|
yield from process()
|
|
self._add_coroutine_process(wrapper, default_cmd=None)
|
|
|
|
def add_sync_process(self, process, *, domain="sync"):
|
|
process = self._check_process(process)
|
|
def wrapper():
|
|
# Only start a sync process after the first clock edge (or reset edge, if the domain
|
|
# uses an asynchronous reset). This matches the behavior of synchronous FFs.
|
|
yield Tick(domain)
|
|
yield from process()
|
|
return self._add_coroutine_process(wrapper, default_cmd=Tick(domain))
|
|
|
|
def add_clock(self, period, *, phase=None, domain="sync", if_exists=False):
|
|
"""Add a clock process.
|
|
|
|
Adds a process that drives the clock signal of ``domain`` at a 50% duty cycle.
|
|
|
|
Arguments
|
|
---------
|
|
period : float
|
|
Clock period. The process will toggle the ``domain`` clock signal every ``period / 2``
|
|
seconds.
|
|
phase : None or float
|
|
Clock phase. The process will wait ``phase`` seconds before the first clock transition.
|
|
If not specified, defaults to ``period / 2``.
|
|
domain : str or ClockDomain
|
|
Driven clock domain. If specified as a string, the domain with that name is looked up
|
|
in the root fragment of the simulation.
|
|
if_exists : bool
|
|
If ``False`` (the default), raise an error if the driven domain is specified as
|
|
a string and the root fragment does not have such a domain. If ``True``, do nothing
|
|
in this case.
|
|
"""
|
|
if isinstance(domain, ClockDomain):
|
|
pass
|
|
elif domain in self._fragment.domains:
|
|
domain = self._fragment.domains[domain]
|
|
elif if_exists:
|
|
return
|
|
else:
|
|
raise ValueError("Domain {!r} is not present in simulation"
|
|
.format(domain))
|
|
if domain in self._clocked:
|
|
raise ValueError("Domain {!r} already has a clock driving it"
|
|
.format(domain.name))
|
|
|
|
half_period = period / 2
|
|
if phase is None:
|
|
# By default, delay the first edge by half period. This causes any synchronous activity
|
|
# to happen at a non-zero time, distinguishing it from the reset values in the waveform
|
|
# viewer.
|
|
phase = half_period
|
|
def clk_process():
|
|
yield Passive()
|
|
yield Delay(phase)
|
|
# Behave correctly if the process is added after the clock signal is manipulated, or if
|
|
# its reset state is high.
|
|
initial = (yield domain.clk)
|
|
steps = (
|
|
domain.clk.eq(~initial),
|
|
Delay(half_period),
|
|
domain.clk.eq(initial),
|
|
Delay(half_period),
|
|
)
|
|
while True:
|
|
yield from iter(steps)
|
|
self._add_coroutine_process(clk_process, default_cmd=None)
|
|
self._clocked.add(domain)
|
|
|
|
def reset(self):
|
|
"""Reset the simulation.
|
|
|
|
Assign the reset value to every signal in the simulation, and restart every user process.
|
|
"""
|
|
self._state.reset()
|
|
for process in self._processes:
|
|
process.reset()
|
|
|
|
def _real_step(self):
|
|
"""Step the simulation.
|
|
|
|
Run every process and commit changes until a fixed point is reached. If there is
|
|
an unstable combinatorial loop, this function will never return.
|
|
"""
|
|
# Performs the two phases of a delta cycle in a loop:
|
|
converged = False
|
|
while not converged:
|
|
# 1. eval: run and suspend every non-waiting process once, queueing signal changes
|
|
for process in self._processes:
|
|
if process.runnable:
|
|
process.runnable = False
|
|
process.run()
|
|
|
|
for waveform_writer in self._waveform_writers:
|
|
for signal_state in self._state.pending:
|
|
waveform_writer.update(self._state.timeline.now,
|
|
signal_state.signal, signal_state.next)
|
|
|
|
# 2. commit: apply every queued signal change, waking up any waiting processes
|
|
converged = self._state.commit()
|
|
|
|
# TODO(nmigen-0.4): replace with _real_step
|
|
@deprecated("instead of `sim.step()`, use `sim.advance()`")
|
|
def step(self):
|
|
return self.advance()
|
|
|
|
def advance(self):
|
|
"""Advance the simulation.
|
|
|
|
Run every process and commit changes until a fixed point is reached, then advance time
|
|
to the closest deadline (if any). If there is an unstable combinatorial loop,
|
|
this function will never return.
|
|
|
|
Returns ``True`` if there are any active processes, ``False`` otherwise.
|
|
"""
|
|
self._real_step()
|
|
self._state.timeline.advance()
|
|
return any(not process.passive for process in self._processes)
|
|
|
|
def run(self):
|
|
"""Run the simulation while any processes are active.
|
|
|
|
Processes added with :meth:`add_process` and :meth:`add_sync_process` are initially active,
|
|
and may change their status using the ``yield Passive()`` and ``yield Active()`` commands.
|
|
Processes compiled from HDL and added with :meth:`add_clock` are always passive.
|
|
"""
|
|
while self.advance():
|
|
pass
|
|
|
|
def run_until(self, deadline, *, run_passive=False):
|
|
"""Run the simulation until it advances to ``deadline``.
|
|
|
|
If ``run_passive`` is ``False``, the simulation also stops when there are no active
|
|
processes, similar to :meth:`run`. Otherwise, the simulation will stop only after it
|
|
advances to or past ``deadline``.
|
|
|
|
If the simulation stops advancing, this function will never return.
|
|
"""
|
|
assert self._state.timeline.now <= deadline
|
|
while (self.advance() or run_passive) and self._state.timeline.now < deadline:
|
|
pass
|
|
|
|
@contextmanager
|
|
def write_vcd(self, vcd_file, gtkw_file=None, *, traces=()):
|
|
"""Write waveforms to a Value Change Dump file, optionally populating a GTKWave save file.
|
|
|
|
This method returns a context manager. It can be used as: ::
|
|
|
|
sim = Simulator(frag)
|
|
sim.add_clock(1e-6)
|
|
with sim.write_vcd("dump.vcd", "dump.gtkw"):
|
|
sim.run_until(1e-3)
|
|
|
|
Arguments
|
|
---------
|
|
vcd_file : str or file-like object
|
|
Verilog Value Change Dump file or filename.
|
|
gtkw_file : str or file-like object
|
|
GTKWave save file or filename.
|
|
traces : iterable of Signal
|
|
Signals to display traces for.
|
|
"""
|
|
if self._state.timeline.now != 0.0:
|
|
raise ValueError("Cannot start writing waveforms after advancing simulation time")
|
|
waveform_writer = _VCDWaveformWriter(self._fragment,
|
|
vcd_file=vcd_file, gtkw_file=gtkw_file, traces=traces)
|
|
self._waveform_writers.append(waveform_writer)
|
|
yield
|
|
waveform_writer.close(self._state.timeline.now)
|
|
self._waveform_writers.remove(waveform_writer)
|