from contextlib import contextmanager
import itertools
import inspect
from vcd import VCDWriter
from vcd.gtkw import GTKWSave

from .._utils import deprecated
from ..hdl import *
from ..hdl.ast import SignalDict
from ._cmds import *
from ._core import *
from ._pyrtl import _FragmentCompiler
from ._pycoro import PyCoroProcess


__all__ = ["Settle", "Delay", "Tick", "Passive", "Active", "Simulator"]


class _NameExtractor:
    """Collect hierarchical names for every signal used in a fragment tree.

    Calling an instance on a fragment fills (and returns) ``names``, a ``SignalDict`` that maps
    each signal to a set of name tuples of the form ``(*hierarchy, signal.name)``.
    """

    def __init__(self):
        self.names = SignalDict()

    def __call__(self, fragment, *, hierarchy=("top",)):
        """Record names for the signals of ``fragment`` and, recursively, of its subfragments.

        ``hierarchy`` is the tuple of scope names leading to ``fragment``. Returns ``self.names``.
        """
        def add_signal_name(signal):
            hierarchical_signal_name = (*hierarchy, signal.name)
            if signal not in self.names:
                self.names[signal] = {hierarchical_signal_name}
            else:
                self.names[signal].add(hierarchical_signal_name)

        # Clock and reset signals of every named domain that drives something in this fragment.
        for domain_name, domain_signals in fragment.drivers.items():
            if domain_name is not None:
                domain = fragment.domains[domain_name]
                add_signal_name(domain.clk)
                if domain.rst is not None:
                    add_signal_name(domain.rst)

        # Every signal read or written by a statement, except ClockSignal/ResetSignal
        # placeholders.
        for statement in fragment.statements:
            for signal in statement._lhs_signals() | statement._rhs_signals():
                if not isinstance(signal, (ClockSignal, ResetSignal)):
                    add_signal_name(signal)

        for subfragment_index, (subfragment, subfragment_name) in enumerate(fragment.subfragments):
            if subfragment_name is None:
                # Anonymous subfragments are named after their position.
                subfragment_name = "U${}".format(subfragment_index)
            self(subfragment, hierarchy=(*hierarchy, subfragment_name))

        return self.names


class _WaveformWriter:
    """Interface for objects that record signal changes produced by the simulator."""

    def update(self, timestamp, signal, value):
        """Record that ``signal`` takes ``value`` at ``timestamp``."""
        raise NotImplementedError # :nocov:

    def close(self, timestamp):
        """Finish writing; ``timestamp`` is the simulation time at which writing stops."""
        raise NotImplementedError # :nocov:


class _VCDWaveformWriter(_WaveformWriter):
    """Waveform writer that emits a VCD file and, optionally, a GTKWave save file."""

    @staticmethod
    def timestamp_to_vcd(timestamp):
        """Scale ``timestamp`` to the 100 ps timescale the VCD writer is created with."""
        return timestamp * (10 ** 10) # 1/(100 ps)

    @staticmethod
    def decode_to_vcd(signal, value):
        """Return ``signal.decoder(value)`` with tabs expanded and spaces replaced by ``_``."""
        return signal.decoder(value).expandtabs().replace(" ", "_")

    def __init__(self, fragment, *, vcd_file, gtkw_file=None, traces=()):
        """Open the output files and register a VCD variable for every signal.

        Arguments
        ---------
        fragment
            Fragment whose signals (found with ``_NameExtractor``) are registered.
        vcd_file : str or file-like object or None
            VCD output. A string is treated as a filename and opened for writing. If falsy,
            no VCD writer is created and no variables are registered.
        gtkw_file : str or file-like object or None
            GTKWave save file output. A string is treated as a filename and opened for writing.
        traces : iterable of Signal
            Signals to display traces for; those not found in ``fragment`` are registered
            under the ``top`` scope.
        """
        if isinstance(vcd_file, str):
            vcd_file = open(vcd_file, "wt")
        if isinstance(gtkw_file, str):
            gtkw_file = open(gtkw_file, "wt")

        self.vcd_vars = SignalDict()
        self.vcd_file = vcd_file
        self.vcd_writer = vcd_file and VCDWriter(self.vcd_file,
            timescale="100 ps", comment="Generated by nMigen")

        self.gtkw_names = SignalDict()
        self.gtkw_file = gtkw_file
        self.gtkw_save = gtkw_file and GTKWSave(self.gtkw_file)

        self.traces = []

        signal_names = _NameExtractor()(fragment)

        trace_names = SignalDict()
        for trace in traces:
            if trace not in signal_names:
                trace_names[trace] = {("top", trace.name)}
            self.traces.append(trace)

        if self.vcd_writer is None:
            return

        for signal, names in itertools.chain(signal_names.items(), trace_names.items()):
            if signal.decoder:
                # Signals with a decoder are dumped as strings produced by the decoder.
                var_type = "string"
                var_size = 1
                var_init = self.decode_to_vcd(signal, signal.reset)
            else:
                var_type = "wire"
                var_size = signal.width
                var_init = signal.reset

            for (*var_scope, var_name) in names:
                suffix = None
                # A KeyError from registration is treated as a name collision; retry with
                # an increasing numeric ``$N`` suffix until registration succeeds.
                while True:
                    try:
                        if suffix is None:
                            var_name_suffix = var_name
                        else:
                            var_name_suffix = "{}${}".format(var_name, suffix)
                        if signal not in self.vcd_vars:
                            vcd_var = self.vcd_writer.register_var(
                                scope=var_scope, name=var_name_suffix,
                                var_type=var_type, size=var_size, init=var_init)
                            self.vcd_vars[signal] = vcd_var
                        else:
                            # Further names of an already registered signal become aliases.
                            self.vcd_writer.register_alias(
                                scope=var_scope, name=var_name_suffix,
                                var=self.vcd_vars[signal])
                        break
                    except KeyError:
                        suffix = (suffix or 0) + 1

                # Only the first registered name of a signal is used for its GTKWave trace.
                if signal not in self.gtkw_names:
                    self.gtkw_names[signal] = (*var_scope, var_name_suffix)

    def update(self, timestamp, signal, value):
        """Write a value change for ``signal``; signals without a VCD variable are ignored."""
        vcd_var = self.vcd_vars.get(signal)
        if vcd_var is None:
            return

        vcd_timestamp = self.timestamp_to_vcd(timestamp)
        if signal.decoder:
            var_value = self.decode_to_vcd(signal, value)
        else:
            var_value = value
        self.vcd_writer.change(vcd_var, vcd_timestamp, var_value)

    def close(self, timestamp):
        """Finalize the VCD and GTKWave outputs and close both files.

        The file objects are closed even if finalizing the VCD or writing the GTKWave save
        file raises, so that the handles are not leaked.
        """
        try:
            if self.vcd_writer is not None:
                self.vcd_writer.close(self.timestamp_to_vcd(timestamp))

            if self.gtkw_save is not None:
                self.gtkw_save.dumpfile(self.vcd_file.name)
                self.gtkw_save.dumpfile_size(self.vcd_file.tell())

                self.gtkw_save.treeopen("top")
                for signal in self.traces:
                    if len(signal) > 1 and not signal.decoder:
                        # Multi-bit wires are referenced with an explicit bit range.
                        suffix = "[{}:0]".format(len(signal) - 1)
                    else:
                        suffix = ""
                    self.gtkw_save.trace(".".join(self.gtkw_names[signal]) + suffix)
        finally:
            if self.vcd_file is not None:
                self.vcd_file.close()
            if self.gtkw_file is not None:
                self.gtkw_file.close()


class _SignalState:
    """Current and queued value of one signal, plus the processes waiting on it.

    ``pending`` is a set shared with the owner; a state adds itself to it whenever its queued
    value changes.
    """

    __slots__ = ("signal", "curr", "next", "waiters", "pending")

    def __init__(self, signal, pending):
        self.signal = signal
        self.pending = pending
        # Maps a waiting process to its trigger value (``None`` means any change).
        self.waiters = {}
        self.curr = self.next = signal.reset

    def set(self, value):
        """Queue ``value`` as the next value; does nothing if it is already queued."""
        if self.next == value:
            return
        self.next = value
        self.pending.add(self)

    def commit(self):
        """Apply the queued value.

        Returns ``True`` if the value changed and at least one waiting process was made
        runnable, ``False`` otherwise.
        """
        if self.curr == self.next:
            return False
        self.curr = self.next

        awoken_any = False
        for process, trigger in self.waiters.items():
            if trigger is None or trigger == self.curr:
                process.runnable = awoken_any = True
        return awoken_any


class _SimulatorState:
    """Mutable simulation state: the timeline, signal slots, and the set of pending changes."""

    def __init__(self):
        self.timeline = Timeline()
        # Maps a signal to its index in ``slots``.
        self.signals = SignalDict()
        self.slots = []
        # Signal states whose queued value has changed since the last commit.
        self.pending = set()

    def reset(self):
        """Reset the timeline, restore every signal to its reset value, and drop pending changes."""
        self.timeline.reset()
        for signal, index in self.signals.items():
            self.slots[index].curr = self.slots[index].next = signal.reset
        self.pending.clear()

    def get_signal(self, signal):
        """Return the slot index of ``signal``, allocating a new slot on first use."""
        try:
            return self.signals[signal]
        except KeyError:
            index = len(self.slots)
            self.slots.append(_SignalState(signal, self.pending))
            self.signals[signal] = index
            return index

    def add_trigger(self, process, signal, *, trigger=None):
        """Register ``process`` as waiting on ``signal``.

        With ``trigger=None`` the process is woken by any change of the signal; otherwise only
        when the committed value equals ``trigger``.
        """
        index = self.get_signal(signal)
        assert (process not in self.slots[index].waiters or
                self.slots[index].waiters[process] == trigger)
        self.slots[index].waiters[process] = trigger

    def remove_trigger(self, process, signal):
        """Stop ``process`` from waiting on ``signal``; it must currently be waiting on it."""
        index = self.get_signal(signal)
        assert process in self.slots[index].waiters
        del self.slots[index].waiters[process]

    def commit(self):
        """Commit every pending signal change.

        Returns ``True`` if no process was woken up by the committed changes (that is,
        the state has converged), ``False`` otherwise.
        """
        converged = True
        for signal_state in self.pending:
            if signal_state.commit():
                converged = False
        self.pending.clear()
        return converged


class Simulator:
    """Simulator that runs the processes compiled from a fragment together with user processes."""

    def __init__(self, fragment):
        self._state = _SimulatorState()
        self._fragment = Fragment.get(fragment, platform=None).prepare()
        self._processes = _FragmentCompiler(self._state)(self._fragment)
        # Clock domains that already have a clock process driving them.
        self._clocked = set()
        self._waveform_writers = []

    def _check_process(self, process):
        """Return ``process`` unchanged, or raise ``TypeError`` if it is neither a generator
        function nor a coroutine function."""
        if not (inspect.isgeneratorfunction(process) or inspect.iscoroutinefunction(process)):
            raise TypeError("Cannot add a process {!r} because it is not a generator function"
                            .format(process))
        return process

    def _add_coroutine_process(self, process, *, default_cmd):
        """Wrap ``process`` in a ``PyCoroProcess`` and add it to the set of processes."""
        self._processes.add(PyCoroProcess(self._state, self._fragment.domains, process,
                                          default_cmd=default_cmd))

    def add_process(self, process):
        """Add a bench process.

        ``process`` must be a generator function (or coroutine function); otherwise
        ``TypeError`` is raised.
        """
        process = self._check_process(process)
        def wrapper():
            # Only start a bench process after comb settling, so that the reset values are correct.
            yield Settle()
            yield from process()
        self._add_coroutine_process(wrapper, default_cmd=None)

    def add_sync_process(self, process, *, domain="sync"):
        """Add a process synchronized to the clock of ``domain``.

        ``process`` must be a generator function (or coroutine function); otherwise
        ``TypeError`` is raised. The process uses ``Tick(domain)`` as its default command.
        """
        process = self._check_process(process)
        def wrapper():
            # Only start a sync process after the first clock edge (or reset edge, if the domain
            # uses an asynchronous reset). This matches the behavior of synchronous FFs.
            yield Tick(domain)
            yield from process()
        self._add_coroutine_process(wrapper, default_cmd=Tick(domain))

    def add_clock(self, period, *, phase=None, domain="sync", if_exists=False):
        """Add a clock process.

        Adds a process that drives the clock signal of ``domain`` at a 50% duty cycle.

        Arguments
        ---------
        period : float
            Clock period. The process will toggle the ``domain`` clock signal every ``period / 2``
            seconds.
        phase : None or float
            Clock phase. The process will wait ``phase`` seconds before the first clock transition.
            If not specified, defaults to ``period / 2``.
        domain : str or ClockDomain
            Driven clock domain. If specified as a string, the domain with that name is looked up
            in the root fragment of the simulation.
        if_exists : bool
            If ``False`` (the default), raise an error if the driven domain is specified as
            a string and the root fragment does not have such a domain. If ``True``, do nothing
            in this case.
        """
        if isinstance(domain, ClockDomain):
            pass
        elif domain in self._fragment.domains:
            domain = self._fragment.domains[domain]
        elif if_exists:
            return
        else:
            raise ValueError("Domain {!r} is not present in simulation"
                             .format(domain))
        if domain in self._clocked:
            raise ValueError("Domain {!r} already has a clock driving it"
                             .format(domain.name))

        half_period = period / 2
        if phase is None:
            # By default, delay the first edge by half period. This causes any synchronous activity
            # to happen at a non-zero time, distinguishing it from the reset values in the waveform
            # viewer.
            phase = half_period
        def clk_process():
            yield Passive()
            yield Delay(phase)
            # Behave correctly if the process is added after the clock signal is manipulated, or if
            # its reset state is high.
            initial = (yield domain.clk)
            steps = (
                domain.clk.eq(~initial),
                Delay(half_period),
                domain.clk.eq(initial),
                Delay(half_period),
            )
            while True:
                yield from iter(steps)
        self._add_coroutine_process(clk_process, default_cmd=None)
        self._clocked.add(domain)

    def reset(self):
        """Reset the simulation.

        Assign the reset value to every signal in the simulation, and restart every user process.
        """
        self._state.reset()
        for process in self._processes:
            process.reset()

    def _real_step(self):
        """Step the simulation.

        Run every process and commit changes until a fixed point is reached. If there is
        an unstable combinatorial loop, this function will never return.
        """
        # Performs the two phases of a delta cycle in a loop:
        converged = False
        while not converged:
            # 1. eval: run and suspend every non-waiting process once, queueing signal changes
            for process in self._processes:
                if process.runnable:
                    process.runnable = False
                    process.run()

            # Report the queued values to the waveform writers before they are committed.
            for waveform_writer in self._waveform_writers:
                for signal_state in self._state.pending:
                    waveform_writer.update(self._state.timeline.now,
                        signal_state.signal, signal_state.next)

            # 2. commit: apply every queued signal change, waking up any waiting processes
            converged = self._state.commit()

    # TODO(nmigen-0.4): replace with _real_step
    @deprecated("instead of `sim.step()`, use `sim.advance()`")
    def step(self):
        """Deprecated alias of :meth:`advance`."""
        return self.advance()

    def advance(self):
        """Advance the simulation.

        Run every process and commit changes until a fixed point is reached, then advance time
        to the closest deadline (if any). If there is an unstable combinatorial loop,
        this function will never return.

        Returns ``True`` if there are any active processes, ``False`` otherwise.
        """
        self._real_step()
        self._state.timeline.advance()
        return any(not process.passive for process in self._processes)

    def run(self):
        """Run the simulation while any processes are active.

        Processes added with :meth:`add_process` and :meth:`add_sync_process` are initially active,
        and may change their status using the ``yield Passive()`` and ``yield Active()`` commands.
        Processes compiled from HDL and added with :meth:`add_clock` are always passive.
        """
        while self.advance():
            pass

    def run_until(self, deadline, *, run_passive=False):
        """Run the simulation until it advances to ``deadline``.

        If ``run_passive`` is ``False``, the simulation also stops when there are no active
        processes, similar to :meth:`run`. Otherwise, the simulation will stop only after it
        advances to or past ``deadline``.

        If the simulation stops advancing, this function will never return.
        """
        assert self._state.timeline.now <= deadline
        while (self.advance() or run_passive) and self._state.timeline.now < deadline:
            pass

    @contextmanager
    def write_vcd(self, vcd_file, gtkw_file=None, *, traces=()):
        """Write waveforms to a Value Change Dump file, optionally populating a GTKWave save file.

        This method returns a context manager. It can be used as: ::

            sim = Simulator(frag)
            sim.add_clock(1e-6)
            with sim.write_vcd("dump.vcd", "dump.gtkw"):
                sim.run_until(1e-3)

        The waveform files are finalized and closed when the context is exited, including when
        the body of the ``with`` statement raises an exception.

        Arguments
        ---------
        vcd_file : str or file-like object
            Verilog Value Change Dump file or filename.
        gtkw_file : str or file-like object
            GTKWave save file or filename.
        traces : iterable of Signal
            Signals to display traces for.
        """
        if self._state.timeline.now != 0.0:
            raise ValueError("Cannot start writing waveforms after advancing simulation time")
        waveform_writer = _VCDWaveformWriter(self._fragment,
            vcd_file=vcd_file, gtkw_file=gtkw_file, traces=traces)
        self._waveform_writers.append(waveform_writer)
        try:
            yield
        finally:
            # Always flush and close the waveform files, and always unregister the writer, so
            # that a failing simulation neither leaks file handles nor leaves a closed writer
            # attached to the simulator.
            try:
                waveform_writer.close(self._state.timeline.now)
            finally:
                self._waveform_writers.remove(waveform_writer)