diff --git a/amaranth/hdl/ir.py b/amaranth/hdl/ir.py index a38479e..5b2811f 100644 --- a/amaranth/hdl/ir.py +++ b/amaranth/hdl/ir.py @@ -538,6 +538,84 @@ class Fragment: fragment._propagate_ports(ports=mapped_ports, all_undef_as_ports=False) return fragment + def _assign_names_to_signals(self): + """Assign names to signals used in this fragment. + + Returns + ------- + SignalDict of Signal to str + A mapping from signals used in this fragment to their local names. Because names are + deduplicated using local information only, the same signal used in a different fragment + may get a different name. + """ + + signal_names = SignalDict() + assigned_names = set() + + def add_signal_name(signal): + if signal not in signal_names: + if signal.name not in assigned_names: + name = signal.name + else: + name = f"{signal.name}${len(assigned_names)}" + assert name not in assigned_names + signal_names[signal] = name + assigned_names.add(name) + + for port in self.ports.keys(): + add_signal_name(port) + + for domain_name, domain_signals in self.drivers.items(): + if domain_name is not None: + domain = self.domains[domain_name] + add_signal_name(domain.clk) + if domain.rst is not None: + add_signal_name(domain.rst) + + for statement in self.statements: + for signal in statement._lhs_signals() | statement._rhs_signals(): + if not isinstance(signal, (ClockSignal, ResetSignal)): + add_signal_name(signal) + + return signal_names + + def _assign_names_to_fragments(self, hierarchy=("top",), *, _names=None): + """Assign names to this fragment and its subfragments. + + Subfragments may not necessarily have a name. This method assigns every such subfragment + a name, ``U$``, where ```` is based on its location in the hierarchy. + + Subfragment names may collide with signal names safely in Amaranth, but this may confuse + backends. This method assigns every such subfragment a name, ``$U$``, where + ``name`` is its original name, and ```` is based on its location in the hierarchy. + + Arguments + --------- + hierarchy : tuple of str + Name of this fragment. + + Returns + ------- + dict of Fragment to tuple of str + A mapping from this fragment and its subfragments to their full hierarchical names. + """ + + if _names is None: + _names = dict() + _names[self] = hierarchy + + signal_names = set(self._assign_names_to_signals().values()) + for subfragment_index, (subfragment, subfragment_name) in enumerate(self.subfragments): + if subfragment_name is None: + subfragment_name = f"U${subfragment_index}" + elif subfragment_name in signal_names: + subfragment_name = f"{subfragment_name}$U${subfragment_index}" + assert subfragment_name not in signal_names + subfragment._assign_names_to_fragments(hierarchy=(*hierarchy, subfragment_name), + _names=_names) + + return _names + class Instance(Fragment): def __init__(self, type, *args, **kwargs): diff --git a/amaranth/sim/pysim.py b/amaranth/sim/pysim.py index 57fb396..7cfe340 100644 --- a/amaranth/sim/pysim.py +++ b/amaranth/sim/pysim.py @@ -15,38 +15,6 @@ from ._pyclock import PyClockProcess __all__ = ["PySimEngine"] -class _NameExtractor: - def __init__(self): - self.names = SignalDict() - - def __call__(self, fragment, *, hierarchy=("bench", "top",)): - def add_signal_name(signal): - hierarchical_signal_name = (*hierarchy, signal.name) - if signal not in self.names: - self.names[signal] = {hierarchical_signal_name} - else: - self.names[signal].add(hierarchical_signal_name) - - for domain_name, domain_signals in fragment.drivers.items(): - if domain_name is not None: - domain = fragment.domains[domain_name] - add_signal_name(domain.clk) - if domain.rst is not None: - add_signal_name(domain.rst) - - for statement in fragment.statements: - for signal in statement._lhs_signals() | statement._rhs_signals(): - if not isinstance(signal, (ClockSignal, ResetSignal)): - add_signal_name(signal) - - for subfragment_index, (subfragment, subfragment_name) in enumerate(fragment.subfragments): - if subfragment_name is None: - subfragment_name = f"U${subfragment_index}" - self(subfragment, hierarchy=(*hierarchy, subfragment_name)) - - return self.names - - class _VCDWriter: @staticmethod def decode_to_vcd(signal, value): @@ -69,12 +37,18 @@ class _VCDWriter: self.traces = [] - signal_names = _NameExtractor()(fragment) + signal_names = SignalDict() + for subfragment, subfragment_name in \ + fragment._assign_names_to_fragments(hierarchy=("bench", "top",)).items(): + for signal, signal_name in subfragment._assign_names_to_signals().items(): + if signal not in signal_names: + signal_names[signal] = set() + signal_names[signal].add((*subfragment_name, signal_name)) trace_names = SignalDict() for trace in traces: if trace not in signal_names: - trace_names[trace] = {('bench', trace.name)} + trace_names[trace] = {("bench", trace.name)} self.traces.append(trace) if self.vcd_writer is None: diff --git a/tests/test_hdl_ir.py b/tests/test_hdl_ir.py index 19bf03b..7a34727 100644 --- a/tests/test_hdl_ir.py +++ b/tests/test_hdl_ir.py @@ -834,3 +834,71 @@ class InstanceTestCase(FHDLTestCase): self.assertEqual(f.attrs, OrderedDict([ ("ATTR", 1), ])) + + def test_assign_names_to_signals(self): + i = Signal() + rst = Signal() + o1 = Signal() + o2 = Signal() + o3 = Signal() + i1 = Signal(name="i") + + f = Fragment() + f.add_domains(cd_sync := ClockDomain()) + f.add_domains(cd_sync_norst := ClockDomain(reset_less=True)) + f.add_ports((i, rst), dir="i") + f.add_ports((o1, o2, o3), dir="o") + f.add_statements([o1.eq(0)]) + f.add_driver(o1, domain=None) + f.add_statements([o2.eq(i1)]) + f.add_driver(o2, domain="sync") + f.add_statements([o3.eq(i1)]) + f.add_driver(o3, domain="sync_norst") + + names = f._assign_names_to_signals() + self.assertEqual(names, SignalDict([ + (i, "i"), + (rst, "rst"), + (o1, "o1"), + (o2, "o2"), + (o3, "o3"), + (cd_sync.clk, "clk"), + (cd_sync.rst, "rst$6"), + (cd_sync_norst.clk, "sync_norst_clk"), + (i1, "i$8"), + ])) + + def test_assign_names_to_fragments(self): + f = Fragment() + f.add_subfragment(a := Fragment()) + f.add_subfragment(b := Fragment(), name="b") + + names = f._assign_names_to_fragments() + self.assertEqual(names, { + f: ("top",), + a: ("top", "U$0"), + b: ("top", "b") + }) + + def test_assign_names_to_fragments_rename_top(self): + f = Fragment() + f.add_subfragment(a := Fragment()) + f.add_subfragment(b := Fragment(), name="b") + + names = f._assign_names_to_fragments(hierarchy=("bench", "cpu")) + self.assertEqual(names, { + f: ("bench", "cpu",), + a: ("bench", "cpu", "U$0"), + b: ("bench", "cpu", "b") + }) + + def test_assign_names_to_fragments_collide_with_signal(self): + f = Fragment() + f.add_subfragment(a_f := Fragment(), name="a") + f.add_ports((a_s := Signal(name="a"),), dir="o") + + names = f._assign_names_to_fragments() + self.assertEqual(names, { + f: ("top",), + a_f: ("top", "a$U$0") + })