ir: kill Fragment.ports

This commit is contained in:
Wanda 2024-02-11 12:07:45 +01:00 committed by Catherine
parent a725282751
commit 751e0f4b57
9 changed files with 576 additions and 666 deletions

View file

@ -401,7 +401,7 @@ class ModuleEmitter:
port_id=port_id, port_kind=flow.value,
name=name, attrs=self.value_attrs.get(value, {}))
self.sigport_wires[name] = (wire, value)
if flow == _nir.ModuleNetFlow.OUTPUT:
if flow == _nir.ModuleNetFlow.Output:
continue
# If we just emitted an input or inout port, it is driving the value.
self.driven_sigports.add(name)
@ -463,7 +463,7 @@ class ModuleEmitter:
for submodule_idx in self.module.submodules:
submodule = self.netlist.modules[submodule_idx]
for _name, (value, flow) in submodule.ports.items():
if flow == _nir.ModuleNetFlow.OUTPUT:
if flow == _nir.ModuleNetFlow.Output:
self.emit_driven_wire(value)
def sigspec(self, *parts: '_nir.Net | Iterable[_nir.Net]'):
@ -989,10 +989,10 @@ class EmptyModuleChecker:
return module_idx in self.empty
def convert_fragment(fragment, name="top", *, emit_src=True):
def convert_fragment(fragment, ports, name="top", *, emit_src=True, **kwargs):
assert isinstance(fragment, _ir.Fragment)
name_map = _ast.SignalDict()
netlist = _ir.build_netlist(fragment, name=name)
netlist = _ir.build_netlist(fragment, ports=ports, name=name, **kwargs)
empty_checker = EmptyModuleChecker(netlist)
builder = _Builder(emit_src=emit_src)
for module_idx, module in enumerate(netlist.modules):
@ -1011,14 +1011,18 @@ def convert(elaboratable, name="top", platform=None, *, ports=None, emit_src=Tru
if (ports is None and
hasattr(elaboratable, "signature") and
isinstance(elaboratable.signature, wiring.Signature)):
ports = []
for _path, _member, value in elaboratable.signature.flatten(elaboratable):
ports = {}
for path, member, value in elaboratable.signature.flatten(elaboratable):
if isinstance(value, _ast.ValueCastable):
value = value.as_value()
if isinstance(value, _ast.Value):
ports.append(value)
if member.flow == wiring.In:
dir = _ir.PortDirection.Input
else:
dir = _ir.PortDirection.Output
ports["__".join(path)] = (value, dir)
elif ports is None:
raise TypeError("The `convert()` function requires a `ports=` argument")
fragment = _ir.Fragment.get(elaboratable, platform).prepare(ports=ports, **kwargs)
il_text, _name_map = convert_fragment(fragment, name, emit_src=emit_src)
fragment = _ir.Fragment.get(elaboratable, platform)
il_text, _name_map = convert_fragment(fragment, ports, name, emit_src=emit_src, **kwargs)
return il_text

View file

@ -45,14 +45,18 @@ def convert(elaboratable, name="top", platform=None, *, ports=None, emit_src=Tru
if (ports is None and
hasattr(elaboratable, "signature") and
isinstance(elaboratable.signature, wiring.Signature)):
ports = []
ports = {}
for path, member, value in elaboratable.signature.flatten(elaboratable):
if isinstance(value, _ast.ValueCastable):
value = value.as_value()
if isinstance(value, _ast.Value):
ports.append(value)
if member.flow == wiring.In:
dir = _ir.PortDirection.Input
else:
dir = _ir.PortDirection.Output
ports["__".join(path)] = (value, dir)
elif ports is None:
raise TypeError("The `convert()` function requires a `ports=` argument")
fragment = _ir.Fragment.get(elaboratable, platform).prepare(ports=ports, **kwargs)
verilog_text, name_map = convert_fragment(fragment, name, emit_src=emit_src, strip_internal_attrs=strip_internal_attrs)
fragment = _ir.Fragment.get(elaboratable, platform)
verilog_text, name_map = convert_fragment(fragment, ports, name, emit_src=emit_src, strip_internal_attrs=strip_internal_attrs, **kwargs)
return verilog_text

View file

@ -163,11 +163,11 @@ class Platform(ResourceManager, metaclass=ABCMeta):
if pin.dir == "io":
add_pin_fragment(pin, self.get_diff_input_output(pin, port, attrs, invert))
fragment._propagate_ports(ports=self.iter_ports(), all_undef_as_ports=False)
return self.toolchain_prepare(fragment, name, **kwargs)
ports = list(self.iter_ports())
return self.toolchain_prepare(fragment, ports, name, **kwargs)
@abstractmethod
def toolchain_prepare(self, fragment, name, **kwargs):
def toolchain_prepare(self, fragment, ports, name, **kwargs):
"""
Convert the ``fragment`` and constraints recorded in this :class:`Platform` into
a :class:`BuildPlan`.
@ -290,7 +290,7 @@ class TemplatedPlatform(Platform):
continue
yield net_signal, port_signal, frequency
def toolchain_prepare(self, fragment, name, *, emit_src=True, **kwargs):
def toolchain_prepare(self, fragment, ports, name, *, emit_src=True, **kwargs):
# Restrict the name of the design to a strict alphanumeric character set. Platforms will
# interpolate the name of the design in many different contexts: filesystem paths, Python
# scripts, Tcl scripts, ad-hoc constraint files, and so on. It is not practical to add
@ -306,7 +306,8 @@ class TemplatedPlatform(Platform):
# and to incorporate the Amaranth version into generated code.
autogenerated = f"Automatically generated by Amaranth {__version__}. Do not edit."
rtlil_text, self._name_map = rtlil.convert_fragment(fragment, name=name, emit_src=emit_src)
rtlil_text, self._name_map = rtlil.convert_fragment(fragment, ports=ports, name=name,
emit_src=emit_src)
# Retrieve an override specified in either the environment or as a kwarg.
# expected_type parameter is used to assert the type of kwargs, passing `None` will disable

View file

@ -1,6 +1,7 @@
from typing import Tuple
from collections import defaultdict, OrderedDict
from functools import reduce
import enum
import warnings
from .._utils import flatten, memoize
@ -8,7 +9,10 @@ from .. import tracer, _unused
from . import _ast, _cd, _ir, _nir
__all__ = ["UnusedElaboratable", "Elaboratable", "DriverConflict", "Fragment", "Instance"]
__all__ = [
"UnusedElaboratable", "Elaboratable", "DriverConflict", "Fragment", "Instance",
"PortDirection", "Design", "build_netlist",
]
class UnusedElaboratable(_unused.UnusedMustUse):
@ -65,7 +69,6 @@ class Fragment:
obj = new_obj
def __init__(self, *, src_loc=None):
self.ports = _ast.SignalDict()
self.drivers = OrderedDict()
self.statements = {}
self.domains = OrderedDict()
@ -76,19 +79,6 @@ class Fragment:
self.src_loc = src_loc
self.origins = None
def add_ports(self, *ports, dir):
assert dir in ("i", "o", "io")
for port in flatten(ports):
self.ports[port] = dir
def iter_ports(self, dir=None):
if dir is None:
yield from self.ports
else:
for port, port_dir in self.ports.items():
if port_dir == dir:
yield port
def add_driver(self, signal, domain="comb"):
assert isinstance(domain, str)
if domain not in self.drivers:
@ -111,18 +101,6 @@ class Fragment:
for signal in signals:
yield domain, signal
def iter_signals(self):
signals = _ast.SignalSet()
signals |= self.ports.keys()
for domain, domain_signals in self.drivers.items():
if domain != "comb":
cd = self.domains[domain]
signals.add(cd.clk)
if cd.rst is not None:
signals.add(cd.rst)
signals |= domain_signals
return signals
def add_domains(self, *domains):
for domain in flatten(domains):
assert isinstance(domain, _cd.ClockDomain)
@ -169,7 +147,6 @@ class Fragment:
# Merge subfragment's everything except clock domains into this fragment.
# Flattening is done after clock domain propagation, so we can assume the domains
# are already the same in every involved fragment in the first place.
self.ports.update(subfragment.ports)
for domain, signal in subfragment.iter_drivers():
self.add_driver(signal, domain)
for domain, statements in subfragment.statements.items():
@ -372,277 +349,78 @@ class Fragment:
self._propagate_domains_down()
return new_domains
def _prepare_use_def_graph(self, parent, level, uses, defs, ios, top):
from ._mem import MemoryInstance
def add_uses(*sigs, self=self):
for sig in flatten(sigs):
if sig not in uses:
uses[sig] = set()
uses[sig].add(self)
def add_defs(*sigs):
for sig in flatten(sigs):
if sig not in defs:
defs[sig] = self
def _prepare_ports(self, ports):
# Normalize ports to a list.
new_ports = []
if isinstance(ports, dict):
for port_name, (signal, dir) in ports.items():
new_ports.append((port_name, signal, dir))
elif isinstance(ports, (list, tuple)):
for port in ports:
if isinstance(port, tuple):
port_name, signal, dir = port
new_ports.append((port_name, signal, dir))
else:
assert defs[sig] is self
new_ports.append((None, port, None))
else:
msg = f"`ports` must be a dict, a list or a tuple, not {ports!r}"
if isinstance(ports, _ast.Value):
msg += " (did you mean `ports=(<signal>,)`, rather than `ports=<signal>`?)"
raise TypeError(msg)
def add_io(*sigs):
for sig in flatten(sigs):
if sig not in ios:
ios[sig] = self
# Validate ports.
prenamed_ports = set()
for (port_name, signal, dir) in new_ports:
if isinstance(port_name, str):
if port_name in prenamed_ports:
raise TypeError(f"Duplicate port name {port_name!r}")
else:
assert ios[sig] is self
prenamed_ports.add(port_name)
elif port_name is not None:
raise TypeError(f"Port name must be a string, not {port_name!r}")
if dir is not None and not isinstance(dir, PortDirection):
raise TypeError(
f"Port direction must be a `PortDirection` instance or None, not {dir!r}")
if not isinstance(signal, (_ast.Signal, _ast.ClockSignal, _ast.ResetSignal)):
raise TypeError(f"Only signals may be added as ports, not {signal!r}")
# Collect all signals we're driving (on LHS of statements), and signals we're using
# (on RHS of statements, or in clock domains).
for stmts in self.statements.values():
for stmt in stmts:
add_uses(stmt._rhs_signals())
add_defs(stmt._lhs_signals())
return new_ports
for domain, _ in self.iter_sync():
cd = self.domains[domain]
add_uses(cd.clk)
if cd.rst is not None:
add_uses(cd.rst)
# Repeat for subfragments.
for subfrag, name, src_loc in self.subfragments:
if isinstance(subfrag, Instance):
for port_name, (value, dir) in subfrag.named_ports.items():
if dir == "i":
# Prioritize defs over uses.
rhs_without_outputs = value._rhs_signals() - subfrag.iter_ports(dir="o")
subfrag.add_ports(rhs_without_outputs, dir=dir)
add_uses(value._rhs_signals())
if dir == "o":
subfrag.add_ports(value._lhs_signals(), dir=dir)
add_defs(value._lhs_signals())
if dir == "io":
subfrag.add_ports(value._lhs_signals(), dir=dir)
add_io(value._lhs_signals())
elif isinstance(subfrag, MemoryInstance):
for port in subfrag._read_ports:
subfrag.add_ports(port._data._lhs_signals(), dir="o")
add_defs(port._data._lhs_signals())
for value in [port._addr, port._en]:
subfrag.add_ports(value._rhs_signals(), dir="i")
add_uses(value._rhs_signals())
for port in subfrag._write_ports:
for value in [port._addr, port._en, port._data]:
subfrag.add_ports(value._rhs_signals(), dir="i")
add_uses(value._rhs_signals())
for domain, _ in subfrag.iter_sync():
cd = subfrag.domains[domain]
add_uses(cd.clk)
if cd.rst is not None:
add_uses(cd.rst)
else:
parent[subfrag] = self
level [subfrag] = level[self] + 1
subfrag._prepare_use_def_graph(parent, level, uses, defs, ios, top)
def _propagate_ports(self, ports, all_undef_as_ports):
# Take this fragment graph:
#
# __ B (def: q, use: p r)
# /
# A (def: p, use: q r)
# \
# \_ C (def: r, use: p q)
#
# We need to consider three cases.
# 1. Signal p requires an input port in B;
# 2. Signal r requires an output port in C;
# 3. Signal r requires an output port in C and an input port in B.
#
# Adding these ports can be in general done in three steps for each signal:
# 1. Find the least common ancestor of all uses and defs.
# 2. Going upwards from the single def, add output ports.
# 3. Going upwards from all uses, add input ports.
parent = {self: None}
level = {self: 0}
uses = _ast.SignalDict()
defs = _ast.SignalDict()
ios = _ast.SignalDict()
self._prepare_use_def_graph(parent, level, uses, defs, ios, self)
ports = _ast.SignalSet(ports)
if all_undef_as_ports:
for sig in uses:
if sig in defs:
continue
ports.add(sig)
for sig in ports:
if sig not in uses:
uses[sig] = set()
uses[sig].add(self)
@memoize
def lca_of(fragu, fragv):
# Normalize fragu to be deeper than fragv.
if level[fragu] < level[fragv]:
fragu, fragv = fragv, fragu
# Find ancestor of fragu on the same level as fragv.
for _ in range(level[fragu] - level[fragv]):
fragu = parent[fragu]
# If fragv was the ancestor of fragv, we're done.
if fragu == fragv:
return fragu
# Otherwise, they are at the same level but in different branches. Step both fragu
# and fragv until we find the common ancestor.
while parent[fragu] != parent[fragv]:
fragu = parent[fragu]
fragv = parent[fragv]
return parent[fragu]
for sig in uses:
if sig in defs:
lca = reduce(lca_of, uses[sig], defs[sig])
else:
lca = reduce(lca_of, uses[sig])
for frag in uses[sig]:
if sig in defs and frag is defs[sig]:
continue
while frag != lca:
frag.add_ports(sig, dir="i")
frag = parent[frag]
if sig in defs:
frag = defs[sig]
while frag != lca:
frag.add_ports(sig, dir="o")
frag = parent[frag]
for sig in ios:
frag = ios[sig]
while frag is not None:
frag.add_ports(sig, dir="io")
frag = parent[frag]
for sig in ports:
if sig in ios:
continue
if sig in defs:
self.add_ports(sig, dir="o")
else:
self.add_ports(sig, dir="i")
def prepare(self, ports=None, missing_domain=lambda name: _cd.ClockDomain(name)):
def prepare(self, ports=(), *, hierarchy=("top",), legalize_assignments=False, missing_domain=lambda name: _cd.ClockDomain(name)):
from ._xfrm import DomainLowerer
ports = self._prepare_ports(ports)
new_domains = self._propagate_domains(missing_domain)
for domain in new_domains:
ports.append((None, domain.clk, PortDirection.Input))
if domain.rst is not None:
ports.append((None, domain.rst, PortDirection.Input))
def resolve_signal(signal):
if isinstance(signal, _ast.ClockSignal):
domain = self.domains[signal.domain]
return domain.clk
elif isinstance(signal, _ast.ResetSignal):
domain = self.domains[signal.domain]
if domain.rst is None:
raise ValueError(f"Using ResetSignal for a reset-less domain {signal.domain}")
return domain.rst
else:
return signal
ports = [
(name, resolve_signal(signal), dir)
for name, signal, dir in ports
]
fragment = DomainLowerer()(self)
if ports is None:
fragment._propagate_ports(ports=(), all_undef_as_ports=True)
else:
if not isinstance(ports, tuple) and not isinstance(ports, list):
msg = "`ports` must be either a list or a tuple, not {!r}"\
.format(ports)
if isinstance(ports, _ast.Value):
msg += " (did you mean `ports=(<signal>,)`, rather than `ports=<signal>`?)"
raise TypeError(msg)
mapped_ports = []
# Lower late bound signals like ClockSignal() to ports.
port_lowerer = DomainLowerer(fragment.domains)
for port in ports:
if not isinstance(port, (_ast.Signal, _ast.ClockSignal, _ast.ResetSignal)):
raise TypeError("Only signals may be added as ports, not {!r}"
.format(port))
mapped_ports.append(port_lowerer.on_value(port))
# Add ports for all newly created missing clock domains, since not doing so defeats
# the purpose of domain auto-creation. (It's possible to refer to these ports before
# the domain actually exists through late binding, but it's inconvenient.)
for cd in new_domains:
mapped_ports.append(cd.clk)
if cd.rst is not None:
mapped_ports.append(cd.rst)
fragment._propagate_ports(ports=mapped_ports, all_undef_as_ports=False)
return fragment
if legalize_assignments:
from ._xfrm import AssignmentLegalizer
fragment = AssignmentLegalizer()(fragment)
def _assign_names_to_signals(self):
"""Assign names to signals used in this fragment.
Returns
-------
SignalDict of Signal to str
A mapping from signals used in this fragment to their local names. Because names are
deduplicated using local information only, the same signal used in a different fragment
may get a different name.
"""
signal_names = _ast.SignalDict()
assigned_names = set()
def add_signal_name(signal):
if signal not in signal_names:
if signal.name not in assigned_names:
name = signal.name
else:
name = f"{signal.name}${len(assigned_names)}"
assert name not in assigned_names
signal_names[signal] = name
assigned_names.add(name)
for port in self.ports.keys():
add_signal_name(port)
for domain_name, domain_signals in self.drivers.items():
if domain_name != "comb":
domain = self.domains[domain_name]
add_signal_name(domain.clk)
if domain.rst is not None:
add_signal_name(domain.rst)
for statements in self.statements.values():
for statement in statements:
for signal in statement._lhs_signals() | statement._rhs_signals():
if not isinstance(signal, (_ast.ClockSignal, _ast.ResetSignal)):
add_signal_name(signal)
return signal_names
def _assign_names_to_fragments(self, hierarchy=("top",), *, _names=None):
"""Assign names to this fragment and its subfragments.
Subfragments may not necessarily have a name. This method assigns every such subfragment
a name, ``U$<number>``, where ``<number>`` is based on its location in the hierarchy.
Subfragment names may collide with signal names safely in Amaranth, but this may confuse
backends. This method assigns every such subfragment a name, ``<name>$U$<number>``, where
``name`` is its original name, and ``<number>`` is based on its location in the hierarchy.
Arguments
---------
hierarchy : tuple of str
Name of this fragment.
Returns
-------
dict of Fragment to tuple of str
A mapping from this fragment and its subfragments to their full hierarchical names.
"""
if _names is None:
_names = dict()
_names[self] = hierarchy
signal_names = set(self._assign_names_to_signals().values())
for subfragment_index, (subfragment, subfragment_name, subfragment_src_loc) in enumerate(self.subfragments):
if subfragment_name is None:
subfragment_name = f"U${subfragment_index}"
elif subfragment_name in signal_names:
subfragment_name = f"{subfragment_name}$U${subfragment_index}"
assert subfragment_name not in signal_names
subfragment._assign_names_to_fragments(hierarchy=(*hierarchy, subfragment_name),
_names=_names)
return _names
# Create design and let it do the rest.
return Design(fragment, ports, hierarchy=hierarchy)
class Instance(Fragment):
@ -682,9 +460,118 @@ class Instance(Fragment):
.format(kw, arg))
class Design:
"""Represents a design ready for simulation or netlist building.
Returned by ``Fragment.prepare``."""
def __init__(self, fragment, ports, *, hierarchy):
self.fragment = fragment
self.ports = ports
self.hierarchy = hierarchy
# dict of Fragment to SignalDict of Signal to name
self.signal_names = {}
self.fragment_names = {}
self._assign_names_to_signals(fragment, ports)
self._assign_names_to_fragments(fragment, hierarchy)
# Use just-assigned signal names to name all unnamed ports.
top_names = self.signal_names[fragment]
self.ports = [
(name or top_names[signal], signal, dir)
for (name, signal, dir) in self.ports
]
def _assign_names_to_signals(self, fragment, ports=None):
"""Assign names to signals used in a given fragment.
The mapping is set in ``self.signal_names``. Because names are deduplicated using local
information only, the same signal used in a different fragment may get a different name.
"""
signal_names = _ast.SignalDict()
assigned_names = set()
def add_signal_name(signal):
if signal not in signal_names:
if signal.name not in assigned_names:
name = signal.name
else:
name = f"{signal.name}${len(assigned_names)}"
assert name not in assigned_names
signal_names[signal] = name
assigned_names.add(name)
if ports is not None:
# First pass: reserve names for pre-named top-level ports. If equal to the signal name, let the signal share it.
for name, signal, _dir in ports:
if name is not None:
assigned_names.add(name)
if signal.name == name:
signal_names[signal] = name
# Second pass: ensure non-pre-named top-level ports are named first.
for name, signal, _dir in ports:
if name is None:
add_signal_name(signal)
for domain_name, domain_signals in fragment.drivers.items():
if domain_name != "comb":
domain = fragment.domains[domain_name]
add_signal_name(domain.clk)
if domain.rst is not None:
add_signal_name(domain.rst)
for statements in fragment.statements.values():
for statement in statements:
for signal in statement._lhs_signals() | statement._rhs_signals():
if not isinstance(signal, (_ast.ClockSignal, _ast.ResetSignal)):
add_signal_name(signal)
self.signal_names[fragment] = signal_names
for subfragment, _name, _src_loc in fragment.subfragments:
self._assign_names_to_signals(subfragment)
def _assign_names_to_fragments(self, fragment, hierarchy):
"""Assign names to this fragment and its subfragments.
Subfragments may not necessarily have a name. This method assigns every such subfragment
a name, ``U$<number>``, where ``<number>`` is based on its location in the hierarchy.
Subfragment names may collide with signal names safely in Amaranth, but this may confuse
backends. This method assigns every such subfragment a name, ``<name>$U$<number>``, where
``name`` is its original name, and ``<number>`` is based on its location in the hierarchy.
Arguments
---------
hierarchy : tuple of str
Name of this fragment.
Returns
-------
dict of Fragment to tuple of str
A mapping from this fragment and its subfragments to their full hierarchical names.
"""
self.fragment_names[fragment] = hierarchy
signal_names = set(self.signal_names[fragment].values())
for subfragment_index, (subfragment, subfragment_name, subfragment_src_loc) in enumerate(fragment.subfragments):
if subfragment_name is None:
subfragment_name = f"U${subfragment_index}"
elif subfragment_name in signal_names:
subfragment_name = f"{subfragment_name}$U${subfragment_index}"
assert subfragment_name not in signal_names
self._assign_names_to_fragments(subfragment, hierarchy=(*hierarchy, subfragment_name))
############################################################################################### >:3
class PortDirection(enum.Enum):
Input = "input"
Output = "output"
Inout = "inout"
class NetlistDriver:
def __init__(self, module_idx: int, signal: _ast.Signal,
domain: '_cd.ClockDomain | None', *, src_loc):
@ -710,9 +597,9 @@ class NetlistDriver:
class NetlistEmitter:
def __init__(self, netlist: _nir.Netlist, fragment_names: 'dict[_ir.Fragment, str]'):
def __init__(self, netlist: _nir.Netlist, design):
self.netlist = netlist
self.fragment_names = fragment_names
self.design = design
self.drivers = _ast.SignalDict()
self.rhs_cache: dict[int, Tuple[_nir.Value, bool, _ast.Value]] = {}
@ -1155,30 +1042,54 @@ class NetlistEmitter:
self.connect(port_conn, _nir.Value(output_nets[start_bit:start_bit + len(port_conn)]),
src_loc=instance.src_loc)
def emit_top_ports(self, fragment: _ir.Fragment, signal_names: _ast.SignalDict):
def emit_top_ports(self, fragment: _ir.Fragment):
inouts = set()
for cell in self.netlist.cells:
if isinstance(cell, _nir.IOBuffer):
inouts.update(cell.pad)
if isinstance(cell, _nir.Instance):
for value in cell.ports_io.values():
inouts.update(value)
next_input_bit = 2 # 0 and 1 are reserved for constants
top = self.netlist.top
for signal, dir in fragment.ports.items():
assert signal not in self.netlist.signals
name = signal_names[signal]
if dir == 'i':
for name, signal, dir in self.design.ports:
signal_value = self.emit_signal(signal)
if dir is None:
is_driven = False
is_inout = False
for net in signal_value:
if net in self.netlist.connections:
is_driven = True
if net in inouts:
is_inout = True
if is_driven:
dir = PortDirection.Output
elif is_inout:
dir = PortDirection.Inout
else:
dir = PortDirection.Input
if dir == PortDirection.Input:
top.ports_i[name] = (next_input_bit, signal.width)
nets = _nir.Value(
value = _nir.Value(
_nir.Net.from_cell(0, bit)
for bit in range(next_input_bit, next_input_bit + signal.width)
)
next_input_bit += signal.width
self.netlist.signals[signal] = nets
elif dir == 'o':
top.ports_o[name] = self.emit_signal(signal)
elif dir == 'io':
self.connect(signal_value, value, src_loc=signal.src_loc)
elif dir == PortDirection.Output:
top.ports_o[name] = signal_value
elif dir == PortDirection.Inout:
top.ports_io[name] = (next_input_bit, signal.width)
nets = _nir.Value(
value = _nir.Value(
_nir.Net.from_cell(0, bit)
for bit in range(next_input_bit, next_input_bit + signal.width)
)
next_input_bit += signal.width
self.netlist.signals[signal] = nets
self.connect(signal_value, value, src_loc=signal.src_loc)
else:
raise ValueError(f"Invalid port direction {dir!r}")
def emit_drivers(self):
for driver in self.drivers.values():
@ -1205,6 +1116,7 @@ class NetlistEmitter:
src_loc = driver.signal.src_loc
self.connect(self.emit_signal(driver.signal), value, src_loc=src_loc)
def emit_undriven(self):
# Connect all undriven signal bits to their initial values. This can only happen for entirely
# undriven signals, or signals that are partially driven by instances.
for signal, value in self.netlist.signals.items():
@ -1215,7 +1127,7 @@ class NetlistEmitter:
def emit_fragment(self, fragment: _ir.Fragment, parent_module_idx: 'int | None', *, cell_src_loc=None):
from . import _mem
fragment_name = self.fragment_names[fragment]
fragment_name = self.design.fragment_names[fragment]
if isinstance(fragment, _ir.Instance):
assert parent_module_idx is not None
if fragment.type == "$tribuf":
@ -1232,10 +1144,8 @@ class NetlistEmitter:
self.emit_read_port(parent_module_idx, fragment, port, memory, write_ports)
elif type(fragment) is _ir.Fragment:
module_idx = self.netlist.add_module(parent_module_idx, fragment_name, src_loc=fragment.src_loc, cell_src_loc=cell_src_loc)
signal_names = fragment._assign_names_to_signals()
signal_names = self.design.signal_names[fragment]
self.netlist.modules[module_idx].signal_names = signal_names
if parent_module_idx is None:
self.emit_top_ports(fragment, signal_names)
for signal in signal_names:
self.emit_signal(signal)
for domain, stmts in fragment.statements.items():
@ -1245,13 +1155,14 @@ class NetlistEmitter:
self.emit_fragment(subfragment, module_idx, cell_src_loc=sub_src_loc)
if parent_module_idx is None:
self.emit_drivers()
self.emit_top_ports(fragment)
self.emit_undriven()
else:
assert False # :nocov:
def _emit_netlist(netlist: _nir.Netlist, fragment, hierarchy):
fragment_names = fragment._assign_names_to_fragments(hierarchy)
NetlistEmitter(netlist, fragment_names).emit_fragment(fragment, None)
def _emit_netlist(netlist: _nir.Netlist, design):
NetlistEmitter(netlist, design).emit_fragment(design.fragment, None)
def _compute_net_flows(netlist: _nir.Netlist):
@ -1260,39 +1171,39 @@ def _compute_net_flows(netlist: _nir.Netlist):
# The rules for net flows are as follows:
#
# - the modules that have a given net in their net_flow form a subtree of the hierarchy
# - INTERNAL is used in the root of the subtree and nowhere else
# - OUTPUT is used for modules that contain the definition of the net, or are on the
# - Internal is used in the root of the subtree and nowhere else
# - Output is used for modules that contain the definition of the net, or are on the
# path from the definition to the root
# - remaining modules have a flow of INPUT (unless the net is a top-level inout port,
# in which case it is INOUT)
# - remaining modules have a flow of Input (unless the net is a top-level inout port,
# in which case it is Inout)
#
# In other words, the tree looks something like this:
#
# - [no flow] <<< top
# - [no flow]
# - INTERNAL
# - INPUT << use
# - Internal
# - Input << use
# - [no flow]
# - INPUT
# - INPUT << use
# - OUTPUT
# - INPUT << use
# - Input
# - Input << use
# - Output
# - Input << use
# - [no flow]
# - OUTPUT << def
# - INPUT
# - INPUT
# - Output << def
# - Input
# - Input
# - [no flow]
# - [no flow]
# - [no flow]
#
# This function doesn't assign the INOUT flow — that is corrected later, in compute_ports.
# This function doesn't assign the Inout flow — that is corrected later, in compute_ports.
lca = {}
# Initialize by marking the definition point of every net.
for cell_idx, cell in enumerate(netlist.cells):
for net in cell.output_nets(cell_idx):
lca[net] = cell.module_idx
netlist.modules[cell.module_idx].net_flow[net] = _nir.ModuleNetFlow.INTERNAL
netlist.modules[cell.module_idx].net_flow[net] = _nir.ModuleNetFlow.Internal
# Marks a use of a net within a given module, and adjusts its netflows in all modules
# as required.
@ -1309,7 +1220,7 @@ def _compute_net_flows(netlist: _nir.Netlist):
def_module = lca[net]
# While def_module deeper than use_module, go up with def_module.
while len(modules[def_module].name) > len(modules[use_module].name):
modules[def_module].net_flow[net] = _nir.ModuleNetFlow.OUTPUT
modules[def_module].net_flow[net] = _nir.ModuleNetFlow.Output
def_module = modules[def_module].parent
# While use_module deeper than def_module, go up with use_module.
# If use_module is below def_module in the hierarchy, we may hit
@ -1318,19 +1229,19 @@ def _compute_net_flows(netlist: _nir.Netlist):
while len(modules[def_module].name) < len(modules[use_module].name):
if net in modules[use_module].net_flow:
return
modules[use_module].net_flow[net] = _nir.ModuleNetFlow.INPUT
modules[use_module].net_flow[net] = _nir.ModuleNetFlow.Input
use_module = modules[use_module].parent
# Now both pointers should be at the same depth within the hierarchy.
assert len(modules[def_module].name) == len(modules[use_module].name)
# Move both pointers up until they meet.
while def_module != use_module:
modules[def_module].net_flow[net] = _nir.ModuleNetFlow.OUTPUT
modules[def_module].net_flow[net] = _nir.ModuleNetFlow.Output
def_module = modules[def_module].parent
modules[use_module].net_flow[net] = _nir.ModuleNetFlow.INPUT
modules[use_module].net_flow[net] = _nir.ModuleNetFlow.Input
use_module = modules[use_module].parent
assert len(modules[def_module].name) == len(modules[use_module].name)
# And mark the new LCA.
modules[def_module].net_flow[net] = _nir.ModuleNetFlow.INTERNAL
modules[def_module].net_flow[net] = _nir.ModuleNetFlow.Internal
lca[net] = def_module
# Now mark all uses and flesh out the structure.
@ -1373,14 +1284,17 @@ def _compute_ports(netlist: _nir.Netlist):
if value not in name_table and not name.startswith('$'):
name_table[value] = name
# Adjust any input flows to inout as necessary.
for (net, flow) in module.net_flow.items():
if flow == _nir.ModuleNetFlow.Input and net in inouts:
module.net_flow[net] = _nir.ModuleNetFlow.Inout
# Gather together "adjacent" nets with the same flow into ports.
visited = set()
for net in sorted(module.net_flow):
flow = module.net_flow[net]
if flow == _nir.ModuleNetFlow.INTERNAL:
if flow == _nir.ModuleNetFlow.Internal:
continue
if flow == _nir.ModuleNetFlow.INPUT and net in inouts:
flow = module.net_flow[net] = _nir.ModuleNetFlow.INOUT
if net in visited:
continue
# We found a net that needs a port. Keep joining the next nets output by the same
@ -1412,23 +1326,21 @@ def _compute_ports(netlist: _nir.Netlist):
for name, (start, width) in netlist.top.ports_i.items():
top_module.ports[name] = (
_nir.Value(_nir.Net.from_cell(0, start + bit) for bit in range(width)),
_nir.ModuleNetFlow.INPUT
_nir.ModuleNetFlow.Input
)
for name, (start, width) in netlist.top.ports_io.items():
top_module.ports[name] = (
_nir.Value(_nir.Net.from_cell(0, start + bit) for bit in range(width)),
_nir.ModuleNetFlow.INOUT
_nir.ModuleNetFlow.Inout
)
for name, value in netlist.top.ports_o.items():
top_module.ports[name] = (value, _nir.ModuleNetFlow.OUTPUT)
top_module.ports[name] = (value, _nir.ModuleNetFlow.Output)
def build_netlist(fragment, *, name="top"):
from ._xfrm import AssignmentLegalizer
fragment = AssignmentLegalizer()(fragment)
def build_netlist(fragment, ports, *, name="top", **kwargs):
design = fragment.prepare(ports=ports, hierarchy=(name,), legalize_assignments=True, **kwargs)
netlist = _nir.Netlist()
_emit_netlist(netlist, fragment, hierarchy=(name,))
_emit_netlist(netlist, design)
netlist.resolve_all_nets()
_compute_net_flows(netlist)
_compute_ports(netlist)

View file

@ -199,7 +199,7 @@ class Netlist:
self.signals[sig] = self.resolve_value(self.signals[sig])
def __repr__(self):
result = []
result = ["("]
for module_idx, module in enumerate(self.modules):
name = " ".join(repr(name) for name in module.name)
ports = " ".join(
@ -209,6 +209,7 @@ class Netlist:
result.append(f"(module {module_idx} {module.parent} ({name}) {ports})")
for cell_idx, cell in enumerate(self.cells):
result.append(f"(cell {cell_idx} {cell.module_idx} {cell!r})")
result.append(")")
return "\n".join(result)
def add_module(self, parent, name: str, *, src_loc=None, cell_src_loc=None):
@ -251,21 +252,21 @@ class ModuleNetFlow(enum.Enum):
#: The net is present in the module (used in the module or needs
#: to be routed through it between its submodules), but is not
#: present outside its subtree and thus is not a port of this module.
INTERNAL = "internal"
Internal = "internal"
#: The net is present in the module, and is not driven from
#: the module or any of its submodules. It is thus an input
#: port of this module.
INPUT = "input"
Input = "input"
#: The net is present in the module, is driven from the module or
#: one of its submodules, and is also used outside of its subtree.
#: It is thus an output port of this module.
OUTPUT = "output"
Output = "output"
#: The net is a special top-level inout net that is used within
#: this module or its submodules. It is an inout port of this module.
INOUT = "inout"
Inout = "inout"
class Module:
@ -1039,7 +1040,7 @@ class Instance(Cell):
items.append(f"(attr {name!r} {val!r})")
for name, val in self.ports_i.items():
items.append(f"(input {name!r} {val})")
for name, (start, width) in self.ports_i.items():
for name, (start, width) in self.ports_o.items():
items.append(f"(output {name!r} {start}:{start+width})")
for name, val in self.ports_io.items():
items.append(f"(inout {name!r} {val})")

View file

@ -209,10 +209,6 @@ class FragmentTransformer:
for subfragment, name, src_loc in fragment.subfragments:
new_fragment.add_subfragment(self(subfragment), name, src_loc=src_loc)
def map_ports(self, fragment, new_fragment):
for port, dir in fragment.ports.items():
new_fragment.add_ports(port, dir=dir)
def map_named_ports(self, fragment, new_fragment):
if hasattr(self, "on_value"):
for name, (value, dir) in fragment.named_ports.items():
@ -285,7 +281,6 @@ class FragmentTransformer:
new_fragment = Fragment(src_loc=fragment.src_loc)
new_fragment.flatten = fragment.flatten
new_fragment.attrs = OrderedDict(fragment.attrs)
self.map_ports(fragment, new_fragment)
self.map_subfragments(fragment, new_fragment)
self.map_domains(fragment, new_fragment)
self.map_statements(fragment, new_fragment)

View file

@ -70,8 +70,8 @@ class Simulator:
"a simulation engine name"
.format(engine))
self._fragment = Fragment.get(fragment, platform=None).prepare()
self._engine = engine(self._fragment)
self._design = Fragment.get(fragment, platform=None).prepare()
self._engine = engine(self._design)
self._clocked = set()
def _check_process(self, process):
@ -165,15 +165,15 @@ class Simulator:
in this case.
"""
if isinstance(domain, ClockDomain):
if (domain.name in self._fragment.domains and
domain is not self._fragment.domains[domain.name]):
if (domain.name in self._design.fragment.domains and
domain is not self._design.fragment.domains[domain.name]):
warnings.warn("Adding a clock process that drives a clock domain object "
"named {!r}, which is distinct from an identically named domain "
"in the simulated design"
.format(domain.name),
UserWarning, stacklevel=2)
elif domain in self._fragment.domains:
domain = self._fragment.domains[domain]
elif domain in self._design.fragment.domains:
domain = self._design.fragment.domains[domain]
elif if_exists:
return
else:

View file

@ -36,7 +36,7 @@ class _VCDWriter:
else:
raise NotImplementedError
def __init__(self, fragment, *, vcd_file, gtkw_file=None, traces=()):
def __init__(self, design, *, vcd_file, gtkw_file=None, traces=()):
# Although pyvcd is a mandatory dependency, be resilient and import it as needed, so that
# the simulator is still usable if it's not installed for some reason.
import vcd, vcd.gtkw
@ -65,14 +65,14 @@ class _VCDWriter:
signal_names = SignalDict()
memories = {}
for subfragment, subfragment_name in \
fragment._assign_names_to_fragments(hierarchy=("bench", "top",)).items():
for signal, signal_name in subfragment._assign_names_to_signals().items():
for fragment, fragment_name in design.fragment_names.items():
fragment_name = ("bench", *fragment_name)
for signal, signal_name in design.signal_names[fragment].items():
if signal not in signal_names:
signal_names[signal] = set()
signal_names[signal].add((*subfragment_name, signal_name))
if isinstance(subfragment, MemoryInstance):
memories[subfragment._identity] = (subfragment, subfragment_name)
signal_names[signal].add((*fragment_name, signal_name))
if isinstance(fragment, MemoryInstance):
memories[fragment._identity] = (fragment, fragment_name)
trace_names = SignalDict()
assigned_names = set()
@ -403,16 +403,16 @@ class _PySimulation(BaseSimulation):
class PySimEngine(BaseEngine):
def __init__(self, fragment):
def __init__(self, design):
self._state = _PySimulation()
self._timeline = self._state.timeline
self._fragment = fragment
self._processes = _FragmentCompiler(self._state)(self._fragment)
self._design = design
self._processes = _FragmentCompiler(self._state)(self._design.fragment)
self._vcd_writers = []
def add_coroutine_process(self, process, *, default_cmd):
self._processes.add(PyCoroProcess(self._state, self._fragment.domains, process,
self._processes.add(PyCoroProcess(self._state, self._design.fragment.domains, process,
default_cmd=default_cmd))
def add_clock_process(self, clock, *, phase, period):
@ -462,7 +462,7 @@ class PySimEngine(BaseEngine):
@contextmanager
def write_vcd(self, *, vcd_file, gtkw_file, traces):
vcd_writer = _VCDWriter(self._fragment,
vcd_writer = _VCDWriter(self._design,
vcd_file=vcd_file, gtkw_file=gtkw_file, traces=traces)
try:
self._vcd_writers.append(vcd_writer)

View file

@ -88,275 +88,295 @@ class FragmentPortsTestCase(FHDLTestCase):
def test_empty(self):
f = Fragment()
self.assertEqual(list(f.iter_ports()), [])
nl = build_netlist(f, ports=[])
self.assertRepr(nl, """
(
(module 0 None ('top'))
(cell 0 0 (top ))
)
""")
f._propagate_ports(ports=(), all_undef_as_ports=True)
self.assertEqual(f.ports, SignalDict([]))
def test_iter_signals(self):
f = Fragment()
f.add_ports(self.s1, self.s2, dir="io")
self.assertEqual(SignalSet((self.s1, self.s2)), f.iter_signals())
def test_self_contained(self):
def test_loopback(self):
f = Fragment()
f.add_statements(
"comb",
self.c1.eq(self.s1),
self.s1.eq(self.c1)
)
nl = build_netlist(f, ports=[self.c1, self.s1])
self.assertRepr(nl, """
(
(module 0 None ('top') (input 's1' 0.2) (output 'c1' 0.2))
(cell 0 0 (top (output 'c1' 0.2) (input 's1' 2:3)))
)
""")
f._propagate_ports(ports=(), all_undef_as_ports=True)
self.assertEqual(f.ports, SignalDict([]))
def test_subfragment_simple(self):
f1 = Fragment()
f2 = Fragment()
f2.add_statements(
"comb",
self.c1.eq(~self.s1),
)
f1.add_subfragment(f2, "f2")
nl = build_netlist(f1, ports=[self.c1, self.s1])
self.assertRepr(nl, """
(
(module 0 None ('top') (input 's1' 0.2) (output 'c1' 1.0))
(module 1 0 ('top' 'f2') (input 's1' 0.2) (output 'c1' 1.0))
(cell 0 0 (top (output 'c1' 1.0) (input 's1' 2:3)))
(cell 1 1 (~ 0.2))
)
""")
def test_infer_input(self):
def test_tree(self):
f = Fragment()
f.add_statements(
f1 = Fragment()
f.add_subfragment(f1, "f1")
f11 = Fragment()
f1.add_subfragment(f11, "f11")
f111 = Fragment()
f11.add_subfragment(f111, "f111")
f1111 = Fragment()
f111.add_subfragment(f1111, "f1111")
f12 = Fragment()
f1.add_subfragment(f12, "f12")
f13 = Fragment()
f1.add_subfragment(f13, "f13")
f131 = Fragment()
f13.add_subfragment(f131, "f131")
f2 = Fragment()
f.add_subfragment(f2, "f2")
f2.add_statements(
"comb",
self.c1.eq(self.s1)
self.s2.eq(~self.s1),
)
f131.add_statements(
"comb",
self.s3.eq(~self.s2),
Assert(~self.s1),
)
f12.add_statements(
"comb",
self.c1.eq(~self.s3),
)
f1111.add_statements(
"comb",
self.c2.eq(~self.s3),
Assert(self.s1),
)
f111.add_statements(
"comb",
self.c3.eq(~self.c2),
)
nl = build_netlist(f, ports=[self.c1, self.c2, self.c3, self.s1])
self.assertRepr(nl, """
(
(module 0 None ('top')
(input 's1' 0.2)
(output 'c1' 5.0)
(output 'c2' 2.0)
(output 'c3' 1.0))
(module 1 0 ('top' 'f1')
(input 'port$0$2' 0.2)
(output 'port$1$0' 1.0)
(output 'port$2$0' 2.0)
(output 'port$5$0' 5.0)
(input 'port$10$0' 10.0))
(module 2 1 ('top' 'f1' 'f11')
(input 'port$0$2' 0.2)
(output 'port$1$0' 1.0)
(output 'port$2$0' 2.0)
(input 'port$6$0' 6.0))
(module 3 2 ('top' 'f1' 'f11' 'f111')
(input 'port$0$2' 0.2)
(output 'c3' 1.0)
(output 'c2' 2.0)
(input 'port$6$0' 6.0))
(module 4 3 ('top' 'f1' 'f11' 'f111' 'f1111')
(input 's1' 0.2)
(output 'c2' 2.0)
(input 's3' 6.0))
(module 5 1 ('top' 'f1' 'f12')
(output 'c1' 5.0)
(input 's3' 6.0))
(module 6 1 ('top' 'f1' 'f13')
(input 'port$0$2' 0.2)
(output 'port$6$0' 6.0)
(input 'port$10$0' 10.0))
(module 7 6 ('top' 'f1' 'f13' 'f131')
(input 's1' 0.2)
(output 's3' 6.0)
(input 's2' 10.0))
(module 8 0 ('top' 'f2')
(input 's1' 0.2)
(output 's2' 10.0))
(cell 0 0 (top (output 'c1' 5.0) (output 'c2' 2.0) (output 'c3' 1.0) (input 's1' 2:3)))
(cell 1 3 (~ 2.0))
(cell 2 4 (~ 6.0))
(cell 3 4 (assignment_list 1'd0 (1 0:1 1'd1)))
(cell 4 4 (assert None 0.2 3.0))
(cell 5 5 (~ 6.0))
(cell 6 7 (~ 10.0))
(cell 7 7 (~ 0.2))
(cell 8 7 (assignment_list 1'd0 (1 0:1 1'd1)))
(cell 9 7 (assert None 7.0 8.0))
(cell 10 8 (~ 0.2))
)
""")
f._propagate_ports(ports=(), all_undef_as_ports=True)
self.assertEqual(f.ports, SignalDict([
(self.s1, "i")
]))
def test_request_output(self):
def test_port_dict(self):
f = Fragment()
f.add_statements(
"comb",
self.c1.eq(self.s1)
nl = build_netlist(f, ports={
"a": (self.s1, PortDirection.Output),
"b": (self.s2, PortDirection.Input),
"c": (self.s3, PortDirection.Inout),
})
self.assertRepr(nl, """
(
(module 0 None ('top') (input 'b' 0.2) (inout 'c' 0.3) (output 'a' 1'd0))
(cell 0 0 (top (output 'a' 1'd0) (input 'b' 2:3) (inout 'c' 3:4)))
)
""")
f._propagate_ports(ports=(self.c1,), all_undef_as_ports=True)
self.assertEqual(f.ports, SignalDict([
(self.s1, "i"),
(self.c1, "o")
]))
def test_input_in_subfragment(self):
f1 = Fragment()
f1.add_statements(
"comb",
self.c1.eq(self.s1)
)
f2 = Fragment()
f2.add_statements(
"comb",
self.s1.eq(0)
)
f1.add_subfragment(f2)
f1._propagate_ports(ports=(), all_undef_as_ports=True)
self.assertEqual(f1.ports, SignalDict())
self.assertEqual(f2.ports, SignalDict([
(self.s1, "o"),
]))
def test_input_only_in_subfragment(self):
f1 = Fragment()
f2 = Fragment()
f2.add_statements(
"comb",
self.c1.eq(self.s1)
)
f1.add_subfragment(f2)
f1._propagate_ports(ports=(), all_undef_as_ports=True)
self.assertEqual(f1.ports, SignalDict([
(self.s1, "i"),
]))
self.assertEqual(f2.ports, SignalDict([
(self.s1, "i"),
]))
def test_output_from_subfragment(self):
f1 = Fragment()
f1.add_statements(
"comb",
self.c1.eq(0)
)
f2 = Fragment()
f2.add_statements(
"comb",
self.c2.eq(1)
)
f1.add_subfragment(f2)
f1._propagate_ports(ports=(self.c2,), all_undef_as_ports=True)
self.assertEqual(f1.ports, SignalDict([
(self.c2, "o"),
]))
self.assertEqual(f2.ports, SignalDict([
(self.c2, "o"),
]))
def test_output_from_subfragment_2(self):
f1 = Fragment()
f1.add_statements(
"comb",
self.c1.eq(self.s1)
)
f2 = Fragment()
f2.add_statements(
"comb",
self.c2.eq(self.s1)
)
f1.add_subfragment(f2)
f3 = Fragment()
f3.add_statements(
"comb",
self.s1.eq(0)
)
f2.add_subfragment(f3)
f1._propagate_ports(ports=(), all_undef_as_ports=True)
self.assertEqual(f2.ports, SignalDict([
(self.s1, "o"),
]))
def test_input_output_sibling(self):
f1 = Fragment()
f2 = Fragment()
f2.add_statements(
"comb",
self.c1.eq(self.c2)
)
f1.add_subfragment(f2)
f3 = Fragment()
f3.add_statements(
"comb",
self.c2.eq(0)
)
f3.add_driver(self.c2)
f1.add_subfragment(f3)
f1._propagate_ports(ports=(), all_undef_as_ports=True)
self.assertEqual(f1.ports, SignalDict())
def test_output_input_sibling(self):
f1 = Fragment()
f2 = Fragment()
f2.add_statements(
"comb",
self.c2.eq(0)
)
f2.add_driver(self.c2)
f1.add_subfragment(f2)
f3 = Fragment()
f3.add_statements(
"comb",
self.c1.eq(self.c2)
)
f1.add_subfragment(f3)
f1._propagate_ports(ports=(), all_undef_as_ports=True)
self.assertEqual(f1.ports, SignalDict())
def test_input_cd(self):
sync = ClockDomain()
def test_port_domain(self):
f = Fragment()
f.add_statements(
"sync",
self.c1.eq(self.s1)
cd_sync = ClockDomain()
ctr = Signal(4)
f.add_domains(cd_sync)
f.add_driver(ctr, "sync")
f.add_statements("sync", ctr.eq(ctr + 1))
nl = build_netlist(f, ports=[
ClockSignal("sync"),
ResetSignal("sync"),
ctr,
])
self.assertRepr(nl, """
(
(module 0 None ('top') (input 'clk' 0.2) (input 'rst' 0.3) (output 'ctr' 5.0:4))
(cell 0 0 (top (output 'ctr' 5.0:4) (input 'clk' 2:3) (input 'rst' 3:4)))
(cell 1 0 (+ (cat 5.0:4 1'd0) 5'd1))
(cell 2 0 (matches 0.3 1))
(cell 3 0 (priority_match 1 2.0))
(cell 4 0 (assignment_list 5.0:4 (1 0:4 1.0:4) (3.0 0:4 4'd0)))
(cell 5 0 (flipflop 4.0:4 0 pos 0.2 0))
)
f.add_domains(sync)
f.add_driver(self.c1, "sync")
""")
f._propagate_ports(ports=(), all_undef_as_ports=True)
self.assertEqual(f.ports, SignalDict([
(self.s1, "i"),
(sync.clk, "i"),
(sync.rst, "i"),
]))
def test_input_cd_reset_less(self):
sync = ClockDomain(reset_less=True)
def test_port_autodomain(self):
f = Fragment()
f.add_statements(
"sync",
self.c1.eq(self.s1)
ctr = Signal(4)
f.add_driver(ctr, "sync")
f.add_statements("sync", ctr.eq(ctr + 1))
nl = build_netlist(f, ports=[ctr])
self.assertRepr(nl, """
(
(module 0 None ('top') (input 'clk' 0.2) (input 'rst' 0.3) (output 'ctr' 5.0:4))
(cell 0 0 (top (output 'ctr' 5.0:4) (input 'clk' 2:3) (input 'rst' 3:4)))
(cell 1 0 (+ (cat 5.0:4 1'd0) 5'd1))
(cell 2 0 (matches 0.3 1))
(cell 3 0 (priority_match 1 2.0))
(cell 4 0 (assignment_list 5.0:4 (1 0:4 1.0:4) (3.0 0:4 4'd0)))
(cell 5 0 (flipflop 4.0:4 0 pos 0.2 0))
)
f.add_domains(sync)
f.add_driver(self.c1, "sync")
""")
f._propagate_ports(ports=(), all_undef_as_ports=True)
self.assertEqual(f.ports, SignalDict([
(self.s1, "i"),
(sync.clk, "i"),
]))
def test_inout(self):
s = Signal()
def test_port_partial(self):
f = Fragment()
f1 = Fragment()
f2 = Instance("foo", io_x=s)
f1.add_subfragment(f2)
f.add_subfragment(f1, "f1")
a = Signal(4)
b = Signal(4)
c = Signal(3)
f1.add_driver(c)
f1.add_statements("comb", c.eq((a * b).shift_right(4)))
nl = build_netlist(f, ports=[a, b, c])
self.assertRepr(nl, """
(
(module 0 None ('top')
(input 'a' 0.2:6)
(input 'b' 0.6:10)
(output 'c' 1.4:7))
(module 1 0 ('top' 'f1')
(input 'a' 0.2:6)
(input 'b' 0.6:10)
(output 'c' 1.4:7))
(cell 0 0 (top
(output 'c' 1.4:7)
(input 'a' 2:6)
(input 'b' 6:10)))
(cell 1 1 (* (cat 0.2:6 4'd0) (cat 0.6:10 4'd0)))
)
""")
f1._propagate_ports(ports=(), all_undef_as_ports=True)
self.assertEqual(f1.ports, SignalDict([
(s, "io")
]))
def test_in_out_same_signal(self):
s = Signal()
f1 = Instance("foo", i_x=s, o_y=s)
f2 = Fragment()
f2.add_subfragment(f1)
f2._propagate_ports(ports=(), all_undef_as_ports=True)
self.assertEqual(f1.ports, SignalDict([
(s, "o")
]))
f3 = Instance("foo", o_y=s, i_x=s)
f4 = Fragment()
f4.add_subfragment(f3)
f4._propagate_ports(ports=(), all_undef_as_ports=True)
self.assertEqual(f3.ports, SignalDict([
(s, "o")
]))
def test_clk_rst(self):
sync = ClockDomain()
def test_port_instance(self):
f = Fragment()
f.add_domains(sync)
f = f.prepare(ports=(ClockSignal("sync"), ResetSignal("sync")))
self.assertEqual(f.ports, SignalDict([
(sync.clk, "i"),
(sync.rst, "i"),
]))
f1 = Fragment()
f.add_subfragment(f1, "f1")
a = Signal(4)
b = Signal(4)
c = Signal(4)
d = Signal(4)
f1.add_subfragment(Instance("t",
p_p = "meow",
a_a = True,
i_aa=a,
io_bb=b,
o_cc=c,
o_dd=d,
), "i")
nl = build_netlist(f, ports=[a, b, c, d])
self.assertRepr(nl, """
(
(module 0 None ('top')
(input 'a' 0.2:6)
(inout 'b' 0.6:10)
(output 'c' 1.0:4)
(output 'd' 1.4:8))
(module 1 0 ('top' 'f1')
(input 'port$0$2' 0.2:6)
(inout 'port$0$6' 0.6:10)
(output 'port$1$0' 1.0:4)
(output 'port$1$4' 1.4:8))
(cell 0 0 (top
(output 'c' 1.0:4)
(output 'd' 1.4:8)
(input 'a' 2:6)
(inout 'b' 6:10)))
(cell 1 1 (instance 't' 'i'
(param 'p' 'meow')
(attr 'a' True)
(input 'aa' 0.2:6)
(output 'cc' 0:4)
(output 'dd' 4:8)
(inout 'bb' 0.6:10)))
)
""")
def test_port_wrong(self):
f = Fragment()
a = Signal()
with self.assertRaisesRegex(TypeError,
r"^Only signals may be added as ports, not \(const 1'd1\)$"):
f.prepare(ports=(Const(1),))
build_netlist(f, ports=(Const(1),))
with self.assertRaisesRegex(TypeError,
r"^Port name must be a string, not 1$"):
build_netlist(f, ports={1: (a, PortDirection.Input)})
with self.assertRaisesRegex(TypeError,
r"^Port direction must be a `PortDirection` instance or None, not 'i'$"):
build_netlist(f, ports={"a": (a, "i")})
def test_port_not_iterable(self):
f = Fragment()
with self.assertRaisesRegex(TypeError,
r"^`ports` must be either a list or a tuple, not 1$"):
f.prepare(ports=1)
r"^`ports` must be a dict, a list or a tuple, not 1$"):
build_netlist(f, ports=1)
with self.assertRaisesRegex(TypeError,
(r"^`ports` must be either a list or a tuple, not \(const 1'd1\)"
(r"^`ports` must be a dict, a list or a tuple, not \(const 1'd1\)"
r" \(did you mean `ports=\(<signal>,\)`, rather than `ports=<signal>`\?\)$")):
f.prepare(ports=Const(1))
build_netlist(f, ports=Const(1))
class FragmentDomainsTestCase(FHDLTestCase):
def test_iter_signals(self):
cd1 = ClockDomain()
cd2 = ClockDomain(reset_less=True)
s1 = Signal()
s2 = Signal()
f = Fragment()
f.add_domains(cd1, cd2)
f.add_driver(s1, "cd1")
self.assertEqual(SignalSet((cd1.clk, cd1.rst, s1)), f.iter_signals())
f.add_driver(s2, "cd2")
self.assertEqual(SignalSet((cd1.clk, cd1.rst, cd2.clk, s1, s2)), f.iter_signals())
def test_propagate_up(self):
cd = ClockDomain()
@ -810,49 +830,17 @@ class InstanceTestCase(FHDLTestCase):
self.assertEqual(f.type, "cpu")
self.assertEqual(f.parameters, OrderedDict([("RESET", 0x1234)]))
self.assertEqual(list(f.named_ports.keys()), ["clk", "rst", "stb", "data", "pins"])
self.assertEqual(f.ports, SignalDict([]))
def test_prepare(self):
self.setUp_cpu()
f = self.wrap.prepare()
sync_clk = f.domains["sync"].clk
self.assertEqual(f.ports, SignalDict([
(sync_clk, "i"),
(self.rst, "i"),
(self.pins, "io"),
]))
def test_prepare_explicit_ports(self):
self.setUp_cpu()
f = self.wrap.prepare(ports=[self.rst, self.stb])
sync_clk = f.domains["sync"].clk
sync_rst = f.domains["sync"].rst
self.assertEqual(f.ports, SignalDict([
(sync_clk, "i"),
(sync_rst, "i"),
(self.rst, "i"),
(self.stb, "o"),
(self.pins, "io"),
]))
def test_prepare_slice_in_port(self):
s = Signal(2)
f = Fragment()
f.add_subfragment(Instance("foo", o_O=s[0]))
f.add_subfragment(Instance("foo", o_O=s[1]))
fp = f.prepare(ports=[s], missing_domain=lambda name: None)
self.assertEqual(fp.ports, SignalDict([
(s, "o"),
]))
def test_prepare_attrs(self):
self.setUp_cpu()
self.inst.attrs["ATTR"] = 1
f = self.inst.prepare()
self.assertEqual(f.attrs, OrderedDict([
design = self.inst.prepare()
self.assertEqual(design.fragment.attrs, OrderedDict([
("ATTR", 1),
]))
class NamesTestCase(FHDLTestCase):
def test_assign_names_to_signals(self):
i = Signal()
rst = Signal()
@ -864,8 +852,6 @@ class InstanceTestCase(FHDLTestCase):
f = Fragment()
f.add_domains(cd_sync := ClockDomain())
f.add_domains(cd_sync_norst := ClockDomain(reset_less=True))
f.add_ports((i, rst), dir="i")
f.add_ports((o1, o2, o3), dir="o")
f.add_statements("comb", [o1.eq(0)])
f.add_driver(o1, domain="comb")
f.add_statements("sync", [o2.eq(i1)])
@ -873,8 +859,15 @@ class InstanceTestCase(FHDLTestCase):
f.add_statements("sync_norst", [o3.eq(i1)])
f.add_driver(o3, domain="sync_norst")
names = f._assign_names_to_signals()
self.assertEqual(names, SignalDict([
ports = {
"i": (i, PortDirection.Input),
"rst": (rst, PortDirection.Input),
"o1": (o1, PortDirection.Output),
"o2": (o2, PortDirection.Output),
"o3": (o3, PortDirection.Output),
}
design = f.prepare(ports)
self.assertEqual(design.signal_names[design.fragment], SignalDict([
(i, "i"),
(rst, "rst"),
(o1, "o1"),
@ -891,8 +884,8 @@ class InstanceTestCase(FHDLTestCase):
f.add_subfragment(a := Fragment())
f.add_subfragment(b := Fragment(), name="b")
names = f._assign_names_to_fragments()
self.assertEqual(names, {
design = Design(f, ports=(), hierarchy=("top",))
self.assertEqual(design.fragment_names, {
f: ("top",),
a: ("top", "U$0"),
b: ("top", "b")
@ -903,8 +896,8 @@ class InstanceTestCase(FHDLTestCase):
f.add_subfragment(a := Fragment())
f.add_subfragment(b := Fragment(), name="b")
names = f._assign_names_to_fragments(hierarchy=("bench", "cpu"))
self.assertEqual(names, {
design = Design(f, ports=[], hierarchy=("bench", "cpu"))
self.assertEqual(design.fragment_names, {
f: ("bench", "cpu",),
a: ("bench", "cpu", "U$0"),
b: ("bench", "cpu", "b")
@ -913,10 +906,10 @@ class InstanceTestCase(FHDLTestCase):
def test_assign_names_to_fragments_collide_with_signal(self):
f = Fragment()
f.add_subfragment(a_f := Fragment(), name="a")
f.add_ports((a_s := Signal(name="a"),), dir="o")
a_s = Signal(name="a")
names = f._assign_names_to_fragments()
self.assertEqual(names, {
design = Design(f, ports=[("a", a_s, None)], hierarchy=("top",))
self.assertEqual(design.fragment_names, {
f: ("top",),
a_f: ("top", "a$U$0")
})