From 744576011f5e183d5de412d3bfb3c12733ae7cdf Mon Sep 17 00:00:00 2001 From: Wanda Date: Fri, 15 Mar 2024 06:37:17 +0100 Subject: [PATCH] Implement RFC 53: Low-level I/O primitives. Co-authored-by: Catherine Co-authored-by: mcclure --- amaranth/back/rtlil.py | 85 +++++-- amaranth/build/plat.py | 4 +- amaranth/build/res.py | 43 ++-- amaranth/hdl/__init__.py | 8 +- amaranth/hdl/_ast.py | 189 +++++++++++++- amaranth/hdl/_ir.py | 522 ++++++++++++++++++++++++++++----------- amaranth/hdl/_nir.py | 212 ++++++++++++---- amaranth/hdl/_xfrm.py | 35 ++- amaranth/sim/_pyrtl.py | 4 +- amaranth/sim/pysim.py | 6 +- docs/changes.rst | 8 +- docs/guide.rst | 151 +++++++++-- docs/reference.rst | 2 +- tests/test_build_res.py | 16 +- tests/test_hdl_ast.py | 108 +++++++- tests/test_hdl_ir.py | 407 ++++++++++++++++++++---------- 16 files changed, 1364 insertions(+), 436 deletions(-) diff --git a/amaranth/back/rtlil.py b/amaranth/back/rtlil.py index 2b2a823..edc1dbf 100644 --- a/amaranth/back/rtlil.py +++ b/amaranth/back/rtlil.py @@ -272,7 +272,7 @@ class MemoryInfo: class ModuleEmitter: - def __init__(self, builder, netlist, module, name_map, empty_checker): + def __init__(self, builder, netlist: _nir.Netlist, module: _nir.Module, name_map, empty_checker): self.builder = builder self.netlist = netlist self.module = module @@ -293,6 +293,7 @@ class ModuleEmitter: self.sigport_wires = {} # signal or port name -> (wire, value) self.driven_sigports = set() # set of signal or port name self.nets = {} # net -> (wire name, bit idx) + self.ionets = {} # ionet -> (wire name, bit idx) self.cell_wires = {} # cell idx -> wire name self.instance_wires = {} # (cell idx, output name) -> wire name @@ -302,6 +303,7 @@ class ModuleEmitter: self.collect_init_attrs() self.emit_signal_wires() self.emit_port_wires() + self.emit_io_port_wires() self.emit_cell_wires() self.emit_submodule_wires() self.emit_connects() @@ -406,11 +408,28 @@ class ModuleEmitter: self.sigport_wires[name] = (wire, value) if flow == _nir.ModuleNetFlow.Output: continue - # If we just emitted an input or inout port, it is driving the value. + # If we just emitted an input port, it is driving the value. self.driven_sigports.add(name) for bit, net in enumerate(value): self.nets[net] = (wire, bit) + def emit_io_port_wires(self): + for idx, (name, (value, dir)) in enumerate(self.module.io_ports.items()): + port_id = idx + len(self.module.ports) + if self.module.parent is None: + port = self.netlist.io_ports[value[0].port] + attrs = port.attrs + src_loc = port.src_loc + else: + attrs = {} + src_loc = None + wire = self.builder.wire(width=len(value), + port_id=port_id, port_kind=dir.value, + name=name, attrs=attrs, + src=_src(src_loc)) + for bit, net in enumerate(value): + self.ionets[net] = (wire, bit) + def emit_driven_wire(self, value): # Emits a wire for a value, in preparation for driving it. if value in self.value_names: @@ -454,7 +473,9 @@ class ModuleEmitter: elif isinstance(cell, _nir.Initial): width = 1 elif isinstance(cell, _nir.IOBuffer): - width = len(cell.pad) + if cell.dir is _nir.IODirection.Output: + continue # No outputs. + width = len(cell.port) else: assert False # :nocov: # Single output cell connected to a wire. @@ -503,6 +524,28 @@ class ModuleEmitter: return chunks[0] return "{ " + " ".join(reversed(chunks)) + " }" + def io_sigspec(self, value: _nir.IOValue): + chunks = [] + begin_pos = 0 + while begin_pos < len(value): + end_pos = begin_pos + wire, start_bit = self.ionets[value[begin_pos]] + bit = start_bit + while (end_pos < len(value) and + self.ionets[value[end_pos]] == (wire, bit)): + end_pos += 1 + bit += 1 + width = end_pos - begin_pos + if width == 1: + chunks.append(f"{wire} [{start_bit}]") + else: + chunks.append(f"{wire} [{start_bit + width - 1}:{start_bit}]") + begin_pos = end_pos + + if len(chunks) == 1: + return chunks[0] + return "{ " + " ".join(reversed(chunks)) + " }" + def emit_connects(self): for name, (wire, value) in self.sigport_wires.items(): if name not in self.driven_sigports: @@ -513,10 +556,13 @@ class ModuleEmitter: submodule = self.netlist.modules[submodule_idx] if not self.empty_checker.is_empty(submodule_idx): dotted_name = ".".join(submodule.name) - self.builder.cell(f"\\{dotted_name}", submodule.name[-1], ports={ - name: self.sigspec(value) - for name, (value, _flow) in submodule.ports.items() - }, src=_src(submodule.cell_src_loc)) + ports = {} + for name, (value, _flow) in submodule.ports.items(): + ports[name] = self.sigspec(value) + for name, (value, _dir) in submodule.io_ports.items(): + ports[name] = self.io_sigspec(value) + self.builder.cell(f"\\{dotted_name}", submodule.name[-1], ports=ports, + src=_src(submodule.cell_src_loc)) def emit_assignment_list(self, cell_idx, cell): def emit_assignments(case, cond): @@ -761,14 +807,19 @@ class ModuleEmitter: self.builder.cell(cell_type, ports=ports, params=params, src=_src(cell.src_loc)) def emit_io_buffer(self, cell_idx, cell): - self.builder.cell("$tribuf", ports={ - "Y": self.sigspec(cell.pad), - "A": self.sigspec(cell.o), - "EN": self.sigspec(cell.oe), - }, params={ - "WIDTH": len(cell.pad), - }, src=_src(cell.src_loc)) - self.builder.connect(self.cell_wires[cell_idx], self.sigspec(cell.pad)) + if cell.dir is not _nir.IODirection.Input: + if cell.dir is _nir.IODirection.Output and cell.oe == _nir.Net.from_const(1): + self.builder.connect(self.io_sigspec(cell.port), self.sigspec(cell.o)) + else: + self.builder.cell("$tribuf", ports={ + "Y": self.io_sigspec(cell.port), + "A": self.sigspec(cell.o), + "EN": self.sigspec(cell.oe), + }, params={ + "WIDTH": len(cell.port), + }, src=_src(cell.src_loc)) + if cell.dir is not _nir.IODirection.Output: + self.builder.connect(self.cell_wires[cell_idx], self.io_sigspec(cell.port)) def emit_memory(self, cell_idx, cell): memory_info = self.memories[cell_idx] @@ -950,8 +1001,8 @@ class ModuleEmitter: ports[name] = self.sigspec(nets) for name in cell.ports_o: ports[name] = self.instance_wires[cell_idx, name] - for name, nets in cell.ports_io.items(): - ports[name] = self.sigspec(nets) + for name, (ionets, _dir) in cell.ports_io.items(): + ports[name] = self.io_sigspec(ionets) self.builder.cell(f"\\{cell.type}", cell.name, ports=ports, params=cell.parameters, attrs=cell.attributes, src=_src(cell.src_loc)) diff --git a/amaranth/build/plat.py b/amaranth/build/plat.py index c311257..cec85c3 100644 --- a/amaranth/build/plat.py +++ b/amaranth/build/plat.py @@ -222,7 +222,7 @@ class Platform(ResourceManager, metaclass=ABCMeta): m = Module() m.submodules += IOBufferInstance( - pad=port, + port=port, o=self._invert_if(invert, pin.o), oe=pin.oe, ) @@ -235,7 +235,7 @@ class Platform(ResourceManager, metaclass=ABCMeta): m = Module() i = Signal.like(pin.i) m.submodules += IOBufferInstance( - pad=port, + port=port, i=i, o=self._invert_if(invert, pin.o), oe=pin.oe, diff --git a/amaranth/build/res.py b/amaranth/build/res.py index 74bccea..3e1c054 100644 --- a/amaranth/build/res.py +++ b/amaranth/build/res.py @@ -14,6 +14,21 @@ class ResourceError(Exception): pass +class SingleEndedPort: + def __init__(self, io): + self.io = io + + +class DifferentialPort: + def __init__(self, p, n): + self.p = p + self.n = n + + +class PortGroup: + pass + + class ResourceManager: def __init__(self, resources, connectors): self.resources = OrderedDict() @@ -113,21 +128,13 @@ class ResourceManager: attrs[attr_key] = attr_value if isinstance(resource.ios[0], Subsignal): - members = OrderedDict() - sig_members = OrderedDict() + res = PortGroup() for sub in resource.ios: member = resolve(sub, dir[sub.name], xdr[sub.name], path=path + (sub.name,), attrs={**attrs, **sub.attrs}) - members[sub.name] = member - sig_members[sub.name] = wiring.Out(member.signature) - signature = wiring.Signature(sig_members) - # Provide members ourselves instead of having the constructor - # create ones for us. - intf = object.__new__(wiring.PureInterface) - intf.signature = signature - intf.__dict__.update(members) - return intf + setattr(res, sub.name, member) + return res elif isinstance(resource.ios[0], (Pins, DiffPairs)): phys = resource.ios[0] @@ -136,17 +143,21 @@ class ResourceManager: # ignore it as well. if isinstance(phys, Pins): phys_names = phys.names - port = wiring.Signature({"io": wiring.In(len(phys))}).create(path=path) + io = IOPort(len(phys), name="__".join(path) + "__io") + port = SingleEndedPort(io) if isinstance(phys, DiffPairs): phys_names = [] - sig_members = {} if not self.should_skip_port_component(None, attrs, "p"): + p = IOPort(len(phys), name="__".join(path) + "__p") phys_names += phys.p.names - sig_members["p"] = wiring.In(len(phys)) + else: + p = None if not self.should_skip_port_component(None, attrs, "n"): + n = IOPort(len(phys), name="__".join(path) + "__n") phys_names += phys.n.names - sig_members["n"] = wiring.In(len(phys)) - port = wiring.Signature(sig_members).create(path=path) + else: + n = None + port = DifferentialPort(p, n) if dir == "-": pin = None else: diff --git a/amaranth/hdl/__init__.py b/amaranth/hdl/__init__.py index 3c82528..de09d75 100644 --- a/amaranth/hdl/__init__.py +++ b/amaranth/hdl/__init__.py @@ -2,9 +2,11 @@ from ._ast import Shape, unsigned, signed, ShapeCastable, ShapeLike from ._ast import Value, ValueCastable, ValueLike from ._ast import Const, C, Mux, Cat, Array, Signal, ClockSignal, ResetSignal from ._ast import Format, Print, Assert, Assume, Cover +from ._ast import IOValue, IOPort from ._dsl import SyntaxError, SyntaxWarning, Module from ._cd import DomainError, ClockDomain -from ._ir import UnusedElaboratable, Elaboratable, DriverConflict, Fragment, Instance +from ._ir import UnusedElaboratable, Elaboratable, DriverConflict, Fragment +from ._ir import Instance, IOBufferInstance from ._mem import MemoryIdentity, MemoryInstance, Memory, ReadPort, WritePort, DummyPort from ._rec import Record from ._xfrm import DomainRenamer, ResetInserter, EnableInserter @@ -16,12 +18,14 @@ __all__ = [ "Value", "ValueCastable", "ValueLike", "Const", "C", "Mux", "Cat", "Array", "Signal", "ClockSignal", "ResetSignal", "Format", "Print", "Assert", "Assume", "Cover", + "IOValue", "IOPort", # _dsl "SyntaxError", "SyntaxWarning", "Module", # _cd "DomainError", "ClockDomain", # _ir - "UnusedElaboratable", "Elaboratable", "DriverConflict", "Fragment", "Instance", + "UnusedElaboratable", "Elaboratable", "DriverConflict", "Fragment", + "Instance", "IOBufferInstance", # _mem "MemoryIdentity", "MemoryInstance", "Memory", "ReadPort", "WritePort", "DummyPort", # _rec diff --git a/amaranth/hdl/_ast.py b/amaranth/hdl/_ast.py index aa2d46b..dc4d3d4 100644 --- a/amaranth/hdl/_ast.py +++ b/amaranth/hdl/_ast.py @@ -17,7 +17,7 @@ from .._unused import * __all__ = [ "Shape", "signed", "unsigned", "ShapeCastable", "ShapeLike", - "Value", "Const", "C", "AnyConst", "AnySeq", "Operator", "Mux", "Part", "Slice", "Cat", + "Value", "Const", "C", "AnyConst", "AnySeq", "Operator", "Mux", "Part", "Slice", "Cat", "Concat", "Array", "ArrayProxy", "Signal", "ClockSignal", "ResetSignal", "ValueCastable", "ValueLike", @@ -25,6 +25,7 @@ __all__ = [ "Format", "Statement", "Switch", "Property", "Assign", "Print", "Assert", "Assume", "Cover", + "IOValue", "IOPort", "IOConcat", "IOSlice", "SignalKey", "SignalDict", "SignalSet", ] @@ -1480,7 +1481,7 @@ class Const(Value, metaclass=_ConstMeta): obj = Value.cast(obj) if type(obj) is Const: return obj - elif type(obj) is Cat: + elif type(obj) is Concat: value = 0 width = 0 for part in obj.parts: @@ -1636,9 +1637,13 @@ def Mux(sel, val1, val0): @final class Slice(Value): def __init__(self, value, start, stop, *, src_loc_at=0): - if not isinstance(start, int): + try: + start = int(operator.index(start)) + except TypeError: raise TypeError(f"Slice start must be an integer, not {start!r}") - if not isinstance(stop, int): + try: + stop = int(operator.index(stop)) + except TypeError: raise TypeError(f"Slice stop must be an integer, not {stop!r}") value = Value.cast(value) @@ -1656,8 +1661,8 @@ class Slice(Value): super().__init__(src_loc_at=src_loc_at) self._value = value - self._start = int(operator.index(start)) - self._stop = int(operator.index(stop)) + self._start = start + self._stop = stop @property def value(self): @@ -1733,8 +1738,7 @@ class Part(Value): self.width, self.stride) -@final -class Cat(Value): +def Cat(*parts, src_loc_at=0): """Concatenate values. Form a compound ``Value`` from several smaller ones by concatenation. @@ -1758,10 +1762,19 @@ class Cat(Value): Value, inout Resulting ``Value`` obtained by concatenation. """ - def __init__(self, *args, src_loc_at=0): + parts = list(flatten(parts)) + if any(isinstance(part, IOValue) for part in parts): + return IOConcat(parts, src_loc_at=src_loc_at + 1) + else: + return Concat(parts, src_loc_at=src_loc_at + 1) + + +@final +class Concat(Value): + def __init__(self, args, src_loc_at=0): super().__init__(src_loc_at=src_loc_at) parts = [] - for index, arg in enumerate(flatten(args)): + for index, arg in enumerate(args): if isinstance(arg, Enum) and (not isinstance(type(arg), ShapeCastable) or not hasattr(arg, "_amaranth_shape_")): warnings.warn("Argument #{} of Cat() is an enumerated value {!r} without " @@ -2732,6 +2745,162 @@ class Switch(Statement): return "(switch {!r} {})".format(self.test, " ".join(case_reprs)) +class IOValue(metaclass=ABCMeta): + @staticmethod + def cast(obj): + if isinstance(obj, IOValue): + return obj + elif isinstance(obj, Value) and len(obj) == 0: + return IOConcat(()) + else: + raise TypeError(f"Object {obj!r} cannot be converted to an IO value") + + def __init__(self, *, src_loc_at=0): + self.src_loc = tracer.get_src_loc(1 + src_loc_at) + + @property + @abstractmethod + def metadata(self): + raise NotImplementedError # :nocov: + + def __getitem__(self, key): + n = len(self) + if isinstance(key, int): + if key not in range(-n, n): + raise IndexError(f"Index {key} is out of bounds for a {n}-bit IO value") + if key < 0: + key += n + return IOSlice(self, key, key + 1, src_loc_at=1) + elif isinstance(key, slice): + start, stop, step = key.indices(n) + if step != 1: + return IOConcat((self[i] for i in range(start, stop, step)), src_loc_at=1) + return IOSlice(self, start, stop, src_loc_at=1) + else: + raise TypeError(f"Cannot index IO value with {key!r}") + + @abstractmethod + def _ioports(self): + raise NotImplementedError # :nocov: + + +@final +class IOPort(IOValue): + def __init__(self, width, *, name=None, attrs=None, metadata=None, src_loc_at=0): + super().__init__(src_loc_at=src_loc_at) + + if name is not None and not isinstance(name, str): + raise TypeError(f"Name must be a string, not {name!r}") + self.name = name or tracer.get_var_name(depth=2 + src_loc_at) + + self._width = operator.index(width) + self._attrs = dict(() if attrs is None else attrs) + self._metadata = (None,) * self._width if metadata is None else tuple(metadata) + if len(self._metadata) != self._width: + raise ValueError(f"Metadata length ({len(self._metadata)}) doesn't match port width ({self._width})") + + def __len__(self): + return self._width + + @property + def width(self): + return self._width + + @property + def attrs(self): + return self._attrs + + @property + def metadata(self): + return self._metadata + + def _ioports(self): + return {self} + + def __repr__(self): + return f"(io-port {self.name})" + + +@final +class IOConcat(IOValue): + def __init__(self, parts, src_loc_at=0): + super().__init__(src_loc_at=src_loc_at) + self._parts = tuple(IOValue.cast(part) for part in parts) + + @property + def parts(self): + return self._parts + + def __len__(self): + return sum(len(part) for part in self.parts) + + @property + def metadata(self): + return tuple(obj for part in self._parts for obj in part.metadata) + + def _ioports(self): + return {port for part in self._parts for port in part._ioports()} + + def __repr__(self): + return "(io-cat {})".format(" ".join(map(repr, self.parts))) + + +@final +class IOSlice(IOValue): + def __init__(self, value, start, stop, *, src_loc_at=0): + try: + start = int(operator.index(start)) + except TypeError: + raise TypeError(f"Slice start must be an integer, not {start!r}") + try: + stop = int(operator.index(stop)) + except TypeError: + raise TypeError(f"Slice stop must be an integer, not {stop!r}") + + value = IOValue.cast(value) + n = len(value) + if start not in range(-n, n+1): + raise IndexError(f"Cannot start slice {start} bits into {n}-bit value") + if start < 0: + start += n + if stop not in range(-n, n+1): + raise IndexError(f"Cannot stop slice {stop} bits into {n}-bit value") + if stop < 0: + stop += n + if start > stop: + raise IndexError(f"Slice start {start} must be less than slice stop {stop}") + + super().__init__(src_loc_at=src_loc_at) + self._value = value + self._start = start + self._stop = stop + + @property + def value(self): + return self._value + + @property + def start(self): + return self._start + + @property + def stop(self): + return self._stop + + def __len__(self): + return self.stop - self.start + + @property + def metadata(self): + return self._value.metadata[self.start:self.stop] + + def _ioports(self): + return self.value._ioports() + + def __repr__(self): + return f"(io-slice {self.value!r} {self.start}:{self.stop})" + + class _MappedKeyCollection(metaclass=ABCMeta): @abstractmethod def _map_key(self, key): diff --git a/amaranth/hdl/_ir.py b/amaranth/hdl/_ir.py index 762683b..aa8f8a7 100644 --- a/amaranth/hdl/_ir.py +++ b/amaranth/hdl/_ir.py @@ -274,8 +274,8 @@ class Fragment: if dir is not None and not isinstance(dir, PortDirection): raise TypeError( f"Port direction must be a `PortDirection` instance or None, not {dir!r}") - if not isinstance(signal, (_ast.Signal, _ast.ClockSignal, _ast.ResetSignal)): - raise TypeError(f"Only signals may be added as ports, not {signal!r}") + if not isinstance(signal, (_ast.Signal, _ast.ClockSignal, _ast.ResetSignal, _ast.IOPort)): + raise TypeError(f"Only signals and IO ports may be added as ports, not {signal!r}") return new_ports @@ -328,7 +328,12 @@ class Instance(Fragment): elif kind == "p": self.parameters[name] = value elif kind in ("i", "o", "io"): - self.named_ports[name] = (_ast.Value.cast(value), kind) + if kind == "io": + value = _ast.IOValue.cast(value) + else: + if not isinstance(value, _ast.IOValue): + value = _ast.Value.cast(value) + self.named_ports[name] = (value, kind) else: raise NameError("Instance argument {!r} should be a tuple (kind, name, value) " "where kind is one of \"a\", \"p\", \"i\", \"o\", or \"io\"" @@ -340,11 +345,15 @@ class Instance(Fragment): elif kw.startswith("p_"): self.parameters[kw[2:]] = arg elif kw.startswith("i_"): - self.named_ports[kw[2:]] = (_ast.Value.cast(arg), "i") + if not isinstance(arg, _ast.IOValue): + arg = _ast.Value.cast(arg) + self.named_ports[kw[2:]] = (arg, "i") elif kw.startswith("o_"): - self.named_ports[kw[2:]] = (_ast.Value.cast(arg), "o") + if not isinstance(arg, _ast.IOValue): + arg = _ast.Value.cast(arg) + self.named_ports[kw[2:]] = (arg, "o") elif kw.startswith("io_"): - self.named_ports[kw[3:]] = (_ast.Value.cast(arg), "io") + self.named_ports[kw[3:]] = (_ast.IOValue.cast(arg), "io") else: raise NameError("Instance keyword argument {}={!r} does not start with one of " "\"a_\", \"p_\", \"i_\", \"o_\", or \"io_\"" @@ -352,33 +361,55 @@ class Instance(Fragment): class IOBufferInstance(Fragment): - def __init__(self, pad, *, i=None, o=None, oe=None, src_loc_at=0, src_loc=None): + def __init__(self, port, *, i=None, o=None, oe=None, src_loc_at=0, src_loc=None): super().__init__() - self.pad = _ast.Value.cast(pad) + self.port = _ast.IOValue.cast(port) if i is None: self.i = None else: self.i = _ast.Value.cast(i) - if len(self.pad) != len(self.i): - raise ValueError(f"`pad` length ({len(self.pad)}) doesn't match `i` length ({len(self.i)})") + if len(self.port) != len(self.i): + raise ValueError(f"'port' length ({len(self.port)}) doesn't match 'i' length ({len(self.i)})") if o is None: if oe is not None: - raise ValueError("`oe` must not be used if `o` is not used") - self.o = _ast.Const(0, len(self.pad)) - self.oe = _ast.Const(0) + raise ValueError("'oe' must not be used if 'o' is not used") + self.o = None + self.oe = None else: self.o = _ast.Value.cast(o) - if len(self.pad) != len(self.o): - raise ValueError(f"`pad` length ({len(self.pad)}) doesn't match `o` length ({len(self.o)})") + if len(self.port) != len(self.o): + raise ValueError(f"'port' length ({len(self.port)}) doesn't match 'o' length ({len(self.o)})") if oe is None: self.oe = _ast.Const(1) else: self.oe = _ast.Value.cast(oe) if len(self.oe) != 1: - raise ValueError(f"`oe` length ({len(self.oe)}) must be 1") + raise ValueError(f"'oe' length ({len(self.oe)}) must be 1") - self.src_loc = src_loc or tracer.get_src_loc(src_loc_at) + self.src_loc = src_loc or tracer.get_src_loc(src_loc_at) + + +def _add_name(assigned_names, name): + if name in assigned_names: + name = f"{name}${len(assigned_names)}" + assert name not in assigned_names + assigned_names.add(name) + return name + + +class DesignFragmentInfo: + def __init__(self, parent, depth): + self.parent = parent + self.depth = depth + self.signal_names = _ast.SignalDict() + self.io_port_names = {} + # Fixed up later. + self.name: "tuple[str]" = () + self.assigned_names = set() + # These two are used as sets, but are stored as dicts to ensure deterministic iteration order. + self.used_signals = _ast.SignalDict() + self.used_io_ports = {} class Design: @@ -386,103 +417,199 @@ class Design: Returned by ``Fragment.prepare``.""" - def __init__(self, fragment, ports, *, hierarchy): + def __init__(self, fragment: Fragment, ports, *, hierarchy): self.fragment = fragment - self.ports = ports + self.ports = list(ports) self.hierarchy = hierarchy - # dict of Fragment to SignalDict of Signal to name - self.signal_names = {} - self.fragment_names = {} - self._assign_names_to_signals(fragment, ports) - self._assign_names_to_fragments(fragment, hierarchy) - # Use just-assigned signal names to name all unnamed ports. - top_names = self.signal_names[fragment] - self.ports = [ - (name or top_names[signal], signal, dir) - for (name, signal, dir) in self.ports - ] + self.fragments: dict[Fragment, DesignFragmentInfo] = {} + self.signal_lca = _ast.SignalDict() + self._compute_fragment_depth_parent(fragment, None, 0) + self._collect_used_signals(fragment) + self._add_io_ports() + self._assign_port_names() + for name, conn, dir in self.ports: + if isinstance(conn, _ast.IOPort): + self._use_io_port(fragment, conn) + else: + self._use_signal(fragment, conn) + self._assign_names(fragment, hierarchy) - def _assign_names_to_signals(self, fragment, ports=None): - """Assign names to signals used in a given fragment. - - The mapping is set in ``self.signal_names``. Because names are deduplicated using local - information only, the same signal used in a different fragment may get a different name. - """ - - signal_names = _ast.SignalDict() - assigned_names = set() - - def add_signal_name(signal): - if signal not in signal_names: - if signal.name not in assigned_names: - name = signal.name - else: - name = f"{signal.name}${len(assigned_names)}" - assert name not in assigned_names - signal_names[signal] = name - assigned_names.add(name) - - if ports is not None: - # First pass: reserve names for pre-named top-level ports. If equal to the signal name, let the signal share it. - for name, signal, _dir in ports: - if name is not None: - assigned_names.add(name) - if signal.name == name: - signal_names[signal] = name - - # Second pass: ensure non-pre-named top-level ports are named first. - for name, signal, _dir in ports: - if name is None: - add_signal_name(signal) - - for domain_name, domain_signals in fragment.drivers.items(): - if domain_name != "comb": - domain = fragment.domains[domain_name] - add_signal_name(domain.clk) - if domain.rst is not None: - add_signal_name(domain.rst) - - for statements in fragment.statements.values(): - for statement in statements: - for signal in statement._lhs_signals() | statement._rhs_signals(): - if not isinstance(signal, (_ast.ClockSignal, _ast.ResetSignal)): - add_signal_name(signal) - - self.signal_names[fragment] = signal_names + def _compute_fragment_depth_parent(self, fragment: Fragment, parent: "Fragment | None", depth: int): + """Recursively computes every fragment's depth and parent.""" + self.fragments[fragment] = DesignFragmentInfo(parent, depth) for subfragment, _name, _src_loc in fragment.subfragments: - self._assign_names_to_signals(subfragment) + self._compute_fragment_depth_parent(subfragment, fragment, depth + 1) - def _assign_names_to_fragments(self, fragment, hierarchy): - """Assign names to this fragment and its subfragments. + def _use_signal(self, fragment: Fragment, signal: _ast.Signal): + """Marks a signal as used in a given fragment. + + Also marks a signal as used if it has to be routed through a given fragment to get from + one part of hierarchy to another. For this purpose, the ``self.signal_lca`` dictionary + is maintained: for every signal, it stores the topmost fragment in which it has been + marked used so far. + """ + if signal in self.fragments[fragment].used_signals: + return + self.fragments[fragment].used_signals[signal] = None + if signal not in self.signal_lca: + # First time we see a signal. + self.signal_lca[signal] = fragment + return + # Signal already seen — go from current fragment to the LCA, marking everything along + # the way as used. + lca = self.signal_lca[signal] + # First, go up from our fragment until it is no deeper than current LCA. + while self.fragments[lca].depth < self.fragments[fragment].depth: + fragment = self.fragments[fragment].parent + # Early return if we reach a part of tree where the signal is already marked. + if signal in self.fragments[fragment].used_signals: + return + self.fragments[fragment].used_signals[signal] = None + # Second, go up from current LCA until it is no deeper than our fragment. + while self.fragments[lca].depth > self.fragments[fragment].depth: + lca = self.fragments[lca].parent + self.fragments[lca].used_signals[signal] = None + # Now, both fragments are at the same depth. Go up from both until the two paths meet. + while fragment is not lca: + lca = self.fragments[lca].parent + self.fragments[lca].used_signals[signal] = None + fragment = self.fragments[fragment].parent + self.fragments[fragment].used_signals[signal] = None + self.signal_lca[signal] = lca + + def _use_io_port(self, fragment: Fragment, port: _ast.IOPort): + """Marks an IO port as used in a given fragment and all its ancestors.""" + frag_info = self.fragments[fragment] + if port in frag_info.used_io_ports: + return + frag_info.used_io_ports[port] = None + if frag_info.parent is not None: + self._use_io_port(frag_info.parent, port) + + def _collect_used_signals(self, fragment: Fragment): + """Collects used signals and IO ports for a fragment and all its subfragments.""" + from . import _mem + if isinstance(fragment, _ir.Instance): + for conn, kind in fragment.named_ports.values(): + if isinstance(conn, _ast.IOValue): + for port in conn._ioports(): + self._use_io_port(fragment, port) + elif isinstance(conn, _ast.Value): + for signal in conn._rhs_signals(): + self._use_signal(fragment, signal) + else: + assert False # :nocov: + elif isinstance(fragment, _ir.IOBufferInstance): + for port in fragment.port._ioports(): + self._use_io_port(fragment, port) + if fragment.i is not None: + for signal in fragment.i._rhs_signals(): + self._use_signal(fragment, signal) + if fragment.o is not None: + for signal in fragment.o._rhs_signals(): + self._use_signal(fragment, signal) + for signal in fragment.oe._rhs_signals(): + self._use_signal(fragment, signal) + elif isinstance(fragment, _mem.MemoryInstance): + for port in fragment._read_ports: + for signal in port._addr._rhs_signals(): + self._use_signal(fragment, signal) + for signal in port._data._rhs_signals(): + self._use_signal(fragment, signal) + for signal in port._en._rhs_signals(): + self._use_signal(fragment, signal) + if port._domain != "comb": + domain = fragment.domains[port._domain] + self._use_signal(fragment, domain.clk) + if domain.rst is not None: + self._use_signal(fragment, domain.rst) + for port in fragment._write_ports: + for signal in port._addr._rhs_signals(): + self._use_signal(fragment, signal) + for signal in port._data._rhs_signals(): + self._use_signal(fragment, signal) + for signal in port._en._rhs_signals(): + self._use_signal(fragment, signal) + domain = fragment.domains[port._domain] + self._use_signal(fragment, domain.clk) + if domain.rst is not None: + self._use_signal(fragment, domain.rst) + else: + for domain_name, statements in fragment.statements.items(): + if domain_name != "comb": + domain = fragment.domains[domain_name] + self._use_signal(fragment, domain.clk) + if domain.rst is not None: + self._use_signal(fragment, domain.rst) + for statement in statements: + for signal in statement._lhs_signals() | statement._rhs_signals(): + if not isinstance(signal, (_ast.ClockSignal, _ast.ResetSignal)): + self._use_signal(fragment, signal) + for subfragment, _name, _src_loc in fragment.subfragments: + self._collect_used_signals(subfragment) + + def _add_io_ports(self): + """Adds all used IO ports to our list of top-level ports, if they aren't there already.""" + io_ports = {conn for name, conn, dir in self.ports if isinstance(conn, _ast.IOPort)} + for port in self.fragments[self.fragment].used_io_ports: + if port not in io_ports: + self.ports.append((None, port, None)) + + def _assign_port_names(self): + """Assigns names to all ports that haven't been explicitly named.""" + new_ports = [] + assigned_names = {name for name, conn, dir in self.ports if name is not None} + for name, conn, dir in self.ports: + if name is None: + name = _add_name(assigned_names, conn.name) + assigned_names.add(name) + new_ports.append((name, conn, dir)) + self.ports = new_ports + + def _assign_names(self, fragment: Fragment, hierarchy: "tuple[str]"): + """Assign names to signals and IO ports used in a given fragment, as well as its + subfragments. + + The signal mapping is set in ``self.signal_names``, and the IO port mapping is set in + ``self.io_port_names``. Because names are deduplicated using local information only, + the same signal used in a different fragment may get a different name. Subfragments may not necessarily have a name. This method assigns every such subfragment a name, ``U$``, where ```` is based on its location in the hierarchy. Subfragment names may collide with signal names safely in Amaranth, but this may confuse - backends. This method assigns every such subfragment a name, ``$U$``, where - ``name`` is its original name, and ```` is based on its location in the hierarchy. + backends. This method assigns every such subfragment a new name. Arguments --------- hierarchy : tuple of str Name of this fragment. - - Returns - ------- - dict of Fragment to tuple of str - A mapping from this fragment and its subfragments to their full hierarchical names. """ - self.fragment_names[fragment] = hierarchy - taken_names = set(self.signal_names[fragment].values()) + frag_info = self.fragments[fragment] + frag_info.name = hierarchy + + if fragment is self.fragment: + # Reserve names for top-level ports. If equal to the signal name, let the signal share it. + for name, conn, _dir in self.ports: + frag_info.assigned_names.add(name) + if isinstance(conn, _ast.Signal) and conn.name == name: + frag_info.signal_names[conn] = name + elif isinstance(conn, _ast.IOPort) and conn.name == name: + frag_info.io_port_names[conn] = name + + for signal in frag_info.used_signals: + if signal not in frag_info.signal_names: + frag_info.signal_names[signal] = _add_name(frag_info.assigned_names, signal.name) + for port in frag_info.used_io_ports: + if port not in frag_info.io_port_names: + frag_info.io_port_names[port] = _add_name(frag_info.assigned_names, port.name) + for subfragment_index, (subfragment, subfragment_name, subfragment_src_loc) in enumerate(fragment.subfragments): if subfragment_name is None: subfragment_name = f"U${subfragment_index}" - elif subfragment_name in taken_names: - subfragment_name = f"{subfragment_name}$U${subfragment_index}" - assert subfragment_name not in taken_names - taken_names.add(subfragment_name) - self._assign_names_to_fragments(subfragment, hierarchy=(*hierarchy, subfragment_name)) + subfragment_name = _add_name(frag_info.assigned_names, subfragment_name) + self._assign_names(subfragment, hierarchy=(*hierarchy, subfragment_name)) ############################################################################################### >:3 @@ -523,11 +650,13 @@ class NetlistEmitter: self.netlist = netlist self.design = design self.drivers = _ast.SignalDict() + self.io_ports: dict[_ast.IOPort, int] = {} self.rhs_cache: dict[int, Tuple[_nir.Value, bool, _ast.Value]] = {} # Collected for driver conflict diagnostics only. self.late_net_to_signal = {} self.connect_src_loc = {} + self.ionet_src_loc = {} def emit_signal(self, signal) -> _nir.Value: if signal in self.netlist.signals: @@ -538,11 +667,43 @@ class NetlistEmitter: self.late_net_to_signal[net] = (signal, bit) return value + def emit_io(self, value: _ast.IOValue) -> _nir.IOValue: + if isinstance(value, _ast.IOPort): + if value not in self.io_ports: + port = len(self.netlist.io_ports) + self.netlist.io_ports.append(value) + self.io_ports[value] = _nir.IOValue( + _nir.IONet.from_port(port, bit) + for bit in range(0, len(value)) + ) + return self.io_ports[value] + elif isinstance(value, _ast.IOConcat): + result = [] + for part in value.parts: + result += self.emit_io(part) + return _nir.IOValue(result) + elif isinstance(value, _ast.IOSlice): + return self.emit_io(value.value)[value.start:value.stop] + else: + raise TypeError # :nocov: + + def emit_io_use(self, value: _ast.IOValue, *, src_loc) -> _nir.IOValue: + res = self.emit_io(value) + for net in res: + if net not in self.ionet_src_loc: + self.ionet_src_loc[net] = src_loc + else: + prev_src_loc = self.ionet_src_loc[net] + port = self.netlist.io_ports[net.port] + raise DriverConflict(f"Bit {net.bit} of I/O port {port!r} used twice, at " + f"{prev_src_loc[0]}:{prev_src_loc[1]} and {src_loc[0]}:{src_loc[1]}") + return res + # Used for instance outputs and read port data, not used for actual assignments. def emit_lhs(self, value: _ast.Value): if isinstance(value, _ast.Signal): return self.emit_signal(value) - elif isinstance(value, _ast.Cat): + elif isinstance(value, _ast.Concat): result = [] for part in value.parts: result += self.emit_lhs(part) @@ -739,7 +900,7 @@ class NetlistEmitter: src_loc=value.src_loc) result = self.netlist.add_value_cell(shape.width, cell) signed = shape.signed - elif isinstance(value, _ast.Cat): + elif isinstance(value, _ast.Concat): nets = [] for val in value.parts: inner, _signed = self.emit_rhs(module_idx, val) @@ -801,7 +962,7 @@ class NetlistEmitter: src_loc=src_loc)) elif isinstance(lhs, _ast.Slice): self.emit_assign(module_idx, cd, lhs.value, lhs_start + lhs.start, rhs, cond, src_loc=src_loc) - elif isinstance(lhs, _ast.Cat): + elif isinstance(lhs, _ast.Concat): part_stop = 0 for part in lhs.parts: part_start = part_stop @@ -958,12 +1119,21 @@ class NetlistEmitter: assert False # :nocov: def emit_iobuffer(self, module_idx: int, instance: _ir.IOBufferInstance): - pad = self.emit_lhs(instance.pad) - o, _signed = self.emit_rhs(module_idx, instance.o) - (oe,), _signed = self.emit_rhs(module_idx, instance.oe) - assert len(pad) == len(o) - cell = _nir.IOBuffer(module_idx, pad=pad, o=o, oe=oe, src_loc=instance.src_loc) - value = self.netlist.add_value_cell(len(pad), cell) + port = self.emit_io_use(instance.port, src_loc=instance.src_loc) + if instance.o is None: + o = None + oe = None + dir = _nir.IODirection.Input + else: + o, _signed = self.emit_rhs(module_idx, instance.o) + (oe,), _signed = self.emit_rhs(module_idx, instance.oe) + assert len(port) == len(o) + if instance.i is None: + dir = _nir.IODirection.Output + else: + dir = _nir.IODirection.Bidir + cell = _nir.IOBuffer(module_idx, port=port, dir=dir, o=o, oe=oe, src_loc=instance.src_loc) + value = self.netlist.add_value_cell(len(port), cell) if instance.i is not None: self.connect(self.emit_lhs(instance.i), value, src_loc=instance.src_loc) @@ -1032,15 +1202,23 @@ class NetlistEmitter: outputs = [] next_output_bit = 0 for port_name, (port_conn, dir) in instance.named_ports.items(): - if dir == 'i': + if isinstance(port_conn, _ast.IOValue): + if dir == 'i': + xlat_dir = _nir.IODirection.Input + elif dir == 'o': + xlat_dir = _nir.IODirection.Output + elif dir == 'io': + xlat_dir = _nir.IODirection.Bidir + else: + assert False # :nocov: + ports_io[port_name] = (self.emit_io_use(port_conn, src_loc=instance.src_loc), xlat_dir) + elif dir == 'i': ports_i[port_name], _signed = self.emit_rhs(module_idx, port_conn) elif dir == 'o': port_conn = self.emit_lhs(port_conn) ports_o[port_name] = (next_output_bit, len(port_conn)) outputs.append((next_output_bit, port_conn)) next_output_bit += len(port_conn) - elif dir == 'io': - ports_io[port_name] = self.emit_lhs(port_conn) else: assert False # :nocov: cell = _nir.Instance(module_idx, @@ -1059,31 +1237,20 @@ class NetlistEmitter: src_loc=instance.src_loc) def emit_top_ports(self, fragment: _ir.Fragment): - inouts = set() - for cell in self.netlist.cells: - if isinstance(cell, _nir.IOBuffer): - inouts.update(cell.pad) - if isinstance(cell, _nir.Instance): - for value in cell.ports_io.values(): - inouts.update(value) - next_input_bit = 2 # 0 and 1 are reserved for constants top = self.netlist.top for name, signal, dir in self.design.ports: + if isinstance(signal, _ast.IOPort): + continue signal_value = self.emit_signal(signal) if dir is None: is_driven = False - is_inout = False for net in signal_value: if net in self.netlist.connections: is_driven = True - if net in inouts: - is_inout = True if is_driven: dir = PortDirection.Output - elif is_inout: - dir = PortDirection.Inout else: dir = PortDirection.Input if dir == PortDirection.Input: @@ -1097,13 +1264,7 @@ class NetlistEmitter: elif dir == PortDirection.Output: top.ports_o[name] = signal_value elif dir == PortDirection.Inout: - top.ports_io[name] = (next_input_bit, signal.width) - value = _nir.Value( - _nir.Net.from_cell(0, bit) - for bit in range(next_input_bit, next_input_bit + signal.width) - ) - next_input_bit += signal.width - self.connect(signal_value, value, src_loc=signal.src_loc) + raise ValueError(f"Port direction 'Inout' can only be used for 'IOPort', not 'Signal'") else: raise ValueError(f"Invalid port direction {dir!r}") @@ -1159,7 +1320,7 @@ class NetlistEmitter: def emit_fragment(self, fragment: _ir.Fragment, parent_module_idx: 'int | None', *, cell_src_loc=None): from . import _mem - fragment_name = self.design.fragment_names[fragment] + fragment_name = self.design.fragments[fragment].name if isinstance(fragment, _ir.Instance): assert parent_module_idx is not None self.emit_instance(parent_module_idx, fragment, name=fragment_name[-1]) @@ -1176,10 +1337,14 @@ class NetlistEmitter: self.emit_iobuffer(parent_module_idx, fragment) elif type(fragment) is _ir.Fragment: module_idx = self.netlist.add_module(parent_module_idx, fragment_name, src_loc=fragment.src_loc, cell_src_loc=cell_src_loc) - signal_names = self.design.signal_names[fragment] + signal_names = self.design.fragments[fragment].signal_names self.netlist.modules[module_idx].signal_names = signal_names + io_port_names = self.design.fragments[fragment].io_port_names + self.netlist.modules[module_idx].io_port_names = io_port_names for signal in signal_names: self.emit_signal(signal) + for port in io_port_names: + self.emit_io(port) for domain, stmts in fragment.statements.items(): for stmt in stmts: self.emit_stmt(module_idx, fragment, domain, stmt, _nir.Net.from_const(1)) @@ -1227,8 +1392,6 @@ def _compute_net_flows(netlist: _nir.Netlist): # - [no flow] # - [no flow] # - [no flow] - # - # This function doesn't assign the Inout flow — that is corrected later, in compute_ports. lca = {} # Initialize by marking the definition point of every net. @@ -1293,20 +1456,11 @@ def _compute_ports(netlist: _nir.Netlist): port_starts = set() for start, _ in netlist.top.ports_i.values(): port_starts.add(_nir.Net.from_cell(0, start)) - for start, width in netlist.top.ports_io.values(): - port_starts.add(_nir.Net.from_cell(0, start)) for cell_idx, cell in enumerate(netlist.cells): if isinstance(cell, _nir.Instance): for start, _ in cell.ports_o.values(): port_starts.add(_nir.Net.from_cell(cell_idx, start)) - # Compute the set of all inout nets. Currently, a net has inout flow iff it is connected to - # a toplevel inout port. - inouts = set() - for start, width in netlist.top.ports_io.values(): - for idx in range(start, start + width): - inouts.add(_nir.Net.from_cell(0, idx)) - for module in netlist.modules: # Collect preferred names for ports. If a port exactly matches a signal, we reuse # the signal name for the port. Otherwise, we synthesize a private name. @@ -1316,11 +1470,6 @@ def _compute_ports(netlist: _nir.Netlist): if value not in name_table and not name.startswith('$'): name_table[value] = name - # Adjust any input flows to inout as necessary. - for (net, flow) in module.net_flow.items(): - if flow == _nir.ModuleNetFlow.Input and net in inouts: - module.net_flow[net] = _nir.ModuleNetFlow.Inout - # Gather together "adjacent" nets with the same flow into ports. visited = set() for net in sorted(module.net_flow): @@ -1360,15 +1509,88 @@ def _compute_ports(netlist: _nir.Netlist): _nir.Value(_nir.Net.from_cell(0, start + bit) for bit in range(width)), _nir.ModuleNetFlow.Input ) - for name, (start, width) in netlist.top.ports_io.items(): - top_module.ports[name] = ( - _nir.Value(_nir.Net.from_cell(0, start + bit) for bit in range(width)), - _nir.ModuleNetFlow.Inout - ) for name, value in netlist.top.ports_o.items(): top_module.ports[name] = (value, _nir.ModuleNetFlow.Output) +def _compute_ionet_dirs(netlist: _nir.Netlist): + # Collects the direction of every IO net, for every module it needs to be routed through. + for cell in netlist.cells: + for (net, dir) in cell.io_nets(): + module_idx = cell.module_idx + while module_idx is not None: + netlist.modules[module_idx].ionet_dir[net] = dir + module_idx = netlist.modules[module_idx].parent + + +def _compute_io_ports(netlist: _nir.Netlist, ports): + io_ports = { + port: _nir.IOValue(_nir.IONet.from_port(idx, bit) for bit in range(len(port))) + for idx, port in enumerate(netlist.io_ports) + } + for module in netlist.modules: + if module.parent is None: + # Top module gets special treatment: each IOPort is added in its entirety. + for (name, port, dir) in ports: + dir = { + PortDirection.Input: _nir.IODirection.Input, + PortDirection.Output: _nir.IODirection.Output, + PortDirection.Inout: _nir.IODirection.Bidir, + None: None, + }[dir] + if not isinstance(port, _ast.IOPort): + continue + auto_dir = None + for net in io_ports[port]: + if net in module.ionet_dir: + if auto_dir is None: + auto_dir = module.ionet_dir[net] + else: + auto_dir |= module.ionet_dir[net] + if dir is None: + dir = auto_dir + if auto_dir is None: + dir = _nir.IODirection.Bidir + else: + if auto_dir is not None and (auto_dir | dir) != dir: + raise ValueError(f"Port {name} is {dir.value}, but is used as {auto_dir.value}") + module.io_ports[name] = (io_ports[port], dir) + else: + # Collect preferred names for ports. If a port exactly matches a signal, we reuse + # the signal name for the port. Otherwise, we synthesize a private name. + name_table = {} + for port, name in module.io_port_names.items(): + value = io_ports[port] + if value not in name_table and not name.startswith('$'): + name_table[value] = name + + # Gather together "adjacent" nets with the same flow into ports. + visited = set() + for net in sorted(module.ionet_dir): + dir = module.ionet_dir[net] + if net in visited: + continue + # We found a net that needs a port. Keep joining the next nets output by the same + # cell into the same port, if applicable, but stop at instance/top port boundaries. + nets = [net] + while True: + succ = _nir.IONet.from_port(net.port, net.bit + 1) + if succ not in module.ionet_dir: + break + if module.ionet_dir[succ] != module.ionet_dir[net]: + break + net = succ + nets.append(net) + value = _nir.IOValue(nets) + # Joined as many nets as we could, now name and add the port. + if value in name_table: + name = name_table[value] + else: + name = f"ioport${value[0].port}${value[0].bit}" + module.io_ports[name] = (value, dir) + visited.update(value) + + def build_netlist(fragment, ports=(), *, name="top", **kwargs): if isinstance(fragment, Design): design = fragment @@ -1379,4 +1601,6 @@ def build_netlist(fragment, ports=(), *, name="top", **kwargs): netlist.resolve_all_nets() _compute_net_flows(netlist) _compute_ports(netlist) + _compute_ionet_dirs(netlist) + _compute_io_ports(netlist, design.ports) return netlist diff --git a/amaranth/hdl/_nir.py b/amaranth/hdl/_nir.py index fdf58c5..045c1b8 100644 --- a/amaranth/hdl/_nir.py +++ b/amaranth/hdl/_nir.py @@ -2,12 +2,14 @@ from typing import Iterable import enum from ._ast import SignalDict +from . import _ast __all__ = [ # Netlist core - "Net", "Value", "FormatValue", "Format", - "Netlist", "ModuleNetFlow", "Module", "Cell", "Top", + "Net", "Value", "IONet", "IOValue", + "FormatValue", "Format", + "Netlist", "ModuleNetFlow", "IODirection", "Module", "Cell", "Top", # Computation cells "Operator", "Part", # Decision tree cells @@ -153,7 +155,7 @@ class Value(tuple): chunks.append(f"{cell}.{start_bit}:{end_bit}") pos = next_pos if len(chunks) == 0: - return "(0'd0)" + return "()" elif len(chunks) == 1: return chunks[0] else: @@ -162,6 +164,76 @@ class Value(tuple): __str__ = __repr__ +class IONet(int): + __slots__ = () + + @classmethod + def from_port(cls, port: int, bit: int): + assert bit in range(1 << 16) + assert port >= 0 + return cls((port << 16) | bit) + + @property + def port(self): + return self >> 16 + + @property + def bit(self): + return self & 0xffff + + @classmethod + def ensure(cls, value: 'IONet'): + assert isinstance(value, cls) + return value + + def __repr__(self): + return f"{self.port}.{self.bit}" + + __str__ = __repr__ + + +class IOValue(tuple): + __slots__ = () + + def __new__(cls, nets: 'IONet | Iterable[IONet]' = ()): + if isinstance(nets, IONet): + return super().__new__(cls, (nets,)) + return super().__new__(cls, (IONet.ensure(net) for net in nets)) + + def __getitem__(self, index): + if isinstance(index, slice): + return type(self)(super().__getitem__(index)) + else: + return super().__getitem__(index) + + def __repr__(self): + pos = 0 + chunks = [] + while pos < len(self): + next_pos = pos + port = self[pos].port + start_bit = self[pos].bit + while (next_pos < len(self) and + self[next_pos].port == port and + self[next_pos].bit == start_bit + (next_pos - pos)): + next_pos += 1 + width = next_pos - pos + end_bit = start_bit + width + if width == 1: + chunks.append(f"{port}.{start_bit}") + else: + chunks.append(f"{port}.{start_bit}:{end_bit}") + pos = next_pos + if len(chunks) == 0: + return "()" + elif len(chunks) == 1: + return chunks[0] + else: + return f"(io-cat {' '.join(chunks)})" + + __str__ = __repr__ + + class FormatValue: """A single formatted value within ``Format``. @@ -240,14 +312,18 @@ class Netlist: Attributes ---------- + modules : list of ``Module`` cells : list of ``Cell`` connections : dict of (negative) int to int + io_ports : list of ``IOPort`` signals : dict of Signal to ``Value`` + last_late_net: int """ def __init__(self): self.modules: list[Module] = [] self.cells: list[Cell] = [Top()] self.connections: dict[Net, Net] = {} + self.io_ports: list[_ast.IOPort] = [] self.signals = SignalDict() self.last_late_net = 0 @@ -270,11 +346,16 @@ class Netlist: result = ["("] for module_idx, module in enumerate(self.modules): name = " ".join(repr(name) for name in module.name) - ports = " ".join( + ports = [ f"({flow.value} {name!r} {val})" for name, (val, flow) in module.ports.items() - ) - result.append(f"(module {module_idx} {module.parent} ({name}) {ports})") + ] + io_ports = [ + f"(io {dir.value} {name!r} {val})" + for name, (val, dir) in module.io_ports.items() + ] + ports = "".join(" " + port for port in ports + io_ports) + result.append(f"(module {module_idx} {module.parent} ({name}){ports})") for cell_idx, cell in enumerate(self.cells): result.append(f"(cell {cell_idx} {cell.module_idx} {cell!r})") result.append(")") @@ -332,9 +413,18 @@ class ModuleNetFlow(enum.Enum): #: It is thus an output port of this module. Output = "output" - #: The net is a special top-level inout net that is used within - #: this module or its submodules. It is an inout port of this module. - Inout = "inout" + +class IODirection(enum.Enum): + Input = "input" + Output = "output" + Bidir = "inout" + + def __or__(self, other): + assert isinstance(other, IODirection) + if self == other: + return self + else: + return IODirection.Bidir class Module: @@ -349,7 +439,8 @@ class Module: submodules: a list of nested module indices signal_names: a SignalDict from Signal to str, signal names visible in this module net_flow: a dict from Net to NetFlow, describes how a net is used within this module - ports: a dict from port name to (Value, NetFlow) pair + ports: a dict from port name to (Value, ModuleNetFlow) pair + io_ports: a dict from port name to (IOValue, IODirection) pair cells: a list of cell indices that belong to this module """ def __init__(self, parent, name, *, src_loc, cell_src_loc): @@ -359,8 +450,11 @@ class Module: self.cell_src_loc = cell_src_loc self.submodules = [] self.signal_names = SignalDict() - self.net_flow = {} + self.io_port_names = {} + self.net_flow: dict[Net, ModuleNetFlow] = {} + self.ionet_dir: dict[IONet, IODirection] = {} self.ports = {} + self.io_ports = {} self.cells = [] @@ -384,36 +478,34 @@ class Cell: def output_nets(self, self_idx: int): raise NotImplementedError + def io_nets(self): + return set() + def resolve_nets(self, netlist: Netlist): raise NotImplementedError class Top(Cell): - """A special cell type representing top-level ports. Must be present in the netlist exactly + """A special cell type representing top-level non-IO ports. Must be present in the netlist exactly once, at index 0. Top-level outputs are stored as a dict of names to their assigned values. - Top-level inputs and inouts are effectively the output of this cell. They are both stored + Top-level inputs are effectively the output of this cell. They are stored as a dict of names to a (start bit index, width) tuple. Output bit indices 0 and 1 are reserved for constant nets, so the lowest bit index that can be assigned to a port is 2. - Top-level inouts are special and can only be used by inout ports of instances, or in the pad - value of an ``IoBuf`` cell. - Attributes ---------- ports_o: dict of str to Value ports_i: dict of str to (int, int) - ports_io: dict of str to (int, int) """ def __init__(self): super().__init__(module_idx=0, src_loc=None) self.ports_o = {} self.ports_i = {} - self.ports_io = {} def input_nets(self): nets = set() @@ -426,9 +518,6 @@ class Top(Cell): for start, width in self.ports_i.values(): for bit in range(start, start + width): nets.add(Net.from_cell(self_idx, bit)) - for start, width in self.ports_io.values(): - for bit in range(start, start + width): - nets.add(Net.from_cell(self_idx, bit)) return nets def resolve_nets(self, netlist: Netlist): @@ -438,13 +527,11 @@ class Top(Cell): def __repr__(self): ports = [] for (name, (start, width)) in self.ports_i.items(): - ports.append(f"(input {name!r} {start}:{start+width})") + ports.append(f" (input {name!r} {start}:{start+width})") for (name, val) in self.ports_o.items(): - ports.append(f"(output {name!r} {val})") - for (name, (start, width)) in self.ports_io.items(): - ports.append(f"(inout {name!r} {start}:{start+width})") - ports = " ".join(ports) - return f"(top {ports})" + ports.append(f" (output {name!r} {val})") + ports = "".join(ports) + return f"(top{ports})" class Operator(Cell): @@ -1108,7 +1195,7 @@ class Instance(Cell): attributes: dict of str to Const, int, or str ports_i: dict of str to Value ports_o: dict of str to pair of int (index start, width) - ports_io: dict of str to Value + ports_io: dict of str to (IOValue, IODirection) """ def __init__(self, module_idx, *, type, name, parameters, attributes, ports_i, ports_o, ports_io, src_loc): @@ -1120,14 +1207,12 @@ class Instance(Cell): self.attributes = attributes self.ports_i = {name: Value(val) for name, val in ports_i.items()} self.ports_o = ports_o - self.ports_io = {name: Value(val) for name, val in ports_io.items()} + self.ports_io = {name: (IOValue(val), IODirection(dir)) for name, (val, dir) in ports_io.items()} def input_nets(self): nets = set() for val in self.ports_i.values(): nets |= set(val) - for val in self.ports_io.values(): - nets |= set(val) return nets def output_nets(self, self_idx: int): @@ -1137,11 +1222,15 @@ class Instance(Cell): nets.add(Net.from_cell(self_idx, bit)) return nets + def io_nets(self): + nets = set() + for val, dir in self.ports_io.values(): + nets |= {(net, dir) for net in val} + return nets + def resolve_nets(self, netlist: Netlist): for port in self.ports_i: self.ports_i[port] = netlist.resolve_value(self.ports_i[port]) - for port in self.ports_io: - self.ports_io[port] = netlist.resolve_value(self.ports_io[port]) def __repr__(self): items = [] @@ -1153,43 +1242,62 @@ class Instance(Cell): items.append(f"(input {name!r} {val})") for name, (start, width) in self.ports_o.items(): items.append(f"(output {name!r} {start}:{start+width})") - for name, val in self.ports_io.items(): - items.append(f"(inout {name!r} {val})") + for name, (val, dir) in self.ports_io.items(): + items.append(f"(io {dir.value} {name!r} {val})") items = " ".join(items) return f"(instance {self.type!r} {self.name!r} {items})" class IOBuffer(Cell): - """An IO buffer cell. ``pad`` must be connected to nets corresponding to an IO port - of the ``Top`` cell. This cell does two things: + """An IO buffer cell. This cell does two things: - - a tristate buffer is inserted driving ``pad`` based on ``o`` and ``oe`` nets (output buffer) - - the value of ``pad`` is sampled and made available as output of this cell (input buffer) + - a tristate buffer is inserted driving ``port`` based on ``o`` and ``oe`` nets (output buffer) + - the value of ``port`` is sampled and made available as output of this cell (input buffer) Attributes ---------- - pad: Value - o: Value - oe: Net + port: IOValue + dir: IODirection + o: Value or None + oe: Net or None """ - def __init__(self, module_idx, *, pad, o, oe, src_loc): + def __init__(self, module_idx, *, port, dir, o=None, oe=None, src_loc): super().__init__(module_idx, src_loc=src_loc) - self.pad = Value(pad) - self.o = Value(o) - self.oe = Net.ensure(oe) + self.port = IOValue(port) + self.dir = IODirection(dir) + if self.dir is IODirection.Input: + assert o is None + assert oe is None + self.o = None + self.oe = None + else: + self.o = Value(o) + self.oe = Net.ensure(oe) def input_nets(self): - return set(self.pad) | set(self.o) | {self.oe} + if self.dir is IODirection.Input: + return set() + else: + return set(self.o) | {self.oe} def output_nets(self, self_idx: int): - return {Net.from_cell(self_idx, bit) for bit in range(len(self.pad))} + if self.dir is IODirection.Output: + return set() + else: + return {Net.from_cell(self_idx, bit) for bit in range(len(self.port))} + + def io_nets(self): + return {(net, self.dir) for net in self.port} def resolve_nets(self, netlist: Netlist): - self.pad = netlist.resolve_value(self.pad) - self.o = netlist.resolve_value(self.o) - self.oe = netlist.resolve_net(self.oe) + if self.dir is not IODirection.Input: + self.o = netlist.resolve_value(self.o) + self.oe = netlist.resolve_net(self.oe) def __repr__(self): - return f"(iob {self.pad} {self.o} {self.oe})" + if self.dir is IODirection.Input: + return f"(iob {self.dir.value} {self.port})" + else: + return f"(iob {self.dir.value} {self.port} {self.o} {self.oe})" diff --git a/amaranth/hdl/_xfrm.py b/amaranth/hdl/_xfrm.py index d92b0e5..85ff344 100644 --- a/amaranth/hdl/_xfrm.py +++ b/amaranth/hdl/_xfrm.py @@ -53,7 +53,7 @@ class ValueVisitor(metaclass=ABCMeta): pass # :nocov: @abstractmethod - def on_Cat(self, value): + def on_Concat(self, value): pass # :nocov: @abstractmethod @@ -87,8 +87,8 @@ class ValueVisitor(metaclass=ABCMeta): new_value = self.on_Slice(value) elif type(value) is Part: new_value = self.on_Part(value) - elif type(value) is Cat: - new_value = self.on_Cat(value) + elif type(value) is Concat: + new_value = self.on_Concat(value) elif type(value) is ArrayProxy: new_value = self.on_ArrayProxy(value) elif type(value) is Initial: @@ -129,8 +129,8 @@ class ValueTransformer(ValueVisitor): return Part(self.on_value(value.value), self.on_value(value.offset), value.width, value.stride) - def on_Cat(self, value): - return Cat(self.on_value(o) for o in value.parts) + def on_Concat(self, value): + return Concat(self.on_value(o) for o in value.parts) def on_ArrayProxy(self, value): return ArrayProxy([self.on_value(elem) for elem in value._iter_as_values()], @@ -235,7 +235,10 @@ class FragmentTransformer: def map_named_ports(self, fragment, new_fragment): if hasattr(self, "on_value"): for name, (value, dir) in fragment.named_ports.items(): - new_fragment.named_ports[name] = self.on_value(value), dir + if isinstance(value, Value): + new_fragment.named_ports[name] = self.on_value(value), dir + else: + new_fragment.named_ports[name] = value, dir else: new_fragment.named_ports = OrderedDict(fragment.named_ports.items()) @@ -303,15 +306,15 @@ class FragmentTransformer: elif isinstance(fragment, IOBufferInstance): if hasattr(self, "on_value"): new_fragment = IOBufferInstance( - pad=self.on_value(fragment.pad), + port=fragment.port, i=self.on_value(fragment.i) if fragment.i is not None else None, - o=self.on_value(fragment.o), - oe=self.on_value(fragment.oe), + o=self.on_value(fragment.o) if fragment.o is not None else None, + oe=self.on_value(fragment.oe) if fragment.o is not None else None, src_loc=fragment.src_loc, ) else: new_fragment = IOBufferInstance( - pad=fragment.pad, + port=fragment.port, i=fragment.i, o=fragment.o, oe=fragment.oe, @@ -396,7 +399,7 @@ class DomainCollector(ValueVisitor, StatementVisitor): self.on_value(value.value) self.on_value(value.offset) - def on_Cat(self, value): + def on_Concat(self, value): for o in value.parts: self.on_value(o) @@ -450,7 +453,15 @@ class DomainCollector(ValueVisitor, StatementVisitor): if isinstance(fragment, Instance): for name, (value, dir) in fragment.named_ports.items(): - self.on_value(value) + if not isinstance(value, IOValue): + self.on_value(value) + + if isinstance(fragment, IOBufferInstance): + if fragment.o is not None: + self.on_value(fragment.o) + self.on_value(fragment.oe) + if fragment.i is not None: + self.on_value(fragment.i) old_local_domains, self._local_domains = self._local_domains, set(self._local_domains) for domain_name, domain in fragment.domains.items(): diff --git a/amaranth/sim/_pyrtl.py b/amaranth/sim/_pyrtl.py index 395e873..8edee13 100644 --- a/amaranth/sim/_pyrtl.py +++ b/amaranth/sim/_pyrtl.py @@ -212,7 +212,7 @@ class _RHSValueCompiler(_ValueCompiler): return f"({(1 << value.width) - 1} & " \ f"{self(value.value)} >> {offset})" - def on_Cat(self, value): + def on_Concat(self, value): gen_parts = [] offset = 0 for part in value.parts: @@ -313,7 +313,7 @@ class _LHSValueCompiler(_ValueCompiler): f"(({width_mask:#x} & {arg}) << {offset}))") return gen - def on_Cat(self, value): + def on_Concat(self, value): def gen(arg): gen_arg = self.emitter.def_var("cat", arg) offset = 0 diff --git a/amaranth/sim/pysim.py b/amaranth/sim/pysim.py index cdd248f..c5ccf2f 100644 --- a/amaranth/sim/pysim.py +++ b/amaranth/sim/pysim.py @@ -65,9 +65,9 @@ class _VCDWriter: signal_names = SignalDict() memories = {} - for fragment, fragment_name in design.fragment_names.items(): - fragment_name = ("bench", *fragment_name) - for signal, signal_name in design.signal_names[fragment].items(): + for fragment, fragment_info in design.fragments.items(): + fragment_name = ("bench", *fragment_info.name) + for signal, signal_name in fragment_info.signal_names.items(): if signal not in signal_names: signal_names[signal] = set() signal_names[signal].add((*fragment_name, signal_name)) diff --git a/docs/changes.rst b/docs/changes.rst index affbb1c..ab79f20 100644 --- a/docs/changes.rst +++ b/docs/changes.rst @@ -49,6 +49,7 @@ Implemented RFCs .. _RFC 45: https://amaranth-lang.org/rfcs/0045-lib-memory.html .. _RFC 46: https://amaranth-lang.org/rfcs/0046-shape-range-1.html .. _RFC 50: https://amaranth-lang.org/rfcs/0050-print.html +.. _RFC 53: https://amaranth-lang.org/rfcs/0053-ioport.html * `RFC 17`_: Remove ``log2_int`` * `RFC 27`_: Testbench processes for the simulator @@ -57,6 +58,7 @@ Implemented RFCs * `RFC 45`_: Move ``hdl.Memory`` to ``lib.Memory`` * `RFC 46`_: Change ``Shape.cast(range(1))`` to ``unsigned(0)`` * `RFC 50`_: ``Print`` statement and string formatting +* `RFC 53`_: Low-level I/O primitives Language changes @@ -64,9 +66,10 @@ Language changes .. currentmodule:: amaranth.hdl -* Added: :class:`ast.Slice` objects have been made const-castable. +* Added: :class:`Slice` objects have been made const-castable. * Added: :func:`amaranth.utils.ceil_log2`, :func:`amaranth.utils.exact_log2`. (`RFC 17`_) * Added: :class:`Format` objects, :class:`Print` statements, messages in :class:`Assert`, :class:`Assume` and :class:`Cover`. (`RFC 50`_) +* Added: IO values, :class:`IOPort` objects, :class:`IOBufferInstance` objects. (`RFC 53`_) * Changed: ``m.Case()`` with no patterns is never active instead of always active. (`RFC 39`_) * Changed: ``Value.matches()`` with no patterns is ``Const(0)`` instead of ``Const(1)``. (`RFC 39`_) * Changed: ``Signal(range(stop), init=stop)`` warning has been changed into a hard error and made to trigger on any out-of range value. @@ -75,6 +78,7 @@ Language changes * Changed: the ``reset=`` argument of :class:`Signal`, :meth:`Signal.like`, :class:`amaranth.lib.wiring.Member`, :class:`amaranth.lib.cdc.FFSynchronizer`, and ``m.FSM()`` has been renamed to ``init=``. (`RFC 43`_) * Changed: :class:`Shape` has been made immutable and hashable. * Changed: :class:`Assert`, :class:`Assume`, :class:`Cover` have been moved to :mod:`amaranth.hdl` from :mod:`amaranth.asserts`. (`RFC 50`_) +* Changed: :class:`Instance` IO ports now accept only IO values, not plain values. (`RFC 53`_) * Deprecated: :func:`amaranth.utils.log2_int`. (`RFC 17`_) * Deprecated: :class:`amaranth.hdl.Memory`. (`RFC 45`_) * Removed: (deprecated in 0.4) :meth:`Const.normalize`. (`RFC 5`_) @@ -218,7 +222,7 @@ Language changes * Changed: :meth:`Value.cast` casts :class:`ValueCastable` objects recursively. * Changed: :meth:`Value.cast` treats instances of classes derived from both :class:`enum.Enum` and :class:`int` (including :class:`enum.IntEnum`) as enumerations rather than integers. * Changed: :meth:`Value.matches` with an empty list of patterns returns ``Const(1)`` rather than ``Const(0)``, to match the behavior of ``with m.Case():``. -* Changed: :class:`Cat` warns if an enumeration without an explicitly specified shape is used. (`RFC 3`_) +* Changed: :func:`Cat` warns if an enumeration without an explicitly specified shape is used. (`RFC 3`_) * Changed: ``signed(0)`` is no longer constructible. (The semantics of this shape were never defined.) * Changed: :meth:`Value.__abs__` returns an unsigned value. * Deprecated: :class:`ast.Sample`, :class:`ast.Past`, :class:`ast.Stable`, :class:`ast.Rose`, :class:`ast.Fell`. (Predating the RFC process.) diff --git a/docs/guide.rst b/docs/guide.rst index 39c7d60..7cc9a13 100644 --- a/docs/guide.rst +++ b/docs/guide.rst @@ -328,7 +328,7 @@ They may also be provided as a pattern to the :ref:`match operator `, which represent binary numbers and can be :ref:`assigned ` to create a unidirectional connection or used in computations, I/O values represent electrical signals that may be digital or analog and have no :ref:`shape `, cannot be assigned, used in computations, or simulated. + +I/O values are only used to define connections between non-Amaranth building blocks that traverse an Amaranth design, including :ref:`instances ` and :ref:`I/O buffer instances `. + + +.. _lang-ioports: + +I/O ports +--------- + +An *I/O port* is an I/O value representing a connection to a port of the topmost module in the :ref:`design hierarchy `. It can be created with an explicitly specified width. + +.. testcode:: + + from amaranth.hdl import IOPort + +.. doctest:: + + >>> port = IOPort(4) + >>> port.width + 4 + +I/O ports can be named in the same way as :ref:`signals `: + +.. doctest:: + + >>> clk_port = IOPort(1, name="clk") + >>> clk_port.name + 'clk' + +If two I/O ports with the same name exist in a design, one of them will be renamed to remove the ambiguity. Because the name of an I/O port is significant, they should be named unambiguously. + + +.. _lang-ioops: + +I/O operators +------------- + +I/O values support only a limited set of :ref:`sequence ` operators, all of which return another I/O value. The following table lists the I/O operators provided by Amaranth: + +=============== ============================== =================== +Operation Description Notes +=============== ============================== =================== +:py:`len(a)` length; width [#iopS1]_ +:py:`a[i:j:k]` slicing by constant subscripts [#iopS2]_ +:py:`iter(a)` iteration +:py:`Cat(a, b)` concatenation [#iopS3]_ [#iopS4]_ +=============== ============================== =================== + +.. [#iopS1] Words "length" and "width" have the same meaning when talking about Amaranth I/O values. Conventionally, "width" is used. +.. [#iopS2] All variations of the Python slice notation are supported, including "extended slicing". E.g. all of :py:`a[0]`, :py:`a[1:9]`, :py:`a[2:]`, :py:`a[:-2]`, :py:`a[::-1]`, :py:`a[0:8:2]` select wires in the same way as other Python sequence types select their elements. +.. [#iopS3] In the concatenated value, :py:`a` occupies the lower indices and :py:`b` the higher indices. Any number of arguments (zero, one, two, or more) are supported. +.. [#iopS4] Concatenation of zero arguments, :py:`Cat()`, returns a 0-bit regular value, however any such value is accepted (and ignored) anywhere an I/O value is expected. + + .. _lang-instance: Instances @@ -1690,14 +1750,15 @@ A submodule written in a non-Amaranth language is called an *instance*. An insta * The *name* of an instance is the name of the submodule within the containing elaboratable. * The *attributes* of an instance correspond to attributes of a (System)Verilog module instance, or a custom attribute of a VHDL entity or component instance. Attributes applied to instances are interpreted by the synthesis toolchain rather than the HDL. * The *parameters* of an instance correspond to parameters of a (System)Verilog module instance, or a generic constant of a VHDL entity or component instance. Not all HDLs allow their design units to be parameterized during instantiation. -* The *inputs* and *outputs* of an instance correspond to inputs and outputs of the external design unit. +* The *inputs*, *outputs*, and *inouts* of an instance correspond to input ports, output ports, and bidirectional ports of the external design unit. An instance can be added as a submodule using the :py:`m.submodules.name = Instance("type", ...)` syntax, where :py:`"type"` is the type of the instance as a string (which is passed to the synthesis toolchain uninterpreted), and :py:`...` is a list of parameters, inputs, and outputs. Depending on whether the name of an attribute, parameter, input, or output can be written as a part of a Python identifier or not, one of two possible syntaxes is used to specify them: * An attribute is specified using the :py:`a_ANAME=attr` or :py:`("a", "ANAME", attr)` syntaxes. The :py:`attr` must be an :class:`int`, a :class:`str`, or a :class:`Const`. * A parameter is specified using the :py:`p_PNAME=param` or :py:`("p", "PNAME", param)` syntaxes. The :py:`param` must be an :class:`int`, a :class:`str`, or a :class:`Const`. -* An input is specified using the :py:`i_INAME=in_val` or :py:`("i", "INAME", in_val)` syntaxes. The :py:`in_val` must be a :ref:`value-like ` object. -* An output is specified using the :py:`o_ONAME=out_val` or :py:`("o", "ONAME", out_val)` syntaxes. The :py:`out_val` must be a :ref:`value-like ` object that casts to a :class:`Signal`. +* An input is specified using the :py:`i_INAME=in_val` or :py:`("i", "INAME", in_val)` syntaxes. The :py:`in_val` must be an :ref:`I/O value ` or a :ref:`value-like ` object. +* An output is specified using the :py:`o_ONAME=out_val` or :py:`("o", "ONAME", out_val)` syntaxes. The :py:`out_val` must be an :ref:`I/O value ` or a :ref:`value-like ` object that casts to a :ref:`signal `, a concatenation of signals, or a slice of a signal. +* An inout is specified using the :py:`io_IONAME=inout_val` or :py:`("io", "IONAME", inout_val)` syntaxes. The :py:`inout_val` must be an :ref:`I/O value `. The two following examples use both syntaxes to add the same instance of type ``external`` as a submodule named ``processor``: @@ -1706,6 +1767,7 @@ The two following examples use both syntaxes to add the same instance of type `` i_data = Signal(8) o_data = Signal(8) + io_pin = IOPort(1) m = Module() .. testcode:: @@ -1718,6 +1780,7 @@ The two following examples use both syntaxes to add the same instance of type `` i_mode=Const(3, unsigned(4)), i_data_in=i_data, o_data_out=o_data, + io_pin=io_pin, ) .. testcode:: @@ -1735,6 +1798,7 @@ The two following examples use both syntaxes to add the same instance of type `` ("i", "mode", Const(3, unsigned(4))), ("i", "data_in", i_data), ("o", "data_out", o_data), + ("io", "pin", io_pin), ) Like a regular submodule, an instance can also be added without specifying a name: @@ -1770,4 +1834,55 @@ Although an :class:`Instance` is not an elaboratable, as a special case, it can o_Q=self.q ) else: - raise NotImplementedError \ No newline at end of file + raise NotImplementedError + + +.. _lang-iobufferinstance: + +I/O buffer instances +==================== + +An *I/O buffer instance* is a submodule that allows assigning :ref:`I/O values ` to or from regular :ref:`values ` without the use of an external, toolchain- and technology-dependent :ref:`instance `. It can be created in four configurations: input, output, tristatable output, and bidirectional (input/output). + +.. testcode:: + + from amaranth.hdl import IOBufferInstance + + m = Module() + +In the input configuration, the buffer combinatorially drives a signal :py:`i` by the port: + +.. testcode:: + + port = IOPort(4) + port_i = Signal(4) + m.submodules.ibuf = IOBufferInstance(port, i=port_i) + +In the output configuration, the buffer combinatorially drives the port by a value :py:`o`: + +.. testcode:: + + port = IOPort(4) + port_o = Signal(4) + m.submodules.obuf = IOBufferInstance(port, o=port_o) + +In the tristatable output configuration, the buffer combinatorially drives the port by a value :py:`o` if :py:`oe` is asserted, and does not drive (leaves in a high-impedance state, or tristates) the port otherwise: + +.. testcode:: + + port = IOPort(4) + port_o = Signal(4) + port_oe = Signal() + m.submodules.obuft = IOBufferInstance(port, o=port_o, oe=port_oe) + +In the bidirectional (input/output) configuration, the buffer combiatorially drives a signal :py:`i` by the port, combinatorially drives the port by a value :py:`o` if :py:`oe` is asserted, and does not drive (leaves in a high-impedance state, or tristates) the port otherwise: + +.. testcode:: + + port = IOPort(4) + port_i = Signal(4) + port_o = Signal(4) + port_oe = Signal() + m.submodules.iobuf = IOBufferInstance(port, i=port_i, o=port_o, oe=port_oe) + +The width of the :py:`i` and :py:`o` values (when present) must be the same as the width of the port, and the width of the :py:`oe` value must be 1. diff --git a/docs/reference.rst b/docs/reference.rst index 51399a1..779c0c6 100644 --- a/docs/reference.rst +++ b/docs/reference.rst @@ -58,7 +58,7 @@ The prelude exports exactly the following names: * :class:`Const` * :func:`C` * :func:`Mux` -* :class:`Cat` +* :func:`Cat` * :class:`Array` * :class:`Signal` * :class:`ClockSignal` diff --git a/tests/test_build_res.py b/tests/test_build_res.py index 0100d9b..cacf964 100644 --- a/tests/test_build_res.py +++ b/tests/test_build_res.py @@ -1,11 +1,6 @@ # amaranth: UnusedElaboratable=no -import warnings - -from amaranth import * -with warnings.catch_warnings(): - warnings.filterwarnings(action="ignore", category=DeprecationWarning) - from amaranth.hdl.rec import * +from amaranth.hdl import * from amaranth.lib.wiring import * from amaranth.lib.io import * from amaranth.build.dsl import * @@ -77,14 +72,11 @@ class ResourceManagerTestCase(FHDLTestCase): def test_request_with_dir(self): i2c = self.cm.request("i2c", 0, dir={"sda": "o"}) - self.assertIsInstance(i2c, PureInterface) - self.assertTrue(i2c.signature.is_compliant(i2c)) self.assertIsInstance(flipped(i2c.sda), Pin) self.assertEqual(i2c.sda.dir, "o") def test_request_tristate(self): i2c = self.cm.request("i2c", 0) - self.assertTrue(i2c.signature.is_compliant(i2c)) self.assertEqual(i2c.sda.dir, "io") ports = list(self.cm.iter_ports()) @@ -158,7 +150,7 @@ class ResourceManagerTestCase(FHDLTestCase): def test_request_raw(self): clk50 = self.cm.request("clk50", 0, dir="-") - self.assertIsInstance(clk50.io, Signal) + self.assertIsInstance(clk50.io, IOPort) ports = list(self.cm.iter_ports()) self.assertEqual(len(ports), 1) @@ -166,8 +158,8 @@ class ResourceManagerTestCase(FHDLTestCase): def test_request_raw_diffpairs(self): clk100 = self.cm.request("clk100", 0, dir="-") - self.assertIsInstance(clk100.p, Signal) - self.assertIsInstance(clk100.n, Signal) + self.assertIsInstance(clk100.p, IOPort) + self.assertIsInstance(clk100.n, IOPort) ports = list(self.cm.iter_ports()) self.assertEqual(len(ports), 2) diff --git a/tests/test_hdl_ast.py b/tests/test_hdl_ast.py index 18653a5..47d89cc 100644 --- a/tests/test_hdl_ast.py +++ b/tests/test_hdl_ast.py @@ -308,7 +308,7 @@ class ValueTestCase(FHDLTestCase): self.assertEqual(s2.start, 1) self.assertEqual(s2.stop, 2) s3 = Const(31)[::2] - self.assertIsInstance(s3, Cat) + self.assertIsInstance(s3, Concat) self.assertIsInstance(s3.parts[0], Slice) self.assertEqual(s3.parts[0].start, 0) self.assertEqual(s3.parts[0].stop, 1) @@ -1679,3 +1679,109 @@ class SwitchTestCase(FHDLTestCase): def test_two_cases(self): s = Switch(Const(0, 8), {("00001111", 123): []}) self.assertEqual(s.cases, {("00001111", "01111011"): []}) + + +class IOValueTestCase(FHDLTestCase): + def test_ioport(self): + a = IOPort(4) + self.assertEqual(len(a), 4) + self.assertEqual(a.attrs, {}) + self.assertEqual(a.metadata, (None, None, None, None)) + self.assertEqual(a._ioports(), {a}) + self.assertRepr(a, "(io-port a)") + b = IOPort(3, name="b", attrs={"a": "b"}, metadata=["x", "y", "z"]) + self.assertEqual(len(b), 3) + self.assertEqual(b.attrs, {"a": "b"}) + self.assertEqual(b.metadata, ("x", "y", "z")) + self.assertEqual(b._ioports(), {b}) + self.assertRepr(b, "(io-port b)") + + def test_ioport_wrong(self): + with self.assertRaisesRegex(TypeError, + r"^Name must be a string, not 3$"): + a = IOPort(2, name=3) + with self.assertRaises(TypeError): + a = IOPort("a") + with self.assertRaises(TypeError): + a = IOPort(8, attrs=3) + with self.assertRaises(TypeError): + a = IOPort(8, metadata=3) + with self.assertRaisesRegex(ValueError, + r"^Metadata length \(3\) doesn't match port width \(2\)$"): + a = IOPort(2, metadata=["a", "b", "c"]) + + def test_ioslice(self): + a = IOPort(8, metadata=["a", "b", "c", "d", "e", "f", "g", "h"]) + s = a[2:5] + self.assertEqual(len(s), 3) + self.assertEqual(s.metadata, ("c", "d", "e")) + self.assertEqual(s._ioports(), {a}) + self.assertRepr(s, "(io-slice (io-port a) 2:5)") + s = a[-5:-2] + self.assertEqual(len(s), 3) + self.assertEqual(s.metadata, ("d", "e", "f")) + self.assertEqual(s._ioports(), {a}) + self.assertRepr(s, "(io-slice (io-port a) 3:6)") + s = IOSlice(a, -5, -2) + self.assertEqual(len(s), 3) + self.assertEqual(s.metadata, ("d", "e", "f")) + self.assertEqual(s._ioports(), {a}) + self.assertRepr(s, "(io-slice (io-port a) 3:6)") + s = a[5] + self.assertEqual(len(s), 1) + self.assertEqual(s.metadata, ("f",)) + self.assertEqual(s._ioports(), {a}) + self.assertRepr(s, "(io-slice (io-port a) 5:6)") + s = a[-1] + self.assertEqual(len(s), 1) + self.assertEqual(s.metadata, ("h",)) + self.assertEqual(s._ioports(), {a}) + self.assertRepr(s, "(io-slice (io-port a) 7:8)") + s = a[::2] + self.assertEqual(len(s), 4) + self.assertEqual(s.metadata, ("a", "c", "e", "g")) + self.assertEqual(s._ioports(), {a}) + self.assertRepr(s, "(io-cat (io-slice (io-port a) 0:1) (io-slice (io-port a) 2:3) (io-slice (io-port a) 4:5) (io-slice (io-port a) 6:7))") + + def test_ioslice_wrong(self): + a = IOPort(8) + with self.assertRaises(IndexError): + a[8] + with self.assertRaises(IndexError): + a[-9] + with self.assertRaises(TypeError): + a["a"] + with self.assertRaises(IndexError): + IOSlice(a, 0, 9) + with self.assertRaises(IndexError): + IOSlice(a, -10, 8) + with self.assertRaises(TypeError): + IOSlice(a, 0, "a") + with self.assertRaises(TypeError): + IOSlice(a, "a", 8) + with self.assertRaises(IndexError): + a[5:3] + + def test_iocat(self): + a = IOPort(3, name="a", metadata=["a", "b", "c"]) + b = IOPort(2, name="b", metadata=["x", "y"]) + c = Cat(a, b) + self.assertEqual(len(c), 5) + self.assertEqual(c.metadata, ("a", "b", "c", "x", "y")) + self.assertEqual(c._ioports(), {a, b}) + self.assertRepr(c, "(io-cat (io-port a) (io-port b))") + c = Cat(a, Cat()) + self.assertEqual(len(c), 3) + self.assertEqual(c.metadata, ("a", "b", "c")) + self.assertEqual(c._ioports(), {a}) + self.assertRepr(c, "(io-cat (io-port a) (io-cat ))") + c = Cat(a, Cat()[:]) + self.assertEqual(len(c), 3) + self.assertRepr(c, "(io-cat (io-port a) (io-cat ))") + + def test_iocat_wrong(self): + a = IOPort(3, name="a") + b = Signal() + with self.assertRaisesRegex(TypeError, + r"^Object \(sig b\) cannot be converted to an IO value$"): + Cat(a, b) diff --git a/tests/test_hdl_ir.py b/tests/test_hdl_ir.py index f37e05a..d87600e 100644 --- a/tests/test_hdl_ir.py +++ b/tests/test_hdl_ir.py @@ -92,7 +92,7 @@ class FragmentPortsTestCase(FHDLTestCase): self.assertRepr(nl, """ ( (module 0 None ('top')) - (cell 0 0 (top )) + (cell 0 0 (top)) ) """) @@ -177,21 +177,21 @@ class FragmentPortsTestCase(FHDLTestCase): (output 'c2' 2.0) (output 'c3' 1.0)) (module 1 0 ('top' 'f1') - (input 'port$0$2' 0.2) - (output 'port$1$0' 1.0) - (output 'port$2$0' 2.0) - (output 'port$5$0' 5.0) - (input 'port$10$0' 10.0)) - (module 2 1 ('top' 'f1' 'f11') - (input 'port$0$2' 0.2) - (output 'port$1$0' 1.0) - (output 'port$2$0' 2.0) - (input 'port$6$0' 6.0)) - (module 3 2 ('top' 'f1' 'f11' 'f111') - (input 'port$0$2' 0.2) + (input 's1' 0.2) (output 'c3' 1.0) (output 'c2' 2.0) - (input 'port$6$0' 6.0)) + (output 'c1' 5.0) + (input 's2' 10.0)) + (module 2 1 ('top' 'f1' 'f11') + (input 's1' 0.2) + (output 'c3' 1.0) + (output 'c2' 2.0) + (input 's3' 6.0)) + (module 3 2 ('top' 'f1' 'f11' 'f111') + (input 's1' 0.2) + (output 'c3' 1.0) + (output 'c2' 2.0) + (input 's3' 6.0)) (module 4 3 ('top' 'f1' 'f11' 'f111' 'f1111') (input 's1' 0.2) (output 'c2' 2.0) @@ -200,9 +200,9 @@ class FragmentPortsTestCase(FHDLTestCase): (output 'c1' 5.0) (input 's3' 6.0)) (module 6 1 ('top' 'f1' 'f13') - (input 'port$0$2' 0.2) - (output 'port$6$0' 6.0) - (input 'port$10$0' 10.0)) + (input 's1' 0.2) + (output 's3' 6.0) + (input 's2' 10.0)) (module 7 6 ('top' 'f1' 'f13' 'f131') (input 's1' 0.2) (output 's3' 6.0) @@ -229,12 +229,12 @@ class FragmentPortsTestCase(FHDLTestCase): nl = build_netlist(f, ports={ "a": (self.s1, PortDirection.Output), "b": (self.s2, PortDirection.Input), - "c": (self.s3, PortDirection.Inout), + "c": (IOPort(1, name="io3"), PortDirection.Inout), }) self.assertRepr(nl, """ ( - (module 0 None ('top') (input 'b' 0.2) (inout 'c' 0.3) (output 'a' 1'd0)) - (cell 0 0 (top (input 'b' 2:3) (output 'a' 1'd0) (inout 'c' 3:4))) + (module 0 None ('top') (input 'b' 0.2) (output 'a' 1'd0) (io inout 'c' 0.0)) + (cell 0 0 (top (input 'b' 2:3) (output 'a' 1'd0))) ) """) @@ -308,6 +308,65 @@ class FragmentPortsTestCase(FHDLTestCase): ) """) + def test_port_io(self): + io = IOPort(8) + f = Fragment() + f1 = Fragment() + f1.add_subfragment(Instance("t", i_io=io[:2]), "i") + f.add_subfragment(f1, "f1") + f2 = Fragment() + f2.add_subfragment(Instance("t", o_io=io[2:4]), "i") + f.add_subfragment(f2, "f2") + f3 = Fragment() + f3.add_subfragment(Instance("t", io_io=io[4:6]), "i") + f.add_subfragment(f3, "f3") + nl = build_netlist(f, ports=[]) + self.assertRepr(nl, """ + ( + (module 0 None ('top') + (io inout 'io' 0.0:8) + ) + (module 1 0 ('top' 'f1') + (io input 'ioport$0$0' 0.0:2) + ) + (module 2 0 ('top' 'f2') + (io output 'ioport$0$2' 0.2:4) + ) + (module 3 0 ('top' 'f3') + (io inout 'ioport$0$4' 0.4:6) + ) + (cell 0 0 (top)) + (cell 1 1 (instance 't' 'i' (io input 'io' 0.0:2))) + (cell 2 2 (instance 't' 'i' (io output 'io' 0.2:4))) + (cell 3 3 (instance 't' 'i' (io inout 'io' 0.4:6))) + ) + """) + + def test_port_io_part(self): + io = IOPort(4) + f = Fragment() + f1 = Fragment() + f1.add_subfragment(Instance("t", i_i=io[0], o_o=io[1], io_io=io[2]), "i") + f.add_subfragment(f1, "f1") + nl = build_netlist(f, ports=[]) + self.assertRepr(nl, """ + ( + (module 0 None ('top') + (io inout 'io' 0.0:4) + ) + (module 1 0 ('top' 'f1') + (io input 'ioport$0$0' 0.0) + (io output 'ioport$0$1' 0.1) + (io inout 'ioport$0$2' 0.2) + ) + (cell 0 0 (top)) + (cell 1 1 (instance 't' 'i' + (io input 'i' 0.0) + (io output 'o' 0.1) + (io inout 'io' 0.2) + )) + ) + """) def test_port_instance(self): f = Fragment() @@ -316,40 +375,53 @@ class FragmentPortsTestCase(FHDLTestCase): a = Signal(4) b = Signal(4) c = Signal(4) - d = Signal(4) + ioa = IOPort(4) + iob = IOPort(4) + ioc = IOPort(4) f1.add_subfragment(Instance("t", p_p = "meow", a_a = True, i_aa=a, - io_bb=b, + o_bb=b, o_cc=c, - o_dd=d, + i_aaa=ioa, + o_bbb=iob, + io_ccc=ioc, ), "i") - nl = build_netlist(f, ports=[a, b, c, d]) + nl = build_netlist(f, ports=[a, b, c]) self.assertRepr(nl, """ ( (module 0 None ('top') (input 'a' 0.2:6) - (inout 'b' 0.6:10) - (output 'c' 1.0:4) - (output 'd' 1.4:8)) + (output 'b' 1.0:4) + (output 'c' 1.4:8) + (io input 'ioa' 0.0:4) + (io output 'iob' 1.0:4) + (io inout 'ioc' 2.0:4) + ) (module 1 0 ('top' 'f1') - (input 'port$0$2' 0.2:6) - (inout 'port$0$6' 0.6:10) - (output 'port$1$0' 1.0:4) - (output 'port$1$4' 1.4:8)) + (input 'a' 0.2:6) + (output 'b' 1.0:4) + (output 'c' 1.4:8) + (io input 'ioa' 0.0:4) + (io output 'iob' 1.0:4) + (io inout 'ioc' 2.0:4) + ) (cell 0 0 (top (input 'a' 2:6) - (output 'c' 1.0:4) - (output 'd' 1.4:8) - (inout 'b' 6:10))) + (output 'b' 1.0:4) + (output 'c' 1.4:8) + )) (cell 1 1 (instance 't' 'i' (param 'p' 'meow') (attr 'a' True) (input 'aa' 0.2:6) - (output 'cc' 0:4) - (output 'dd' 4:8) - (inout 'bb' 0.6:10))) + (output 'bb' 0:4) + (output 'cc' 4:8) + (io input 'aaa' 0.0:4) + (io output 'bbb' 1.0:4) + (io inout 'ccc' 2.0:4) + )) ) """) @@ -357,7 +429,7 @@ class FragmentPortsTestCase(FHDLTestCase): f = Fragment() a = Signal() with self.assertRaisesRegex(TypeError, - r"^Only signals may be added as ports, not \(const 1'd1\)$"): + r"^Only signals and IO ports may be added as ports, not \(const 1'd1\)$"): build_netlist(f, ports=(Const(1),)) with self.assertRaisesRegex(TypeError, r"^Port name must be a string, not 1$"): @@ -619,19 +691,27 @@ class InstanceTestCase(FHDLTestCase): s2 = Signal() s3 = Signal() s4 = Signal() - s5 = Signal() - s6 = Signal() + io1 = IOPort(1) + io2 = IOPort(1) + io3 = IOPort(1) + io4 = IOPort(1) + io5 = IOPort(1) + io6 = IOPort(1) inst = Instance("foo", ("a", "ATTR1", 1), ("p", "PARAM1", 0x1234), ("i", "s1", s1), ("o", "s2", s2), - ("io", "s3", s3), + ("i", "io1", io1), + ("o", "io2", io2), + ("io", "io3", io3), a_ATTR2=2, p_PARAM2=0x5678, - i_s4=s4, - o_s5=s5, - io_s6=s6, + i_s3=s3, + o_s4=s4, + i_io4=io4, + o_io5=io5, + io_io6=io6, ) self.assertEqual(inst.attrs, OrderedDict([ ("ATTR1", 1), @@ -644,27 +724,27 @@ class InstanceTestCase(FHDLTestCase): self.assertEqual(inst.named_ports, OrderedDict([ ("s1", (s1, "i")), ("s2", (s2, "o")), - ("s3", (s3, "io")), - ("s4", (s4, "i")), - ("s5", (s5, "o")), - ("s6", (s6, "io")), + ("io1", (io1, "i")), + ("io2", (io2, "o")), + ("io3", (io3, "io")), + ("s3", (s3, "i")), + ("s4", (s4, "o")), + ("io4", (io4, "i")), + ("io5", (io5, "o")), + ("io6", (io6, "io")), ])) def test_cast_ports(self): inst = Instance("foo", ("i", "s1", 1), - ("o", "s2", 2), - ("io", "s3", 3), - i_s4=4, - o_s5=5, - io_s6=6, + ("io", "s2", Cat()), + i_s3=3, + io_s4=Cat(), ) self.assertRepr(inst.named_ports["s1"][0], "(const 1'd1)") - self.assertRepr(inst.named_ports["s2"][0], "(const 2'd2)") + self.assertRepr(inst.named_ports["s2"][0], "(io-cat )") self.assertRepr(inst.named_ports["s3"][0], "(const 2'd3)") - self.assertRepr(inst.named_ports["s4"][0], "(const 3'd4)") - self.assertRepr(inst.named_ports["s5"][0], "(const 3'd5)") - self.assertRepr(inst.named_ports["s6"][0], "(const 3'd6)") + self.assertRepr(inst.named_ports["s4"][0], "(io-cat )") def test_wrong_construct_arg(self): s = Signal() @@ -683,7 +763,7 @@ class InstanceTestCase(FHDLTestCase): def setUp_cpu(self): self.rst = Signal() self.stb = Signal() - self.pins = Signal(8) + self.pins = IOPort(8) self.datal = Signal(4) self.datah = Signal(4) self.inst = Instance("cpu", @@ -716,33 +796,40 @@ class InstanceTestCase(FHDLTestCase): f = Fragment() i = Signal(3) o = Signal(4) - io = Signal(5) + ioa = IOPort(5) + iob = IOPort(6) + ioc = IOPort(7) f.add_subfragment(Instance("gadget", i_i=i, o_o=o, - io_io=io, + i_ioa=ioa, + o_iob=iob, + io_ioc=ioc, p_param="TEST", a_attr=1234, ), "my_gadget") - nl = build_netlist(f, [i, o, io]) + nl = build_netlist(f, [i, o]) self.assertRepr(nl, """ ( (module 0 None ('top') (input 'i' 0.2:5) - (inout 'io' 0.5:10) (output 'o' 1.0:4) + (io input 'ioa' 0.0:5) + (io output 'iob' 1.0:6) + (io inout 'ioc' 2.0:7) ) (cell 0 0 (top (input 'i' 2:5) (output 'o' 1.0:4) - (inout 'io' 5:10) )) (cell 1 0 (instance 'gadget' 'my_gadget' (param 'param' 'TEST') (attr 'attr' 1234) (input 'i' 0.2:5) (output 'o' 0:4) - (inout 'io' 0.5:10) + (io input 'ioa' 0.0:5) + (io output 'iob' 1.0:6) + (io inout 'ioc' 2.0:7) )) ) """) @@ -798,33 +885,72 @@ class InstanceTestCase(FHDLTestCase): ) """) + def test_nir_io_slice(self): + f = Fragment() + io = IOPort(8) + f.add_subfragment(Instance("test", + i_i=io[:2], + o_o=io[2:4], + io_io=io[4:6], + ), "t1") + nl = build_netlist(f, []) + self.assertRepr(nl, """ + ( + (module 0 None ('top') + (io inout 'io' 0.0:8) + ) + (cell 0 0 (top)) + (cell 1 0 (instance 'test' 't1' + (io input 'i' 0.0:2) + (io output 'o' 0.2:4) + (io inout 'io' 0.4:6) + )) + ) + """) + + def test_nir_io_concat(self): + f = Fragment() + io1 = IOPort(4) + io2 = IOPort(4) + f.add_subfragment(Instance("test", + io_io=Cat(io1, io2), + )) + nl = build_netlist(f, [io1, io2]) + self.assertRepr(nl, """ + ( + (module 0 None ('top') + (io inout 'io1' 0.0:4) + (io inout 'io2' 1.0:4) + ) + (cell 0 0 (top)) + (cell 1 0 (instance 'test' 'U$0' + (io inout 'io' (io-cat 0.0:4 1.0:4)) + )) + ) + """) + def test_nir_operator(self): f = Fragment() i = Signal(3) o = Signal(4) - io = Signal(5) f.add_subfragment(Instance("gadget", i_i=i.as_signed(), o_o=o.as_signed(), - io_io=io.as_signed(), ), "my_gadget") - nl = build_netlist(f, [i, o, io]) + nl = build_netlist(f, [i, o]) self.assertRepr(nl, """ ( (module 0 None ('top') (input 'i' 0.2:5) - (inout 'io' 0.5:10) (output 'o' 1.0:4) ) (cell 0 0 (top (input 'i' 2:5) (output 'o' 1.0:4) - (inout 'io' 5:10) )) (cell 1 0 (instance 'gadget' 'my_gadget' (input 'i' 0.2:5) (output 'o' 0:4) - (inout 'io' 0.5:10) )) ) """) @@ -856,7 +982,7 @@ class NamesTestCase(FHDLTestCase): "o3": (o3, PortDirection.Output), } design = f.prepare(ports) - self.assertEqual(design.signal_names[design.fragment], SignalDict([ + self.assertEqual(design.fragments[design.fragment].signal_names, SignalDict([ (i, "i"), (rst, "rst"), (o1, "o1"), @@ -865,7 +991,7 @@ class NamesTestCase(FHDLTestCase): (cd_sync.clk, "clk"), (cd_sync.rst, "rst$6"), (cd_sync_norst.clk, "sync_norst_clk"), - (i1, "i$8"), + (i1, "i$7"), ])) def test_assign_names_to_fragments(self): @@ -874,11 +1000,9 @@ class NamesTestCase(FHDLTestCase): f.add_subfragment(b := Fragment(), name="b") design = Design(f, ports=(), hierarchy=("top",)) - self.assertEqual(design.fragment_names, { - f: ("top",), - a: ("top", "U$0"), - b: ("top", "b") - }) + self.assertEqual(design.fragments[f].name, ("top",)) + self.assertEqual(design.fragments[a].name, ("top", "U$0")) + self.assertEqual(design.fragments[b].name, ("top", "b")) def test_assign_names_to_fragments_rename_top(self): f = Fragment() @@ -886,11 +1010,9 @@ class NamesTestCase(FHDLTestCase): f.add_subfragment(b := Fragment(), name="b") design = Design(f, ports=[], hierarchy=("bench", "cpu")) - self.assertEqual(design.fragment_names, { - f: ("bench", "cpu",), - a: ("bench", "cpu", "U$0"), - b: ("bench", "cpu", "b") - }) + self.assertEqual(design.fragments[f].name, ("bench", "cpu",)) + self.assertEqual(design.fragments[a].name, ("bench", "cpu", "U$0")) + self.assertEqual(design.fragments[b].name, ("bench", "cpu", "b")) def test_assign_names_to_fragments_collide_with_signal(self): f = Fragment() @@ -898,10 +1020,8 @@ class NamesTestCase(FHDLTestCase): a_s = Signal(name="a") design = Design(f, ports=[("a", a_s, None)], hierarchy=("top",)) - self.assertEqual(design.fragment_names, { - f: ("top",), - a_f: ("top", "a$U$0") - }) + self.assertEqual(design.fragments[f].name, ("top",)) + self.assertEqual(design.fragments[a_f].name, ("top", "a$1")) def test_assign_names_to_fragments_duplicate(self): f = Fragment() @@ -909,11 +1029,9 @@ class NamesTestCase(FHDLTestCase): f.add_subfragment(a2_f := Fragment(), name="a") design = Design(f, ports=[], hierarchy=("top",)) - self.assertEqual(design.fragment_names, { - f: ("top",), - a1_f: ("top", "a"), - a2_f: ("top", "a$U$1"), - }) + self.assertEqual(design.fragments[f].name, ("top",)) + self.assertEqual(design.fragments[a1_f].name, ("top", "a")) + self.assertEqual(design.fragments[a2_f].name, ("top", "a$1")) class ElaboratesTo(Elaboratable): @@ -944,122 +1062,137 @@ class OriginsTestCase(FHDLTestCase): class IOBufferTestCase(FHDLTestCase): def test_nir_i(self): - pad = Signal(4) + port = IOPort(4) i = Signal(4) f = Fragment() - f.add_subfragment(IOBufferInstance(pad, i=i)) - nl = build_netlist(f, ports=[pad, i]) + f.add_subfragment(IOBufferInstance(port, i=i)) + nl = build_netlist(f, ports=[i]) self.assertRepr(nl, """ ( (module 0 None ('top') - (inout 'pad' 0.2:6) (output 'i' 1.0:4) + (io input 'port' 0.0:4) ) (cell 0 0 (top (output 'i' 1.0:4) - (inout 'pad' 2:6) )) - (cell 1 0 (iob 0.2:6 4'd0 0)) + (cell 1 0 (iob input 0.0:4)) ) """) def test_nir_o(self): - pad = Signal(4) + port = IOPort(4) o = Signal(4) f = Fragment() - f.add_subfragment(IOBufferInstance(pad, o=o)) - nl = build_netlist(f, ports=[pad, o]) + f.add_subfragment(IOBufferInstance(port, o=o)) + nl = build_netlist(f, ports=[o]) self.assertRepr(nl, """ ( (module 0 None ('top') - (input 'o' 0.6:10) - (inout 'pad' 0.2:6) + (input 'o' 0.2:6) + (io output 'port' 0.0:4) ) (cell 0 0 (top - (input 'o' 6:10) - (inout 'pad' 2:6) + (input 'o' 2:6) )) - (cell 1 0 (iob 0.2:6 0.6:10 1)) + (cell 1 0 (iob output 0.0:4 0.2:6 1)) ) """) def test_nir_oe(self): - pad = Signal(4) + port = IOPort(4) o = Signal(4) oe = Signal() f = Fragment() - f.add_subfragment(IOBufferInstance(pad, o=o, oe=oe)) - nl = build_netlist(f, ports=[pad, o, oe]) + f.add_subfragment(IOBufferInstance(port, o=o, oe=oe)) + nl = build_netlist(f, ports=[ o, oe]) self.assertRepr(nl, """ ( (module 0 None ('top') - (input 'o' 0.6:10) - (input 'oe' 0.10) - (inout 'pad' 0.2:6) + (input 'o' 0.2:6) + (input 'oe' 0.6) + (io output 'port' 0.0:4) ) (cell 0 0 (top - (input 'o' 6:10) - (input 'oe' 10:11) - (inout 'pad' 2:6) + (input 'o' 2:6) + (input 'oe' 6:7) )) - (cell 1 0 (iob 0.2:6 0.6:10 0.10)) + (cell 1 0 (iob output 0.0:4 0.2:6 0.6)) ) """) def test_nir_io(self): - pad = Signal(4) + port = IOPort(4) i = Signal(4) o = Signal(4) oe = Signal() f = Fragment() - f.add_subfragment(IOBufferInstance(pad, i=i, o=o, oe=oe)) - nl = build_netlist(f, ports=[pad, i, o, oe]) + f.add_subfragment(IOBufferInstance(port, i=i, o=o, oe=oe)) + nl = build_netlist(f, ports=[i, o, oe]) self.assertRepr(nl, """ ( (module 0 None ('top') - (input 'o' 0.6:10) - (input 'oe' 0.10) - (inout 'pad' 0.2:6) + (input 'o' 0.2:6) + (input 'oe' 0.6) (output 'i' 1.0:4) + (io inout 'port' 0.0:4) ) (cell 0 0 (top - (input 'o' 6:10) - (input 'oe' 10:11) + (input 'o' 2:6) + (input 'oe' 6:7) (output 'i' 1.0:4) - (inout 'pad' 2:6) )) - (cell 1 0 (iob 0.2:6 0.6:10 0.10)) + (cell 1 0 (iob inout 0.0:4 0.2:6 0.6)) ) """) + def test_wrong_port(self): + port = Signal(4) + i = Signal(4) + with self.assertRaisesRegex(TypeError, + r"^Object \(sig port\) cannot be converted to an IO value"): + IOBufferInstance(port, i=i) + def test_wrong_i(self): - pad = Signal(4) + port = IOPort(4) i = Signal() with self.assertRaisesRegex(ValueError, - r"^`pad` length \(4\) doesn't match `i` length \(1\)"): - IOBufferInstance(pad, i=i) + r"^'port' length \(4\) doesn't match 'i' length \(1\)"): + IOBufferInstance(port, i=i) def test_wrong_o(self): - pad = Signal(4) + port = IOPort(4) o = Signal() with self.assertRaisesRegex(ValueError, - r"^`pad` length \(4\) doesn't match `o` length \(1\)"): - IOBufferInstance(pad, o=o) + r"^'port' length \(4\) doesn't match 'o' length \(1\)"): + IOBufferInstance(port, o=o) def test_wrong_oe(self): - pad = Signal(4) + port = IOPort(4) o = Signal(4) oe = Signal(4) with self.assertRaisesRegex(ValueError, - r"^`oe` length \(4\) must be 1"): - IOBufferInstance(pad, o=o, oe=oe) + r"^'oe' length \(4\) must be 1"): + IOBufferInstance(port, o=o, oe=oe) def test_wrong_oe_without_o(self): - pad = Signal(4) + port = IOPort(4) oe = Signal() with self.assertRaisesRegex(ValueError, - r"^`oe` must not be used if `o` is not used"): - IOBufferInstance(pad, oe=oe) + r"^'oe' must not be used if 'o' is not used"): + IOBufferInstance(port, oe=oe) + + def test_conflict(self): + port = IOPort(4) + i1 = Signal(4) + i2 = Signal(4) + f = Fragment() + f.add_subfragment(IOBufferInstance(port, i=i1)) + f.add_subfragment(IOBufferInstance(port, i=i2)) + with self.assertRaisesRegex(DriverConflict, + r"^Bit 0 of I/O port \(io-port port\) used twice, at .*test_hdl_ir.py:\d+ and " + r".*test_hdl_ir.py:\d+$"): + build_netlist(f, ports=[i1, i2]) class AssignTestCase(FHDLTestCase):