sim: add support for dumping structure fields in VCD.

See #790.

This commit adds an entirely private API for describing formatting of
values that is used in the standard library, in departure from our
standing policy of not using private APIs in the standard library.

This is a temporary measure intended to get the version 0.4 released
faster, as it has been years in the making. It is expected that this
API will be made public in the version 0.5 after going through the usual
RFC process.

This commit only adds VCD lines for fields defined in `lib.data.Layout`
when using `sim.pysim`. The emitted RTLIL and Verilog remain the same.
It is expected that when `sim.cxxsim` lands, RTLIL/Verilog output will
include aliases for layout fields as well.

The value representation API also handles formatting of enumerations,
with no changes visible to the designer. The implementation of
`Signal(decoder=)` is changed as well to use the new API, with full
backwards compatibility and no public API changes.

Co-authored-by: Wanda <wanda@phinode.net>
This commit is contained in:
Catherine 2023-11-26 02:13:26 +00:00
parent 04f906965a
commit 4bfe2cde6f
6 changed files with 197 additions and 66 deletions

View file

@ -5,7 +5,7 @@ import warnings
import re import re
from .._utils import bits_for, flatten from .._utils import bits_for, flatten
from ..hdl import ast, ir, mem, xfrm from ..hdl import ast, ir, mem, xfrm, _repr
from ..lib import wiring from ..lib import wiring
@ -337,12 +337,14 @@ class _ValueCompilerState:
wire_name = signal.name wire_name = signal.name
is_sync_driven = signal in self.driven and self.driven[signal] is_sync_driven = signal in self.driven and self.driven[signal]
attrs = dict(signal.attrs) attrs = dict(signal.attrs)
if signal._enum_class is not None: for repr in signal._value_repr:
attrs["enum_base_type"] = signal._enum_class.__name__ if repr.path == () and isinstance(repr.format, _repr.FormatEnum):
for value in signal._enum_class: enum = repr.format.enum
attrs["enum_value_{:0{}b}".format(value.value, signal.width)] = value.name attrs["enum_base_type"] = enum.__name__
for value in enum:
attrs["enum_value_{:0{}b}".format(value.value, signal.width)] = value.name
# For every signal in the sync domain, assign \sig's initial value (using the \init reg # For every signal in the sync domain, assign \sig's initial value (using the \init reg
# attribute) to the reset value. # attribute) to the reset value.
@ -872,7 +874,7 @@ def _convert_fragment(builder, fragment, name_map, hierarchy):
if sub_type == "$mem_v2" and "MEMID" not in sub_params: if sub_type == "$mem_v2" and "MEMID" not in sub_params:
sub_params["MEMID"] = builder._make_name(sub_name, local=False) sub_params["MEMID"] = builder._make_name(sub_name, local=False)
sub_ports = OrderedDict() sub_ports = OrderedDict()
for port, value in sub_port_map.items(): for port, value in sub_port_map.items():
if not isinstance(subfragment, ir.Instance): if not isinstance(subfragment, ir.Instance):
@ -917,7 +919,7 @@ def _convert_fragment(builder, fragment, name_map, hierarchy):
stmt_compiler._has_rhs = False stmt_compiler._has_rhs = False
stmt_compiler._wrap_assign = False stmt_compiler._wrap_assign = False
stmt_compiler(group_stmts) stmt_compiler(group_stmts)
# For every driven signal in the sync domain, create a flop of appropriate type. Which type # For every driven signal in the sync domain, create a flop of appropriate type. Which type
# is appropriate depends on the domain: for domains with sync reset, it is a $dff, for # is appropriate depends on the domain: for domains with sync reset, it is a $dff, for
# domains with async reset it is an $adff. The latter is directly provided with the reset # domains with async reset it is an $adff. The latter is directly provided with the reset
@ -930,7 +932,7 @@ def _convert_fragment(builder, fragment, name_map, hierarchy):
wire_curr, wire_next = compiler_state.resolve(signal) wire_curr, wire_next = compiler_state.resolve(signal)
if not cd.async_reset: if not cd.async_reset:
# For sync reset flops, the reset value comes from logic inserted by # For sync reset flops, the reset value comes from logic inserted by
# `hdl.xfrm.DomainLowerer`. # `hdl.xfrm.DomainLowerer`.
module.cell("$dff", ports={ module.cell("$dff", ports={
"\\CLK": wire_clk, "\\CLK": wire_clk,

46
amaranth/hdl/_repr.py Normal file
View file

@ -0,0 +1,46 @@
from abc import ABCMeta, abstractmethod
__all__ = ["Format", "FormatInt", "FormatEnum", "FormatCustom", "Repr"]
class Format(metaclass=ABCMeta):
@abstractmethod
def format(self, value):
raise NotImplementedError
class FormatInt(Format):
def format(self, value):
return f"{value:d}"
class FormatEnum(Format):
def __init__(self, enum):
self.enum = enum
def format(self, value):
try:
return f"{self.enum(value).name}/{value:d}"
except ValueError:
return f"?/{value:d}"
class FormatCustom(Format):
def __init__(self, formatter):
self.formatter = formatter
def format(self, value):
return self.formatter(value)
class Repr:
def __init__(self, format, value, *, path=()):
from .ast import Value # avoid a circular dependency
assert isinstance(format, Format)
assert isinstance(value, Value)
assert isinstance(path, tuple) and all(isinstance(part, (str, int)) for part in path)
self.format = format
self.value = value
self.path = path

View file

@ -7,6 +7,7 @@ from collections.abc import Iterable, MutableMapping, MutableSet, MutableSequenc
from enum import Enum from enum import Enum
from itertools import chain from itertools import chain
from ._repr import *
from .. import tracer from .. import tracer
from .._utils import * from .._utils import *
from .._utils import _ignore_deprecated from .._utils import _ignore_deprecated
@ -53,6 +54,9 @@ class ShapeCastable:
raise TypeError(f"Class '{cls.__name__}' deriving from `ShapeCastable` must override " raise TypeError(f"Class '{cls.__name__}' deriving from `ShapeCastable` must override "
f"the `const` method") f"the `const` method")
def _value_repr(self, value):
return (Repr(FormatInt(), value),)
class Shape: class Shape:
"""Bit width and signedness of a value. """Bit width and signedness of a value.
@ -1127,19 +1131,51 @@ class Signal(Value, DUID, metaclass=_SignalMeta):
self.attrs = OrderedDict(() if attrs is None else attrs) self.attrs = OrderedDict(() if attrs is None else attrs)
if decoder is None and isinstance(orig_shape, type) and issubclass(orig_shape, Enum): if decoder is not None:
decoder = orig_shape # The value representation is specified explicitly. Since we do not expose `hdl._repr`,
if isinstance(decoder, type) and issubclass(decoder, Enum): # this is the only way to add a custom filter to the signal right now. The setter sets
# `self._value_repr` as well as the compatibility `self.decoder`.
self.decoder = decoder
else:
# If it's an enum, expose it via `self.decoder` for compatibility, whether it's a Python
# enum or an Amaranth enum. This also sets the value representation, even for custom
# shape-castables that implement their own `_value_repr`.
if isinstance(orig_shape, type) and issubclass(orig_shape, Enum):
self.decoder = orig_shape
else:
self.decoder = None
# The value representation is specified implicitly in the shape of the signal.
if isinstance(orig_shape, ShapeCastable):
# A custom shape-castable always has a `_value_repr`, at least the default one.
self._value_repr = tuple(orig_shape._value_repr(self))
elif isinstance(orig_shape, type) and issubclass(orig_shape, Enum):
# A non-Amaranth enum needs a value repr constructed for it.
self._value_repr = (Repr(FormatEnum(orig_shape), self),)
else:
# Any other case is formatted as a plain integer.
self._value_repr = (Repr(FormatInt(), self),)
@property
def decoder(self):
return self._decoder
@decoder.setter
def decoder(self, decoder):
# Compute the value representation that will be used by Amaranth.
if decoder is None:
self._value_repr = (Repr(FormatInt(), self),)
self._decoder = None
elif not (isinstance(decoder, type) and issubclass(decoder, Enum)):
self._value_repr = (Repr(FormatCustom(decoder), self),)
self._decoder = decoder
else: # Violence. In the name of backwards compatibility!
self._value_repr = (Repr(FormatEnum(decoder), self),)
def enum_decoder(value): def enum_decoder(value):
try: try:
return "{0.name:}/{0.value:}".format(decoder(value)) return "{0.name:}/{0.value:}".format(decoder(value))
except ValueError: except ValueError:
return str(value) return str(value)
self.decoder = enum_decoder self._decoder = enum_decoder
self._enum_class = decoder
else:
self.decoder = decoder
self._enum_class = None
# Not a @classmethod because amaranth.compat requires it. # Not a @classmethod because amaranth.compat requires it.
@staticmethod @staticmethod
@ -1914,3 +1950,6 @@ class SignalDict(_MappedKeyDict):
class SignalSet(_MappedKeySet): class SignalSet(_MappedKeySet):
_map_key = SignalKey _map_key = SignalKey
_unmap_key = lambda self, key: key.signal _unmap_key = lambda self, key: key.signal
from ._repr import *

View file

@ -1,8 +1,10 @@
from abc import ABCMeta, abstractmethod from abc import ABCMeta, abstractmethod
from enum import Enum
from collections.abc import Mapping, Sequence from collections.abc import Mapping, Sequence
import warnings import warnings
from amaranth.hdl import * from amaranth.hdl import *
from amaranth.hdl._repr import *
from amaranth.hdl.ast import ShapeCastable, ValueCastable from amaranth.hdl.ast import ShapeCastable, ValueCastable
@ -231,6 +233,21 @@ class Layout(ShapeCastable, metaclass=ABCMeta):
int_value |= (key_value.value << field.offset) & mask int_value |= (key_value.value << field.offset) & mask
return View(self, Const(int_value, self.as_shape())) return View(self, Const(int_value, self.as_shape()))
def _value_repr(self, value):
yield Repr(FormatInt(), value)
for key, field in self:
shape = Shape.cast(field.shape)
field_value = value[field.offset:field.offset+shape.width]
if shape.signed:
field_value = field_value.as_signed()
if isinstance(field.shape, ShapeCastable):
for repr in field.shape._value_repr(field_value):
yield Repr(repr.format, repr.value, path=(key,) + repr.path)
elif isinstance(field.shape, type) and issubclass(field.shape, Enum):
yield Repr(FormatEnum(field.shape), field_value, path=(key,))
else:
yield Repr(FormatInt(), field_value, path=(key,))
class StructLayout(Layout): class StructLayout(Layout):
"""Description of a structure layout. """Description of a structure layout.
@ -774,6 +791,9 @@ class _AggregateMeta(ShapeCastable, type):
fields.update(init or {}) fields.update(init or {})
return cls.as_shape().const(fields) return cls.as_shape().const(fields)
def _value_repr(cls, value):
return cls.__layout._value_repr(value)
class Struct(View, metaclass=_AggregateMeta): class Struct(View, metaclass=_AggregateMeta):
"""Structures defined with annotations. """Structures defined with annotations.

View file

@ -2,6 +2,7 @@ import enum as py_enum
import warnings import warnings
from ..hdl.ast import Value, Shape, ShapeCastable, Const from ..hdl.ast import Value, Shape, ShapeCastable, Const
from ..hdl._repr import *
__all__ = py_enum.__all__ __all__ = py_enum.__all__
@ -150,6 +151,9 @@ class EnumMeta(ShapeCastable, py_enum.EnumMeta):
member = cls(init) member = cls(init)
return Const(member.value, cls.as_shape()) return Const(member.value, cls.as_shape())
def _value_repr(cls, value):
yield Repr(FormatEnum(cls), value)
class Enum(py_enum.Enum): class Enum(py_enum.Enum):
"""Subclass of the standard :class:`enum.Enum` that has :class:`EnumMeta` as """Subclass of the standard :class:`enum.Enum` that has :class:`EnumMeta` as

View file

@ -5,7 +5,8 @@ from vcd import VCDWriter
from vcd.gtkw import GTKWSave from vcd.gtkw import GTKWSave
from ..hdl import * from ..hdl import *
from ..hdl.ast import SignalDict from ..hdl._repr import *
from ..hdl.ast import SignalDict, Slice, Operator
from ._base import * from ._base import *
from ._pyrtl import _FragmentCompiler from ._pyrtl import _FragmentCompiler
from ._pycoro import PyCoroProcess from ._pycoro import PyCoroProcess
@ -17,8 +18,24 @@ __all__ = ["PySimEngine"]
class _VCDWriter: class _VCDWriter:
@staticmethod @staticmethod
def decode_to_vcd(signal, value): def decode_to_vcd(format, value):
return signal.decoder(value).expandtabs().replace(" ", "_") return format.format(value).expandtabs().replace(" ", "_")
@staticmethod
def eval_field(field, signal, value):
if isinstance(field, Signal):
assert field is signal
return value
elif isinstance(field, Const):
return field.value
elif isinstance(field, Slice):
sub = _VCDWriter.eval_field(field.value, signal, value)
return (sub >> field.start) & ((1 << (field.stop - field.start)) - 1)
elif isinstance(field, Operator) and field.operator in ('s', 'u'):
sub = _VCDWriter.eval_field(field.operands[0], signal, value)
return Const(sub, field.shape()).value
else:
raise NotImplementedError
def __init__(self, fragment, *, vcd_file, gtkw_file=None, traces=()): def __init__(self, fragment, *, vcd_file, gtkw_file=None, traces=()):
if isinstance(vcd_file, str): if isinstance(vcd_file, str):
@ -55,53 +72,59 @@ class _VCDWriter:
return return
for signal, names in itertools.chain(signal_names.items(), trace_names.items()): for signal, names in itertools.chain(signal_names.items(), trace_names.items()):
if signal.decoder: fields = []
var_type = "string" self.vcd_vars[signal] = []
var_size = 1 self.gtkw_names[signal] = []
var_init = self.decode_to_vcd(signal, signal.reset) for repr in signal._value_repr:
else: var_init = self.eval_field(repr.value, signal, signal.reset)
var_type = "wire" if isinstance(repr.format, FormatInt):
var_size = signal.width var_type = "wire"
var_init = signal.reset var_size = repr.value.shape().width
else:
var_type = "string"
var_size = 1
var_init = self.decode_to_vcd(repr.format, var_init)
for (*var_scope, var_name) in names: vcd_var = None
if re.search(r"[ \t\r\n]", var_name): for (*var_scope, var_name) in names:
raise NameError("Signal '{}.{}' contains a whitespace character" if re.search(r"[ \t\r\n]", var_name):
.format(".".join(var_scope), var_name)) raise NameError("Signal '{}.{}' contains a whitespace character"
.format(".".join(var_scope), var_name))
suffix = None field_name = var_name
while True: for item in repr.path:
try: if isinstance(item, int):
if suffix is None: field_name += f"[{item}]"
var_name_suffix = var_name
else: else:
var_name_suffix = f"{var_name}${suffix}" field_name += f".{item}"
if signal not in self.vcd_vars:
vcd_var = self.vcd_writer.register_var( if vcd_var is None:
scope=var_scope, name=var_name_suffix, vcd_var = self.vcd_writer.register_var(
var_type=var_type, size=var_size, init=var_init) scope=var_scope, name=field_name,
self.vcd_vars[signal] = vcd_var var_type=var_type, size=var_size, init=var_init)
else: if var_size > 1:
self.vcd_writer.register_alias( suffix = f"[{var_size - 1}:0]"
scope=var_scope, name=var_name_suffix, else:
var=self.vcd_vars[signal]) suffix = ""
break if repr.path:
except KeyError: gtkw_field_name = '\\' + field_name
suffix = (suffix or 0) + 1 else:
gtkw_field_name = field_name
self.gtkw_names[signal].append(".".join((*var_scope, gtkw_field_name)) + suffix)
else:
self.vcd_writer.register_alias(
scope=var_scope, name=field_name,
var=vcd_var)
self.vcd_vars[signal].append((vcd_var, repr))
if signal not in self.gtkw_names:
self.gtkw_names[signal] = (*var_scope, var_name_suffix)
def update(self, timestamp, signal, value): def update(self, timestamp, signal, value):
vcd_var = self.vcd_vars.get(signal) for (vcd_var, repr) in self.vcd_vars.get(signal, ()):
if vcd_var is None: var_value = self.eval_field(repr.value, signal, value)
return if not isinstance(repr.format, FormatInt):
var_value = self.decode_to_vcd(repr.format, var_value)
if signal.decoder: self.vcd_writer.change(vcd_var, timestamp, var_value)
var_value = self.decode_to_vcd(signal, value)
else:
var_value = value
self.vcd_writer.change(vcd_var, timestamp, var_value)
def close(self, timestamp): def close(self, timestamp):
if self.vcd_writer is not None: if self.vcd_writer is not None:
@ -113,11 +136,8 @@ class _VCDWriter:
self.gtkw_save.treeopen("top") self.gtkw_save.treeopen("top")
for signal in self.traces: for signal in self.traces:
if len(signal) > 1 and not signal.decoder: for name in self.gtkw_names[signal]:
suffix = f"[{len(signal) - 1}:0]" self.gtkw_save.trace(name)
else:
suffix = ""
self.gtkw_save.trace(".".join(self.gtkw_names[signal]) + suffix)
if self.vcd_file is not None: if self.vcd_file is not None:
self.vcd_file.close() self.vcd_file.close()