From 79adbed3134a4490887a6b4450381217136ffe93 Mon Sep 17 00:00:00 2001 From: Catherine Date: Sat, 25 Nov 2023 04:21:00 +0000 Subject: [PATCH] sim.pysim: move name extractor functionality to Fragment. At the moment there are two issues with assignment of names in pysim: 1. Names are not deduplicated. It is possible (and frequent) for names to be included twice in VCD output. 2. Names are different compared to what is emitted in RTLIL, Verilog, or CXXRTL output. This commit fixes issue (1), and issue (2) will be fixed by the new IR. --- amaranth/hdl/ir.py | 78 +++++++++++++++++++++++++++++++++++++++++++ amaranth/sim/pysim.py | 42 +++++------------------ tests/test_hdl_ir.py | 68 +++++++++++++++++++++++++++++++++++++ 3 files changed, 154 insertions(+), 34 deletions(-) diff --git a/amaranth/hdl/ir.py b/amaranth/hdl/ir.py index a38479e..5b2811f 100644 --- a/amaranth/hdl/ir.py +++ b/amaranth/hdl/ir.py @@ -538,6 +538,84 @@ class Fragment: fragment._propagate_ports(ports=mapped_ports, all_undef_as_ports=False) return fragment + def _assign_names_to_signals(self): + """Assign names to signals used in this fragment. + + Returns + ------- + SignalDict of Signal to str + A mapping from signals used in this fragment to their local names. Because names are + deduplicated using local information only, the same signal used in a different fragment + may get a different name. + """ + + signal_names = SignalDict() + assigned_names = set() + + def add_signal_name(signal): + if signal not in signal_names: + if signal.name not in assigned_names: + name = signal.name + else: + name = f"{signal.name}${len(assigned_names)}" + assert name not in assigned_names + signal_names[signal] = name + assigned_names.add(name) + + for port in self.ports.keys(): + add_signal_name(port) + + for domain_name, domain_signals in self.drivers.items(): + if domain_name is not None: + domain = self.domains[domain_name] + add_signal_name(domain.clk) + if domain.rst is not None: + add_signal_name(domain.rst) + + for statement in self.statements: + for signal in statement._lhs_signals() | statement._rhs_signals(): + if not isinstance(signal, (ClockSignal, ResetSignal)): + add_signal_name(signal) + + return signal_names + + def _assign_names_to_fragments(self, hierarchy=("top",), *, _names=None): + """Assign names to this fragment and its subfragments. + + Subfragments may not necessarily have a name. This method assigns every such subfragment + a name, ``U$``, where ```` is based on its location in the hierarchy. + + Subfragment names may collide with signal names safely in Amaranth, but this may confuse + backends. This method assigns every such subfragment a name, ``$U$``, where + ``name`` is its original name, and ```` is based on its location in the hierarchy. + + Arguments + --------- + hierarchy : tuple of str + Name of this fragment. + + Returns + ------- + dict of Fragment to tuple of str + A mapping from this fragment and its subfragments to their full hierarchical names. + """ + + if _names is None: + _names = dict() + _names[self] = hierarchy + + signal_names = set(self._assign_names_to_signals().values()) + for subfragment_index, (subfragment, subfragment_name) in enumerate(self.subfragments): + if subfragment_name is None: + subfragment_name = f"U${subfragment_index}" + elif subfragment_name in signal_names: + subfragment_name = f"{subfragment_name}$U${subfragment_index}" + assert subfragment_name not in signal_names + subfragment._assign_names_to_fragments(hierarchy=(*hierarchy, subfragment_name), + _names=_names) + + return _names + class Instance(Fragment): def __init__(self, type, *args, **kwargs): diff --git a/amaranth/sim/pysim.py b/amaranth/sim/pysim.py index 57fb396..7cfe340 100644 --- a/amaranth/sim/pysim.py +++ b/amaranth/sim/pysim.py @@ -15,38 +15,6 @@ from ._pyclock import PyClockProcess __all__ = ["PySimEngine"] -class _NameExtractor: - def __init__(self): - self.names = SignalDict() - - def __call__(self, fragment, *, hierarchy=("bench", "top",)): - def add_signal_name(signal): - hierarchical_signal_name = (*hierarchy, signal.name) - if signal not in self.names: - self.names[signal] = {hierarchical_signal_name} - else: - self.names[signal].add(hierarchical_signal_name) - - for domain_name, domain_signals in fragment.drivers.items(): - if domain_name is not None: - domain = fragment.domains[domain_name] - add_signal_name(domain.clk) - if domain.rst is not None: - add_signal_name(domain.rst) - - for statement in fragment.statements: - for signal in statement._lhs_signals() | statement._rhs_signals(): - if not isinstance(signal, (ClockSignal, ResetSignal)): - add_signal_name(signal) - - for subfragment_index, (subfragment, subfragment_name) in enumerate(fragment.subfragments): - if subfragment_name is None: - subfragment_name = f"U${subfragment_index}" - self(subfragment, hierarchy=(*hierarchy, subfragment_name)) - - return self.names - - class _VCDWriter: @staticmethod def decode_to_vcd(signal, value): @@ -69,12 +37,18 @@ class _VCDWriter: self.traces = [] - signal_names = _NameExtractor()(fragment) + signal_names = SignalDict() + for subfragment, subfragment_name in \ + fragment._assign_names_to_fragments(hierarchy=("bench", "top",)).items(): + for signal, signal_name in subfragment._assign_names_to_signals().items(): + if signal not in signal_names: + signal_names[signal] = set() + signal_names[signal].add((*subfragment_name, signal_name)) trace_names = SignalDict() for trace in traces: if trace not in signal_names: - trace_names[trace] = {('bench', trace.name)} + trace_names[trace] = {("bench", trace.name)} self.traces.append(trace) if self.vcd_writer is None: diff --git a/tests/test_hdl_ir.py b/tests/test_hdl_ir.py index 19bf03b..7a34727 100644 --- a/tests/test_hdl_ir.py +++ b/tests/test_hdl_ir.py @@ -834,3 +834,71 @@ class InstanceTestCase(FHDLTestCase): self.assertEqual(f.attrs, OrderedDict([ ("ATTR", 1), ])) + + def test_assign_names_to_signals(self): + i = Signal() + rst = Signal() + o1 = Signal() + o2 = Signal() + o3 = Signal() + i1 = Signal(name="i") + + f = Fragment() + f.add_domains(cd_sync := ClockDomain()) + f.add_domains(cd_sync_norst := ClockDomain(reset_less=True)) + f.add_ports((i, rst), dir="i") + f.add_ports((o1, o2, o3), dir="o") + f.add_statements([o1.eq(0)]) + f.add_driver(o1, domain=None) + f.add_statements([o2.eq(i1)]) + f.add_driver(o2, domain="sync") + f.add_statements([o3.eq(i1)]) + f.add_driver(o3, domain="sync_norst") + + names = f._assign_names_to_signals() + self.assertEqual(names, SignalDict([ + (i, "i"), + (rst, "rst"), + (o1, "o1"), + (o2, "o2"), + (o3, "o3"), + (cd_sync.clk, "clk"), + (cd_sync.rst, "rst$6"), + (cd_sync_norst.clk, "sync_norst_clk"), + (i1, "i$8"), + ])) + + def test_assign_names_to_fragments(self): + f = Fragment() + f.add_subfragment(a := Fragment()) + f.add_subfragment(b := Fragment(), name="b") + + names = f._assign_names_to_fragments() + self.assertEqual(names, { + f: ("top",), + a: ("top", "U$0"), + b: ("top", "b") + }) + + def test_assign_names_to_fragments_rename_top(self): + f = Fragment() + f.add_subfragment(a := Fragment()) + f.add_subfragment(b := Fragment(), name="b") + + names = f._assign_names_to_fragments(hierarchy=("bench", "cpu")) + self.assertEqual(names, { + f: ("bench", "cpu",), + a: ("bench", "cpu", "U$0"), + b: ("bench", "cpu", "b") + }) + + def test_assign_names_to_fragments_collide_with_signal(self): + f = Fragment() + f.add_subfragment(a_f := Fragment(), name="a") + f.add_ports((a_s := Signal(name="a"),), dir="o") + + names = f._assign_names_to_fragments() + self.assertEqual(names, { + f: ("top",), + a_f: ("top", "a$U$0") + })