Implement RFC 53: Low-level I/O primitives.

Co-authored-by: Catherine <whitequark@whitequark.org>
Co-authored-by: mcclure <mcclure@users.noreply.github.com>
This commit is contained in:
Wanda 2024-03-15 06:37:17 +01:00 committed by Catherine
parent 18b54ded0a
commit 744576011f
16 changed files with 1364 additions and 436 deletions

View file

@ -272,7 +272,7 @@ class MemoryInfo:
class ModuleEmitter:
def __init__(self, builder, netlist, module, name_map, empty_checker):
def __init__(self, builder, netlist: _nir.Netlist, module: _nir.Module, name_map, empty_checker):
self.builder = builder
self.netlist = netlist
self.module = module
@ -293,6 +293,7 @@ class ModuleEmitter:
self.sigport_wires = {} # signal or port name -> (wire, value)
self.driven_sigports = set() # set of signal or port name
self.nets = {} # net -> (wire name, bit idx)
self.ionets = {} # ionet -> (wire name, bit idx)
self.cell_wires = {} # cell idx -> wire name
self.instance_wires = {} # (cell idx, output name) -> wire name
@ -302,6 +303,7 @@ class ModuleEmitter:
self.collect_init_attrs()
self.emit_signal_wires()
self.emit_port_wires()
self.emit_io_port_wires()
self.emit_cell_wires()
self.emit_submodule_wires()
self.emit_connects()
@ -406,11 +408,28 @@ class ModuleEmitter:
self.sigport_wires[name] = (wire, value)
if flow == _nir.ModuleNetFlow.Output:
continue
# If we just emitted an input or inout port, it is driving the value.
# If we just emitted an input port, it is driving the value.
self.driven_sigports.add(name)
for bit, net in enumerate(value):
self.nets[net] = (wire, bit)
def emit_io_port_wires(self):
for idx, (name, (value, dir)) in enumerate(self.module.io_ports.items()):
port_id = idx + len(self.module.ports)
if self.module.parent is None:
port = self.netlist.io_ports[value[0].port]
attrs = port.attrs
src_loc = port.src_loc
else:
attrs = {}
src_loc = None
wire = self.builder.wire(width=len(value),
port_id=port_id, port_kind=dir.value,
name=name, attrs=attrs,
src=_src(src_loc))
for bit, net in enumerate(value):
self.ionets[net] = (wire, bit)
def emit_driven_wire(self, value):
# Emits a wire for a value, in preparation for driving it.
if value in self.value_names:
@ -454,7 +473,9 @@ class ModuleEmitter:
elif isinstance(cell, _nir.Initial):
width = 1
elif isinstance(cell, _nir.IOBuffer):
width = len(cell.pad)
if cell.dir is _nir.IODirection.Output:
continue # No outputs.
width = len(cell.port)
else:
assert False # :nocov:
# Single output cell connected to a wire.
@ -503,6 +524,28 @@ class ModuleEmitter:
return chunks[0]
return "{ " + " ".join(reversed(chunks)) + " }"
def io_sigspec(self, value: _nir.IOValue):
chunks = []
begin_pos = 0
while begin_pos < len(value):
end_pos = begin_pos
wire, start_bit = self.ionets[value[begin_pos]]
bit = start_bit
while (end_pos < len(value) and
self.ionets[value[end_pos]] == (wire, bit)):
end_pos += 1
bit += 1
width = end_pos - begin_pos
if width == 1:
chunks.append(f"{wire} [{start_bit}]")
else:
chunks.append(f"{wire} [{start_bit + width - 1}:{start_bit}]")
begin_pos = end_pos
if len(chunks) == 1:
return chunks[0]
return "{ " + " ".join(reversed(chunks)) + " }"
def emit_connects(self):
for name, (wire, value) in self.sigport_wires.items():
if name not in self.driven_sigports:
@ -513,10 +556,13 @@ class ModuleEmitter:
submodule = self.netlist.modules[submodule_idx]
if not self.empty_checker.is_empty(submodule_idx):
dotted_name = ".".join(submodule.name)
self.builder.cell(f"\\{dotted_name}", submodule.name[-1], ports={
name: self.sigspec(value)
for name, (value, _flow) in submodule.ports.items()
}, src=_src(submodule.cell_src_loc))
ports = {}
for name, (value, _flow) in submodule.ports.items():
ports[name] = self.sigspec(value)
for name, (value, _dir) in submodule.io_ports.items():
ports[name] = self.io_sigspec(value)
self.builder.cell(f"\\{dotted_name}", submodule.name[-1], ports=ports,
src=_src(submodule.cell_src_loc))
def emit_assignment_list(self, cell_idx, cell):
def emit_assignments(case, cond):
@ -761,14 +807,19 @@ class ModuleEmitter:
self.builder.cell(cell_type, ports=ports, params=params, src=_src(cell.src_loc))
def emit_io_buffer(self, cell_idx, cell):
self.builder.cell("$tribuf", ports={
"Y": self.sigspec(cell.pad),
"A": self.sigspec(cell.o),
"EN": self.sigspec(cell.oe),
}, params={
"WIDTH": len(cell.pad),
}, src=_src(cell.src_loc))
self.builder.connect(self.cell_wires[cell_idx], self.sigspec(cell.pad))
if cell.dir is not _nir.IODirection.Input:
if cell.dir is _nir.IODirection.Output and cell.oe == _nir.Net.from_const(1):
self.builder.connect(self.io_sigspec(cell.port), self.sigspec(cell.o))
else:
self.builder.cell("$tribuf", ports={
"Y": self.io_sigspec(cell.port),
"A": self.sigspec(cell.o),
"EN": self.sigspec(cell.oe),
}, params={
"WIDTH": len(cell.port),
}, src=_src(cell.src_loc))
if cell.dir is not _nir.IODirection.Output:
self.builder.connect(self.cell_wires[cell_idx], self.io_sigspec(cell.port))
def emit_memory(self, cell_idx, cell):
memory_info = self.memories[cell_idx]
@ -950,8 +1001,8 @@ class ModuleEmitter:
ports[name] = self.sigspec(nets)
for name in cell.ports_o:
ports[name] = self.instance_wires[cell_idx, name]
for name, nets in cell.ports_io.items():
ports[name] = self.sigspec(nets)
for name, (ionets, _dir) in cell.ports_io.items():
ports[name] = self.io_sigspec(ionets)
self.builder.cell(f"\\{cell.type}", cell.name, ports=ports, params=cell.parameters,
attrs=cell.attributes, src=_src(cell.src_loc))

View file

@ -222,7 +222,7 @@ class Platform(ResourceManager, metaclass=ABCMeta):
m = Module()
m.submodules += IOBufferInstance(
pad=port,
port=port,
o=self._invert_if(invert, pin.o),
oe=pin.oe,
)
@ -235,7 +235,7 @@ class Platform(ResourceManager, metaclass=ABCMeta):
m = Module()
i = Signal.like(pin.i)
m.submodules += IOBufferInstance(
pad=port,
port=port,
i=i,
o=self._invert_if(invert, pin.o),
oe=pin.oe,

View file

@ -14,6 +14,21 @@ class ResourceError(Exception):
pass
class SingleEndedPort:
def __init__(self, io):
self.io = io
class DifferentialPort:
def __init__(self, p, n):
self.p = p
self.n = n
class PortGroup:
pass
class ResourceManager:
def __init__(self, resources, connectors):
self.resources = OrderedDict()
@ -113,21 +128,13 @@ class ResourceManager:
attrs[attr_key] = attr_value
if isinstance(resource.ios[0], Subsignal):
members = OrderedDict()
sig_members = OrderedDict()
res = PortGroup()
for sub in resource.ios:
member = resolve(sub, dir[sub.name], xdr[sub.name],
path=path + (sub.name,),
attrs={**attrs, **sub.attrs})
members[sub.name] = member
sig_members[sub.name] = wiring.Out(member.signature)
signature = wiring.Signature(sig_members)
# Provide members ourselves instead of having the constructor
# create ones for us.
intf = object.__new__(wiring.PureInterface)
intf.signature = signature
intf.__dict__.update(members)
return intf
setattr(res, sub.name, member)
return res
elif isinstance(resource.ios[0], (Pins, DiffPairs)):
phys = resource.ios[0]
@ -136,17 +143,21 @@ class ResourceManager:
# ignore it as well.
if isinstance(phys, Pins):
phys_names = phys.names
port = wiring.Signature({"io": wiring.In(len(phys))}).create(path=path)
io = IOPort(len(phys), name="__".join(path) + "__io")
port = SingleEndedPort(io)
if isinstance(phys, DiffPairs):
phys_names = []
sig_members = {}
if not self.should_skip_port_component(None, attrs, "p"):
p = IOPort(len(phys), name="__".join(path) + "__p")
phys_names += phys.p.names
sig_members["p"] = wiring.In(len(phys))
else:
p = None
if not self.should_skip_port_component(None, attrs, "n"):
n = IOPort(len(phys), name="__".join(path) + "__n")
phys_names += phys.n.names
sig_members["n"] = wiring.In(len(phys))
port = wiring.Signature(sig_members).create(path=path)
else:
n = None
port = DifferentialPort(p, n)
if dir == "-":
pin = None
else:

View file

@ -2,9 +2,11 @@ from ._ast import Shape, unsigned, signed, ShapeCastable, ShapeLike
from ._ast import Value, ValueCastable, ValueLike
from ._ast import Const, C, Mux, Cat, Array, Signal, ClockSignal, ResetSignal
from ._ast import Format, Print, Assert, Assume, Cover
from ._ast import IOValue, IOPort
from ._dsl import SyntaxError, SyntaxWarning, Module
from ._cd import DomainError, ClockDomain
from ._ir import UnusedElaboratable, Elaboratable, DriverConflict, Fragment, Instance
from ._ir import UnusedElaboratable, Elaboratable, DriverConflict, Fragment
from ._ir import Instance, IOBufferInstance
from ._mem import MemoryIdentity, MemoryInstance, Memory, ReadPort, WritePort, DummyPort
from ._rec import Record
from ._xfrm import DomainRenamer, ResetInserter, EnableInserter
@ -16,12 +18,14 @@ __all__ = [
"Value", "ValueCastable", "ValueLike",
"Const", "C", "Mux", "Cat", "Array", "Signal", "ClockSignal", "ResetSignal",
"Format", "Print", "Assert", "Assume", "Cover",
"IOValue", "IOPort",
# _dsl
"SyntaxError", "SyntaxWarning", "Module",
# _cd
"DomainError", "ClockDomain",
# _ir
"UnusedElaboratable", "Elaboratable", "DriverConflict", "Fragment", "Instance",
"UnusedElaboratable", "Elaboratable", "DriverConflict", "Fragment",
"Instance", "IOBufferInstance",
# _mem
"MemoryIdentity", "MemoryInstance", "Memory", "ReadPort", "WritePort", "DummyPort",
# _rec

View file

@ -17,7 +17,7 @@ from .._unused import *
__all__ = [
"Shape", "signed", "unsigned", "ShapeCastable", "ShapeLike",
"Value", "Const", "C", "AnyConst", "AnySeq", "Operator", "Mux", "Part", "Slice", "Cat",
"Value", "Const", "C", "AnyConst", "AnySeq", "Operator", "Mux", "Part", "Slice", "Cat", "Concat",
"Array", "ArrayProxy",
"Signal", "ClockSignal", "ResetSignal",
"ValueCastable", "ValueLike",
@ -25,6 +25,7 @@ __all__ = [
"Format",
"Statement", "Switch",
"Property", "Assign", "Print", "Assert", "Assume", "Cover",
"IOValue", "IOPort", "IOConcat", "IOSlice",
"SignalKey", "SignalDict", "SignalSet",
]
@ -1480,7 +1481,7 @@ class Const(Value, metaclass=_ConstMeta):
obj = Value.cast(obj)
if type(obj) is Const:
return obj
elif type(obj) is Cat:
elif type(obj) is Concat:
value = 0
width = 0
for part in obj.parts:
@ -1636,9 +1637,13 @@ def Mux(sel, val1, val0):
@final
class Slice(Value):
def __init__(self, value, start, stop, *, src_loc_at=0):
if not isinstance(start, int):
try:
start = int(operator.index(start))
except TypeError:
raise TypeError(f"Slice start must be an integer, not {start!r}")
if not isinstance(stop, int):
try:
stop = int(operator.index(stop))
except TypeError:
raise TypeError(f"Slice stop must be an integer, not {stop!r}")
value = Value.cast(value)
@ -1656,8 +1661,8 @@ class Slice(Value):
super().__init__(src_loc_at=src_loc_at)
self._value = value
self._start = int(operator.index(start))
self._stop = int(operator.index(stop))
self._start = start
self._stop = stop
@property
def value(self):
@ -1733,8 +1738,7 @@ class Part(Value):
self.width, self.stride)
@final
class Cat(Value):
def Cat(*parts, src_loc_at=0):
"""Concatenate values.
Form a compound ``Value`` from several smaller ones by concatenation.
@ -1758,10 +1762,19 @@ class Cat(Value):
Value, inout
Resulting ``Value`` obtained by concatenation.
"""
def __init__(self, *args, src_loc_at=0):
parts = list(flatten(parts))
if any(isinstance(part, IOValue) for part in parts):
return IOConcat(parts, src_loc_at=src_loc_at + 1)
else:
return Concat(parts, src_loc_at=src_loc_at + 1)
@final
class Concat(Value):
def __init__(self, args, src_loc_at=0):
super().__init__(src_loc_at=src_loc_at)
parts = []
for index, arg in enumerate(flatten(args)):
for index, arg in enumerate(args):
if isinstance(arg, Enum) and (not isinstance(type(arg), ShapeCastable) or
not hasattr(arg, "_amaranth_shape_")):
warnings.warn("Argument #{} of Cat() is an enumerated value {!r} without "
@ -2732,6 +2745,162 @@ class Switch(Statement):
return "(switch {!r} {})".format(self.test, " ".join(case_reprs))
class IOValue(metaclass=ABCMeta):
@staticmethod
def cast(obj):
if isinstance(obj, IOValue):
return obj
elif isinstance(obj, Value) and len(obj) == 0:
return IOConcat(())
else:
raise TypeError(f"Object {obj!r} cannot be converted to an IO value")
def __init__(self, *, src_loc_at=0):
self.src_loc = tracer.get_src_loc(1 + src_loc_at)
@property
@abstractmethod
def metadata(self):
raise NotImplementedError # :nocov:
def __getitem__(self, key):
n = len(self)
if isinstance(key, int):
if key not in range(-n, n):
raise IndexError(f"Index {key} is out of bounds for a {n}-bit IO value")
if key < 0:
key += n
return IOSlice(self, key, key + 1, src_loc_at=1)
elif isinstance(key, slice):
start, stop, step = key.indices(n)
if step != 1:
return IOConcat((self[i] for i in range(start, stop, step)), src_loc_at=1)
return IOSlice(self, start, stop, src_loc_at=1)
else:
raise TypeError(f"Cannot index IO value with {key!r}")
@abstractmethod
def _ioports(self):
raise NotImplementedError # :nocov:
@final
class IOPort(IOValue):
def __init__(self, width, *, name=None, attrs=None, metadata=None, src_loc_at=0):
super().__init__(src_loc_at=src_loc_at)
if name is not None and not isinstance(name, str):
raise TypeError(f"Name must be a string, not {name!r}")
self.name = name or tracer.get_var_name(depth=2 + src_loc_at)
self._width = operator.index(width)
self._attrs = dict(() if attrs is None else attrs)
self._metadata = (None,) * self._width if metadata is None else tuple(metadata)
if len(self._metadata) != self._width:
raise ValueError(f"Metadata length ({len(self._metadata)}) doesn't match port width ({self._width})")
def __len__(self):
return self._width
@property
def width(self):
return self._width
@property
def attrs(self):
return self._attrs
@property
def metadata(self):
return self._metadata
def _ioports(self):
return {self}
def __repr__(self):
return f"(io-port {self.name})"
@final
class IOConcat(IOValue):
def __init__(self, parts, src_loc_at=0):
super().__init__(src_loc_at=src_loc_at)
self._parts = tuple(IOValue.cast(part) for part in parts)
@property
def parts(self):
return self._parts
def __len__(self):
return sum(len(part) for part in self.parts)
@property
def metadata(self):
return tuple(obj for part in self._parts for obj in part.metadata)
def _ioports(self):
return {port for part in self._parts for port in part._ioports()}
def __repr__(self):
return "(io-cat {})".format(" ".join(map(repr, self.parts)))
@final
class IOSlice(IOValue):
def __init__(self, value, start, stop, *, src_loc_at=0):
try:
start = int(operator.index(start))
except TypeError:
raise TypeError(f"Slice start must be an integer, not {start!r}")
try:
stop = int(operator.index(stop))
except TypeError:
raise TypeError(f"Slice stop must be an integer, not {stop!r}")
value = IOValue.cast(value)
n = len(value)
if start not in range(-n, n+1):
raise IndexError(f"Cannot start slice {start} bits into {n}-bit value")
if start < 0:
start += n
if stop not in range(-n, n+1):
raise IndexError(f"Cannot stop slice {stop} bits into {n}-bit value")
if stop < 0:
stop += n
if start > stop:
raise IndexError(f"Slice start {start} must be less than slice stop {stop}")
super().__init__(src_loc_at=src_loc_at)
self._value = value
self._start = start
self._stop = stop
@property
def value(self):
return self._value
@property
def start(self):
return self._start
@property
def stop(self):
return self._stop
def __len__(self):
return self.stop - self.start
@property
def metadata(self):
return self._value.metadata[self.start:self.stop]
def _ioports(self):
return self.value._ioports()
def __repr__(self):
return f"(io-slice {self.value!r} {self.start}:{self.stop})"
class _MappedKeyCollection(metaclass=ABCMeta):
@abstractmethod
def _map_key(self, key):

View file

@ -274,8 +274,8 @@ class Fragment:
if dir is not None and not isinstance(dir, PortDirection):
raise TypeError(
f"Port direction must be a `PortDirection` instance or None, not {dir!r}")
if not isinstance(signal, (_ast.Signal, _ast.ClockSignal, _ast.ResetSignal)):
raise TypeError(f"Only signals may be added as ports, not {signal!r}")
if not isinstance(signal, (_ast.Signal, _ast.ClockSignal, _ast.ResetSignal, _ast.IOPort)):
raise TypeError(f"Only signals and IO ports may be added as ports, not {signal!r}")
return new_ports
@ -328,7 +328,12 @@ class Instance(Fragment):
elif kind == "p":
self.parameters[name] = value
elif kind in ("i", "o", "io"):
self.named_ports[name] = (_ast.Value.cast(value), kind)
if kind == "io":
value = _ast.IOValue.cast(value)
else:
if not isinstance(value, _ast.IOValue):
value = _ast.Value.cast(value)
self.named_ports[name] = (value, kind)
else:
raise NameError("Instance argument {!r} should be a tuple (kind, name, value) "
"where kind is one of \"a\", \"p\", \"i\", \"o\", or \"io\""
@ -340,11 +345,15 @@ class Instance(Fragment):
elif kw.startswith("p_"):
self.parameters[kw[2:]] = arg
elif kw.startswith("i_"):
self.named_ports[kw[2:]] = (_ast.Value.cast(arg), "i")
if not isinstance(arg, _ast.IOValue):
arg = _ast.Value.cast(arg)
self.named_ports[kw[2:]] = (arg, "i")
elif kw.startswith("o_"):
self.named_ports[kw[2:]] = (_ast.Value.cast(arg), "o")
if not isinstance(arg, _ast.IOValue):
arg = _ast.Value.cast(arg)
self.named_ports[kw[2:]] = (arg, "o")
elif kw.startswith("io_"):
self.named_ports[kw[3:]] = (_ast.Value.cast(arg), "io")
self.named_ports[kw[3:]] = (_ast.IOValue.cast(arg), "io")
else:
raise NameError("Instance keyword argument {}={!r} does not start with one of "
"\"a_\", \"p_\", \"i_\", \"o_\", or \"io_\""
@ -352,33 +361,55 @@ class Instance(Fragment):
class IOBufferInstance(Fragment):
def __init__(self, pad, *, i=None, o=None, oe=None, src_loc_at=0, src_loc=None):
def __init__(self, port, *, i=None, o=None, oe=None, src_loc_at=0, src_loc=None):
super().__init__()
self.pad = _ast.Value.cast(pad)
self.port = _ast.IOValue.cast(port)
if i is None:
self.i = None
else:
self.i = _ast.Value.cast(i)
if len(self.pad) != len(self.i):
raise ValueError(f"`pad` length ({len(self.pad)}) doesn't match `i` length ({len(self.i)})")
if len(self.port) != len(self.i):
raise ValueError(f"'port' length ({len(self.port)}) doesn't match 'i' length ({len(self.i)})")
if o is None:
if oe is not None:
raise ValueError("`oe` must not be used if `o` is not used")
self.o = _ast.Const(0, len(self.pad))
self.oe = _ast.Const(0)
raise ValueError("'oe' must not be used if 'o' is not used")
self.o = None
self.oe = None
else:
self.o = _ast.Value.cast(o)
if len(self.pad) != len(self.o):
raise ValueError(f"`pad` length ({len(self.pad)}) doesn't match `o` length ({len(self.o)})")
if len(self.port) != len(self.o):
raise ValueError(f"'port' length ({len(self.port)}) doesn't match 'o' length ({len(self.o)})")
if oe is None:
self.oe = _ast.Const(1)
else:
self.oe = _ast.Value.cast(oe)
if len(self.oe) != 1:
raise ValueError(f"`oe` length ({len(self.oe)}) must be 1")
raise ValueError(f"'oe' length ({len(self.oe)}) must be 1")
self.src_loc = src_loc or tracer.get_src_loc(src_loc_at)
self.src_loc = src_loc or tracer.get_src_loc(src_loc_at)
def _add_name(assigned_names, name):
if name in assigned_names:
name = f"{name}${len(assigned_names)}"
assert name not in assigned_names
assigned_names.add(name)
return name
class DesignFragmentInfo:
def __init__(self, parent, depth):
self.parent = parent
self.depth = depth
self.signal_names = _ast.SignalDict()
self.io_port_names = {}
# Fixed up later.
self.name: "tuple[str]" = ()
self.assigned_names = set()
# These two are used as sets, but are stored as dicts to ensure deterministic iteration order.
self.used_signals = _ast.SignalDict()
self.used_io_ports = {}
class Design:
@ -386,103 +417,199 @@ class Design:
Returned by ``Fragment.prepare``."""
def __init__(self, fragment, ports, *, hierarchy):
def __init__(self, fragment: Fragment, ports, *, hierarchy):
self.fragment = fragment
self.ports = ports
self.ports = list(ports)
self.hierarchy = hierarchy
# dict of Fragment to SignalDict of Signal to name
self.signal_names = {}
self.fragment_names = {}
self._assign_names_to_signals(fragment, ports)
self._assign_names_to_fragments(fragment, hierarchy)
# Use just-assigned signal names to name all unnamed ports.
top_names = self.signal_names[fragment]
self.ports = [
(name or top_names[signal], signal, dir)
for (name, signal, dir) in self.ports
]
self.fragments: dict[Fragment, DesignFragmentInfo] = {}
self.signal_lca = _ast.SignalDict()
self._compute_fragment_depth_parent(fragment, None, 0)
self._collect_used_signals(fragment)
self._add_io_ports()
self._assign_port_names()
for name, conn, dir in self.ports:
if isinstance(conn, _ast.IOPort):
self._use_io_port(fragment, conn)
else:
self._use_signal(fragment, conn)
self._assign_names(fragment, hierarchy)
def _assign_names_to_signals(self, fragment, ports=None):
"""Assign names to signals used in a given fragment.
The mapping is set in ``self.signal_names``. Because names are deduplicated using local
information only, the same signal used in a different fragment may get a different name.
"""
signal_names = _ast.SignalDict()
assigned_names = set()
def add_signal_name(signal):
if signal not in signal_names:
if signal.name not in assigned_names:
name = signal.name
else:
name = f"{signal.name}${len(assigned_names)}"
assert name not in assigned_names
signal_names[signal] = name
assigned_names.add(name)
if ports is not None:
# First pass: reserve names for pre-named top-level ports. If equal to the signal name, let the signal share it.
for name, signal, _dir in ports:
if name is not None:
assigned_names.add(name)
if signal.name == name:
signal_names[signal] = name
# Second pass: ensure non-pre-named top-level ports are named first.
for name, signal, _dir in ports:
if name is None:
add_signal_name(signal)
for domain_name, domain_signals in fragment.drivers.items():
if domain_name != "comb":
domain = fragment.domains[domain_name]
add_signal_name(domain.clk)
if domain.rst is not None:
add_signal_name(domain.rst)
for statements in fragment.statements.values():
for statement in statements:
for signal in statement._lhs_signals() | statement._rhs_signals():
if not isinstance(signal, (_ast.ClockSignal, _ast.ResetSignal)):
add_signal_name(signal)
self.signal_names[fragment] = signal_names
def _compute_fragment_depth_parent(self, fragment: Fragment, parent: "Fragment | None", depth: int):
"""Recursively computes every fragment's depth and parent."""
self.fragments[fragment] = DesignFragmentInfo(parent, depth)
for subfragment, _name, _src_loc in fragment.subfragments:
self._assign_names_to_signals(subfragment)
self._compute_fragment_depth_parent(subfragment, fragment, depth + 1)
def _assign_names_to_fragments(self, fragment, hierarchy):
"""Assign names to this fragment and its subfragments.
def _use_signal(self, fragment: Fragment, signal: _ast.Signal):
"""Marks a signal as used in a given fragment.
Also marks a signal as used if it has to be routed through a given fragment to get from
one part of hierarchy to another. For this purpose, the ``self.signal_lca`` dictionary
is maintained: for every signal, it stores the topmost fragment in which it has been
marked used so far.
"""
if signal in self.fragments[fragment].used_signals:
return
self.fragments[fragment].used_signals[signal] = None
if signal not in self.signal_lca:
# First time we see a signal.
self.signal_lca[signal] = fragment
return
# Signal already seen — go from current fragment to the LCA, marking everything along
# the way as used.
lca = self.signal_lca[signal]
# First, go up from our fragment until it is no deeper than current LCA.
while self.fragments[lca].depth < self.fragments[fragment].depth:
fragment = self.fragments[fragment].parent
# Early return if we reach a part of tree where the signal is already marked.
if signal in self.fragments[fragment].used_signals:
return
self.fragments[fragment].used_signals[signal] = None
# Second, go up from current LCA until it is no deeper than our fragment.
while self.fragments[lca].depth > self.fragments[fragment].depth:
lca = self.fragments[lca].parent
self.fragments[lca].used_signals[signal] = None
# Now, both fragments are at the same depth. Go up from both until the two paths meet.
while fragment is not lca:
lca = self.fragments[lca].parent
self.fragments[lca].used_signals[signal] = None
fragment = self.fragments[fragment].parent
self.fragments[fragment].used_signals[signal] = None
self.signal_lca[signal] = lca
def _use_io_port(self, fragment: Fragment, port: _ast.IOPort):
"""Marks an IO port as used in a given fragment and all its ancestors."""
frag_info = self.fragments[fragment]
if port in frag_info.used_io_ports:
return
frag_info.used_io_ports[port] = None
if frag_info.parent is not None:
self._use_io_port(frag_info.parent, port)
def _collect_used_signals(self, fragment: Fragment):
"""Collects used signals and IO ports for a fragment and all its subfragments."""
from . import _mem
if isinstance(fragment, _ir.Instance):
for conn, kind in fragment.named_ports.values():
if isinstance(conn, _ast.IOValue):
for port in conn._ioports():
self._use_io_port(fragment, port)
elif isinstance(conn, _ast.Value):
for signal in conn._rhs_signals():
self._use_signal(fragment, signal)
else:
assert False # :nocov:
elif isinstance(fragment, _ir.IOBufferInstance):
for port in fragment.port._ioports():
self._use_io_port(fragment, port)
if fragment.i is not None:
for signal in fragment.i._rhs_signals():
self._use_signal(fragment, signal)
if fragment.o is not None:
for signal in fragment.o._rhs_signals():
self._use_signal(fragment, signal)
for signal in fragment.oe._rhs_signals():
self._use_signal(fragment, signal)
elif isinstance(fragment, _mem.MemoryInstance):
for port in fragment._read_ports:
for signal in port._addr._rhs_signals():
self._use_signal(fragment, signal)
for signal in port._data._rhs_signals():
self._use_signal(fragment, signal)
for signal in port._en._rhs_signals():
self._use_signal(fragment, signal)
if port._domain != "comb":
domain = fragment.domains[port._domain]
self._use_signal(fragment, domain.clk)
if domain.rst is not None:
self._use_signal(fragment, domain.rst)
for port in fragment._write_ports:
for signal in port._addr._rhs_signals():
self._use_signal(fragment, signal)
for signal in port._data._rhs_signals():
self._use_signal(fragment, signal)
for signal in port._en._rhs_signals():
self._use_signal(fragment, signal)
domain = fragment.domains[port._domain]
self._use_signal(fragment, domain.clk)
if domain.rst is not None:
self._use_signal(fragment, domain.rst)
else:
for domain_name, statements in fragment.statements.items():
if domain_name != "comb":
domain = fragment.domains[domain_name]
self._use_signal(fragment, domain.clk)
if domain.rst is not None:
self._use_signal(fragment, domain.rst)
for statement in statements:
for signal in statement._lhs_signals() | statement._rhs_signals():
if not isinstance(signal, (_ast.ClockSignal, _ast.ResetSignal)):
self._use_signal(fragment, signal)
for subfragment, _name, _src_loc in fragment.subfragments:
self._collect_used_signals(subfragment)
def _add_io_ports(self):
"""Adds all used IO ports to our list of top-level ports, if they aren't there already."""
io_ports = {conn for name, conn, dir in self.ports if isinstance(conn, _ast.IOPort)}
for port in self.fragments[self.fragment].used_io_ports:
if port not in io_ports:
self.ports.append((None, port, None))
def _assign_port_names(self):
"""Assigns names to all ports that haven't been explicitly named."""
new_ports = []
assigned_names = {name for name, conn, dir in self.ports if name is not None}
for name, conn, dir in self.ports:
if name is None:
name = _add_name(assigned_names, conn.name)
assigned_names.add(name)
new_ports.append((name, conn, dir))
self.ports = new_ports
def _assign_names(self, fragment: Fragment, hierarchy: "tuple[str]"):
"""Assign names to signals and IO ports used in a given fragment, as well as its
subfragments.
The signal mapping is set in ``self.signal_names``, and the IO port mapping is set in
``self.io_port_names``. Because names are deduplicated using local information only,
the same signal used in a different fragment may get a different name.
Subfragments may not necessarily have a name. This method assigns every such subfragment
a name, ``U$<number>``, where ``<number>`` is based on its location in the hierarchy.
Subfragment names may collide with signal names safely in Amaranth, but this may confuse
backends. This method assigns every such subfragment a name, ``<name>$U$<number>``, where
``name`` is its original name, and ``<number>`` is based on its location in the hierarchy.
backends. This method assigns every such subfragment a new name.
Arguments
---------
hierarchy : tuple of str
Name of this fragment.
Returns
-------
dict of Fragment to tuple of str
A mapping from this fragment and its subfragments to their full hierarchical names.
"""
self.fragment_names[fragment] = hierarchy
taken_names = set(self.signal_names[fragment].values())
frag_info = self.fragments[fragment]
frag_info.name = hierarchy
if fragment is self.fragment:
# Reserve names for top-level ports. If equal to the signal name, let the signal share it.
for name, conn, _dir in self.ports:
frag_info.assigned_names.add(name)
if isinstance(conn, _ast.Signal) and conn.name == name:
frag_info.signal_names[conn] = name
elif isinstance(conn, _ast.IOPort) and conn.name == name:
frag_info.io_port_names[conn] = name
for signal in frag_info.used_signals:
if signal not in frag_info.signal_names:
frag_info.signal_names[signal] = _add_name(frag_info.assigned_names, signal.name)
for port in frag_info.used_io_ports:
if port not in frag_info.io_port_names:
frag_info.io_port_names[port] = _add_name(frag_info.assigned_names, port.name)
for subfragment_index, (subfragment, subfragment_name, subfragment_src_loc) in enumerate(fragment.subfragments):
if subfragment_name is None:
subfragment_name = f"U${subfragment_index}"
elif subfragment_name in taken_names:
subfragment_name = f"{subfragment_name}$U${subfragment_index}"
assert subfragment_name not in taken_names
taken_names.add(subfragment_name)
self._assign_names_to_fragments(subfragment, hierarchy=(*hierarchy, subfragment_name))
subfragment_name = _add_name(frag_info.assigned_names, subfragment_name)
self._assign_names(subfragment, hierarchy=(*hierarchy, subfragment_name))
############################################################################################### >:3
@ -523,11 +650,13 @@ class NetlistEmitter:
self.netlist = netlist
self.design = design
self.drivers = _ast.SignalDict()
self.io_ports: dict[_ast.IOPort, int] = {}
self.rhs_cache: dict[int, Tuple[_nir.Value, bool, _ast.Value]] = {}
# Collected for driver conflict diagnostics only.
self.late_net_to_signal = {}
self.connect_src_loc = {}
self.ionet_src_loc = {}
def emit_signal(self, signal) -> _nir.Value:
if signal in self.netlist.signals:
@ -538,11 +667,43 @@ class NetlistEmitter:
self.late_net_to_signal[net] = (signal, bit)
return value
def emit_io(self, value: _ast.IOValue) -> _nir.IOValue:
if isinstance(value, _ast.IOPort):
if value not in self.io_ports:
port = len(self.netlist.io_ports)
self.netlist.io_ports.append(value)
self.io_ports[value] = _nir.IOValue(
_nir.IONet.from_port(port, bit)
for bit in range(0, len(value))
)
return self.io_ports[value]
elif isinstance(value, _ast.IOConcat):
result = []
for part in value.parts:
result += self.emit_io(part)
return _nir.IOValue(result)
elif isinstance(value, _ast.IOSlice):
return self.emit_io(value.value)[value.start:value.stop]
else:
raise TypeError # :nocov:
def emit_io_use(self, value: _ast.IOValue, *, src_loc) -> _nir.IOValue:
res = self.emit_io(value)
for net in res:
if net not in self.ionet_src_loc:
self.ionet_src_loc[net] = src_loc
else:
prev_src_loc = self.ionet_src_loc[net]
port = self.netlist.io_ports[net.port]
raise DriverConflict(f"Bit {net.bit} of I/O port {port!r} used twice, at "
f"{prev_src_loc[0]}:{prev_src_loc[1]} and {src_loc[0]}:{src_loc[1]}")
return res
# Used for instance outputs and read port data, not used for actual assignments.
def emit_lhs(self, value: _ast.Value):
if isinstance(value, _ast.Signal):
return self.emit_signal(value)
elif isinstance(value, _ast.Cat):
elif isinstance(value, _ast.Concat):
result = []
for part in value.parts:
result += self.emit_lhs(part)
@ -739,7 +900,7 @@ class NetlistEmitter:
src_loc=value.src_loc)
result = self.netlist.add_value_cell(shape.width, cell)
signed = shape.signed
elif isinstance(value, _ast.Cat):
elif isinstance(value, _ast.Concat):
nets = []
for val in value.parts:
inner, _signed = self.emit_rhs(module_idx, val)
@ -801,7 +962,7 @@ class NetlistEmitter:
src_loc=src_loc))
elif isinstance(lhs, _ast.Slice):
self.emit_assign(module_idx, cd, lhs.value, lhs_start + lhs.start, rhs, cond, src_loc=src_loc)
elif isinstance(lhs, _ast.Cat):
elif isinstance(lhs, _ast.Concat):
part_stop = 0
for part in lhs.parts:
part_start = part_stop
@ -958,12 +1119,21 @@ class NetlistEmitter:
assert False # :nocov:
def emit_iobuffer(self, module_idx: int, instance: _ir.IOBufferInstance):
pad = self.emit_lhs(instance.pad)
o, _signed = self.emit_rhs(module_idx, instance.o)
(oe,), _signed = self.emit_rhs(module_idx, instance.oe)
assert len(pad) == len(o)
cell = _nir.IOBuffer(module_idx, pad=pad, o=o, oe=oe, src_loc=instance.src_loc)
value = self.netlist.add_value_cell(len(pad), cell)
port = self.emit_io_use(instance.port, src_loc=instance.src_loc)
if instance.o is None:
o = None
oe = None
dir = _nir.IODirection.Input
else:
o, _signed = self.emit_rhs(module_idx, instance.o)
(oe,), _signed = self.emit_rhs(module_idx, instance.oe)
assert len(port) == len(o)
if instance.i is None:
dir = _nir.IODirection.Output
else:
dir = _nir.IODirection.Bidir
cell = _nir.IOBuffer(module_idx, port=port, dir=dir, o=o, oe=oe, src_loc=instance.src_loc)
value = self.netlist.add_value_cell(len(port), cell)
if instance.i is not None:
self.connect(self.emit_lhs(instance.i), value, src_loc=instance.src_loc)
@ -1032,15 +1202,23 @@ class NetlistEmitter:
outputs = []
next_output_bit = 0
for port_name, (port_conn, dir) in instance.named_ports.items():
if dir == 'i':
if isinstance(port_conn, _ast.IOValue):
if dir == 'i':
xlat_dir = _nir.IODirection.Input
elif dir == 'o':
xlat_dir = _nir.IODirection.Output
elif dir == 'io':
xlat_dir = _nir.IODirection.Bidir
else:
assert False # :nocov:
ports_io[port_name] = (self.emit_io_use(port_conn, src_loc=instance.src_loc), xlat_dir)
elif dir == 'i':
ports_i[port_name], _signed = self.emit_rhs(module_idx, port_conn)
elif dir == 'o':
port_conn = self.emit_lhs(port_conn)
ports_o[port_name] = (next_output_bit, len(port_conn))
outputs.append((next_output_bit, port_conn))
next_output_bit += len(port_conn)
elif dir == 'io':
ports_io[port_name] = self.emit_lhs(port_conn)
else:
assert False # :nocov:
cell = _nir.Instance(module_idx,
@ -1059,31 +1237,20 @@ class NetlistEmitter:
src_loc=instance.src_loc)
def emit_top_ports(self, fragment: _ir.Fragment):
inouts = set()
for cell in self.netlist.cells:
if isinstance(cell, _nir.IOBuffer):
inouts.update(cell.pad)
if isinstance(cell, _nir.Instance):
for value in cell.ports_io.values():
inouts.update(value)
next_input_bit = 2 # 0 and 1 are reserved for constants
top = self.netlist.top
for name, signal, dir in self.design.ports:
if isinstance(signal, _ast.IOPort):
continue
signal_value = self.emit_signal(signal)
if dir is None:
is_driven = False
is_inout = False
for net in signal_value:
if net in self.netlist.connections:
is_driven = True
if net in inouts:
is_inout = True
if is_driven:
dir = PortDirection.Output
elif is_inout:
dir = PortDirection.Inout
else:
dir = PortDirection.Input
if dir == PortDirection.Input:
@ -1097,13 +1264,7 @@ class NetlistEmitter:
elif dir == PortDirection.Output:
top.ports_o[name] = signal_value
elif dir == PortDirection.Inout:
top.ports_io[name] = (next_input_bit, signal.width)
value = _nir.Value(
_nir.Net.from_cell(0, bit)
for bit in range(next_input_bit, next_input_bit + signal.width)
)
next_input_bit += signal.width
self.connect(signal_value, value, src_loc=signal.src_loc)
raise ValueError(f"Port direction 'Inout' can only be used for 'IOPort', not 'Signal'")
else:
raise ValueError(f"Invalid port direction {dir!r}")
@ -1159,7 +1320,7 @@ class NetlistEmitter:
def emit_fragment(self, fragment: _ir.Fragment, parent_module_idx: 'int | None', *, cell_src_loc=None):
from . import _mem
fragment_name = self.design.fragment_names[fragment]
fragment_name = self.design.fragments[fragment].name
if isinstance(fragment, _ir.Instance):
assert parent_module_idx is not None
self.emit_instance(parent_module_idx, fragment, name=fragment_name[-1])
@ -1176,10 +1337,14 @@ class NetlistEmitter:
self.emit_iobuffer(parent_module_idx, fragment)
elif type(fragment) is _ir.Fragment:
module_idx = self.netlist.add_module(parent_module_idx, fragment_name, src_loc=fragment.src_loc, cell_src_loc=cell_src_loc)
signal_names = self.design.signal_names[fragment]
signal_names = self.design.fragments[fragment].signal_names
self.netlist.modules[module_idx].signal_names = signal_names
io_port_names = self.design.fragments[fragment].io_port_names
self.netlist.modules[module_idx].io_port_names = io_port_names
for signal in signal_names:
self.emit_signal(signal)
for port in io_port_names:
self.emit_io(port)
for domain, stmts in fragment.statements.items():
for stmt in stmts:
self.emit_stmt(module_idx, fragment, domain, stmt, _nir.Net.from_const(1))
@ -1227,8 +1392,6 @@ def _compute_net_flows(netlist: _nir.Netlist):
# - [no flow]
# - [no flow]
# - [no flow]
#
# This function doesn't assign the Inout flow — that is corrected later, in compute_ports.
lca = {}
# Initialize by marking the definition point of every net.
@ -1293,20 +1456,11 @@ def _compute_ports(netlist: _nir.Netlist):
port_starts = set()
for start, _ in netlist.top.ports_i.values():
port_starts.add(_nir.Net.from_cell(0, start))
for start, width in netlist.top.ports_io.values():
port_starts.add(_nir.Net.from_cell(0, start))
for cell_idx, cell in enumerate(netlist.cells):
if isinstance(cell, _nir.Instance):
for start, _ in cell.ports_o.values():
port_starts.add(_nir.Net.from_cell(cell_idx, start))
# Compute the set of all inout nets. Currently, a net has inout flow iff it is connected to
# a toplevel inout port.
inouts = set()
for start, width in netlist.top.ports_io.values():
for idx in range(start, start + width):
inouts.add(_nir.Net.from_cell(0, idx))
for module in netlist.modules:
# Collect preferred names for ports. If a port exactly matches a signal, we reuse
# the signal name for the port. Otherwise, we synthesize a private name.
@ -1316,11 +1470,6 @@ def _compute_ports(netlist: _nir.Netlist):
if value not in name_table and not name.startswith('$'):
name_table[value] = name
# Adjust any input flows to inout as necessary.
for (net, flow) in module.net_flow.items():
if flow == _nir.ModuleNetFlow.Input and net in inouts:
module.net_flow[net] = _nir.ModuleNetFlow.Inout
# Gather together "adjacent" nets with the same flow into ports.
visited = set()
for net in sorted(module.net_flow):
@ -1360,15 +1509,88 @@ def _compute_ports(netlist: _nir.Netlist):
_nir.Value(_nir.Net.from_cell(0, start + bit) for bit in range(width)),
_nir.ModuleNetFlow.Input
)
for name, (start, width) in netlist.top.ports_io.items():
top_module.ports[name] = (
_nir.Value(_nir.Net.from_cell(0, start + bit) for bit in range(width)),
_nir.ModuleNetFlow.Inout
)
for name, value in netlist.top.ports_o.items():
top_module.ports[name] = (value, _nir.ModuleNetFlow.Output)
def _compute_ionet_dirs(netlist: _nir.Netlist):
# Collects the direction of every IO net, for every module it needs to be routed through.
for cell in netlist.cells:
for (net, dir) in cell.io_nets():
module_idx = cell.module_idx
while module_idx is not None:
netlist.modules[module_idx].ionet_dir[net] = dir
module_idx = netlist.modules[module_idx].parent
def _compute_io_ports(netlist: _nir.Netlist, ports):
io_ports = {
port: _nir.IOValue(_nir.IONet.from_port(idx, bit) for bit in range(len(port)))
for idx, port in enumerate(netlist.io_ports)
}
for module in netlist.modules:
if module.parent is None:
# Top module gets special treatment: each IOPort is added in its entirety.
for (name, port, dir) in ports:
dir = {
PortDirection.Input: _nir.IODirection.Input,
PortDirection.Output: _nir.IODirection.Output,
PortDirection.Inout: _nir.IODirection.Bidir,
None: None,
}[dir]
if not isinstance(port, _ast.IOPort):
continue
auto_dir = None
for net in io_ports[port]:
if net in module.ionet_dir:
if auto_dir is None:
auto_dir = module.ionet_dir[net]
else:
auto_dir |= module.ionet_dir[net]
if dir is None:
dir = auto_dir
if auto_dir is None:
dir = _nir.IODirection.Bidir
else:
if auto_dir is not None and (auto_dir | dir) != dir:
raise ValueError(f"Port {name} is {dir.value}, but is used as {auto_dir.value}")
module.io_ports[name] = (io_ports[port], dir)
else:
# Collect preferred names for ports. If a port exactly matches a signal, we reuse
# the signal name for the port. Otherwise, we synthesize a private name.
name_table = {}
for port, name in module.io_port_names.items():
value = io_ports[port]
if value not in name_table and not name.startswith('$'):
name_table[value] = name
# Gather together "adjacent" nets with the same flow into ports.
visited = set()
for net in sorted(module.ionet_dir):
dir = module.ionet_dir[net]
if net in visited:
continue
# We found a net that needs a port. Keep joining the next nets output by the same
# cell into the same port, if applicable, but stop at instance/top port boundaries.
nets = [net]
while True:
succ = _nir.IONet.from_port(net.port, net.bit + 1)
if succ not in module.ionet_dir:
break
if module.ionet_dir[succ] != module.ionet_dir[net]:
break
net = succ
nets.append(net)
value = _nir.IOValue(nets)
# Joined as many nets as we could, now name and add the port.
if value in name_table:
name = name_table[value]
else:
name = f"ioport${value[0].port}${value[0].bit}"
module.io_ports[name] = (value, dir)
visited.update(value)
def build_netlist(fragment, ports=(), *, name="top", **kwargs):
if isinstance(fragment, Design):
design = fragment
@ -1379,4 +1601,6 @@ def build_netlist(fragment, ports=(), *, name="top", **kwargs):
netlist.resolve_all_nets()
_compute_net_flows(netlist)
_compute_ports(netlist)
_compute_ionet_dirs(netlist)
_compute_io_ports(netlist, design.ports)
return netlist

View file

@ -2,12 +2,14 @@ from typing import Iterable
import enum
from ._ast import SignalDict
from . import _ast
__all__ = [
# Netlist core
"Net", "Value", "FormatValue", "Format",
"Netlist", "ModuleNetFlow", "Module", "Cell", "Top",
"Net", "Value", "IONet", "IOValue",
"FormatValue", "Format",
"Netlist", "ModuleNetFlow", "IODirection", "Module", "Cell", "Top",
# Computation cells
"Operator", "Part",
# Decision tree cells
@ -153,7 +155,7 @@ class Value(tuple):
chunks.append(f"{cell}.{start_bit}:{end_bit}")
pos = next_pos
if len(chunks) == 0:
return "(0'd0)"
return "()"
elif len(chunks) == 1:
return chunks[0]
else:
@ -162,6 +164,76 @@ class Value(tuple):
__str__ = __repr__
class IONet(int):
__slots__ = ()
@classmethod
def from_port(cls, port: int, bit: int):
assert bit in range(1 << 16)
assert port >= 0
return cls((port << 16) | bit)
@property
def port(self):
return self >> 16
@property
def bit(self):
return self & 0xffff
@classmethod
def ensure(cls, value: 'IONet'):
assert isinstance(value, cls)
return value
def __repr__(self):
return f"{self.port}.{self.bit}"
__str__ = __repr__
class IOValue(tuple):
__slots__ = ()
def __new__(cls, nets: 'IONet | Iterable[IONet]' = ()):
if isinstance(nets, IONet):
return super().__new__(cls, (nets,))
return super().__new__(cls, (IONet.ensure(net) for net in nets))
def __getitem__(self, index):
if isinstance(index, slice):
return type(self)(super().__getitem__(index))
else:
return super().__getitem__(index)
def __repr__(self):
pos = 0
chunks = []
while pos < len(self):
next_pos = pos
port = self[pos].port
start_bit = self[pos].bit
while (next_pos < len(self) and
self[next_pos].port == port and
self[next_pos].bit == start_bit + (next_pos - pos)):
next_pos += 1
width = next_pos - pos
end_bit = start_bit + width
if width == 1:
chunks.append(f"{port}.{start_bit}")
else:
chunks.append(f"{port}.{start_bit}:{end_bit}")
pos = next_pos
if len(chunks) == 0:
return "()"
elif len(chunks) == 1:
return chunks[0]
else:
return f"(io-cat {' '.join(chunks)})"
__str__ = __repr__
class FormatValue:
"""A single formatted value within ``Format``.
@ -240,14 +312,18 @@ class Netlist:
Attributes
----------
modules : list of ``Module``
cells : list of ``Cell``
connections : dict of (negative) int to int
io_ports : list of ``IOPort``
signals : dict of Signal to ``Value``
last_late_net: int
"""
def __init__(self):
self.modules: list[Module] = []
self.cells: list[Cell] = [Top()]
self.connections: dict[Net, Net] = {}
self.io_ports: list[_ast.IOPort] = []
self.signals = SignalDict()
self.last_late_net = 0
@ -270,11 +346,16 @@ class Netlist:
result = ["("]
for module_idx, module in enumerate(self.modules):
name = " ".join(repr(name) for name in module.name)
ports = " ".join(
ports = [
f"({flow.value} {name!r} {val})"
for name, (val, flow) in module.ports.items()
)
result.append(f"(module {module_idx} {module.parent} ({name}) {ports})")
]
io_ports = [
f"(io {dir.value} {name!r} {val})"
for name, (val, dir) in module.io_ports.items()
]
ports = "".join(" " + port for port in ports + io_ports)
result.append(f"(module {module_idx} {module.parent} ({name}){ports})")
for cell_idx, cell in enumerate(self.cells):
result.append(f"(cell {cell_idx} {cell.module_idx} {cell!r})")
result.append(")")
@ -332,9 +413,18 @@ class ModuleNetFlow(enum.Enum):
#: It is thus an output port of this module.
Output = "output"
#: The net is a special top-level inout net that is used within
#: this module or its submodules. It is an inout port of this module.
Inout = "inout"
class IODirection(enum.Enum):
Input = "input"
Output = "output"
Bidir = "inout"
def __or__(self, other):
assert isinstance(other, IODirection)
if self == other:
return self
else:
return IODirection.Bidir
class Module:
@ -349,7 +439,8 @@ class Module:
submodules: a list of nested module indices
signal_names: a SignalDict from Signal to str, signal names visible in this module
net_flow: a dict from Net to NetFlow, describes how a net is used within this module
ports: a dict from port name to (Value, NetFlow) pair
ports: a dict from port name to (Value, ModuleNetFlow) pair
io_ports: a dict from port name to (IOValue, IODirection) pair
cells: a list of cell indices that belong to this module
"""
def __init__(self, parent, name, *, src_loc, cell_src_loc):
@ -359,8 +450,11 @@ class Module:
self.cell_src_loc = cell_src_loc
self.submodules = []
self.signal_names = SignalDict()
self.net_flow = {}
self.io_port_names = {}
self.net_flow: dict[Net, ModuleNetFlow] = {}
self.ionet_dir: dict[IONet, IODirection] = {}
self.ports = {}
self.io_ports = {}
self.cells = []
@ -384,36 +478,34 @@ class Cell:
def output_nets(self, self_idx: int):
raise NotImplementedError
def io_nets(self):
return set()
def resolve_nets(self, netlist: Netlist):
raise NotImplementedError
class Top(Cell):
"""A special cell type representing top-level ports. Must be present in the netlist exactly
"""A special cell type representing top-level non-IO ports. Must be present in the netlist exactly
once, at index 0.
Top-level outputs are stored as a dict of names to their assigned values.
Top-level inputs and inouts are effectively the output of this cell. They are both stored
Top-level inputs are effectively the output of this cell. They are stored
as a dict of names to a (start bit index, width) tuple. Output bit indices 0 and 1 are reserved
for constant nets, so the lowest bit index that can be assigned to a port is 2.
Top-level inouts are special and can only be used by inout ports of instances, or in the pad
value of an ``IoBuf`` cell.
Attributes
----------
ports_o: dict of str to Value
ports_i: dict of str to (int, int)
ports_io: dict of str to (int, int)
"""
def __init__(self):
super().__init__(module_idx=0, src_loc=None)
self.ports_o = {}
self.ports_i = {}
self.ports_io = {}
def input_nets(self):
nets = set()
@ -426,9 +518,6 @@ class Top(Cell):
for start, width in self.ports_i.values():
for bit in range(start, start + width):
nets.add(Net.from_cell(self_idx, bit))
for start, width in self.ports_io.values():
for bit in range(start, start + width):
nets.add(Net.from_cell(self_idx, bit))
return nets
def resolve_nets(self, netlist: Netlist):
@ -438,13 +527,11 @@ class Top(Cell):
def __repr__(self):
ports = []
for (name, (start, width)) in self.ports_i.items():
ports.append(f"(input {name!r} {start}:{start+width})")
ports.append(f" (input {name!r} {start}:{start+width})")
for (name, val) in self.ports_o.items():
ports.append(f"(output {name!r} {val})")
for (name, (start, width)) in self.ports_io.items():
ports.append(f"(inout {name!r} {start}:{start+width})")
ports = " ".join(ports)
return f"(top {ports})"
ports.append(f" (output {name!r} {val})")
ports = "".join(ports)
return f"(top{ports})"
class Operator(Cell):
@ -1108,7 +1195,7 @@ class Instance(Cell):
attributes: dict of str to Const, int, or str
ports_i: dict of str to Value
ports_o: dict of str to pair of int (index start, width)
ports_io: dict of str to Value
ports_io: dict of str to (IOValue, IODirection)
"""
def __init__(self, module_idx, *, type, name, parameters, attributes, ports_i, ports_o, ports_io, src_loc):
@ -1120,14 +1207,12 @@ class Instance(Cell):
self.attributes = attributes
self.ports_i = {name: Value(val) for name, val in ports_i.items()}
self.ports_o = ports_o
self.ports_io = {name: Value(val) for name, val in ports_io.items()}
self.ports_io = {name: (IOValue(val), IODirection(dir)) for name, (val, dir) in ports_io.items()}
def input_nets(self):
nets = set()
for val in self.ports_i.values():
nets |= set(val)
for val in self.ports_io.values():
nets |= set(val)
return nets
def output_nets(self, self_idx: int):
@ -1137,11 +1222,15 @@ class Instance(Cell):
nets.add(Net.from_cell(self_idx, bit))
return nets
def io_nets(self):
nets = set()
for val, dir in self.ports_io.values():
nets |= {(net, dir) for net in val}
return nets
def resolve_nets(self, netlist: Netlist):
for port in self.ports_i:
self.ports_i[port] = netlist.resolve_value(self.ports_i[port])
for port in self.ports_io:
self.ports_io[port] = netlist.resolve_value(self.ports_io[port])
def __repr__(self):
items = []
@ -1153,43 +1242,62 @@ class Instance(Cell):
items.append(f"(input {name!r} {val})")
for name, (start, width) in self.ports_o.items():
items.append(f"(output {name!r} {start}:{start+width})")
for name, val in self.ports_io.items():
items.append(f"(inout {name!r} {val})")
for name, (val, dir) in self.ports_io.items():
items.append(f"(io {dir.value} {name!r} {val})")
items = " ".join(items)
return f"(instance {self.type!r} {self.name!r} {items})"
class IOBuffer(Cell):
"""An IO buffer cell. ``pad`` must be connected to nets corresponding to an IO port
of the ``Top`` cell. This cell does two things:
"""An IO buffer cell. This cell does two things:
- a tristate buffer is inserted driving ``pad`` based on ``o`` and ``oe`` nets (output buffer)
- the value of ``pad`` is sampled and made available as output of this cell (input buffer)
- a tristate buffer is inserted driving ``port`` based on ``o`` and ``oe`` nets (output buffer)
- the value of ``port`` is sampled and made available as output of this cell (input buffer)
Attributes
----------
pad: Value
o: Value
oe: Net
port: IOValue
dir: IODirection
o: Value or None
oe: Net or None
"""
def __init__(self, module_idx, *, pad, o, oe, src_loc):
def __init__(self, module_idx, *, port, dir, o=None, oe=None, src_loc):
super().__init__(module_idx, src_loc=src_loc)
self.pad = Value(pad)
self.o = Value(o)
self.oe = Net.ensure(oe)
self.port = IOValue(port)
self.dir = IODirection(dir)
if self.dir is IODirection.Input:
assert o is None
assert oe is None
self.o = None
self.oe = None
else:
self.o = Value(o)
self.oe = Net.ensure(oe)
def input_nets(self):
return set(self.pad) | set(self.o) | {self.oe}
if self.dir is IODirection.Input:
return set()
else:
return set(self.o) | {self.oe}
def output_nets(self, self_idx: int):
return {Net.from_cell(self_idx, bit) for bit in range(len(self.pad))}
if self.dir is IODirection.Output:
return set()
else:
return {Net.from_cell(self_idx, bit) for bit in range(len(self.port))}
def io_nets(self):
return {(net, self.dir) for net in self.port}
def resolve_nets(self, netlist: Netlist):
self.pad = netlist.resolve_value(self.pad)
self.o = netlist.resolve_value(self.o)
self.oe = netlist.resolve_net(self.oe)
if self.dir is not IODirection.Input:
self.o = netlist.resolve_value(self.o)
self.oe = netlist.resolve_net(self.oe)
def __repr__(self):
return f"(iob {self.pad} {self.o} {self.oe})"
if self.dir is IODirection.Input:
return f"(iob {self.dir.value} {self.port})"
else:
return f"(iob {self.dir.value} {self.port} {self.o} {self.oe})"

View file

@ -53,7 +53,7 @@ class ValueVisitor(metaclass=ABCMeta):
pass # :nocov:
@abstractmethod
def on_Cat(self, value):
def on_Concat(self, value):
pass # :nocov:
@abstractmethod
@ -87,8 +87,8 @@ class ValueVisitor(metaclass=ABCMeta):
new_value = self.on_Slice(value)
elif type(value) is Part:
new_value = self.on_Part(value)
elif type(value) is Cat:
new_value = self.on_Cat(value)
elif type(value) is Concat:
new_value = self.on_Concat(value)
elif type(value) is ArrayProxy:
new_value = self.on_ArrayProxy(value)
elif type(value) is Initial:
@ -129,8 +129,8 @@ class ValueTransformer(ValueVisitor):
return Part(self.on_value(value.value), self.on_value(value.offset),
value.width, value.stride)
def on_Cat(self, value):
return Cat(self.on_value(o) for o in value.parts)
def on_Concat(self, value):
return Concat(self.on_value(o) for o in value.parts)
def on_ArrayProxy(self, value):
return ArrayProxy([self.on_value(elem) for elem in value._iter_as_values()],
@ -235,7 +235,10 @@ class FragmentTransformer:
def map_named_ports(self, fragment, new_fragment):
if hasattr(self, "on_value"):
for name, (value, dir) in fragment.named_ports.items():
new_fragment.named_ports[name] = self.on_value(value), dir
if isinstance(value, Value):
new_fragment.named_ports[name] = self.on_value(value), dir
else:
new_fragment.named_ports[name] = value, dir
else:
new_fragment.named_ports = OrderedDict(fragment.named_ports.items())
@ -303,15 +306,15 @@ class FragmentTransformer:
elif isinstance(fragment, IOBufferInstance):
if hasattr(self, "on_value"):
new_fragment = IOBufferInstance(
pad=self.on_value(fragment.pad),
port=fragment.port,
i=self.on_value(fragment.i) if fragment.i is not None else None,
o=self.on_value(fragment.o),
oe=self.on_value(fragment.oe),
o=self.on_value(fragment.o) if fragment.o is not None else None,
oe=self.on_value(fragment.oe) if fragment.o is not None else None,
src_loc=fragment.src_loc,
)
else:
new_fragment = IOBufferInstance(
pad=fragment.pad,
port=fragment.port,
i=fragment.i,
o=fragment.o,
oe=fragment.oe,
@ -396,7 +399,7 @@ class DomainCollector(ValueVisitor, StatementVisitor):
self.on_value(value.value)
self.on_value(value.offset)
def on_Cat(self, value):
def on_Concat(self, value):
for o in value.parts:
self.on_value(o)
@ -450,7 +453,15 @@ class DomainCollector(ValueVisitor, StatementVisitor):
if isinstance(fragment, Instance):
for name, (value, dir) in fragment.named_ports.items():
self.on_value(value)
if not isinstance(value, IOValue):
self.on_value(value)
if isinstance(fragment, IOBufferInstance):
if fragment.o is not None:
self.on_value(fragment.o)
self.on_value(fragment.oe)
if fragment.i is not None:
self.on_value(fragment.i)
old_local_domains, self._local_domains = self._local_domains, set(self._local_domains)
for domain_name, domain in fragment.domains.items():

View file

@ -212,7 +212,7 @@ class _RHSValueCompiler(_ValueCompiler):
return f"({(1 << value.width) - 1} & " \
f"{self(value.value)} >> {offset})"
def on_Cat(self, value):
def on_Concat(self, value):
gen_parts = []
offset = 0
for part in value.parts:
@ -313,7 +313,7 @@ class _LHSValueCompiler(_ValueCompiler):
f"(({width_mask:#x} & {arg}) << {offset}))")
return gen
def on_Cat(self, value):
def on_Concat(self, value):
def gen(arg):
gen_arg = self.emitter.def_var("cat", arg)
offset = 0

View file

@ -65,9 +65,9 @@ class _VCDWriter:
signal_names = SignalDict()
memories = {}
for fragment, fragment_name in design.fragment_names.items():
fragment_name = ("bench", *fragment_name)
for signal, signal_name in design.signal_names[fragment].items():
for fragment, fragment_info in design.fragments.items():
fragment_name = ("bench", *fragment_info.name)
for signal, signal_name in fragment_info.signal_names.items():
if signal not in signal_names:
signal_names[signal] = set()
signal_names[signal].add((*fragment_name, signal_name))

View file

@ -49,6 +49,7 @@ Implemented RFCs
.. _RFC 45: https://amaranth-lang.org/rfcs/0045-lib-memory.html
.. _RFC 46: https://amaranth-lang.org/rfcs/0046-shape-range-1.html
.. _RFC 50: https://amaranth-lang.org/rfcs/0050-print.html
.. _RFC 53: https://amaranth-lang.org/rfcs/0053-ioport.html
* `RFC 17`_: Remove ``log2_int``
* `RFC 27`_: Testbench processes for the simulator
@ -57,6 +58,7 @@ Implemented RFCs
* `RFC 45`_: Move ``hdl.Memory`` to ``lib.Memory``
* `RFC 46`_: Change ``Shape.cast(range(1))`` to ``unsigned(0)``
* `RFC 50`_: ``Print`` statement and string formatting
* `RFC 53`_: Low-level I/O primitives
Language changes
@ -64,9 +66,10 @@ Language changes
.. currentmodule:: amaranth.hdl
* Added: :class:`ast.Slice` objects have been made const-castable.
* Added: :class:`Slice` objects have been made const-castable.
* Added: :func:`amaranth.utils.ceil_log2`, :func:`amaranth.utils.exact_log2`. (`RFC 17`_)
* Added: :class:`Format` objects, :class:`Print` statements, messages in :class:`Assert`, :class:`Assume` and :class:`Cover`. (`RFC 50`_)
* Added: IO values, :class:`IOPort` objects, :class:`IOBufferInstance` objects. (`RFC 53`_)
* Changed: ``m.Case()`` with no patterns is never active instead of always active. (`RFC 39`_)
* Changed: ``Value.matches()`` with no patterns is ``Const(0)`` instead of ``Const(1)``. (`RFC 39`_)
* Changed: ``Signal(range(stop), init=stop)`` warning has been changed into a hard error and made to trigger on any out-of range value.
@ -75,6 +78,7 @@ Language changes
* Changed: the ``reset=`` argument of :class:`Signal`, :meth:`Signal.like`, :class:`amaranth.lib.wiring.Member`, :class:`amaranth.lib.cdc.FFSynchronizer`, and ``m.FSM()`` has been renamed to ``init=``. (`RFC 43`_)
* Changed: :class:`Shape` has been made immutable and hashable.
* Changed: :class:`Assert`, :class:`Assume`, :class:`Cover` have been moved to :mod:`amaranth.hdl` from :mod:`amaranth.asserts`. (`RFC 50`_)
* Changed: :class:`Instance` IO ports now accept only IO values, not plain values. (`RFC 53`_)
* Deprecated: :func:`amaranth.utils.log2_int`. (`RFC 17`_)
* Deprecated: :class:`amaranth.hdl.Memory`. (`RFC 45`_)
* Removed: (deprecated in 0.4) :meth:`Const.normalize`. (`RFC 5`_)
@ -218,7 +222,7 @@ Language changes
* Changed: :meth:`Value.cast` casts :class:`ValueCastable` objects recursively.
* Changed: :meth:`Value.cast` treats instances of classes derived from both :class:`enum.Enum` and :class:`int` (including :class:`enum.IntEnum`) as enumerations rather than integers.
* Changed: :meth:`Value.matches` with an empty list of patterns returns ``Const(1)`` rather than ``Const(0)``, to match the behavior of ``with m.Case():``.
* Changed: :class:`Cat` warns if an enumeration without an explicitly specified shape is used. (`RFC 3`_)
* Changed: :func:`Cat` warns if an enumeration without an explicitly specified shape is used. (`RFC 3`_)
* Changed: ``signed(0)`` is no longer constructible. (The semantics of this shape were never defined.)
* Changed: :meth:`Value.__abs__` returns an unsigned value.
* Deprecated: :class:`ast.Sample`, :class:`ast.Past`, :class:`ast.Stable`, :class:`ast.Rose`, :class:`ast.Fell`. (Predating the RFC process.)

View file

@ -328,7 +328,7 @@ They may also be provided as a pattern to the :ref:`match operator <lang-matchop
At the moment, only the following expressions are constant-castable:
* :class:`Const`
* :class:`Cat`
* :func:`Cat`
* :class:`Slice`
This list will be expanded in the future.
@ -707,21 +707,21 @@ The result of any bit sequence operation is an unsigned value.
The following table lists the bit sequence operations provided by Amaranth:
======================= ================================================ ======
Operation Description Notes
======================= ================================================ ======
``len(a)`` bit length; value width [#opS1]_
``a[i:j:k]`` bit slicing by constant subscripts [#opS2]_
``iter(a)`` bit iteration
``a.bit_select(b, w)`` overlapping part select with variable offset
``a.word_select(b, w)`` non-overlapping part select with variable offset
``Cat(a, b)`` concatenation [#opS3]_
``a.replicate(n)`` replication
======================= ================================================ ======
========================= ================================================ ========
Operation Description Notes
========================= ================================================ ========
:py:`len(a)` bit length; value width [#opS1]_
:py:`a[i:j:k]` bit slicing by constant subscripts [#opS2]_
:py:`iter(a)` bit iteration
:py:`a.bit_select(b, w)` overlapping part select with variable offset
:py:`a.word_select(b, w)` non-overlapping part select with variable offset
:py:`Cat(a, b)` concatenation [#opS3]_
:py:`a.replicate(n)` replication
========================= ================================================ ========
.. [#opS1] Words "length" and "width" have the same meaning when talking about Amaranth values. Conventionally, "width" is used.
.. [#opS2] All variations of the Python slice notation are supported, including "extended slicing". E.g. all of ``a[0]``, ``a[1:9]``, ``a[2:]``, ``a[:-2]``, ``a[::-1]``, ``a[0:8:2]`` select bits in the same way as other Python sequence types select their elements.
.. [#opS3] In the concatenated value, ``a`` occupies the least significant bits, and ``b`` the most significant bits. Any number of arguments (zero, one, two, or more) are supported.
.. [#opS2] All variations of the Python slice notation are supported, including "extended slicing". E.g. all of :py:`a[0]`, :py:`a[1:9]`, :py:`a[2:]`, :py:`a[:-2]`, :py:`a[::-1]`, :py:`a[0:8:2]` select bits in the same way as other Python sequence types select their elements.
.. [#opS3] In the concatenated value, :py:`a` occupies the least significant bits, and :py:`b` the most significant bits. Any number of arguments (zero, one, two, or more) are supported.
For the operators introduced by Amaranth, the following table explains them in terms of Python code operating on tuples of bits rather than Amaranth values:
@ -1677,6 +1677,66 @@ Memories
Amaranth provides support for memories in the standard library module :mod:`amaranth.lib.memory`.
.. _lang-iovalues:
I/O values
==========
To interoperate with external circuitry, Amaranth provides *I/O values*, which represent bundles of wires carrying uninterpreted signals. Unlike regular :ref:`values <lang-values>`, which represent binary numbers and can be :ref:`assigned <lang-assigns>` to create a unidirectional connection or used in computations, I/O values represent electrical signals that may be digital or analog and have no :ref:`shape <lang-shapes>`, cannot be assigned, used in computations, or simulated.
I/O values are only used to define connections between non-Amaranth building blocks that traverse an Amaranth design, including :ref:`instances <lang-instance>` and :ref:`I/O buffer instances <lang-iobufferinstance>`.
.. _lang-ioports:
I/O ports
---------
An *I/O port* is an I/O value representing a connection to a port of the topmost module in the :ref:`design hierarchy <lang-submodules>`. It can be created with an explicitly specified width.
.. testcode::
from amaranth.hdl import IOPort
.. doctest::
>>> port = IOPort(4)
>>> port.width
4
I/O ports can be named in the same way as :ref:`signals <lang-signalname>`:
.. doctest::
>>> clk_port = IOPort(1, name="clk")
>>> clk_port.name
'clk'
If two I/O ports with the same name exist in a design, one of them will be renamed to remove the ambiguity. Because the name of an I/O port is significant, they should be named unambiguously.
.. _lang-ioops:
I/O operators
-------------
I/O values support only a limited set of :ref:`sequence <python:typesseq>` operators, all of which return another I/O value. The following table lists the I/O operators provided by Amaranth:
=============== ============================== ===================
Operation Description Notes
=============== ============================== ===================
:py:`len(a)` length; width [#iopS1]_
:py:`a[i:j:k]` slicing by constant subscripts [#iopS2]_
:py:`iter(a)` iteration
:py:`Cat(a, b)` concatenation [#iopS3]_ [#iopS4]_
=============== ============================== ===================
.. [#iopS1] Words "length" and "width" have the same meaning when talking about Amaranth I/O values. Conventionally, "width" is used.
.. [#iopS2] All variations of the Python slice notation are supported, including "extended slicing". E.g. all of :py:`a[0]`, :py:`a[1:9]`, :py:`a[2:]`, :py:`a[:-2]`, :py:`a[::-1]`, :py:`a[0:8:2]` select wires in the same way as other Python sequence types select their elements.
.. [#iopS3] In the concatenated value, :py:`a` occupies the lower indices and :py:`b` the higher indices. Any number of arguments (zero, one, two, or more) are supported.
.. [#iopS4] Concatenation of zero arguments, :py:`Cat()`, returns a 0-bit regular value, however any such value is accepted (and ignored) anywhere an I/O value is expected.
.. _lang-instance:
Instances
@ -1690,14 +1750,15 @@ A submodule written in a non-Amaranth language is called an *instance*. An insta
* The *name* of an instance is the name of the submodule within the containing elaboratable.
* The *attributes* of an instance correspond to attributes of a (System)Verilog module instance, or a custom attribute of a VHDL entity or component instance. Attributes applied to instances are interpreted by the synthesis toolchain rather than the HDL.
* The *parameters* of an instance correspond to parameters of a (System)Verilog module instance, or a generic constant of a VHDL entity or component instance. Not all HDLs allow their design units to be parameterized during instantiation.
* The *inputs* and *outputs* of an instance correspond to inputs and outputs of the external design unit.
* The *inputs*, *outputs*, and *inouts* of an instance correspond to input ports, output ports, and bidirectional ports of the external design unit.
An instance can be added as a submodule using the :py:`m.submodules.name = Instance("type", ...)` syntax, where :py:`"type"` is the type of the instance as a string (which is passed to the synthesis toolchain uninterpreted), and :py:`...` is a list of parameters, inputs, and outputs. Depending on whether the name of an attribute, parameter, input, or output can be written as a part of a Python identifier or not, one of two possible syntaxes is used to specify them:
* An attribute is specified using the :py:`a_ANAME=attr` or :py:`("a", "ANAME", attr)` syntaxes. The :py:`attr` must be an :class:`int`, a :class:`str`, or a :class:`Const`.
* A parameter is specified using the :py:`p_PNAME=param` or :py:`("p", "PNAME", param)` syntaxes. The :py:`param` must be an :class:`int`, a :class:`str`, or a :class:`Const`.
* An input is specified using the :py:`i_INAME=in_val` or :py:`("i", "INAME", in_val)` syntaxes. The :py:`in_val` must be a :ref:`value-like <lang-valuelike>` object.
* An output is specified using the :py:`o_ONAME=out_val` or :py:`("o", "ONAME", out_val)` syntaxes. The :py:`out_val` must be a :ref:`value-like <lang-valuelike>` object that casts to a :class:`Signal`.
* An input is specified using the :py:`i_INAME=in_val` or :py:`("i", "INAME", in_val)` syntaxes. The :py:`in_val` must be an :ref:`I/O value <lang-iovalues>` or a :ref:`value-like <lang-valuelike>` object.
* An output is specified using the :py:`o_ONAME=out_val` or :py:`("o", "ONAME", out_val)` syntaxes. The :py:`out_val` must be an :ref:`I/O value <lang-iovalues>` or a :ref:`value-like <lang-valuelike>` object that casts to a :ref:`signal <lang-signals>`, a concatenation of signals, or a slice of a signal.
* An inout is specified using the :py:`io_IONAME=inout_val` or :py:`("io", "IONAME", inout_val)` syntaxes. The :py:`inout_val` must be an :ref:`I/O value <lang-iovalues>`.
The two following examples use both syntaxes to add the same instance of type ``external`` as a submodule named ``processor``:
@ -1706,6 +1767,7 @@ The two following examples use both syntaxes to add the same instance of type ``
i_data = Signal(8)
o_data = Signal(8)
io_pin = IOPort(1)
m = Module()
.. testcode::
@ -1718,6 +1780,7 @@ The two following examples use both syntaxes to add the same instance of type ``
i_mode=Const(3, unsigned(4)),
i_data_in=i_data,
o_data_out=o_data,
io_pin=io_pin,
)
.. testcode::
@ -1735,6 +1798,7 @@ The two following examples use both syntaxes to add the same instance of type ``
("i", "mode", Const(3, unsigned(4))),
("i", "data_in", i_data),
("o", "data_out", o_data),
("io", "pin", io_pin),
)
Like a regular submodule, an instance can also be added without specifying a name:
@ -1770,4 +1834,55 @@ Although an :class:`Instance` is not an elaboratable, as a special case, it can
o_Q=self.q
)
else:
raise NotImplementedError
raise NotImplementedError
.. _lang-iobufferinstance:
I/O buffer instances
====================
An *I/O buffer instance* is a submodule that allows assigning :ref:`I/O values <lang-iovalues>` to or from regular :ref:`values <lang-values>` without the use of an external, toolchain- and technology-dependent :ref:`instance <lang-instance>`. It can be created in four configurations: input, output, tristatable output, and bidirectional (input/output).
.. testcode::
from amaranth.hdl import IOBufferInstance
m = Module()
In the input configuration, the buffer combinatorially drives a signal :py:`i` by the port:
.. testcode::
port = IOPort(4)
port_i = Signal(4)
m.submodules.ibuf = IOBufferInstance(port, i=port_i)
In the output configuration, the buffer combinatorially drives the port by a value :py:`o`:
.. testcode::
port = IOPort(4)
port_o = Signal(4)
m.submodules.obuf = IOBufferInstance(port, o=port_o)
In the tristatable output configuration, the buffer combinatorially drives the port by a value :py:`o` if :py:`oe` is asserted, and does not drive (leaves in a high-impedance state, or tristates) the port otherwise:
.. testcode::
port = IOPort(4)
port_o = Signal(4)
port_oe = Signal()
m.submodules.obuft = IOBufferInstance(port, o=port_o, oe=port_oe)
In the bidirectional (input/output) configuration, the buffer combiatorially drives a signal :py:`i` by the port, combinatorially drives the port by a value :py:`o` if :py:`oe` is asserted, and does not drive (leaves in a high-impedance state, or tristates) the port otherwise:
.. testcode::
port = IOPort(4)
port_i = Signal(4)
port_o = Signal(4)
port_oe = Signal()
m.submodules.iobuf = IOBufferInstance(port, i=port_i, o=port_o, oe=port_oe)
The width of the :py:`i` and :py:`o` values (when present) must be the same as the width of the port, and the width of the :py:`oe` value must be 1.

View file

@ -58,7 +58,7 @@ The prelude exports exactly the following names:
* :class:`Const`
* :func:`C`
* :func:`Mux`
* :class:`Cat`
* :func:`Cat`
* :class:`Array`
* :class:`Signal`
* :class:`ClockSignal`

View file

@ -1,11 +1,6 @@
# amaranth: UnusedElaboratable=no
import warnings
from amaranth import *
with warnings.catch_warnings():
warnings.filterwarnings(action="ignore", category=DeprecationWarning)
from amaranth.hdl.rec import *
from amaranth.hdl import *
from amaranth.lib.wiring import *
from amaranth.lib.io import *
from amaranth.build.dsl import *
@ -77,14 +72,11 @@ class ResourceManagerTestCase(FHDLTestCase):
def test_request_with_dir(self):
i2c = self.cm.request("i2c", 0, dir={"sda": "o"})
self.assertIsInstance(i2c, PureInterface)
self.assertTrue(i2c.signature.is_compliant(i2c))
self.assertIsInstance(flipped(i2c.sda), Pin)
self.assertEqual(i2c.sda.dir, "o")
def test_request_tristate(self):
i2c = self.cm.request("i2c", 0)
self.assertTrue(i2c.signature.is_compliant(i2c))
self.assertEqual(i2c.sda.dir, "io")
ports = list(self.cm.iter_ports())
@ -158,7 +150,7 @@ class ResourceManagerTestCase(FHDLTestCase):
def test_request_raw(self):
clk50 = self.cm.request("clk50", 0, dir="-")
self.assertIsInstance(clk50.io, Signal)
self.assertIsInstance(clk50.io, IOPort)
ports = list(self.cm.iter_ports())
self.assertEqual(len(ports), 1)
@ -166,8 +158,8 @@ class ResourceManagerTestCase(FHDLTestCase):
def test_request_raw_diffpairs(self):
clk100 = self.cm.request("clk100", 0, dir="-")
self.assertIsInstance(clk100.p, Signal)
self.assertIsInstance(clk100.n, Signal)
self.assertIsInstance(clk100.p, IOPort)
self.assertIsInstance(clk100.n, IOPort)
ports = list(self.cm.iter_ports())
self.assertEqual(len(ports), 2)

View file

@ -308,7 +308,7 @@ class ValueTestCase(FHDLTestCase):
self.assertEqual(s2.start, 1)
self.assertEqual(s2.stop, 2)
s3 = Const(31)[::2]
self.assertIsInstance(s3, Cat)
self.assertIsInstance(s3, Concat)
self.assertIsInstance(s3.parts[0], Slice)
self.assertEqual(s3.parts[0].start, 0)
self.assertEqual(s3.parts[0].stop, 1)
@ -1679,3 +1679,109 @@ class SwitchTestCase(FHDLTestCase):
def test_two_cases(self):
s = Switch(Const(0, 8), {("00001111", 123): []})
self.assertEqual(s.cases, {("00001111", "01111011"): []})
class IOValueTestCase(FHDLTestCase):
def test_ioport(self):
a = IOPort(4)
self.assertEqual(len(a), 4)
self.assertEqual(a.attrs, {})
self.assertEqual(a.metadata, (None, None, None, None))
self.assertEqual(a._ioports(), {a})
self.assertRepr(a, "(io-port a)")
b = IOPort(3, name="b", attrs={"a": "b"}, metadata=["x", "y", "z"])
self.assertEqual(len(b), 3)
self.assertEqual(b.attrs, {"a": "b"})
self.assertEqual(b.metadata, ("x", "y", "z"))
self.assertEqual(b._ioports(), {b})
self.assertRepr(b, "(io-port b)")
def test_ioport_wrong(self):
with self.assertRaisesRegex(TypeError,
r"^Name must be a string, not 3$"):
a = IOPort(2, name=3)
with self.assertRaises(TypeError):
a = IOPort("a")
with self.assertRaises(TypeError):
a = IOPort(8, attrs=3)
with self.assertRaises(TypeError):
a = IOPort(8, metadata=3)
with self.assertRaisesRegex(ValueError,
r"^Metadata length \(3\) doesn't match port width \(2\)$"):
a = IOPort(2, metadata=["a", "b", "c"])
def test_ioslice(self):
a = IOPort(8, metadata=["a", "b", "c", "d", "e", "f", "g", "h"])
s = a[2:5]
self.assertEqual(len(s), 3)
self.assertEqual(s.metadata, ("c", "d", "e"))
self.assertEqual(s._ioports(), {a})
self.assertRepr(s, "(io-slice (io-port a) 2:5)")
s = a[-5:-2]
self.assertEqual(len(s), 3)
self.assertEqual(s.metadata, ("d", "e", "f"))
self.assertEqual(s._ioports(), {a})
self.assertRepr(s, "(io-slice (io-port a) 3:6)")
s = IOSlice(a, -5, -2)
self.assertEqual(len(s), 3)
self.assertEqual(s.metadata, ("d", "e", "f"))
self.assertEqual(s._ioports(), {a})
self.assertRepr(s, "(io-slice (io-port a) 3:6)")
s = a[5]
self.assertEqual(len(s), 1)
self.assertEqual(s.metadata, ("f",))
self.assertEqual(s._ioports(), {a})
self.assertRepr(s, "(io-slice (io-port a) 5:6)")
s = a[-1]
self.assertEqual(len(s), 1)
self.assertEqual(s.metadata, ("h",))
self.assertEqual(s._ioports(), {a})
self.assertRepr(s, "(io-slice (io-port a) 7:8)")
s = a[::2]
self.assertEqual(len(s), 4)
self.assertEqual(s.metadata, ("a", "c", "e", "g"))
self.assertEqual(s._ioports(), {a})
self.assertRepr(s, "(io-cat (io-slice (io-port a) 0:1) (io-slice (io-port a) 2:3) (io-slice (io-port a) 4:5) (io-slice (io-port a) 6:7))")
def test_ioslice_wrong(self):
a = IOPort(8)
with self.assertRaises(IndexError):
a[8]
with self.assertRaises(IndexError):
a[-9]
with self.assertRaises(TypeError):
a["a"]
with self.assertRaises(IndexError):
IOSlice(a, 0, 9)
with self.assertRaises(IndexError):
IOSlice(a, -10, 8)
with self.assertRaises(TypeError):
IOSlice(a, 0, "a")
with self.assertRaises(TypeError):
IOSlice(a, "a", 8)
with self.assertRaises(IndexError):
a[5:3]
def test_iocat(self):
a = IOPort(3, name="a", metadata=["a", "b", "c"])
b = IOPort(2, name="b", metadata=["x", "y"])
c = Cat(a, b)
self.assertEqual(len(c), 5)
self.assertEqual(c.metadata, ("a", "b", "c", "x", "y"))
self.assertEqual(c._ioports(), {a, b})
self.assertRepr(c, "(io-cat (io-port a) (io-port b))")
c = Cat(a, Cat())
self.assertEqual(len(c), 3)
self.assertEqual(c.metadata, ("a", "b", "c"))
self.assertEqual(c._ioports(), {a})
self.assertRepr(c, "(io-cat (io-port a) (io-cat ))")
c = Cat(a, Cat()[:])
self.assertEqual(len(c), 3)
self.assertRepr(c, "(io-cat (io-port a) (io-cat ))")
def test_iocat_wrong(self):
a = IOPort(3, name="a")
b = Signal()
with self.assertRaisesRegex(TypeError,
r"^Object \(sig b\) cannot be converted to an IO value$"):
Cat(a, b)

View file

@ -92,7 +92,7 @@ class FragmentPortsTestCase(FHDLTestCase):
self.assertRepr(nl, """
(
(module 0 None ('top'))
(cell 0 0 (top ))
(cell 0 0 (top))
)
""")
@ -177,21 +177,21 @@ class FragmentPortsTestCase(FHDLTestCase):
(output 'c2' 2.0)
(output 'c3' 1.0))
(module 1 0 ('top' 'f1')
(input 'port$0$2' 0.2)
(output 'port$1$0' 1.0)
(output 'port$2$0' 2.0)
(output 'port$5$0' 5.0)
(input 'port$10$0' 10.0))
(module 2 1 ('top' 'f1' 'f11')
(input 'port$0$2' 0.2)
(output 'port$1$0' 1.0)
(output 'port$2$0' 2.0)
(input 'port$6$0' 6.0))
(module 3 2 ('top' 'f1' 'f11' 'f111')
(input 'port$0$2' 0.2)
(input 's1' 0.2)
(output 'c3' 1.0)
(output 'c2' 2.0)
(input 'port$6$0' 6.0))
(output 'c1' 5.0)
(input 's2' 10.0))
(module 2 1 ('top' 'f1' 'f11')
(input 's1' 0.2)
(output 'c3' 1.0)
(output 'c2' 2.0)
(input 's3' 6.0))
(module 3 2 ('top' 'f1' 'f11' 'f111')
(input 's1' 0.2)
(output 'c3' 1.0)
(output 'c2' 2.0)
(input 's3' 6.0))
(module 4 3 ('top' 'f1' 'f11' 'f111' 'f1111')
(input 's1' 0.2)
(output 'c2' 2.0)
@ -200,9 +200,9 @@ class FragmentPortsTestCase(FHDLTestCase):
(output 'c1' 5.0)
(input 's3' 6.0))
(module 6 1 ('top' 'f1' 'f13')
(input 'port$0$2' 0.2)
(output 'port$6$0' 6.0)
(input 'port$10$0' 10.0))
(input 's1' 0.2)
(output 's3' 6.0)
(input 's2' 10.0))
(module 7 6 ('top' 'f1' 'f13' 'f131')
(input 's1' 0.2)
(output 's3' 6.0)
@ -229,12 +229,12 @@ class FragmentPortsTestCase(FHDLTestCase):
nl = build_netlist(f, ports={
"a": (self.s1, PortDirection.Output),
"b": (self.s2, PortDirection.Input),
"c": (self.s3, PortDirection.Inout),
"c": (IOPort(1, name="io3"), PortDirection.Inout),
})
self.assertRepr(nl, """
(
(module 0 None ('top') (input 'b' 0.2) (inout 'c' 0.3) (output 'a' 1'd0))
(cell 0 0 (top (input 'b' 2:3) (output 'a' 1'd0) (inout 'c' 3:4)))
(module 0 None ('top') (input 'b' 0.2) (output 'a' 1'd0) (io inout 'c' 0.0))
(cell 0 0 (top (input 'b' 2:3) (output 'a' 1'd0)))
)
""")
@ -308,6 +308,65 @@ class FragmentPortsTestCase(FHDLTestCase):
)
""")
def test_port_io(self):
io = IOPort(8)
f = Fragment()
f1 = Fragment()
f1.add_subfragment(Instance("t", i_io=io[:2]), "i")
f.add_subfragment(f1, "f1")
f2 = Fragment()
f2.add_subfragment(Instance("t", o_io=io[2:4]), "i")
f.add_subfragment(f2, "f2")
f3 = Fragment()
f3.add_subfragment(Instance("t", io_io=io[4:6]), "i")
f.add_subfragment(f3, "f3")
nl = build_netlist(f, ports=[])
self.assertRepr(nl, """
(
(module 0 None ('top')
(io inout 'io' 0.0:8)
)
(module 1 0 ('top' 'f1')
(io input 'ioport$0$0' 0.0:2)
)
(module 2 0 ('top' 'f2')
(io output 'ioport$0$2' 0.2:4)
)
(module 3 0 ('top' 'f3')
(io inout 'ioport$0$4' 0.4:6)
)
(cell 0 0 (top))
(cell 1 1 (instance 't' 'i' (io input 'io' 0.0:2)))
(cell 2 2 (instance 't' 'i' (io output 'io' 0.2:4)))
(cell 3 3 (instance 't' 'i' (io inout 'io' 0.4:6)))
)
""")
def test_port_io_part(self):
io = IOPort(4)
f = Fragment()
f1 = Fragment()
f1.add_subfragment(Instance("t", i_i=io[0], o_o=io[1], io_io=io[2]), "i")
f.add_subfragment(f1, "f1")
nl = build_netlist(f, ports=[])
self.assertRepr(nl, """
(
(module 0 None ('top')
(io inout 'io' 0.0:4)
)
(module 1 0 ('top' 'f1')
(io input 'ioport$0$0' 0.0)
(io output 'ioport$0$1' 0.1)
(io inout 'ioport$0$2' 0.2)
)
(cell 0 0 (top))
(cell 1 1 (instance 't' 'i'
(io input 'i' 0.0)
(io output 'o' 0.1)
(io inout 'io' 0.2)
))
)
""")
def test_port_instance(self):
f = Fragment()
@ -316,40 +375,53 @@ class FragmentPortsTestCase(FHDLTestCase):
a = Signal(4)
b = Signal(4)
c = Signal(4)
d = Signal(4)
ioa = IOPort(4)
iob = IOPort(4)
ioc = IOPort(4)
f1.add_subfragment(Instance("t",
p_p = "meow",
a_a = True,
i_aa=a,
io_bb=b,
o_bb=b,
o_cc=c,
o_dd=d,
i_aaa=ioa,
o_bbb=iob,
io_ccc=ioc,
), "i")
nl = build_netlist(f, ports=[a, b, c, d])
nl = build_netlist(f, ports=[a, b, c])
self.assertRepr(nl, """
(
(module 0 None ('top')
(input 'a' 0.2:6)
(inout 'b' 0.6:10)
(output 'c' 1.0:4)
(output 'd' 1.4:8))
(output 'b' 1.0:4)
(output 'c' 1.4:8)
(io input 'ioa' 0.0:4)
(io output 'iob' 1.0:4)
(io inout 'ioc' 2.0:4)
)
(module 1 0 ('top' 'f1')
(input 'port$0$2' 0.2:6)
(inout 'port$0$6' 0.6:10)
(output 'port$1$0' 1.0:4)
(output 'port$1$4' 1.4:8))
(input 'a' 0.2:6)
(output 'b' 1.0:4)
(output 'c' 1.4:8)
(io input 'ioa' 0.0:4)
(io output 'iob' 1.0:4)
(io inout 'ioc' 2.0:4)
)
(cell 0 0 (top
(input 'a' 2:6)
(output 'c' 1.0:4)
(output 'd' 1.4:8)
(inout 'b' 6:10)))
(output 'b' 1.0:4)
(output 'c' 1.4:8)
))
(cell 1 1 (instance 't' 'i'
(param 'p' 'meow')
(attr 'a' True)
(input 'aa' 0.2:6)
(output 'cc' 0:4)
(output 'dd' 4:8)
(inout 'bb' 0.6:10)))
(output 'bb' 0:4)
(output 'cc' 4:8)
(io input 'aaa' 0.0:4)
(io output 'bbb' 1.0:4)
(io inout 'ccc' 2.0:4)
))
)
""")
@ -357,7 +429,7 @@ class FragmentPortsTestCase(FHDLTestCase):
f = Fragment()
a = Signal()
with self.assertRaisesRegex(TypeError,
r"^Only signals may be added as ports, not \(const 1'd1\)$"):
r"^Only signals and IO ports may be added as ports, not \(const 1'd1\)$"):
build_netlist(f, ports=(Const(1),))
with self.assertRaisesRegex(TypeError,
r"^Port name must be a string, not 1$"):
@ -619,19 +691,27 @@ class InstanceTestCase(FHDLTestCase):
s2 = Signal()
s3 = Signal()
s4 = Signal()
s5 = Signal()
s6 = Signal()
io1 = IOPort(1)
io2 = IOPort(1)
io3 = IOPort(1)
io4 = IOPort(1)
io5 = IOPort(1)
io6 = IOPort(1)
inst = Instance("foo",
("a", "ATTR1", 1),
("p", "PARAM1", 0x1234),
("i", "s1", s1),
("o", "s2", s2),
("io", "s3", s3),
("i", "io1", io1),
("o", "io2", io2),
("io", "io3", io3),
a_ATTR2=2,
p_PARAM2=0x5678,
i_s4=s4,
o_s5=s5,
io_s6=s6,
i_s3=s3,
o_s4=s4,
i_io4=io4,
o_io5=io5,
io_io6=io6,
)
self.assertEqual(inst.attrs, OrderedDict([
("ATTR1", 1),
@ -644,27 +724,27 @@ class InstanceTestCase(FHDLTestCase):
self.assertEqual(inst.named_ports, OrderedDict([
("s1", (s1, "i")),
("s2", (s2, "o")),
("s3", (s3, "io")),
("s4", (s4, "i")),
("s5", (s5, "o")),
("s6", (s6, "io")),
("io1", (io1, "i")),
("io2", (io2, "o")),
("io3", (io3, "io")),
("s3", (s3, "i")),
("s4", (s4, "o")),
("io4", (io4, "i")),
("io5", (io5, "o")),
("io6", (io6, "io")),
]))
def test_cast_ports(self):
inst = Instance("foo",
("i", "s1", 1),
("o", "s2", 2),
("io", "s3", 3),
i_s4=4,
o_s5=5,
io_s6=6,
("io", "s2", Cat()),
i_s3=3,
io_s4=Cat(),
)
self.assertRepr(inst.named_ports["s1"][0], "(const 1'd1)")
self.assertRepr(inst.named_ports["s2"][0], "(const 2'd2)")
self.assertRepr(inst.named_ports["s2"][0], "(io-cat )")
self.assertRepr(inst.named_ports["s3"][0], "(const 2'd3)")
self.assertRepr(inst.named_ports["s4"][0], "(const 3'd4)")
self.assertRepr(inst.named_ports["s5"][0], "(const 3'd5)")
self.assertRepr(inst.named_ports["s6"][0], "(const 3'd6)")
self.assertRepr(inst.named_ports["s4"][0], "(io-cat )")
def test_wrong_construct_arg(self):
s = Signal()
@ -683,7 +763,7 @@ class InstanceTestCase(FHDLTestCase):
def setUp_cpu(self):
self.rst = Signal()
self.stb = Signal()
self.pins = Signal(8)
self.pins = IOPort(8)
self.datal = Signal(4)
self.datah = Signal(4)
self.inst = Instance("cpu",
@ -716,33 +796,40 @@ class InstanceTestCase(FHDLTestCase):
f = Fragment()
i = Signal(3)
o = Signal(4)
io = Signal(5)
ioa = IOPort(5)
iob = IOPort(6)
ioc = IOPort(7)
f.add_subfragment(Instance("gadget",
i_i=i,
o_o=o,
io_io=io,
i_ioa=ioa,
o_iob=iob,
io_ioc=ioc,
p_param="TEST",
a_attr=1234,
), "my_gadget")
nl = build_netlist(f, [i, o, io])
nl = build_netlist(f, [i, o])
self.assertRepr(nl, """
(
(module 0 None ('top')
(input 'i' 0.2:5)
(inout 'io' 0.5:10)
(output 'o' 1.0:4)
(io input 'ioa' 0.0:5)
(io output 'iob' 1.0:6)
(io inout 'ioc' 2.0:7)
)
(cell 0 0 (top
(input 'i' 2:5)
(output 'o' 1.0:4)
(inout 'io' 5:10)
))
(cell 1 0 (instance 'gadget' 'my_gadget'
(param 'param' 'TEST')
(attr 'attr' 1234)
(input 'i' 0.2:5)
(output 'o' 0:4)
(inout 'io' 0.5:10)
(io input 'ioa' 0.0:5)
(io output 'iob' 1.0:6)
(io inout 'ioc' 2.0:7)
))
)
""")
@ -798,33 +885,72 @@ class InstanceTestCase(FHDLTestCase):
)
""")
def test_nir_io_slice(self):
f = Fragment()
io = IOPort(8)
f.add_subfragment(Instance("test",
i_i=io[:2],
o_o=io[2:4],
io_io=io[4:6],
), "t1")
nl = build_netlist(f, [])
self.assertRepr(nl, """
(
(module 0 None ('top')
(io inout 'io' 0.0:8)
)
(cell 0 0 (top))
(cell 1 0 (instance 'test' 't1'
(io input 'i' 0.0:2)
(io output 'o' 0.2:4)
(io inout 'io' 0.4:6)
))
)
""")
def test_nir_io_concat(self):
f = Fragment()
io1 = IOPort(4)
io2 = IOPort(4)
f.add_subfragment(Instance("test",
io_io=Cat(io1, io2),
))
nl = build_netlist(f, [io1, io2])
self.assertRepr(nl, """
(
(module 0 None ('top')
(io inout 'io1' 0.0:4)
(io inout 'io2' 1.0:4)
)
(cell 0 0 (top))
(cell 1 0 (instance 'test' 'U$0'
(io inout 'io' (io-cat 0.0:4 1.0:4))
))
)
""")
def test_nir_operator(self):
f = Fragment()
i = Signal(3)
o = Signal(4)
io = Signal(5)
f.add_subfragment(Instance("gadget",
i_i=i.as_signed(),
o_o=o.as_signed(),
io_io=io.as_signed(),
), "my_gadget")
nl = build_netlist(f, [i, o, io])
nl = build_netlist(f, [i, o])
self.assertRepr(nl, """
(
(module 0 None ('top')
(input 'i' 0.2:5)
(inout 'io' 0.5:10)
(output 'o' 1.0:4)
)
(cell 0 0 (top
(input 'i' 2:5)
(output 'o' 1.0:4)
(inout 'io' 5:10)
))
(cell 1 0 (instance 'gadget' 'my_gadget'
(input 'i' 0.2:5)
(output 'o' 0:4)
(inout 'io' 0.5:10)
))
)
""")
@ -856,7 +982,7 @@ class NamesTestCase(FHDLTestCase):
"o3": (o3, PortDirection.Output),
}
design = f.prepare(ports)
self.assertEqual(design.signal_names[design.fragment], SignalDict([
self.assertEqual(design.fragments[design.fragment].signal_names, SignalDict([
(i, "i"),
(rst, "rst"),
(o1, "o1"),
@ -865,7 +991,7 @@ class NamesTestCase(FHDLTestCase):
(cd_sync.clk, "clk"),
(cd_sync.rst, "rst$6"),
(cd_sync_norst.clk, "sync_norst_clk"),
(i1, "i$8"),
(i1, "i$7"),
]))
def test_assign_names_to_fragments(self):
@ -874,11 +1000,9 @@ class NamesTestCase(FHDLTestCase):
f.add_subfragment(b := Fragment(), name="b")
design = Design(f, ports=(), hierarchy=("top",))
self.assertEqual(design.fragment_names, {
f: ("top",),
a: ("top", "U$0"),
b: ("top", "b")
})
self.assertEqual(design.fragments[f].name, ("top",))
self.assertEqual(design.fragments[a].name, ("top", "U$0"))
self.assertEqual(design.fragments[b].name, ("top", "b"))
def test_assign_names_to_fragments_rename_top(self):
f = Fragment()
@ -886,11 +1010,9 @@ class NamesTestCase(FHDLTestCase):
f.add_subfragment(b := Fragment(), name="b")
design = Design(f, ports=[], hierarchy=("bench", "cpu"))
self.assertEqual(design.fragment_names, {
f: ("bench", "cpu",),
a: ("bench", "cpu", "U$0"),
b: ("bench", "cpu", "b")
})
self.assertEqual(design.fragments[f].name, ("bench", "cpu",))
self.assertEqual(design.fragments[a].name, ("bench", "cpu", "U$0"))
self.assertEqual(design.fragments[b].name, ("bench", "cpu", "b"))
def test_assign_names_to_fragments_collide_with_signal(self):
f = Fragment()
@ -898,10 +1020,8 @@ class NamesTestCase(FHDLTestCase):
a_s = Signal(name="a")
design = Design(f, ports=[("a", a_s, None)], hierarchy=("top",))
self.assertEqual(design.fragment_names, {
f: ("top",),
a_f: ("top", "a$U$0")
})
self.assertEqual(design.fragments[f].name, ("top",))
self.assertEqual(design.fragments[a_f].name, ("top", "a$1"))
def test_assign_names_to_fragments_duplicate(self):
f = Fragment()
@ -909,11 +1029,9 @@ class NamesTestCase(FHDLTestCase):
f.add_subfragment(a2_f := Fragment(), name="a")
design = Design(f, ports=[], hierarchy=("top",))
self.assertEqual(design.fragment_names, {
f: ("top",),
a1_f: ("top", "a"),
a2_f: ("top", "a$U$1"),
})
self.assertEqual(design.fragments[f].name, ("top",))
self.assertEqual(design.fragments[a1_f].name, ("top", "a"))
self.assertEqual(design.fragments[a2_f].name, ("top", "a$1"))
class ElaboratesTo(Elaboratable):
@ -944,122 +1062,137 @@ class OriginsTestCase(FHDLTestCase):
class IOBufferTestCase(FHDLTestCase):
def test_nir_i(self):
pad = Signal(4)
port = IOPort(4)
i = Signal(4)
f = Fragment()
f.add_subfragment(IOBufferInstance(pad, i=i))
nl = build_netlist(f, ports=[pad, i])
f.add_subfragment(IOBufferInstance(port, i=i))
nl = build_netlist(f, ports=[i])
self.assertRepr(nl, """
(
(module 0 None ('top')
(inout 'pad' 0.2:6)
(output 'i' 1.0:4)
(io input 'port' 0.0:4)
)
(cell 0 0 (top
(output 'i' 1.0:4)
(inout 'pad' 2:6)
))
(cell 1 0 (iob 0.2:6 4'd0 0))
(cell 1 0 (iob input 0.0:4))
)
""")
def test_nir_o(self):
pad = Signal(4)
port = IOPort(4)
o = Signal(4)
f = Fragment()
f.add_subfragment(IOBufferInstance(pad, o=o))
nl = build_netlist(f, ports=[pad, o])
f.add_subfragment(IOBufferInstance(port, o=o))
nl = build_netlist(f, ports=[o])
self.assertRepr(nl, """
(
(module 0 None ('top')
(input 'o' 0.6:10)
(inout 'pad' 0.2:6)
(input 'o' 0.2:6)
(io output 'port' 0.0:4)
)
(cell 0 0 (top
(input 'o' 6:10)
(inout 'pad' 2:6)
(input 'o' 2:6)
))
(cell 1 0 (iob 0.2:6 0.6:10 1))
(cell 1 0 (iob output 0.0:4 0.2:6 1))
)
""")
def test_nir_oe(self):
pad = Signal(4)
port = IOPort(4)
o = Signal(4)
oe = Signal()
f = Fragment()
f.add_subfragment(IOBufferInstance(pad, o=o, oe=oe))
nl = build_netlist(f, ports=[pad, o, oe])
f.add_subfragment(IOBufferInstance(port, o=o, oe=oe))
nl = build_netlist(f, ports=[ o, oe])
self.assertRepr(nl, """
(
(module 0 None ('top')
(input 'o' 0.6:10)
(input 'oe' 0.10)
(inout 'pad' 0.2:6)
(input 'o' 0.2:6)
(input 'oe' 0.6)
(io output 'port' 0.0:4)
)
(cell 0 0 (top
(input 'o' 6:10)
(input 'oe' 10:11)
(inout 'pad' 2:6)
(input 'o' 2:6)
(input 'oe' 6:7)
))
(cell 1 0 (iob 0.2:6 0.6:10 0.10))
(cell 1 0 (iob output 0.0:4 0.2:6 0.6))
)
""")
def test_nir_io(self):
pad = Signal(4)
port = IOPort(4)
i = Signal(4)
o = Signal(4)
oe = Signal()
f = Fragment()
f.add_subfragment(IOBufferInstance(pad, i=i, o=o, oe=oe))
nl = build_netlist(f, ports=[pad, i, o, oe])
f.add_subfragment(IOBufferInstance(port, i=i, o=o, oe=oe))
nl = build_netlist(f, ports=[i, o, oe])
self.assertRepr(nl, """
(
(module 0 None ('top')
(input 'o' 0.6:10)
(input 'oe' 0.10)
(inout 'pad' 0.2:6)
(input 'o' 0.2:6)
(input 'oe' 0.6)
(output 'i' 1.0:4)
(io inout 'port' 0.0:4)
)
(cell 0 0 (top
(input 'o' 6:10)
(input 'oe' 10:11)
(input 'o' 2:6)
(input 'oe' 6:7)
(output 'i' 1.0:4)
(inout 'pad' 2:6)
))
(cell 1 0 (iob 0.2:6 0.6:10 0.10))
(cell 1 0 (iob inout 0.0:4 0.2:6 0.6))
)
""")
def test_wrong_port(self):
port = Signal(4)
i = Signal(4)
with self.assertRaisesRegex(TypeError,
r"^Object \(sig port\) cannot be converted to an IO value"):
IOBufferInstance(port, i=i)
def test_wrong_i(self):
pad = Signal(4)
port = IOPort(4)
i = Signal()
with self.assertRaisesRegex(ValueError,
r"^`pad` length \(4\) doesn't match `i` length \(1\)"):
IOBufferInstance(pad, i=i)
r"^'port' length \(4\) doesn't match 'i' length \(1\)"):
IOBufferInstance(port, i=i)
def test_wrong_o(self):
pad = Signal(4)
port = IOPort(4)
o = Signal()
with self.assertRaisesRegex(ValueError,
r"^`pad` length \(4\) doesn't match `o` length \(1\)"):
IOBufferInstance(pad, o=o)
r"^'port' length \(4\) doesn't match 'o' length \(1\)"):
IOBufferInstance(port, o=o)
def test_wrong_oe(self):
pad = Signal(4)
port = IOPort(4)
o = Signal(4)
oe = Signal(4)
with self.assertRaisesRegex(ValueError,
r"^`oe` length \(4\) must be 1"):
IOBufferInstance(pad, o=o, oe=oe)
r"^'oe' length \(4\) must be 1"):
IOBufferInstance(port, o=o, oe=oe)
def test_wrong_oe_without_o(self):
pad = Signal(4)
port = IOPort(4)
oe = Signal()
with self.assertRaisesRegex(ValueError,
r"^`oe` must not be used if `o` is not used"):
IOBufferInstance(pad, oe=oe)
r"^'oe' must not be used if 'o' is not used"):
IOBufferInstance(port, oe=oe)
def test_conflict(self):
port = IOPort(4)
i1 = Signal(4)
i2 = Signal(4)
f = Fragment()
f.add_subfragment(IOBufferInstance(port, i=i1))
f.add_subfragment(IOBufferInstance(port, i=i2))
with self.assertRaisesRegex(DriverConflict,
r"^Bit 0 of I/O port \(io-port port\) used twice, at .*test_hdl_ir.py:\d+ and "
r".*test_hdl_ir.py:\d+$"):
build_netlist(f, ports=[i1, i2])
class AssignTestCase(FHDLTestCase):