diff --git a/amaranth/sim/_base.py b/amaranth/sim/_base.py index 8f6fe40..6dc1e60 100644 --- a/amaranth/sim/_base.py +++ b/amaranth/sim/_base.py @@ -65,10 +65,13 @@ class BaseSimulation: class BaseEngine: + def add_clock_process(self, clock, *, phase, period): + raise NotImplementedError # :nocov: + def add_coroutine_process(self, process, *, default_cmd): raise NotImplementedError # :nocov: - def add_clock_process(self, clock, *, phase, period): + def add_testbench_process(self, process): raise NotImplementedError # :nocov: def reset(self): diff --git a/amaranth/sim/_pycoro.py b/amaranth/sim/_pycoro.py index 085eb00..8b5b259 100644 --- a/amaranth/sim/_pycoro.py +++ b/amaranth/sim/_pycoro.py @@ -1,7 +1,7 @@ import inspect from ..hdl import * -from ..hdl._ast import Statement, SignalSet, ValueCastable +from ..hdl._ast import Statement, Assign, SignalSet, ValueCastable from ..hdl._mem import MemorySimRead, MemorySimWrite from .core import Tick, Settle, Delay, Passive, Active from ._base import BaseProcess, BaseMemoryState @@ -12,11 +12,12 @@ __all__ = ["PyCoroProcess"] class PyCoroProcess(BaseProcess): - def __init__(self, state, domains, constructor, *, default_cmd=None): + def __init__(self, state, domains, constructor, *, default_cmd=None, testbench=False): self.state = state self.domains = domains self.constructor = constructor self.default_cmd = default_cmd + self.testbench = testbench self.reset() @@ -70,7 +71,7 @@ class PyCoroProcess(BaseProcess): except StopIteration: self.passive = True self.coroutine = None - return + return False # no assignment try: if command is None: @@ -88,6 +89,8 @@ class PyCoroProcess(BaseProcess): elif isinstance(command, Statement): exec(_StatementCompiler.compile(self.state, command), self.exec_locals) + if isinstance(command, Assign) and self.testbench: + return True # assignment; run a delta cycle elif type(command) is Tick: domain = command.domain @@ -102,17 +105,20 @@ class PyCoroProcess(BaseProcess): self.add_trigger(domain.clk, trigger=1 if domain.clk_edge == "pos" else 0) if domain.rst is not None and domain.async_reset: self.add_trigger(domain.rst, trigger=1) - return + return False # no assignments + + elif self.testbench and (command is None or isinstance(command, Settle)): + raise TypeError(f"Command {command!r} is not allowed in testbenches") elif type(command) is Settle: self.state.wait_interval(self, None) - return + return False # no assignments elif type(command) is Delay: # Internal timeline is in 1ps integeral units, intervals are public API and in floating point interval = int(command.interval * 1e12) if command.interval is not None else None self.state.wait_interval(self, interval) - return + return False # no assignments elif type(command) is Passive: self.passive = True @@ -140,6 +146,8 @@ class PyCoroProcess(BaseProcess): state = self.state.slots[index] assert isinstance(state, BaseMemoryState) state.write(addr, data) + if self.testbench: + return True # assignment; run a delta cycle elif command is None: # only possible if self.default_cmd is None raise TypeError("Received default command from process {!r} that was added " diff --git a/amaranth/sim/core.py b/amaranth/sim/core.py index 25e5a49..0291ec5 100644 --- a/amaranth/sim/core.py +++ b/amaranth/sim/core.py @@ -115,33 +115,7 @@ class Simulator: self._engine.add_coroutine_process(wrapper, default_cmd=Tick(domain)) def add_testbench(self, process): - process = self._check_process(process) - def wrapper(): - generator = process() - # Only start a bench process after power-on reset finishes. Use object.__new__ to - # avoid deprecation warning. - yield object.__new__(Settle) - result = None - exception = None - while True: - try: - if exception is None: - command = generator.send(result) - else: - command = generator.throw(exception) - except StopIteration: - break - if command is None or isinstance(command, Settle): - exception = TypeError(f"Command {command!r} is not allowed in testbenches") - else: - try: - result = yield command - exception = None - yield object.__new__(Settle) - except Exception as e: - result = None - exception = e - self._engine.add_coroutine_process(wrapper, default_cmd=None) + self._engine.add_testbench_process(self._check_process(process)) def add_clock(self, period, *, phase=None, domain="sync", if_exists=False): """Add a clock process. diff --git a/amaranth/sim/pysim.py b/amaranth/sim/pysim.py index c5ccf2f..229b223 100644 --- a/amaranth/sim/pysim.py +++ b/amaranth/sim/pysim.py @@ -409,24 +409,27 @@ class PySimEngine(BaseEngine): self._design = design self._processes = _FragmentCompiler(self._state)(self._design.fragment) + self._testbenches = [] self._vcd_writers = [] + def add_clock_process(self, clock, *, phase, period): + self._processes.add(PyClockProcess(self._state, clock, + phase=phase, period=period)) + def add_coroutine_process(self, process, *, default_cmd): self._processes.add(PyCoroProcess(self._state, self._design.fragment.domains, process, default_cmd=default_cmd)) - def add_clock_process(self, clock, *, phase, period): - self._processes.add(PyClockProcess(self._state, clock, - phase=phase, period=period)) + def add_testbench_process(self, process): + self._testbenches.append(PyCoroProcess(self._state, self._design.fragment.domains, process, + testbench=True)) def reset(self): self._state.reset() for process in self._processes: process.reset() - def _step(self): - changed = set() if self._vcd_writers else None - + def _step_rtl(self, changed): # Performs the two phases of a delta cycle in a loop: converged = False while not converged: @@ -439,6 +442,25 @@ class PySimEngine(BaseEngine): # 2. commit: apply every queued signal change, waking up any waiting processes converged = self._state.commit(changed) + def _step_tb(self): + changed = set() if self._vcd_writers else None + + # Run processes waiting for an interval to expire (mainly `add_clock_process()``) + self._step_rtl(changed) + + # Run testbenches waiting for an interval to expire, or for a signal to change state + converged = False + while not converged: + converged = True + # Schedule testbenches in a deterministic, predictable order by iterating a list + for testbench in self._testbenches: + if testbench.runnable: + testbench.runnable = False + while testbench.run(): + # Testbench has changed simulation state; run processes triggered by that + converged = False + self._step_rtl(changed) + for vcd_writer in self._vcd_writers: for change in changed: if isinstance(change, _PySignalState): @@ -452,9 +474,9 @@ class PySimEngine(BaseEngine): assert False # :nocov: def advance(self): - self._step() + self._step_tb() self._timeline.advance() - return any(not process.passive for process in self._processes) + return any(not process.passive for process in (*self._processes, *self._testbenches)) @property def now(self): diff --git a/tests/test_sim.py b/tests/test_sim.py index bafc820..e86d5de 100644 --- a/tests/test_sim.py +++ b/tests/test_sim.py @@ -1174,6 +1174,22 @@ class SimulatorIntegrationTestCase(FHDLTestCase): Coverage hit at .*test_sim\.py:\d+: Counter: 009 """).lstrip()) + def test_testbench_preemption(self): + sig = Signal(8) + def testbench_1(): + yield sig[0:4].eq(0b1010) + yield sig[4:8].eq(0b0101) + def testbench_2(): + yield Passive() + while True: + val = yield sig + assert val in (0, 0b01011010), f"{val=:#010b}" + yield Delay(0) + sim = Simulator(Module()) + sim.add_testbench(testbench_1) + sim.add_testbench(testbench_2) + sim.run() + class SimulatorRegressionTestCase(FHDLTestCase): def test_bug_325(self):