diff --git a/amaranth/_utils.py b/amaranth/_utils.py index 7faf21e..886de59 100644 --- a/amaranth/_utils.py +++ b/amaranth/_utils.py @@ -7,7 +7,7 @@ from collections import OrderedDict from collections.abc import Iterable -__all__ = ["flatten", "union", "memoize", "final", "deprecated", "get_linter_options", +__all__ = ["flatten", "union", "final", "deprecated", "get_linter_options", "get_linter_option"] @@ -29,16 +29,6 @@ def union(i, start=None): return r -def memoize(f): - memo = OrderedDict() - @functools.wraps(f) - def g(*args): - if args not in memo: - memo[args] = f(*args) - return memo[args] - return g - - def final(cls): def init_subclass(): raise TypeError("Subclassing {}.{} is not supported" diff --git a/amaranth/build/plat.py b/amaranth/build/plat.py index 6844b8d..c311257 100644 --- a/amaranth/build/plat.py +++ b/amaranth/build/plat.py @@ -140,8 +140,6 @@ class Platform(ResourceManager, metaclass=ABCMeta): def add_pin_fragment(pin, pin_fragment): pin_fragment = Fragment.get(pin_fragment, self) - if not isinstance(pin_fragment, Instance): - pin_fragment.flatten = True fragment.add_subfragment(pin_fragment, name=f"pin_{pin.name}") for pin, port, attrs, invert in self.iter_single_ended_pins(): diff --git a/amaranth/hdl/_ir.py b/amaranth/hdl/_ir.py index 1f5e0be..5c6ace6 100644 --- a/amaranth/hdl/_ir.py +++ b/amaranth/hdl/_ir.py @@ -1,10 +1,9 @@ from typing import Tuple from collections import defaultdict, OrderedDict -from functools import reduce import enum import warnings -from .._utils import flatten, memoize +from .._utils import flatten from .. import tracer, _unused from . import _ast, _cd, _ir, _nir @@ -75,7 +74,6 @@ class Fragment: self.subfragments = [] self.attrs = OrderedDict() self.generated = OrderedDict() - self.flatten = False self.src_loc = src_loc self.origins = None @@ -143,110 +141,6 @@ class Fragment: def elaborate(self, platform): return self - def _merge_subfragment(self, subfragment): - # Merge subfragment's everything except clock domains into this fragment. - # Flattening is done after clock domain propagation, so we can assume the domains - # are already the same in every involved fragment in the first place. - for domain, signal in subfragment.iter_drivers(): - self.add_driver(signal, domain) - for domain, statements in subfragment.statements.items(): - self.statements.setdefault(domain, []).extend(statements) - self.subfragments += subfragment.subfragments - - # Remove the merged subfragment. - found = False - for i, (check_subfrag, check_name, check_src_loc) in enumerate(self.subfragments): # :nobr: - if subfragment == check_subfrag: - del self.subfragments[i] - found = True - break - assert found - - def _resolve_hierarchy_conflicts(self, hierarchy=("top",), mode="warn"): - assert mode in ("silent", "warn", "error") - from ._mem import MemoryInstance - - driver_subfrags = _ast.SignalDict() - def add_subfrag(registry, entity, entry): - # Because of missing domain insertion, at the point when this code runs, we have - # a mixture of bound and unbound {Clock,Reset}Signals. Map the bound ones to - # the actual signals (because the signal itself can be driven as well); but leave - # the unbound ones as it is, because there's no concrete signal for it yet anyway. - if isinstance(entity, _ast.ClockSignal) and entity.domain in self.domains: - entity = self.domains[entity.domain].clk - elif isinstance(entity, _ast.ResetSignal) and entity.domain in self.domains: - entity = self.domains[entity.domain].rst - - if entity not in registry: - registry[entity] = set() - registry[entity].add(entry) - - # For each signal driven by this fragment and/or its subfragments, determine which - # subfragments also drive it. - for domain, signal in self.iter_drivers(): - add_subfrag(driver_subfrags, signal, (None, hierarchy)) - - flatten_subfrags = set() - for i, (subfrag, name, src_loc) in enumerate(self.subfragments): - if name is None: - name = f"" - subfrag_hierarchy = hierarchy + (name,) - - if subfrag.flatten: - # Always flatten subfragments that explicitly request it. - flatten_subfrags.add((subfrag, subfrag_hierarchy)) - - if isinstance(subfrag, (Instance, MemoryInstance, IOBufferInstance)): - # Never flatten instances. - continue - - # First, recurse into subfragments and let them detect driver conflicts as well. - subfrag_drivers = \ - subfrag._resolve_hierarchy_conflicts(subfrag_hierarchy, mode) - - # Second, classify subfragments by signals they drive. - for signal in subfrag_drivers: - add_subfrag(driver_subfrags, signal, (subfrag, subfrag_hierarchy)) - - # Find out the set of subfragments that needs to be flattened into this fragment - # to resolve driver-driver conflicts. - def flatten_subfrags_if_needed(subfrags): - if len(subfrags) == 1: - return [] - flatten_subfrags.update((f, h) for f, h in subfrags if f is not None) - return list(sorted(".".join(h) for f, h in subfrags)) - - for signal, subfrags in driver_subfrags.items(): - subfrag_names = flatten_subfrags_if_needed(subfrags) - if not subfrag_names: - continue - - # While we're at it, show a message. - message = ("Signal '{!r}' is driven from multiple fragments: {}" - .format(signal, ", ".join(subfrag_names))) - if mode == "error": - raise DriverConflict(message) - elif mode == "warn": - message += "; hierarchy will be flattened" - warnings.warn_explicit(message, DriverConflict, *signal.src_loc) - - # Flatten hierarchy. - for subfrag, subfrag_hierarchy in sorted(flatten_subfrags, key=lambda x: x[1]): - self._merge_subfragment(subfrag) - - # If we flattened anything, we might be in a situation where we have a driver conflict - # again, e.g. if we had a tree of fragments like A --- B --- C where only fragments - # A and C were driving a signal S. In that case, since B is not driving S itself, - # processing B will not result in any flattening, but since B is transitively driving S, - # processing A will flatten B into it. Afterwards, we have a tree like AB --- C, which - # has another conflict. - if any(flatten_subfrags): - # Try flattening again. - return self._resolve_hierarchy_conflicts(hierarchy, mode) - - # Nothing was flattened, we're done! - return _ast.SignalSet(driver_subfrags.keys()) - def _propagate_domains_up(self, hierarchy=("top",)): from ._xfrm import DomainRenamer @@ -344,7 +238,6 @@ class Fragment: def _propagate_domains(self, missing_domain, *, platform=None): self._propagate_domains_up() self._propagate_domains_down() - self._resolve_hierarchy_conflicts() new_domains = self._create_missing_domains(missing_domain, platform=platform) self._propagate_domains_down() return new_domains diff --git a/amaranth/hdl/_xfrm.py b/amaranth/hdl/_xfrm.py index 9550534..d92b0e5 100644 --- a/amaranth/hdl/_xfrm.py +++ b/amaranth/hdl/_xfrm.py @@ -319,7 +319,6 @@ class FragmentTransformer: ) else: new_fragment = Fragment(src_loc=fragment.src_loc) - new_fragment.flatten = fragment.flatten new_fragment.attrs = OrderedDict(fragment.attrs) self.map_subfragments(fragment, new_fragment) self.map_domains(fragment, new_fragment) diff --git a/tests/test_hdl_ir.py b/tests/test_hdl_ir.py index 66e1401..f01a773 100644 --- a/tests/test_hdl_ir.py +++ b/tests/test_hdl_ir.py @@ -598,133 +598,6 @@ class FragmentDomainsTestCase(FHDLTestCase): class FragmentHierarchyConflictTestCase(FHDLTestCase): - def setUp_self_sub(self): - self.s1 = Signal() - self.c1 = Signal() - self.c2 = Signal() - - self.f1 = Fragment() - self.f1.add_statements("sync", self.c1.eq(0)) - self.f1.add_driver(self.s1) - self.f1.add_driver(self.c1, "sync") - - self.f1a = Fragment() - self.f1.add_subfragment(self.f1a, "f1a") - - self.f2 = Fragment() - self.f2.add_statements("sync", self.c2.eq(1)) - self.f2.add_driver(self.s1) - self.f2.add_driver(self.c2, "sync") - self.f1.add_subfragment(self.f2) - - self.f1b = Fragment() - self.f1.add_subfragment(self.f1b, "f1b") - - self.f2a = Fragment() - self.f2.add_subfragment(self.f2a, "f2a") - - def test_conflict_self_sub(self): - self.setUp_self_sub() - - self.f1._resolve_hierarchy_conflicts(mode="silent") - self.assertEqual([(f, n) for f, n, _ in self.f1.subfragments], [ - (self.f1a, "f1a"), - (self.f1b, "f1b"), - (self.f2a, "f2a"), - ]) - self.assertRepr(self.f1.statements["sync"], """ - ( - (eq (sig c1) (const 1'd0)) - (eq (sig c2) (const 1'd1)) - ) - """) - self.assertEqual(self.f1.drivers, { - "comb": SignalSet((self.s1,)), - "sync": SignalSet((self.c1, self.c2)), - }) - - def test_conflict_self_sub_error(self): - self.setUp_self_sub() - - with self.assertRaisesRegex(DriverConflict, - r"^Signal '\(sig s1\)' is driven from multiple fragments: top, top.$"): - self.f1._resolve_hierarchy_conflicts(mode="error") - - def test_conflict_self_sub_warning(self): - self.setUp_self_sub() - - with self.assertWarnsRegex(DriverConflict, - (r"^Signal '\(sig s1\)' is driven from multiple fragments: top, top.; " - r"hierarchy will be flattened$")): - self.f1._resolve_hierarchy_conflicts(mode="warn") - - def setUp_sub_sub(self): - self.s1 = Signal() - self.c1 = Signal() - self.c2 = Signal() - - self.f1 = Fragment() - - self.f2 = Fragment() - self.f2.add_driver(self.s1) - self.f2.add_statements("comb", self.c1.eq(0)) - self.f1.add_subfragment(self.f2) - - self.f3 = Fragment() - self.f3.add_driver(self.s1) - self.f3.add_statements("comb", self.c2.eq(1)) - self.f1.add_subfragment(self.f3) - - def test_conflict_sub_sub(self): - self.setUp_sub_sub() - - self.f1._resolve_hierarchy_conflicts(mode="silent") - self.assertEqual(self.f1.subfragments, []) - self.assertRepr(self.f1.statements["comb"], """ - ( - (eq (sig c1) (const 1'd0)) - (eq (sig c2) (const 1'd1)) - ) - """) - - def setUp_self_subsub(self): - self.s1 = Signal() - self.c1 = Signal() - self.c2 = Signal() - - self.f1 = Fragment() - self.f1.add_driver(self.s1) - - self.f2 = Fragment() - self.f2.add_statements("comb", self.c1.eq(0)) - self.f1.add_subfragment(self.f2) - - self.f3 = Fragment() - self.f3.add_driver(self.s1) - self.f3.add_statements("comb", self.c2.eq(1)) - self.f2.add_subfragment(self.f3) - - def test_conflict_self_subsub(self): - self.setUp_self_subsub() - - self.f1._resolve_hierarchy_conflicts(mode="silent") - self.assertEqual(self.f1.subfragments, []) - self.assertRepr(self.f1.statements["comb"], """ - ( - (eq (sig c1) (const 1'd0)) - (eq (sig c2) (const 1'd1)) - ) - """) - - def test_explicit_flatten(self): - self.f1 = Fragment() - self.f2 = Fragment() - self.f2.flatten = True - self.f1.add_subfragment(self.f2) - - self.f1._resolve_hierarchy_conflicts(mode="silent") - self.assertEqual(self.f1.subfragments, []) - def test_no_conflict_local_domains(self): f1 = Fragment() cd1 = ClockDomain("d", local=True)