amaranth/nmigen/sim/pysim.py
whitequark 9bc42cb8c5 sim._pyclock: new type of process.
The overhead of coroutine processes is fairly high. A clock driver
implemented through a coroutine process is mostly overhead. This was
partially addressed in commit 2398b792 by microoptimizing yielding.

This commit eliminates the coroutine process overhead completely by
introducing dedicated clock processes. It also simplifies the logic
to a simple toggle.

This change improves runtime by about 12% on Minerva SRAM SoC.
2020-08-27 07:56:47 +00:00

412 lines
15 KiB
Python

from contextlib import contextmanager
import itertools
import inspect
from vcd import VCDWriter
from vcd.gtkw import GTKWSave
from .._utils import deprecated
from ..hdl import *
from ..hdl.ast import SignalDict
from ._cmds import *
from ._core import *
from ._pyrtl import _FragmentCompiler
from ._pycoro import PyCoroProcess
from ._pyclock import PyClockProcess
__all__ = ["Settle", "Delay", "Tick", "Passive", "Active", "Simulator"]
class _NameExtractor:
def __init__(self):
self.names = SignalDict()
def __call__(self, fragment, *, hierarchy=("top",)):
def add_signal_name(signal):
hierarchical_signal_name = (*hierarchy, signal.name)
if signal not in self.names:
self.names[signal] = {hierarchical_signal_name}
else:
self.names[signal].add(hierarchical_signal_name)
for domain_name, domain_signals in fragment.drivers.items():
if domain_name is not None:
domain = fragment.domains[domain_name]
add_signal_name(domain.clk)
if domain.rst is not None:
add_signal_name(domain.rst)
for statement in fragment.statements:
for signal in statement._lhs_signals() | statement._rhs_signals():
if not isinstance(signal, (ClockSignal, ResetSignal)):
add_signal_name(signal)
for subfragment_index, (subfragment, subfragment_name) in enumerate(fragment.subfragments):
if subfragment_name is None:
subfragment_name = "U${}".format(subfragment_index)
self(subfragment, hierarchy=(*hierarchy, subfragment_name))
return self.names
class _WaveformWriter:
def update(self, timestamp, signal, value):
raise NotImplementedError # :nocov:
def close(self, timestamp):
raise NotImplementedError # :nocov:
class _VCDWaveformWriter(_WaveformWriter):
    """Waveform writer that produces a VCD file and, optionally, a GTKWave save file."""

    @staticmethod
    def timestamp_to_vcd(timestamp):
        """Scale ``timestamp`` to the 100 ps timescale the VCD writer is created with."""
        return timestamp * (10 ** 10) # 1/(100 ps)

    @staticmethod
    def decode_to_vcd(signal, value):
        """Render ``value`` through ``signal.decoder``, expanding tabs and replacing spaces."""
        decoded = signal.decoder(value)
        return decoded.expandtabs().replace(" ", "_")

    def __init__(self, fragment, *, vcd_file, gtkw_file=None, traces=()):
        """Register every signal named in ``fragment`` (plus any extra ``traces``) for dumping.

        ``vcd_file`` and ``gtkw_file`` may each be a file name, which is opened for writing
        here, or an already open file-like object, which is used as given.
        """
        if isinstance(vcd_file, str):
            vcd_file = open(vcd_file, "wt")
        if isinstance(gtkw_file, str):
            gtkw_file = open(gtkw_file, "wt")

        # Maps each signal to the VCD variable that was registered for it.
        self.vcd_vars = SignalDict()
        self.vcd_file = vcd_file
        self.vcd_writer = (
            VCDWriter(self.vcd_file, timescale="100 ps", comment="Generated by nMigen")
            if vcd_file else vcd_file
        )

        # Maps each signal to the first full name it was registered under.
        self.gtkw_names = SignalDict()
        self.gtkw_file = gtkw_file
        self.gtkw_save = GTKWSave(self.gtkw_file) if gtkw_file else gtkw_file

        self.traces = []

        signal_names = _NameExtractor()(fragment)

        # Traces that do not occur in the fragment are placed directly in the "top" scope.
        trace_names = SignalDict()
        for trace in traces:
            if trace not in signal_names:
                trace_names[trace] = {("top", trace.name)}
            self.traces.append(trace)

        if self.vcd_writer is None:
            return

        for signal, names in itertools.chain(signal_names.items(), trace_names.items()):
            if signal.decoder:
                # Signals with a decoder are dumped as their decoded text.
                var_type, var_size = "string", 1
                var_init = self.decode_to_vcd(signal, signal.reset)
            else:
                var_type, var_size = "wire", signal.width
                var_init = signal.reset

            for *scope, base_name in names:
                final_name = self._register_unique(
                    signal, scope, base_name,
                    var_type=var_type, size=var_size, init=var_init)
                if signal not in self.gtkw_names:
                    self.gtkw_names[signal] = (*scope, final_name)

    def _register_unique(self, signal, scope, name, *, var_type, size, init):
        """Register ``signal`` in ``scope`` under ``name`` and return the name actually used.

        The first registration of a signal creates a VCD variable; later registrations create
        aliases of it. Whenever the VCD writer raises ``KeyError``, the registration is retried
        with the suffixes ``$1``, ``$2``, ... appended to ``name`` in turn.
        """
        for attempt in itertools.count():
            candidate = name if attempt == 0 else "{}${}".format(name, attempt)
            try:
                if signal not in self.vcd_vars:
                    self.vcd_vars[signal] = self.vcd_writer.register_var(
                        scope=scope, name=candidate,
                        var_type=var_type, size=size, init=init)
                else:
                    self.vcd_writer.register_alias(
                        scope=scope, name=candidate,
                        var=self.vcd_vars[signal])
            except KeyError:
                continue
            return candidate

    def update(self, timestamp, signal, value):
        """Write a value change for ``signal``; signals without a VCD variable are ignored."""
        vcd_var = self.vcd_vars.get(signal)
        if vcd_var is None:
            return

        when = self.timestamp_to_vcd(timestamp)
        shown = self.decode_to_vcd(signal, value) if signal.decoder else value
        self.vcd_writer.change(vcd_var, when, shown)

    def close(self, timestamp):
        """Finish the VCD dump at ``timestamp``, write the GTKWave save file, close both files."""
        if self.vcd_writer is not None:
            self.vcd_writer.close(self.timestamp_to_vcd(timestamp))

        save = self.gtkw_save
        if save is not None:
            save.dumpfile(self.vcd_file.name)
            save.dumpfile_size(self.vcd_file.tell())

            save.treeopen("top")
            for signal in self.traces:
                # Multi-bit signals without a decoder are referred to with an explicit range.
                is_vector = len(signal) > 1 and not signal.decoder
                bit_range = "[{}:0]".format(len(signal) - 1) if is_vector else ""
                save.trace(".".join(self.gtkw_names[signal]) + bit_range)

        for handle in (self.vcd_file, self.gtkw_file):
            if handle is not None:
                handle.close()
class _SignalState:
__slots__ = ("signal", "curr", "next", "waiters", "pending")
def __init__(self, signal, pending):
self.signal = signal
self.pending = pending
self.waiters = dict()
self.curr = self.next = signal.reset
def set(self, value):
if self.next == value:
return
self.next = value
self.pending.add(self)
def commit(self):
if self.curr == self.next:
return False
self.curr = self.next
awoken_any = False
for process, trigger in self.waiters.items():
if trigger is None or trigger == self.curr:
process.runnable = awoken_any = True
return awoken_any
class _SimulatorState:
def __init__(self):
self.timeline = Timeline()
self.signals = SignalDict()
self.slots = []
self.pending = set()
def reset(self):
self.timeline.reset()
for signal, index in self.signals.items():
self.slots[index].curr = self.slots[index].next = signal.reset
self.pending.clear()
def get_signal(self, signal):
try:
return self.signals[signal]
except KeyError:
index = len(self.slots)
self.slots.append(_SignalState(signal, self.pending))
self.signals[signal] = index
return index
def add_trigger(self, process, signal, *, trigger=None):
index = self.get_signal(signal)
assert (process not in self.slots[index].waiters or
self.slots[index].waiters[process] == trigger)
self.slots[index].waiters[process] = trigger
def remove_trigger(self, process, signal):
index = self.get_signal(signal)
assert process in self.slots[index].waiters
del self.slots[index].waiters[process]
def commit(self):
converged = True
for signal_state in self.pending:
if signal_state.commit():
converged = False
self.pending.clear()
return converged
class Simulator:
    """Python simulator for a design.

    On construction the design is converted to a prepared fragment and compiled into
    processes. Testbench processes, clocks and waveform writers can then be attached before
    the simulation is advanced.
    """

    def __init__(self, fragment):
        self._state = _SimulatorState()
        self._fragment = Fragment.get(fragment, platform=None).prepare()
        self._processes = _FragmentCompiler(self._state)(self._fragment)
        # Clock domains that already have a clock process driving them.
        self._clocked = set()
        self._waveform_writers = []

    def _check_process(self, process):
        """Return ``process`` unchanged, or raise ``TypeError`` if it is neither a generator
        function nor a coroutine function."""
        if not (inspect.isgeneratorfunction(process) or inspect.iscoroutinefunction(process)):
            raise TypeError("Cannot add a process {!r} because it is not a generator function"
                            .format(process))
        return process

    def _add_coroutine_process(self, process, *, default_cmd):
        """Wrap ``process`` in a :class:`PyCoroProcess` and add it to the simulation."""
        self._processes.add(PyCoroProcess(self._state, self._fragment.domains, process,
                                          default_cmd=default_cmd))

    def add_process(self, process):
        """Add a testbench process.

        ``process`` must be a generator function; otherwise ``TypeError`` is raised.
        """
        process = self._check_process(process)
        def wrapper():
            # Only start a bench process after comb settling, so that the reset values are correct.
            yield Settle()
            yield from process()
        self._add_coroutine_process(wrapper, default_cmd=None)

    def add_sync_process(self, process, *, domain="sync"):
        """Add a testbench process synchronized to the clock of ``domain``.

        ``process`` must be a generator function; otherwise ``TypeError`` is raised.
        ``Tick(domain)`` is passed to the process as its default command.
        """
        process = self._check_process(process)
        def wrapper():
            # Only start a sync process after the first clock edge (or reset edge, if the domain
            # uses an asynchronous reset). This matches the behavior of synchronous FFs.
            yield Tick(domain)
            yield from process()
        self._add_coroutine_process(wrapper, default_cmd=Tick(domain))

    def add_clock(self, period, *, phase=None, domain="sync", if_exists=False):
        """Add a clock process.

        Adds a process that drives the clock signal of ``domain`` at a 50% duty cycle.

        Arguments
        ---------
        period : float
            Clock period. The process will toggle the ``domain`` clock signal every ``period / 2``
            seconds.
        phase : None or float
            Clock phase. The process will wait ``phase`` seconds before the first clock transition.
            If not specified, defaults to ``period / 2``.
        domain : str or ClockDomain
            Driven clock domain. If specified as a string, the domain with that name is looked up
            in the root fragment of the simulation.
        if_exists : bool
            If ``False`` (the default), raise an error if the driven domain is specified as
            a string and the root fragment does not have such a domain. If ``True``, do nothing
            in this case.

        Raises
        ------
        ValueError
            If the domain is not present in the simulation (and ``if_exists`` is ``False``),
            or if the domain already has a clock driving it.
        """
        if isinstance(domain, ClockDomain):
            pass
        elif domain in self._fragment.domains:
            domain = self._fragment.domains[domain]
        elif if_exists:
            return
        else:
            raise ValueError("Domain {!r} is not present in simulation"
                             .format(domain))
        if domain in self._clocked:
            raise ValueError("Domain {!r} already has a clock driving it"
                             .format(domain.name))

        if phase is None:
            # By default, delay the first edge by half period. This causes any synchronous activity
            # to happen at a non-zero time, distinguishing it from the reset values in the waveform
            # viewer.
            phase = period / 2
        self._processes.add(PyClockProcess(self._state, domain.clk, phase=phase, period=period))
        self._clocked.add(domain)

    def reset(self):
        """Reset the simulation.

        Assign the reset value to every signal in the simulation, and restart every user process.
        """
        self._state.reset()
        for process in self._processes:
            process.reset()

    def _real_step(self):
        """Step the simulation.

        Run every process and commit changes until a fixed point is reached. If there is
        an unstable combinatorial loop, this function will never return.
        """
        # Performs the two phases of a delta cycle in a loop:
        converged = False
        while not converged:
            # 1. eval: run and suspend every non-waiting process once, queueing signal changes
            for process in self._processes:
                if process.runnable:
                    process.runnable = False
                    process.run()

            # Report the queued changes to the waveform writers before they are committed.
            for waveform_writer in self._waveform_writers:
                for signal_state in self._state.pending:
                    waveform_writer.update(self._state.timeline.now,
                        signal_state.signal, signal_state.next)

            # 2. commit: apply every queued signal change, waking up any waiting processes
            converged = self._state.commit()

    # TODO(nmigen-0.4): replace with _real_step
    @deprecated("instead of `sim.step()`, use `sim.advance()`")
    def step(self):
        """Deprecated alias of :meth:`advance`."""
        return self.advance()

    def advance(self):
        """Advance the simulation.

        Run every process and commit changes until a fixed point is reached, then advance time
        to the closest deadline (if any). If there is an unstable combinatorial loop,
        this function will never return.

        Returns ``True`` if there are any active processes, ``False`` otherwise.
        """
        self._real_step()
        self._state.timeline.advance()
        return any(not process.passive for process in self._processes)

    def run(self):
        """Run the simulation while any processes are active.

        Processes added with :meth:`add_process` and :meth:`add_sync_process` are initially active,
        and may change their status using the ``yield Passive()`` and ``yield Active()`` commands.
        Processes compiled from HDL and added with :meth:`add_clock` are always passive.
        """
        while self.advance():
            pass

    def run_until(self, deadline, *, run_passive=False):
        """Run the simulation until it advances to ``deadline``.

        If ``run_passive`` is ``False``, the simulation also stops when there are no active
        processes, similar to :meth:`run`. Otherwise, the simulation will stop only after it
        advances to or past ``deadline``.

        If the simulation stops advancing, this function will never return.
        """
        assert self._state.timeline.now <= deadline
        while (self.advance() or run_passive) and self._state.timeline.now < deadline:
            pass

    @contextmanager
    def write_vcd(self, vcd_file, gtkw_file=None, *, traces=()):
        """Write waveforms to a Value Change Dump file, optionally populating a GTKWave save file.

        This method returns a context manager. It can be used as: ::

            sim = Simulator(frag)
            sim.add_clock(1e-6)
            with sim.write_vcd("dump.vcd", "dump.gtkw"):
                sim.run_until(1e-3)

        The waveform files are finalized and closed when the ``with`` block is left, including
        when it is left because of an exception.

        Arguments
        ---------
        vcd_file : str or file-like object
            Verilog Value Change Dump file or filename.
        gtkw_file : str or file-like object
            GTKWave save file or filename.
        traces : iterable of Signal
            Signals to display traces for.

        Raises
        ------
        ValueError
            If simulation time has already advanced past zero.
        """
        if self._state.timeline.now != 0.0:
            raise ValueError("Cannot start writing waveforms after advancing simulation time")
        waveform_writer = _VCDWaveformWriter(self._fragment,
            vcd_file=vcd_file, gtkw_file=gtkw_file, traces=traces)
        self._waveform_writers.append(waveform_writer)
        try:
            yield
        finally:
            # Runs even if the body raised, so the waveform recorded up to the failure is
            # flushed to disk and the writer does not stay attached to the simulator.
            waveform_writer.close(self._state.timeline.now)
            self._waveform_writers.remove(waveform_writer)