Implement RFC 45: Move hdl.Memory
to lib.Memory
.
This commit is contained in:
parent
6d65dc1366
commit
890e099ec3
|
@ -4,7 +4,7 @@ from ._ast import Const, C, Mux, Cat, Array, Signal, ClockSignal, ResetSignal
|
||||||
from ._dsl import SyntaxError, SyntaxWarning, Module
|
from ._dsl import SyntaxError, SyntaxWarning, Module
|
||||||
from ._cd import DomainError, ClockDomain
|
from ._cd import DomainError, ClockDomain
|
||||||
from ._ir import UnusedElaboratable, Elaboratable, DriverConflict, Fragment, Instance
|
from ._ir import UnusedElaboratable, Elaboratable, DriverConflict, Fragment, Instance
|
||||||
from ._mem import Memory, ReadPort, WritePort, DummyPort
|
from ._mem import MemoryIdentity, MemoryInstance, Memory, ReadPort, WritePort, DummyPort
|
||||||
from ._rec import Record
|
from ._rec import Record
|
||||||
from ._xfrm import DomainRenamer, ResetInserter, EnableInserter
|
from ._xfrm import DomainRenamer, ResetInserter, EnableInserter
|
||||||
|
|
||||||
|
@ -21,7 +21,7 @@ __all__ = [
|
||||||
# _ir
|
# _ir
|
||||||
"UnusedElaboratable", "Elaboratable", "DriverConflict", "Fragment", "Instance",
|
"UnusedElaboratable", "Elaboratable", "DriverConflict", "Fragment", "Instance",
|
||||||
# _mem
|
# _mem
|
||||||
"Memory", "ReadPort", "WritePort", "DummyPort",
|
"MemoryIdentity", "MemoryInstance", "Memory", "ReadPort", "WritePort", "DummyPort",
|
||||||
# _rec
|
# _rec
|
||||||
"Record",
|
"Record",
|
||||||
# _xfrm
|
# _xfrm
|
||||||
|
|
|
@ -1109,7 +1109,7 @@ class NetlistEmitter:
|
||||||
en=en,
|
en=en,
|
||||||
clk=clk,
|
clk=clk,
|
||||||
clk_edge=cd.clk_edge,
|
clk_edge=cd.clk_edge,
|
||||||
transparent_for=tuple(write_ports[idx] for idx in port._transparency),
|
transparent_for=tuple(write_ports[idx] for idx in port._transparent_for),
|
||||||
src_loc=port._data.src_loc,
|
src_loc=port._data.src_loc,
|
||||||
)
|
)
|
||||||
data = self.netlist.add_value_cell(len(port._data), cell)
|
data = self.netlist.add_value_cell(len(port._data), cell)
|
||||||
|
|
|
@ -5,6 +5,7 @@ from .. import tracer
|
||||||
from ._ast import *
|
from ._ast import *
|
||||||
from ._ir import Elaboratable, Fragment
|
from ._ir import Elaboratable, Fragment
|
||||||
from ..utils import ceil_log2
|
from ..utils import ceil_log2
|
||||||
|
from .._utils import deprecated
|
||||||
|
|
||||||
|
|
||||||
__all__ = ["Memory", "ReadPort", "WritePort", "DummyPort"]
|
__all__ = ["Memory", "ReadPort", "WritePort", "DummyPort"]
|
||||||
|
@ -33,18 +34,19 @@ class MemorySimWrite:
|
||||||
|
|
||||||
class MemoryInstance(Fragment):
|
class MemoryInstance(Fragment):
|
||||||
class _ReadPort:
|
class _ReadPort:
|
||||||
def __init__(self, *, domain, addr, data, en, transparency):
|
def __init__(self, *, domain, addr, data, en, transparent_for):
|
||||||
assert isinstance(domain, str)
|
assert isinstance(domain, str)
|
||||||
self._domain = domain
|
self._domain = domain
|
||||||
self._addr = Value.cast(addr)
|
self._addr = Value.cast(addr)
|
||||||
self._data = Value.cast(data)
|
self._data = Value.cast(data)
|
||||||
self._en = Value.cast(en)
|
self._en = Value.cast(en)
|
||||||
self._transparency = tuple(transparency)
|
self._transparent_for = tuple(transparent_for)
|
||||||
assert len(self._en) == 1
|
assert len(self._en) == 1
|
||||||
if domain == "comb":
|
if domain == "comb":
|
||||||
assert isinstance(self._en, Const)
|
assert isinstance(self._en, Const)
|
||||||
assert self._en.width == 1
|
assert self._en.width == 1
|
||||||
assert self._en.value == 1
|
assert self._en.value == 1
|
||||||
|
assert not self._transparent_for
|
||||||
|
|
||||||
class _WritePort:
|
class _WritePort:
|
||||||
def __init__(self, *, domain, addr, data, en):
|
def __init__(self, *, domain, addr, data, en):
|
||||||
|
@ -70,22 +72,24 @@ class MemoryInstance(Fragment):
|
||||||
self._identity = identity
|
self._identity = identity
|
||||||
self._width = operator.index(width)
|
self._width = operator.index(width)
|
||||||
self._depth = operator.index(depth)
|
self._depth = operator.index(depth)
|
||||||
self._init = tuple(init) if init is not None else ()
|
mask = (1 << self._width) - 1
|
||||||
|
self._init = tuple(item & mask for item in init) if init is not None else ()
|
||||||
assert len(self._init) <= self._depth
|
assert len(self._init) <= self._depth
|
||||||
self._init += (0,) * (self._depth - len(self._init))
|
self._init += (0,) * (self._depth - len(self._init))
|
||||||
for x in self._init:
|
for x in self._init:
|
||||||
assert isinstance(x, int)
|
assert isinstance(x, int)
|
||||||
self._attrs = attrs or {}
|
self._attrs = attrs or {}
|
||||||
self._read_ports = []
|
self._read_ports: "list[MemoryInstance._ReadPort]" = []
|
||||||
self._write_ports = []
|
self._write_ports: "list[MemoryInstance._WritePort]" = []
|
||||||
|
|
||||||
def read_port(self, *, domain, addr, data, en, transparency):
|
def read_port(self, *, domain, addr, data, en, transparent_for):
|
||||||
port = self._ReadPort(domain=domain, addr=addr, data=data, en=en, transparency=transparency)
|
port = self._ReadPort(domain=domain, addr=addr, data=data, en=en, transparent_for=transparent_for)
|
||||||
assert len(port._data) == self._width
|
assert len(port._data) == self._width
|
||||||
assert len(port._addr) == ceil_log2(self._depth)
|
assert len(port._addr) == ceil_log2(self._depth)
|
||||||
for x in port._transparency:
|
for idx in port._transparent_for:
|
||||||
assert isinstance(x, int)
|
assert isinstance(idx, int)
|
||||||
assert x in range(len(self._write_ports))
|
assert idx in range(len(self._write_ports))
|
||||||
|
assert self._write_ports[idx]._domain == port._domain
|
||||||
for signal in port._data._rhs_signals():
|
for signal in port._data._rhs_signals():
|
||||||
self.add_driver(signal, port._domain)
|
self.add_driver(signal, port._domain)
|
||||||
self._read_ports.append(port)
|
self._read_ports.append(port)
|
||||||
|
@ -124,6 +128,8 @@ class Memory(Elaboratable):
|
||||||
init : list of int
|
init : list of int
|
||||||
attrs : dict
|
attrs : dict
|
||||||
"""
|
"""
|
||||||
|
# TODO(amaranth-0.6): remove
|
||||||
|
@deprecated("`amaranth.hdl.Memory` is deprecated, use `amaranth.lib.memory.Memory` instead")
|
||||||
def __init__(self, *, width, depth, init=None, name=None, attrs=None, simulate=True):
|
def __init__(self, *, width, depth, init=None, name=None, attrs=None, simulate=True):
|
||||||
if not isinstance(width, int) or width < 0:
|
if not isinstance(width, int) or width < 0:
|
||||||
raise TypeError("Memory width must be a non-negative integer, not {!r}"
|
raise TypeError("Memory width must be a non-negative integer, not {!r}"
|
||||||
|
@ -132,8 +138,8 @@ class Memory(Elaboratable):
|
||||||
raise TypeError("Memory depth must be a non-negative integer, not {!r}"
|
raise TypeError("Memory depth must be a non-negative integer, not {!r}"
|
||||||
.format(depth))
|
.format(depth))
|
||||||
|
|
||||||
self.name = name or tracer.get_var_name(depth=2, default="$memory")
|
self.name = name or tracer.get_var_name(depth=3, default="$memory")
|
||||||
self.src_loc = tracer.get_src_loc()
|
self.src_loc = tracer.get_src_loc(src_loc_at=1)
|
||||||
|
|
||||||
self.width = width
|
self.width = width
|
||||||
self.depth = depth
|
self.depth = depth
|
||||||
|
@ -208,12 +214,12 @@ class Memory(Elaboratable):
|
||||||
for port in self._read_ports:
|
for port in self._read_ports:
|
||||||
port._MustUse__used = True
|
port._MustUse__used = True
|
||||||
if port.domain == "comb":
|
if port.domain == "comb":
|
||||||
f.read_port(domain="comb", addr=port.addr, data=port.data, en=Const(1), transparency=())
|
f.read_port(domain="comb", addr=port.addr, data=port.data, en=Const(1), transparent_for=())
|
||||||
else:
|
else:
|
||||||
transparency = []
|
transparent_for = []
|
||||||
if port.transparent:
|
if port.transparent:
|
||||||
transparency = write_ports.get(port.domain, [])
|
transparent_for = write_ports.get(port.domain, [])
|
||||||
f.read_port(domain=port.domain, addr=port.addr, data=port.data, en=port.en, transparency=transparency)
|
f.read_port(domain=port.domain, addr=port.addr, data=port.data, en=port.en, transparent_for=transparent_for)
|
||||||
return f
|
return f
|
||||||
|
|
||||||
|
|
||||||
|
@ -346,13 +352,15 @@ class DummyPort:
|
||||||
It does not include any read/write port specific attributes, i.e. none besides ``"domain"``;
|
It does not include any read/write port specific attributes, i.e. none besides ``"domain"``;
|
||||||
any such attributes may be set manually.
|
any such attributes may be set manually.
|
||||||
"""
|
"""
|
||||||
|
# TODO(amaranth-0.6): remove
|
||||||
|
@deprecated("`DummyPort` is deprecated, use `amaranth.lib.memory.ReadPort` or `amaranth.lib.memory.WritePort` instead")
|
||||||
def __init__(self, *, data_width, addr_width, domain="sync", name=None, granularity=None):
|
def __init__(self, *, data_width, addr_width, domain="sync", name=None, granularity=None):
|
||||||
self.domain = domain
|
self.domain = domain
|
||||||
|
|
||||||
if granularity is None:
|
if granularity is None:
|
||||||
granularity = data_width
|
granularity = data_width
|
||||||
if name is None:
|
if name is None:
|
||||||
name = tracer.get_var_name(depth=2, default="dummy")
|
name = tracer.get_var_name(depth=3, default="dummy")
|
||||||
|
|
||||||
self.addr = Signal(addr_width,
|
self.addr = Signal(addr_width,
|
||||||
name=f"{name}_addr", src_loc_at=1)
|
name=f"{name}_addr", src_loc_at=1)
|
||||||
|
|
|
@ -263,7 +263,7 @@ class FragmentTransformer:
|
||||||
addr=port._addr,
|
addr=port._addr,
|
||||||
data=port._data,
|
data=port._data,
|
||||||
en=port._en,
|
en=port._en,
|
||||||
transparency=port._transparency,
|
transparent_for=port._transparent_for,
|
||||||
)
|
)
|
||||||
for port in fragment._read_ports
|
for port in fragment._read_ports
|
||||||
]
|
]
|
||||||
|
|
|
@ -1,12 +1,11 @@
|
||||||
"""First-in first-out queues."""
|
"""First-in first-out queues."""
|
||||||
|
|
||||||
import warnings
|
|
||||||
|
|
||||||
from .. import *
|
from .. import *
|
||||||
from ..asserts import *
|
from ..asserts import *
|
||||||
from ..utils import ceil_log2
|
from ..utils import ceil_log2
|
||||||
from .coding import GrayEncoder, GrayDecoder
|
from .coding import GrayEncoder, GrayDecoder
|
||||||
from .cdc import FFSynchronizer, AsyncFFSynchronizer
|
from .cdc import FFSynchronizer, AsyncFFSynchronizer
|
||||||
|
from .memory import Memory
|
||||||
|
|
||||||
|
|
||||||
__all__ = ["FIFOInterface", "SyncFIFO", "SyncFIFOBuffered", "AsyncFIFO", "AsyncFIFOBuffered"]
|
__all__ = ["FIFOInterface", "SyncFIFO", "SyncFIFOBuffered", "AsyncFIFO", "AsyncFIFOBuffered"]
|
||||||
|
@ -130,7 +129,7 @@ class SyncFIFO(Elaboratable, FIFOInterface):
|
||||||
do_read = self.r_rdy & self.r_en
|
do_read = self.r_rdy & self.r_en
|
||||||
do_write = self.w_rdy & self.w_en
|
do_write = self.w_rdy & self.w_en
|
||||||
|
|
||||||
storage = m.submodules.storage = Memory(width=self.width, depth=self.depth)
|
storage = m.submodules.storage = Memory(shape=self.width, depth=self.depth, init=[])
|
||||||
w_port = storage.write_port()
|
w_port = storage.write_port()
|
||||||
r_port = storage.read_port(domain="comb")
|
r_port = storage.read_port(domain="comb")
|
||||||
produce = Signal(range(self.depth))
|
produce = Signal(range(self.depth))
|
||||||
|
@ -257,9 +256,9 @@ class SyncFIFOBuffered(Elaboratable, FIFOInterface):
|
||||||
|
|
||||||
do_inner_read = inner_r_rdy & (~self.r_rdy | self.r_en)
|
do_inner_read = inner_r_rdy & (~self.r_rdy | self.r_en)
|
||||||
|
|
||||||
storage = m.submodules.storage = Memory(width=self.width, depth=inner_depth)
|
storage = m.submodules.storage = Memory(shape=self.width, depth=inner_depth, init=[])
|
||||||
w_port = storage.write_port()
|
w_port = storage.write_port()
|
||||||
r_port = storage.read_port(domain="sync", transparent=False)
|
r_port = storage.read_port(domain="sync")
|
||||||
produce = Signal(range(inner_depth))
|
produce = Signal(range(inner_depth))
|
||||||
consume = Signal(range(inner_depth))
|
consume = Signal(range(inner_depth))
|
||||||
|
|
||||||
|
@ -438,9 +437,9 @@ class AsyncFIFO(Elaboratable, FIFOInterface):
|
||||||
m.d[self._w_domain] += self.w_level.eq(produce_w_bin - consume_w_bin)
|
m.d[self._w_domain] += self.w_level.eq(produce_w_bin - consume_w_bin)
|
||||||
m.d.comb += self.r_level.eq(produce_r_bin - consume_r_bin)
|
m.d.comb += self.r_level.eq(produce_r_bin - consume_r_bin)
|
||||||
|
|
||||||
storage = m.submodules.storage = Memory(width=self.width, depth=self.depth)
|
storage = m.submodules.storage = Memory(shape=self.width, depth=self.depth, init=[])
|
||||||
w_port = storage.write_port(domain=self._w_domain)
|
w_port = storage.write_port(domain=self._w_domain)
|
||||||
r_port = storage.read_port (domain=self._r_domain, transparent=False)
|
r_port = storage.read_port (domain=self._r_domain)
|
||||||
m.d.comb += [
|
m.d.comb += [
|
||||||
w_port.addr.eq(produce_w_bin[:-1]),
|
w_port.addr.eq(produce_w_bin[:-1]),
|
||||||
w_port.data.eq(self.w_data),
|
w_port.data.eq(self.w_data),
|
||||||
|
|
430
amaranth/lib/memory.py
Normal file
430
amaranth/lib/memory.py
Normal file
|
@ -0,0 +1,430 @@
|
||||||
|
import operator
|
||||||
|
from collections import OrderedDict
|
||||||
|
from collections.abc import MutableSequence
|
||||||
|
|
||||||
|
from ..hdl import MemoryIdentity, MemoryInstance, Shape, ShapeCastable, Const
|
||||||
|
from ..hdl._mem import MemorySimRead
|
||||||
|
from ..utils import ceil_log2
|
||||||
|
from .data import ArrayLayout
|
||||||
|
from . import wiring
|
||||||
|
from .. import tracer
|
||||||
|
|
||||||
|
|
||||||
|
__all__ = ["WritePort", "ReadPort", "Memory"]
|
||||||
|
|
||||||
|
|
||||||
|
class WritePort:
|
||||||
|
"""A memory write port.
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
signature : :class:`WritePort.Signature`
|
||||||
|
The signature of the port.
|
||||||
|
memory : :class:`Memory` or ``None``
|
||||||
|
Memory associated with the port.
|
||||||
|
domain : str
|
||||||
|
Clock domain. Defaults to ``"sync"``. Writes have a latency of 1 clock cycle.
|
||||||
|
|
||||||
|
Attributes
|
||||||
|
----------
|
||||||
|
signature : :class:`WritePort.Signature`
|
||||||
|
memory : :class:`Memory`
|
||||||
|
domain : str
|
||||||
|
"""
|
||||||
|
|
||||||
|
class Signature(wiring.Signature):
|
||||||
|
"""A signature of a write port.
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
addr_width : int
|
||||||
|
Address width in bits. If the port is associated with a :class:`Memory`,
|
||||||
|
it must be equal to :py:`ceil_log2(memory.depth)`.
|
||||||
|
shape : :ref:`shape-like <lang-shapelike>` object
|
||||||
|
The shape of the port data. If the port is associated with a :class:`Memory`,
|
||||||
|
it must be equal to its element shape.
|
||||||
|
granularity : int or ``None``
|
||||||
|
Port granularity. If ``None``, the entire storage element is written at once.
|
||||||
|
Otherwise, determines the size of access covered by a single bit of ``en``.
|
||||||
|
One of the following must hold:
|
||||||
|
|
||||||
|
- ``granularity is None``, in which case ``en_width == 1``, or
|
||||||
|
- ``shape == unsigned(data_width)`` and ``data_width == 0 or data_width % granularity == 0`` in which case ``en_width == data_width // granularity`` (or 0 if ``data_width == 0``)
|
||||||
|
- ``shape == amaranth.lib.data.ArrayLayout(_, elem_count)`` and ``elem_count == 0 or elem_count % granularity == 0`` in which case ``en_width == elem_count // granularity`` (or 0 if ``elem_count == 0``)
|
||||||
|
|
||||||
|
Members
|
||||||
|
-------
|
||||||
|
addr: :py:`unsigned(data_width)`
|
||||||
|
data: ``shape``
|
||||||
|
en: :py:`unsigned(en_width)`
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, *, addr_width, shape, granularity=None):
|
||||||
|
if not isinstance(addr_width, int) or addr_width < 0:
|
||||||
|
raise TypeError(f"`addr_width` must be a non-negative int, not {addr_width!r}")
|
||||||
|
self._addr_width = addr_width
|
||||||
|
self._shape = shape
|
||||||
|
self._granularity = granularity
|
||||||
|
if granularity is None:
|
||||||
|
en_width = 1
|
||||||
|
elif not isinstance(granularity, int) or granularity < 0:
|
||||||
|
raise TypeError(f"Granularity must be a non-negative int or None, not {granularity!r}")
|
||||||
|
elif not isinstance(shape, ShapeCastable):
|
||||||
|
actual_shape = Shape.cast(shape)
|
||||||
|
if actual_shape.signed:
|
||||||
|
raise ValueError("Granularity cannot be specified with signed shape")
|
||||||
|
elif actual_shape.width == 0:
|
||||||
|
en_width = 0
|
||||||
|
elif granularity == 0:
|
||||||
|
raise ValueError("Granularity must be positive")
|
||||||
|
elif actual_shape.width % granularity != 0:
|
||||||
|
raise ValueError("Granularity must divide data width")
|
||||||
|
else:
|
||||||
|
en_width = actual_shape.width // granularity
|
||||||
|
elif isinstance(shape, ArrayLayout):
|
||||||
|
if shape.length == 0:
|
||||||
|
en_width = 0
|
||||||
|
elif granularity == 0:
|
||||||
|
raise ValueError("Granularity must be positive")
|
||||||
|
elif shape.length % granularity != 0:
|
||||||
|
raise ValueError("Granularity must divide data array length")
|
||||||
|
else:
|
||||||
|
en_width = shape.length // granularity
|
||||||
|
else:
|
||||||
|
raise TypeError("Granularity can only be specified for plain unsigned `Shape` or `ArrayLayout`")
|
||||||
|
super().__init__({
|
||||||
|
"addr": wiring.In(addr_width),
|
||||||
|
"data": wiring.In(shape),
|
||||||
|
"en": wiring.In(en_width),
|
||||||
|
})
|
||||||
|
|
||||||
|
@property
|
||||||
|
def addr_width(self):
|
||||||
|
return self._addr_width
|
||||||
|
|
||||||
|
@property
|
||||||
|
def shape(self):
|
||||||
|
return self._shape
|
||||||
|
|
||||||
|
@property
|
||||||
|
def granularity(self):
|
||||||
|
return self._granularity
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
granularity = f", granularity={self.granularity}" if self.granularity is not None else ""
|
||||||
|
return f"WritePort.Signature(addr_width={self.addr_width}, shape={self.shape}{granularity})"
|
||||||
|
|
||||||
|
|
||||||
|
def __init__(self, signature, *, memory, domain):
|
||||||
|
if not isinstance(signature, WritePort.Signature):
|
||||||
|
raise TypeError(f"Expected `WritePort.Signature`, not {signature!r}")
|
||||||
|
if memory is not None:
|
||||||
|
if not isinstance(memory, Memory):
|
||||||
|
raise TypeError(f"Expected `Memory` or `None`, not {memory!r}")
|
||||||
|
if signature.shape != memory.shape or Shape.cast(signature.shape) != Shape.cast(memory.shape):
|
||||||
|
raise ValueError(f"Memory shape {memory.shape!r} doesn't match port shape {signature.shape!r}")
|
||||||
|
if signature.addr_width != ceil_log2(memory.depth):
|
||||||
|
raise ValueError(f"Memory address width {ceil_log2(memory.depth)!r} doesn't match port address width {signature.addr_width!r}")
|
||||||
|
if not isinstance(domain, str):
|
||||||
|
raise TypeError(f"Domain has to be a string, not {domain!r}")
|
||||||
|
if domain == "comb":
|
||||||
|
raise ValueError("Write port domain cannot be \"comb\"")
|
||||||
|
self._signature = signature
|
||||||
|
self._memory = memory
|
||||||
|
self._domain = domain
|
||||||
|
self.__dict__.update(signature.members.create())
|
||||||
|
if memory is not None:
|
||||||
|
memory._w_ports.append(self)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def signature(self):
|
||||||
|
return self._signature
|
||||||
|
|
||||||
|
@property
|
||||||
|
def memory(self):
|
||||||
|
return self._memory
|
||||||
|
|
||||||
|
@property
|
||||||
|
def domain(self):
|
||||||
|
return self._domain
|
||||||
|
|
||||||
|
|
||||||
|
class ReadPort:
|
||||||
|
"""A memory read port.
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
signature : :class:`ReadPort.Signature`
|
||||||
|
The signature of the port.
|
||||||
|
memory : :class:`Memory`
|
||||||
|
Memory associated with the port.
|
||||||
|
domain : str
|
||||||
|
Clock domain. Defaults to ``"sync"``. If set to ``"comb"``, the port is asynchronous.
|
||||||
|
Otherwise, the read data becomes available on the next clock cycle.
|
||||||
|
transparent_for : iterable of :class:`WritePort`
|
||||||
|
The set of write ports that this read port should be transparent with. All ports
|
||||||
|
must belong to the same memory and the same clock domain.
|
||||||
|
|
||||||
|
Attributes
|
||||||
|
----------
|
||||||
|
signature : :class:`ReadPort.Signature`
|
||||||
|
memory : :class:`Memory`
|
||||||
|
domain : str
|
||||||
|
transparent_for : tuple of :class:`WritePort`
|
||||||
|
"""
|
||||||
|
|
||||||
|
class Signature(wiring.Signature):
|
||||||
|
"""A signature of a read port.
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
addr_width : int
|
||||||
|
Address width in bits. If the port is associated with a :class:`Memory`,
|
||||||
|
it must be equal to :py:`ceil_log2(memory.depth)`.
|
||||||
|
shape : :ref:`shape-like <lang-shapelike>` object
|
||||||
|
The shape of the port data. If the port is associated with a :class:`Memory`,
|
||||||
|
it must be equal to its element shape.
|
||||||
|
|
||||||
|
Members
|
||||||
|
-------
|
||||||
|
addr: :py:`unsigned(data_width)`
|
||||||
|
data: ``shape``
|
||||||
|
en: :py:`unsigned(1)`
|
||||||
|
The enable signal. If ``domain == "comb"``, this is tied to ``Const(1)``.
|
||||||
|
Otherwise it is a signal with ``init=1``.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, *, addr_width, shape):
|
||||||
|
if not isinstance(addr_width, int) or addr_width < 0:
|
||||||
|
raise TypeError(f"`addr_width` must be a non-negative int, not {addr_width!r}")
|
||||||
|
self._addr_width = addr_width
|
||||||
|
self._shape = shape
|
||||||
|
super().__init__({
|
||||||
|
"addr": wiring.In(addr_width),
|
||||||
|
"data": wiring.Out(shape),
|
||||||
|
"en": wiring.In(1, init=1),
|
||||||
|
})
|
||||||
|
|
||||||
|
@property
|
||||||
|
def addr_width(self):
|
||||||
|
return self._addr_width
|
||||||
|
|
||||||
|
@property
|
||||||
|
def shape(self):
|
||||||
|
return self._shape
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return f"ReadPort.Signature(addr_width={self.addr_width}, shape={self.shape})"
|
||||||
|
|
||||||
|
|
||||||
|
def __init__(self, signature, *, memory, domain, transparent_for=()):
|
||||||
|
if not isinstance(signature, ReadPort.Signature):
|
||||||
|
raise TypeError(f"Expected `ReadPort.Signature`, not {signature!r}")
|
||||||
|
if memory is not None:
|
||||||
|
if not isinstance(memory, Memory):
|
||||||
|
raise TypeError(f"Expected `Memory` or `None`, not {memory!r}")
|
||||||
|
if signature.shape != memory.shape or Shape.cast(signature.shape) != Shape.cast(memory.shape):
|
||||||
|
raise ValueError(f"Memory shape {memory.shape!r} doesn't match port shape {signature.shape!r}")
|
||||||
|
if signature.addr_width != ceil_log2(memory.depth):
|
||||||
|
raise ValueError(f"Memory address width {ceil_log2(memory.depth)!r} doesn't match port address width {signature.addr_width!r}")
|
||||||
|
if not isinstance(domain, str):
|
||||||
|
raise TypeError(f"Domain has to be a string, not {domain!r}")
|
||||||
|
transparent_for = tuple(transparent_for)
|
||||||
|
for port in transparent_for:
|
||||||
|
if not isinstance(port, WritePort):
|
||||||
|
raise TypeError("`transparent_for` must contain only `WritePort` instances")
|
||||||
|
if memory is not None and port not in memory._w_ports:
|
||||||
|
raise ValueError("Transparent write ports must belong to the same memory")
|
||||||
|
if port.domain != domain:
|
||||||
|
raise ValueError("Transparent write ports must belong to the same domain")
|
||||||
|
self._signature = signature
|
||||||
|
self._memory = memory
|
||||||
|
self._domain = domain
|
||||||
|
self._transparent_for = transparent_for
|
||||||
|
self.__dict__.update(signature.members.create())
|
||||||
|
if domain == "comb":
|
||||||
|
self.en = Const(1)
|
||||||
|
if memory is not None:
|
||||||
|
memory._r_ports.append(self)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def signature(self):
|
||||||
|
return self._signature
|
||||||
|
|
||||||
|
@property
|
||||||
|
def memory(self):
|
||||||
|
return self._memory
|
||||||
|
|
||||||
|
@property
|
||||||
|
def domain(self):
|
||||||
|
return self._domain
|
||||||
|
|
||||||
|
@property
|
||||||
|
def transparent_for(self):
|
||||||
|
return self._transparent_for
|
||||||
|
|
||||||
|
|
||||||
|
class Memory(wiring.Component):
|
||||||
|
"""A word addressable storage.
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
shape : :ref:`shape-like <lang-shapelike>` object
|
||||||
|
The shape of a single element of the storage.
|
||||||
|
depth : int
|
||||||
|
Word count. This memory contains ``depth`` storage elements.
|
||||||
|
init : iterable of int or of any objects accepted by ``shape.const()``
|
||||||
|
Initial values. At power on, each storage element in this memory is initialized to
|
||||||
|
the corresponding element of ``init``, if any, or to the default value of ``shape`` otherwise.
|
||||||
|
Uninitialized memories are not currently supported.
|
||||||
|
attrs : dict
|
||||||
|
Dictionary of synthesis attributes.
|
||||||
|
|
||||||
|
Attributes
|
||||||
|
----------
|
||||||
|
shape : :ref:`shape-like <lang-shapelike>`
|
||||||
|
depth : int
|
||||||
|
init : :class:`Memory.Init`
|
||||||
|
attrs : dict
|
||||||
|
r_ports : tuple of :class:`ReadPort`
|
||||||
|
w_ports : tuple of :class:`WritePort`
|
||||||
|
"""
|
||||||
|
|
||||||
|
class Init(MutableSequence):
|
||||||
|
"""Initial data of a :class:`Memory`.
|
||||||
|
|
||||||
|
This is a container implementing the ``MutableSequence`` protocol, enforcing two constraints:
|
||||||
|
|
||||||
|
- the length is immutable and must equal ``depth``
|
||||||
|
- if ``shape`` is a :class:`ShapeCastable`, each element can be cast to ``shape`` via :py:`shape.const()`
|
||||||
|
- otherwise, each element is an :py:`int`
|
||||||
|
"""
|
||||||
|
def __init__(self, items, *, shape, depth):
|
||||||
|
Shape.cast(shape)
|
||||||
|
if not isinstance(depth, int) or depth < 0:
|
||||||
|
raise TypeError("Memory depth must be a non-negative integer, not {!r}"
|
||||||
|
.format(depth))
|
||||||
|
self._shape = shape
|
||||||
|
self._depth = depth
|
||||||
|
if isinstance(shape, ShapeCastable):
|
||||||
|
self._items = [None] * depth
|
||||||
|
default = Const.cast(shape.const(None)).value
|
||||||
|
self._raw = [default] * depth
|
||||||
|
else:
|
||||||
|
self._raw = self._items = [0] * depth
|
||||||
|
try:
|
||||||
|
for idx, item in enumerate(items):
|
||||||
|
self[idx] = item
|
||||||
|
except (TypeError, ValueError) as e:
|
||||||
|
raise type(e)("Memory initialization value at address {:x}: {}"
|
||||||
|
.format(idx, e)) from None
|
||||||
|
|
||||||
|
def __getitem__(self, index):
|
||||||
|
return self._items[index]
|
||||||
|
|
||||||
|
def __setitem__(self, index, value):
|
||||||
|
if isinstance(index, slice):
|
||||||
|
start, stop, step = index.indices(len(self._items))
|
||||||
|
indices = range(start, stop, step)
|
||||||
|
if len(value) != len(indices):
|
||||||
|
raise ValueError("Changing length of Memory.init is not allowed")
|
||||||
|
for actual_index, actual_value in zip(indices, value):
|
||||||
|
self[actual_index] = actual_value
|
||||||
|
else:
|
||||||
|
if isinstance(self._shape, ShapeCastable):
|
||||||
|
self._raw[index] = Const.cast(self._shape.const(value)).value
|
||||||
|
else:
|
||||||
|
value = operator.index(value)
|
||||||
|
self._items[index] = value
|
||||||
|
|
||||||
|
def __delitem__(self, index):
|
||||||
|
raise TypeError("Deleting items from Memory.init is not allowed")
|
||||||
|
|
||||||
|
def insert(self, index, value):
|
||||||
|
raise TypeError("Inserting items into Memory.init is not allowed")
|
||||||
|
|
||||||
|
def __len__(self):
|
||||||
|
return self._depth
|
||||||
|
|
||||||
|
@property
|
||||||
|
def depth(self):
|
||||||
|
return self._depth
|
||||||
|
|
||||||
|
@property
|
||||||
|
def shape(self):
|
||||||
|
return self._shape
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return f"Memory.Init({self._items!r})"
|
||||||
|
|
||||||
|
def __init__(self, *, depth, shape, init, attrs=None, src_loc_at=0, src_loc=None):
|
||||||
|
# shape and depth validation performed in Memory.Init constructor.
|
||||||
|
self._depth = depth
|
||||||
|
self._shape = shape
|
||||||
|
self._init = Memory.Init(init, shape=shape, depth=depth)
|
||||||
|
self._attrs = {} if attrs is None else dict(attrs)
|
||||||
|
self.src_loc = src_loc or tracer.get_src_loc(src_loc_at=src_loc_at)
|
||||||
|
self._identity = MemoryIdentity()
|
||||||
|
self._r_ports: "list[ReadPort]" = []
|
||||||
|
self._w_ports: "list[WritePort]" = []
|
||||||
|
super().__init__(wiring.Signature({}))
|
||||||
|
|
||||||
|
def read_port(self, *, domain="sync", transparent_for=()):
|
||||||
|
"""Adds a new read port and returns it.
|
||||||
|
|
||||||
|
Equivalent to creating a :class:`ReadPort` with a signature of :py:`ReadPort.Signature(addr_width=ceil_log2(self.depth), shape=self.shape)`
|
||||||
|
"""
|
||||||
|
signature = ReadPort.Signature(addr_width=ceil_log2(self.depth), shape=self.shape)
|
||||||
|
return ReadPort(signature, memory=self, domain=domain, transparent_for=transparent_for)
|
||||||
|
|
||||||
|
def write_port(self, *, domain="sync", granularity=None):
|
||||||
|
"""Adds a new write port and returns it.
|
||||||
|
|
||||||
|
Equivalent to creating a :class:`WritePort` with a signature of :py:`WritePort.Signature(addr_width=ceil_log2(self.depth), shape=self.shape, granularity=granularity)`
|
||||||
|
"""
|
||||||
|
signature = WritePort.Signature(addr_width=ceil_log2(self.depth), shape=self.shape, granularity=granularity)
|
||||||
|
return WritePort(signature, memory=self, domain=domain)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def depth(self):
|
||||||
|
return self._depth
|
||||||
|
|
||||||
|
@property
|
||||||
|
def shape(self):
|
||||||
|
return self._shape
|
||||||
|
|
||||||
|
@property
|
||||||
|
def init(self):
|
||||||
|
return self._init
|
||||||
|
|
||||||
|
@property
|
||||||
|
def attrs(self):
|
||||||
|
return self._attrs
|
||||||
|
|
||||||
|
@property
|
||||||
|
def w_ports(self):
|
||||||
|
"""Returns a tuple of all write ports defined so far."""
|
||||||
|
return tuple(self._w_ports)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def r_ports(self):
|
||||||
|
"""Returns a tuple of all read ports defined so far."""
|
||||||
|
return tuple(self._r_ports)
|
||||||
|
|
||||||
|
def elaborate(self, platform):
|
||||||
|
if hasattr(platform, "get_memory"):
|
||||||
|
return platform.get_memory(self)
|
||||||
|
shape = Shape.cast(self.shape)
|
||||||
|
instance = MemoryInstance(identity=self._identity, width=shape.width, depth=self.depth, init=self.init._raw, attrs=self.attrs, src_loc=self.src_loc)
|
||||||
|
w_ports = {}
|
||||||
|
for port in self._w_ports:
|
||||||
|
idx = instance.write_port(domain=port.domain, addr=port.addr, data=port.data, en=port.en)
|
||||||
|
w_ports[port] = idx
|
||||||
|
for port in self._r_ports:
|
||||||
|
transparent_for = [w_ports[write_port] for write_port in port.transparent_for]
|
||||||
|
instance.read_port(domain=port.domain, data=port.data, addr=port.addr, en=port.en, transparent_for=transparent_for)
|
||||||
|
return instance
|
||||||
|
|
||||||
|
def __getitem__(self, index):
|
||||||
|
"""Simulation only."""
|
||||||
|
return MemorySimRead(self._identity, index)
|
|
@ -505,7 +505,7 @@ class _FragmentCompiler:
|
||||||
addr = emitter.def_var("read_addr", f"({(1 << len(port._addr)) - 1:#x} & {addr})")
|
addr = emitter.def_var("read_addr", f"({(1 << len(port._addr)) - 1:#x} & {addr})")
|
||||||
data = emitter.def_var("read_data", f"slots[{memory_index}].read({addr})")
|
data = emitter.def_var("read_data", f"slots[{memory_index}].read({addr})")
|
||||||
|
|
||||||
for idx in port._transparency:
|
for idx in port._transparent_for:
|
||||||
waddr, wdata, wen = write_vals[idx]
|
waddr, wdata, wen = write_vals[idx]
|
||||||
emitter.append(f"if {addr} == {waddr}:")
|
emitter.append(f"if {addr} == {waddr}:")
|
||||||
with emitter.indent():
|
with emitter.indent():
|
||||||
|
|
|
@ -85,7 +85,7 @@ class _VCDWriter:
|
||||||
trace_names[trace_signal] = {("bench", name)}
|
trace_names[trace_signal] = {("bench", name)}
|
||||||
assigned_names.add(name)
|
assigned_names.add(name)
|
||||||
self.traces.append(trace_signal)
|
self.traces.append(trace_signal)
|
||||||
elif isinstance(trace, (MemoryInstance, Memory)):
|
elif hasattr(trace, "_identity") and isinstance(trace._identity, MemoryIdentity):
|
||||||
if not trace._identity in memories:
|
if not trace._identity in memories:
|
||||||
raise ValueError(f"{trace!r} is a memory not part of the elaborated design")
|
raise ValueError(f"{trace!r} is a memory not part of the elaborated design")
|
||||||
self.traces.append(trace._identity)
|
self.traces.append(trace._identity)
|
||||||
|
|
|
@ -32,6 +32,7 @@ Apply the following changes to code written against Amaranth 0.4 to migrate it t
|
||||||
* Update uses of ``reset=`` keyword argument to ``init=``
|
* Update uses of ``reset=`` keyword argument to ``init=``
|
||||||
* Convert uses of ``Simulator.add_sync_process`` used as testbenches to ``Simulator.add_testbench``
|
* Convert uses of ``Simulator.add_sync_process`` used as testbenches to ``Simulator.add_testbench``
|
||||||
* Convert other uses of ``Simulator.add_sync_process`` to ``Simulator.add_process``
|
* Convert other uses of ``Simulator.add_sync_process`` to ``Simulator.add_process``
|
||||||
|
* Replace uses of ``amaranth.hdl.Memory`` with ``amaranth.lib.memory.Memory``
|
||||||
|
|
||||||
|
|
||||||
Implemented RFCs
|
Implemented RFCs
|
||||||
|
@ -41,12 +42,14 @@ Implemented RFCs
|
||||||
.. _RFC 27: https://amaranth-lang.org/rfcs/0027-simulator-testbenches.html
|
.. _RFC 27: https://amaranth-lang.org/rfcs/0027-simulator-testbenches.html
|
||||||
.. _RFC 39: https://amaranth-lang.org/rfcs/0039-empty-case.html
|
.. _RFC 39: https://amaranth-lang.org/rfcs/0039-empty-case.html
|
||||||
.. _RFC 43: https://amaranth-lang.org/rfcs/0043-rename-reset-to-init.html
|
.. _RFC 43: https://amaranth-lang.org/rfcs/0043-rename-reset-to-init.html
|
||||||
|
.. _RFC 45: https://amaranth-lang.org/rfcs/0045-lib-memory.html
|
||||||
.. _RFC 46: https://amaranth-lang.org/rfcs/0046-shape-range-1.html
|
.. _RFC 46: https://amaranth-lang.org/rfcs/0046-shape-range-1.html
|
||||||
|
|
||||||
* `RFC 17`_: Remove ``log2_int``
|
* `RFC 17`_: Remove ``log2_int``
|
||||||
* `RFC 27`_: Testbench processes for the simulator
|
* `RFC 27`_: Testbench processes for the simulator
|
||||||
* `RFC 39`_: Change semantics of no-argument ``m.Case()``
|
* `RFC 39`_: Change semantics of no-argument ``m.Case()``
|
||||||
* `RFC 43`_: Rename ``reset=`` to ``init=``
|
* `RFC 43`_: Rename ``reset=`` to ``init=``
|
||||||
|
* `RFC 45`_: Move ``hdl.Memory`` to ``lib.Memory``
|
||||||
* `RFC 46`_: Change ``Shape.cast(range(1))`` to ``unsigned(0)``
|
* `RFC 46`_: Change ``Shape.cast(range(1))`` to ``unsigned(0)``
|
||||||
|
|
||||||
|
|
||||||
|
@ -65,6 +68,7 @@ Language changes
|
||||||
* Changed: the ``reset=`` argument of :class:`Signal`, :meth:`Signal.like`, :class:`amaranth.lib.wiring.Member`, :class:`amaranth.lib.cdc.FFSynchronizer`, and ``m.FSM()`` has been renamed to ``init=``. (`RFC 43`_)
|
* Changed: the ``reset=`` argument of :class:`Signal`, :meth:`Signal.like`, :class:`amaranth.lib.wiring.Member`, :class:`amaranth.lib.cdc.FFSynchronizer`, and ``m.FSM()`` has been renamed to ``init=``. (`RFC 43`_)
|
||||||
* Changed: :class:`Shape` has been made immutable and hashable.
|
* Changed: :class:`Shape` has been made immutable and hashable.
|
||||||
* Deprecated: :func:`amaranth.utils.log2_int`. (`RFC 17`_)
|
* Deprecated: :func:`amaranth.utils.log2_int`. (`RFC 17`_)
|
||||||
|
* Deprecated: :class:`amaranth.hdl.Memory`. (`RFC 45`_)
|
||||||
* Removed: (deprecated in 0.4) :meth:`Const.normalize`. (`RFC 5`_)
|
* Removed: (deprecated in 0.4) :meth:`Const.normalize`. (`RFC 5`_)
|
||||||
* Removed: (deprecated in 0.4) :class:`Repl`. (`RFC 10`_)
|
* Removed: (deprecated in 0.4) :class:`Repl`. (`RFC 10`_)
|
||||||
* Removed: (deprecated in 0.4) :class:`ast.Sample`, :class:`ast.Past`, :class:`ast.Stable`, :class:`ast.Rose`, :class:`ast.Fell`.
|
* Removed: (deprecated in 0.4) :class:`ast.Sample`, :class:`ast.Past`, :class:`ast.Stable`, :class:`ast.Rose`, :class:`ast.Fell`.
|
||||||
|
@ -75,6 +79,7 @@ Standard library changes
|
||||||
|
|
||||||
.. currentmodule:: amaranth.lib
|
.. currentmodule:: amaranth.lib
|
||||||
|
|
||||||
|
* Added: :mod:`amaranth.lib.memory`. (`RFC 45`_)
|
||||||
* Removed: (deprecated in 0.4) :mod:`amaranth.lib.scheduler`. (`RFC 19`_)
|
* Removed: (deprecated in 0.4) :mod:`amaranth.lib.scheduler`. (`RFC 19`_)
|
||||||
* Removed: (deprecated in 0.4) :class:`amaranth.lib.fifo.FIFOInterface` with ``fwft=False``. (`RFC 20`_)
|
* Removed: (deprecated in 0.4) :class:`amaranth.lib.fifo.FIFOInterface` with ``fwft=False``. (`RFC 20`_)
|
||||||
* Removed: (deprecated in 0.4) :class:`amaranth.lib.fifo.SyncFIFO` with ``fwft=False``. (`RFC 20`_)
|
* Removed: (deprecated in 0.4) :class:`amaranth.lib.fifo.SyncFIFO` with ``fwft=False``. (`RFC 20`_)
|
||||||
|
|
|
@ -4,7 +4,7 @@ Standard library
|
||||||
The :mod:`amaranth.lib` module, also known as the standard library, provides modules that falls into one of the three categories:
|
The :mod:`amaranth.lib` module, also known as the standard library, provides modules that falls into one of the three categories:
|
||||||
|
|
||||||
1. Modules that will used by essentially all idiomatic Amaranth code, and are necessary for interoperability. This includes :mod:`amaranth.lib.enum` (enumerations), :mod:`amaranth.lib.data` (data structures), and :mod:`amaranth.lib.wiring` (interfaces and components).
|
1. Modules that will used by essentially all idiomatic Amaranth code, and are necessary for interoperability. This includes :mod:`amaranth.lib.enum` (enumerations), :mod:`amaranth.lib.data` (data structures), and :mod:`amaranth.lib.wiring` (interfaces and components).
|
||||||
2. Modules that abstract common functionality whose implementation differs between hardware platforms. This includes :mod:`amaranth.lib.cdc`.
|
2. Modules that abstract common functionality whose implementation differs between hardware platforms. This includes :mod:`amaranth.lib.cdc`, :mod:`amaranth.lib.memory`.
|
||||||
3. Modules that have essentially one correct implementation and are of broad utility in digital designs. This includes :mod:`amaranth.lib.coding`, :mod:`amaranth.lib.fifo`, and :mod:`amaranth.lib.crc`.
|
3. Modules that have essentially one correct implementation and are of broad utility in digital designs. This includes :mod:`amaranth.lib.coding`, :mod:`amaranth.lib.fifo`, and :mod:`amaranth.lib.crc`.
|
||||||
|
|
||||||
As part of the Amaranth backwards compatibility guarantee, any behaviors described in these documents will not change from a version to another without at least one version including a warning about the impending change. Any nontrivial change to these behaviors must also go through the public review as a part of the `Amaranth Request for Comments process <https://amaranth-lang.org/rfcs/>`_.
|
As part of the Amaranth backwards compatibility guarantee, any behaviors described in these documents will not change from a version to another without at least one version including a warning about the impending change. Any nontrivial change to these behaviors must also go through the public review as a part of the `Amaranth Request for Comments process <https://amaranth-lang.org/rfcs/>`_.
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
from amaranth import *
|
from amaranth import *
|
||||||
|
from amaranth.lib.memory import Memory
|
||||||
from amaranth.cli import main
|
from amaranth.cli import main
|
||||||
|
|
||||||
|
|
||||||
|
@ -8,12 +9,13 @@ class RegisterFile(Elaboratable):
|
||||||
self.dat_r = Signal(8)
|
self.dat_r = Signal(8)
|
||||||
self.dat_w = Signal(8)
|
self.dat_w = Signal(8)
|
||||||
self.we = Signal()
|
self.we = Signal()
|
||||||
self.mem = Memory(width=8, depth=16, init=[0xaa, 0x55])
|
self.mem = Memory(shape=8, depth=16, init=[0xaa, 0x55])
|
||||||
|
|
||||||
def elaborate(self, platform):
|
def elaborate(self, platform):
|
||||||
m = Module()
|
m = Module()
|
||||||
m.submodules.rdport = rdport = self.mem.read_port()
|
m.submodules.mem = self.mem
|
||||||
m.submodules.wrport = wrport = self.mem.write_port()
|
rdport = self.mem.read_port()
|
||||||
|
wrport = self.mem.write_port()
|
||||||
m.d.comb += [
|
m.d.comb += [
|
||||||
rdport.addr.eq(self.adr),
|
rdport.addr.eq(self.adr),
|
||||||
self.dat_r.eq(rdport.data),
|
self.dat_r.eq(rdport.data),
|
||||||
|
|
|
@ -2,113 +2,128 @@
|
||||||
|
|
||||||
from amaranth.hdl._ast import *
|
from amaranth.hdl._ast import *
|
||||||
from amaranth.hdl._mem import *
|
from amaranth.hdl._mem import *
|
||||||
|
from amaranth._utils import _ignore_deprecated
|
||||||
|
|
||||||
from .utils import *
|
from .utils import *
|
||||||
|
|
||||||
|
|
||||||
class MemoryTestCase(FHDLTestCase):
|
class MemoryTestCase(FHDLTestCase):
|
||||||
def test_name(self):
|
def test_name(self):
|
||||||
m1 = Memory(width=8, depth=4)
|
with _ignore_deprecated():
|
||||||
self.assertEqual(m1.name, "m1")
|
m1 = Memory(width=8, depth=4)
|
||||||
m2 = [Memory(width=8, depth=4)][0]
|
self.assertEqual(m1.name, "m1")
|
||||||
self.assertEqual(m2.name, "$memory")
|
m2 = [Memory(width=8, depth=4)][0]
|
||||||
m3 = Memory(width=8, depth=4, name="foo")
|
self.assertEqual(m2.name, "$memory")
|
||||||
self.assertEqual(m3.name, "foo")
|
m3 = Memory(width=8, depth=4, name="foo")
|
||||||
|
self.assertEqual(m3.name, "foo")
|
||||||
|
|
||||||
def test_geometry(self):
|
def test_geometry(self):
|
||||||
m = Memory(width=8, depth=4)
|
with _ignore_deprecated():
|
||||||
self.assertEqual(m.width, 8)
|
m = Memory(width=8, depth=4)
|
||||||
self.assertEqual(m.depth, 4)
|
self.assertEqual(m.width, 8)
|
||||||
|
self.assertEqual(m.depth, 4)
|
||||||
|
|
||||||
def test_geometry_wrong(self):
|
def test_geometry_wrong(self):
|
||||||
with self.assertRaisesRegex(TypeError,
|
with _ignore_deprecated():
|
||||||
r"^Memory width must be a non-negative integer, not -1$"):
|
with self.assertRaisesRegex(TypeError,
|
||||||
m = Memory(width=-1, depth=4)
|
r"^Memory width must be a non-negative integer, not -1$"):
|
||||||
with self.assertRaisesRegex(TypeError,
|
m = Memory(width=-1, depth=4)
|
||||||
r"^Memory depth must be a non-negative integer, not -1$"):
|
with self.assertRaisesRegex(TypeError,
|
||||||
m = Memory(width=8, depth=-1)
|
r"^Memory depth must be a non-negative integer, not -1$"):
|
||||||
|
m = Memory(width=8, depth=-1)
|
||||||
|
|
||||||
def test_init(self):
|
def test_init(self):
|
||||||
m = Memory(width=8, depth=4, init=range(4))
|
with _ignore_deprecated():
|
||||||
self.assertEqual(m.init, [0, 1, 2, 3])
|
m = Memory(width=8, depth=4, init=range(4))
|
||||||
|
self.assertEqual(m.init, [0, 1, 2, 3])
|
||||||
|
|
||||||
def test_init_wrong_count(self):
|
def test_init_wrong_count(self):
|
||||||
with self.assertRaisesRegex(ValueError,
|
with _ignore_deprecated():
|
||||||
r"^Memory initialization value count exceed memory depth \(8 > 4\)$"):
|
with self.assertRaisesRegex(ValueError,
|
||||||
m = Memory(width=8, depth=4, init=range(8))
|
r"^Memory initialization value count exceed memory depth \(8 > 4\)$"):
|
||||||
|
m = Memory(width=8, depth=4, init=range(8))
|
||||||
|
|
||||||
def test_init_wrong_type(self):
|
def test_init_wrong_type(self):
|
||||||
with self.assertRaisesRegex(TypeError,
|
with _ignore_deprecated():
|
||||||
(r"^Memory initialization value at address 1: "
|
with self.assertRaisesRegex(TypeError,
|
||||||
r"'str' object cannot be interpreted as an integer$")):
|
(r"^Memory initialization value at address 1: "
|
||||||
m = Memory(width=8, depth=4, init=[1, "0"])
|
r"'str' object cannot be interpreted as an integer$")):
|
||||||
|
m = Memory(width=8, depth=4, init=[1, "0"])
|
||||||
|
|
||||||
def test_attrs(self):
|
def test_attrs(self):
|
||||||
m1 = Memory(width=8, depth=4)
|
with _ignore_deprecated():
|
||||||
self.assertEqual(m1.attrs, {})
|
m1 = Memory(width=8, depth=4)
|
||||||
m2 = Memory(width=8, depth=4, attrs={"ram_block": True})
|
self.assertEqual(m1.attrs, {})
|
||||||
self.assertEqual(m2.attrs, {"ram_block": True})
|
m2 = Memory(width=8, depth=4, attrs={"ram_block": True})
|
||||||
|
self.assertEqual(m2.attrs, {"ram_block": True})
|
||||||
|
|
||||||
def test_read_port_transparent(self):
|
def test_read_port_transparent(self):
|
||||||
mem = Memory(width=8, depth=4)
|
with _ignore_deprecated():
|
||||||
rdport = mem.read_port()
|
mem = Memory(width=8, depth=4)
|
||||||
self.assertEqual(rdport.memory, mem)
|
rdport = mem.read_port()
|
||||||
self.assertEqual(rdport.domain, "sync")
|
self.assertEqual(rdport.memory, mem)
|
||||||
self.assertEqual(rdport.transparent, True)
|
self.assertEqual(rdport.domain, "sync")
|
||||||
self.assertEqual(len(rdport.addr), 2)
|
self.assertEqual(rdport.transparent, True)
|
||||||
self.assertEqual(len(rdport.data), 8)
|
self.assertEqual(len(rdport.addr), 2)
|
||||||
self.assertEqual(len(rdport.en), 1)
|
self.assertEqual(len(rdport.data), 8)
|
||||||
self.assertIsInstance(rdport.en, Signal)
|
self.assertEqual(len(rdport.en), 1)
|
||||||
self.assertEqual(rdport.en.init, 1)
|
self.assertIsInstance(rdport.en, Signal)
|
||||||
|
self.assertEqual(rdport.en.init, 1)
|
||||||
|
|
||||||
def test_read_port_non_transparent(self):
|
def test_read_port_non_transparent(self):
|
||||||
mem = Memory(width=8, depth=4)
|
with _ignore_deprecated():
|
||||||
rdport = mem.read_port(transparent=False)
|
mem = Memory(width=8, depth=4)
|
||||||
self.assertEqual(rdport.memory, mem)
|
rdport = mem.read_port(transparent=False)
|
||||||
self.assertEqual(rdport.domain, "sync")
|
self.assertEqual(rdport.memory, mem)
|
||||||
self.assertEqual(rdport.transparent, False)
|
self.assertEqual(rdport.domain, "sync")
|
||||||
self.assertEqual(len(rdport.en), 1)
|
self.assertEqual(rdport.transparent, False)
|
||||||
self.assertIsInstance(rdport.en, Signal)
|
self.assertEqual(len(rdport.en), 1)
|
||||||
self.assertEqual(rdport.en.init, 1)
|
self.assertIsInstance(rdport.en, Signal)
|
||||||
|
self.assertEqual(rdport.en.init, 1)
|
||||||
|
|
||||||
def test_read_port_asynchronous(self):
|
def test_read_port_asynchronous(self):
|
||||||
mem = Memory(width=8, depth=4)
|
with _ignore_deprecated():
|
||||||
rdport = mem.read_port(domain="comb")
|
mem = Memory(width=8, depth=4)
|
||||||
self.assertEqual(rdport.memory, mem)
|
rdport = mem.read_port(domain="comb")
|
||||||
self.assertEqual(rdport.domain, "comb")
|
self.assertEqual(rdport.memory, mem)
|
||||||
self.assertEqual(rdport.transparent, True)
|
self.assertEqual(rdport.domain, "comb")
|
||||||
self.assertEqual(len(rdport.en), 1)
|
self.assertEqual(rdport.transparent, True)
|
||||||
self.assertIsInstance(rdport.en, Const)
|
self.assertEqual(len(rdport.en), 1)
|
||||||
self.assertEqual(rdport.en.value, 1)
|
self.assertIsInstance(rdport.en, Const)
|
||||||
|
self.assertEqual(rdport.en.value, 1)
|
||||||
|
|
||||||
def test_read_port_wrong(self):
|
def test_read_port_wrong(self):
|
||||||
mem = Memory(width=8, depth=4)
|
with _ignore_deprecated():
|
||||||
with self.assertRaisesRegex(ValueError,
|
mem = Memory(width=8, depth=4)
|
||||||
r"^Read port cannot be simultaneously asynchronous and non-transparent$"):
|
with self.assertRaisesRegex(ValueError,
|
||||||
mem.read_port(domain="comb", transparent=False)
|
r"^Read port cannot be simultaneously asynchronous and non-transparent$"):
|
||||||
|
mem.read_port(domain="comb", transparent=False)
|
||||||
|
|
||||||
def test_write_port(self):
|
def test_write_port(self):
|
||||||
mem = Memory(width=8, depth=4)
|
with _ignore_deprecated():
|
||||||
wrport = mem.write_port()
|
mem = Memory(width=8, depth=4)
|
||||||
self.assertEqual(wrport.memory, mem)
|
wrport = mem.write_port()
|
||||||
self.assertEqual(wrport.domain, "sync")
|
self.assertEqual(wrport.memory, mem)
|
||||||
self.assertEqual(wrport.granularity, 8)
|
self.assertEqual(wrport.domain, "sync")
|
||||||
self.assertEqual(len(wrport.addr), 2)
|
self.assertEqual(wrport.granularity, 8)
|
||||||
self.assertEqual(len(wrport.data), 8)
|
self.assertEqual(len(wrport.addr), 2)
|
||||||
self.assertEqual(len(wrport.en), 1)
|
self.assertEqual(len(wrport.data), 8)
|
||||||
|
self.assertEqual(len(wrport.en), 1)
|
||||||
|
|
||||||
def test_write_port_granularity(self):
|
def test_write_port_granularity(self):
|
||||||
mem = Memory(width=8, depth=4)
|
with _ignore_deprecated():
|
||||||
wrport = mem.write_port(granularity=2)
|
mem = Memory(width=8, depth=4)
|
||||||
self.assertEqual(wrport.memory, mem)
|
wrport = mem.write_port(granularity=2)
|
||||||
self.assertEqual(wrport.domain, "sync")
|
self.assertEqual(wrport.memory, mem)
|
||||||
self.assertEqual(wrport.granularity, 2)
|
self.assertEqual(wrport.domain, "sync")
|
||||||
self.assertEqual(len(wrport.addr), 2)
|
self.assertEqual(wrport.granularity, 2)
|
||||||
self.assertEqual(len(wrport.data), 8)
|
self.assertEqual(len(wrport.addr), 2)
|
||||||
self.assertEqual(len(wrport.en), 4)
|
self.assertEqual(len(wrport.data), 8)
|
||||||
|
self.assertEqual(len(wrport.en), 4)
|
||||||
|
|
||||||
def test_write_port_granularity_wrong(self):
|
def test_write_port_granularity_wrong(self):
|
||||||
mem = Memory(width=8, depth=4)
|
with _ignore_deprecated():
|
||||||
|
mem = Memory(width=8, depth=4)
|
||||||
with self.assertRaisesRegex(TypeError,
|
with self.assertRaisesRegex(TypeError,
|
||||||
r"^Write port granularity must be a non-negative integer, not -1$"):
|
r"^Write port granularity must be a non-negative integer, not -1$"):
|
||||||
mem.write_port(granularity=-1)
|
mem.write_port(granularity=-1)
|
||||||
|
@ -119,20 +134,32 @@ class MemoryTestCase(FHDLTestCase):
|
||||||
r"^Write port granularity must divide memory width evenly$"):
|
r"^Write port granularity must divide memory width evenly$"):
|
||||||
mem.write_port(granularity=3)
|
mem.write_port(granularity=3)
|
||||||
|
|
||||||
|
def test_deprecated(self):
|
||||||
|
with self.assertWarnsRegex(DeprecationWarning,
|
||||||
|
r"^`amaranth.hdl.Memory` is deprecated.*$"):
|
||||||
|
mem = Memory(width=8, depth=4)
|
||||||
|
|
||||||
|
|
||||||
class DummyPortTestCase(FHDLTestCase):
|
class DummyPortTestCase(FHDLTestCase):
|
||||||
def test_name(self):
|
def test_name(self):
|
||||||
p1 = DummyPort(data_width=8, addr_width=2)
|
with _ignore_deprecated():
|
||||||
self.assertEqual(p1.addr.name, "p1_addr")
|
p1 = DummyPort(data_width=8, addr_width=2)
|
||||||
p2 = [DummyPort(data_width=8, addr_width=2)][0]
|
self.assertEqual(p1.addr.name, "p1_addr")
|
||||||
self.assertEqual(p2.addr.name, "dummy_addr")
|
p2 = [DummyPort(data_width=8, addr_width=2)][0]
|
||||||
p3 = DummyPort(data_width=8, addr_width=2, name="foo")
|
self.assertEqual(p2.addr.name, "dummy_addr")
|
||||||
self.assertEqual(p3.addr.name, "foo_addr")
|
p3 = DummyPort(data_width=8, addr_width=2, name="foo")
|
||||||
|
self.assertEqual(p3.addr.name, "foo_addr")
|
||||||
|
|
||||||
def test_sizes(self):
|
def test_sizes(self):
|
||||||
p1 = DummyPort(data_width=8, addr_width=2)
|
with _ignore_deprecated():
|
||||||
self.assertEqual(p1.addr.width, 2)
|
p1 = DummyPort(data_width=8, addr_width=2)
|
||||||
self.assertEqual(p1.data.width, 8)
|
self.assertEqual(p1.addr.width, 2)
|
||||||
self.assertEqual(p1.en.width, 1)
|
self.assertEqual(p1.data.width, 8)
|
||||||
p2 = DummyPort(data_width=8, addr_width=2, granularity=2)
|
self.assertEqual(p1.en.width, 1)
|
||||||
self.assertEqual(p2.en.width, 4)
|
p2 = DummyPort(data_width=8, addr_width=2, granularity=2)
|
||||||
|
self.assertEqual(p2.en.width, 4)
|
||||||
|
|
||||||
|
def test_deprecated(self):
|
||||||
|
with self.assertWarnsRegex(DeprecationWarning,
|
||||||
|
r"^`DummyPort` is deprecated.*$"):
|
||||||
|
DummyPort(data_width=8, addr_width=2)
|
||||||
|
|
|
@ -128,7 +128,8 @@ class DomainRenamerTestCase(FHDLTestCase):
|
||||||
|
|
||||||
def test_rename_mem_ports(self):
|
def test_rename_mem_ports(self):
|
||||||
m = Module()
|
m = Module()
|
||||||
mem = Memory(depth=4, width=16)
|
with _ignore_deprecated():
|
||||||
|
mem = Memory(depth=4, width=16)
|
||||||
m.submodules.mem = mem
|
m.submodules.mem = mem
|
||||||
mem.read_port(domain="a")
|
mem.read_port(domain="a")
|
||||||
mem.read_port(domain="b")
|
mem.read_port(domain="b")
|
||||||
|
@ -397,7 +398,8 @@ class EnableInserterTestCase(FHDLTestCase):
|
||||||
""")
|
""")
|
||||||
|
|
||||||
def test_enable_read_port(self):
|
def test_enable_read_port(self):
|
||||||
mem = Memory(width=8, depth=4)
|
with _ignore_deprecated():
|
||||||
|
mem = Memory(width=8, depth=4)
|
||||||
mem.read_port(transparent=False)
|
mem.read_port(transparent=False)
|
||||||
f = EnableInserter(self.c1)(mem).elaborate(platform=None)
|
f = EnableInserter(self.c1)(mem).elaborate(platform=None)
|
||||||
self.assertRepr(f._read_ports[0]._en, """
|
self.assertRepr(f._read_ports[0]._en, """
|
||||||
|
@ -405,7 +407,8 @@ class EnableInserterTestCase(FHDLTestCase):
|
||||||
""")
|
""")
|
||||||
|
|
||||||
def test_enable_write_port(self):
|
def test_enable_write_port(self):
|
||||||
mem = Memory(width=8, depth=4)
|
with _ignore_deprecated():
|
||||||
|
mem = Memory(width=8, depth=4)
|
||||||
mem.write_port(granularity=2)
|
mem.write_port(granularity=2)
|
||||||
f = EnableInserter(self.c1)(mem).elaborate(platform=None)
|
f = EnableInserter(self.c1)(mem).elaborate(platform=None)
|
||||||
self.assertRepr(f._write_ports[0]._en, """
|
self.assertRepr(f._write_ports[0]._en, """
|
||||||
|
|
|
@ -6,6 +6,7 @@ from amaranth.hdl import *
|
||||||
from amaranth.asserts import *
|
from amaranth.asserts import *
|
||||||
from amaranth.sim import *
|
from amaranth.sim import *
|
||||||
from amaranth.lib.fifo import *
|
from amaranth.lib.fifo import *
|
||||||
|
from amaranth.lib.memory import *
|
||||||
|
|
||||||
from .utils import *
|
from .utils import *
|
||||||
from amaranth._utils import _ignore_deprecated
|
from amaranth._utils import _ignore_deprecated
|
||||||
|
@ -81,9 +82,9 @@ class FIFOModel(Elaboratable, FIFOInterface):
|
||||||
def elaborate(self, platform):
|
def elaborate(self, platform):
|
||||||
m = Module()
|
m = Module()
|
||||||
|
|
||||||
storage = Memory(width=self.width, depth=self.depth)
|
storage = m.submodules.storage = Memory(shape=self.width, depth=self.depth, init=[])
|
||||||
w_port = m.submodules.w_port = storage.write_port(domain=self.w_domain)
|
w_port = storage.write_port(domain=self.w_domain)
|
||||||
r_port = m.submodules.r_port = storage.read_port (domain="comb")
|
r_port = storage.read_port (domain="comb")
|
||||||
|
|
||||||
produce = Signal(range(self.depth))
|
produce = Signal(range(self.depth))
|
||||||
consume = Signal(range(self.depth))
|
consume = Signal(range(self.depth))
|
||||||
|
|
369
tests/test_lib_memory.py
Normal file
369
tests/test_lib_memory.py
Normal file
|
@ -0,0 +1,369 @@
|
||||||
|
# amaranth: UnusedElaboratable=no
|
||||||
|
|
||||||
|
from amaranth.hdl._ast import *
|
||||||
|
from amaranth.hdl._mem import MemoryInstance
|
||||||
|
from amaranth.lib.memory import *
|
||||||
|
from amaranth.lib.data import *
|
||||||
|
from amaranth.lib.wiring import In, Out, SignatureMembers
|
||||||
|
|
||||||
|
from .utils import *
|
||||||
|
|
||||||
|
class MyStruct(Struct):
|
||||||
|
a: unsigned(3)
|
||||||
|
b: signed(2)
|
||||||
|
|
||||||
|
|
||||||
|
class WritePortTestCase(FHDLTestCase):
|
||||||
|
def test_signature(self):
|
||||||
|
sig = WritePort.Signature(addr_width=2, shape=signed(4))
|
||||||
|
self.assertEqual(sig.addr_width, 2)
|
||||||
|
self.assertEqual(sig.shape, signed(4))
|
||||||
|
self.assertEqual(sig.granularity, None)
|
||||||
|
self.assertEqual(sig.members, SignatureMembers({
|
||||||
|
"addr": In(2),
|
||||||
|
"data": In(signed(4)),
|
||||||
|
"en": In(1),
|
||||||
|
}))
|
||||||
|
sig = WritePort.Signature(addr_width=2, shape=8, granularity=2)
|
||||||
|
self.assertEqual(sig.addr_width, 2)
|
||||||
|
self.assertEqual(sig.shape, 8)
|
||||||
|
self.assertEqual(sig.members, SignatureMembers({
|
||||||
|
"addr": In(2),
|
||||||
|
"data": In(8),
|
||||||
|
"en": In(4),
|
||||||
|
}))
|
||||||
|
sig = WritePort.Signature(addr_width=2, shape=ArrayLayout(9, 8), granularity=2)
|
||||||
|
self.assertEqual(sig.addr_width, 2)
|
||||||
|
self.assertEqual(sig.shape, ArrayLayout(9, 8))
|
||||||
|
self.assertEqual(sig.members, SignatureMembers({
|
||||||
|
"addr": In(2),
|
||||||
|
"data": In(ArrayLayout(9, 8)),
|
||||||
|
"en": In(4),
|
||||||
|
}))
|
||||||
|
sig = WritePort.Signature(addr_width=2, shape=0, granularity=0)
|
||||||
|
self.assertEqual(sig.addr_width, 2)
|
||||||
|
self.assertEqual(sig.shape, 0)
|
||||||
|
self.assertEqual(sig.members, SignatureMembers({
|
||||||
|
"addr": In(2),
|
||||||
|
"data": In(0),
|
||||||
|
"en": In(0),
|
||||||
|
}))
|
||||||
|
sig = WritePort.Signature(addr_width=2, shape=ArrayLayout(9, 0), granularity=0)
|
||||||
|
self.assertEqual(sig.addr_width, 2)
|
||||||
|
self.assertEqual(sig.shape, ArrayLayout(9, 0))
|
||||||
|
self.assertEqual(sig.members, SignatureMembers({
|
||||||
|
"addr": In(2),
|
||||||
|
"data": In(ArrayLayout(9, 0)),
|
||||||
|
"en": In(0),
|
||||||
|
}))
|
||||||
|
|
||||||
|
def test_signature_wrong(self):
|
||||||
|
with self.assertRaisesRegex(TypeError,
|
||||||
|
"^`addr_width` must be a non-negative int, not -2$"):
|
||||||
|
WritePort.Signature(addr_width=-2, shape=8)
|
||||||
|
with self.assertRaisesRegex(TypeError,
|
||||||
|
"^Granularity must be a non-negative int or None, not -2$"):
|
||||||
|
WritePort.Signature(addr_width=4, shape=8, granularity=-2)
|
||||||
|
with self.assertRaisesRegex(ValueError,
|
||||||
|
"^Granularity cannot be specified with signed shape$"):
|
||||||
|
WritePort.Signature(addr_width=2, shape=signed(8), granularity=2)
|
||||||
|
with self.assertRaisesRegex(TypeError,
|
||||||
|
"^Granularity can only be specified for plain unsigned `Shape` or `ArrayLayout`$"):
|
||||||
|
WritePort.Signature(addr_width=2, shape=MyStruct, granularity=2)
|
||||||
|
with self.assertRaisesRegex(ValueError,
|
||||||
|
"^Granularity must be positive$"):
|
||||||
|
WritePort.Signature(addr_width=2, shape=8, granularity=0)
|
||||||
|
with self.assertRaisesRegex(ValueError,
|
||||||
|
"^Granularity must be positive$"):
|
||||||
|
WritePort.Signature(addr_width=2, shape=ArrayLayout(8, 8), granularity=0)
|
||||||
|
with self.assertRaisesRegex(ValueError,
|
||||||
|
"^Granularity must divide data width$"):
|
||||||
|
WritePort.Signature(addr_width=2, shape=8, granularity=3)
|
||||||
|
with self.assertRaisesRegex(ValueError,
|
||||||
|
"^Granularity must divide data array length$"):
|
||||||
|
WritePort.Signature(addr_width=2, shape=ArrayLayout(8, 8), granularity=3)
|
||||||
|
|
||||||
|
def test_constructor(self):
|
||||||
|
signature = WritePort.Signature(shape=MyStruct, addr_width=4)
|
||||||
|
port = WritePort(signature, memory=None, domain="sync")
|
||||||
|
self.assertEqual(port.signature, signature)
|
||||||
|
self.assertIsNone(port.memory)
|
||||||
|
self.assertEqual(port.domain, "sync")
|
||||||
|
self.assertIsInstance(port.addr, Signal)
|
||||||
|
self.assertEqual(port.addr.shape(), unsigned(4))
|
||||||
|
self.assertIsInstance(port.data, View)
|
||||||
|
self.assertEqual(port.data.shape(), MyStruct)
|
||||||
|
self.assertIsInstance(port.en, Signal)
|
||||||
|
self.assertEqual(port.en.shape(), unsigned(1))
|
||||||
|
|
||||||
|
signature = WritePort.Signature(shape=8, addr_width=4, granularity=2)
|
||||||
|
port = WritePort(signature, memory=None, domain="sync")
|
||||||
|
self.assertEqual(port.signature, signature)
|
||||||
|
self.assertIsNone(port.memory)
|
||||||
|
self.assertEqual(port.domain, "sync")
|
||||||
|
self.assertIsInstance(port.addr, Signal)
|
||||||
|
self.assertEqual(port.addr.shape(), unsigned(4))
|
||||||
|
self.assertIsInstance(port.data, Signal)
|
||||||
|
self.assertEqual(port.data.shape(), unsigned(8))
|
||||||
|
self.assertIsInstance(port.en, Signal)
|
||||||
|
self.assertEqual(port.en.shape(), unsigned(4))
|
||||||
|
|
||||||
|
m = Memory(depth=16, shape=8, init=[])
|
||||||
|
port = WritePort(signature, memory=m, domain="sync")
|
||||||
|
self.assertIs(port.memory, m)
|
||||||
|
self.assertEqual(m.w_ports, (port,))
|
||||||
|
|
||||||
|
def test_constructor_wrong(self):
|
||||||
|
signature = ReadPort.Signature(shape=8, addr_width=4)
|
||||||
|
with self.assertRaisesRegex(TypeError,
|
||||||
|
r"^Expected `WritePort.Signature`, not ReadPort.Signature\(.*\)$"):
|
||||||
|
WritePort(signature, memory=None, domain="sync")
|
||||||
|
signature = WritePort.Signature(shape=8, addr_width=4, granularity=2)
|
||||||
|
with self.assertRaisesRegex(TypeError,
|
||||||
|
r"^Domain has to be a string, not None$"):
|
||||||
|
WritePort(signature, memory=None, domain=None)
|
||||||
|
with self.assertRaisesRegex(TypeError,
|
||||||
|
r"^Expected `Memory` or `None`, not 'a'$"):
|
||||||
|
WritePort(signature, memory="a", domain="sync")
|
||||||
|
with self.assertRaisesRegex(ValueError,
|
||||||
|
r"^Write port domain cannot be \"comb\"$"):
|
||||||
|
WritePort(signature, memory=None, domain="comb")
|
||||||
|
signature = WritePort.Signature(shape=8, addr_width=4)
|
||||||
|
m = Memory(depth=8, shape=8, init=[])
|
||||||
|
with self.assertRaisesRegex(ValueError,
|
||||||
|
r"^Memory address width 3 doesn't match port address width 4$"):
|
||||||
|
WritePort(signature, memory=m, domain="sync")
|
||||||
|
m = Memory(depth=16, shape=signed(8), init=[])
|
||||||
|
with self.assertRaisesRegex(ValueError,
|
||||||
|
r"^Memory shape signed\(8\) doesn't match port shape 8$"):
|
||||||
|
WritePort(signature, memory=m, domain="sync")
|
||||||
|
|
||||||
|
|
||||||
|
class ReadPortTestCase(FHDLTestCase):
|
||||||
|
def test_signature(self):
|
||||||
|
sig = ReadPort.Signature(addr_width=2, shape=signed(4))
|
||||||
|
self.assertEqual(sig.addr_width, 2)
|
||||||
|
self.assertEqual(sig.shape, signed(4))
|
||||||
|
self.assertEqual(sig.members, SignatureMembers({
|
||||||
|
"addr": In(2),
|
||||||
|
"data": Out(signed(4)),
|
||||||
|
"en": In(1, init=1),
|
||||||
|
}))
|
||||||
|
sig = ReadPort.Signature(addr_width=2, shape=8)
|
||||||
|
self.assertEqual(sig.addr_width, 2)
|
||||||
|
self.assertEqual(sig.shape, 8)
|
||||||
|
self.assertEqual(sig.members, SignatureMembers({
|
||||||
|
"addr": In(2),
|
||||||
|
"data": Out(8),
|
||||||
|
"en": In(1, init=1),
|
||||||
|
}))
|
||||||
|
sig = ReadPort.Signature(addr_width=2, shape=MyStruct)
|
||||||
|
self.assertEqual(sig.addr_width, 2)
|
||||||
|
self.assertEqual(sig.shape, MyStruct)
|
||||||
|
self.assertEqual(sig.members, SignatureMembers({
|
||||||
|
"addr": In(2),
|
||||||
|
"data": Out(MyStruct),
|
||||||
|
"en": In(1, init=1),
|
||||||
|
}))
|
||||||
|
|
||||||
|
def test_signature_wrong(self):
|
||||||
|
with self.assertRaisesRegex(TypeError,
|
||||||
|
"^`addr_width` must be a non-negative int, not -2$"):
|
||||||
|
ReadPort.Signature(addr_width=-2, shape=8)
|
||||||
|
|
||||||
|
def test_constructor(self):
|
||||||
|
signature = ReadPort.Signature(shape=MyStruct, addr_width=4)
|
||||||
|
port = ReadPort(signature, memory=None, domain="sync")
|
||||||
|
self.assertEqual(port.signature, signature)
|
||||||
|
self.assertIsNone(port.memory)
|
||||||
|
self.assertEqual(port.domain, "sync")
|
||||||
|
self.assertIsInstance(port.addr, Signal)
|
||||||
|
self.assertEqual(port.addr.shape(), unsigned(4))
|
||||||
|
self.assertIsInstance(port.data, View)
|
||||||
|
self.assertEqual(port.data.shape(), MyStruct)
|
||||||
|
self.assertIsInstance(port.en, Signal)
|
||||||
|
self.assertEqual(port.en.shape(), unsigned(1))
|
||||||
|
self.assertEqual(port.transparent_for, ())
|
||||||
|
|
||||||
|
signature = ReadPort.Signature(shape=8, addr_width=4)
|
||||||
|
port = ReadPort(signature, memory=None, domain="comb")
|
||||||
|
self.assertEqual(port.signature, signature)
|
||||||
|
self.assertIsNone(port.memory)
|
||||||
|
self.assertEqual(port.domain, "comb")
|
||||||
|
self.assertIsInstance(port.addr, Signal)
|
||||||
|
self.assertEqual(port.addr.shape(), unsigned(4))
|
||||||
|
self.assertIsInstance(port.data, Signal)
|
||||||
|
self.assertEqual(port.data.shape(), unsigned(8))
|
||||||
|
self.assertIsInstance(port.en, Const)
|
||||||
|
self.assertEqual(port.en.shape(), unsigned(1))
|
||||||
|
self.assertEqual(port.en.value, 1)
|
||||||
|
self.assertEqual(port.transparent_for, ())
|
||||||
|
|
||||||
|
m = Memory(depth=16, shape=8, init=[])
|
||||||
|
port = ReadPort(signature, memory=m, domain="sync")
|
||||||
|
self.assertIs(port.memory, m)
|
||||||
|
self.assertEqual(m.r_ports, (port,))
|
||||||
|
write_port = m.write_port()
|
||||||
|
port = ReadPort(signature, memory=m, domain="sync", transparent_for=[write_port])
|
||||||
|
self.assertIs(port.memory, m)
|
||||||
|
self.assertEqual(port.transparent_for, (write_port,))
|
||||||
|
|
||||||
|
def test_constructor_wrong(self):
|
||||||
|
signature = WritePort.Signature(shape=8, addr_width=4)
|
||||||
|
with self.assertRaisesRegex(TypeError,
|
||||||
|
r"^Expected `ReadPort.Signature`, not WritePort.Signature\(.*\)$"):
|
||||||
|
ReadPort(signature, memory=None, domain="sync")
|
||||||
|
signature = ReadPort.Signature(shape=8, addr_width=4)
|
||||||
|
with self.assertRaisesRegex(TypeError,
|
||||||
|
r"^Domain has to be a string, not None$"):
|
||||||
|
ReadPort(signature, memory=None, domain=None)
|
||||||
|
with self.assertRaisesRegex(TypeError,
|
||||||
|
r"^Expected `Memory` or `None`, not 'a'$"):
|
||||||
|
ReadPort(signature, memory="a", domain="sync")
|
||||||
|
signature = ReadPort.Signature(shape=8, addr_width=4)
|
||||||
|
m = Memory(depth=8, shape=8, init=[])
|
||||||
|
with self.assertRaisesRegex(ValueError,
|
||||||
|
r"^Memory address width 3 doesn't match port address width 4$"):
|
||||||
|
ReadPort(signature, memory=m, domain="sync")
|
||||||
|
m = Memory(depth=16, shape=signed(8), init=[])
|
||||||
|
with self.assertRaisesRegex(ValueError,
|
||||||
|
r"^Memory shape signed\(8\) doesn't match port shape 8$"):
|
||||||
|
ReadPort(signature, memory=m, domain="sync")
|
||||||
|
m = Memory(depth=16, shape=8, init=[])
|
||||||
|
port = m.read_port()
|
||||||
|
with self.assertRaisesRegex(TypeError,
|
||||||
|
r"^`transparent_for` must contain only `WritePort` instances$"):
|
||||||
|
ReadPort(signature, memory=m, domain="sync", transparent_for=[port])
|
||||||
|
write_port = m.write_port()
|
||||||
|
m2 = Memory(depth=16, shape=8, init=[])
|
||||||
|
with self.assertRaisesRegex(ValueError,
|
||||||
|
r"^Transparent write ports must belong to the same memory$"):
|
||||||
|
ReadPort(signature, memory=m2, domain="sync", transparent_for=[write_port])
|
||||||
|
with self.assertRaisesRegex(ValueError,
|
||||||
|
r"^Transparent write ports must belong to the same domain$"):
|
||||||
|
ReadPort(signature, memory=m, domain="other", transparent_for=[write_port])
|
||||||
|
|
||||||
|
|
||||||
|
class MemoryTestCase(FHDLTestCase):
|
||||||
|
def test_constructor(self):
|
||||||
|
m = Memory(shape=8, depth=4, init=[1, 2, 3])
|
||||||
|
self.assertEqual(m.shape, 8)
|
||||||
|
self.assertEqual(m.depth, 4)
|
||||||
|
self.assertEqual(m.init.shape, 8)
|
||||||
|
self.assertEqual(m.init.depth, 4)
|
||||||
|
self.assertEqual(m.attrs, {})
|
||||||
|
self.assertIsInstance(m.init, Memory.Init)
|
||||||
|
self.assertEqual(list(m.init), [1, 2, 3, 0])
|
||||||
|
self.assertEqual(m.init._raw, [1, 2, 3, 0])
|
||||||
|
self.assertRepr(m.init, "Memory.Init([1, 2, 3, 0])")
|
||||||
|
self.assertEqual(m.r_ports, ())
|
||||||
|
self.assertEqual(m.w_ports, ())
|
||||||
|
|
||||||
|
def test_constructor_shapecastable(self):
|
||||||
|
init = [
|
||||||
|
{"a": 0, "b": 1},
|
||||||
|
{"a": 2, "b": 3},
|
||||||
|
]
|
||||||
|
m = Memory(shape=MyStruct, depth=4, init=init, attrs={"ram_style": "block"})
|
||||||
|
self.assertEqual(m.shape, MyStruct)
|
||||||
|
self.assertEqual(m.depth, 4)
|
||||||
|
self.assertEqual(m.attrs, {"ram_style": "block"})
|
||||||
|
self.assertIsInstance(m.init, Memory.Init)
|
||||||
|
self.assertEqual(list(m.init), [{"a": 0, "b": 1}, {"a": 2, "b": 3}, None, None])
|
||||||
|
self.assertEqual(m.init._raw, [8, 0x1a, 0, 0])
|
||||||
|
|
||||||
|
def test_constructor_wrong(self):
|
||||||
|
with self.assertRaisesRegex(TypeError,
|
||||||
|
r"^Memory depth must be a non-negative integer, not 'a'$"):
|
||||||
|
Memory(shape=8, depth="a", init=[])
|
||||||
|
with self.assertRaisesRegex(TypeError,
|
||||||
|
r"^Memory depth must be a non-negative integer, not -1$"):
|
||||||
|
Memory(shape=8, depth=-1, init=[])
|
||||||
|
with self.assertRaisesRegex(TypeError,
|
||||||
|
r"^Object 'a' cannot be converted to an Amaranth shape$"):
|
||||||
|
Memory(shape="a", depth=3, init=[])
|
||||||
|
with self.assertRaisesRegex(TypeError,
|
||||||
|
(r"^Memory initialization value at address 1: "
|
||||||
|
r"'str' object cannot be interpreted as an integer$")):
|
||||||
|
Memory(shape=8, depth=4, init=[1, "0"])
|
||||||
|
|
||||||
|
def test_init_set(self):
|
||||||
|
m = Memory(shape=8, depth=4, init=[])
|
||||||
|
m.init[1] = 2
|
||||||
|
self.assertEqual(list(m.init), [0, 2, 0, 0])
|
||||||
|
self.assertEqual(m.init._raw, [0, 2, 0, 0])
|
||||||
|
m.init[2:] = [4, 5]
|
||||||
|
self.assertEqual(list(m.init), [0, 2, 4, 5])
|
||||||
|
|
||||||
|
def test_init_set_shapecastable(self):
|
||||||
|
m = Memory(shape=MyStruct, depth=4, init=[])
|
||||||
|
m.init[1] = {"a": 1, "b": 2}
|
||||||
|
self.assertEqual(list(m.init), [None, {"a": 1, "b": 2}, None, None])
|
||||||
|
self.assertEqual(m.init._raw, [0, 0x11, 0, 0])
|
||||||
|
|
||||||
|
def test_init_set_wrong(self):
|
||||||
|
m = Memory(shape=8, depth=4, init=[])
|
||||||
|
with self.assertRaisesRegex(TypeError,
|
||||||
|
r"^'str' object cannot be interpreted as an integer$"):
|
||||||
|
m.init[0] = "a"
|
||||||
|
m = Memory(shape=MyStruct, depth=4, init=[])
|
||||||
|
# underlying TypeError message differs between PyPy and CPython
|
||||||
|
with self.assertRaises(TypeError):
|
||||||
|
m.init[0] = 1
|
||||||
|
|
||||||
|
def test_init_set_slice_wrong(self):
|
||||||
|
m = Memory(shape=8, depth=4, init=[])
|
||||||
|
with self.assertRaisesRegex(ValueError,
|
||||||
|
r"^Changing length of Memory.init is not allowed$"):
|
||||||
|
m.init[1:] = [1, 2]
|
||||||
|
with self.assertRaisesRegex(TypeError,
|
||||||
|
r"^Deleting items from Memory.init is not allowed$"):
|
||||||
|
del m.init[1:2]
|
||||||
|
with self.assertRaisesRegex(TypeError,
|
||||||
|
r"^Inserting items into Memory.init is not allowed$"):
|
||||||
|
m.init.insert(1, 3)
|
||||||
|
|
||||||
|
def test_port(self):
|
||||||
|
for depth, addr_width in [
|
||||||
|
(0, 0),
|
||||||
|
(1, 0),
|
||||||
|
(3, 2),
|
||||||
|
(4, 2),
|
||||||
|
(5, 3),
|
||||||
|
]:
|
||||||
|
m = Memory(shape=8, depth=depth, init=[])
|
||||||
|
rp = m.read_port()
|
||||||
|
self.assertEqual(rp.signature.addr_width, addr_width)
|
||||||
|
self.assertEqual(rp.signature.shape, 8)
|
||||||
|
wp = m.write_port()
|
||||||
|
self.assertEqual(wp.signature.addr_width, addr_width)
|
||||||
|
self.assertEqual(wp.signature.shape, 8)
|
||||||
|
self.assertEqual(m.r_ports, (rp,))
|
||||||
|
self.assertEqual(m.w_ports, (wp,))
|
||||||
|
|
||||||
|
def test_elaborate(self):
|
||||||
|
m = Memory(shape=MyStruct, depth=4, init=[{"a": 1, "b": 2}])
|
||||||
|
wp = m.write_port()
|
||||||
|
rp0 = m.read_port(domain="sync", transparent_for=[wp])
|
||||||
|
rp1 = m.read_port(domain="comb")
|
||||||
|
f = m.elaborate(None)
|
||||||
|
self.assertIsInstance(f, MemoryInstance)
|
||||||
|
self.assertIs(f._identity, m._identity)
|
||||||
|
self.assertEqual(f._depth, 4)
|
||||||
|
self.assertEqual(f._width, 5)
|
||||||
|
self.assertEqual(f._init, (0x11, 0, 0, 0))
|
||||||
|
self.assertEqual(f._write_ports[0]._domain, "sync")
|
||||||
|
self.assertEqual(f._write_ports[0]._granularity, 5)
|
||||||
|
self.assertIs(f._write_ports[0]._addr, wp.addr)
|
||||||
|
self.assertIs(f._write_ports[0]._data, wp.data.as_value())
|
||||||
|
self.assertIs(f._write_ports[0]._en, wp.en)
|
||||||
|
self.assertEqual(f._read_ports[0]._domain, "sync")
|
||||||
|
self.assertEqual(f._read_ports[0]._transparent_for, (0,))
|
||||||
|
self.assertIs(f._read_ports[0]._addr, rp0.addr)
|
||||||
|
self.assertIs(f._read_ports[0]._data, rp0.data.as_value())
|
||||||
|
self.assertIs(f._read_ports[0]._en, rp0.en)
|
||||||
|
self.assertEqual(f._read_ports[1]._domain, "comb")
|
||||||
|
self.assertEqual(f._read_ports[1]._transparent_for, ())
|
||||||
|
self.assertIs(f._read_ports[1]._addr, rp1.addr)
|
||||||
|
self.assertIs(f._read_ports[1]._data, rp1.data.as_value())
|
||||||
|
self.assertIs(f._read_ports[1]._en, rp1.en)
|
|
@ -5,13 +5,13 @@ from contextlib import contextmanager
|
||||||
from amaranth._utils import flatten
|
from amaranth._utils import flatten
|
||||||
from amaranth.hdl._ast import *
|
from amaranth.hdl._ast import *
|
||||||
from amaranth.hdl._cd import *
|
from amaranth.hdl._cd import *
|
||||||
from amaranth.hdl._mem import *
|
|
||||||
with warnings.catch_warnings():
|
with warnings.catch_warnings():
|
||||||
warnings.filterwarnings(action="ignore", category=DeprecationWarning)
|
warnings.filterwarnings(action="ignore", category=DeprecationWarning)
|
||||||
from amaranth.hdl.rec import *
|
from amaranth.hdl.rec import *
|
||||||
from amaranth.hdl._dsl import *
|
from amaranth.hdl._dsl import *
|
||||||
from amaranth.hdl._ir import *
|
from amaranth.hdl._ir import *
|
||||||
from amaranth.sim import *
|
from amaranth.sim import *
|
||||||
|
from amaranth.lib.memory import Memory
|
||||||
|
|
||||||
from .utils import *
|
from .utils import *
|
||||||
from amaranth._utils import _ignore_deprecated
|
from amaranth._utils import _ignore_deprecated
|
||||||
|
@ -752,14 +752,12 @@ class SimulatorIntegrationTestCase(FHDLTestCase):
|
||||||
sim.add_testbench(process)
|
sim.add_testbench(process)
|
||||||
self.assertTrue(survived)
|
self.assertTrue(survived)
|
||||||
|
|
||||||
def setUp_memory(self, rd_synchronous=True, rd_transparent=True, wr_granularity=None):
|
def setUp_memory(self, rd_synchronous=True, rd_transparent=False, wr_granularity=None):
|
||||||
self.m = Module()
|
self.m = Module()
|
||||||
self.memory = Memory(width=8, depth=4, init=[0xaa, 0x55])
|
self.memory = self.m.submodules.memory = Memory(shape=8, depth=4, init=[0xaa, 0x55])
|
||||||
self.m.submodules.rdport = self.rdport = \
|
self.wrport = self.memory.write_port(granularity=wr_granularity)
|
||||||
self.memory.read_port(domain="sync" if rd_synchronous else "comb",
|
self.rdport = self.memory.read_port(domain="sync" if rd_synchronous else "comb",
|
||||||
transparent=rd_transparent)
|
transparent_for=[self.wrport] if rd_transparent else [])
|
||||||
self.m.submodules.wrport = self.wrport = \
|
|
||||||
self.memory.write_port(granularity=wr_granularity)
|
|
||||||
|
|
||||||
def test_memory_init(self):
|
def test_memory_init(self):
|
||||||
self.setUp_memory()
|
self.setUp_memory()
|
||||||
|
@ -862,8 +860,8 @@ class SimulatorIntegrationTestCase(FHDLTestCase):
|
||||||
|
|
||||||
def test_memory_read_only(self):
|
def test_memory_read_only(self):
|
||||||
self.m = Module()
|
self.m = Module()
|
||||||
self.memory = Memory(width=8, depth=4, init=[0xaa, 0x55])
|
self.m.submodules.memory = self.memory = Memory(shape=8, depth=4, init=[0xaa, 0x55])
|
||||||
self.m.submodules.rdport = self.rdport = self.memory.read_port()
|
self.rdport = self.memory.read_port()
|
||||||
with self.assertSimulation(self.m) as sim:
|
with self.assertSimulation(self.m) as sim:
|
||||||
def process():
|
def process():
|
||||||
yield Tick()
|
yield Tick()
|
||||||
|
@ -920,9 +918,9 @@ class SimulatorIntegrationTestCase(FHDLTestCase):
|
||||||
def test_memory_transparency_simple(self):
|
def test_memory_transparency_simple(self):
|
||||||
m = Module()
|
m = Module()
|
||||||
init = [0x11, 0x22, 0x33, 0x44]
|
init = [0x11, 0x22, 0x33, 0x44]
|
||||||
m.submodules.memory = memory = Memory(width=8, depth=4, init=init)
|
m.submodules.memory = memory = Memory(shape=8, depth=4, init=init)
|
||||||
rdport = memory.read_port()
|
|
||||||
wrport = memory.write_port(granularity=8)
|
wrport = memory.write_port(granularity=8)
|
||||||
|
rdport = memory.read_port(transparent_for=[wrport])
|
||||||
with self.assertSimulation(m) as sim:
|
with self.assertSimulation(m) as sim:
|
||||||
def process():
|
def process():
|
||||||
yield rdport.addr.eq(0)
|
yield rdport.addr.eq(0)
|
||||||
|
@ -959,9 +957,9 @@ class SimulatorIntegrationTestCase(FHDLTestCase):
|
||||||
def test_memory_transparency_multibit(self):
|
def test_memory_transparency_multibit(self):
|
||||||
m = Module()
|
m = Module()
|
||||||
init = [0x11111111, 0x22222222, 0x33333333, 0x44444444]
|
init = [0x11111111, 0x22222222, 0x33333333, 0x44444444]
|
||||||
m.submodules.memory = memory = Memory(width=32, depth=4, init=init)
|
m.submodules.memory = memory = Memory(shape=32, depth=4, init=init)
|
||||||
rdport = memory.read_port()
|
|
||||||
wrport = memory.write_port(granularity=8)
|
wrport = memory.write_port(granularity=8)
|
||||||
|
rdport = memory.read_port(transparent_for=[wrport])
|
||||||
with self.assertSimulation(m) as sim:
|
with self.assertSimulation(m) as sim:
|
||||||
def process():
|
def process():
|
||||||
yield rdport.addr.eq(0)
|
yield rdport.addr.eq(0)
|
||||||
|
|
Loading…
Reference in a new issue