Implement RFC 50: Print and string formatting.

Co-authored-by: Catherine <whitequark@whitequark.org>
This commit is contained in:
Wanda 2024-03-06 05:23:47 +01:00 committed by Catherine
parent 715a8d4934
commit bfe541a6d7
20 changed files with 1090 additions and 112 deletions

View file

@ -16,6 +16,7 @@ from .hdl import *
__all__ = [
"Shape", "unsigned", "signed",
"Value", "Const", "C", "Mux", "Cat", "Array", "Signal", "ClockSignal", "ResetSignal",
"Format", "Print", "Assert",
"Module",
"ClockDomain",
"Elaboratable", "Fragment", "Instance",

View file

@ -1,4 +1,15 @@
from .hdl._ast import AnyConst, AnySeq, Initial, Assert, Assume, Cover
from .hdl._ast import AnyConst, AnySeq, Initial
from . import hdl as __hdl
__all__ = ["AnyConst", "AnySeq", "Initial", "Assert", "Assume", "Cover"]
def __getattr__(name):
import warnings
if name in __hdl.__dict__ and name in __all__:
if not (name.startswith("__") and name.endswith("__")):
warnings.warn(f"instead of `{__name__}.{name}`, use `{__hdl.__name__}.{name}`",
DeprecationWarning, stacklevel=2)
return getattr(__hdl, name)
raise AttributeError(f"module {__name__!r} has no attribute {name!r}")

View file

@ -441,8 +441,8 @@ class ModuleEmitter:
continue # Instances use one wire per output, not per cell.
elif isinstance(cell, (_nir.PriorityMatch, _nir.Matches)):
continue # Inlined into assignment lists.
elif isinstance(cell, (_nir.SyncProperty, _nir.AsyncProperty, _nir.Memory,
_nir.SyncWritePort)):
elif isinstance(cell, (_nir.SyncPrint, _nir.AsyncPrint, _nir.SyncProperty,
_nir.AsyncProperty, _nir.Memory, _nir.SyncWritePort)):
continue # No outputs.
elif isinstance(cell, _nir.AssignmentList):
width = len(cell.default)
@ -859,37 +859,78 @@ class ModuleEmitter:
})
self.builder.cell(f"$memrd_v2", ports=ports, params=params, src=_src(cell.src_loc))
def emit_property(self, cell_idx, cell):
if isinstance(cell, _nir.AsyncProperty):
ports = {
"A": self.sigspec(cell.test),
"EN": self.sigspec(cell.en),
}
if isinstance(cell, _nir.SyncProperty):
test = self.builder.wire(1, attrs={"init": _ast.Const(0, 1)})
en = self.builder.wire(1, attrs={"init": _ast.Const(0, 1)})
for (d, q) in [
(cell.test, test),
(cell.en, en),
]:
ports = {
"D": self.sigspec(d),
"Q": q,
"CLK": self.sigspec(cell.clk),
}
params = {
"WIDTH": 1,
"CLK_POLARITY": {
"pos": True,
"neg": False,
}[cell.clk_edge],
}
self.builder.cell(f"$dff", ports=ports, params=params, src=_src(cell.src_loc))
ports = {
"A": test,
"EN": en,
}
self.builder.cell(f"${cell.kind}", name=cell.name, ports=ports, src=_src(cell.src_loc))
def emit_print(self, cell_idx, cell):
args = []
format = []
if cell.format is not None:
for chunk in cell.format.chunks:
if isinstance(chunk, str):
format.append(chunk)
else:
spec = _ast.Format._parse_format_spec(chunk.format_desc, _ast.Shape(len(chunk.value), chunk.signed))
type = spec["type"]
if type == "s":
assert len(chunk.value) % 8 == 0
for bit in reversed(range(0, len(chunk.value), 8)):
args += chunk.value[bit:bit+8]
else:
args += chunk.value
if type is None:
type = "d"
if type == "x" or type == "X":
# TODO(yosys): "H" type
type = "h"
if type == "s":
# TODO(yosys): support for single unicode character?
type = "c"
width = spec["width"]
align = spec["align"]
if align is None:
align = ">" if type != "c" else "<"
if align == "=":
# TODO(yosys): "=" alignment
align = ">"
fill = spec["fill"]
if fill not in (" ", "0"):
# TODO(yosys): arbitrary fill
fill = " "
# TODO(yosys): support for options, grouping
sign = spec["sign"]
if sign != "+":
# TODO(yosys): support " " sign
sign = ""
if type == "c":
signed = ""
elif chunk.signed:
signed = "s"
else:
signed = "u"
format.append(f"{{{len(chunk.value)}:{align}{fill}{width or ''}{type}{sign}{signed}}}")
ports = {
"EN": self.sigspec(cell.en),
"ARGS": self.sigspec(_nir.Value(args)),
}
params = {
"FORMAT": "".join(format),
"ARGS_WIDTH": len(args),
"PRIORITY": -cell_idx,
}
if isinstance(cell, (_nir.AsyncPrint, _nir.AsyncProperty)):
ports["TRG"] = self.sigspec(_nir.Value())
params["TRG_ENABLE"] = False
params["TRG_WIDTH"] = 0
params["TRG_POLARITY"] = 0
if isinstance(cell, (_nir.SyncPrint, _nir.SyncProperty)):
ports["TRG"] = self.sigspec(cell.clk)
params["TRG_ENABLE"] = True
params["TRG_WIDTH"] = 1
params["TRG_POLARITY"] = cell.clk_edge == "pos"
if isinstance(cell, (_nir.AsyncPrint, _nir.SyncPrint)):
self.builder.cell(f"$print", params=params, ports=ports, src=_src(cell.src_loc))
if isinstance(cell, (_nir.AsyncProperty, _nir.SyncProperty)):
params["FLAVOR"] = cell.kind
ports["A"] = self.sigspec(cell.test)
self.builder.cell(f"$check", params=params, ports=ports, src=_src(cell.src_loc))
def emit_any_value(self, cell_idx, cell):
self.builder.cell(f"${cell.kind}", ports={
@ -939,8 +980,8 @@ class ModuleEmitter:
self.emit_write_port(cell_idx, cell)
elif isinstance(cell, (_nir.AsyncReadPort, _nir.SyncReadPort)):
self.emit_read_port(cell_idx, cell)
elif isinstance(cell, (_nir.AsyncProperty, _nir.SyncProperty)):
self.emit_property(cell_idx, cell)
elif isinstance(cell, (_nir.AsyncPrint, _nir.SyncPrint, _nir.AsyncProperty, _nir.SyncProperty)):
self.emit_print(cell_idx, cell)
elif isinstance(cell, _nir.AnyValue):
self.emit_any_value(cell_idx, cell)
elif isinstance(cell, _nir.Initial):

View file

@ -1,6 +1,7 @@
from ._ast import Shape, unsigned, signed, ShapeCastable, ShapeLike
from ._ast import Value, ValueCastable, ValueLike
from ._ast import Const, C, Mux, Cat, Array, Signal, ClockSignal, ResetSignal
from ._ast import Format, Print, Assert, Assume, Cover
from ._dsl import SyntaxError, SyntaxWarning, Module
from ._cd import DomainError, ClockDomain
from ._ir import UnusedElaboratable, Elaboratable, DriverConflict, Fragment, Instance
@ -14,6 +15,7 @@ __all__ = [
"Shape", "unsigned", "signed", "ShapeCastable", "ShapeLike",
"Value", "ValueCastable", "ValueLike",
"Const", "C", "Mux", "Cat", "Array", "Signal", "ClockSignal", "ResetSignal",
"Format", "Print", "Assert", "Assume", "Cover",
# _dsl
"SyntaxError", "SyntaxWarning", "Module",
# _cd

View file

@ -2,12 +2,13 @@ from abc import ABCMeta, abstractmethod
import warnings
import functools
import operator
import string
import re
from collections import OrderedDict
from collections.abc import Iterable, MutableMapping, MutableSet, MutableSequence
from enum import Enum, EnumMeta
from itertools import chain
from ._repr import *
from .. import tracer
from ..utils import *
from .._utils import *
@ -21,8 +22,9 @@ __all__ = [
"Signal", "ClockSignal", "ResetSignal",
"ValueCastable", "ValueLike",
"Initial",
"Format",
"Statement", "Switch",
"Property", "Assign", "Assert", "Assume", "Cover",
"Property", "Assign", "Print", "Assert", "Assume", "Cover",
"SignalKey", "SignalDict", "SignalSet",
]
@ -337,7 +339,7 @@ class ShapeCastable:
# TODO: write an RFC for turning this into a proper interface method
def _value_repr(self, value):
return (Repr(FormatInt(), value),)
return (_repr.Repr(_repr.FormatInt(), value),)
class _ShapeLikeMeta(type):
@ -1260,6 +1262,17 @@ class Value(metaclass=ABCMeta):
#: assert info == "a signal"
__hash__ = None # type: ignore
def __format__(self, format_desc):
"""Forbidden formatting.
Since normal Python formatting (f-strings and ``str.format``) must immediately return
a string, it is unsuitable for formatting Amaranth values. To format a value at simulation
time, use :class:`Format` instead. If you really want to dump the AST at elaboration time,
use ``repr`` instead (for instance, via ``f"{value!r}"``).
"""
raise TypeError(f"Value {self!r} cannot be converted to string. Use `Format` for "
f"simulation-time formatting, or use `repr` to print the AST.")
def _lhs_signals(self):
raise TypeError(f"Value {self!r} cannot be used in assignments")
@ -1925,20 +1938,20 @@ class Signal(Value, DUID, metaclass=_SignalMeta):
self._value_repr = tuple(orig_shape._value_repr(self))
elif isinstance(orig_shape, type) and issubclass(orig_shape, Enum):
# A non-Amaranth enum needs a value repr constructed for it.
self._value_repr = (Repr(FormatEnum(orig_shape), self),)
self._value_repr = (_repr.Repr(_repr.FormatEnum(orig_shape), self),)
else:
# Any other case is formatted as a plain integer.
self._value_repr = (Repr(FormatInt(), self),)
self._value_repr = (_repr.Repr(_repr.FormatInt(), self),)
# Compute the value representation that will be used by Amaranth.
if decoder is None:
self._value_repr = (Repr(FormatInt(), self),)
self._value_repr = (_repr.Repr(_repr.FormatInt(), self),)
self._decoder = None
elif not (isinstance(decoder, type) and issubclass(decoder, Enum)):
self._value_repr = (Repr(FormatCustom(decoder), self),)
self._value_repr = (_repr.Repr(_repr.FormatCustom(decoder), self),)
self._decoder = decoder
else: # Violence. In the name of backwards compatibility!
self._value_repr = (Repr(FormatEnum(decoder), self),)
self._value_repr = (_repr.Repr(_repr.FormatEnum(decoder), self),)
def enum_decoder(value):
try:
return "{0.name:}/{0.value:}".format(decoder(value))
@ -2299,6 +2312,189 @@ class Initial(Value):
return "(initial)"
@final
class Format:
def __init__(self, format, *args, **kwargs):
fmt = string.Formatter()
chunks = []
used_args = set()
auto_arg_index = 0
def get_field(field_name):
nonlocal auto_arg_index
if field_name == "":
if auto_arg_index is None:
raise ValueError("cannot switch from manual field "
"specification to automatic field "
"numbering")
field_name = str(auto_arg_index)
auto_arg_index += 1
elif field_name.isdigit():
if auto_arg_index is not None and auto_arg_index > 0:
raise ValueError("cannot switch from automatic field "
"numbering to manual field "
"specification")
auto_arg_index = None
obj, arg_used = fmt.get_field(field_name, args, kwargs)
used_args.add(arg_used)
return obj
def subformat(sub_string):
result = []
for literal, field_name, format_spec, conversion in fmt.parse(sub_string):
result.append(literal)
if field_name is not None:
obj = get_field(field_name)
obj = fmt.convert_field(obj, conversion)
format_spec = subformat(format_spec)
result.append(fmt.format_field(obj, format_spec))
return "".join(result)
for literal, field_name, format_spec, conversion in fmt.parse(format):
chunks.append(literal)
if field_name is not None:
obj = get_field(field_name)
obj = fmt.convert_field(obj, conversion)
format_spec = subformat(format_spec)
if isinstance(obj, Value):
# Perform validation.
self._parse_format_spec(format_spec, obj.shape())
chunks.append((obj, format_spec))
elif isinstance(obj, ValueCastable):
raise TypeError("'ValueCastable' formatting is not supported")
elif isinstance(obj, Format):
if format_spec != "":
raise ValueError(f"Format specifiers ({format_spec!r}) cannot be used for 'Format' objects")
chunks += obj._chunks
else:
chunks.append(fmt.format_field(obj, format_spec))
for i in range(len(args)):
if i not in used_args:
raise ValueError(f"format positional argument {i} was not used")
for name in kwargs:
if name not in used_args:
raise ValueError(f"format keyword argument {name!r} was not used")
self._chunks = self._clean_chunks(chunks)
@classmethod
def _from_chunks(cls, chunks):
res = object.__new__(cls)
res._chunks = cls._clean_chunks(chunks)
return res
@classmethod
def _clean_chunks(cls, chunks):
res = []
for chunk in chunks:
if isinstance(chunk, str) and chunk == "":
continue
if isinstance(chunk, str) and res and isinstance(res[-1], str):
res[-1] += chunk
else:
res.append(chunk)
return tuple(res)
def _to_format_string(self):
format_string = []
args = []
for chunk in self._chunks:
if isinstance(chunk, str):
format_string.append(chunk.replace("{", "{{").replace("}", "}}"))
else:
arg, format_spec = chunk
args.append(arg)
if format_spec:
format_string.append(f"{{:{format_spec}}}")
else:
format_string.append("{}")
return ("".join(format_string), tuple(args))
def __add__(self, other):
if not isinstance(other, Format):
return NotImplemented
return Format._from_chunks(self._chunks + other._chunks)
def __repr__(self):
format_string, args = self._to_format_string()
args = "".join(f" {arg!r}" for arg in args)
return f"(format {format_string!r}{args})"
def __format__(self, format_desc):
"""Forbidden formatting.
``Format`` objects cannot be directly formatted for the same reason as the ``Value``s
they contain.
"""
raise TypeError(f"Format object {self!r} cannot be converted to string. Use `repr` "
f"to print the AST, or pass it to the `Print` statement.")
_FORMAT_SPEC_PATTERN = re.compile(r"""
(?:
(?P<fill>.)?
(?P<align>[<>=^])
)?
(?P<sign>[-+ ])?
(?P<options>[#]?[0]?)
(?P<width>[1-9][0-9]*)?
(?P<grouping>[_,])?
(?P<type>[bodxXcsn])?
""", re.VERBOSE)
@staticmethod
def _parse_format_spec(spec: str, shape: Shape):
match = Format._FORMAT_SPEC_PATTERN.fullmatch(spec)
if not match:
raise ValueError(f"Invalid format specifier {spec!r}")
if match["align"] == "^":
raise ValueError(f"Alignment {match['align']!r} is not supported")
if match["grouping"] == ",":
raise ValueError(f"Grouping option {match['grouping']!r} is not supported")
if match["type"] == "n":
raise ValueError(f"Presentation type {match['type']!r} is not supported")
if match["type"] in ("c", "s"):
if shape.signed:
raise ValueError(f"Cannot print signed value with format specifier {match['type']!r}")
if match["align"] == "=":
raise ValueError(f"Alignment {match['align']!r} is not allowed with format specifier {match['type']!r}")
if "#" in match["options"]:
raise ValueError(f"Alternate form is not allowed with format specifier {match['type']!r}")
if "0" in match["options"]:
raise ValueError(f"Zero fill is not allowed with format specifier {match['type']!r}")
if match["sign"] is not None:
raise ValueError(f"Sign is not allowed with format specifier {match['type']!r}")
if match["grouping"] is not None:
raise ValueError(f"Cannot specify {match['grouping']!r} with format specifier {match['type']!r}")
if match["type"] == "s" and shape.width % 8 != 0:
raise ValueError(f"Value width must be divisible by 8 with format specifier {match['type']!r}")
return {
# Single character or None.
"fill": match["fill"],
# '<', '>', '=', or None. Cannot be '=' for types 'c' and 's'.
"align": match["align"],
# '-', '+', ' ', or None. Always None for types 'c' and 's'.
"sign": match["sign"],
# "", "#", "0", or "#0". Always "" for types 'c' and 's'.
"options": match["options"],
# An int.
"width": int(match["width"]) if match["width"] is not None else 0,
# '_' or None. Always None for types 'c' and 's'.
"grouping": match["grouping"],
# 'b', 'o', 'd', 'x', 'X', 'c', 's', or None.
"type": match["type"],
}
def _rhs_signals(self):
res = SignalSet()
for chunk in self._chunks:
if not isinstance(chunk, str):
obj, format_spec = chunk
res |= obj._rhs_signals()
return res
class _StatementList(list):
def __repr__(self):
return "({})".format(" ".join(map(repr, self)))
@ -2350,6 +2546,47 @@ class Assign(Statement):
return f"(eq {self.lhs!r} {self.rhs!r})"
class UnusedPrint(UnusedMustUse):
pass
@final
class Print(Statement, MustUse):
_MustUse__warning = UnusedPrint
def __init__(self, *args, sep=" ", end="\n", src_loc_at=0):
self._MustUse__silence = True
super().__init__(src_loc_at=src_loc_at)
if not isinstance(sep, str):
raise TypeError(f"'sep' must be a string, not {sep!r}")
if not isinstance(end, str):
raise TypeError(f"'end' must be a string, not {end!r}")
chunks = []
first = True
for arg in args:
if not first and sep != "":
chunks.append(sep)
first = False
chunks += Format("{}", arg)._chunks
if end != "":
chunks.append(end)
self._message = Format._from_chunks(chunks)
del self._MustUse__silence
@property
def message(self):
return self._message
def _lhs_signals(self):
return set()
def _rhs_signals(self):
return self.message._rhs_signals()
def __repr__(self):
return f"(print {self.message!r})"
class UnusedProperty(UnusedMustUse):
pass
@ -2363,14 +2600,17 @@ class Property(Statement, MustUse):
Assume = "assume"
Cover = "cover"
def __init__(self, kind, test, *, name=None, src_loc_at=0):
def __init__(self, kind, test, message=None, *, src_loc_at=0):
self._MustUse__silence = True
super().__init__(src_loc_at=src_loc_at)
self._kind = self.Kind(kind)
self._test = Value.cast(test)
self._name = name
if not isinstance(self.name, str) and self.name is not None:
raise TypeError("Property name must be a string or None, not {!r}"
.format(self.name))
if isinstance(message, str):
message = Format._from_chunks([message])
if message is not None and not isinstance(message, Format):
raise TypeError(f"Property message must be None, str, or Format, not {message!r}")
self._message = message
del self._MustUse__silence
@property
def kind(self):
@ -2381,31 +2621,33 @@ class Property(Statement, MustUse):
return self._test
@property
def name(self):
return self._name
def message(self):
return self._message
def _lhs_signals(self):
return set()
def _rhs_signals(self):
if self.message is not None:
return self.message._rhs_signals() | self.test._rhs_signals()
return self.test._rhs_signals()
def __repr__(self):
if self.name is not None:
return f"({self.name}: {self.kind.value} {self.test!r})"
if self.message is not None:
return f"({self.kind.value} {self.test!r} {self.message!r})"
return f"({self.kind.value} {self.test!r})"
def Assert(test, *, name=None, src_loc_at=0):
return Property("assert", test, name=name, src_loc_at=src_loc_at+1)
def Assert(test, message=None, *, src_loc_at=0):
return Property("assert", test, message, src_loc_at=src_loc_at+1)
def Assume(test, *, name=None, src_loc_at=0):
return Property("assume", test, name=name, src_loc_at=src_loc_at+1)
def Assume(test, message=None, *, src_loc_at=0):
return Property("assume", test, message, src_loc_at=src_loc_at+1)
def Cover(test, *, name=None, src_loc_at=0):
return Property("cover", test, name=name, src_loc_at=src_loc_at+1)
def Cover(test, message=None, *, src_loc_at=0):
return Property("cover", test, message, src_loc_at=src_loc_at+1)
class _LateBoundStatement(Statement):
@ -2617,4 +2859,4 @@ class SignalSet(_MappedKeySet):
_unmap_key = lambda self, key: key.signal
from ._repr import *
from . import _repr

View file

@ -9,7 +9,7 @@ from .._utils import flatten
from ..utils import bits_for
from .. import tracer
from ._ast import *
from ._ast import _StatementList, _LateBoundStatement, Property
from ._ast import _StatementList, _LateBoundStatement, Property, Print
from ._ir import *
from ._cd import *
from ._xfrm import *
@ -184,7 +184,7 @@ def resolve_statement(stmt):
src_loc=stmt.src_loc,
case_src_locs=stmt.case_src_locs,
)
elif isinstance(stmt, (Assign, Property)):
elif isinstance(stmt, (Assign, Property, Print)):
return stmt
else:
assert False # :nocov:
@ -584,9 +584,9 @@ class Module(_ModuleBuilderRoot, Elaboratable):
self._pop_ctrl()
for stmt in Statement.cast(assigns):
if not isinstance(stmt, (Assign, Property, _LateBoundStatement)):
if not isinstance(stmt, (Assign, Property, Print, _LateBoundStatement)):
raise SyntaxError(
f"Only assignments and property checks may be appended to d.{domain}")
f"Only assignments, prints, and property checks may be appended to d.{domain}")
stmt._MustUse__used = True

View file

@ -222,7 +222,7 @@ class Fragment:
continue
# While we're at it, show a message.
message = ("Signal '{}' is driven from multiple fragments: {}"
message = ("Signal '{!r}' is driven from multiple fragments: {}"
.format(signal, ", ".join(subfrag_names)))
if mode == "error":
raise DriverConflict(message)
@ -972,6 +972,17 @@ class NetlistEmitter:
else:
assert False # :nocov:
def emit_format(self, module_idx, format):
chunks = []
for chunk in format._chunks:
if isinstance(chunk, str):
chunks.append(chunk)
else:
value, format_desc = chunk
value, signed = self.emit_rhs(module_idx, value)
chunks.append(_nir.FormatValue(value, format_desc, signed=signed))
return _nir.Format(chunks)
def emit_stmt(self, module_idx: int, fragment: _ir.Fragment, domain: str,
stmt: _ast.Statement, cond: _nir.Net):
if domain == "comb":
@ -986,6 +997,25 @@ class NetlistEmitter:
if len(rhs) < width:
rhs = self.extend(rhs, signed, width)
self.emit_assign(module_idx, cd, stmt.lhs, 0, rhs, cond, src_loc=stmt.src_loc)
elif isinstance(stmt, _ast.Print):
en_cell = _nir.AssignmentList(module_idx,
default=_nir.Value.zeros(),
assignments=[
_nir.Assignment(cond=cond, start=0, value=_nir.Value.ones(),
src_loc=stmt.src_loc)
],
src_loc=stmt.src_loc)
cond, = self.netlist.add_value_cell(1, en_cell)
format = self.emit_format(module_idx, stmt.message)
if cd is None:
cell = _nir.AsyncPrint(module_idx, en=cond,
format=format, src_loc=stmt.src_loc)
else:
clk, = self.emit_signal(cd.clk)
cell = _nir.SyncPrint(module_idx, en=cond,
clk=clk, clk_edge=cd.clk_edge,
format=format, src_loc=stmt.src_loc)
self.netlist.add_cell(cell)
elif isinstance(stmt, _ast.Property):
test, _signed = self.emit_rhs(module_idx, stmt.test)
if len(test) != 1:
@ -999,14 +1029,18 @@ class NetlistEmitter:
],
src_loc=stmt.src_loc)
cond, = self.netlist.add_value_cell(1, en_cell)
if stmt.message is None:
format = None
else:
format = self.emit_format(module_idx, stmt.message)
if cd is None:
cell = _nir.AsyncProperty(module_idx, kind=stmt.kind.value, test=test, en=cond,
name=stmt.name, src_loc=stmt.src_loc)
format=format, src_loc=stmt.src_loc)
else:
clk, = self.emit_signal(cd.clk)
cell = _nir.SyncProperty(module_idx, kind=stmt.kind.value, test=test, en=cond,
clk=clk, clk_edge=cd.clk_edge, name=stmt.name,
src_loc=stmt.src_loc)
clk=clk, clk_edge=cd.clk_edge,
format=format, src_loc=stmt.src_loc)
self.netlist.add_cell(cell)
elif isinstance(stmt, _ast.Switch):
test, _signed = self.emit_rhs(module_idx, stmt.test)

View file

@ -6,13 +6,16 @@ from ._ast import SignalDict
__all__ = [
# Netlist core
"Net", "Value", "Netlist", "ModuleNetFlow", "Module", "Cell", "Top",
"Net", "Value", "FormatValue", "Format",
"Netlist", "ModuleNetFlow", "Module", "Cell", "Top",
# Computation cells
"Operator", "Part",
# Decision tree cells
"Matches", "PriorityMatch", "Assignment", "AssignmentList",
# Storage cells
"FlipFlop", "Memory", "SyncWritePort", "AsyncReadPort", "SyncReadPort",
# Print cells
"AsyncPrint", "SyncPrint",
# Formal verification cells
"Initial", "AnyValue", "AsyncProperty", "SyncProperty",
# Foreign interface cells
@ -159,6 +162,57 @@ class Value(tuple):
__str__ = __repr__
class FormatValue:
"""A single formatted value within ``Format``.
Attributes
----------
value: Value
format_desc: str
signed: bool
"""
def __init__(self, value, format_desc, *, signed):
assert isinstance(format_desc, str)
assert isinstance(signed, bool)
self.value = Value(value)
self.format_desc = format_desc
self.signed = signed
def __repr__(self):
sign = "s" if self.signed else "u"
return f"({sign} {self.value!r} {self.format_desc!r})"
class Format:
"""Like _ast.Format, but for NIR.
Attributes
----------
chunks: tuple of str and FormatValue
"""
def __init__(self, chunks):
self.chunks = tuple(chunks)
for chunk in self.chunks:
assert isinstance(chunk, (str, FormatValue))
def __repr__(self):
return f"({' '.join(repr(chunk) for chunk in self.chunks)})"
def input_nets(self):
nets = set()
for chunk in self.chunks:
if isinstance(chunk, FormatValue):
nets |= set(chunk.value)
return nets
def resolve_nets(self, netlist: "Netlist"):
for chunk in self.chunks:
if isinstance(chunk, FormatValue):
chunk.value = netlist.resolve_value(chunk.value)
class Netlist:
"""A fine netlist. Consists of:
@ -837,6 +891,73 @@ class SyncReadPort(Cell):
return f"(read_port {self.memory} {self.width} {self.addr} {self.en} {self.clk_edge} {self.clk} ({transparent_for}))"
class AsyncPrint(Cell):
"""Corresponds to ``Print`` in the "comb" domain.
Attributes
----------
en: Net
format: Format
"""
def __init__(self, module_idx, *, en, format, src_loc):
super().__init__(module_idx, src_loc=src_loc)
assert isinstance(format, Format)
self.en = Net.ensure(en)
self.format = format
def input_nets(self):
return {self.en} | self.format.input_nets()
def output_nets(self, self_idx: int):
return set()
def resolve_nets(self, netlist: Netlist):
self.en = netlist.resolve_net(self.en)
self.format.resolve_nets(netlist)
def __repr__(self):
return f"(print {self.en} {self.format!r})"
class SyncPrint(Cell):
"""Corresponds to ``Print`` in domains other than "comb".
Attributes
----------
en: Net
clk: Net
clk_edge: str, either 'pos' or 'neg'
format: Format
"""
def __init__(self, module_idx, *, en, clk, clk_edge, format, src_loc):
super().__init__(module_idx, src_loc=src_loc)
assert clk_edge in ('pos', 'neg')
assert isinstance(format, Format)
self.en = Net.ensure(en)
self.clk = Net.ensure(clk)
self.clk_edge = clk_edge
self.format = format
def input_nets(self):
return {self.en, self.clk} | self.format.input_nets()
def output_nets(self, self_idx: int):
return set()
def resolve_nets(self, netlist: Netlist):
self.en = netlist.resolve_net(self.en)
self.clk = netlist.resolve_net(self.clk)
self.format.resolve_nets(netlist)
def __repr__(self):
return f"(print {self.en} {self.clk_edge} {self.clk} {self.format!r})"
class Initial(Cell):
"""Corresponds to ``Initial`` value."""
@ -892,19 +1013,23 @@ class AsyncProperty(Cell):
kind: str, either 'assert', 'assume', or 'cover'
test: Net
en: Net
name: str
format: Format or None
"""
def __init__(self, module_idx, *, kind, test, en, name, src_loc):
def __init__(self, module_idx, *, kind, test, en, format, src_loc):
super().__init__(module_idx, src_loc=src_loc)
assert format is None or isinstance(format, Format)
assert kind in ('assert', 'assume', 'cover')
self.kind = kind
self.test = Net.ensure(test)
self.en = Net.ensure(en)
self.name = name
self.format = format
def input_nets(self):
return {self.test, self.en}
if self.format is None:
return {self.test, self.en}
else:
return {self.test, self.en} | self.format.input_nets()
def output_nets(self, self_idx: int):
return set()
@ -912,9 +1037,11 @@ class AsyncProperty(Cell):
def resolve_nets(self, netlist: Netlist):
self.test = netlist.resolve_net(self.test)
self.en = netlist.resolve_net(self.en)
if self.format is not None:
self.format.resolve_nets(netlist)
def __repr__(self):
return f"({self.kind} {self.name!r} {self.test} {self.en})"
return f"({self.kind} {self.test} {self.en} {self.format!r})"
class SyncProperty(Cell):
@ -928,12 +1055,13 @@ class SyncProperty(Cell):
en: Net
clk: Net
clk_edge: str, either 'pos' or 'neg'
name: str
format: Format or None
"""
def __init__(self, module_idx, *, kind, test, en, clk, clk_edge, name, src_loc):
def __init__(self, module_idx, *, kind, test, en, clk, clk_edge, format, src_loc):
super().__init__(module_idx, src_loc=src_loc)
assert format is None or isinstance(format, Format)
assert kind in ('assert', 'assume', 'cover')
assert clk_edge in ('pos', 'neg')
self.kind = kind
@ -941,10 +1069,13 @@ class SyncProperty(Cell):
self.en = Net.ensure(en)
self.clk = Net.ensure(clk)
self.clk_edge = clk_edge
self.name = name
self.format = format
def input_nets(self):
return {self.test, self.en, self.clk}
if self.format is None:
return {self.test, self.en, self.clk}
else:
return {self.test, self.en, self.clk} | self.format.input_nets()
def output_nets(self, self_idx: int):
return set()
@ -953,9 +1084,11 @@ class SyncProperty(Cell):
self.test = netlist.resolve_net(self.test)
self.en = netlist.resolve_net(self.en)
self.clk = netlist.resolve_net(self.clk)
if self.format is not None:
self.format.resolve_nets(netlist)
def __repr__(self):
return f"({self.kind} {self.name!r} {self.test} {self.en} {self.clk_edge} {self.clk})"
return f"({self.kind} {self.test} {self.en} {self.clk_edge} {self.clk} {self.format!r})"
class Instance(Cell):

View file

@ -5,7 +5,7 @@ from collections.abc import Iterable
from .._utils import flatten
from .. import tracer
from ._ast import *
from ._ast import _StatementList, AnyValue, Property
from ._ast import _StatementList, AnyValue
from ._cd import *
from ._ir import *
from ._mem import MemoryInstance
@ -145,6 +145,10 @@ class StatementVisitor(metaclass=ABCMeta):
def on_Assign(self, stmt):
pass # :nocov:
@abstractmethod
def on_Print(self, stmt):
pass # :nocov:
@abstractmethod
def on_Property(self, stmt):
pass # :nocov:
@ -166,6 +170,8 @@ class StatementVisitor(metaclass=ABCMeta):
def on_statement(self, stmt):
if type(stmt) is Assign:
new_stmt = self.on_Assign(stmt)
elif type(stmt) is Print:
new_stmt = self.on_Print(stmt)
elif type(stmt) is Property:
new_stmt = self.on_Property(stmt)
elif type(stmt) is Switch:
@ -178,7 +184,7 @@ class StatementVisitor(metaclass=ABCMeta):
new_stmt.src_loc = stmt.src_loc
if isinstance(new_stmt, Switch) and isinstance(stmt, Switch):
new_stmt.case_src_locs = stmt.case_src_locs
if isinstance(new_stmt, Property):
if isinstance(new_stmt, (Print, Property)):
new_stmt._MustUse__used = True
return new_stmt
@ -190,11 +196,28 @@ class StatementTransformer(StatementVisitor):
def on_value(self, value):
return value
def on_Format(self, format):
chunks = []
for chunk in format._chunks:
if isinstance(chunk, str):
chunks.append(chunk)
else:
value, format_spec = chunk
chunks.append((self.on_value(value), format_spec))
return Format._from_chunks(chunks)
def on_Assign(self, stmt):
return Assign(self.on_value(stmt.lhs), self.on_value(stmt.rhs))
def on_Print(self, stmt):
return Print(self.on_Format(stmt.message), end="")
def on_Property(self, stmt):
return Property(stmt.kind, self.on_value(stmt.test), name=stmt.name)
if stmt.message is None:
message = None
else:
message = self.on_Format(stmt.message)
return Property(stmt.kind, self.on_value(stmt.test), message)
def on_Switch(self, stmt):
cases = OrderedDict((k, self.on_statement(s)) for k, s in stmt.cases.items())
@ -386,12 +409,23 @@ class DomainCollector(ValueVisitor, StatementVisitor):
def on_Initial(self, value):
pass
def on_Format(self, format):
for chunk in format._chunks:
if not isinstance(chunk, str):
value, _format_spec = chunk
self.on_value(value)
def on_Assign(self, stmt):
self.on_value(stmt.lhs)
self.on_value(stmt.rhs)
def on_Print(self, stmt):
self.on_Format(stmt.message)
def on_Property(self, stmt):
self.on_value(stmt.test)
if stmt.message is not None:
self.on_Format(stmt.message)
def on_Switch(self, stmt):
self.on_value(stmt.test)

View file

@ -1,7 +1,8 @@
"""First-in first-out queues."""
from .. import *
from ..asserts import *
from ..hdl import Assume
from ..asserts import Initial
from ..utils import ceil_log2
from .coding import GrayEncoder, GrayDecoder
from .cdc import FFSynchronizer, AsyncFFSynchronizer

View file

@ -4,7 +4,7 @@ from contextlib import contextmanager
import sys
from ..hdl import *
from ..hdl._ast import SignalSet, _StatementList
from ..hdl._ast import SignalSet, _StatementList, Property
from ..hdl._xfrm import ValueVisitor, StatementVisitor
from ..hdl._mem import MemoryInstance
from ._base import BaseProcess
@ -113,6 +113,15 @@ class _RHSValueCompiler(_ValueCompiler):
# If not None, `inputs` gets populated with RHS signals.
self.inputs = inputs
def sign(self, value):
value_mask = (1 << len(value)) - 1
masked = f"({value_mask:#x} & {self(value)})"
if value.shape().signed:
return f"sign({masked}, {-1 << (len(value) - 1):#x})"
else: # unsigned
return masked
def on_Const(self, value):
return f"{value.value}"
@ -345,7 +354,31 @@ class _LHSValueCompiler(_ValueCompiler):
return gen
def value_to_string(value):
"""Unpack a Verilog-like (but LSB-first) string of unknown width from an integer."""
msg = bytearray()
while value:
byte = value & 0xff
value >>= 8
if byte:
msg.append(byte)
return msg.decode()
def pin_blame(src_loc, exc):
if src_loc is None:
raise exc
filename, line = src_loc
code = compile("\n" * (line - 1) + "raise exc", filename, "exec")
exec(code, {"exc": exc})
class _StatementCompiler(StatementVisitor, _Compiler):
helpers = {
"value_to_string": value_to_string,
"pin_blame": pin_blame,
}
def __init__(self, state, emitter, *, inputs=None, outputs=None):
super().__init__(state, emitter)
self.rhs = _RHSValueCompiler(state, emitter, mode="curr", inputs=inputs)
@ -358,11 +391,7 @@ class _StatementCompiler(StatementVisitor, _Compiler):
self.emitter.append("pass")
def on_Assign(self, stmt):
gen_rhs_value = self.rhs(stmt.rhs) # check for oversized value before generating mask
gen_rhs = f"({(1 << len(stmt.rhs)) - 1:#x} & {gen_rhs_value})"
if stmt.rhs.shape().signed:
gen_rhs = f"sign({gen_rhs}, {-1 << (len(stmt.rhs) - 1):#x})"
return self.lhs(stmt.lhs)(gen_rhs)
return self.lhs(stmt.lhs)(self.rhs.sign(stmt.rhs))
def on_Switch(self, stmt):
gen_test_value = self.rhs(stmt.test) # check for oversized value before generating mask
@ -387,8 +416,47 @@ class _StatementCompiler(StatementVisitor, _Compiler):
with self.emitter.indent():
self(stmts)
def emit_format(self, format):
format_string = []
args = []
for chunk in format._chunks:
if isinstance(chunk, str):
format_string.append(chunk.replace("{", "{{").replace("}", "}}"))
else:
value, format_desc = chunk
value = self.rhs.sign(value)
if format_desc.endswith("s"):
format_desc = format_desc[:-1]
value = f"value_to_string({value})"
format_string.append(f"{{:{format_desc}}}")
args.append(value)
format_string = "".join(format_string)
args = ", ".join(args)
return f"{format_string!r}.format({args})"
def on_Print(self, stmt):
self.emitter.append(f"print({self.emit_format(stmt.message)}, end='')")
def on_Property(self, stmt):
raise NotImplementedError # :nocov:
if stmt.kind == Property.Kind.Cover:
if stmt.message is not None:
self.emitter.append(f"if {self.rhs.sign(stmt.test)}:")
with self.emitter.indent():
filename, line = stmt.src_loc
self.emitter.append(f"print(\"Coverage hit at \" {filename!r} \":{line}:\", {self.emit_format(stmt.message)})")
else:
self.emitter.append(f"if not {self.rhs.sign(stmt.test)}:")
with self.emitter.indent():
if stmt.kind == Property.Kind.Assert:
kind = "Assertion"
elif stmt.kind == Property.Kind.Assume:
kind = "Assumption"
else:
assert False # :nocov:
if stmt.message is not None:
self.emitter.append(f"pin_blame({stmt.src_loc!r}, AssertionError(\"{kind} violated: \" + {self.emit_format(stmt.message)}))")
else:
self.emitter.append(f"pin_blame({stmt.src_loc!r}, AssertionError(\"{kind} violated\"))")
@classmethod
def compile(cls, state, stmt):
@ -541,7 +609,11 @@ class _FragmentCompiler:
else:
filename = "<string>"
exec_locals = {"slots": self.state.slots, **_ValueCompiler.helpers}
exec_locals = {
"slots": self.state.slots,
**_ValueCompiler.helpers,
**_StatementCompiler.helpers,
}
exec(compile(code, filename, "exec"), exec_locals)
domain_process.run = exec_locals["run"]

View file

@ -35,6 +35,8 @@ Apply the following changes to code written against Amaranth 0.4 to migrate it t
* Convert uses of ``Simulator.add_sync_process`` used as testbenches to ``Simulator.add_testbench``
* Convert other uses of ``Simulator.add_sync_process`` to ``Simulator.add_process``
* Replace uses of ``amaranth.hdl.Memory`` with ``amaranth.lib.memory.Memory``
* Replace imports of ``amaranth.asserts.{Assert, Assume, Cover}`` with imports from ``amaranth.hdl``
* Remove any usage of ``name=`` with assertions, possibly replacing them with custom messages
Implemented RFCs
@ -46,6 +48,7 @@ Implemented RFCs
.. _RFC 43: https://amaranth-lang.org/rfcs/0043-rename-reset-to-init.html
.. _RFC 45: https://amaranth-lang.org/rfcs/0045-lib-memory.html
.. _RFC 46: https://amaranth-lang.org/rfcs/0046-shape-range-1.html
.. _RFC 50: https://amaranth-lang.org/rfcs/0050-print.html
* `RFC 17`_: Remove ``log2_int``
* `RFC 27`_: Testbench processes for the simulator
@ -53,6 +56,7 @@ Implemented RFCs
* `RFC 43`_: Rename ``reset=`` to ``init=``
* `RFC 45`_: Move ``hdl.Memory`` to ``lib.Memory``
* `RFC 46`_: Change ``Shape.cast(range(1))`` to ``unsigned(0)``
* `RFC 50`_: ``Print`` statement and string formatting
Language changes
@ -62,6 +66,7 @@ Language changes
* Added: :class:`ast.Slice` objects have been made const-castable.
* Added: :func:`amaranth.utils.ceil_log2`, :func:`amaranth.utils.exact_log2`. (`RFC 17`_)
* Added: :class:`Format` objects, :class:`Print` statements, messages in :class:`Assert`, :class:`Assume` and :class:`Cover`. (`RFC 50`_)
* Changed: ``m.Case()`` with no patterns is never active instead of always active. (`RFC 39`_)
* Changed: ``Value.matches()`` with no patterns is ``Const(0)`` instead of ``Const(1)``. (`RFC 39`_)
* Changed: ``Signal(range(stop), init=stop)`` warning has been changed into a hard error and made to trigger on any out-of range value.
@ -69,11 +74,13 @@ Language changes
* Changed: ``Shape.cast(range(1))`` is now ``unsigned(0)``. (`RFC 46`_)
* Changed: the ``reset=`` argument of :class:`Signal`, :meth:`Signal.like`, :class:`amaranth.lib.wiring.Member`, :class:`amaranth.lib.cdc.FFSynchronizer`, and ``m.FSM()`` has been renamed to ``init=``. (`RFC 43`_)
* Changed: :class:`Shape` has been made immutable and hashable.
* Changed: :class:`Assert`, :class:`Assume`, :class:`Cover` have been moved to :mod:`amaranth.hdl` from :mod:`amaranth.asserts`. (`RFC 50`_)
* Deprecated: :func:`amaranth.utils.log2_int`. (`RFC 17`_)
* Deprecated: :class:`amaranth.hdl.Memory`. (`RFC 45`_)
* Removed: (deprecated in 0.4) :meth:`Const.normalize`. (`RFC 5`_)
* Removed: (deprecated in 0.4) :class:`Repl`. (`RFC 10`_)
* Removed: (deprecated in 0.4) :class:`ast.Sample`, :class:`ast.Past`, :class:`ast.Stable`, :class:`ast.Rose`, :class:`ast.Fell`.
* Removed: assertion names in :class:`Assert`, :class:`Assume` and :class:`Cover`. (`RFC 50`_)
Standard library changes
@ -91,6 +98,7 @@ Toolchain changes
-----------------
* Added: ``Simulator.add_testbench``. (`RFC 27`_)
* Added: support for :class:`amaranth.hdl.Assert` in simulation. (`RFC 50`_)
* Deprecated: ``Settle`` simulation command. (`RFC 27`_)
* Deprecated: ``Simulator.add_sync_process``. (`RFC 27`_)
* Removed: (deprecated in 0.4) use of mixed-case toolchain environment variable names, such as ``NMIGEN_ENV_Diamond`` or ``AMARANTH_ENV_Diamond``; use upper-case environment variable names, such as ``AMARANTH_ENV_DIAMOND``.

View file

@ -938,6 +938,8 @@ Every signal included in the target of an assignment becomes a part of the domai
The answer is no. While this kind of code is occasionally useful, rejecting it greatly simplifies backends, simulators, and analyzers.
In addition to assignments, :ref:`assertions, assumptions <lang-asserts>`, and :ref:`debug prints <lang-print>` can be added using the same syntax.
.. _lang-assignorder:
@ -1287,6 +1289,89 @@ Consider the following code:
Whenever there is a transition on the clock of the ``sync`` domain, the :py:`timer` signal is incremented by one if :py:`up` is true, decremented by one if :py:`down` is true, and retains its value otherwise.
.. _lang-assert:
Assertions
==========
Some properties are so important that if they are violated, the computations described by the design become meaningless. These properties should be guarded with an :class:`Assert` statement that immediately terminates the simulation if its condition is false. Assertions should generally be added to a :ref:`synchronous domain <lang-sync>`, and may have an optional message printed when it is violated:
.. testcode::
ip = Signal(16)
m.d.sync += Assert(ip < 128, "instruction pointer past the end of program code!")
Assertions may be nested within a :ref:`control block <lang-control>`:
.. testcode::
:hide:
booting = Signal()
.. testcode::
with m.If(~booting):
m.d.sync += Assert(ip < 128)
.. warning::
While is is also possible to add assertions to the :ref:`combinatorial domain <lang-comb>`, simulations of combinatorial circuits may have *glitches*: instantaneous, transient changes in the values of expressions that are being computed which do not affect the result of the computation (and are not visible in most waveform viewers for that reason). Depending on the tools used for simulation, a glitch in the condition of an assertion or of a :ref:`control block <lang-control>` that contains it may cause the simulation to be terminated, even if the glitch would have been instantaneously resolved afterwards.
If the condition of an assertion is assigned in a synchronous domain, then it is safe to add that assertion in the combinatorial domain. For example, neither of the assertions in the example below will be violated due to glitches, regardless of which domain the :py:`ip` and :py:`booting` signals are driven by:
.. testcode::
ip_sync = Signal.like(ip)
m.d.sync += ip_sync.eq(ip)
m.d.comb += Assert(ip_sync < 128)
with m.If(booting):
m.d.comb += Assert(ip_sync < 128)
Assertions should be added in a :ref:`synchronous domain <lang-sync>` when possible. In cases where it is not, such as if the condition is a signal that is assigned in a synchronous domain elsewhere, care should be taken while adding the assertion to the combinatorial domain.
.. _lang-print:
Debug printing
==============
The value of any expression, or of several of them, can be printed to the terminal during simulation using the :class:`Print` statement. When added to the :ref:`combinatorial domain <lang-comb>`, the value of an expression is printed whenever it changes:
.. testcode::
state = Signal()
m.d.comb += Print(state)
When added to a :ref:`synchronous domain <lang-sync>`, the value of an expression is printed whenever the active edge occurs on the clock of that domain:
.. testcode::
m.d.sync += Print("on tick: ", state)
The :class:`Print` statement, regardless of the domain, may be nested within a :ref:`control block <lang-control>`:
.. testcode::
old_state = Signal.like(state)
m.d.sync += old_state.eq(state)
with m.If(state != old_state):
m.d.sync += Print("was: ", old_state, "now: ", state)
The arguments to the :class:`Print` statement have the same meaning as the arguments to the Python :func:`print` function, with the exception that only :py:`sep` and :py:`end` keyword arguments are supported. In addition, the :class:`Format` helper can be used to apply formatting to the values, similar to the Python :meth:`str.format` method:
.. testcode::
addr = Signal(32)
m.d.sync += Print(Format("address: {:08x}", addr))
In both :class:`Print` and :class:`Format`, arguments that are not Amaranth :ref:`values <lang-values>` are formatted using the usual Python rules. The optional second :py:`message` argument to :class:`Assert` (described :ref:`above <lang-assert>`) also accepts a string or the :class:`Format` helper:
.. testcode::
m.d.sync += Assert((addr & 0b111) == 0, message=Format("unaligned address {:08x}!", addr))
.. _lang-clockdomains:
Clock domains

View file

@ -63,6 +63,9 @@ The prelude exports exactly the following names:
* :class:`Signal`
* :class:`ClockSignal`
* :class:`ResetSignal`
* :class:`Format`
* :class:`Print`
* :func:`Assert`
* :class:`Module`
* :class:`ClockDomain`
* :class:`Elaboratable`

View file

@ -1,3 +1,5 @@
# amaranth: UnusedPrint=no, UnusedProperty
import warnings
from enum import Enum, EnumMeta
@ -329,8 +331,6 @@ class ValueTestCase(FHDLTestCase):
r"^Cannot slice value with a value; use Value.bit_select\(\) or Value.word_select\(\) instead$"):
Const(31)[s:s+3]
def test_shift_left(self):
self.assertRepr(Const(256, unsigned(9)).shift_left(0),
"(cat (const 0'd0) (const 9'd256))")
@ -452,6 +452,12 @@ class ValueTestCase(FHDLTestCase):
s = Const(10).replicate(3)
self.assertEqual(repr(s), "(cat (const 4'd10) (const 4'd10) (const 4'd10))")
def test_format_wrong(self):
sig = Signal()
with self.assertRaisesRegex(TypeError,
r"^Value \(sig sig\) cannot be converted to string."):
f"{sig}"
class ConstTestCase(FHDLTestCase):
def test_shape(self):
@ -1494,6 +1500,151 @@ class InitialTestCase(FHDLTestCase):
self.assertEqual(i.shape(), unsigned(1))
class FormatTestCase(FHDLTestCase):
def test_construct(self):
a = Signal()
b = Signal()
c = Signal()
self.assertRepr(Format("abc"), "(format 'abc')")
fmt = Format("{{abc}}")
self.assertRepr(fmt, "(format '{{abc}}')")
self.assertEqual(fmt._chunks, ("{abc}",))
fmt = Format("{abc}", abc="{def}")
self.assertRepr(fmt, "(format '{{def}}')")
self.assertEqual(fmt._chunks, ("{def}",))
fmt = Format("a: {a:0{b}}, b: {b}", a=13, b=4)
self.assertRepr(fmt, "(format 'a: 0013, b: 4')")
fmt = Format("a: {a:0{b}x}, b: {b}", a=a, b=4)
self.assertRepr(fmt, "(format 'a: {:04x}, b: 4' (sig a))")
fmt = Format("a: {a}, b: {b}, a: {a}", a=a, b=b)
self.assertRepr(fmt, "(format 'a: {}, b: {}, a: {}' (sig a) (sig b) (sig a))")
fmt = Format("a: {0}, b: {1}, a: {0}", a, b)
self.assertRepr(fmt, "(format 'a: {}, b: {}, a: {}' (sig a) (sig b) (sig a))")
fmt = Format("a: {}, b: {}", a, b)
self.assertRepr(fmt, "(format 'a: {}, b: {}' (sig a) (sig b))")
subfmt = Format("a: {:2x}, b: {:3x}", a, b)
fmt = Format("sub: {}, c: {:4x}", subfmt, c)
self.assertRepr(fmt, "(format 'sub: a: {:2x}, b: {:3x}, c: {:4x}' (sig a) (sig b) (sig c))")
def test_construct_wrong(self):
a = Signal()
b = Signal(signed(16))
with self.assertRaisesRegex(ValueError,
r"^cannot switch from manual field specification to automatic field numbering$"):
Format("{0}, {}", a, b)
with self.assertRaisesRegex(ValueError,
r"^cannot switch from automatic field numbering to manual field specification$"):
Format("{}, {1}", a, b)
with self.assertRaisesRegex(TypeError,
r"^'ValueCastable' formatting is not supported$"):
Format("{}", MockValueCastable(Const(0)))
with self.assertRaisesRegex(ValueError,
r"^Format specifiers \('s'\) cannot be used for 'Format' objects$"):
Format("{:s}", Format(""))
with self.assertRaisesRegex(ValueError,
r"^format positional argument 1 was not used$"):
Format("{}", a, b)
with self.assertRaisesRegex(ValueError,
r"^format keyword argument 'b' was not used$"):
Format("{a}", a=a, b=b)
with self.assertRaisesRegex(ValueError,
r"^Invalid format specifier 'meow'$"):
Format("{a:meow}", a=a)
with self.assertRaisesRegex(ValueError,
r"^Alignment '\^' is not supported$"):
Format("{a:^13}", a=a)
with self.assertRaisesRegex(ValueError,
r"^Grouping option ',' is not supported$"):
Format("{a:,}", a=a)
with self.assertRaisesRegex(ValueError,
r"^Presentation type 'n' is not supported$"):
Format("{a:n}", a=a)
with self.assertRaisesRegex(ValueError,
r"^Cannot print signed value with format specifier 'c'$"):
Format("{b:c}", b=b)
with self.assertRaisesRegex(ValueError,
r"^Value width must be divisible by 8 with format specifier 's'$"):
Format("{a:s}", a=a)
with self.assertRaisesRegex(ValueError,
r"^Alignment '=' is not allowed with format specifier 'c'$"):
Format("{a:=13c}", a=a)
with self.assertRaisesRegex(ValueError,
r"^Sign is not allowed with format specifier 'c'$"):
Format("{a:+13c}", a=a)
with self.assertRaisesRegex(ValueError,
r"^Zero fill is not allowed with format specifier 'c'$"):
Format("{a:013c}", a=a)
with self.assertRaisesRegex(ValueError,
r"^Alternate form is not allowed with format specifier 'c'$"):
Format("{a:#13c}", a=a)
with self.assertRaisesRegex(ValueError,
r"^Cannot specify '_' with format specifier 'c'$"):
Format("{a:_c}", a=a)
def test_plus(self):
a = Signal()
b = Signal()
fmt_a = Format("a = {};", a)
fmt_b = Format("b = {};", b)
fmt = fmt_a + fmt_b
self.assertRepr(fmt, "(format 'a = {};b = {};' (sig a) (sig b))")
self.assertEqual(fmt._chunks[2], ";b = ")
def test_plus_wrong(self):
with self.assertRaisesRegex(TypeError,
r"^unsupported operand type\(s\) for \+: 'Format' and 'str'$"):
Format("") + ""
def test_format_wrong(self):
fmt = Format("")
with self.assertRaisesRegex(TypeError,
r"^Format object .* cannot be converted to string."):
f"{fmt}"
class PrintTestCase(FHDLTestCase):
def test_construct(self):
a = Signal()
b = Signal()
p = Print("abc")
self.assertRepr(p, "(print (format 'abc\\n'))")
p = Print("abc", "def")
self.assertRepr(p, "(print (format 'abc def\\n'))")
p = Print("abc", b)
self.assertRepr(p, "(print (format 'abc {}\\n' (sig b)))")
p = Print(a, b, end="", sep=", ")
self.assertRepr(p, "(print (format '{}, {}' (sig a) (sig b)))")
p = Print(Format("a: {a:04x}", a=a))
self.assertRepr(p, "(print (format 'a: {:04x}\\n' (sig a)))")
def test_construct_wrong(self):
with self.assertRaisesRegex(TypeError,
r"^'sep' must be a string, not 13$"):
Print("", sep=13)
with self.assertRaisesRegex(TypeError,
r"^'end' must be a string, not 13$"):
Print("", end=13)
class AssertTestCase(FHDLTestCase):
def test_construct(self):
a = Signal()
b = Signal()
p = Assert(a)
self.assertRepr(p, "(assert (sig a))")
p = Assert(a, "abc")
self.assertRepr(p, "(assert (sig a) (format 'abc'))")
p = Assert(a, Format("a = {}, b = {}", a, b))
self.assertRepr(p, "(assert (sig a) (format 'a = {}, b = {}' (sig a) (sig b)))")
def test_construct_wrong(self):
a = Signal()
b = Signal()
with self.assertRaisesRegex(TypeError,
r"^Property message must be None, str, or Format, not \(sig b\)$"):
Assert(a, b)
class SwitchTestCase(FHDLTestCase):
def test_default_case(self):
s = Switch(Const(0), {None: []})

View file

@ -87,7 +87,7 @@ class DSLTestCase(FHDLTestCase):
def test_d_asgn_wrong(self):
m = Module()
with self.assertRaisesRegex(SyntaxError,
r"^Only assignments and property checks may be appended to d\.sync$"):
r"^Only assignments, prints, and property checks may be appended to d\.sync$"):
m.d.sync += Switch(self.s1, {})
def test_comb_wrong(self):

View file

@ -214,12 +214,12 @@ class FragmentPortsTestCase(FHDLTestCase):
(cell 1 3 (~ 2.0))
(cell 2 4 (~ 6.0))
(cell 3 4 (assignment_list 1'd0 (1 0:1 1'd1)))
(cell 4 4 (assert None 0.2 3.0))
(cell 4 4 (assert 0.2 3.0 None))
(cell 5 5 (~ 6.0))
(cell 6 7 (~ 10.0))
(cell 7 7 (~ 0.2))
(cell 8 7 (assignment_list 1'd0 (1 0:1 1'd1)))
(cell 9 7 (assert None 7.0 8.0))
(cell 9 7 (assert 7.0 8.0 None))
(cell 10 8 (~ 0.2))
)
""")
@ -3146,6 +3146,69 @@ class SwitchTestCase(FHDLTestCase):
)
""")
def test_print(self):
m = Module()
a = Signal(6)
b = Signal(signed(8))
en = Signal()
m.domains.a = ClockDomain()
m.domains.b = ClockDomain(async_reset=True)
m.domains.c = ClockDomain(reset_less=True, clk_edge="neg")
with m.If(en):
m.d.comb += Print(a, end="")
m.d.comb += Print(b)
m.d.a += Print(a, b)
m.d.b += Print(Format("values: {:02x}, {:+d}", a, b))
m.d.c += Print("meow")
nl = build_netlist(Fragment.get(m, None), [
a, b, en,
ClockSignal("a"), ResetSignal("a"),
ClockSignal("b"), ResetSignal("b"),
ClockSignal("c"),
])
self.assertRepr(nl, """
(
(module 0 None ('top')
(input 'a' 0.2:8)
(input 'b' 0.8:16)
(input 'en' 0.16)
(input 'a_clk' 0.17)
(input 'a_rst' 0.18)
(input 'b_clk' 0.19)
(input 'b_rst' 0.20)
(input 'c_clk' 0.21)
)
(cell 0 0 (top
(input 'a' 2:8)
(input 'b' 8:16)
(input 'en' 16:17)
(input 'a_clk' 17:18)
(input 'a_rst' 18:19)
(input 'b_clk' 19:20)
(input 'b_rst' 20:21)
(input 'c_clk' 21:22)
))
(cell 1 0 (matches 0.16 1))
(cell 2 0 (priority_match 1 1.0))
(cell 3 0 (assignment_list 1'd0 (2.0 0:1 1'd1)))
(cell 4 0 (print 3.0 ((u 0.2:8 ''))))
(cell 5 0 (assignment_list 1'd0 (2.0 0:1 1'd1)))
(cell 6 0 (print 5.0 ((s 0.8:16 '') '\\n')))
(cell 7 0 (matches 0.16 1))
(cell 8 0 (priority_match 1 7.0))
(cell 9 0 (assignment_list 1'd0 (8.0 0:1 1'd1)))
(cell 10 0 (print 9.0 pos 0.17 ((u 0.2:8 '') ' ' (s 0.8:16 '') '\\n')))
(cell 11 0 (matches 0.16 1))
(cell 12 0 (priority_match 1 11.0))
(cell 13 0 (assignment_list 1'd0 (12.0 0:1 1'd1)))
(cell 14 0 (print 13.0 pos 0.19 ('values: ' (u 0.2:8 '02x') ', ' (s 0.8:16 '+d') '\\n')))
(cell 15 0 (matches 0.16 1))
(cell 16 0 (priority_match 1 15.0))
(cell 17 0 (assignment_list 1'd0 (16.0 0:1 1'd1)))
(cell 18 0 (print 17.0 neg 0.21 ('meow\\n')))
)
""")
def test_assert(self):
m = Module()
i = Signal(6)
@ -3154,11 +3217,11 @@ class SwitchTestCase(FHDLTestCase):
m.domains.c = ClockDomain(reset_less=True, clk_edge="neg")
with m.If(i[5]):
m.d.comb += Assert(i[0])
m.d.comb += Assume(i[1], name="a")
m.d.comb += Assume(i[1], "aaa")
m.d.a += Assert(i[2])
m.d.b += Assume(i[3], name="b")
m.d.c += Cover(i[4], name="c")
m.d.comb += Cover(i, name="d")
m.d.b += Assume(i[3], Format("value: {}", i))
m.d.c += Cover(i[4], "c")
m.d.comb += Cover(i, "d")
nl = build_netlist(Fragment.get(m, None), [
i,
ClockSignal("a"), ResetSignal("a"),
@ -3186,25 +3249,23 @@ class SwitchTestCase(FHDLTestCase):
(cell 1 0 (matches 0.7 1))
(cell 2 0 (priority_match 1 1.0))
(cell 3 0 (assignment_list 1'd0 (2.0 0:1 1'd1)))
(cell 4 0 (assert None 0.2 3.0))
(cell 4 0 (assert 0.2 3.0 None))
(cell 5 0 (assignment_list 1'd0 (2.0 0:1 1'd1)))
(cell 6 0 (assume 'a' 0.3 5.0))
(cell 6 0 (assume 0.3 5.0 ('aaa')))
(cell 7 0 (b 0.2:8))
(cell 8 0 (assignment_list 1'd0 (2.0 0:1 1'd1)))
(cell 9 0 (cover 'd' 7.0 8.0))
(cell 9 0 (cover 7.0 8.0 ('d')))
(cell 10 0 (matches 0.7 1))
(cell 11 0 (priority_match 1 10.0))
(cell 12 0 (assignment_list 1'd0 (11.0 0:1 1'd1)))
(cell 13 0 (assert None 0.4 12.0 pos 0.8))
(cell 13 0 (assert 0.4 12.0 pos 0.8 None))
(cell 14 0 (matches 0.7 1))
(cell 15 0 (priority_match 1 14.0))
(cell 16 0 (assignment_list 1'd0 (15.0 0:1 1'd1)))
(cell 17 0 (assume 'b' 0.5 16.0 pos 0.10))
(cell 17 0 (assume 0.5 16.0 pos 0.10 ('value: ' (u 0.2:8 ''))))
(cell 18 0 (matches 0.7 1))
(cell 19 0 (priority_match 1 18.0))
(cell 20 0 (assignment_list 1'd0 (19.0 0:1 1'd1)))
(cell 21 0 (cover 'c' 0.6 20.0 neg 0.12))
(cell 21 0 (cover 0.6 20.0 neg 0.12 ('c')))
)
""")

View file

@ -1,5 +1,4 @@
from amaranth.hdl import *
from amaranth.asserts import *
from amaranth.sim import *
from amaranth.lib.coding import *

View file

@ -3,7 +3,7 @@
import warnings
from amaranth.hdl import *
from amaranth.asserts import *
from amaranth.asserts import Initial, AnyConst
from amaranth.sim import *
from amaranth.lib.fifo import *
from amaranth.lib.memory import *

View file

@ -1,6 +1,8 @@
import os
import warnings
from contextlib import contextmanager
from contextlib import contextmanager, redirect_stdout
from io import StringIO
from textwrap import dedent
from amaranth._utils import flatten
from amaranth.hdl._ast import *
@ -416,7 +418,7 @@ class SimulatorUnitTestCase(FHDLTestCase):
class SimulatorIntegrationTestCase(FHDLTestCase):
@contextmanager
def assertSimulation(self, module, deadline=None):
def assertSimulation(self, module, *, deadline=None):
sim = Simulator(module)
yield sim
with sim.write_vcd("test.vcd", "test.gtkw"):
@ -1074,6 +1076,104 @@ class SimulatorIntegrationTestCase(FHDLTestCase):
self.assertEqual((yield o), 1)
sim.add_testbench(process)
def test_print(self):
m = Module()
ctr = Signal(16)
m.d.sync += ctr.eq(ctr + 1)
with m.If(ctr % 3 == 0):
m.d.sync += Print(Format("Counter: {ctr:03d}", ctr=ctr))
output = StringIO()
with redirect_stdout(output):
with self.assertSimulation(m) as sim:
sim.add_clock(1e-6, domain="sync")
def process():
yield Delay(1e-5)
sim.add_testbench(process)
self.assertEqual(output.getvalue(), dedent("""\
Counter: 000
Counter: 003
Counter: 006
Counter: 009
"""))
def test_print(self):
def enc(s):
return Cat(
Const(b, 8)
for b in s.encode()
)
m = Module()
ctr = Signal(16)
m.d.sync += ctr.eq(ctr + 1)
msg = Signal(8 * 8)
with m.If(ctr == 0):
m.d.comb += msg.eq(enc("zero"))
with m.Else():
m.d.comb += msg.eq(enc("non-zero"))
with m.If(ctr % 3 == 0):
m.d.sync += Print(Format("Counter: {:>8s}", msg))
output = StringIO()
with redirect_stdout(output):
with self.assertSimulation(m) as sim:
sim.add_clock(1e-6, domain="sync")
def process():
yield Delay(1e-5)
sim.add_testbench(process)
self.assertEqual(output.getvalue(), dedent("""\
Counter: zero
Counter: non-zero
Counter: non-zero
Counter: non-zero
"""))
def test_assert(self):
m = Module()
ctr = Signal(16)
m.d.sync += ctr.eq(ctr + 1)
m.d.sync += Assert(ctr < 4, Format("Counter too large: {}", ctr))
with self.assertRaisesRegex(AssertionError,
r"^Assertion violated: Counter too large: 4$"):
with self.assertSimulation(m) as sim:
sim.add_clock(1e-6, domain="sync")
def process():
yield Delay(1e-5)
sim.add_testbench(process)
def test_assume(self):
m = Module()
ctr = Signal(16)
m.d.sync += ctr.eq(ctr + 1)
m.d.comb += Assume(ctr < 4)
with self.assertRaisesRegex(AssertionError,
r"^Assumption violated$"):
with self.assertSimulation(m) as sim:
sim.add_clock(1e-6, domain="sync")
def process():
yield Delay(1e-5)
sim.add_testbench(process)
def test_cover(self):
m = Module()
ctr = Signal(16)
m.d.sync += ctr.eq(ctr + 1)
cover = Cover(ctr % 3 == 0, Format("Counter: {ctr:03d}", ctr=ctr))
m.d.sync += cover
m.d.sync += Cover(ctr % 3 == 1)
output = StringIO()
with redirect_stdout(output):
with self.assertSimulation(m) as sim:
sim.add_clock(1e-6, domain="sync")
def process():
yield Delay(1e-5)
sim.add_testbench(process)
self.assertRegex(output.getvalue(), dedent(r"""
Coverage hit at .*test_sim\.py:\d+: Counter: 000
Coverage hit at .*test_sim\.py:\d+: Counter: 003
Coverage hit at .*test_sim\.py:\d+: Counter: 006
Coverage hit at .*test_sim\.py:\d+: Counter: 009
""").lstrip())
class SimulatorRegressionTestCase(FHDLTestCase):
def test_bug_325(self):