sim: document.

This commit includes additional non-documentation changes, related to
issues found while documenting it:
- `Simulator.run_until()` no longer accepts a `run_passive=` argument.
  Passive no longer exist and in any case defaulting to `False` does not
  make a lot of sense from an API perspective.
- `add_clock()`'s `phase=` argument, when specified, no longer has
  `period/2` added to it. This wasn't the documented behavior in first
  place and it makes no sense to do that.
- `add_clock()` raises a `NameError` if a clock domain does not exist,
  instead of `ValueError`.
- `add_clock()` raises a `DriverConflict` if a clock domain is already
  being driven by a clock, instead of `ValueError`.
- GTKWave is no longer a part of the installation instructions, and both
  Surfer and GTKWave are recommended (in this order).
This commit is contained in:
Catherine 2024-05-29 15:57:16 +00:00
parent 3c1060f7c7
commit 7870eb344b
16 changed files with 1181 additions and 224 deletions

View file

@ -1,8 +1,11 @@
from .core import *
from .core import Simulator
from ._async import DomainReset, BrokenTrigger, SimulatorContext, TickTrigger, TriggerCombination
from ._pycoro import Settle, Delay, Tick, Passive, Active
__all__ = [
"DomainReset", "BrokenTrigger", "Simulator",
"DomainReset", "BrokenTrigger",
"SimulatorContext", "Simulator", "TickTrigger", "TriggerCombination",
# deprecated
"Settle", "Delay", "Tick", "Passive", "Active",
]

View file

@ -1,3 +1,7 @@
# Public classes and methods do not have docstrings, but are documented in `docs/simulator.rst`.
# Annoyingly, this means that the `sphinxcontrib.napoleon` style docstrings cannot be used, and
# the Sphinx style docstrings must be used instead. I'm sorry.
import typing
import operator
from contextlib import contextmanager
@ -16,13 +20,12 @@ __all__ = [
class DomainReset(Exception):
"""Exception raised when the domain of a a tick trigger that is repeatedly awaited has its
reset asserted."""
"""Exception raised when a tick trigger is repeatedly awaited, and its domain has been reset."""
class BrokenTrigger(Exception):
"""Exception raised when a trigger that is repeatedly awaited in an `async for` loop has
a matching event occur while the body of the `async for` loop is executing."""
"""Exception raised when a trigger that is repeatedly awaited using an :py:`async for` loop has
a matching event occur while the body of the :py:`async for` loop is still executing."""
class SampleTrigger:
@ -66,37 +69,142 @@ class EdgeTrigger:
class DelayTrigger:
def __init__(self, interval):
# Note: even though it is likely to be a bad idea, ``await ctx.delay(0)`` is accepted.
# This is because, if disallowed, people are likely to do even worse things, such as
# `await ctx.delay(1e-15)` instead.
if interval < 0:
raise ValueError(f"Delay cannot be negative")
self.interval_fs = round(float(interval) * 1e15)
class TriggerCombination:
"""TriggerCombination(...)
A list of triggers, the activation of any of which will wake up the caller.
A :class:`TriggerCombination` is an immutable object that stores a list of triggers and
expressions to sample. The trigger combination wakes up the caller when any of these triggers
activate, and it samples all of the signals at the same moment.
The :meth:`SimulatorContext.delay`, :meth:`SimulatorContext.changed`, and
:meth:`SimulatorContext.edge` methods create a trigger combination that consists of just that
one trigger, while :meth:`TriggerCombination.delay`, :meth:`TriggerCombination.changed`, and
:meth:`TriggerCombination.edge` methods create a trigger combination based on another trigger
combination by extending it with an additional trigger. The :meth:`TriggerCombination.sample`
method creates a trigger combination based on another trigger combination that wakes up
the caller in the same conditions but additionally samples the specified expressions.
To wait for a trigger combination to be activated once (a *one-shot* wait), a process or
testbench calls :py:`await triggers`, usually on a newly created trigger combination: ::
async def testbench(ctx):
a_value, b_value = await ctx.changed(dut.a, dut.b)
To repeatedly wait for a trigger combination to be activated (a *multi-shot* wait), a process
or testbench :term:`asynchronously iterates <python:asynchronous iterable>` the trigger
combination, usually using the :py:`async for` loop: ::
async def testbench(ctx):
async a_value, b_value in ctx.changed(dut.a, dut.b):
...
Both one-shot and multi-shot waits return the same :class:`tuple` of return values, the elements
of which are determined by the triggers and sampled expressions that have been added to
the trigger combination, in the order in which they were added. For a detailed description of
the return values, refer to :meth:`SimulatorContext.delay`, :meth:`SimulatorContext.changed`,
:meth:`SimulatorContext.edge`, and :meth:`TriggerCombination.sample`.
Aside from the syntax, there are two differences between one-shot and multi-shot waits:
1. A multi-shot wait continues to observe the trigger combination while the process or testbench
responds to the event. If the trigger combination is activated again before the next
iteration of the asynchronous iterator (such as while the body of the :py:`async for` loop is
executing), the next iteration raises a :exc:`BrokenTrigger` exception to notify the caller
of the missed event.
2. A repeated one-shot wait may be less efficient than a multi-shot wait.
"""
def __init__(self, engine: BaseEngine, process: BaseProcess, *,
triggers: 'tuple[DelayTrigger|ChangedTrigger|SampleTrigger|EdgeTrigger, ...]' = ()):
self._engine = engine
self._process = process # private but used by engines
self._triggers = triggers # private but used by engines
def sample(self, *values) -> 'TriggerCombination':
def sample(self, *exprs) -> 'TriggerCombination':
"""Sample signals when a trigger from this combination is activated.
This method returns a new :class:`TriggerCombination` object. When awaited, this object
returns, in addition to the values that would be returned by :py:`await trigger`, the values
of :py:`exprs` at exactly the moment at which the wait has completed.
Combining :meth:`~SimulatorContext.delay`, :meth:`~SimulatorContext.changed`, or
:meth:`~SimulatorContext.edge` with :meth:`sample` can be used to capture the state of
a circuit at the moment of the event: ::
async for arst_edge, delay_expired, in_a_value, in_b_value in \\
ctx.posedge(arst).delay(1e-3).sample(in_a, in_b):
...
Chaining calls to :meth:`sample` has the same effect as calling it once with the combined
list of arguments. The code below has the same behavior as the code above: ::
async for arst_edge, delay_expired, in_a_value, in_b_value in \\
ctx.posedge(arst).delay(1e-3).sample(in_a).sample(in_b):
...
.. note::
Chaining calls to this method is useful for defining reusable building blocks. See
the documentation for :meth:`TickTrigger.sample` for a detailed example.
"""
return TriggerCombination(self._engine, self._process, triggers=self._triggers +
tuple(SampleTrigger(value) for value in values))
tuple(SampleTrigger(value) for value in exprs))
def delay(self, interval) -> 'TriggerCombination':
"""Add a delay trigger to the list of triggers.
This method returns a new :class:`TriggerCombination` object. When awaited, this object
also waits for the same trigger as :meth:`SimulatorContext.delay`, and returns,
in addition to the values that would be returned by :py:`await trigger`, the value
returned by :meth:`SimulatorContext.delay`.
"""
return TriggerCombination(self._engine, self._process, triggers=self._triggers +
(DelayTrigger(interval),))
def changed(self, *signals) -> 'TriggerCombination':
"""Add a signal change trigger to the list of triggers.
This method returns a new :class:`TriggerCombination` object. When awaited, this object
also waits for the same trigger as :meth:`SimulatorContext.changed`, and returns,
in addition to the values that would be returned by :py:`await trigger`, the values
returned by :meth:`SimulatorContext.changed`.
"""
return TriggerCombination(self._engine, self._process, triggers=self._triggers +
tuple(ChangedTrigger(signal) for signal in signals))
def edge(self, signal, polarity) -> 'TriggerCombination':
"""Add a low-to-high or high-to-low transition trigger to the list of triggers.
This method returns a new :class:`TriggerCombination` object. When awaited, this object
also waits for the same trigger as :meth:`SimulatorContext.edge`, and returns,
in addition to the values that would be returned by :py:`await trigger`, the values
returned by :meth:`SimulatorContext.edge`.
"""
return TriggerCombination(self._engine, self._process, triggers=self._triggers +
(EdgeTrigger(signal, polarity),))
def posedge(self, signal) -> 'TriggerCombination':
"""Add a low-to-high transition trigger to the list of triggers.
Equivalent to :meth:`edge(signal, 1) <edge>`.
"""
return self.edge(signal, 1)
def negedge(self, signal) -> 'TriggerCombination':
return self.edge(signal, 0)
"""Add a high-to-low transition trigger to the list of triggers.
def delay(self, interval) -> 'TriggerCombination':
return TriggerCombination(self._engine, self._process, triggers=self._triggers +
(DelayTrigger(interval),))
Equivalent to :meth:`edge(signal, 0) <edge>`.
"""
return self.edge(signal, 0)
def __await__(self):
trigger = self._engine.add_trigger_combination(self, oneshot=True)
@ -109,6 +217,58 @@ class TriggerCombination:
class TickTrigger:
"""TickTrigger(...)
A trigger that wakes up the caller when the active edge of a clock domain occurs or the domain
is asynchronously reset.
A :class:`TickTrigger` is an immutable object that stores a reference to a clock domain and
a list of expressions to sample.
The :meth:`SimulatorContext.tick` method creates a tick trigger with an empty list of sampled
expressions, and the :meth:`TickTrigger.sample` method creates a tick trigger based on another
tick trigger that additionally samples the specified expressions.
To wait for a tick trigger to be activated once (a *one-shot* wait), a process or testbench
calls :py:`await trigger`, usually on a newly created tick trigger: ::
async def testbench(ctx):
clk_hit, rst_active, a_value, b_value = await ctx.tick().sample(dut.a, dut.b)
To repeatedly wait for a tick trigger to be activated (a *multi-shot* wait), a process or
testbench :term:`asynchronously iterates <python:asynchronous iterable>` the tick trigger,
usually using the :py:`async for` loop: ::
async def testbench(ctx):
async for clk_hit, rst_active, a_value, b_value in ctx.tick().sample(dut.a, dut.b):
...
Both one-shot and multi-shot waits return the same :class:`tuple`
:py:`(clk_hit, rst_active, *values)` of return values:
1. :py:`clk_hit` is :py:`True` if there was an active clock edge at the moment the wait has
completed, and :py:`False` otherwise (that is, if the clock domain was asynchronously reset).
2. :py:`rst_active` is :py:`True` if the clock domain is reset (synchronously or asynchronously)
at the moment the wait has completed, :py:`False` otherwise.
3. All following return values correspond to the sampled expressions in the order in which they
were added.
Aside from the syntax, there are two differences between one-shot and multi-shot waits:
1. A multi-shot wait continues to observe the tick trigger while the process or testbench
responds to the event. If the tick trigger is activated again before the next iteration of
the asynchronous iterator (such as while the body of the :py:`async for` loop is executing),
the next iteration raises a :exc:`BrokenTrigger` exception to notify the caller of the missed
event.
2. A repeated one-shot wait may be less efficient than a multi-shot wait.
.. note::
The exact behavior of :py:`rst_active` differs depending on whether :py:`domain` uses
synchronous or asynchronous reset; in both cases it is :py:`True` if and only if
the domain reset has been asserted. Reusable processes and testbenches, as well as their
building blocks, should handle both cases.
"""
def __init__(self, engine: BaseEngine, process: BaseProcess, *,
domain: ClockDomain, sampled: 'tuple[ValueLike]' = ()):
self._engine = engine
@ -116,13 +276,81 @@ class TickTrigger:
self._domain = domain
self._sampled = sampled
def sample(self, *values: ValueLike) -> 'TickTrigger':
def sample(self, *exprs: ValueLike) -> 'TickTrigger':
"""Sample expressions when this trigger is activated.
This method returns a new :class:`TickTrigger` object. When awaited, this object returns,
in addition to the values that would be otherwise returned by :py:`await trigger`,
the values of :py:`exprs` (any :class:`~.hdl.ValueLike`) at exactly the moment at which
the active clock edge, or the asynchronous reset (if applicable), has occurred.
Combining :meth:`~SimulatorContext.tick` with :meth:`sample` can be used to capture
the state of a circuit after the active clock edge, but before propagation of signal values
that have been updated by that clock edge: ::
async for clk_edge, rst_active, in_a_value, in_b_value in \\
ctx.tick().sample(in_a, in_b):
...
Chaining calls to :meth:`sample` has the same effect as calling it once with the combined
list of arguments. The code below has the same behavior as the code above: ::
async for clk_edge, rst_active, in_a_value, in_b_value in \\
ctx.tick().sample(in_a).sample(in_b):
...
.. note::
Chaining calls to this method is useful for defining reusable building blocks.
The following (simplified for clarity) implementation of :meth:`until` takes advantage
of it by first appending :py:`condition` to the end of the list of captured expressions,
checking if it holds, and then removing it from the list of sampled values: ::
async def until(trigger, condition):
async for clk_edge, rst_active, *values, done in trigger.sample(condition):
if done:
return values
"""
return TickTrigger(self._engine, self._process,
domain=self._domain, sampled=(*self._sampled, *values))
domain=self._domain, sampled=(*self._sampled, *exprs))
async def until(self, condition: ValueLike):
"""Repeat this trigger until a condition is met.
This method awaits this trigger at least once, and returns a :class:`tuple` of the values
that are :meth:`sample`\\ d when :py:`condition` evaluates to a non-zero value. Values
sampled during previous repeats are discarded.
Awaiting a :py:`trigger` returns values indicating the state of the clock and reset signals,
while awaiting :py:`trigger.until(...)` does not:
.. code::
while True:
clk_edge, rst_active, *values, flag_value = await trigger.sample(flag) # never raises
if flag_value:
break
# `values` may be used after the loop finishes
.. code::
values = await trigger.until(flag) # may raise `DomainReset`
Raises
------
:exc:`TypeError`
If the shape of :py:`condition` is a :class:`ShapeCastable`.
:exc:`DomainReset`
If the clock domain has been synchronously or asynchronously reset during the wait.
"""
if not isinstance(condition, ValueLike):
raise TypeError(f"Condition must be a value-like object, not {condition!r}")
if isinstance(condition, ValueCastable):
shape = condition.shape()
if not isinstance(shape, Shape):
raise TypeError(f"The shape of a condition may only be `signed` or `unsigned`, "
f"not {shape!r}")
tick = self.sample(condition).__aiter__()
done = False
while not done:
@ -132,6 +360,32 @@ class TickTrigger:
return tuple(values)
async def repeat(self, count: int):
"""Repeat this trigger a specific number of times.
This method awaits this trigger at least once, and returns a :class:`tuple` of the values
that are :meth:`sample`\\ d during the last repeat. Values sampled during previous repeats
are discarded.
Awaiting a :py:`trigger` returns values indicating the state of the clock and reset signals,
while awaiting :py:`trigger.repeat(...)` does not:
.. code::
for _ in range(3):
clk_edge, rst_active, *values = await trigger # never raises
# `values` may be used after the loop finishes
.. code::
values = await trigger.repeat(3) # may raise `DomainReset`
Raises
------
:exc:`ValueError`
If :py:`count` is less than 1.
:exc:`DomainReset`
If the clock domain has been synchronously or asynchronously reset during the wait.
"""
count = operator.index(count)
if count <= 0:
raise ValueError(f"Repeat count must be a positive integer, not {count!r}")
@ -140,6 +394,7 @@ class TickTrigger:
clk, rst, *values = await tick.__anext__()
if rst:
raise DomainReset
assert clk
return tuple(values)
def _collect_trigger(self):
@ -170,25 +425,114 @@ class TickTrigger:
class SimulatorContext:
"""SimulatorContext(...)
Simulator context.
Simulator processes and testbenches are :py:`async` Python functions that interact with
the simulation using the only argument they receive: the *context*. Using a context, it is
possible to sample or update signals and wait for events to occur in the simulation.
The context has two kinds of methods: :py:`async` methods and non-:py:`async` methods. Calling
an :py:`async` method may cause the caller to be preempted (be paused such that the simulation
time can advance), while calling non-:py:`async` methods never causes that.
.. note::
While a testbench or process is executing without calling :py:`async` methods, no other
testbench or process will run, with one exception: if a testbench calls :meth:`set`, all
processes that wait (directly or indirectly) for the updated signals to change will execute
before the call returns.
"""
def __init__(self, design, engine: BaseEngine, process: BaseProcess):
self._design = design
self._engine = engine
self._process = process
def delay(self, interval) -> TriggerCombination:
return TriggerCombination(self._engine, self._process).delay(interval)
@typing.overload
def get(self, expr: Value) -> int: ... # :nocov:
def changed(self, *signals) -> TriggerCombination:
return TriggerCombination(self._engine, self._process).changed(*signals)
@typing.overload
def get(self, expr: ValueCastable) -> typing.Any: ... # :nocov:
def edge(self, signal, polarity) -> TriggerCombination:
return TriggerCombination(self._engine, self._process).edge(signal, polarity)
def get(self, expr):
"""Sample the value of an expression.
def posedge(self, signal) -> TriggerCombination:
return TriggerCombination(self._engine, self._process).posedge(signal)
The behavior of this method depends on the type of :py:`expr`:
def negedge(self, signal) -> TriggerCombination:
return TriggerCombination(self._engine, self._process).negedge(signal)
- If it is a :class:`~.hdl.ValueCastable` whose shape is a :class:`~.hdl.ShapeCastable`,
its numeric value is converted to a higher-level representation using
:meth:`~.hdl.ShapeCastable.from_bits` and then returned.
- If it is a :class:`~.hdl.Value` or a :class:`~.hdl.ValueCastable` whose shape is
a :class:`~.hdl.Shape`, the numeric value is returned as an :class:`int`.
This method is only available in testbenches.
Raises
------
:exc:`TypeError`
If the caller is a process.
"""
raise NotImplementedError
@typing.overload
def set(self, expr: Value, value: int) -> None: ... # :nocov:
@typing.overload
def set(self, expr: ValueCastable, value: typing.Any) -> None: ... # :nocov:
def set(self, expr, value):
"""Update the value of an expression.
The behavior of this method depends on the type of :py:`expr`:
- If it is a :class:`~.hdl.ValueCastable` whose shape is a :class:`~.hdl.ShapeCastable`,
:py:`value` is converted to a numeric representation using
:meth:`~.hdl.ShapeCastable.const` and then assigned.
- If it is a :class:`~.hdl.Value` or a :class:`~.hdl.ValueCastable` whose shape is
a :class:`~.hdl.Shape`, :py:`value` is assigned as-is.
This method is available in both processes and testbenches.
When used in a testbench, this method runs all processes that wait (directly or
indirectly) for the signals in :py:`expr` to change, and returns only after the change
propagates through the simulated circuits.
"""
raise NotImplementedError
@contextmanager
def critical(self):
"""Context manager that temporarily makes the caller critical.
Testbenches and processes may be *background* or *critical*, where critical ones prevent
:meth:`Simulator.run` from finishing. Processes are always created background, while
testbenches are created critical by default, but may also be created background.
This context manager makes the caller critical for the span of the :py:`with` statement.
This may be useful in cases where an operation (for example, a bus transaction) takes
multiple clock cycles to complete, and must be completed after starting, but the testbench
or process performing it never finishes, always waiting for the next operation to arrive.
In this case, the caller would elevate itself to become critical only for the duration of
the operation itself using this context manager, for example: ::
async def testbench_bus_transaction(ctx):
# On every cycle, check whether the bus has an active transaction...
async for clk_edge, rst_active, bus_active_value in ctx.tick().sample(bus.active):
if bus_active_value: # ... if it does...
with ctx.critical(): # ... make this testbench critical...
addr_value = ctx.get(bus.r_addr)
ctx.set(bus.r_data, ...) # ... perform the access...
await ctx.tick()
ctx.set(bus.done, 1)
await ctx.tick()
ctx.set(bus.done, 0) # ... and complete the transaction later.
# The `run()` method could return at this point, but not before.
"""
try:
old_critical, self._process.critical = self._process.critical, True
yield
finally:
self._process.critical = old_critical
@typing.overload
def tick(self, domain: str, *, context: Elaboratable = None) -> TickTrigger: ... # :nocov:
@ -197,6 +541,33 @@ class SimulatorContext:
def tick(self, domain: ClockDomain) -> TickTrigger: ... # :nocov:
def tick(self, domain="sync", *, context=None):
"""Wait until an active clock edge or an asynchronous reset occurs.
This method returns a :class:`TickTrigger` object that, when awaited, pauses the execution
of the calling process or testbench until the active edge of the clock, or the asynchronous
reset (if applicable), occurs. The returned object may be used to repeatedly wait for one
of these events until a condition is satisfied or a specific number of times. See
the :ref:`tick trigger reference <sim-tick-trigger>` for more details.
The :py:`domain` may be either a :class:`ClockDomain` or a :class:`str`. If it is
a :class:`str`, a clock domain with this name is looked up in
the :ref:`elaboratable <lang-elaboration>` :py:`context`, or in :py:`toplevel` if
:py:`context` is not provided.
Raises
------
:exc:`ValueError`
If :py:`domain` is :py:`"comb"`.
:exc:`ValueError`
If :py:`domain` is a :class:`~.hdl.ClockDomain` and :py:`context` is provided and not
:py:`None`.
:exc:`ValueError`
If :py:`context` is an elaboratable that is not a direct or indirect submodule of
:py:`toplevel`.
:exc:`NameError`
If :py:`domain` is a :class:`str`, but there is no clock domain with this name in
:py:`context` or :py:`toplevel`.
"""
if domain == "comb":
raise ValueError("Combinational domain does not have a clock")
if isinstance(domain, ClockDomain):
@ -204,16 +575,111 @@ class SimulatorContext:
raise ValueError("Context cannot be provided if a clock domain is specified "
"directly")
else:
domain = self._design.lookup_domain(domain, context)
try:
domain = self._design.lookup_domain(domain, context)
except KeyError:
raise NameError(f"Clock domain named {domain!r} does not exist")
return TickTrigger(self._engine, self._process, domain=domain)
@contextmanager
def critical(self):
try:
old_critical, self._process.critical = self._process.critical, True
yield
finally:
self._process.critical = old_critical
def delay(self, interval) -> TriggerCombination:
"""Wait until a time interval has elapsed.
This method returns a :class:`TriggerCombination` object that, when awaited, pauses
the execution of the calling process or testbench by :py:`interval` seconds. The returned
object may be used to wait for multiple events.
The value captured by this trigger is :py:`True` if the delay has expired when the wait has
completed, and :py:`False` otherwise.
The :py:`interval` may be zero, in which case the caller will be scheduled for execution
immediately after all of the processes and testbenches scheduled for the current time step
finish executing. In other words, if a call to :meth:`Simulator.advance` schedules a process
or testbench and it performs :py:`await ctx.delay(0)`, this process or testbench will
continue execution only during the next call to :meth:`Simulator.advance`.
.. note::
Although the behavior of :py:`await ctx.delay(0)` is well-defined, it may make waveforms
difficult to understand and simulations hard to reason about.
Raises
------
:exc:`ValueError`
If :py:`delay` is negative.
"""
return TriggerCombination(self._engine, self._process).delay(interval)
def changed(self, *signals) -> TriggerCombination:
"""Asynchronously wait until one of the signals change.
This method returns a :class:`TriggerCombination` object that, when awaited, pauses
the execution of the calling process or testbench until any of the :py:`signals` change.
The returned object may be used to wait for multiple events.
The values captured by this trigger are the values of :py:`signals` at the time the wait
has completed.
.. warning::
The simulation may produce *glitches*: transient changes to signals (e.g. from 0 to 1
and back to 0) during combinational propagation that are invisible in testbenches or
waveform captures. Glitches will wake up **both processes and testbenches** that use
this method to wait for a signal to change, and both processes and testbenches must be
prepared to handle such spurious wakeups. The presence, count, and sequence in which
glitches occur may also vary between simulation runs.
Testbenches that wait for a signal to change using an :py:`await` statement might only
observe the final value of the signal, and testbenches that wait for changes using
an :py:`async for` loop will crash with a :exc:`BrokenTrigger` exception if they
encounter a glitch.
Processes will observe all of the transient values of the signal.
"""
return TriggerCombination(self._engine, self._process).changed(*signals)
def edge(self, signal, polarity) -> TriggerCombination:
"""Asynchronously wait until a low-to-high or high-to-low transition of a signal occurs.
This method returns a :class:`TriggerCombination` object that, when awaited, pauses
the execution of the calling process or testbench until the value of :py:`signal`
(a single-bit signal or a single-bit slice of a signal) changes from :py:`not polarity`
to :py:`polarity`. The returned object may be used to wait for multiple events.
The value captured by this trigger is :py:`True` if the relevant transition has occurred
at the time the wait has completed, and :py:`False` otherwise.
.. warning::
In most cases, this method should not be used to wait for a status signal to be asserted
or deasserted in a testbench because it is likely to introduce a race condition.
Whenever a suitable clock domain is available, use
:py:`await ctx.tick().until(signal == polarity)` instead.
Raises
------
:exc:`TypeError`
If :py:`signal` is neither a single-bit :class:`Signal` nor a single-bit slice of
a :class:`Signal`.
:exc:`TypeError`
If the shape of :py:`signal` is a :class:`ShapeCastable`.
:exc:`ValueError`
If :py:`polarity` is neither 0 nor 1.
"""
return TriggerCombination(self._engine, self._process).edge(signal, polarity)
def posedge(self, signal) -> TriggerCombination:
"""Asynchronously wait until a signal is asserted.
Equivalent to :meth:`edge(signal, 1) <edge>`.
"""
return TriggerCombination(self._engine, self._process).posedge(signal)
def negedge(self, signal) -> TriggerCombination:
"""Asynchronously wait until a signal is deasserted.
Equivalent to :meth:`edge(signal, 0) <edge>`.
"""
return TriggerCombination(self._engine, self._process).negedge(signal)
class ProcessContext(SimulatorContext):

View file

@ -14,8 +14,8 @@ class Command:
class Settle(Command):
@deprecated("The `Settle` command is deprecated per RFC 27. Use `add_testbench` to write "
"testbenches; in them, an equivalent of `yield Settle()` is performed "
"automatically.")
"testbenches; there, an equivalent of `yield Settle()` is performed "
"automatically after each `ctx.set()`.")
def __init__(self):
pass
@ -37,8 +37,7 @@ class Delay(Command):
class Tick(Command):
def __init__(self, domain="sync"):
if not isinstance(domain, (str, ClockDomain)):
raise TypeError("Domain must be a string or a ClockDomain instance, not {!r}"
.format(domain))
raise TypeError(f"Domain must be a string or a ClockDomain instance, not {domain!r}")
assert domain != "comb"
self.domain = domain

View file

@ -2,10 +2,8 @@ import inspect
import warnings
from .._utils import deprecated
from ..hdl._cd import *
from ..hdl._ir import *
from ..hdl._ast import Value, ValueLike
from ..hdl._mem import MemoryData
from ..hdl import Value, ValueLike, MemoryData, ClockDomain, Fragment
from ..hdl._ir import DriverConflict
from ._base import BaseEngine
from ._async import DomainReset, BrokenTrigger
from ._pycoro import Tick, Settle, Delay, Passive, Active, coro_wrapper
@ -19,51 +17,225 @@ __all__ = [
]
def _seconds_to_femtos(delay: float):
return int(delay * 1e15) # seconds to femtoseconds
class Simulator:
def __init__(self, fragment, *, engine="pysim"):
# Simulator engines aren't yet a part of the public API.
"""Simulator(toplevel)
Simulator for Amaranth designs.
The simulator accepts a *top-level design* (an :ref:`elaboratable <lang-elaboration>`),
*processes* that replace circuits with behavioral code, *clocks* that drive clock domains, and
*testbenches* that exercise the circuits and verify that they work correctly.
The simulator lifecycle consists of four stages:
1. The simulator is created: ::
sim = Simulator(design)
2. Processes, clocks, and testbenches are added as necessary: ::
sim.add_clock(1e-6)
sim.add_clock(1e-7, domain="fast")
sim.add_process(process_instr_decoder)
sim.add_testbench(testbench_cpu_execute)
3. The simulation is run: ::
with sim.write_vcd("waveform.vcd"):
sim.run()
4. (Optional) The simulator is reset: ::
sim.reset()
After the simulator is reset, it may be reused to run the simulation again.
.. note::
Resetting the simulator can also be used to amortize the startup cost while validating
a large design with many short test. In this case, instead of adding new testbenches,
the behavior of the already added testbenches would be modified for each of the tests.
It can also be used to capture waveforms only for simulations that encounter an error.
Arguments
---------
toplevel : :class:`~amaranth.hdl.Elaboratable`
Simulated design.
"""
def __init__(self, toplevel, *, engine="pysim"):
if isinstance(engine, type) and issubclass(engine, BaseEngine):
pass
elif engine == "pysim":
from .pysim import PySimEngine
engine = PySimEngine
else:
raise TypeError("Value '{!r}' is not a simulation engine class or "
"a simulation engine name"
.format(engine))
raise TypeError(
f"Value {engine!r} is not a simulation engine class or a simulation engine name")
self._design = Fragment.get(fragment, platform=None).prepare()
self._engine = engine(self._design)
self._clocked = set()
self._design = Fragment.get(toplevel, platform=None).prepare()
self._engine = engine(self._design)
self._clocked = set()
def _check_function(self, function, *, kind):
def add_clock(self, period, *, phase=None, domain="sync", if_exists=False):
"""Add a clock to the simulation.
Adds a stimulus that toggles the clock signal of :py:`domain` at a 50% duty cycle.
The driven clock signal will toggle every half-:py:`period` seconds starting at :py:`phase`
seconds after the beginning of the simulation; if not specified, :py:`phase` defaults to
half-:py:`period` to avoid coinciding the first active edge with the beginning of
the simulation.
The clock domain to drive is selected by the :py:`domain` argument, which may be
a :class:`~amaranth.hdl.ClockDomain` object or a :class:`str`. If it is a string,
the clock domain with that name is retrieved from the :py:`toplevel` elaboratable.
Raises
------
:exc:`NameError`
If :py:`domain` is a :class:`str`, the :py:`toplevel` elaboratable does not have
a clock domain with that name, and :py:`if_exists` is :py:`False`.
:exc:`~amaranth.hdl.DriverConflict`
If :py:`domain` already has a clock driving it.
"""
if isinstance(domain, ClockDomain):
if (domain.name in self._design.fragment.domains and
domain is not self._design.fragment.domains[domain.name]):
warnings.warn(
f"Adding a clock process that drives a clock domain object "
f"named {domain.name!r}, which is distinct from an identically named domain "
f"in the simulated design",
UserWarning, stacklevel=2)
elif domain in self._design.fragment.domains:
domain = self._design.fragment.domains[domain]
elif if_exists:
return
else:
raise NameError(f"Domain {domain!r} is not present in simulation")
if domain in self._clocked:
raise DriverConflict(f"Domain {domain.name!r} already has a clock driving it")
period_fs = _seconds_to_femtos(period)
if phase is None:
phase_fs = _seconds_to_femtos(period / 2)
else:
phase_fs = _seconds_to_femtos(phase)
self._engine.add_clock_process(domain.clk, phase=phase_fs, period=period_fs)
self._clocked.add(domain)
@staticmethod
def _check_function(function, *, kind):
if not (inspect.isgeneratorfunction(function) or inspect.iscoroutinefunction(function)):
raise TypeError(
f"Cannot add a {kind} {function!r} because it is not an async function or "
f"generator function")
return function
def add_testbench(self, constructor, *, background=False):
"""Add a testbench to the simulation.
Adds a testbench that runs concurrently with the :py:`toplevel` elaboratable and is able to
manipulate its inputs, outputs, and state.
The behavior of the testbench is defined by its *constructor function*, which is
an :py:`async` function that takes a single argument, the :class:`SimulatorContext`: ::
async def testbench(ctx):
...
await ctx.tick()
...
sim.add_testbench(testbench)
This method does not accept coroutines. Rather, the provided :py:`constructor` coroutine
function is called immediately when the testbench is added to create a coroutine, as well as
by the :meth:`reset` method.
The testbench can be *critical* (the default) or *background* (if the :py:`background=True`
argument is specified). The :meth:`run` method will continue advancing the simulation while
any critical testbenches or processes are running, and will exit when only background
testbenches or processes remain. A background testbench can temporarily become critical
using the :meth:`~SimulatorContext.critical` context manager.
At each point in time, all of the non-waiting testbenches are executed in the order in
which they were added. If two testbenches share state, or must manipulate the design in
a coordinated way, they may rely on this execution order for correctness.
"""
constructor = self._check_function(constructor, kind="testbench")
if inspect.iscoroutinefunction(constructor):
self._engine.add_async_testbench(self, constructor, background=background)
else:
# TODO(amaranth-0.6): remove
warnings.warn(
f"Generator-based testbenches are deprecated per RFC 36. Use async "
f"testbenches instead.",
DeprecationWarning, stacklevel=1)
constructor = coro_wrapper(constructor, testbench=True)
self._engine.add_async_testbench(self, constructor, background=background)
def add_process(self, process):
"""Add a process to the simulation.
Adds a process that is evaluated as a part of the :py:`toplevel` elaboratable and is able to
replace circuits with Python code.
The behavior of the process is defined by its *constructor function*, which is
an :py:`async` function that takes a single argument, the :class:`SimulatorContext`: ::
async def process(ctx):
async for clk_edge, rst, ... in ctx.tick().sample(...):
...
sim.add_process(process)
This method does not accept coroutines. Rather, the provided :py:`constructor` coroutine
function is called immediately when the procss is added to create a coroutine, as well as
by the :meth:`reset` method.
Processes can be *critical* or *background*, and are always background when added.
The :meth:`run` method will continue advancing the simulation while any critical testbenches
or processes are running, and will exit when only background testbenches or processes
remain. A background process can temporarily become critical using
the :meth:`~SimulatorContext.critical` context manager.
At each point in time, all of the non-waiting processes are executed in an arbitrary order
that may be different between individual simulation runs.
.. warning::
If two processes share state, they must do so in a way that does not rely on
a particular order of execution for correctness.
Preferably, the shared state would be stored in :class:`~amaranth.hdl.Signal`\\ s (even
if it is not intended to be a part of a circuit), with access to it synchronized using
:py:`await ctx.tick().sample(...)`. Such state is visible in a waveform viewer,
simplifying debugging.
"""
process = self._check_function(process, kind="process")
if inspect.iscoroutinefunction(process):
self._engine.add_async_process(self, process)
else:
def wrapper():
# Only start a bench process after comb settling, so that the initial values are correct.
# Only start a process after comb settling, so that the initial values are correct.
yield Active()
yield object.__new__(Settle)
yield from process()
# TODO(amaranth-0.6): remove
warnings.warn(
f"Generator-based processes are deprecated per RFC 36. Use async "
f"processes instead.",
DeprecationWarning, stacklevel=1)
wrap_process = coro_wrapper(wrapper, testbench=False)
self._engine.add_async_process(self, wrap_process)
def add_testbench(self, testbench, *, background=False):
testbench = self._check_function(testbench, kind="testbench")
if inspect.iscoroutinefunction(testbench):
self._engine.add_async_testbench(self, testbench, background=background)
else:
testbench = coro_wrapper(testbench, testbench=True)
self._engine.add_async_testbench(self, testbench, background=background)
@deprecated("The `add_sync_process` method is deprecated per RFC 27. Use `add_process` or `add_testbench` instead.")
@deprecated("The `add_sync_process` method is deprecated per RFC 27. Use `add_process` or "
"`add_testbench` instead.")
def add_sync_process(self, process, *, domain="sync"):
process = self._check_function(process, kind="process")
def wrapper():
@ -91,125 +263,97 @@ class Simulator:
wrap_process = coro_wrapper(wrapper, testbench=False, default_cmd=Tick(domain))
self._engine.add_async_process(self, wrap_process)
def add_clock(self, period, *, phase=None, domain="sync", if_exists=False):
"""Add a clock process.
Adds a process that drives the clock signal of ``domain`` at a 50% duty cycle.
Arguments
---------
period : float
Clock period. The process will toggle the ``domain`` clock signal every ``period / 2``
seconds.
phase : None or float
Clock phase. The process will wait ``phase`` seconds before the first clock transition.
If not specified, defaults to ``period / 2``.
domain : str or ClockDomain
Driven clock domain. If specified as a string, the domain with that name is looked up
in the root fragment of the simulation.
if_exists : bool
If ``False`` (the default), raise an error if the driven domain is specified as
a string and the root fragment does not have such a domain. If ``True``, do nothing
in this case.
"""
if isinstance(domain, ClockDomain):
if (domain.name in self._design.fragment.domains and
domain is not self._design.fragment.domains[domain.name]):
warnings.warn("Adding a clock process that drives a clock domain object "
"named {!r}, which is distinct from an identically named domain "
"in the simulated design"
.format(domain.name),
UserWarning, stacklevel=2)
elif domain in self._design.fragment.domains:
domain = self._design.fragment.domains[domain]
elif if_exists:
return
else:
raise ValueError("Domain {!r} is not present in simulation"
.format(domain))
if domain in self._clocked:
raise ValueError("Domain {!r} already has a clock driving it"
.format(domain.name))
# We represent times internally in 1 fs units, but users supply float quantities of seconds
period = int(period * 1e15)
if phase is None:
# By default, delay the first edge by half period. This causes any synchronous activity
# to happen at a non-zero time, distinguishing it from the initial values in the waveform
# viewer.
phase = period // 2
else:
phase = int(phase * 1e15) + period // 2
self._engine.add_clock_process(domain.clk, phase=phase, period=period)
self._clocked.add(domain)
def reset(self):
"""Reset the simulation.
Assign the initial value to every signal and memory in the simulation, and restart every user process.
"""
self._engine.reset()
def advance(self):
"""Advance the simulation.
Run every process and commit changes until a fixed point is reached, then advance time
to the closest deadline (if any). If there is an unstable combinational loop,
this function will never return.
Returns ``True`` if there are any active processes, ``False`` otherwise.
"""
return self._engine.advance()
def run(self):
"""Run the simulation while any processes are active.
"""Run the simulation indefinitely.
Processes added with :meth:`add_process` and :meth:`add_sync_process` are initially active,
and may change their status using the ``yield Passive()`` and ``yield Active()`` commands.
Processes compiled from HDL and added with :meth:`add_clock` are always passive.
This method advances the simulation while any critical testbenches or processes continue
executing. It is equivalent to::
while self.advance():
pass
"""
while self.advance():
pass
def run_until(self, deadline, *, run_passive=False):
"""Run the simulation until it advances to ``deadline``.
def run_until(self, deadline, *, run_passive=None):
"""run_until(deadline)
If ``run_passive`` is ``False``, the simulation also stops when there are no active
processes, similar to :meth:`run`. Otherwise, the simulation will stop only after it
advances to or past ``deadline``.
Run the simulation until a specific point in time.
If the simulation stops advancing, this function will never return.
This method advances the simulation until the simulation time reaches :py:`deadline`,
without regard for whether there are critical testbenches or processes executing.
..
This should show the code like in :meth:`run` once the code is not horrible.
"""
# Convert deadline in seconds into internal 1 fs units
deadline = deadline * 1e15
assert self._engine.now <= deadline
while (self.advance() or run_passive) and self._engine.now < deadline:
pass
if run_passive is not None:
# TODO(amaranth-0.6): remove
warnings.warn(
f"The `run_passive` argument of `run_until()` has been removed as a part of "
f"transition to RFC 36.",
DeprecationWarning, stacklevel=1)
deadline_fs = _seconds_to_femtos(deadline)
assert self._engine.now <= deadline_fs
while self._engine.now < deadline_fs:
self.advance()
def advance(self):
"""Advance the simulation.
This method advances the simulation by one time step. After this method completes, all of
the events scheduled for the current point in time will have taken effect, and the current
point in time was advanced to the closest point in the future for which any events are
scheduled (which may be the same point in time).
The non-waiting testbenches are executed in the order they were added, and the processes
are executed as necessary.
Returns :py:`True` if the simulation contains any critical testbenches or processes, and
:py:`False` otherwise.
"""
return self._engine.advance()
def write_vcd(self, vcd_file, gtkw_file=None, *, traces=(), fs_per_delta=0):
"""Write waveforms to a Value Change Dump file, optionally populating a GTKWave save file.
# `fs_per_delta`` is not currently documented; it is not clear if we want to expose
# the concept of "delta cycles" in the surface API. Something like `fs_per_step` might be
# more appropriate.
"""write_vcd(vcd_file, gtkw_file=None, *, traces=())
This method returns a context manager. It can be used as: ::
Capture waveforms to a file.
sim = Simulator(frag)
sim.add_clock(1e-6)
with sim.write_vcd("dump.vcd", "dump.gtkw"):
sim.run_until(1e-3)
This context manager captures waveforms for each signal and memory that is referenced from
:py:`toplevel`, as well as any additional signals or memories specified in :py:`traces`,
and saves them to :py:`vcd_file`. If :py:`gtkw_file` is provided, it is populated with
a GTKWave save file displaying :py:`traces` when opened.
Arguments
---------
vcd_file : str or file-like object
Verilog Value Change Dump file or filename.
gtkw_file : str or file-like object
GTKWave save file or filename.
traces : iterable of Signal
Signals to display traces for.
Use this context manager to wrap a call to :meth:`run` or :meth:`run_until`: ::
with sim.write_vcd("simulation.vcd"):
sim.run()
The :py:`vcd_file` and :py:`gtkw_file` arguments accept either a :term:`python:file object`
or a filename. If a file object is provided, it is closed when exiting the context manager
(once the simulation completes or encounters an error).
The :py:`traces` argument accepts a *trace specification*, which can be one of:
* A :class:`~amaranth.hdl.ValueLike` object, such as a :class:`~amaranth.hdl.Signal`;
* A :class:`~amaranth.hdl.MemoryData` object or an individual row retrieved from one;
* A :class:`tuple` or :class:`list` containing trace specifications;
* A :class:`dict` associating :class:`str` names to trace specifications;
* An :ref:`interface object <wiring>`.
Raises
------
:exc:`TypeError`
If a trace specification refers to a signal with a private name.
"""
if self._engine.now != 0:
for file in (vcd_file, gtkw_file):
if hasattr(file, "close"):
file.close()
# FIXME: can this restriction be lifted?
raise ValueError("Cannot start writing waveforms after advancing simulation time")
def traverse_traces(traces):
@ -222,7 +366,8 @@ class Simulator:
if trace_signal is traces:
raise TypeError("Cannot trace signal with private name")
else:
raise TypeError(f"Cannot trace signal with private name (within {traces!r})")
raise TypeError(
f"Cannot trace signal with private name (within {traces!r})")
elif isinstance(traces, (list, tuple)):
for trace in traces:
traverse_traces(trace)
@ -232,5 +377,16 @@ class Simulator:
traverse_traces(traces)
return self._engine.write_vcd(vcd_file=vcd_file, gtkw_file=gtkw_file,
traces=traces, fs_per_delta=fs_per_delta)
return self._engine.write_vcd(
vcd_file=vcd_file, gtkw_file=gtkw_file, traces=traces, fs_per_delta=fs_per_delta)
def reset(self):
"""Reset the simulation.
This method reverts the simulation to its initial state:
* The value of each signal is changed to its initial value;
* The contents of each memory is changed to its initial contents;
* Each clock, testbench, and process is restarted.
"""
self._engine.reset()

View file

@ -142,7 +142,8 @@ class _VCDWriter:
suffix = f"[{var_size - 1}:0]"
else:
suffix = ""
self.gtkw_signal_names[signal].append(".".join((*var_scope, field_name)) + suffix)
self.gtkw_signal_names[signal].append(
".".join((*var_scope, field_name)) + suffix)
else:
self.vcd_writer.register_alias(
scope=var_scope, name=field_name,

View file

@ -47,29 +47,29 @@ from amaranth.sim import Simulator
dut = UpCounter(25)
def bench():
async def bench(ctx):
# Disabled counter should not overflow.
yield dut.en.eq(0)
ctx.set(dut.en, 0)
for _ in range(30):
yield
assert not (yield dut.ovf)
await ctx.tick()
assert not ctx.get(dut.ovf)
# Once enabled, the counter should overflow in 25 cycles.
yield dut.en.eq(1)
for _ in range(25):
yield
assert not (yield dut.ovf)
yield
assert (yield dut.ovf)
ctx.set(dut.en, 1)
for _ in range(24):
await ctx.tick()
assert not ctx.get(dut.ovf)
await ctx.tick()
assert ctx.get(dut.ovf)
# The overflow should clear in one cycle.
yield
assert not (yield dut.ovf)
await ctx.tick()
assert not ctx.get(dut.ovf)
sim = Simulator(dut)
sim.add_clock(1e-6) # 1 MHz
sim.add_sync_process(bench)
sim.add_testbench(bench)
with sim.write_vcd("up_counter.vcd"):
sim.run()
# --- CONVERT ---

Binary file not shown.

Before

Width:  |  Height:  |  Size: 33 KiB

View file

@ -29,20 +29,23 @@ Migrating from version 0.4
Apply the following changes to code written against Amaranth 0.4 to migrate it to version 0.5:
* Replace uses of ``m.Case()`` with no patterns with ``m.Default()``
* Replace uses of ``Value.matches()`` with no patterns with ``Const(1)``
* Update uses of ``amaranth.utils.log2_int(need_pow2=False)`` to :func:`amaranth.utils.ceil_log2`
* Update uses of ``amaranth.utils.log2_int(need_pow2=True)`` to :func:`amaranth.utils.exact_log2`
* Update uses of ``reset=`` keyword argument to ``init=``
* Convert uses of ``Simulator.add_sync_process`` used as testbenches to ``Simulator.add_testbench``
* Convert other uses of ``Simulator.add_sync_process`` to ``Simulator.add_process``
* Replace uses of ``amaranth.hdl.Memory`` with ``amaranth.lib.memory.Memory``
* Replace imports of ``amaranth.asserts.{Assert, Assume, Cover}`` with imports from ``amaranth.hdl``
* Remove any usage of ``name=`` with assertions, possibly replacing them with custom messages
* Ensure all elaboratables are subclasses of :class:`Elaboratable`
* Replace uses of :py:`m.Case()` with no patterns with :py:`m.Default()`.
* Replace uses of :py:`Value.matches()` with no patterns with :py:`Const(1)`.
* Update uses of :py:`amaranth.utils.log2_int(need_pow2=False)` to :func:`amaranth.utils.ceil_log2`.
* Update uses of :py:`amaranth.utils.log2_int(need_pow2=True)` to :func:`amaranth.utils.exact_log2`.
* Update uses of :py:`reset=` keyword argument to :py:`init=`.
* Convert uses of :py:`Simulator.add_sync_process` used as testbenches to :meth:`Simulator.add_testbench <amaranth.sim.Simulator.add_testbench>`.
* Convert other uses of :py:`Simulator.add_sync_process` to :meth:`Simulator.add_process <amaranth.sim.Simulator.add_process>`.
* Convert simulator processes and testbenches to use the new async API.
* Replace uses of :py:`amaranth.hdl.Memory` with :class:`amaranth.lib.memory.Memory`.
* Replace imports of :py:`amaranth.asserts.Assert`, :py:`Assume`, and :py:`Cover` with imports from :py:`amaranth.hdl`.
* Remove uses of :py:`name=` keyword argument of :py:`Assert`, :py:`Assume`, and :py:`Cover`; a message can be used instead.
* Ensure all elaboratables are subclasses of :class:`Elaboratable`.
* Ensure clock domains aren't used outside the module that defines them, or its submodules; move clock domain definitions upwards in the hierarchy as necessary
* Remove uses of ``amaranth.lib.coding.*`` by inlining or copying the implementation of the modules
* Update uses of ``platform.request`` to pass ``dir="-"`` and use :mod:`amaranth.lib.io` buffers
* Remove uses of :py:`amaranth.lib.coding.*` by inlining or copying the implementation of the modules.
* Update uses of :py:`platform.request` to pass :py:`dir="-"` and use :mod:`amaranth.lib.io` buffers.
* Update uses of :meth:`Simulator.add_clock <amaranth.sim.Simulator.add_clock>` with explicit :py:`phase` to take into account simulator no longer adding implicit :py:`period / 2`. (Previously, :meth:`Simulator.add_clock <amaranth.sim.Simulator.add_clock>` was documented to first toggle the clock at the time :py:`phase`, but actually first toggled the clock at :py:`period / 2 + phase`.)
* Update uses of :meth:`Simulator.run_until <amaranth.sim.Simulator.run_until>` to remove the :py:`run_passive=True` argument. If the code uses :py:`run_passive=False`, ensure it still works with the new behavior.
Implemented RFCs
@ -51,6 +54,7 @@ Implemented RFCs
.. _RFC 17: https://amaranth-lang.org/rfcs/0017-remove-log2-int.html
.. _RFC 27: https://amaranth-lang.org/rfcs/0027-simulator-testbenches.html
.. _RFC 30: https://amaranth-lang.org/rfcs/0030-component-metadata.html
.. _RFC 36: https://amaranth-lang.org/rfcs/0036-async-testbench-functions.html
.. _RFC 39: https://amaranth-lang.org/rfcs/0039-empty-case.html
.. _RFC 43: https://amaranth-lang.org/rfcs/0043-rename-reset-to-init.html
.. _RFC 45: https://amaranth-lang.org/rfcs/0045-lib-memory.html
@ -68,6 +72,7 @@ Implemented RFCs
* `RFC 17`_: Remove ``log2_int``
* `RFC 27`_: Testbench processes for the simulator
* `RFC 30`_: Component metadata
* `RFC 36`_: Async testbench functions
* `RFC 39`_: Change semantics of no-argument ``m.Case()``
* `RFC 43`_: Rename ``reset=`` to ``init=``
* `RFC 45`_: Move ``hdl.Memory`` to ``lib.Memory``
@ -94,12 +99,12 @@ Language changes
* Added: :meth:`ShapeCastable.from_bits` method. (`RFC 51`_)
* Added: IO values, :class:`IOPort` objects, :class:`IOBufferInstance` objects. (`RFC 53`_)
* Added: :class:`MemoryData` objects. (`RFC 62`_)
* Changed: ``m.Case()`` with no patterns is never active instead of always active. (`RFC 39`_)
* Changed: ``Value.matches()`` with no patterns is ``Const(0)`` instead of ``Const(1)``. (`RFC 39`_)
* Changed: ``Signal(range(stop), init=stop)`` warning has been changed into a hard error and made to trigger on any out-of range value.
* Changed: ``Signal(range(0))`` is now valid without a warning.
* Changed: ``Shape.cast(range(1))`` is now ``unsigned(0)``. (`RFC 46`_)
* Changed: the ``reset=`` argument of :class:`Signal`, :meth:`Signal.like`, :class:`amaranth.lib.wiring.Member`, :class:`amaranth.lib.cdc.FFSynchronizer`, and ``m.FSM()`` has been renamed to ``init=``. (`RFC 43`_)
* Changed: :py:`m.Case()` with no patterns is never active instead of always active. (`RFC 39`_)
* Changed: :py:`Value.matches()` with no patterns is :py:`Const(0)` instead of :py:`Const(1)`. (`RFC 39`_)
* Changed: :py:`Signal(range(stop), init=stop)` warning has been changed into a hard error and made to trigger on any out-of range value.
* Changed: :py:`Signal(range(0))` is now valid without a warning.
* Changed: :py:`Shape.cast(range(1))` is now :py:`unsigned(0)`. (`RFC 46`_)
* Changed: the :py:`reset=` argument of :class:`Signal`, :meth:`Signal.like`, :class:`amaranth.lib.wiring.Member`, :class:`amaranth.lib.cdc.FFSynchronizer`, and :py:`m.FSM()` has been renamed to :py:`init=`. (`RFC 43`_)
* Changed: :class:`Shape` has been made immutable and hashable.
* Changed: :class:`Assert`, :class:`Assume`, :class:`Cover` have been moved to :mod:`amaranth.hdl` from :mod:`amaranth.asserts`. (`RFC 50`_)
* Changed: :class:`Instance` IO ports now accept only IO values, not plain values. (`RFC 53`_)
@ -127,17 +132,21 @@ Standard library changes
* Added: :mod:`amaranth.lib.meta`, :class:`amaranth.lib.wiring.ComponentMetadata`. (`RFC 30`_)
* Deprecated: :mod:`amaranth.lib.coding`. (`RFC 63`_)
* Removed: (deprecated in 0.4) :mod:`amaranth.lib.scheduler`. (`RFC 19`_)
* Removed: (deprecated in 0.4) :class:`amaranth.lib.fifo.FIFOInterface` with ``fwft=False``. (`RFC 20`_)
* Removed: (deprecated in 0.4) :class:`amaranth.lib.fifo.SyncFIFO` with ``fwft=False``. (`RFC 20`_)
* Removed: (deprecated in 0.4) :class:`amaranth.lib.fifo.FIFOInterface` with :py:`fwft=False`. (`RFC 20`_)
* Removed: (deprecated in 0.4) :class:`amaranth.lib.fifo.SyncFIFO` with :py:`fwft=False`. (`RFC 20`_)
Toolchain changes
-----------------
* Added: ``Simulator.add_testbench``. (`RFC 27`_)
* Added: :meth:`Simulator.add_testbench <amaranth.sim.Simulator.add_testbench>`. (`RFC 27`_)
* Added: async function support in :meth:`Simulator.add_testbench <amaranth.sim.Simulator.add_testbench>` and :meth:`Simulator.add_process <amaranth.sim.Simulator.add_process>`. (`RFC 36`_)
* Added: support for :class:`amaranth.hdl.Assert` in simulation. (`RFC 50`_)
* Deprecated: ``Settle`` simulation command. (`RFC 27`_)
* Deprecated: ``Simulator.add_sync_process``. (`RFC 27`_)
* Changed: :meth:`Simulator.add_clock <amaranth.sim.Simulator.add_clock>` no longer implicitly adds :py:`period / 2` when :py:`phase` is specified, actually matching the documentation.
* Changed: :meth:`Simulator.run_until <amaranth.sim.Simulator.run_until>` always runs the simulation until the given deadline, even when no critical processes or testbenches are present.
* Deprecated: :py:`Settle` simulation command. (`RFC 27`_)
* Deprecated: :py:`Simulator.add_sync_process`. (`RFC 27`_)
* Deprecated: the :py:`run_passive` argument to :meth:`Simulator.run_until <amaranth.sim.Simulator.run_until>` has been deprecated, and does nothing.
* Removed: (deprecated in 0.4) use of mixed-case toolchain environment variable names, such as ``NMIGEN_ENV_Diamond`` or ``AMARANTH_ENV_Diamond``; use upper-case environment variable names, such as ``AMARANTH_ENV_DIAMOND``.
@ -150,7 +159,7 @@ Platform integration changes
* Added: :meth:`BuildPlan.extract`.
* Added: ``build.sh`` begins with ``#!/bin/sh``.
* Changed: ``IntelPlatform`` renamed to ``AlteraPlatform``.
* Deprecated: argument ``run_script=`` in :meth:`BuildPlan.execute_local`.
* Deprecated: argument :py:`run_script=` in :meth:`BuildPlan.execute_local`.
* Removed: (deprecated in 0.4) :mod:`vendor.intel`, :mod:`vendor.lattice_ecp5`, :mod:`vendor.lattice_ice40`, :mod:`vendor.lattice_machxo2_3l`, :mod:`vendor.quicklogic`, :mod:`vendor.xilinx`. (`RFC 18`_)

View file

@ -39,6 +39,10 @@ autodoc_default_options = {
autodoc_preserve_defaults = True
autodoc_inherit_docstrings = False
# Amaranth mostly does not include typehints, and showing them in some places but not others is
# worse than not showing them at all.
autodoc_typehints = "none"
napoleon_google_docstring = False
napoleon_numpy_docstring = True
napoleon_use_ivar = True

View file

@ -15,6 +15,7 @@ Language & toolchain
guide
reference
stdlib
simulator
platform
changes
contrib

View file

@ -27,7 +27,7 @@ Amaranth HDL requires Python 3.8; it works on CPython_ 3.8 (or newer), and works
For most workflows, Amaranth requires Yosys_ |yosys-version|. A `compatible version of Yosys <amaranth-yosys_>`_ is distributed via PyPI_ for most popular platforms, so it is usually not necessary to install Yosys separately.
Simulating Amaranth code requires no additional software. However, a waveform viewer like GTKWave_ is invaluable for debugging. As an alternative to GTKWave, the `Amaranth Playground`_ can be used to display waveforms for simple designs.
Simulating Amaranth code requires no additional software. However, a waveform viewer like Surfer_ or GTKWave_ is invaluable for debugging. As an alternative, the `Amaranth Playground`_ can be used to display waveforms for simple designs.
Synthesizing, placing and routing an Amaranth design for an FPGA requires the FPGA family specific toolchain. The open source iCE40, ECP5, MachXO2/3, Nexus, and Gowin toolchains are distributed via PyPI_ for most popular platforms by the YoWASP_ project.
@ -39,6 +39,7 @@ Synthesizing, placing and routing an Amaranth design for an FPGA requires the FP
.. _Yosys: https://yosyshq.net/yosys/
.. _amaranth-yosys: https://pypi.org/project/amaranth-yosys/
.. _PyPI: https://pypi.org/
.. _Surfer: https://surfer-project.org/
.. _GTKWave: https://gtkwave.sourceforge.net/
.. _YoWASP: https://yowasp.org/
@ -58,10 +59,6 @@ Installing prerequisites
:ref:`Install Python <python:using-on-windows>`, either from Windows Store or using the full installer. If using the full installer, make sure to install a 64-bit version of Python.
`Download GTKWave`_, either win32 or win64 binaries. GTKWave does not need to be installed; it can be unpacked to any convenient location and run from there.
.. _Download GTKWave: https://sourceforge.net/projects/gtkwave/files/
|upgrade-pip|
.. code-block:: doscon
@ -71,11 +68,11 @@ Installing prerequisites
.. platform-choice:: macos
:title: macOS
Install Homebrew_. Then, install Python and GTKWave by running:
Install Homebrew_. Then, install Python by running:
.. code-block:: console
$ brew install python gtkwave
$ brew install python
.. _Homebrew: https://brew.sh
@ -89,11 +86,11 @@ Installing prerequisites
:altname: linux
:title: Debian
Install Python and GTKWave by running:
Install Python by running:
.. code-block:: console
$ sudo apt-get install python3-pip gtkwave
$ sudo apt-get install python3-pip
On architectures other than |builtin-yosys-architectures|, install Yosys by running:
@ -113,16 +110,16 @@ Installing prerequisites
:altname: linux
:title: Arch Linux
Install Python, pip, GTKWave and Yosys by running:
Install Python and pip by running:
.. code-block:: console
$ sudo pacman -S python python-pip gtkwave yosys
$ sudo pacman -S python python-pip
.. platform-choice:: linux
:title: Other Linux
Install Python and GTKWave from the package repository of your distribution.
Install Python from the package repository of your distribution.
On architectures other than |builtin-yosys-architectures|, install Yosys from the package repository of your distribution.

288
docs/simulator.rst Normal file
View file

@ -0,0 +1,288 @@
Simulator
#########
.. py:module:: amaranth.sim
The :mod:`amaranth.sim` module, also known as the simulator, makes it possible to evaluate a design's functionality in a virtual environment before it is implemented in hardware.
Simulating circuits
-------------------
.. testsetup::
from amaranth import *
The following examples simulate one of the two designs below: synchronous counter running in the ``sync`` clock domain, and combinational adder. They assume familiarity with the :doc:`language guide <guide>`.
.. testcode::
from amaranth.lib import wiring
from amaranth.lib.wiring import In, Out
class Counter(wiring.Component):
en: In(1, init=1)
count: Out(4)
def elaborate(self, platform):
m = Module()
with m.If(self.en):
m.d.sync += self.count.eq(self.count + 1)
return m
class Adder(wiring.Component):
a: In(16)
b: In(16)
o: Out(17)
def elaborate(self, platform):
m = Module()
m.d.comb += self.o.eq(self.a + self.b)
return m
Running a simulation
++++++++++++++++++++
Simulating a design always requires the three basic steps: constructing the :abbr:`DUT (Design Under Test)`, constructing a :class:`Simulator` for it, and running the simulation with the :meth:`Simulator.run` or :meth:`Simulator.run_until` method:
.. testcode::
from amaranth.sim import Simulator
dut = Counter()
sim = Simulator(dut)
sim.run()
However, the code above neither stimulates the DUT's inputs nor measures the DUT's outputs; the :meth:`Simulator.run` method also immediately returns if no stimulus is added to the simulation. To make it useful, several changes are necessary:
* The :meth:`Simulator.add_clock` method adds a *stimulus*: a process external to the DUT that manipulates its inputs (in this case, toggles the clock of the ``sync`` domain).
* The :meth:`Simulator.run_until` method runs the simulation until a specific deadline is reached.
* The :meth:`Simulator.write_vcd` method captures the DUT's inputs, state, and outputs, and writes it to a :abbr:`VCD (Value Change Dump)` file.
.. _Surfer: https://surfer-project.org/
.. _GTKWave: https://gtkwave.sourceforge.net/
The following code simulates a design and capture the values of all the signals used in the design for each moment of simulation time:
.. testcode::
dut = Counter()
sim = Simulator(dut)
sim.add_clock(1e-6) # 1 µs period, or 1 MHz
with sim.write_vcd("example1.vcd"):
sim.run_until(1e-6 * 15) # 15 periods of the clock
The captured data is saved to a :abbr:`VCD` file :file:`example1.vcd`, which can be displayed with a *waveform viewer* such as Surfer_ or GTKWave_:
.. wavedrom:: simulator/example1
{
"head": {"tock": 0},
"signal": [
{"name": "clk", "wave": "lp.............."},
{"name": "rst", "wave": "l..............."},
{"name": "en", "wave": "h..............."},
{"name": "count", "wave": "================",
"data": ["0","1","2","3","4","5","6","7","8","9","10","11","12","13","14","15"]}
]
}
Testing synchronous circuits
++++++++++++++++++++++++++++
To verify that the DUT works as intended during a simulation, known values are provided as the inputs, and the outputs are compared with the expected results.
This is done by adding a different type of stimulus to the simulator, a *testbench*: an :py:`async` Python function that runs concurrently with the DUT and can manipulate the signals used in the simulation. A testbench is added using the :meth:`Simulator.add_testbench` method, and receives a :class:`SimulatorContext` object through which it can interact with the simulator: inspect the value of signals using the :meth:`ctx.get() <SimulatorContext.get>` method, change the value of signals using the :meth:`ctx.set() <SimulatorContext.set>` method, or wait for an active edge of a :ref:`clock domain <lang-clockdomains>` using the :meth:`ctx.tick() <SimulatorContext.tick>` method.
The following example simulates a counter and verifies that it can be stopped using its :py:`en` input:
.. testcode::
dut = Counter()
async def testbench_example2(ctx):
await ctx.tick().repeat(5) # wait until after the 5th edge of the `sync` domain clock
assert ctx.get(dut.count) == 5 # verify that the counter has the expected value
ctx.set(dut.en, False) # deassert `dut.en`, disabling the counter
await ctx.tick().repeat(5) # wait until after the 10th edge of clock
assert ctx.get(dut.count) == 5 # verify that the counter has not been incrementing
ctx.set(dut.en, True) # assert `dut.en`, enabling the counter again
sim = Simulator(dut)
sim.add_clock(1e-6)
sim.add_testbench(testbench_example2) # add the testbench; run_until() calls the function
with sim.write_vcd("example2.vcd"):
sim.run_until(1e-6 * 15)
Since this circuit is synchronous, and the :meth:`ctx.tick() <SimulatorContext.tick>` method waits until after the circuit has reacted to the clock edge, the change to the :py:`en` input affects the behavior of the circuit on the next clock cycle after the change:
.. wavedrom:: simulator/example2
{
"head": {"tock": 0},
"signal": [
{"name": "clk", "wave": "lp.............."},
{"name": "rst", "wave": "l..............."},
{"name": "en", "wave": "h....0....1....."},
{"name": "count", "wave": "======.....=====",
"data": ["0","1","2","3","4","5","6","7","8","9","10"]}
]
}
Testing combinational circuits
++++++++++++++++++++++++++++++
A testbench that tests a combinational circuit advances simulation time using the :meth:`ctx.delay() <SimulatorContext.delay>` method instead of the :meth:`ctx.tick() <SimulatorContext.tick>` method, since the simulation does not contain a clock in this case. The :meth:`Simulator.run` method stops the simulation and returns once all testbenches finish executing.
The following example simulates an adder:
.. testcode::
dut = Adder()
async def testbench_example3(ctx):
await ctx.delay(1e-6)
ctx.set(dut.a, 2)
ctx.set(dut.b, 2)
assert ctx.get(dut.o) == 4
await ctx.delay(1e-6)
ctx.set(dut.a, 1717)
ctx.set(dut.b, 420)
assert ctx.get(dut.o) == 2137
await ctx.delay(2e-6)
sim = Simulator(dut)
sim.add_testbench(testbench_example3)
with sim.write_vcd("example3.vcd"):
sim.run()
Since this circuit is entirely combinational, and the Amaranth simulator uses a *zero-delay model* of combinational circuits, the outputs change in the same instant as the inputs do:
.. wavedrom:: simulator/example3
{
"signal": [
{"name": "a", "wave": "===.", "data": [0, 2, 1717]},
{"name": "b", "wave": "===.", "data": [0, 2, 420]},
{"name": "o", "wave": "===.", "data": [0, 4, 2137]}
]
}
Replacing circuits with code
----------------------------
.. note::
This section describes an advanced technique that is not commonly used. If you are first learning how to use the simulator, you can skip it.
During simulation, it is possible to replace an Amaranth circuit with the equivalent Python code. This can be used to improve simulation performance, or to avoid reimplementing complex Python algorithms in Amaranth if they do not need to be synthesized.
This is done by adding a *process* to the simulator: an :py:`async` Python function that runs as an integral part of the simulation, simultaneously with the DUT. A process is added using the :meth:`Simulator.add_process` method, and receives a :class:`SimulatorContext` object through which it can interact with the simulator. A process is conceptually similar to a testbench but differs from it in two important ways:
* Testbenches run in a well-defined order (from first to last in the order they were added, yielding control only at :py:`await` points) and cannot observe inconsistent intermediate states of a design, but processes run in an undefined order while the design is converging after a change to its inputs.
* In a process, it is not possible to inspect the value of a signal using the :meth:`ctx.get() <SimulatorContext.get>` method, which guarantees that inconsistent intermediate states of a design cannot be observed by a process either.
A process communicates with the rest of the design in the same way an elaboratable would: through :class:`Signal`\ s.
Replacing synchronous circuits
++++++++++++++++++++++++++++++
Processes cannot inspect values of signals using the :meth:`ctx.get() <SimulatorContext.get>` method. Instead, values of signals in a synchronous process are sampled at each active edge of the clock domain (or, for domains with asynchronous reset, at the assertion of the reset signal) using the :meth:`ctx.tick() <SimulatorContext.tick>` method.
The following code replaces the :py:`Counter` elaboratable with the equivalent Python code in a process, and uses a testbench to verify its correct operation:
.. testcode::
m = Module()
m.domains.sync = cd_sync = ClockDomain()
en = Signal(init=1)
count = Signal(4)
async def process_example4(ctx):
count_value = 0 # initialize counter to 0
async for clk_edge, rst_value, en_value in ctx.tick().sample(en):
if rst_value: # can be asserted with or without clk_edge
count_value = 0 # re-initialize counter
elif clk_edge and en_value:
count_value += 1 # advance the counter
ctx.set(count, count_value) # publish its value to the simulation
async def testbench_example4(ctx):
await ctx.tick().repeat(5)
assert ctx.get(count) == 5
ctx.set(en, False)
await ctx.tick().repeat(5)
assert ctx.get(count) == 5
ctx.set(en, True)
sim = Simulator(m)
sim.add_clock(1e-6)
sim.add_process(process_example4)
sim.add_testbench(testbench_example4)
with sim.write_vcd("example4.vcd", traces=(cd_sync.clk, cd_sync.rst, en, count)):
sim.run()
Unless it is instructed otherwise, the :meth:`Simulator.write_vcd` method only captures values of signals that appear in the circuit provided to the simulator when it is created. The :py:`en` and :py:`count` signals do not, and are added explicitly using the :py:`traces` argument so that they will appear in the VCD file.
Replacing combinational circuits
++++++++++++++++++++++++++++++++
Values of signals in a combinational process are sampled anytime they change using the :meth:`ctx.changed() <SimulatorContext.changed>` method.
The following code replaces the :py:`Adder` elaboratable with the equivalent Python code in a process, and uses a testbench to verify its correct operation:
.. testcode::
m = Module()
a = Signal(16)
b = Signal(16)
o = Signal(17)
async def process_example5(ctx):
async for a_value, b_value in ctx.changed(a, b):
ctx.set(o, a_value + b_value)
async def testbench_example5(ctx):
await ctx.delay(1e-6)
ctx.set(a, 2)
ctx.set(b, 2)
assert ctx.get(o) == 4
await ctx.delay(1e-6)
ctx.set(a, 1717)
ctx.set(b, 420)
assert ctx.get(o) == 2137
await ctx.delay(2e-6)
sim = Simulator(m)
sim.add_process(process_example5)
sim.add_testbench(testbench_example5)
with sim.write_vcd("example5.vcd", traces=[a, b, o]):
sim.run()
Reference
---------
.. autoclass:: Simulator
.. autoclass:: SimulatorContext
.. autoexception:: BrokenTrigger
.. autoexception:: DomainReset
.. _sim-tick-trigger:
.. autoclass:: TickTrigger
.. autoclass:: TriggerCombination

View file

@ -47,14 +47,23 @@ To verify its functionality, the counter can be simulated for a small amount of
:start-after: # --- TEST ---
:end-before: # --- CONVERT ---
The test bench is implemented as a Python generator function that is co-simulated with the counter itself. The test bench can inspect the simulated signals with ``yield sig``, update them with ``yield sig.eq(val)``, and advance the simulation by one clock cycle with ``yield``.
The testbench is implemented as a Python :py:`async` function that is simulated concurrently with the counter itself. The testbench can inspect the simulated signals using :py:`ctx.get(sig)`, update them using :py:`ctx.set(sig, val)`, and advance the simulation by one clock cycle with :py:`await ctx.tick()`. See the :doc:`simulator documentation <simulator>` for details.
.. TODO: link to simulator reference
When run, the testbench finishes successfully, since all of the assertions hold, and produces a VCD file with waveforms recorded for every :class:`Signal` as well as the clock of the ``sync`` domain:
When run, the test bench finishes successfully, since all of the assertions hold, and produces a VCD file with waveforms recorded for every ``Signal`` as well as the clock of the ``sync`` domain:
.. wavedrom:: start/up_counter
.. image:: _images/up_counter_gtkwave.png
:alt: A screenshot of GTKWave displaying waveforms near the clock cycle where the counter overflows.
{
"signal": [
{"name": "clk", "wave": "p.........."},
{"name": "count", "wave": "===========", "data": ["17", "18", "19", "20", "21", "22", "23", "24", "25", "0", "1"]},
{"name": "en", "wave": "1.........."},
{"name": "ovf", "wave": "0.......10."},
],
"head": {
"tock": 48
}
}
Converting a counter

View file

@ -171,21 +171,35 @@ However, the memory read port is also configured to be *transparent* relative to
Simulation
==========
++++++++++
.. todo::
There are two ways to interact with a memory array in a simulator: requesting a read and/or write port that is used only in a testbench, or directly reading and writing memory contents. In most cases, directly accessing memory contents using :meth:`MemoryData.__getitem__ <amaranth.hdl.MemoryData.__getitem__>` is more convenient.
This section will be written once the simulator itself is documented.
For example, this :doc:`testbench </simulator>` will clear the least significant bit of every memory row:
.. testcode::
async def testbench(ctx):
for index in len(memory.data):
ctx.set(memory.data[index], ctx.get(memory.data[index]) & ~1)
Memory description
==================
.. autoexception:: amaranth.hdl.AlreadyElaborated
:noindex:
:noindex:
..
:canonical: amaranth.hdl.AlreadyElaborated
(not available until `amaranth.hdl` documents it)
.. autoclass:: amaranth.hdl.MemoryData
..
:canonical: amaranth.hdl.MemoryData
(not available until `amaranth.hdl` documents it)
Memory component
================

View file

@ -76,7 +76,7 @@ test = [
docs = [
"sphinx~=7.1",
"sphinxcontrib-platformpicker~=1.3",
"sphinxcontrib-yowasp-wavedrom==1.4", # exact version to avoid changes in rendering
"sphinxcontrib-yowasp-wavedrom==1.6", # exact version to avoid changes in rendering
"sphinx-rtd-theme~=2.0",
"sphinx-autobuild",
]

View file

@ -755,14 +755,14 @@ class SimulatorIntegrationTestCase(FHDLTestCase):
m.d.sync += s.eq(0)
with self.assertSimulation(m) as sim:
sim.add_clock(1)
with self.assertRaisesRegex(ValueError,
with self.assertRaisesRegex(DriverConflict,
r"^Domain 'sync' already has a clock driving it$"):
sim.add_clock(1)
def test_add_clock_wrong_missing(self):
m = Module()
with self.assertSimulation(m) as sim:
with self.assertRaisesRegex(ValueError,
with self.assertRaisesRegex(NameError,
r"^Domain 'sync' is not present in simulation$"):
sim.add_clock(1)
@ -1978,9 +1978,19 @@ class SimulatorRegressionTestCase(FHDLTestCase):
with self.assertRaisesRegex(ValueError,
r"^Combinational domain does not have a clock$"):
await ctx.tick("comb")
with self.assertRaisesRegex(NameError,
r"^Clock domain named 'sync2' does not exist$"):
await ctx.tick("sync2")
with self.assertRaisesRegex(ValueError,
r"^Context cannot be provided if a clock domain is specified directly$"):
await ctx.tick(cd_sync, context=m)
with self.assertRaisesRegex(ValueError,
r"^Delay cannot be negative$"):
await ctx.delay(-1)
s = Signal(data.StructLayout({"a": unsigned(1)}))
with self.assertRaisesRegex(TypeError,
r"^The shape of a condition may only be `signed` or `unsigned`, not StructLayout.*$"):
await ctx.tick().until(s)
reached_tb = True
sim = Simulator(m)