From 751e0f4b5737b4194b389131b79d3856d2d2c8b0 Mon Sep 17 00:00:00 2001 From: Wanda Date: Sun, 11 Feb 2024 12:07:45 +0100 Subject: [PATCH] ir: kill Fragment.ports --- amaranth/back/rtlil.py | 22 +- amaranth/back/verilog.py | 12 +- amaranth/build/plat.py | 11 +- amaranth/hdl/_ir.py | 594 +++++++++++++++++---------------------- amaranth/hdl/_nir.py | 13 +- amaranth/hdl/_xfrm.py | 5 - amaranth/sim/core.py | 12 +- amaranth/sim/pysim.py | 24 +- tests/test_hdl_ir.py | 549 ++++++++++++++++++------------------ 9 files changed, 576 insertions(+), 666 deletions(-) diff --git a/amaranth/back/rtlil.py b/amaranth/back/rtlil.py index e3bcc0f..f597b84 100644 --- a/amaranth/back/rtlil.py +++ b/amaranth/back/rtlil.py @@ -401,7 +401,7 @@ class ModuleEmitter: port_id=port_id, port_kind=flow.value, name=name, attrs=self.value_attrs.get(value, {})) self.sigport_wires[name] = (wire, value) - if flow == _nir.ModuleNetFlow.OUTPUT: + if flow == _nir.ModuleNetFlow.Output: continue # If we just emitted an input or inout port, it is driving the value. self.driven_sigports.add(name) @@ -463,7 +463,7 @@ class ModuleEmitter: for submodule_idx in self.module.submodules: submodule = self.netlist.modules[submodule_idx] for _name, (value, flow) in submodule.ports.items(): - if flow == _nir.ModuleNetFlow.OUTPUT: + if flow == _nir.ModuleNetFlow.Output: self.emit_driven_wire(value) def sigspec(self, *parts: '_nir.Net | Iterable[_nir.Net]'): @@ -989,10 +989,10 @@ class EmptyModuleChecker: return module_idx in self.empty -def convert_fragment(fragment, name="top", *, emit_src=True): +def convert_fragment(fragment, ports, name="top", *, emit_src=True, **kwargs): assert isinstance(fragment, _ir.Fragment) name_map = _ast.SignalDict() - netlist = _ir.build_netlist(fragment, name=name) + netlist = _ir.build_netlist(fragment, ports=ports, name=name, **kwargs) empty_checker = EmptyModuleChecker(netlist) builder = _Builder(emit_src=emit_src) for module_idx, module in enumerate(netlist.modules): @@ -1011,14 +1011,18 @@ def convert(elaboratable, name="top", platform=None, *, ports=None, emit_src=Tru if (ports is None and hasattr(elaboratable, "signature") and isinstance(elaboratable.signature, wiring.Signature)): - ports = [] - for _path, _member, value in elaboratable.signature.flatten(elaboratable): + ports = {} + for path, member, value in elaboratable.signature.flatten(elaboratable): if isinstance(value, _ast.ValueCastable): value = value.as_value() if isinstance(value, _ast.Value): - ports.append(value) + if member.flow == wiring.In: + dir = _ir.PortDirection.Input + else: + dir = _ir.PortDirection.Output + ports["__".join(path)] = (value, dir) elif ports is None: raise TypeError("The `convert()` function requires a `ports=` argument") - fragment = _ir.Fragment.get(elaboratable, platform).prepare(ports=ports, **kwargs) - il_text, _name_map = convert_fragment(fragment, name, emit_src=emit_src) + fragment = _ir.Fragment.get(elaboratable, platform) + il_text, _name_map = convert_fragment(fragment, ports, name, emit_src=emit_src, **kwargs) return il_text diff --git a/amaranth/back/verilog.py b/amaranth/back/verilog.py index ef32555..545a4ed 100644 --- a/amaranth/back/verilog.py +++ b/amaranth/back/verilog.py @@ -45,14 +45,18 @@ def convert(elaboratable, name="top", platform=None, *, ports=None, emit_src=Tru if (ports is None and hasattr(elaboratable, "signature") and isinstance(elaboratable.signature, wiring.Signature)): - ports = [] + ports = {} for path, member, value in elaboratable.signature.flatten(elaboratable): if isinstance(value, _ast.ValueCastable): value = value.as_value() if isinstance(value, _ast.Value): - ports.append(value) + if member.flow == wiring.In: + dir = _ir.PortDirection.Input + else: + dir = _ir.PortDirection.Output + ports["__".join(path)] = (value, dir) elif ports is None: raise TypeError("The `convert()` function requires a `ports=` argument") - fragment = _ir.Fragment.get(elaboratable, platform).prepare(ports=ports, **kwargs) - verilog_text, name_map = convert_fragment(fragment, name, emit_src=emit_src, strip_internal_attrs=strip_internal_attrs) + fragment = _ir.Fragment.get(elaboratable, platform) + verilog_text, name_map = convert_fragment(fragment, ports, name, emit_src=emit_src, strip_internal_attrs=strip_internal_attrs, **kwargs) return verilog_text diff --git a/amaranth/build/plat.py b/amaranth/build/plat.py index 03a6360..834f5df 100644 --- a/amaranth/build/plat.py +++ b/amaranth/build/plat.py @@ -163,11 +163,11 @@ class Platform(ResourceManager, metaclass=ABCMeta): if pin.dir == "io": add_pin_fragment(pin, self.get_diff_input_output(pin, port, attrs, invert)) - fragment._propagate_ports(ports=self.iter_ports(), all_undef_as_ports=False) - return self.toolchain_prepare(fragment, name, **kwargs) + ports = list(self.iter_ports()) + return self.toolchain_prepare(fragment, ports, name, **kwargs) @abstractmethod - def toolchain_prepare(self, fragment, name, **kwargs): + def toolchain_prepare(self, fragment, ports, name, **kwargs): """ Convert the ``fragment`` and constraints recorded in this :class:`Platform` into a :class:`BuildPlan`. @@ -290,7 +290,7 @@ class TemplatedPlatform(Platform): continue yield net_signal, port_signal, frequency - def toolchain_prepare(self, fragment, name, *, emit_src=True, **kwargs): + def toolchain_prepare(self, fragment, ports, name, *, emit_src=True, **kwargs): # Restrict the name of the design to a strict alphanumeric character set. Platforms will # interpolate the name of the design in many different contexts: filesystem paths, Python # scripts, Tcl scripts, ad-hoc constraint files, and so on. It is not practical to add @@ -306,7 +306,8 @@ class TemplatedPlatform(Platform): # and to incorporate the Amaranth version into generated code. autogenerated = f"Automatically generated by Amaranth {__version__}. Do not edit." - rtlil_text, self._name_map = rtlil.convert_fragment(fragment, name=name, emit_src=emit_src) + rtlil_text, self._name_map = rtlil.convert_fragment(fragment, ports=ports, name=name, + emit_src=emit_src) # Retrieve an override specified in either the environment or as a kwarg. # expected_type parameter is used to assert the type of kwargs, passing `None` will disable diff --git a/amaranth/hdl/_ir.py b/amaranth/hdl/_ir.py index da58fda..43a9ed9 100644 --- a/amaranth/hdl/_ir.py +++ b/amaranth/hdl/_ir.py @@ -1,6 +1,7 @@ from typing import Tuple from collections import defaultdict, OrderedDict from functools import reduce +import enum import warnings from .._utils import flatten, memoize @@ -8,7 +9,10 @@ from .. import tracer, _unused from . import _ast, _cd, _ir, _nir -__all__ = ["UnusedElaboratable", "Elaboratable", "DriverConflict", "Fragment", "Instance"] +__all__ = [ + "UnusedElaboratable", "Elaboratable", "DriverConflict", "Fragment", "Instance", + "PortDirection", "Design", "build_netlist", +] class UnusedElaboratable(_unused.UnusedMustUse): @@ -65,7 +69,6 @@ class Fragment: obj = new_obj def __init__(self, *, src_loc=None): - self.ports = _ast.SignalDict() self.drivers = OrderedDict() self.statements = {} self.domains = OrderedDict() @@ -76,19 +79,6 @@ class Fragment: self.src_loc = src_loc self.origins = None - def add_ports(self, *ports, dir): - assert dir in ("i", "o", "io") - for port in flatten(ports): - self.ports[port] = dir - - def iter_ports(self, dir=None): - if dir is None: - yield from self.ports - else: - for port, port_dir in self.ports.items(): - if port_dir == dir: - yield port - def add_driver(self, signal, domain="comb"): assert isinstance(domain, str) if domain not in self.drivers: @@ -111,18 +101,6 @@ class Fragment: for signal in signals: yield domain, signal - def iter_signals(self): - signals = _ast.SignalSet() - signals |= self.ports.keys() - for domain, domain_signals in self.drivers.items(): - if domain != "comb": - cd = self.domains[domain] - signals.add(cd.clk) - if cd.rst is not None: - signals.add(cd.rst) - signals |= domain_signals - return signals - def add_domains(self, *domains): for domain in flatten(domains): assert isinstance(domain, _cd.ClockDomain) @@ -169,7 +147,6 @@ class Fragment: # Merge subfragment's everything except clock domains into this fragment. # Flattening is done after clock domain propagation, so we can assume the domains # are already the same in every involved fragment in the first place. - self.ports.update(subfragment.ports) for domain, signal in subfragment.iter_drivers(): self.add_driver(signal, domain) for domain, statements in subfragment.statements.items(): @@ -372,277 +349,78 @@ class Fragment: self._propagate_domains_down() return new_domains - def _prepare_use_def_graph(self, parent, level, uses, defs, ios, top): - from ._mem import MemoryInstance - - def add_uses(*sigs, self=self): - for sig in flatten(sigs): - if sig not in uses: - uses[sig] = set() - uses[sig].add(self) - - def add_defs(*sigs): - for sig in flatten(sigs): - if sig not in defs: - defs[sig] = self + def _prepare_ports(self, ports): + # Normalize ports to a list. + new_ports = [] + if isinstance(ports, dict): + for port_name, (signal, dir) in ports.items(): + new_ports.append((port_name, signal, dir)) + elif isinstance(ports, (list, tuple)): + for port in ports: + if isinstance(port, tuple): + port_name, signal, dir = port + new_ports.append((port_name, signal, dir)) else: - assert defs[sig] is self + new_ports.append((None, port, None)) + else: + msg = f"`ports` must be a dict, a list or a tuple, not {ports!r}" + if isinstance(ports, _ast.Value): + msg += " (did you mean `ports=(,)`, rather than `ports=`?)" + raise TypeError(msg) - def add_io(*sigs): - for sig in flatten(sigs): - if sig not in ios: - ios[sig] = self + # Validate ports. + prenamed_ports = set() + for (port_name, signal, dir) in new_ports: + if isinstance(port_name, str): + if port_name in prenamed_ports: + raise TypeError(f"Duplicate port name {port_name!r}") else: - assert ios[sig] is self + prenamed_ports.add(port_name) + elif port_name is not None: + raise TypeError(f"Port name must be a string, not {port_name!r}") + if dir is not None and not isinstance(dir, PortDirection): + raise TypeError( + f"Port direction must be a `PortDirection` instance or None, not {dir!r}") + if not isinstance(signal, (_ast.Signal, _ast.ClockSignal, _ast.ResetSignal)): + raise TypeError(f"Only signals may be added as ports, not {signal!r}") - # Collect all signals we're driving (on LHS of statements), and signals we're using - # (on RHS of statements, or in clock domains). - for stmts in self.statements.values(): - for stmt in stmts: - add_uses(stmt._rhs_signals()) - add_defs(stmt._lhs_signals()) + return new_ports - for domain, _ in self.iter_sync(): - cd = self.domains[domain] - add_uses(cd.clk) - if cd.rst is not None: - add_uses(cd.rst) - - # Repeat for subfragments. - for subfrag, name, src_loc in self.subfragments: - if isinstance(subfrag, Instance): - for port_name, (value, dir) in subfrag.named_ports.items(): - if dir == "i": - # Prioritize defs over uses. - rhs_without_outputs = value._rhs_signals() - subfrag.iter_ports(dir="o") - subfrag.add_ports(rhs_without_outputs, dir=dir) - add_uses(value._rhs_signals()) - if dir == "o": - subfrag.add_ports(value._lhs_signals(), dir=dir) - add_defs(value._lhs_signals()) - if dir == "io": - subfrag.add_ports(value._lhs_signals(), dir=dir) - add_io(value._lhs_signals()) - elif isinstance(subfrag, MemoryInstance): - for port in subfrag._read_ports: - subfrag.add_ports(port._data._lhs_signals(), dir="o") - add_defs(port._data._lhs_signals()) - for value in [port._addr, port._en]: - subfrag.add_ports(value._rhs_signals(), dir="i") - add_uses(value._rhs_signals()) - for port in subfrag._write_ports: - for value in [port._addr, port._en, port._data]: - subfrag.add_ports(value._rhs_signals(), dir="i") - add_uses(value._rhs_signals()) - for domain, _ in subfrag.iter_sync(): - cd = subfrag.domains[domain] - add_uses(cd.clk) - if cd.rst is not None: - add_uses(cd.rst) - else: - parent[subfrag] = self - level [subfrag] = level[self] + 1 - - subfrag._prepare_use_def_graph(parent, level, uses, defs, ios, top) - - def _propagate_ports(self, ports, all_undef_as_ports): - # Take this fragment graph: - # - # __ B (def: q, use: p r) - # / - # A (def: p, use: q r) - # \ - # \_ C (def: r, use: p q) - # - # We need to consider three cases. - # 1. Signal p requires an input port in B; - # 2. Signal r requires an output port in C; - # 3. Signal r requires an output port in C and an input port in B. - # - # Adding these ports can be in general done in three steps for each signal: - # 1. Find the least common ancestor of all uses and defs. - # 2. Going upwards from the single def, add output ports. - # 3. Going upwards from all uses, add input ports. - - parent = {self: None} - level = {self: 0} - uses = _ast.SignalDict() - defs = _ast.SignalDict() - ios = _ast.SignalDict() - self._prepare_use_def_graph(parent, level, uses, defs, ios, self) - - ports = _ast.SignalSet(ports) - if all_undef_as_ports: - for sig in uses: - if sig in defs: - continue - ports.add(sig) - for sig in ports: - if sig not in uses: - uses[sig] = set() - uses[sig].add(self) - - @memoize - def lca_of(fragu, fragv): - # Normalize fragu to be deeper than fragv. - if level[fragu] < level[fragv]: - fragu, fragv = fragv, fragu - # Find ancestor of fragu on the same level as fragv. - for _ in range(level[fragu] - level[fragv]): - fragu = parent[fragu] - # If fragv was the ancestor of fragv, we're done. - if fragu == fragv: - return fragu - # Otherwise, they are at the same level but in different branches. Step both fragu - # and fragv until we find the common ancestor. - while parent[fragu] != parent[fragv]: - fragu = parent[fragu] - fragv = parent[fragv] - return parent[fragu] - - for sig in uses: - if sig in defs: - lca = reduce(lca_of, uses[sig], defs[sig]) - else: - lca = reduce(lca_of, uses[sig]) - - for frag in uses[sig]: - if sig in defs and frag is defs[sig]: - continue - while frag != lca: - frag.add_ports(sig, dir="i") - frag = parent[frag] - - if sig in defs: - frag = defs[sig] - while frag != lca: - frag.add_ports(sig, dir="o") - frag = parent[frag] - - for sig in ios: - frag = ios[sig] - while frag is not None: - frag.add_ports(sig, dir="io") - frag = parent[frag] - - for sig in ports: - if sig in ios: - continue - if sig in defs: - self.add_ports(sig, dir="o") - else: - self.add_ports(sig, dir="i") - - def prepare(self, ports=None, missing_domain=lambda name: _cd.ClockDomain(name)): + def prepare(self, ports=(), *, hierarchy=("top",), legalize_assignments=False, missing_domain=lambda name: _cd.ClockDomain(name)): from ._xfrm import DomainLowerer + ports = self._prepare_ports(ports) + new_domains = self._propagate_domains(missing_domain) + for domain in new_domains: + ports.append((None, domain.clk, PortDirection.Input)) + if domain.rst is not None: + ports.append((None, domain.rst, PortDirection.Input)) + + def resolve_signal(signal): + if isinstance(signal, _ast.ClockSignal): + domain = self.domains[signal.domain] + return domain.clk + elif isinstance(signal, _ast.ResetSignal): + domain = self.domains[signal.domain] + if domain.rst is None: + raise ValueError(f"Using ResetSignal for a reset-less domain {signal.domain}") + return domain.rst + else: + return signal + + ports = [ + (name, resolve_signal(signal), dir) + for name, signal, dir in ports + ] + fragment = DomainLowerer()(self) - if ports is None: - fragment._propagate_ports(ports=(), all_undef_as_ports=True) - else: - if not isinstance(ports, tuple) and not isinstance(ports, list): - msg = "`ports` must be either a list or a tuple, not {!r}"\ - .format(ports) - if isinstance(ports, _ast.Value): - msg += " (did you mean `ports=(,)`, rather than `ports=`?)" - raise TypeError(msg) - mapped_ports = [] - # Lower late bound signals like ClockSignal() to ports. - port_lowerer = DomainLowerer(fragment.domains) - for port in ports: - if not isinstance(port, (_ast.Signal, _ast.ClockSignal, _ast.ResetSignal)): - raise TypeError("Only signals may be added as ports, not {!r}" - .format(port)) - mapped_ports.append(port_lowerer.on_value(port)) - # Add ports for all newly created missing clock domains, since not doing so defeats - # the purpose of domain auto-creation. (It's possible to refer to these ports before - # the domain actually exists through late binding, but it's inconvenient.) - for cd in new_domains: - mapped_ports.append(cd.clk) - if cd.rst is not None: - mapped_ports.append(cd.rst) - fragment._propagate_ports(ports=mapped_ports, all_undef_as_ports=False) - return fragment + if legalize_assignments: + from ._xfrm import AssignmentLegalizer + fragment = AssignmentLegalizer()(fragment) - def _assign_names_to_signals(self): - """Assign names to signals used in this fragment. - - Returns - ------- - SignalDict of Signal to str - A mapping from signals used in this fragment to their local names. Because names are - deduplicated using local information only, the same signal used in a different fragment - may get a different name. - """ - - signal_names = _ast.SignalDict() - assigned_names = set() - - def add_signal_name(signal): - if signal not in signal_names: - if signal.name not in assigned_names: - name = signal.name - else: - name = f"{signal.name}${len(assigned_names)}" - assert name not in assigned_names - signal_names[signal] = name - assigned_names.add(name) - - for port in self.ports.keys(): - add_signal_name(port) - - for domain_name, domain_signals in self.drivers.items(): - if domain_name != "comb": - domain = self.domains[domain_name] - add_signal_name(domain.clk) - if domain.rst is not None: - add_signal_name(domain.rst) - - for statements in self.statements.values(): - for statement in statements: - for signal in statement._lhs_signals() | statement._rhs_signals(): - if not isinstance(signal, (_ast.ClockSignal, _ast.ResetSignal)): - add_signal_name(signal) - - return signal_names - - def _assign_names_to_fragments(self, hierarchy=("top",), *, _names=None): - """Assign names to this fragment and its subfragments. - - Subfragments may not necessarily have a name. This method assigns every such subfragment - a name, ``U$``, where ```` is based on its location in the hierarchy. - - Subfragment names may collide with signal names safely in Amaranth, but this may confuse - backends. This method assigns every such subfragment a name, ``$U$``, where - ``name`` is its original name, and ```` is based on its location in the hierarchy. - - Arguments - --------- - hierarchy : tuple of str - Name of this fragment. - - Returns - ------- - dict of Fragment to tuple of str - A mapping from this fragment and its subfragments to their full hierarchical names. - """ - - if _names is None: - _names = dict() - _names[self] = hierarchy - - signal_names = set(self._assign_names_to_signals().values()) - for subfragment_index, (subfragment, subfragment_name, subfragment_src_loc) in enumerate(self.subfragments): - if subfragment_name is None: - subfragment_name = f"U${subfragment_index}" - elif subfragment_name in signal_names: - subfragment_name = f"{subfragment_name}$U${subfragment_index}" - assert subfragment_name not in signal_names - subfragment._assign_names_to_fragments(hierarchy=(*hierarchy, subfragment_name), - _names=_names) - - return _names + # Create design and let it do the rest. + return Design(fragment, ports, hierarchy=hierarchy) class Instance(Fragment): @@ -682,9 +460,118 @@ class Instance(Fragment): .format(kw, arg)) +class Design: + """Represents a design ready for simulation or netlist building. + + Returned by ``Fragment.prepare``.""" + + def __init__(self, fragment, ports, *, hierarchy): + self.fragment = fragment + self.ports = ports + self.hierarchy = hierarchy + # dict of Fragment to SignalDict of Signal to name + self.signal_names = {} + self.fragment_names = {} + self._assign_names_to_signals(fragment, ports) + self._assign_names_to_fragments(fragment, hierarchy) + # Use just-assigned signal names to name all unnamed ports. + top_names = self.signal_names[fragment] + self.ports = [ + (name or top_names[signal], signal, dir) + for (name, signal, dir) in self.ports + ] + + def _assign_names_to_signals(self, fragment, ports=None): + """Assign names to signals used in a given fragment. + + The mapping is set in ``self.signal_names``. Because names are deduplicated using local + information only, the same signal used in a different fragment may get a different name. + """ + + signal_names = _ast.SignalDict() + assigned_names = set() + + def add_signal_name(signal): + if signal not in signal_names: + if signal.name not in assigned_names: + name = signal.name + else: + name = f"{signal.name}${len(assigned_names)}" + assert name not in assigned_names + signal_names[signal] = name + assigned_names.add(name) + + if ports is not None: + # First pass: reserve names for pre-named top-level ports. If equal to the signal name, let the signal share it. + for name, signal, _dir in ports: + if name is not None: + assigned_names.add(name) + if signal.name == name: + signal_names[signal] = name + + # Second pass: ensure non-pre-named top-level ports are named first. + for name, signal, _dir in ports: + if name is None: + add_signal_name(signal) + + for domain_name, domain_signals in fragment.drivers.items(): + if domain_name != "comb": + domain = fragment.domains[domain_name] + add_signal_name(domain.clk) + if domain.rst is not None: + add_signal_name(domain.rst) + + for statements in fragment.statements.values(): + for statement in statements: + for signal in statement._lhs_signals() | statement._rhs_signals(): + if not isinstance(signal, (_ast.ClockSignal, _ast.ResetSignal)): + add_signal_name(signal) + + self.signal_names[fragment] = signal_names + for subfragment, _name, _src_loc in fragment.subfragments: + self._assign_names_to_signals(subfragment) + + def _assign_names_to_fragments(self, fragment, hierarchy): + """Assign names to this fragment and its subfragments. + + Subfragments may not necessarily have a name. This method assigns every such subfragment + a name, ``U$``, where ```` is based on its location in the hierarchy. + + Subfragment names may collide with signal names safely in Amaranth, but this may confuse + backends. This method assigns every such subfragment a name, ``$U$``, where + ``name`` is its original name, and ```` is based on its location in the hierarchy. + + Arguments + --------- + hierarchy : tuple of str + Name of this fragment. + + Returns + ------- + dict of Fragment to tuple of str + A mapping from this fragment and its subfragments to their full hierarchical names. + """ + self.fragment_names[fragment] = hierarchy + + signal_names = set(self.signal_names[fragment].values()) + for subfragment_index, (subfragment, subfragment_name, subfragment_src_loc) in enumerate(fragment.subfragments): + if subfragment_name is None: + subfragment_name = f"U${subfragment_index}" + elif subfragment_name in signal_names: + subfragment_name = f"{subfragment_name}$U${subfragment_index}" + assert subfragment_name not in signal_names + self._assign_names_to_fragments(subfragment, hierarchy=(*hierarchy, subfragment_name)) + + ############################################################################################### >:3 +class PortDirection(enum.Enum): + Input = "input" + Output = "output" + Inout = "inout" + + class NetlistDriver: def __init__(self, module_idx: int, signal: _ast.Signal, domain: '_cd.ClockDomain | None', *, src_loc): @@ -710,9 +597,9 @@ class NetlistDriver: class NetlistEmitter: - def __init__(self, netlist: _nir.Netlist, fragment_names: 'dict[_ir.Fragment, str]'): + def __init__(self, netlist: _nir.Netlist, design): self.netlist = netlist - self.fragment_names = fragment_names + self.design = design self.drivers = _ast.SignalDict() self.rhs_cache: dict[int, Tuple[_nir.Value, bool, _ast.Value]] = {} @@ -1155,30 +1042,54 @@ class NetlistEmitter: self.connect(port_conn, _nir.Value(output_nets[start_bit:start_bit + len(port_conn)]), src_loc=instance.src_loc) - def emit_top_ports(self, fragment: _ir.Fragment, signal_names: _ast.SignalDict): + def emit_top_ports(self, fragment: _ir.Fragment): + inouts = set() + for cell in self.netlist.cells: + if isinstance(cell, _nir.IOBuffer): + inouts.update(cell.pad) + if isinstance(cell, _nir.Instance): + for value in cell.ports_io.values(): + inouts.update(value) + next_input_bit = 2 # 0 and 1 are reserved for constants top = self.netlist.top - for signal, dir in fragment.ports.items(): - assert signal not in self.netlist.signals - name = signal_names[signal] - if dir == 'i': + + for name, signal, dir in self.design.ports: + signal_value = self.emit_signal(signal) + if dir is None: + is_driven = False + is_inout = False + for net in signal_value: + if net in self.netlist.connections: + is_driven = True + if net in inouts: + is_inout = True + if is_driven: + dir = PortDirection.Output + elif is_inout: + dir = PortDirection.Inout + else: + dir = PortDirection.Input + if dir == PortDirection.Input: top.ports_i[name] = (next_input_bit, signal.width) - nets = _nir.Value( + value = _nir.Value( _nir.Net.from_cell(0, bit) for bit in range(next_input_bit, next_input_bit + signal.width) ) next_input_bit += signal.width - self.netlist.signals[signal] = nets - elif dir == 'o': - top.ports_o[name] = self.emit_signal(signal) - elif dir == 'io': + self.connect(signal_value, value, src_loc=signal.src_loc) + elif dir == PortDirection.Output: + top.ports_o[name] = signal_value + elif dir == PortDirection.Inout: top.ports_io[name] = (next_input_bit, signal.width) - nets = _nir.Value( + value = _nir.Value( _nir.Net.from_cell(0, bit) for bit in range(next_input_bit, next_input_bit + signal.width) ) next_input_bit += signal.width - self.netlist.signals[signal] = nets + self.connect(signal_value, value, src_loc=signal.src_loc) + else: + raise ValueError(f"Invalid port direction {dir!r}") def emit_drivers(self): for driver in self.drivers.values(): @@ -1205,6 +1116,7 @@ class NetlistEmitter: src_loc = driver.signal.src_loc self.connect(self.emit_signal(driver.signal), value, src_loc=src_loc) + def emit_undriven(self): # Connect all undriven signal bits to their initial values. This can only happen for entirely # undriven signals, or signals that are partially driven by instances. for signal, value in self.netlist.signals.items(): @@ -1215,7 +1127,7 @@ class NetlistEmitter: def emit_fragment(self, fragment: _ir.Fragment, parent_module_idx: 'int | None', *, cell_src_loc=None): from . import _mem - fragment_name = self.fragment_names[fragment] + fragment_name = self.design.fragment_names[fragment] if isinstance(fragment, _ir.Instance): assert parent_module_idx is not None if fragment.type == "$tribuf": @@ -1232,10 +1144,8 @@ class NetlistEmitter: self.emit_read_port(parent_module_idx, fragment, port, memory, write_ports) elif type(fragment) is _ir.Fragment: module_idx = self.netlist.add_module(parent_module_idx, fragment_name, src_loc=fragment.src_loc, cell_src_loc=cell_src_loc) - signal_names = fragment._assign_names_to_signals() + signal_names = self.design.signal_names[fragment] self.netlist.modules[module_idx].signal_names = signal_names - if parent_module_idx is None: - self.emit_top_ports(fragment, signal_names) for signal in signal_names: self.emit_signal(signal) for domain, stmts in fragment.statements.items(): @@ -1245,13 +1155,14 @@ class NetlistEmitter: self.emit_fragment(subfragment, module_idx, cell_src_loc=sub_src_loc) if parent_module_idx is None: self.emit_drivers() + self.emit_top_ports(fragment) + self.emit_undriven() else: assert False # :nocov: -def _emit_netlist(netlist: _nir.Netlist, fragment, hierarchy): - fragment_names = fragment._assign_names_to_fragments(hierarchy) - NetlistEmitter(netlist, fragment_names).emit_fragment(fragment, None) +def _emit_netlist(netlist: _nir.Netlist, design): + NetlistEmitter(netlist, design).emit_fragment(design.fragment, None) def _compute_net_flows(netlist: _nir.Netlist): @@ -1260,39 +1171,39 @@ def _compute_net_flows(netlist: _nir.Netlist): # The rules for net flows are as follows: # # - the modules that have a given net in their net_flow form a subtree of the hierarchy - # - INTERNAL is used in the root of the subtree and nowhere else - # - OUTPUT is used for modules that contain the definition of the net, or are on the + # - Internal is used in the root of the subtree and nowhere else + # - Output is used for modules that contain the definition of the net, or are on the # path from the definition to the root - # - remaining modules have a flow of INPUT (unless the net is a top-level inout port, - # in which case it is INOUT) + # - remaining modules have a flow of Input (unless the net is a top-level inout port, + # in which case it is Inout) # # In other words, the tree looks something like this: # # - [no flow] <<< top # - [no flow] - # - INTERNAL - # - INPUT << use + # - Internal + # - Input << use # - [no flow] - # - INPUT - # - INPUT << use - # - OUTPUT - # - INPUT << use + # - Input + # - Input << use + # - Output + # - Input << use # - [no flow] - # - OUTPUT << def - # - INPUT - # - INPUT + # - Output << def + # - Input + # - Input # - [no flow] # - [no flow] # - [no flow] # - # This function doesn't assign the INOUT flow — that is corrected later, in compute_ports. + # This function doesn't assign the Inout flow — that is corrected later, in compute_ports. lca = {} # Initialize by marking the definition point of every net. for cell_idx, cell in enumerate(netlist.cells): for net in cell.output_nets(cell_idx): lca[net] = cell.module_idx - netlist.modules[cell.module_idx].net_flow[net] = _nir.ModuleNetFlow.INTERNAL + netlist.modules[cell.module_idx].net_flow[net] = _nir.ModuleNetFlow.Internal # Marks a use of a net within a given module, and adjusts its netflows in all modules # as required. @@ -1309,7 +1220,7 @@ def _compute_net_flows(netlist: _nir.Netlist): def_module = lca[net] # While def_module deeper than use_module, go up with def_module. while len(modules[def_module].name) > len(modules[use_module].name): - modules[def_module].net_flow[net] = _nir.ModuleNetFlow.OUTPUT + modules[def_module].net_flow[net] = _nir.ModuleNetFlow.Output def_module = modules[def_module].parent # While use_module deeper than def_module, go up with use_module. # If use_module is below def_module in the hierarchy, we may hit @@ -1318,19 +1229,19 @@ def _compute_net_flows(netlist: _nir.Netlist): while len(modules[def_module].name) < len(modules[use_module].name): if net in modules[use_module].net_flow: return - modules[use_module].net_flow[net] = _nir.ModuleNetFlow.INPUT + modules[use_module].net_flow[net] = _nir.ModuleNetFlow.Input use_module = modules[use_module].parent # Now both pointers should be at the same depth within the hierarchy. assert len(modules[def_module].name) == len(modules[use_module].name) # Move both pointers up until they meet. while def_module != use_module: - modules[def_module].net_flow[net] = _nir.ModuleNetFlow.OUTPUT + modules[def_module].net_flow[net] = _nir.ModuleNetFlow.Output def_module = modules[def_module].parent - modules[use_module].net_flow[net] = _nir.ModuleNetFlow.INPUT + modules[use_module].net_flow[net] = _nir.ModuleNetFlow.Input use_module = modules[use_module].parent assert len(modules[def_module].name) == len(modules[use_module].name) # And mark the new LCA. - modules[def_module].net_flow[net] = _nir.ModuleNetFlow.INTERNAL + modules[def_module].net_flow[net] = _nir.ModuleNetFlow.Internal lca[net] = def_module # Now mark all uses and flesh out the structure. @@ -1373,14 +1284,17 @@ def _compute_ports(netlist: _nir.Netlist): if value not in name_table and not name.startswith('$'): name_table[value] = name + # Adjust any input flows to inout as necessary. + for (net, flow) in module.net_flow.items(): + if flow == _nir.ModuleNetFlow.Input and net in inouts: + module.net_flow[net] = _nir.ModuleNetFlow.Inout + # Gather together "adjacent" nets with the same flow into ports. visited = set() for net in sorted(module.net_flow): flow = module.net_flow[net] - if flow == _nir.ModuleNetFlow.INTERNAL: + if flow == _nir.ModuleNetFlow.Internal: continue - if flow == _nir.ModuleNetFlow.INPUT and net in inouts: - flow = module.net_flow[net] = _nir.ModuleNetFlow.INOUT if net in visited: continue # We found a net that needs a port. Keep joining the next nets output by the same @@ -1412,23 +1326,21 @@ def _compute_ports(netlist: _nir.Netlist): for name, (start, width) in netlist.top.ports_i.items(): top_module.ports[name] = ( _nir.Value(_nir.Net.from_cell(0, start + bit) for bit in range(width)), - _nir.ModuleNetFlow.INPUT + _nir.ModuleNetFlow.Input ) for name, (start, width) in netlist.top.ports_io.items(): top_module.ports[name] = ( _nir.Value(_nir.Net.from_cell(0, start + bit) for bit in range(width)), - _nir.ModuleNetFlow.INOUT + _nir.ModuleNetFlow.Inout ) for name, value in netlist.top.ports_o.items(): - top_module.ports[name] = (value, _nir.ModuleNetFlow.OUTPUT) + top_module.ports[name] = (value, _nir.ModuleNetFlow.Output) -def build_netlist(fragment, *, name="top"): - from ._xfrm import AssignmentLegalizer - - fragment = AssignmentLegalizer()(fragment) +def build_netlist(fragment, ports, *, name="top", **kwargs): + design = fragment.prepare(ports=ports, hierarchy=(name,), legalize_assignments=True, **kwargs) netlist = _nir.Netlist() - _emit_netlist(netlist, fragment, hierarchy=(name,)) + _emit_netlist(netlist, design) netlist.resolve_all_nets() _compute_net_flows(netlist) _compute_ports(netlist) diff --git a/amaranth/hdl/_nir.py b/amaranth/hdl/_nir.py index f81fa2f..9fa8e0e 100644 --- a/amaranth/hdl/_nir.py +++ b/amaranth/hdl/_nir.py @@ -199,7 +199,7 @@ class Netlist: self.signals[sig] = self.resolve_value(self.signals[sig]) def __repr__(self): - result = [] + result = ["("] for module_idx, module in enumerate(self.modules): name = " ".join(repr(name) for name in module.name) ports = " ".join( @@ -209,6 +209,7 @@ class Netlist: result.append(f"(module {module_idx} {module.parent} ({name}) {ports})") for cell_idx, cell in enumerate(self.cells): result.append(f"(cell {cell_idx} {cell.module_idx} {cell!r})") + result.append(")") return "\n".join(result) def add_module(self, parent, name: str, *, src_loc=None, cell_src_loc=None): @@ -251,21 +252,21 @@ class ModuleNetFlow(enum.Enum): #: The net is present in the module (used in the module or needs #: to be routed through it between its submodules), but is not #: present outside its subtree and thus is not a port of this module. - INTERNAL = "internal" + Internal = "internal" #: The net is present in the module, and is not driven from #: the module or any of its submodules. It is thus an input #: port of this module. - INPUT = "input" + Input = "input" #: The net is present in the module, is driven from the module or #: one of its submodules, and is also used outside of its subtree. #: It is thus an output port of this module. - OUTPUT = "output" + Output = "output" #: The net is a special top-level inout net that is used within #: this module or its submodules. It is an inout port of this module. - INOUT = "inout" + Inout = "inout" class Module: @@ -1039,7 +1040,7 @@ class Instance(Cell): items.append(f"(attr {name!r} {val!r})") for name, val in self.ports_i.items(): items.append(f"(input {name!r} {val})") - for name, (start, width) in self.ports_i.items(): + for name, (start, width) in self.ports_o.items(): items.append(f"(output {name!r} {start}:{start+width})") for name, val in self.ports_io.items(): items.append(f"(inout {name!r} {val})") diff --git a/amaranth/hdl/_xfrm.py b/amaranth/hdl/_xfrm.py index 054ff47..2dbd266 100644 --- a/amaranth/hdl/_xfrm.py +++ b/amaranth/hdl/_xfrm.py @@ -209,10 +209,6 @@ class FragmentTransformer: for subfragment, name, src_loc in fragment.subfragments: new_fragment.add_subfragment(self(subfragment), name, src_loc=src_loc) - def map_ports(self, fragment, new_fragment): - for port, dir in fragment.ports.items(): - new_fragment.add_ports(port, dir=dir) - def map_named_ports(self, fragment, new_fragment): if hasattr(self, "on_value"): for name, (value, dir) in fragment.named_ports.items(): @@ -285,7 +281,6 @@ class FragmentTransformer: new_fragment = Fragment(src_loc=fragment.src_loc) new_fragment.flatten = fragment.flatten new_fragment.attrs = OrderedDict(fragment.attrs) - self.map_ports(fragment, new_fragment) self.map_subfragments(fragment, new_fragment) self.map_domains(fragment, new_fragment) self.map_statements(fragment, new_fragment) diff --git a/amaranth/sim/core.py b/amaranth/sim/core.py index b757d1d..67003d1 100644 --- a/amaranth/sim/core.py +++ b/amaranth/sim/core.py @@ -70,8 +70,8 @@ class Simulator: "a simulation engine name" .format(engine)) - self._fragment = Fragment.get(fragment, platform=None).prepare() - self._engine = engine(self._fragment) + self._design = Fragment.get(fragment, platform=None).prepare() + self._engine = engine(self._design) self._clocked = set() def _check_process(self, process): @@ -165,15 +165,15 @@ class Simulator: in this case. """ if isinstance(domain, ClockDomain): - if (domain.name in self._fragment.domains and - domain is not self._fragment.domains[domain.name]): + if (domain.name in self._design.fragment.domains and + domain is not self._design.fragment.domains[domain.name]): warnings.warn("Adding a clock process that drives a clock domain object " "named {!r}, which is distinct from an identically named domain " "in the simulated design" .format(domain.name), UserWarning, stacklevel=2) - elif domain in self._fragment.domains: - domain = self._fragment.domains[domain] + elif domain in self._design.fragment.domains: + domain = self._design.fragment.domains[domain] elif if_exists: return else: diff --git a/amaranth/sim/pysim.py b/amaranth/sim/pysim.py index 4802d11..cdd248f 100644 --- a/amaranth/sim/pysim.py +++ b/amaranth/sim/pysim.py @@ -36,7 +36,7 @@ class _VCDWriter: else: raise NotImplementedError - def __init__(self, fragment, *, vcd_file, gtkw_file=None, traces=()): + def __init__(self, design, *, vcd_file, gtkw_file=None, traces=()): # Although pyvcd is a mandatory dependency, be resilient and import it as needed, so that # the simulator is still usable if it's not installed for some reason. import vcd, vcd.gtkw @@ -65,14 +65,14 @@ class _VCDWriter: signal_names = SignalDict() memories = {} - for subfragment, subfragment_name in \ - fragment._assign_names_to_fragments(hierarchy=("bench", "top",)).items(): - for signal, signal_name in subfragment._assign_names_to_signals().items(): + for fragment, fragment_name in design.fragment_names.items(): + fragment_name = ("bench", *fragment_name) + for signal, signal_name in design.signal_names[fragment].items(): if signal not in signal_names: signal_names[signal] = set() - signal_names[signal].add((*subfragment_name, signal_name)) - if isinstance(subfragment, MemoryInstance): - memories[subfragment._identity] = (subfragment, subfragment_name) + signal_names[signal].add((*fragment_name, signal_name)) + if isinstance(fragment, MemoryInstance): + memories[fragment._identity] = (fragment, fragment_name) trace_names = SignalDict() assigned_names = set() @@ -403,16 +403,16 @@ class _PySimulation(BaseSimulation): class PySimEngine(BaseEngine): - def __init__(self, fragment): + def __init__(self, design): self._state = _PySimulation() self._timeline = self._state.timeline - self._fragment = fragment - self._processes = _FragmentCompiler(self._state)(self._fragment) + self._design = design + self._processes = _FragmentCompiler(self._state)(self._design.fragment) self._vcd_writers = [] def add_coroutine_process(self, process, *, default_cmd): - self._processes.add(PyCoroProcess(self._state, self._fragment.domains, process, + self._processes.add(PyCoroProcess(self._state, self._design.fragment.domains, process, default_cmd=default_cmd)) def add_clock_process(self, clock, *, phase, period): @@ -462,7 +462,7 @@ class PySimEngine(BaseEngine): @contextmanager def write_vcd(self, *, vcd_file, gtkw_file, traces): - vcd_writer = _VCDWriter(self._fragment, + vcd_writer = _VCDWriter(self._design, vcd_file=vcd_file, gtkw_file=gtkw_file, traces=traces) try: self._vcd_writers.append(vcd_writer) diff --git a/tests/test_hdl_ir.py b/tests/test_hdl_ir.py index 2e9dabe..94251b6 100644 --- a/tests/test_hdl_ir.py +++ b/tests/test_hdl_ir.py @@ -88,275 +88,295 @@ class FragmentPortsTestCase(FHDLTestCase): def test_empty(self): f = Fragment() - self.assertEqual(list(f.iter_ports()), []) + nl = build_netlist(f, ports=[]) + self.assertRepr(nl, """ + ( + (module 0 None ('top')) + (cell 0 0 (top )) + ) + """) - f._propagate_ports(ports=(), all_undef_as_ports=True) - self.assertEqual(f.ports, SignalDict([])) - - def test_iter_signals(self): - f = Fragment() - f.add_ports(self.s1, self.s2, dir="io") - self.assertEqual(SignalSet((self.s1, self.s2)), f.iter_signals()) - - def test_self_contained(self): + def test_loopback(self): f = Fragment() f.add_statements( "comb", self.c1.eq(self.s1), - self.s1.eq(self.c1) ) + nl = build_netlist(f, ports=[self.c1, self.s1]) + self.assertRepr(nl, """ + ( + (module 0 None ('top') (input 's1' 0.2) (output 'c1' 0.2)) + (cell 0 0 (top (output 'c1' 0.2) (input 's1' 2:3))) + ) + """) - f._propagate_ports(ports=(), all_undef_as_ports=True) - self.assertEqual(f.ports, SignalDict([])) + def test_subfragment_simple(self): + f1 = Fragment() + f2 = Fragment() + f2.add_statements( + "comb", + self.c1.eq(~self.s1), + ) + f1.add_subfragment(f2, "f2") + nl = build_netlist(f1, ports=[self.c1, self.s1]) + self.assertRepr(nl, """ + ( + (module 0 None ('top') (input 's1' 0.2) (output 'c1' 1.0)) + (module 1 0 ('top' 'f2') (input 's1' 0.2) (output 'c1' 1.0)) + (cell 0 0 (top (output 'c1' 1.0) (input 's1' 2:3))) + (cell 1 1 (~ 0.2)) + ) + """) - def test_infer_input(self): + def test_tree(self): f = Fragment() - f.add_statements( + f1 = Fragment() + f.add_subfragment(f1, "f1") + f11 = Fragment() + f1.add_subfragment(f11, "f11") + f111 = Fragment() + f11.add_subfragment(f111, "f111") + f1111 = Fragment() + f111.add_subfragment(f1111, "f1111") + f12 = Fragment() + f1.add_subfragment(f12, "f12") + f13 = Fragment() + f1.add_subfragment(f13, "f13") + f131 = Fragment() + f13.add_subfragment(f131, "f131") + f2 = Fragment() + f.add_subfragment(f2, "f2") + f2.add_statements( "comb", - self.c1.eq(self.s1) + self.s2.eq(~self.s1), ) + f131.add_statements( + "comb", + self.s3.eq(~self.s2), + Assert(~self.s1), + ) + f12.add_statements( + "comb", + self.c1.eq(~self.s3), + ) + f1111.add_statements( + "comb", + self.c2.eq(~self.s3), + Assert(self.s1), + ) + f111.add_statements( + "comb", + self.c3.eq(~self.c2), + ) + nl = build_netlist(f, ports=[self.c1, self.c2, self.c3, self.s1]) + self.assertRepr(nl, """ + ( + (module 0 None ('top') + (input 's1' 0.2) + (output 'c1' 5.0) + (output 'c2' 2.0) + (output 'c3' 1.0)) + (module 1 0 ('top' 'f1') + (input 'port$0$2' 0.2) + (output 'port$1$0' 1.0) + (output 'port$2$0' 2.0) + (output 'port$5$0' 5.0) + (input 'port$10$0' 10.0)) + (module 2 1 ('top' 'f1' 'f11') + (input 'port$0$2' 0.2) + (output 'port$1$0' 1.0) + (output 'port$2$0' 2.0) + (input 'port$6$0' 6.0)) + (module 3 2 ('top' 'f1' 'f11' 'f111') + (input 'port$0$2' 0.2) + (output 'c3' 1.0) + (output 'c2' 2.0) + (input 'port$6$0' 6.0)) + (module 4 3 ('top' 'f1' 'f11' 'f111' 'f1111') + (input 's1' 0.2) + (output 'c2' 2.0) + (input 's3' 6.0)) + (module 5 1 ('top' 'f1' 'f12') + (output 'c1' 5.0) + (input 's3' 6.0)) + (module 6 1 ('top' 'f1' 'f13') + (input 'port$0$2' 0.2) + (output 'port$6$0' 6.0) + (input 'port$10$0' 10.0)) + (module 7 6 ('top' 'f1' 'f13' 'f131') + (input 's1' 0.2) + (output 's3' 6.0) + (input 's2' 10.0)) + (module 8 0 ('top' 'f2') + (input 's1' 0.2) + (output 's2' 10.0)) + (cell 0 0 (top (output 'c1' 5.0) (output 'c2' 2.0) (output 'c3' 1.0) (input 's1' 2:3))) + (cell 1 3 (~ 2.0)) + (cell 2 4 (~ 6.0)) + (cell 3 4 (assignment_list 1'd0 (1 0:1 1'd1))) + (cell 4 4 (assert None 0.2 3.0)) + (cell 5 5 (~ 6.0)) + (cell 6 7 (~ 10.0)) + (cell 7 7 (~ 0.2)) + (cell 8 7 (assignment_list 1'd0 (1 0:1 1'd1))) + (cell 9 7 (assert None 7.0 8.0)) + (cell 10 8 (~ 0.2)) + ) + """) - f._propagate_ports(ports=(), all_undef_as_ports=True) - self.assertEqual(f.ports, SignalDict([ - (self.s1, "i") - ])) - - def test_request_output(self): + def test_port_dict(self): f = Fragment() - f.add_statements( - "comb", - self.c1.eq(self.s1) + nl = build_netlist(f, ports={ + "a": (self.s1, PortDirection.Output), + "b": (self.s2, PortDirection.Input), + "c": (self.s3, PortDirection.Inout), + }) + self.assertRepr(nl, """ + ( + (module 0 None ('top') (input 'b' 0.2) (inout 'c' 0.3) (output 'a' 1'd0)) + (cell 0 0 (top (output 'a' 1'd0) (input 'b' 2:3) (inout 'c' 3:4))) ) + """) - f._propagate_ports(ports=(self.c1,), all_undef_as_ports=True) - self.assertEqual(f.ports, SignalDict([ - (self.s1, "i"), - (self.c1, "o") - ])) - - def test_input_in_subfragment(self): - f1 = Fragment() - f1.add_statements( - "comb", - self.c1.eq(self.s1) - ) - f2 = Fragment() - f2.add_statements( - "comb", - self.s1.eq(0) - ) - f1.add_subfragment(f2) - f1._propagate_ports(ports=(), all_undef_as_ports=True) - self.assertEqual(f1.ports, SignalDict()) - self.assertEqual(f2.ports, SignalDict([ - (self.s1, "o"), - ])) - - def test_input_only_in_subfragment(self): - f1 = Fragment() - f2 = Fragment() - f2.add_statements( - "comb", - self.c1.eq(self.s1) - ) - f1.add_subfragment(f2) - f1._propagate_ports(ports=(), all_undef_as_ports=True) - self.assertEqual(f1.ports, SignalDict([ - (self.s1, "i"), - ])) - self.assertEqual(f2.ports, SignalDict([ - (self.s1, "i"), - ])) - - def test_output_from_subfragment(self): - f1 = Fragment() - f1.add_statements( - "comb", - self.c1.eq(0) - ) - f2 = Fragment() - f2.add_statements( - "comb", - self.c2.eq(1) - ) - f1.add_subfragment(f2) - - f1._propagate_ports(ports=(self.c2,), all_undef_as_ports=True) - self.assertEqual(f1.ports, SignalDict([ - (self.c2, "o"), - ])) - self.assertEqual(f2.ports, SignalDict([ - (self.c2, "o"), - ])) - - def test_output_from_subfragment_2(self): - f1 = Fragment() - f1.add_statements( - "comb", - self.c1.eq(self.s1) - ) - f2 = Fragment() - f2.add_statements( - "comb", - self.c2.eq(self.s1) - ) - f1.add_subfragment(f2) - f3 = Fragment() - f3.add_statements( - "comb", - self.s1.eq(0) - ) - f2.add_subfragment(f3) - - f1._propagate_ports(ports=(), all_undef_as_ports=True) - self.assertEqual(f2.ports, SignalDict([ - (self.s1, "o"), - ])) - - def test_input_output_sibling(self): - f1 = Fragment() - f2 = Fragment() - f2.add_statements( - "comb", - self.c1.eq(self.c2) - ) - f1.add_subfragment(f2) - f3 = Fragment() - f3.add_statements( - "comb", - self.c2.eq(0) - ) - f3.add_driver(self.c2) - f1.add_subfragment(f3) - - f1._propagate_ports(ports=(), all_undef_as_ports=True) - self.assertEqual(f1.ports, SignalDict()) - - def test_output_input_sibling(self): - f1 = Fragment() - f2 = Fragment() - f2.add_statements( - "comb", - self.c2.eq(0) - ) - f2.add_driver(self.c2) - f1.add_subfragment(f2) - f3 = Fragment() - f3.add_statements( - "comb", - self.c1.eq(self.c2) - ) - f1.add_subfragment(f3) - - f1._propagate_ports(ports=(), all_undef_as_ports=True) - self.assertEqual(f1.ports, SignalDict()) - - def test_input_cd(self): - sync = ClockDomain() + def test_port_domain(self): f = Fragment() - f.add_statements( - "sync", - self.c1.eq(self.s1) + cd_sync = ClockDomain() + ctr = Signal(4) + f.add_domains(cd_sync) + f.add_driver(ctr, "sync") + f.add_statements("sync", ctr.eq(ctr + 1)) + nl = build_netlist(f, ports=[ + ClockSignal("sync"), + ResetSignal("sync"), + ctr, + ]) + self.assertRepr(nl, """ + ( + (module 0 None ('top') (input 'clk' 0.2) (input 'rst' 0.3) (output 'ctr' 5.0:4)) + (cell 0 0 (top (output 'ctr' 5.0:4) (input 'clk' 2:3) (input 'rst' 3:4))) + (cell 1 0 (+ (cat 5.0:4 1'd0) 5'd1)) + (cell 2 0 (matches 0.3 1)) + (cell 3 0 (priority_match 1 2.0)) + (cell 4 0 (assignment_list 5.0:4 (1 0:4 1.0:4) (3.0 0:4 4'd0))) + (cell 5 0 (flipflop 4.0:4 0 pos 0.2 0)) ) - f.add_domains(sync) - f.add_driver(self.c1, "sync") + """) - f._propagate_ports(ports=(), all_undef_as_ports=True) - self.assertEqual(f.ports, SignalDict([ - (self.s1, "i"), - (sync.clk, "i"), - (sync.rst, "i"), - ])) - - def test_input_cd_reset_less(self): - sync = ClockDomain(reset_less=True) + def test_port_autodomain(self): f = Fragment() - f.add_statements( - "sync", - self.c1.eq(self.s1) + ctr = Signal(4) + f.add_driver(ctr, "sync") + f.add_statements("sync", ctr.eq(ctr + 1)) + nl = build_netlist(f, ports=[ctr]) + self.assertRepr(nl, """ + ( + (module 0 None ('top') (input 'clk' 0.2) (input 'rst' 0.3) (output 'ctr' 5.0:4)) + (cell 0 0 (top (output 'ctr' 5.0:4) (input 'clk' 2:3) (input 'rst' 3:4))) + (cell 1 0 (+ (cat 5.0:4 1'd0) 5'd1)) + (cell 2 0 (matches 0.3 1)) + (cell 3 0 (priority_match 1 2.0)) + (cell 4 0 (assignment_list 5.0:4 (1 0:4 1.0:4) (3.0 0:4 4'd0))) + (cell 5 0 (flipflop 4.0:4 0 pos 0.2 0)) ) - f.add_domains(sync) - f.add_driver(self.c1, "sync") + """) - f._propagate_ports(ports=(), all_undef_as_ports=True) - self.assertEqual(f.ports, SignalDict([ - (self.s1, "i"), - (sync.clk, "i"), - ])) - - def test_inout(self): - s = Signal() + def test_port_partial(self): + f = Fragment() f1 = Fragment() - f2 = Instance("foo", io_x=s) - f1.add_subfragment(f2) + f.add_subfragment(f1, "f1") + a = Signal(4) + b = Signal(4) + c = Signal(3) + f1.add_driver(c) + f1.add_statements("comb", c.eq((a * b).shift_right(4))) + nl = build_netlist(f, ports=[a, b, c]) + self.assertRepr(nl, """ + ( + (module 0 None ('top') + (input 'a' 0.2:6) + (input 'b' 0.6:10) + (output 'c' 1.4:7)) + (module 1 0 ('top' 'f1') + (input 'a' 0.2:6) + (input 'b' 0.6:10) + (output 'c' 1.4:7)) + (cell 0 0 (top + (output 'c' 1.4:7) + (input 'a' 2:6) + (input 'b' 6:10))) + (cell 1 1 (* (cat 0.2:6 4'd0) (cat 0.6:10 4'd0))) + ) + """) - f1._propagate_ports(ports=(), all_undef_as_ports=True) - self.assertEqual(f1.ports, SignalDict([ - (s, "io") - ])) - def test_in_out_same_signal(self): - s = Signal() - - f1 = Instance("foo", i_x=s, o_y=s) - f2 = Fragment() - f2.add_subfragment(f1) - - f2._propagate_ports(ports=(), all_undef_as_ports=True) - self.assertEqual(f1.ports, SignalDict([ - (s, "o") - ])) - - f3 = Instance("foo", o_y=s, i_x=s) - f4 = Fragment() - f4.add_subfragment(f3) - - f4._propagate_ports(ports=(), all_undef_as_ports=True) - self.assertEqual(f3.ports, SignalDict([ - (s, "o") - ])) - - def test_clk_rst(self): - sync = ClockDomain() + def test_port_instance(self): f = Fragment() - f.add_domains(sync) - - f = f.prepare(ports=(ClockSignal("sync"), ResetSignal("sync"))) - self.assertEqual(f.ports, SignalDict([ - (sync.clk, "i"), - (sync.rst, "i"), - ])) + f1 = Fragment() + f.add_subfragment(f1, "f1") + a = Signal(4) + b = Signal(4) + c = Signal(4) + d = Signal(4) + f1.add_subfragment(Instance("t", + p_p = "meow", + a_a = True, + i_aa=a, + io_bb=b, + o_cc=c, + o_dd=d, + ), "i") + nl = build_netlist(f, ports=[a, b, c, d]) + self.assertRepr(nl, """ + ( + (module 0 None ('top') + (input 'a' 0.2:6) + (inout 'b' 0.6:10) + (output 'c' 1.0:4) + (output 'd' 1.4:8)) + (module 1 0 ('top' 'f1') + (input 'port$0$2' 0.2:6) + (inout 'port$0$6' 0.6:10) + (output 'port$1$0' 1.0:4) + (output 'port$1$4' 1.4:8)) + (cell 0 0 (top + (output 'c' 1.0:4) + (output 'd' 1.4:8) + (input 'a' 2:6) + (inout 'b' 6:10))) + (cell 1 1 (instance 't' 'i' + (param 'p' 'meow') + (attr 'a' True) + (input 'aa' 0.2:6) + (output 'cc' 0:4) + (output 'dd' 4:8) + (inout 'bb' 0.6:10))) + ) + """) def test_port_wrong(self): f = Fragment() + a = Signal() with self.assertRaisesRegex(TypeError, r"^Only signals may be added as ports, not \(const 1'd1\)$"): - f.prepare(ports=(Const(1),)) + build_netlist(f, ports=(Const(1),)) + with self.assertRaisesRegex(TypeError, + r"^Port name must be a string, not 1$"): + build_netlist(f, ports={1: (a, PortDirection.Input)}) + with self.assertRaisesRegex(TypeError, + r"^Port direction must be a `PortDirection` instance or None, not 'i'$"): + build_netlist(f, ports={"a": (a, "i")}) def test_port_not_iterable(self): f = Fragment() with self.assertRaisesRegex(TypeError, - r"^`ports` must be either a list or a tuple, not 1$"): - f.prepare(ports=1) + r"^`ports` must be a dict, a list or a tuple, not 1$"): + build_netlist(f, ports=1) with self.assertRaisesRegex(TypeError, - (r"^`ports` must be either a list or a tuple, not \(const 1'd1\)" + (r"^`ports` must be a dict, a list or a tuple, not \(const 1'd1\)" r" \(did you mean `ports=\(,\)`, rather than `ports=`\?\)$")): - f.prepare(ports=Const(1)) + build_netlist(f, ports=Const(1)) class FragmentDomainsTestCase(FHDLTestCase): - def test_iter_signals(self): - cd1 = ClockDomain() - cd2 = ClockDomain(reset_less=True) - s1 = Signal() - s2 = Signal() - - f = Fragment() - f.add_domains(cd1, cd2) - f.add_driver(s1, "cd1") - self.assertEqual(SignalSet((cd1.clk, cd1.rst, s1)), f.iter_signals()) - f.add_driver(s2, "cd2") - self.assertEqual(SignalSet((cd1.clk, cd1.rst, cd2.clk, s1, s2)), f.iter_signals()) - def test_propagate_up(self): cd = ClockDomain() @@ -810,49 +830,17 @@ class InstanceTestCase(FHDLTestCase): self.assertEqual(f.type, "cpu") self.assertEqual(f.parameters, OrderedDict([("RESET", 0x1234)])) self.assertEqual(list(f.named_ports.keys()), ["clk", "rst", "stb", "data", "pins"]) - self.assertEqual(f.ports, SignalDict([])) - - def test_prepare(self): - self.setUp_cpu() - f = self.wrap.prepare() - sync_clk = f.domains["sync"].clk - self.assertEqual(f.ports, SignalDict([ - (sync_clk, "i"), - (self.rst, "i"), - (self.pins, "io"), - ])) - - def test_prepare_explicit_ports(self): - self.setUp_cpu() - f = self.wrap.prepare(ports=[self.rst, self.stb]) - sync_clk = f.domains["sync"].clk - sync_rst = f.domains["sync"].rst - self.assertEqual(f.ports, SignalDict([ - (sync_clk, "i"), - (sync_rst, "i"), - (self.rst, "i"), - (self.stb, "o"), - (self.pins, "io"), - ])) - - def test_prepare_slice_in_port(self): - s = Signal(2) - f = Fragment() - f.add_subfragment(Instance("foo", o_O=s[0])) - f.add_subfragment(Instance("foo", o_O=s[1])) - fp = f.prepare(ports=[s], missing_domain=lambda name: None) - self.assertEqual(fp.ports, SignalDict([ - (s, "o"), - ])) def test_prepare_attrs(self): self.setUp_cpu() self.inst.attrs["ATTR"] = 1 - f = self.inst.prepare() - self.assertEqual(f.attrs, OrderedDict([ + design = self.inst.prepare() + self.assertEqual(design.fragment.attrs, OrderedDict([ ("ATTR", 1), ])) + +class NamesTestCase(FHDLTestCase): def test_assign_names_to_signals(self): i = Signal() rst = Signal() @@ -864,8 +852,6 @@ class InstanceTestCase(FHDLTestCase): f = Fragment() f.add_domains(cd_sync := ClockDomain()) f.add_domains(cd_sync_norst := ClockDomain(reset_less=True)) - f.add_ports((i, rst), dir="i") - f.add_ports((o1, o2, o3), dir="o") f.add_statements("comb", [o1.eq(0)]) f.add_driver(o1, domain="comb") f.add_statements("sync", [o2.eq(i1)]) @@ -873,8 +859,15 @@ class InstanceTestCase(FHDLTestCase): f.add_statements("sync_norst", [o3.eq(i1)]) f.add_driver(o3, domain="sync_norst") - names = f._assign_names_to_signals() - self.assertEqual(names, SignalDict([ + ports = { + "i": (i, PortDirection.Input), + "rst": (rst, PortDirection.Input), + "o1": (o1, PortDirection.Output), + "o2": (o2, PortDirection.Output), + "o3": (o3, PortDirection.Output), + } + design = f.prepare(ports) + self.assertEqual(design.signal_names[design.fragment], SignalDict([ (i, "i"), (rst, "rst"), (o1, "o1"), @@ -891,8 +884,8 @@ class InstanceTestCase(FHDLTestCase): f.add_subfragment(a := Fragment()) f.add_subfragment(b := Fragment(), name="b") - names = f._assign_names_to_fragments() - self.assertEqual(names, { + design = Design(f, ports=(), hierarchy=("top",)) + self.assertEqual(design.fragment_names, { f: ("top",), a: ("top", "U$0"), b: ("top", "b") @@ -903,8 +896,8 @@ class InstanceTestCase(FHDLTestCase): f.add_subfragment(a := Fragment()) f.add_subfragment(b := Fragment(), name="b") - names = f._assign_names_to_fragments(hierarchy=("bench", "cpu")) - self.assertEqual(names, { + design = Design(f, ports=[], hierarchy=("bench", "cpu")) + self.assertEqual(design.fragment_names, { f: ("bench", "cpu",), a: ("bench", "cpu", "U$0"), b: ("bench", "cpu", "b") @@ -913,10 +906,10 @@ class InstanceTestCase(FHDLTestCase): def test_assign_names_to_fragments_collide_with_signal(self): f = Fragment() f.add_subfragment(a_f := Fragment(), name="a") - f.add_ports((a_s := Signal(name="a"),), dir="o") + a_s = Signal(name="a") - names = f._assign_names_to_fragments() - self.assertEqual(names, { + design = Design(f, ports=[("a", a_s, None)], hierarchy=("top",)) + self.assertEqual(design.fragment_names, { f: ("top",), a_f: ("top", "a$U$0") })