Rename nMigen to Amaranth HDL.

This commit is contained in:
whitequark 2021-12-10 05:39:50 +00:00
parent 0b28a97ca0
commit 909a3b8be7
200 changed files with 14493 additions and 14451 deletions

View file

@ -1,20 +1,7 @@
from .ast import Shape, unsigned, signed
from .ast import Value, Const, C, Mux, Cat, Repl, Array, Signal, ClockSignal, ResetSignal
from .dsl import Module
from .cd import ClockDomain
from .ir import Elaboratable, Fragment, Instance
from .mem import Memory
from .rec import Record
from .xfrm import DomainRenamer, ResetInserter, EnableInserter
from amaranth.hdl import *
from amaranth.hdl import __all__
__all__ = [
"Shape", "unsigned", "signed",
"Value", "Const", "C", "Mux", "Cat", "Repl", "Array", "Signal", "ClockSignal", "ResetSignal",
"Module",
"ClockDomain",
"Elaboratable", "Fragment", "Instance",
"Memory",
"Record",
"DomainRenamer", "ResetInserter", "EnableInserter",
]
import warnings
warnings.warn("instead of nmigen.hdl, use amaranth.hdl",
DeprecationWarning, stacklevel=2)

File diff suppressed because it is too large Load diff

View file

@ -1,84 +1,7 @@
from .. import tracer
from .ast import Signal
from amaranth.hdl.cd import *
from amaranth.hdl.cd import __all__
__all__ = ["ClockDomain", "DomainError"]
class DomainError(Exception):
pass
class ClockDomain:
"""Synchronous domain.
Parameters
----------
name : str or None
Domain name. If ``None`` (the default) the name is inferred from the variable name this
``ClockDomain`` is assigned to (stripping any `"cd_"` prefix).
reset_less : bool
If ``True``, the domain does not use a reset signal. Registers within this domain are
still all initialized to their reset state once, e.g. through Verilog `"initial"`
statements.
clk_edge : str
The edge of the clock signal on which signals are sampled. Must be one of "pos" or "neg".
async_reset : bool
If ``True``, the domain uses an asynchronous reset, and registers within this domain
are initialized to their reset state when reset level changes. Otherwise, registers
are initialized to reset state at the next clock cycle when reset is asserted.
local : bool
If ``True``, the domain will propagate only downwards in the design hierarchy. Otherwise,
the domain will propagate everywhere.
Attributes
----------
clk : Signal, inout
The clock for this domain. Can be driven or used to drive other signals (preferably
in combinatorial context).
rst : Signal or None, inout
Reset signal for this domain. Can be driven or used to drive.
"""
@staticmethod
def _name_for(domain_name, signal_name):
if domain_name == "sync":
return signal_name
else:
return "{}_{}".format(domain_name, signal_name)
def __init__(self, name=None, *, clk_edge="pos", reset_less=False, async_reset=False,
local=False):
if name is None:
try:
name = tracer.get_var_name()
except tracer.NameNotFound:
raise ValueError("Clock domain name must be specified explicitly")
if name.startswith("cd_"):
name = name[3:]
if name == "comb":
raise ValueError("Domain '{}' may not be clocked".format(name))
if clk_edge not in ("pos", "neg"):
raise ValueError("Domain clock edge must be one of 'pos' or 'neg', not {!r}"
.format(clk_edge))
self.name = name
self.clk = Signal(name=self._name_for(name, "clk"), src_loc_at=1)
self.clk_edge = clk_edge
if reset_less:
self.rst = None
else:
self.rst = Signal(name=self._name_for(name, "rst"), src_loc_at=1)
self.async_reset = async_reset
self.local = local
def rename(self, new_name):
self.name = new_name
self.clk.name = self._name_for(new_name, "clk")
if self.rst is not None:
self.rst.name = self._name_for(new_name, "rst")
import warnings
warnings.warn("instead of nmigen.hdl.cd, use amaranth.hdl.cd",
DeprecationWarning, stacklevel=2)

View file

@ -1,546 +1,7 @@
from collections import OrderedDict
from contextlib import contextmanager, _GeneratorContextManager
from functools import wraps
from enum import Enum
from amaranth.hdl.dsl import *
from amaranth.hdl.dsl import __all__
import warnings
from .._utils import flatten, bits_for
from .. import tracer
from .ast import *
from .ir import *
from .cd import *
from .xfrm import *
__all__ = ["SyntaxError", "SyntaxWarning", "Module"]
class SyntaxError(Exception):
pass
class SyntaxWarning(Warning):
pass
class _ModuleBuilderProxy:
def __init__(self, builder, depth):
object.__setattr__(self, "_builder", builder)
object.__setattr__(self, "_depth", depth)
class _ModuleBuilderDomain(_ModuleBuilderProxy):
def __init__(self, builder, depth, domain):
super().__init__(builder, depth)
self._domain = domain
def __iadd__(self, assigns):
self._builder._add_statement(assigns, domain=self._domain, depth=self._depth)
return self
class _ModuleBuilderDomains(_ModuleBuilderProxy):
def __getattr__(self, name):
if name == "submodules":
warnings.warn("Using '<module>.d.{}' would add statements to clock domain {!r}; "
"did you mean <module>.{} instead?"
.format(name, name, name),
SyntaxWarning, stacklevel=2)
if name == "comb":
domain = None
else:
domain = name
return _ModuleBuilderDomain(self._builder, self._depth, domain)
def __getitem__(self, name):
return self.__getattr__(name)
def __setattr__(self, name, value):
if name == "_depth":
object.__setattr__(self, name, value)
elif not isinstance(value, _ModuleBuilderDomain):
raise AttributeError("Cannot assign 'd.{}' attribute; did you mean 'd.{} +='?"
.format(name, name))
def __setitem__(self, name, value):
return self.__setattr__(name, value)
class _ModuleBuilderRoot:
def __init__(self, builder, depth):
self._builder = builder
self.domain = self.d = _ModuleBuilderDomains(builder, depth)
def __getattr__(self, name):
if name in ("comb", "sync"):
raise AttributeError("'{}' object has no attribute '{}'; did you mean 'd.{}'?"
.format(type(self).__name__, name, name))
raise AttributeError("'{}' object has no attribute '{}'"
.format(type(self).__name__, name))
class _ModuleBuilderSubmodules:
def __init__(self, builder):
object.__setattr__(self, "_builder", builder)
def __iadd__(self, modules):
for module in flatten([modules]):
self._builder._add_submodule(module)
return self
def __setattr__(self, name, submodule):
self._builder._add_submodule(submodule, name)
def __setitem__(self, name, value):
return self.__setattr__(name, value)
def __getattr__(self, name):
return self._builder._get_submodule(name)
def __getitem__(self, name):
return self.__getattr__(name)
class _ModuleBuilderDomainSet:
def __init__(self, builder):
object.__setattr__(self, "_builder", builder)
def __iadd__(self, domains):
for domain in flatten([domains]):
if not isinstance(domain, ClockDomain):
raise TypeError("Only clock domains may be added to `m.domains`, not {!r}"
.format(domain))
self._builder._add_domain(domain)
return self
def __setattr__(self, name, domain):
if not isinstance(domain, ClockDomain):
raise TypeError("Only clock domains may be added to `m.domains`, not {!r}"
.format(domain))
if domain.name != name:
raise NameError("Clock domain name {!r} must match name in `m.domains.{} += ...` "
"syntax"
.format(domain.name, name))
self._builder._add_domain(domain)
# It's not particularly clean to depend on an internal interface, but, unfortunately, __bool__
# must be defined on a class to be called during implicit conversion.
class _GuardedContextManager(_GeneratorContextManager):
def __init__(self, keyword, func, args, kwds):
self.keyword = keyword
return super().__init__(func, args, kwds)
def __bool__(self):
raise SyntaxError("`if m.{kw}(...):` does not work; use `with m.{kw}(...)`"
.format(kw=self.keyword))
def _guardedcontextmanager(keyword):
def decorator(func):
@wraps(func)
def helper(*args, **kwds):
return _GuardedContextManager(keyword, func, args, kwds)
return helper
return decorator
class FSM:
def __init__(self, state, encoding, decoding):
self.state = state
self.encoding = encoding
self.decoding = decoding
def ongoing(self, name):
if name not in self.encoding:
self.encoding[name] = len(self.encoding)
return Operator("==", [self.state, self.encoding[name]], src_loc_at=0)
class Module(_ModuleBuilderRoot, Elaboratable):
@classmethod
def __init_subclass__(cls):
raise SyntaxError("Instead of inheriting from `Module`, inherit from `Elaboratable` "
"and return a `Module` from the `elaborate(self, platform)` method")
def __init__(self):
_ModuleBuilderRoot.__init__(self, self, depth=0)
self.submodules = _ModuleBuilderSubmodules(self)
self.domains = _ModuleBuilderDomainSet(self)
self._statements = Statement.cast([])
self._ctrl_context = None
self._ctrl_stack = []
self._driving = SignalDict()
self._named_submodules = {}
self._anon_submodules = []
self._domains = {}
self._generated = {}
def _check_context(self, construct, context):
if self._ctrl_context != context:
if self._ctrl_context is None:
raise SyntaxError("{} is not permitted outside of {}"
.format(construct, context))
else:
if self._ctrl_context == "Switch":
secondary_context = "Case"
if self._ctrl_context == "FSM":
secondary_context = "State"
raise SyntaxError("{} is not permitted directly inside of {}; it is permitted "
"inside of {} {}"
.format(construct, self._ctrl_context,
self._ctrl_context, secondary_context))
def _get_ctrl(self, name):
if self._ctrl_stack:
top_name, top_data = self._ctrl_stack[-1]
if top_name == name:
return top_data
def _flush_ctrl(self):
while len(self._ctrl_stack) > self.domain._depth:
self._pop_ctrl()
def _set_ctrl(self, name, data):
self._flush_ctrl()
self._ctrl_stack.append((name, data))
return data
def _check_signed_cond(self, cond):
cond = Value.cast(cond)
width, signed = cond.shape()
if signed:
warnings.warn("Signed values in If/Elif conditions usually result from inverting "
"Python booleans with ~, which leads to unexpected results. "
"Replace `~flag` with `not flag`. (If this is a false positive, "
"silence this warning with `m.If(x)` → `m.If(x.bool())`.)",
SyntaxWarning, stacklevel=4)
return cond
@_guardedcontextmanager("If")
def If(self, cond):
self._check_context("If", context=None)
cond = self._check_signed_cond(cond)
src_loc = tracer.get_src_loc(src_loc_at=1)
if_data = self._set_ctrl("If", {
"depth": self.domain._depth,
"tests": [],
"bodies": [],
"src_loc": src_loc,
"src_locs": [],
})
try:
_outer_case, self._statements = self._statements, []
self.domain._depth += 1
yield
self._flush_ctrl()
if_data["tests"].append(cond)
if_data["bodies"].append(self._statements)
if_data["src_locs"].append(src_loc)
finally:
self.domain._depth -= 1
self._statements = _outer_case
@_guardedcontextmanager("Elif")
def Elif(self, cond):
self._check_context("Elif", context=None)
cond = self._check_signed_cond(cond)
src_loc = tracer.get_src_loc(src_loc_at=1)
if_data = self._get_ctrl("If")
if if_data is None or if_data["depth"] != self.domain._depth:
raise SyntaxError("Elif without preceding If")
try:
_outer_case, self._statements = self._statements, []
self.domain._depth += 1
yield
self._flush_ctrl()
if_data["tests"].append(cond)
if_data["bodies"].append(self._statements)
if_data["src_locs"].append(src_loc)
finally:
self.domain._depth -= 1
self._statements = _outer_case
@_guardedcontextmanager("Else")
def Else(self):
self._check_context("Else", context=None)
src_loc = tracer.get_src_loc(src_loc_at=1)
if_data = self._get_ctrl("If")
if if_data is None or if_data["depth"] != self.domain._depth:
raise SyntaxError("Else without preceding If/Elif")
try:
_outer_case, self._statements = self._statements, []
self.domain._depth += 1
yield
self._flush_ctrl()
if_data["bodies"].append(self._statements)
if_data["src_locs"].append(src_loc)
finally:
self.domain._depth -= 1
self._statements = _outer_case
self._pop_ctrl()
@contextmanager
def Switch(self, test):
self._check_context("Switch", context=None)
switch_data = self._set_ctrl("Switch", {
"test": Value.cast(test),
"cases": OrderedDict(),
"src_loc": tracer.get_src_loc(src_loc_at=1),
"case_src_locs": {},
})
try:
self._ctrl_context = "Switch"
self.domain._depth += 1
yield
finally:
self.domain._depth -= 1
self._ctrl_context = None
self._pop_ctrl()
@contextmanager
def Case(self, *patterns):
self._check_context("Case", context="Switch")
src_loc = tracer.get_src_loc(src_loc_at=1)
switch_data = self._get_ctrl("Switch")
new_patterns = ()
for pattern in patterns:
if not isinstance(pattern, (int, str, Enum)):
raise SyntaxError("Case pattern must be an integer, a string, or an enumeration, "
"not {!r}"
.format(pattern))
if isinstance(pattern, str) and any(bit not in "01- \t" for bit in pattern):
raise SyntaxError("Case pattern '{}' must consist of 0, 1, and - (don't care) "
"bits, and may include whitespace"
.format(pattern))
if (isinstance(pattern, str) and
len("".join(pattern.split())) != len(switch_data["test"])):
raise SyntaxError("Case pattern '{}' must have the same width as switch value "
"(which is {})"
.format(pattern, len(switch_data["test"])))
if isinstance(pattern, int) and bits_for(pattern) > len(switch_data["test"]):
warnings.warn("Case pattern '{:b}' is wider than switch value "
"(which has width {}); comparison will never be true"
.format(pattern, len(switch_data["test"])),
SyntaxWarning, stacklevel=3)
continue
if isinstance(pattern, Enum) and bits_for(pattern.value) > len(switch_data["test"]):
warnings.warn("Case pattern '{:b}' ({}.{}) is wider than switch value "
"(which has width {}); comparison will never be true"
.format(pattern.value, pattern.__class__.__name__, pattern.name,
len(switch_data["test"])),
SyntaxWarning, stacklevel=3)
continue
new_patterns = (*new_patterns, pattern)
try:
_outer_case, self._statements = self._statements, []
self._ctrl_context = None
yield
self._flush_ctrl()
# If none of the provided cases can possibly be true, omit this branch completely.
# This needs to be differentiated from no cases being provided in the first place,
# which means the branch will always match.
if not (patterns and not new_patterns):
switch_data["cases"][new_patterns] = self._statements
switch_data["case_src_locs"][new_patterns] = src_loc
finally:
self._ctrl_context = "Switch"
self._statements = _outer_case
def Default(self):
return self.Case()
@contextmanager
def FSM(self, reset=None, domain="sync", name="fsm"):
self._check_context("FSM", context=None)
if domain == "comb":
raise ValueError("FSM may not be driven by the '{}' domain".format(domain))
fsm_data = self._set_ctrl("FSM", {
"name": name,
"signal": Signal(name="{}_state".format(name), src_loc_at=2),
"reset": reset,
"domain": domain,
"encoding": OrderedDict(),
"decoding": OrderedDict(),
"states": OrderedDict(),
"src_loc": tracer.get_src_loc(src_loc_at=1),
"state_src_locs": {},
})
self._generated[name] = fsm = \
FSM(fsm_data["signal"], fsm_data["encoding"], fsm_data["decoding"])
try:
self._ctrl_context = "FSM"
self.domain._depth += 1
yield fsm
for state_name in fsm_data["encoding"]:
if state_name not in fsm_data["states"]:
raise NameError("FSM state '{}' is referenced but not defined"
.format(state_name))
finally:
self.domain._depth -= 1
self._ctrl_context = None
self._pop_ctrl()
@contextmanager
def State(self, name):
self._check_context("FSM State", context="FSM")
src_loc = tracer.get_src_loc(src_loc_at=1)
fsm_data = self._get_ctrl("FSM")
if name in fsm_data["states"]:
raise NameError("FSM state '{}' is already defined".format(name))
if name not in fsm_data["encoding"]:
fsm_data["encoding"][name] = len(fsm_data["encoding"])
try:
_outer_case, self._statements = self._statements, []
self._ctrl_context = None
yield
self._flush_ctrl()
fsm_data["states"][name] = self._statements
fsm_data["state_src_locs"][name] = src_loc
finally:
self._ctrl_context = "FSM"
self._statements = _outer_case
@property
def next(self):
raise SyntaxError("Only assignment to `m.next` is permitted")
@next.setter
def next(self, name):
if self._ctrl_context != "FSM":
for level, (ctrl_name, ctrl_data) in enumerate(reversed(self._ctrl_stack)):
if ctrl_name == "FSM":
if name not in ctrl_data["encoding"]:
ctrl_data["encoding"][name] = len(ctrl_data["encoding"])
self._add_statement(
assigns=[ctrl_data["signal"].eq(ctrl_data["encoding"][name])],
domain=ctrl_data["domain"],
depth=len(self._ctrl_stack))
return
raise SyntaxError("`m.next = <...>` is only permitted inside an FSM state")
def _pop_ctrl(self):
name, data = self._ctrl_stack.pop()
src_loc = data["src_loc"]
if name == "If":
if_tests, if_bodies = data["tests"], data["bodies"]
if_src_locs = data["src_locs"]
tests, cases = [], OrderedDict()
for if_test, if_case in zip(if_tests + [None], if_bodies):
if if_test is not None:
if len(if_test) != 1:
if_test = if_test.bool()
tests.append(if_test)
if if_test is not None:
match = ("1" + "-" * (len(tests) - 1)).rjust(len(if_tests), "-")
else:
match = None
cases[match] = if_case
self._statements.append(Switch(Cat(tests), cases,
src_loc=src_loc, case_src_locs=dict(zip(cases, if_src_locs))))
if name == "Switch":
switch_test, switch_cases = data["test"], data["cases"]
switch_case_src_locs = data["case_src_locs"]
self._statements.append(Switch(switch_test, switch_cases,
src_loc=src_loc, case_src_locs=switch_case_src_locs))
if name == "FSM":
fsm_signal, fsm_reset, fsm_encoding, fsm_decoding, fsm_states = \
data["signal"], data["reset"], data["encoding"], data["decoding"], data["states"]
fsm_state_src_locs = data["state_src_locs"]
if not fsm_states:
return
fsm_signal.width = bits_for(len(fsm_encoding) - 1)
if fsm_reset is None:
fsm_signal.reset = fsm_encoding[next(iter(fsm_states))]
else:
fsm_signal.reset = fsm_encoding[fsm_reset]
# The FSM is encoded such that the state with encoding 0 is always the reset state.
fsm_decoding.update((n, s) for s, n in fsm_encoding.items())
fsm_signal.decoder = lambda n: "{}/{}".format(fsm_decoding[n], n)
self._statements.append(Switch(fsm_signal,
OrderedDict((fsm_encoding[name], stmts) for name, stmts in fsm_states.items()),
src_loc=src_loc, case_src_locs={fsm_encoding[name]: fsm_state_src_locs[name]
for name in fsm_states}))
def _add_statement(self, assigns, domain, depth, compat_mode=False):
def domain_name(domain):
if domain is None:
return "comb"
else:
return domain
while len(self._ctrl_stack) > self.domain._depth:
self._pop_ctrl()
for stmt in Statement.cast(assigns):
if not compat_mode and not isinstance(stmt, (Assign, Assert, Assume, Cover)):
raise SyntaxError(
"Only assignments and property checks may be appended to d.{}"
.format(domain_name(domain)))
stmt._MustUse__used = True
stmt = SampleDomainInjector(domain)(stmt)
for signal in stmt._lhs_signals():
if signal not in self._driving:
self._driving[signal] = domain
elif self._driving[signal] != domain:
cd_curr = self._driving[signal]
raise SyntaxError(
"Driver-driver conflict: trying to drive {!r} from d.{}, but it is "
"already driven from d.{}"
.format(signal, domain_name(domain), domain_name(cd_curr)))
self._statements.append(stmt)
def _add_submodule(self, submodule, name=None):
if not hasattr(submodule, "elaborate"):
raise TypeError("Trying to add {!r}, which does not implement .elaborate(), as "
"a submodule".format(submodule))
if name == None:
self._anon_submodules.append(submodule)
else:
if name in self._named_submodules:
raise NameError("Submodule named '{}' already exists".format(name))
self._named_submodules[name] = submodule
def _get_submodule(self, name):
if name in self._named_submodules:
return self._named_submodules[name]
else:
raise AttributeError("No submodule named '{}' exists".format(name))
def _add_domain(self, cd):
if cd.name in self._domains:
raise NameError("Clock domain named '{}' already exists".format(cd.name))
self._domains[cd.name] = cd
def _flush(self):
while self._ctrl_stack:
self._pop_ctrl()
def elaborate(self, platform):
self._flush()
fragment = Fragment()
for name in self._named_submodules:
fragment.add_subfragment(Fragment.get(self._named_submodules[name], platform), name)
for submodule in self._anon_submodules:
fragment.add_subfragment(Fragment.get(submodule, platform), None)
statements = SampleDomainInjector("sync")(self._statements)
fragment.add_statements(statements)
for signal, domain in self._driving.items():
fragment.add_driver(signal, domain)
fragment.add_domains(self._domains.values())
fragment.generated.update(self._generated)
return fragment
warnings.warn("instead of nmigen.hdl.dsl, use amaranth.hdl.dsl",
DeprecationWarning, stacklevel=2)

View file

@ -1,592 +1,7 @@
from abc import ABCMeta
from collections import defaultdict, OrderedDict
from functools import reduce
from amaranth.hdl.ir import *
from amaranth.hdl.ir import __all__
import warnings
from .._utils import *
from .._unused import *
from .ast import *
from .cd import *
__all__ = ["UnusedElaboratable", "Elaboratable", "DriverConflict", "Fragment", "Instance"]
class UnusedElaboratable(UnusedMustUse):
pass
class Elaboratable(MustUse, metaclass=ABCMeta):
_MustUse__warning = UnusedElaboratable
class DriverConflict(UserWarning):
pass
class Fragment:
@staticmethod
def get(obj, platform):
code = None
while True:
if isinstance(obj, Fragment):
return obj
elif isinstance(obj, Elaboratable):
code = obj.elaborate.__code__
obj._MustUse__used = True
obj = obj.elaborate(platform)
elif hasattr(obj, "elaborate"):
warnings.warn(
message="Class {!r} is an elaboratable that does not explicitly inherit from "
"Elaboratable; doing so would improve diagnostics"
.format(type(obj)),
category=RuntimeWarning,
stacklevel=2)
code = obj.elaborate.__code__
obj = obj.elaborate(platform)
else:
raise AttributeError("Object {!r} cannot be elaborated".format(obj))
if obj is None and code is not None:
warnings.warn_explicit(
message=".elaborate() returned None; missing return statement?",
category=UserWarning,
filename=code.co_filename,
lineno=code.co_firstlineno)
def __init__(self):
self.ports = SignalDict()
self.drivers = OrderedDict()
self.statements = []
self.domains = OrderedDict()
self.subfragments = []
self.attrs = OrderedDict()
self.generated = OrderedDict()
self.flatten = False
def add_ports(self, *ports, dir):
assert dir in ("i", "o", "io")
for port in flatten(ports):
self.ports[port] = dir
def iter_ports(self, dir=None):
if dir is None:
yield from self.ports
else:
for port, port_dir in self.ports.items():
if port_dir == dir:
yield port
def add_driver(self, signal, domain=None):
if domain not in self.drivers:
self.drivers[domain] = SignalSet()
self.drivers[domain].add(signal)
def iter_drivers(self):
for domain, signals in self.drivers.items():
for signal in signals:
yield domain, signal
def iter_comb(self):
if None in self.drivers:
yield from self.drivers[None]
def iter_sync(self):
for domain, signals in self.drivers.items():
if domain is None:
continue
for signal in signals:
yield domain, signal
def iter_signals(self):
signals = SignalSet()
signals |= self.ports.keys()
for domain, domain_signals in self.drivers.items():
if domain is not None:
cd = self.domains[domain]
signals.add(cd.clk)
if cd.rst is not None:
signals.add(cd.rst)
signals |= domain_signals
return signals
def add_domains(self, *domains):
for domain in flatten(domains):
assert isinstance(domain, ClockDomain)
assert domain.name not in self.domains
self.domains[domain.name] = domain
def iter_domains(self):
yield from self.domains
def add_statements(self, *stmts):
for stmt in Statement.cast(stmts):
stmt._MustUse__used = True
self.statements.append(stmt)
def add_subfragment(self, subfragment, name=None):
assert isinstance(subfragment, Fragment)
self.subfragments.append((subfragment, name))
def find_subfragment(self, name_or_index):
if isinstance(name_or_index, int):
if name_or_index < len(self.subfragments):
subfragment, name = self.subfragments[name_or_index]
return subfragment
raise NameError("No subfragment at index #{}".format(name_or_index))
else:
for subfragment, name in self.subfragments:
if name == name_or_index:
return subfragment
raise NameError("No subfragment with name '{}'".format(name_or_index))
def find_generated(self, *path):
if len(path) > 1:
path_component, *path = path
return self.find_subfragment(path_component).find_generated(*path)
else:
item, = path
return self.generated[item]
def elaborate(self, platform):
return self
def _merge_subfragment(self, subfragment):
# Merge subfragment's everything except clock domains into this fragment.
# Flattening is done after clock domain propagation, so we can assume the domains
# are already the same in every involved fragment in the first place.
self.ports.update(subfragment.ports)
for domain, signal in subfragment.iter_drivers():
self.add_driver(signal, domain)
self.statements += subfragment.statements
self.subfragments += subfragment.subfragments
# Remove the merged subfragment.
found = False
for i, (check_subfrag, check_name) in enumerate(self.subfragments): # :nobr:
if subfragment == check_subfrag:
del self.subfragments[i]
found = True
break
assert found
def _resolve_hierarchy_conflicts(self, hierarchy=("top",), mode="warn"):
assert mode in ("silent", "warn", "error")
driver_subfrags = SignalDict()
memory_subfrags = OrderedDict()
def add_subfrag(registry, entity, entry):
# Because of missing domain insertion, at the point when this code runs, we have
# a mixture of bound and unbound {Clock,Reset}Signals. Map the bound ones to
# the actual signals (because the signal itself can be driven as well); but leave
# the unbound ones as it is, because there's no concrete signal for it yet anyway.
if isinstance(entity, ClockSignal) and entity.domain in self.domains:
entity = self.domains[entity.domain].clk
elif isinstance(entity, ResetSignal) and entity.domain in self.domains:
entity = self.domains[entity.domain].rst
if entity not in registry:
registry[entity] = set()
registry[entity].add(entry)
# For each signal driven by this fragment and/or its subfragments, determine which
# subfragments also drive it.
for domain, signal in self.iter_drivers():
add_subfrag(driver_subfrags, signal, (None, hierarchy))
flatten_subfrags = set()
for i, (subfrag, name) in enumerate(self.subfragments):
if name is None:
name = "<unnamed #{}>".format(i)
subfrag_hierarchy = hierarchy + (name,)
if subfrag.flatten:
# Always flatten subfragments that explicitly request it.
flatten_subfrags.add((subfrag, subfrag_hierarchy))
if isinstance(subfrag, Instance):
# For memories (which are subfragments, but semantically a part of superfragment),
# record that this fragment is driving it.
if subfrag.type in ("$memrd", "$memwr"):
memory = subfrag.parameters["MEMID"]
add_subfrag(memory_subfrags, memory, (None, hierarchy))
# Never flatten instances.
continue
# First, recurse into subfragments and let them detect driver conflicts as well.
subfrag_drivers, subfrag_memories = \
subfrag._resolve_hierarchy_conflicts(subfrag_hierarchy, mode)
# Second, classify subfragments by signals they drive and memories they use.
for signal in subfrag_drivers:
add_subfrag(driver_subfrags, signal, (subfrag, subfrag_hierarchy))
for memory in subfrag_memories:
add_subfrag(memory_subfrags, memory, (subfrag, subfrag_hierarchy))
# Find out the set of subfragments that needs to be flattened into this fragment
# to resolve driver-driver conflicts.
def flatten_subfrags_if_needed(subfrags):
if len(subfrags) == 1:
return []
flatten_subfrags.update((f, h) for f, h in subfrags if f is not None)
return list(sorted(".".join(h) for f, h in subfrags))
for signal, subfrags in driver_subfrags.items():
subfrag_names = flatten_subfrags_if_needed(subfrags)
if not subfrag_names:
continue
# While we're at it, show a message.
message = ("Signal '{}' is driven from multiple fragments: {}"
.format(signal, ", ".join(subfrag_names)))
if mode == "error":
raise DriverConflict(message)
elif mode == "warn":
message += "; hierarchy will be flattened"
warnings.warn_explicit(message, DriverConflict, *signal.src_loc)
for memory, subfrags in memory_subfrags.items():
subfrag_names = flatten_subfrags_if_needed(subfrags)
if not subfrag_names:
continue
# While we're at it, show a message.
message = ("Memory '{}' is accessed from multiple fragments: {}"
.format(memory.name, ", ".join(subfrag_names)))
if mode == "error":
raise DriverConflict(message)
elif mode == "warn":
message += "; hierarchy will be flattened"
warnings.warn_explicit(message, DriverConflict, *memory.src_loc)
# Flatten hierarchy.
for subfrag, subfrag_hierarchy in sorted(flatten_subfrags, key=lambda x: x[1]):
self._merge_subfragment(subfrag)
# If we flattened anything, we might be in a situation where we have a driver conflict
# again, e.g. if we had a tree of fragments like A --- B --- C where only fragments
# A and C were driving a signal S. In that case, since B is not driving S itself,
# processing B will not result in any flattening, but since B is transitively driving S,
# processing A will flatten B into it. Afterwards, we have a tree like AB --- C, which
# has another conflict.
if any(flatten_subfrags):
# Try flattening again.
return self._resolve_hierarchy_conflicts(hierarchy, mode)
# Nothing was flattened, we're done!
return (SignalSet(driver_subfrags.keys()),
set(memory_subfrags.keys()))
def _propagate_domains_up(self, hierarchy=("top",)):
from .xfrm import DomainRenamer
domain_subfrags = defaultdict(lambda: set())
# For each domain defined by a subfragment, determine which subfragments define it.
for i, (subfrag, name) in enumerate(self.subfragments):
# First, recurse into subfragments and let them propagate domains up as well.
hier_name = name
if hier_name is None:
hier_name = "<unnamed #{}>".format(i)
subfrag._propagate_domains_up(hierarchy + (hier_name,))
# Second, classify subfragments by domains they define.
for domain_name, domain in subfrag.domains.items():
if domain.local:
continue
domain_subfrags[domain_name].add((subfrag, name, i))
# For each domain defined by more than one subfragment, rename the domain in each
# of the subfragments such that they no longer conflict.
for domain_name, subfrags in domain_subfrags.items():
if len(subfrags) == 1:
continue
names = [n for f, n, i in subfrags]
if not all(names):
names = sorted("<unnamed #{}>".format(i) if n is None else "'{}'".format(n)
for f, n, i in subfrags)
raise DomainError("Domain '{}' is defined by subfragments {} of fragment '{}'; "
"it is necessary to either rename subfragment domains "
"explicitly, or give names to subfragments"
.format(domain_name, ", ".join(names), ".".join(hierarchy)))
if len(names) != len(set(names)):
names = sorted("#{}".format(i) for f, n, i in subfrags)
raise DomainError("Domain '{}' is defined by subfragments {} of fragment '{}', "
"some of which have identical names; it is necessary to either "
"rename subfragment domains explicitly, or give distinct names "
"to subfragments"
.format(domain_name, ", ".join(names), ".".join(hierarchy)))
for subfrag, name, i in subfrags:
domain_name_map = {domain_name: "{}_{}".format(name, domain_name)}
self.subfragments[i] = (DomainRenamer(domain_name_map)(subfrag), name)
# Finally, collect the (now unique) subfragment domains, and merge them into our domains.
for subfrag, name in self.subfragments:
for domain_name, domain in subfrag.domains.items():
if domain.local:
continue
self.add_domains(domain)
def _propagate_domains_down(self):
# For each domain defined in this fragment, ensure it also exists in all subfragments.
for subfrag, name in self.subfragments:
for domain in self.iter_domains():
if domain in subfrag.domains:
assert self.domains[domain] is subfrag.domains[domain]
else:
subfrag.add_domains(self.domains[domain])
subfrag._propagate_domains_down()
def _create_missing_domains(self, missing_domain, *, platform=None):
from .xfrm import DomainCollector
collector = DomainCollector()
collector(self)
new_domains = []
for domain_name in collector.used_domains - collector.defined_domains:
if domain_name is None:
continue
value = missing_domain(domain_name)
if value is None:
raise DomainError("Domain '{}' is used but not defined".format(domain_name))
if type(value) is ClockDomain:
self.add_domains(value)
# And expose ports on the newly added clock domain, since it is added directly
# and there was no chance to add any logic driving it.
new_domains.append(value)
else:
new_fragment = Fragment.get(value, platform=platform)
if domain_name not in new_fragment.domains:
defined = new_fragment.domains.keys()
raise DomainError(
"Fragment returned by missing domain callback does not define "
"requested domain '{}' (defines {})."
.format(domain_name, ", ".join("'{}'".format(n) for n in defined)))
self.add_subfragment(new_fragment, "cd_{}".format(domain_name))
self.add_domains(new_fragment.domains.values())
return new_domains
def _propagate_domains(self, missing_domain, *, platform=None):
self._propagate_domains_up()
self._propagate_domains_down()
self._resolve_hierarchy_conflicts()
new_domains = self._create_missing_domains(missing_domain, platform=platform)
self._propagate_domains_down()
return new_domains
def _prepare_use_def_graph(self, parent, level, uses, defs, ios, top):
def add_uses(*sigs, self=self):
for sig in flatten(sigs):
if sig not in uses:
uses[sig] = set()
uses[sig].add(self)
def add_defs(*sigs):
for sig in flatten(sigs):
if sig not in defs:
defs[sig] = self
else:
assert defs[sig] is self
def add_io(*sigs):
for sig in flatten(sigs):
if sig not in ios:
ios[sig] = self
else:
assert ios[sig] is self
# Collect all signals we're driving (on LHS of statements), and signals we're using
# (on RHS of statements, or in clock domains).
for stmt in self.statements:
add_uses(stmt._rhs_signals())
add_defs(stmt._lhs_signals())
for domain, _ in self.iter_sync():
cd = self.domains[domain]
add_uses(cd.clk)
if cd.rst is not None:
add_uses(cd.rst)
# Repeat for subfragments.
for subfrag, name in self.subfragments:
if isinstance(subfrag, Instance):
for port_name, (value, dir) in subfrag.named_ports.items():
if dir == "i":
# Prioritize defs over uses.
rhs_without_outputs = value._rhs_signals() - subfrag.iter_ports(dir="o")
subfrag.add_ports(rhs_without_outputs, dir=dir)
add_uses(value._rhs_signals())
if dir == "o":
subfrag.add_ports(value._lhs_signals(), dir=dir)
add_defs(value._lhs_signals())
if dir == "io":
subfrag.add_ports(value._lhs_signals(), dir=dir)
add_io(value._lhs_signals())
else:
parent[subfrag] = self
level [subfrag] = level[self] + 1
subfrag._prepare_use_def_graph(parent, level, uses, defs, ios, top)
def _propagate_ports(self, ports, all_undef_as_ports):
# Take this fragment graph:
#
# __ B (def: q, use: p r)
# /
# A (def: p, use: q r)
# \
# \_ C (def: r, use: p q)
#
# We need to consider three cases.
# 1. Signal p requires an input port in B;
# 2. Signal r requires an output port in C;
# 3. Signal r requires an output port in C and an input port in B.
#
# Adding these ports can be in general done in three steps for each signal:
# 1. Find the least common ancestor of all uses and defs.
# 2. Going upwards from the single def, add output ports.
# 3. Going upwards from all uses, add input ports.
parent = {self: None}
level = {self: 0}
uses = SignalDict()
defs = SignalDict()
ios = SignalDict()
self._prepare_use_def_graph(parent, level, uses, defs, ios, self)
ports = SignalSet(ports)
if all_undef_as_ports:
for sig in uses:
if sig in defs:
continue
ports.add(sig)
for sig in ports:
if sig not in uses:
uses[sig] = set()
uses[sig].add(self)
@memoize
def lca_of(fragu, fragv):
# Normalize fragu to be deeper than fragv.
if level[fragu] < level[fragv]:
fragu, fragv = fragv, fragu
# Find ancestor of fragu on the same level as fragv.
for _ in range(level[fragu] - level[fragv]):
fragu = parent[fragu]
# If fragv was the ancestor of fragv, we're done.
if fragu == fragv:
return fragu
# Otherwise, they are at the same level but in different branches. Step both fragu
# and fragv until we find the common ancestor.
while parent[fragu] != parent[fragv]:
fragu = parent[fragu]
fragv = parent[fragv]
return parent[fragu]
for sig in uses:
if sig in defs:
lca = reduce(lca_of, uses[sig], defs[sig])
else:
lca = reduce(lca_of, uses[sig])
for frag in uses[sig]:
if sig in defs and frag is defs[sig]:
continue
while frag != lca:
frag.add_ports(sig, dir="i")
frag = parent[frag]
if sig in defs:
frag = defs[sig]
while frag != lca:
frag.add_ports(sig, dir="o")
frag = parent[frag]
for sig in ios:
frag = ios[sig]
while frag is not None:
frag.add_ports(sig, dir="io")
frag = parent[frag]
for sig in ports:
if sig in ios:
continue
if sig in defs:
self.add_ports(sig, dir="o")
else:
self.add_ports(sig, dir="i")
def prepare(self, ports=None, missing_domain=lambda name: ClockDomain(name)):
from .xfrm import SampleLowerer, DomainLowerer
fragment = SampleLowerer()(self)
new_domains = fragment._propagate_domains(missing_domain)
fragment = DomainLowerer()(fragment)
if ports is None:
fragment._propagate_ports(ports=(), all_undef_as_ports=True)
else:
if not isinstance(ports, tuple) and not isinstance(ports, list):
msg = "`ports` must be either a list or a tuple, not {!r}"\
.format(ports)
if isinstance(ports, Value):
msg += " (did you mean `ports=(<signal>,)`, rather than `ports=<signal>`?)"
raise TypeError(msg)
mapped_ports = []
# Lower late bound signals like ClockSignal() to ports.
port_lowerer = DomainLowerer(fragment.domains)
for port in ports:
if not isinstance(port, (Signal, ClockSignal, ResetSignal)):
raise TypeError("Only signals may be added as ports, not {!r}"
.format(port))
mapped_ports.append(port_lowerer.on_value(port))
# Add ports for all newly created missing clock domains, since not doing so defeats
# the purpose of domain auto-creation. (It's possible to refer to these ports before
# the domain actually exists through late binding, but it's inconvenient.)
for cd in new_domains:
mapped_ports.append(cd.clk)
if cd.rst is not None:
mapped_ports.append(cd.rst)
fragment._propagate_ports(ports=mapped_ports, all_undef_as_ports=False)
return fragment
class Instance(Fragment):
def __init__(self, type, *args, **kwargs):
super().__init__()
self.type = type
self.parameters = OrderedDict()
self.named_ports = OrderedDict()
for (kind, name, value) in args:
if kind == "a":
self.attrs[name] = value
elif kind == "p":
self.parameters[name] = value
elif kind in ("i", "o", "io"):
self.named_ports[name] = (Value.cast(value), kind)
else:
raise NameError("Instance argument {!r} should be a tuple (kind, name, value) "
"where kind is one of \"a\", \"p\", \"i\", \"o\", or \"io\""
.format((kind, name, value)))
for kw, arg in kwargs.items():
if kw.startswith("a_"):
self.attrs[kw[2:]] = arg
elif kw.startswith("p_"):
self.parameters[kw[2:]] = arg
elif kw.startswith("i_"):
self.named_ports[kw[2:]] = (Value.cast(arg), "i")
elif kw.startswith("o_"):
self.named_ports[kw[2:]] = (Value.cast(arg), "o")
elif kw.startswith("io_"):
self.named_ports[kw[3:]] = (Value.cast(arg), "io")
else:
raise NameError("Instance keyword argument {}={!r} does not start with one of "
"\"a_\", \"p_\", \"i_\", \"o_\", or \"io_\""
.format(kw, arg))
warnings.warn("instead of nmigen.hdl.ir, use amaranth.hdl.ir",
DeprecationWarning, stacklevel=2)

View file

@ -1,322 +1,7 @@
import operator
from collections import OrderedDict
from .. import tracer
from .ast import *
from .ir import Elaboratable, Instance
from amaranth.hdl.mem import *
from amaranth.hdl.mem import __all__
__all__ = ["Memory", "ReadPort", "WritePort", "DummyPort"]
class Memory:
"""A word addressable storage.
Parameters
----------
width : int
Access granularity. Each storage element of this memory is ``width`` bits in size.
depth : int
Word count. This memory contains ``depth`` storage elements.
init : list of int
Initial values. At power on, each storage element in this memory is initialized to
the corresponding element of ``init``, if any, or to zero otherwise.
Uninitialized memories are not currently supported.
name : str
Name hint for this memory. If ``None`` (default) the name is inferred from the variable
name this ``Signal`` is assigned to.
attrs : dict
Dictionary of synthesis attributes.
Attributes
----------
width : int
depth : int
init : list of int
attrs : dict
"""
def __init__(self, *, width, depth, init=None, name=None, attrs=None, simulate=True):
if not isinstance(width, int) or width < 0:
raise TypeError("Memory width must be a non-negative integer, not {!r}"
.format(width))
if not isinstance(depth, int) or depth < 0:
raise TypeError("Memory depth must be a non-negative integer, not {!r}"
.format(depth))
self.name = name or tracer.get_var_name(depth=2, default="$memory")
self.src_loc = tracer.get_src_loc()
self.width = width
self.depth = depth
self.attrs = OrderedDict(() if attrs is None else attrs)
# Array of signals for simulation.
self._array = Array()
if simulate:
for addr in range(self.depth):
self._array.append(Signal(self.width, name="{}({})"
.format(name or "memory", addr)))
self.init = init
@property
def init(self):
return self._init
@init.setter
def init(self, new_init):
self._init = [] if new_init is None else list(new_init)
if len(self.init) > self.depth:
raise ValueError("Memory initialization value count exceed memory depth ({} > {})"
.format(len(self.init), self.depth))
try:
for addr in range(len(self._array)):
if addr < len(self._init):
self._array[addr].reset = operator.index(self._init[addr])
else:
self._array[addr].reset = 0
except TypeError as e:
raise TypeError("Memory initialization value at address {:x}: {}"
.format(addr, e)) from None
def read_port(self, *, src_loc_at=0, **kwargs):
"""Get a read port.
See :class:`ReadPort` for details.
Arguments
---------
domain : str
transparent : bool
Returns
-------
An instance of :class:`ReadPort` associated with this memory.
"""
return ReadPort(self, src_loc_at=1 + src_loc_at, **kwargs)
def write_port(self, *, src_loc_at=0, **kwargs):
"""Get a write port.
See :class:`WritePort` for details.
Arguments
---------
domain : str
granularity : int
Returns
-------
An instance of :class:`WritePort` associated with this memory.
"""
return WritePort(self, src_loc_at=1 + src_loc_at, **kwargs)
def __getitem__(self, index):
"""Simulation only."""
return self._array[index]
class ReadPort(Elaboratable):
"""A memory read port.
Parameters
----------
memory : :class:`Memory`
Memory associated with the port.
domain : str
Clock domain. Defaults to ``"sync"``. If set to ``"comb"``, the port is asynchronous.
Otherwise, the read data becomes available on the next clock cycle.
transparent : bool
Port transparency. If set (default), a read at an address that is also being written to in
the same clock cycle will output the new value. Otherwise, the old value will be output
first. This behavior only applies to ports in the same domain.
Attributes
----------
memory : :class:`Memory`
domain : str
transparent : bool
addr : Signal(range(memory.depth)), in
Read address.
data : Signal(memory.width), out
Read data.
en : Signal or Const, in
Read enable. If asserted, ``data`` is updated with the word stored at ``addr``. Note that
transparent ports cannot assign ``en`` (which is hardwired to 1 instead), as doing so is
currently not supported by Yosys.
Exceptions
----------
Raises :exn:`ValueError` if the read port is simultaneously asynchronous and non-transparent.
"""
def __init__(self, memory, *, domain="sync", transparent=True, src_loc_at=0):
if domain == "comb" and not transparent:
raise ValueError("Read port cannot be simultaneously asynchronous and non-transparent")
self.memory = memory
self.domain = domain
self.transparent = transparent
self.addr = Signal(range(memory.depth),
name="{}_r_addr".format(memory.name), src_loc_at=1 + src_loc_at)
self.data = Signal(memory.width,
name="{}_r_data".format(memory.name), src_loc_at=1 + src_loc_at)
if self.domain != "comb" and not transparent:
self.en = Signal(name="{}_r_en".format(memory.name), reset=1,
src_loc_at=1 + src_loc_at)
else:
self.en = Const(1)
def elaborate(self, platform):
f = Instance("$memrd",
p_MEMID=self.memory,
p_ABITS=self.addr.width,
p_WIDTH=self.data.width,
p_CLK_ENABLE=self.domain != "comb",
p_CLK_POLARITY=1,
p_TRANSPARENT=self.transparent,
i_CLK=ClockSignal(self.domain) if self.domain != "comb" else Const(0),
i_EN=self.en,
i_ADDR=self.addr,
o_DATA=self.data,
)
if self.domain == "comb":
# Asynchronous port
f.add_statements(self.data.eq(self.memory._array[self.addr]))
f.add_driver(self.data)
elif not self.transparent:
# Synchronous, read-before-write port
f.add_statements(
Switch(self.en, {
1: self.data.eq(self.memory._array[self.addr])
})
)
f.add_driver(self.data, self.domain)
else:
# Synchronous, write-through port
# This model is a bit unconventional. We model transparent ports as asynchronous ports
# that are latched when the clock is high. This isn't exactly correct, but it is very
# close to the correct behavior of a transparent port, and the difference should only
# be observable in pathological cases of clock gating. A register is injected to
# the address input to achieve the correct address-to-data latency. Also, the reset
# value of the data output is forcibly set to the 0th initial value, if any--note that
# many FPGAs do not guarantee this behavior!
if len(self.memory.init) > 0:
self.data.reset = operator.index(self.memory.init[0])
latch_addr = Signal.like(self.addr)
f.add_statements(
latch_addr.eq(self.addr),
Switch(ClockSignal(self.domain), {
0: self.data.eq(self.data),
1: self.data.eq(self.memory._array[latch_addr]),
}),
)
f.add_driver(latch_addr, self.domain)
f.add_driver(self.data)
return f
class WritePort(Elaboratable):
"""A memory write port.
Parameters
----------
memory : :class:`Memory`
Memory associated with the port.
domain : str
Clock domain. Defaults to ``"sync"``. Writes have a latency of 1 clock cycle.
granularity : int
Port granularity. Defaults to ``memory.width``. Write data is split evenly in
``memory.width // granularity`` chunks, which can be updated independently.
Attributes
----------
memory : :class:`Memory`
domain : str
granularity : int
addr : Signal(range(memory.depth)), in
Write address.
data : Signal(memory.width), in
Write data.
en : Signal(memory.width // granularity), in
Write enable. Each bit selects a non-overlapping chunk of ``granularity`` bits on the
``data`` signal, which is written to memory at ``addr``. Unselected chunks are ignored.
Exceptions
----------
Raises :exn:`ValueError` if the write port granularity is greater than memory width, or does not
divide memory width evenly.
"""
def __init__(self, memory, *, domain="sync", granularity=None, src_loc_at=0):
if granularity is None:
granularity = memory.width
if not isinstance(granularity, int) or granularity < 0:
raise TypeError("Write port granularity must be a non-negative integer, not {!r}"
.format(granularity))
if granularity > memory.width:
raise ValueError("Write port granularity must not be greater than memory width "
"({} > {})"
.format(granularity, memory.width))
if memory.width // granularity * granularity != memory.width:
raise ValueError("Write port granularity must divide memory width evenly")
self.memory = memory
self.domain = domain
self.granularity = granularity
self.addr = Signal(range(memory.depth),
name="{}_w_addr".format(memory.name), src_loc_at=1 + src_loc_at)
self.data = Signal(memory.width,
name="{}_w_data".format(memory.name), src_loc_at=1 + src_loc_at)
self.en = Signal(memory.width // granularity,
name="{}_w_en".format(memory.name), src_loc_at=1 + src_loc_at)
def elaborate(self, platform):
f = Instance("$memwr",
p_MEMID=self.memory,
p_ABITS=self.addr.width,
p_WIDTH=self.data.width,
p_CLK_ENABLE=1,
p_CLK_POLARITY=1,
p_PRIORITY=0,
i_CLK=ClockSignal(self.domain),
i_EN=Cat(Repl(en_bit, self.granularity) for en_bit in self.en),
i_ADDR=self.addr,
i_DATA=self.data,
)
if len(self.en) > 1:
for index, en_bit in enumerate(self.en):
offset = index * self.granularity
bits = slice(offset, offset + self.granularity)
write_data = self.memory._array[self.addr][bits].eq(self.data[bits])
f.add_statements(Switch(en_bit, { 1: write_data }))
else:
write_data = self.memory._array[self.addr].eq(self.data)
f.add_statements(Switch(self.en, { 1: write_data }))
for signal in self.memory._array:
f.add_driver(signal, self.domain)
return f
class DummyPort:
"""Dummy memory port.
This port can be used in place of either a read or a write port for testing and verification.
It does not include any read/write port specific attributes, i.e. none besides ``"domain"``;
any such attributes may be set manually.
"""
def __init__(self, *, data_width, addr_width, domain="sync", name=None, granularity=None):
self.domain = domain
if granularity is None:
granularity = data_width
if name is None:
name = tracer.get_var_name(depth=2, default="dummy")
self.addr = Signal(addr_width,
name="{}_addr".format(name), src_loc_at=1)
self.data = Signal(data_width,
name="{}_data".format(name), src_loc_at=1)
self.en = Signal(data_width // granularity,
name="{}_en".format(name), src_loc_at=1)
import warnings
warnings.warn("instead of nmigen.hdl.mem, use amaranth.hdl.mem",
DeprecationWarning, stacklevel=2)

View file

@ -1,278 +1,7 @@
from enum import Enum
from collections import OrderedDict
from functools import reduce, wraps
from .. import tracer
from .._utils import union
from .ast import *
from amaranth.hdl.rec import *
from amaranth.hdl.rec import __all__
__all__ = ["Direction", "DIR_NONE", "DIR_FANOUT", "DIR_FANIN", "Layout", "Record"]
Direction = Enum('Direction', ('NONE', 'FANOUT', 'FANIN'))
DIR_NONE = Direction.NONE
DIR_FANOUT = Direction.FANOUT
DIR_FANIN = Direction.FANIN
class Layout:
@staticmethod
def cast(obj, *, src_loc_at=0):
if isinstance(obj, Layout):
return obj
return Layout(obj, src_loc_at=1 + src_loc_at)
def __init__(self, fields, *, src_loc_at=0):
self.fields = OrderedDict()
for field in fields:
if not isinstance(field, tuple) or len(field) not in (2, 3):
raise TypeError("Field {!r} has invalid layout: should be either "
"(name, shape) or (name, shape, direction)"
.format(field))
if len(field) == 2:
name, shape = field
direction = DIR_NONE
if isinstance(shape, list):
shape = Layout.cast(shape)
else:
name, shape, direction = field
if not isinstance(direction, Direction):
raise TypeError("Field {!r} has invalid direction: should be a Direction "
"instance like DIR_FANIN"
.format(field))
if not isinstance(name, str):
raise TypeError("Field {!r} has invalid name: should be a string"
.format(field))
if not isinstance(shape, Layout):
try:
# Check provided shape by calling Shape.cast and checking for exception
Shape.cast(shape, src_loc_at=1 + src_loc_at)
except Exception:
raise TypeError("Field {!r} has invalid shape: should be castable to Shape "
"or a list of fields of a nested record"
.format(field))
if name in self.fields:
raise NameError("Field {!r} has a name that is already present in the layout"
.format(field))
self.fields[name] = (shape, direction)
def __getitem__(self, item):
if isinstance(item, tuple):
return Layout([
(name, shape, dir)
for (name, (shape, dir)) in self.fields.items()
if name in item
])
return self.fields[item]
def __iter__(self):
for name, (shape, dir) in self.fields.items():
yield (name, shape, dir)
def __eq__(self, other):
return self.fields == other.fields
def __repr__(self):
field_reprs = []
for name, shape, dir in self:
if dir == DIR_NONE:
field_reprs.append("({!r}, {!r})".format(name, shape))
else:
field_reprs.append("({!r}, {!r}, Direction.{})".format(name, shape, dir.name))
return "Layout([{}])".format(", ".join(field_reprs))
class Record(ValueCastable):
@staticmethod
def like(other, *, name=None, name_suffix=None, src_loc_at=0):
if name is not None:
new_name = str(name)
elif name_suffix is not None:
new_name = other.name + str(name_suffix)
else:
new_name = tracer.get_var_name(depth=2 + src_loc_at, default=None)
def concat(a, b):
if a is None:
return b
return "{}__{}".format(a, b)
fields = {}
for field_name in other.fields:
field = other[field_name]
if isinstance(field, Record):
fields[field_name] = Record.like(field, name=concat(new_name, field_name),
src_loc_at=1 + src_loc_at)
else:
fields[field_name] = Signal.like(field, name=concat(new_name, field_name),
src_loc_at=1 + src_loc_at)
return Record(other.layout, name=new_name, fields=fields, src_loc_at=1)
def __init__(self, layout, *, name=None, fields=None, src_loc_at=0):
if name is None:
name = tracer.get_var_name(depth=2 + src_loc_at, default=None)
self.name = name
self.src_loc = tracer.get_src_loc(src_loc_at)
def concat(a, b):
if a is None:
return b
return "{}__{}".format(a, b)
self.layout = Layout.cast(layout, src_loc_at=1 + src_loc_at)
self.fields = OrderedDict()
for field_name, field_shape, field_dir in self.layout:
if fields is not None and field_name in fields:
field = fields[field_name]
if isinstance(field_shape, Layout):
assert isinstance(field, Record) and field_shape == field.layout
else:
assert isinstance(field, Signal) and Shape.cast(field_shape) == field.shape()
self.fields[field_name] = field
else:
if isinstance(field_shape, Layout):
self.fields[field_name] = Record(field_shape, name=concat(name, field_name),
src_loc_at=1 + src_loc_at)
else:
self.fields[field_name] = Signal(field_shape, name=concat(name, field_name),
src_loc_at=1 + src_loc_at)
def __getattr__(self, name):
return self[name]
def __getitem__(self, item):
if isinstance(item, str):
try:
return self.fields[item]
except KeyError:
if self.name is None:
reference = "Unnamed record"
else:
reference = "Record '{}'".format(self.name)
raise AttributeError("{} does not have a field '{}'. Did you mean one of: {}?"
.format(reference, item, ", ".join(self.fields))) from None
elif isinstance(item, tuple):
return Record(self.layout[item], fields={
field_name: field_value
for field_name, field_value in self.fields.items()
if field_name in item
})
else:
try:
return Value.__getitem__(self, item)
except KeyError:
if self.name is None:
reference = "Unnamed record"
else:
reference = "Record '{}'".format(self.name)
raise AttributeError("{} does not have a field '{}'. Did you mean one of: {}?"
.format(reference, item, ", ".join(self.fields))) from None
@ValueCastable.lowermethod
def as_value(self):
return Cat(self.fields.values())
def __len__(self):
return len(self.as_value())
def _lhs_signals(self):
return union((f._lhs_signals() for f in self.fields.values()), start=SignalSet())
def _rhs_signals(self):
return union((f._rhs_signals() for f in self.fields.values()), start=SignalSet())
def __repr__(self):
fields = []
for field_name, field in self.fields.items():
if isinstance(field, Signal):
fields.append(field_name)
else:
fields.append(repr(field))
name = self.name
if name is None:
name = "<unnamed>"
return "(rec {} {})".format(name, " ".join(fields))
def shape(self):
return self.as_value().shape()
def connect(self, *subordinates, include=None, exclude=None):
def rec_name(record):
if record.name is None:
return "unnamed record"
else:
return "record '{}'".format(record.name)
for field in include or {}:
if field not in self.fields:
raise AttributeError("Cannot include field '{}' because it is not present in {}"
.format(field, rec_name(self)))
for field in exclude or {}:
if field not in self.fields:
raise AttributeError("Cannot exclude field '{}' because it is not present in {}"
.format(field, rec_name(self)))
stmts = []
for field in self.fields:
if include is not None and field not in include:
continue
if exclude is not None and field in exclude:
continue
shape, direction = self.layout[field]
if not isinstance(shape, Layout) and direction == DIR_NONE:
raise TypeError("Cannot connect field '{}' of {} because it does not have "
"a direction"
.format(field, rec_name(self)))
item = self.fields[field]
subord_items = []
for subord in subordinates:
if field not in subord.fields:
raise AttributeError("Cannot connect field '{}' of {} to subordinate {} "
"because the subordinate record does not have this field"
.format(field, rec_name(self), rec_name(subord)))
subord_items.append(subord.fields[field])
if isinstance(shape, Layout):
sub_include = include[field] if include and field in include else None
sub_exclude = exclude[field] if exclude and field in exclude else None
stmts += item.connect(*subord_items, include=sub_include, exclude=sub_exclude)
else:
if direction == DIR_FANOUT:
stmts += [sub_item.eq(item) for sub_item in subord_items]
if direction == DIR_FANIN:
stmts += [item.eq(reduce(lambda a, b: a | b, subord_items))]
return stmts
def _valueproxy(name):
value_func = getattr(Value, name)
@wraps(value_func)
def _wrapper(self, *args, **kwargs):
return value_func(Value.cast(self), *args, **kwargs)
return _wrapper
for name in [
"__bool__",
"__invert__", "__neg__",
"__add__", "__radd__", "__sub__", "__rsub__",
"__mul__", "__rmul__",
"__mod__", "__rmod__", "__floordiv__", "__rfloordiv__",
"__lshift__", "__rlshift__", "__rshift__", "__rrshift__",
"__and__", "__rand__", "__xor__", "__rxor__", "__or__", "__ror__",
"__eq__", "__ne__", "__lt__", "__le__", "__gt__", "__ge__",
"__abs__", "__len__",
"as_unsigned", "as_signed", "bool", "any", "all", "xor", "implies",
"bit_select", "word_select", "matches",
"shift_left", "shift_right", "rotate_left", "rotate_right", "eq"
]:
setattr(Record, name, _valueproxy(name))
del _valueproxy
del name
import warnings
warnings.warn("instead of nmigen.hdl.rec, use amaranth.hdl.rec",
DeprecationWarning, stacklevel=2)

View file

@ -1,743 +1,7 @@
from abc import ABCMeta, abstractmethod
from collections import OrderedDict
from collections.abc import Iterable
from amaranth.hdl.xfrm import *
from amaranth.hdl.xfrm import __all__
from .._utils import flatten
from .. import tracer
from .ast import *
from .ast import _StatementList
from .cd import *
from .ir import *
from .rec import *
__all__ = ["ValueVisitor", "ValueTransformer",
"StatementVisitor", "StatementTransformer",
"FragmentTransformer",
"TransformedElaboratable",
"DomainCollector", "DomainRenamer", "DomainLowerer",
"SampleDomainInjector", "SampleLowerer",
"SwitchCleaner", "LHSGroupAnalyzer", "LHSGroupFilter",
"ResetInserter", "EnableInserter"]
class ValueVisitor(metaclass=ABCMeta):
@abstractmethod
def on_Const(self, value):
pass # :nocov:
@abstractmethod
def on_AnyConst(self, value):
pass # :nocov:
@abstractmethod
def on_AnySeq(self, value):
pass # :nocov:
@abstractmethod
def on_Signal(self, value):
pass # :nocov:
@abstractmethod
def on_ClockSignal(self, value):
pass # :nocov:
@abstractmethod
def on_ResetSignal(self, value):
pass # :nocov:
@abstractmethod
def on_Operator(self, value):
pass # :nocov:
@abstractmethod
def on_Slice(self, value):
pass # :nocov:
@abstractmethod
def on_Part(self, value):
pass # :nocov:
@abstractmethod
def on_Cat(self, value):
pass # :nocov:
@abstractmethod
def on_Repl(self, value):
pass # :nocov:
@abstractmethod
def on_ArrayProxy(self, value):
pass # :nocov:
@abstractmethod
def on_Sample(self, value):
pass # :nocov:
@abstractmethod
def on_Initial(self, value):
pass # :nocov:
def on_unknown_value(self, value):
raise TypeError("Cannot transform value {!r}".format(value)) # :nocov:
def replace_value_src_loc(self, value, new_value):
return True
def on_value(self, value):
if type(value) is Const:
new_value = self.on_Const(value)
elif type(value) is AnyConst:
new_value = self.on_AnyConst(value)
elif type(value) is AnySeq:
new_value = self.on_AnySeq(value)
elif isinstance(value, Signal):
# Uses `isinstance()` and not `type() is` because nmigen.compat requires it.
new_value = self.on_Signal(value)
elif type(value) is ClockSignal:
new_value = self.on_ClockSignal(value)
elif type(value) is ResetSignal:
new_value = self.on_ResetSignal(value)
elif type(value) is Operator:
new_value = self.on_Operator(value)
elif type(value) is Slice:
new_value = self.on_Slice(value)
elif type(value) is Part:
new_value = self.on_Part(value)
elif type(value) is Cat:
new_value = self.on_Cat(value)
elif type(value) is Repl:
new_value = self.on_Repl(value)
elif type(value) is ArrayProxy:
new_value = self.on_ArrayProxy(value)
elif type(value) is Sample:
new_value = self.on_Sample(value)
elif type(value) is Initial:
new_value = self.on_Initial(value)
elif isinstance(value, UserValue):
# Uses `isinstance()` and not `type() is` to allow inheriting.
new_value = self.on_value(value._lazy_lower())
else:
new_value = self.on_unknown_value(value)
if isinstance(new_value, Value) and self.replace_value_src_loc(value, new_value):
new_value.src_loc = value.src_loc
return new_value
def __call__(self, value):
return self.on_value(value)
class ValueTransformer(ValueVisitor):
def on_Const(self, value):
return value
def on_AnyConst(self, value):
return value
def on_AnySeq(self, value):
return value
def on_Signal(self, value):
return value
def on_ClockSignal(self, value):
return value
def on_ResetSignal(self, value):
return value
def on_Operator(self, value):
return Operator(value.operator, [self.on_value(o) for o in value.operands])
def on_Slice(self, value):
return Slice(self.on_value(value.value), value.start, value.stop)
def on_Part(self, value):
return Part(self.on_value(value.value), self.on_value(value.offset),
value.width, value.stride)
def on_Cat(self, value):
return Cat(self.on_value(o) for o in value.parts)
def on_Repl(self, value):
return Repl(self.on_value(value.value), value.count)
def on_ArrayProxy(self, value):
return ArrayProxy([self.on_value(elem) for elem in value._iter_as_values()],
self.on_value(value.index))
def on_Sample(self, value):
return Sample(self.on_value(value.value), value.clocks, value.domain)
def on_Initial(self, value):
return value
class StatementVisitor(metaclass=ABCMeta):
@abstractmethod
def on_Assign(self, stmt):
pass # :nocov:
@abstractmethod
def on_Assert(self, stmt):
pass # :nocov:
@abstractmethod
def on_Assume(self, stmt):
pass # :nocov:
@abstractmethod
def on_Cover(self, stmt):
pass # :nocov:
@abstractmethod
def on_Switch(self, stmt):
pass # :nocov:
@abstractmethod
def on_statements(self, stmts):
pass # :nocov:
def on_unknown_statement(self, stmt):
raise TypeError("Cannot transform statement {!r}".format(stmt)) # :nocov:
def replace_statement_src_loc(self, stmt, new_stmt):
return True
def on_statement(self, stmt):
if type(stmt) is Assign:
new_stmt = self.on_Assign(stmt)
elif type(stmt) is Assert:
new_stmt = self.on_Assert(stmt)
elif type(stmt) is Assume:
new_stmt = self.on_Assume(stmt)
elif type(stmt) is Cover:
new_stmt = self.on_Cover(stmt)
elif isinstance(stmt, Switch):
# Uses `isinstance()` and not `type() is` because nmigen.compat requires it.
new_stmt = self.on_Switch(stmt)
elif isinstance(stmt, Iterable):
new_stmt = self.on_statements(stmt)
else:
new_stmt = self.on_unknown_statement(stmt)
if isinstance(new_stmt, Statement) and self.replace_statement_src_loc(stmt, new_stmt):
new_stmt.src_loc = stmt.src_loc
if isinstance(new_stmt, Switch) and isinstance(stmt, Switch):
new_stmt.case_src_locs = stmt.case_src_locs
if isinstance(new_stmt, Property):
new_stmt._MustUse__used = True
return new_stmt
def __call__(self, stmt):
return self.on_statement(stmt)
class StatementTransformer(StatementVisitor):
def on_value(self, value):
return value
def on_Assign(self, stmt):
return Assign(self.on_value(stmt.lhs), self.on_value(stmt.rhs))
def on_Assert(self, stmt):
return Assert(self.on_value(stmt.test), _check=stmt._check, _en=stmt._en)
def on_Assume(self, stmt):
return Assume(self.on_value(stmt.test), _check=stmt._check, _en=stmt._en)
def on_Cover(self, stmt):
return Cover(self.on_value(stmt.test), _check=stmt._check, _en=stmt._en)
def on_Switch(self, stmt):
cases = OrderedDict((k, self.on_statement(s)) for k, s in stmt.cases.items())
return Switch(self.on_value(stmt.test), cases)
def on_statements(self, stmts):
return _StatementList(flatten(self.on_statement(stmt) for stmt in stmts))
class FragmentTransformer:
def map_subfragments(self, fragment, new_fragment):
for subfragment, name in fragment.subfragments:
new_fragment.add_subfragment(self(subfragment), name)
def map_ports(self, fragment, new_fragment):
for port, dir in fragment.ports.items():
new_fragment.add_ports(port, dir=dir)
def map_named_ports(self, fragment, new_fragment):
if hasattr(self, "on_value"):
for name, (value, dir) in fragment.named_ports.items():
new_fragment.named_ports[name] = self.on_value(value), dir
else:
new_fragment.named_ports = OrderedDict(fragment.named_ports.items())
def map_domains(self, fragment, new_fragment):
for domain in fragment.iter_domains():
new_fragment.add_domains(fragment.domains[domain])
def map_statements(self, fragment, new_fragment):
if hasattr(self, "on_statement"):
new_fragment.add_statements(map(self.on_statement, fragment.statements))
else:
new_fragment.add_statements(fragment.statements)
def map_drivers(self, fragment, new_fragment):
for domain, signal in fragment.iter_drivers():
new_fragment.add_driver(signal, domain)
def on_fragment(self, fragment):
if isinstance(fragment, Instance):
new_fragment = Instance(fragment.type)
new_fragment.parameters = OrderedDict(fragment.parameters)
self.map_named_ports(fragment, new_fragment)
else:
new_fragment = Fragment()
new_fragment.flatten = fragment.flatten
new_fragment.attrs = OrderedDict(fragment.attrs)
self.map_ports(fragment, new_fragment)
self.map_subfragments(fragment, new_fragment)
self.map_domains(fragment, new_fragment)
self.map_statements(fragment, new_fragment)
self.map_drivers(fragment, new_fragment)
return new_fragment
def __call__(self, value, *, src_loc_at=0):
if isinstance(value, Fragment):
return self.on_fragment(value)
elif isinstance(value, TransformedElaboratable):
value._transforms_.append(self)
return value
elif hasattr(value, "elaborate"):
value = TransformedElaboratable(value, src_loc_at=1 + src_loc_at)
value._transforms_.append(self)
return value
else:
raise AttributeError("Object {!r} cannot be elaborated".format(value))
class TransformedElaboratable(Elaboratable):
def __init__(self, elaboratable, *, src_loc_at=0):
assert hasattr(elaboratable, "elaborate")
# Fields prefixed and suffixed with underscore to avoid as many conflicts with the inner
# object as possible, since we're forwarding attribute requests to it.
self._elaboratable_ = elaboratable
self._transforms_ = []
def __getattr__(self, attr):
return getattr(self._elaboratable_, attr)
def elaborate(self, platform):
fragment = Fragment.get(self._elaboratable_, platform)
for transform in self._transforms_:
fragment = transform(fragment)
return fragment
class DomainCollector(ValueVisitor, StatementVisitor):
def __init__(self):
self.used_domains = set()
self.defined_domains = set()
self._local_domains = set()
def _add_used_domain(self, domain_name):
if domain_name is None:
return
if domain_name in self._local_domains:
return
self.used_domains.add(domain_name)
def on_ignore(self, value):
pass
on_Const = on_ignore
on_AnyConst = on_ignore
on_AnySeq = on_ignore
on_Signal = on_ignore
def on_ClockSignal(self, value):
self._add_used_domain(value.domain)
def on_ResetSignal(self, value):
self._add_used_domain(value.domain)
def on_Operator(self, value):
for o in value.operands:
self.on_value(o)
def on_Slice(self, value):
self.on_value(value.value)
def on_Part(self, value):
self.on_value(value.value)
self.on_value(value.offset)
def on_Cat(self, value):
for o in value.parts:
self.on_value(o)
def on_Repl(self, value):
self.on_value(value.value)
def on_ArrayProxy(self, value):
for elem in value._iter_as_values():
self.on_value(elem)
self.on_value(value.index)
def on_Sample(self, value):
self.on_value(value.value)
def on_Initial(self, value):
pass
def on_Assign(self, stmt):
self.on_value(stmt.lhs)
self.on_value(stmt.rhs)
def on_property(self, stmt):
self.on_value(stmt.test)
on_Assert = on_property
on_Assume = on_property
on_Cover = on_property
def on_Switch(self, stmt):
self.on_value(stmt.test)
for stmts in stmt.cases.values():
self.on_statement(stmts)
def on_statements(self, stmts):
for stmt in stmts:
self.on_statement(stmt)
def on_fragment(self, fragment):
if isinstance(fragment, Instance):
for name, (value, dir) in fragment.named_ports.items():
self.on_value(value)
old_local_domains, self._local_domains = self._local_domains, set(self._local_domains)
for domain_name, domain in fragment.domains.items():
if domain.local:
self._local_domains.add(domain_name)
else:
self.defined_domains.add(domain_name)
self.on_statements(fragment.statements)
for domain_name in fragment.drivers:
self._add_used_domain(domain_name)
for subfragment, name in fragment.subfragments:
self.on_fragment(subfragment)
self._local_domains = old_local_domains
def __call__(self, fragment):
self.on_fragment(fragment)
class DomainRenamer(FragmentTransformer, ValueTransformer, StatementTransformer):
def __init__(self, domain_map):
if isinstance(domain_map, str):
domain_map = {"sync": domain_map}
for src, dst in domain_map.items():
if src == "comb":
raise ValueError("Domain '{}' may not be renamed".format(src))
if dst == "comb":
raise ValueError("Domain '{}' may not be renamed to '{}'".format(src, dst))
self.domain_map = OrderedDict(domain_map)
def on_ClockSignal(self, value):
if value.domain in self.domain_map:
return ClockSignal(self.domain_map[value.domain])
return value
def on_ResetSignal(self, value):
if value.domain in self.domain_map:
return ResetSignal(self.domain_map[value.domain],
allow_reset_less=value.allow_reset_less)
return value
def map_domains(self, fragment, new_fragment):
for domain in fragment.iter_domains():
cd = fragment.domains[domain]
if domain in self.domain_map:
if cd.name == domain:
# Rename the actual ClockDomain object.
cd.rename(self.domain_map[domain])
else:
assert cd.name == self.domain_map[domain]
new_fragment.add_domains(cd)
def map_drivers(self, fragment, new_fragment):
for domain, signals in fragment.drivers.items():
if domain in self.domain_map:
domain = self.domain_map[domain]
for signal in signals:
new_fragment.add_driver(self.on_value(signal), domain)
class DomainLowerer(FragmentTransformer, ValueTransformer, StatementTransformer):
def __init__(self, domains=None):
self.domains = domains
def _resolve(self, domain, context):
if domain not in self.domains:
raise DomainError("Signal {!r} refers to nonexistent domain '{}'"
.format(context, domain))
return self.domains[domain]
def map_drivers(self, fragment, new_fragment):
for domain, signal in fragment.iter_drivers():
new_fragment.add_driver(self.on_value(signal), domain)
def replace_value_src_loc(self, value, new_value):
return not isinstance(value, (ClockSignal, ResetSignal))
def on_ClockSignal(self, value):
domain = self._resolve(value.domain, value)
return domain.clk
def on_ResetSignal(self, value):
domain = self._resolve(value.domain, value)
if domain.rst is None:
if value.allow_reset_less:
return Const(0)
else:
raise DomainError("Signal {!r} refers to reset of reset-less domain '{}'"
.format(value, value.domain))
return domain.rst
def _insert_resets(self, fragment):
for domain_name, signals in fragment.drivers.items():
if domain_name is None:
continue
domain = fragment.domains[domain_name]
if domain.rst is None:
continue
stmts = [signal.eq(Const(signal.reset, signal.width))
for signal in signals if not signal.reset_less]
fragment.add_statements(Switch(domain.rst, {1: stmts}))
def on_fragment(self, fragment):
self.domains = fragment.domains
new_fragment = super().on_fragment(fragment)
self._insert_resets(new_fragment)
return new_fragment
class SampleDomainInjector(ValueTransformer, StatementTransformer):
def __init__(self, domain):
self.domain = domain
def on_Sample(self, value):
if value.domain is not None:
return value
return Sample(value.value, value.clocks, self.domain)
def __call__(self, stmts):
return self.on_statement(stmts)
class SampleLowerer(FragmentTransformer, ValueTransformer, StatementTransformer):
def __init__(self):
self.initial = None
self.sample_cache = None
self.sample_stmts = None
def _name_reset(self, value):
if isinstance(value, Const):
return "c${}".format(value.value), value.value
elif isinstance(value, Signal):
return "s${}".format(value.name), value.reset
elif isinstance(value, ClockSignal):
return "clk", 0
elif isinstance(value, ResetSignal):
return "rst", 1
elif isinstance(value, Initial):
return "init", 0 # Past(Initial()) produces 0, 1, 0, 0, ...
else:
raise NotImplementedError # :nocov:
def on_Sample(self, value):
if value in self.sample_cache:
return self.sample_cache[value]
sampled_value = self.on_value(value.value)
if value.clocks == 0:
sample = sampled_value
else:
assert value.domain is not None
sampled_name, sampled_reset = self._name_reset(value.value)
name = "$sample${}${}${}".format(sampled_name, value.domain, value.clocks)
sample = Signal.like(value.value, name=name, reset_less=True, reset=sampled_reset)
sample.attrs["nmigen.sample_reg"] = True
prev_sample = self.on_Sample(Sample(sampled_value, value.clocks - 1, value.domain))
if value.domain not in self.sample_stmts:
self.sample_stmts[value.domain] = []
self.sample_stmts[value.domain].append(sample.eq(prev_sample))
self.sample_cache[value] = sample
return sample
def on_Initial(self, value):
if self.initial is None:
self.initial = Signal(name="init")
return self.initial
def map_statements(self, fragment, new_fragment):
self.initial = None
self.sample_cache = ValueDict()
self.sample_stmts = OrderedDict()
new_fragment.add_statements(map(self.on_statement, fragment.statements))
for domain, stmts in self.sample_stmts.items():
new_fragment.add_statements(stmts)
for stmt in stmts:
new_fragment.add_driver(stmt.lhs, domain)
if self.initial is not None:
new_fragment.add_subfragment(Instance("$initstate", o_Y=self.initial))
class SwitchCleaner(StatementVisitor):
def on_ignore(self, stmt):
return stmt
on_Assign = on_ignore
on_Assert = on_ignore
on_Assume = on_ignore
on_Cover = on_ignore
def on_Switch(self, stmt):
cases = OrderedDict((k, self.on_statement(s)) for k, s in stmt.cases.items())
if any(len(s) for s in cases.values()):
return Switch(stmt.test, cases)
def on_statements(self, stmts):
stmts = flatten(self.on_statement(stmt) for stmt in stmts)
return _StatementList(stmt for stmt in stmts if stmt is not None)
class LHSGroupAnalyzer(StatementVisitor):
def __init__(self):
self.signals = SignalDict()
self.unions = OrderedDict()
def find(self, signal):
if signal not in self.signals:
self.signals[signal] = len(self.signals)
group = self.signals[signal]
while group in self.unions:
group = self.unions[group]
self.signals[signal] = group
return group
def unify(self, root, *leaves):
root_group = self.find(root)
for leaf in leaves:
leaf_group = self.find(leaf)
if root_group == leaf_group:
continue
self.unions[leaf_group] = root_group
def groups(self):
groups = OrderedDict()
for signal in self.signals:
group = self.find(signal)
if group not in groups:
groups[group] = SignalSet()
groups[group].add(signal)
return groups
def on_Assign(self, stmt):
lhs_signals = stmt._lhs_signals()
if lhs_signals:
self.unify(*stmt._lhs_signals())
def on_property(self, stmt):
lhs_signals = stmt._lhs_signals()
if lhs_signals:
self.unify(*stmt._lhs_signals())
on_Assert = on_property
on_Assume = on_property
on_Cover = on_property
def on_Switch(self, stmt):
for case_stmts in stmt.cases.values():
self.on_statements(case_stmts)
def on_statements(self, stmts):
for stmt in stmts:
self.on_statement(stmt)
def __call__(self, stmts):
self.on_statements(stmts)
return self.groups()
class LHSGroupFilter(SwitchCleaner):
def __init__(self, signals):
self.signals = signals
def on_Assign(self, stmt):
# The invariant provided by LHSGroupAnalyzer is that all signals that ever appear together
# on LHS are a part of the same group, so it is sufficient to check any of them.
lhs_signals = stmt.lhs._lhs_signals()
if lhs_signals:
any_lhs_signal = next(iter(lhs_signals))
if any_lhs_signal in self.signals:
return stmt
def on_property(self, stmt):
any_lhs_signal = next(iter(stmt._lhs_signals()))
if any_lhs_signal in self.signals:
return stmt
on_Assert = on_property
on_Assume = on_property
on_Cover = on_property
class _ControlInserter(FragmentTransformer):
def __init__(self, controls):
self.src_loc = None
if isinstance(controls, Value):
controls = {"sync": controls}
self.controls = OrderedDict(controls)
def on_fragment(self, fragment):
new_fragment = super().on_fragment(fragment)
for domain, signals in fragment.drivers.items():
if domain is None or domain not in self.controls:
continue
self._insert_control(new_fragment, domain, signals)
return new_fragment
def _insert_control(self, fragment, domain, signals):
raise NotImplementedError # :nocov:
def __call__(self, value, *, src_loc_at=0):
self.src_loc = tracer.get_src_loc(src_loc_at=src_loc_at)
return super().__call__(value, src_loc_at=1 + src_loc_at)
class ResetInserter(_ControlInserter):
def _insert_control(self, fragment, domain, signals):
stmts = [s.eq(Const(s.reset, s.width)) for s in signals if not s.reset_less]
fragment.add_statements(Switch(self.controls[domain], {1: stmts}, src_loc=self.src_loc))
class EnableInserter(_ControlInserter):
def _insert_control(self, fragment, domain, signals):
stmts = [s.eq(s) for s in signals]
fragment.add_statements(Switch(self.controls[domain], {0: stmts}, src_loc=self.src_loc))
def on_fragment(self, fragment):
new_fragment = super().on_fragment(fragment)
if isinstance(new_fragment, Instance) and new_fragment.type in ("$memrd", "$memwr"):
clk_port, clk_dir = new_fragment.named_ports["CLK"]
if isinstance(clk_port, ClockSignal) and clk_port.domain in self.controls:
en_port, en_dir = new_fragment.named_ports["EN"]
en_port = Mux(self.controls[clk_port.domain], en_port, Const(0, len(en_port)))
new_fragment.named_ports["EN"] = en_port, en_dir
return new_fragment
import warnings
warnings.warn("instead of nmigen.hdl.xfrm, use amaranth.hdl.xfrm",
DeprecationWarning, stacklevel=2)