compat: remove.

Fixes #692.
This commit is contained in:
Catherine 2023-07-18 11:44:09 +00:00
parent 750cbbc3c7
commit 9d4ffab104
34 changed files with 12 additions and 1812 deletions

View file

@ -1,5 +1,4 @@
Copyright (C) 2019-2023 Amaranth HDL contributors
Copyright (C) 2011-2019 M-Labs Limited
Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met:

View file

@ -1,11 +0,0 @@
from .fhdl.structure import *
from .fhdl.module import *
from .fhdl.specials import *
from .fhdl.bitcontainer import *
from .fhdl.decorators import *
# from .fhdl.simplify import *
from .sim import *
from .genlib.record import *
from .genlib.fsm import *

View file

@ -1,21 +0,0 @@
from ... import utils
from ...hdl import ast
from ..._utils import deprecated
__all__ = ["log2_int", "bits_for", "value_bits_sign"]
@deprecated("instead of `log2_int`, use `amaranth.utils.log2_int`")
def log2_int(n, need_pow2=True):
return utils.log2_int(n, need_pow2)
@deprecated("instead of `bits_for`, use `amaranth.utils.bits_for`")
def bits_for(n, require_sign_bit=False):
return utils.bits_for(n, require_sign_bit)
@deprecated("instead of `value_bits_sign(v)`, use `v.shape()`")
def value_bits_sign(v):
return tuple(ast.Value.cast(v).shape())

View file

@ -1,35 +0,0 @@
from operator import itemgetter
class ConvOutput:
def __init__(self):
self.main_source = ""
self.data_files = dict()
def set_main_source(self, src):
self.main_source = src
def add_data_file(self, filename_base, content):
filename = filename_base
i = 1
while filename in self.data_files:
parts = filename_base.split(".", maxsplit=1)
parts[0] += "_" + str(i)
filename = ".".join(parts)
i += 1
self.data_files[filename] = content
return filename
def __str__(self):
r = self.main_source + "\n"
for filename, content in sorted(self.data_files.items(),
key=itemgetter(0)):
r += filename + ":\n" + content
return r
def write(self, main_filename):
with open(main_filename, "w") as f:
f.write(self.main_source)
for filename, content in self.data_files.items():
with open(filename, "w") as f:
f.write(content)

View file

@ -1,55 +0,0 @@
from ...hdl.ast import *
from ...hdl.xfrm import ResetInserter as NativeResetInserter
from ...hdl.xfrm import EnableInserter as NativeEnableInserter
from ...hdl.xfrm import DomainRenamer as NativeDomainRenamer
from ..._utils import deprecated
__all__ = ["ResetInserter", "CEInserter", "ClockDomainsRenamer"]
class _CompatControlInserter:
_control_name = None
_native_inserter = None
def __init__(self, clock_domains=None):
self.clock_domains = clock_domains
def __call__(self, module):
if self.clock_domains is None:
signals = {self._control_name: ("sync", Signal(name=self._control_name))}
else:
def name(cd):
return self._control_name + "_" + cd
signals = {name(cd): (cd, Signal(name=name(cd))) for cd in self.clock_domains}
for name, (cd, signal) in signals.items():
setattr(module, name, signal)
return self._native_inserter(dict(signals.values()))(module)
@deprecated("instead of `migen.fhdl.decorators.ResetInserter`, "
"use `amaranth.hdl.xfrm.ResetInserter`; note that Amaranth ResetInserter accepts "
"a dict of reset signals (or a single reset signal) as an argument, not "
"a set of clock domain names (or a single clock domain name)")
class CompatResetInserter(_CompatControlInserter):
_control_name = "reset"
_native_inserter = NativeResetInserter
@deprecated("instead of `migen.fhdl.decorators.CEInserter`, "
"use `amaranth.hdl.xfrm.EnableInserter`; note that Amaranth EnableInserter accepts "
"a dict of enable signals (or a single enable signal) as an argument, not "
"a set of clock domain names (or a single clock domain name)")
class CompatCEInserter(_CompatControlInserter):
_control_name = "ce"
_native_inserter = NativeEnableInserter
class CompatClockDomainsRenamer(NativeDomainRenamer):
def __init__(self, cd_remapping):
super().__init__(cd_remapping)
ResetInserter = CompatResetInserter
CEInserter = CompatCEInserter
ClockDomainsRenamer = CompatClockDomainsRenamer

View file

@ -1,163 +0,0 @@
from collections.abc import Iterable
from ..._utils import flatten, deprecated
from ...hdl import dsl, ir
__all__ = ["Module", "FinalizeError"]
def _flat_list(e):
if isinstance(e, Iterable):
return list(flatten(e))
else:
return [e]
class CompatFinalizeError(Exception):
pass
FinalizeError = CompatFinalizeError
class _CompatModuleProxy:
def __init__(self, cm):
object.__setattr__(self, "_cm", cm)
class _CompatModuleComb(_CompatModuleProxy):
@deprecated("instead of `self.comb +=`, use `m.d.comb +=`")
def __iadd__(self, assigns):
self._cm._module._add_statement(assigns, domain=None, depth=0, compat_mode=True)
return self
class _CompatModuleSyncCD:
def __init__(self, cm, cd):
self._cm = cm
self._cd = cd
@deprecated("instead of `self.sync.<domain> +=`, use `m.d.<domain> +=`")
def __iadd__(self, assigns):
self._cm._module._add_statement(assigns, domain=self._cd, depth=0, compat_mode=True)
return self
class _CompatModuleSync(_CompatModuleProxy):
@deprecated("instead of `self.sync +=`, use `m.d.sync +=`")
def __iadd__(self, assigns):
self._cm._module._add_statement(assigns, domain="sync", depth=0, compat_mode=True)
return self
def __getattr__(self, name):
return _CompatModuleSyncCD(self._cm, name)
def __setattr__(self, name, value):
if not isinstance(value, _CompatModuleSyncCD):
raise AttributeError("Attempted to assign sync property - use += instead")
class _CompatModuleSpecials(_CompatModuleProxy):
@deprecated("instead of `self.specials.<name> =`, use `m.submodules.<name> =`")
def __setattr__(self, name, value):
self._cm._submodules.append((name, value))
setattr(self._cm, name, value)
@deprecated("instead of `self.specials +=`, use `m.submodules +=`")
def __iadd__(self, other):
self._cm._submodules += [(None, e) for e in _flat_list(other)]
return self
class _CompatModuleSubmodules(_CompatModuleProxy):
@deprecated("instead of `self.submodules.<name> =`, use `m.submodules.<name> =`")
def __setattr__(self, name, value):
self._cm._submodules.append((name, value))
setattr(self._cm, name, value)
@deprecated("instead of `self.submodules +=`, use `m.submodules +=`")
def __iadd__(self, other):
self._cm._submodules += [(None, e) for e in _flat_list(other)]
return self
class _CompatModuleClockDomains(_CompatModuleProxy):
@deprecated("instead of `self.clock_domains.<name> =`, use `m.domains.<name> =`")
def __setattr__(self, name, value):
self.__iadd__(value)
setattr(self._cm, name, value)
@deprecated("instead of `self.clock_domains +=`, use `m.domains +=`")
def __iadd__(self, other):
self._cm._module.domains += _flat_list(other)
return self
class CompatModule(ir.Elaboratable):
_MustUse__silence = True
# Actually returns another Amaranth Elaboratable (amaranth.dsl.Module), not a Fragment.
def get_fragment(self):
assert not self.get_fragment_called
self.get_fragment_called = True
self.finalize()
return self._module
def elaborate(self, platform):
if not self.get_fragment_called:
self.get_fragment()
return self._module
def __getattr__(self, name):
if name == "comb":
return _CompatModuleComb(self)
elif name == "sync":
return _CompatModuleSync(self)
elif name == "specials":
return _CompatModuleSpecials(self)
elif name == "submodules":
return _CompatModuleSubmodules(self)
elif name == "clock_domains":
return _CompatModuleClockDomains(self)
elif name == "finalized":
self.finalized = False
return self.finalized
elif name == "_module":
self._module = dsl.Module()
return self._module
elif name == "_submodules":
self._submodules = []
return self._submodules
elif name == "_clock_domains":
self._clock_domains = []
return self._clock_domains
elif name == "get_fragment_called":
self.get_fragment_called = False
return self.get_fragment_called
else:
raise AttributeError("'{}' object has no attribute '{}'"
.format(type(self).__name__, name))
def finalize(self, *args, **kwargs):
def finalize_submodules():
for name, submodule in self._submodules:
if not hasattr(submodule, "finalize"):
continue
if submodule.finalized:
continue
submodule.finalize(*args, **kwargs)
if not self.finalized:
self.finalized = True
finalize_submodules()
self.do_finalize(*args, **kwargs)
finalize_submodules()
for name, submodule in self._submodules:
self._module._add_submodule(submodule, name)
def do_finalize(self):
pass
Module = CompatModule

View file

@ -1,139 +0,0 @@
import warnings
from ..._utils import deprecated, extend
from ...hdl.ast import *
from ...hdl.ir import Elaboratable
from ...hdl.mem import Memory as NativeMemory
from ...hdl.ir import Fragment, Instance
from ...hdl.dsl import Module
from .module import Module as CompatModule
from .structure import Signal
from ...lib.io import Pin
__all__ = ["TSTriple", "Instance", "Memory", "READ_FIRST", "WRITE_FIRST", "NO_CHANGE"]
class TSTriple:
def __init__(self, bits_sign=None, min=None, max=None, reset_o=0, reset_oe=0, reset_i=0,
name=None):
self.o = Signal(bits_sign, min=min, max=max, reset=reset_o,
name=None if name is None else name + "_o")
self.oe = Signal(reset=reset_oe,
name=None if name is None else name + "_oe")
self.i = Signal(bits_sign, min=min, max=max, reset=reset_i,
name=None if name is None else name + "_i")
def __len__(self):
return len(self.o)
def get_tristate(self, io):
return Tristate(io, self.o, self.oe, self.i)
class Tristate(Elaboratable):
def __init__(self, target, o, oe, i=None):
self.target = target
self.o = o
self.oe = oe
self.i = i if i is not None else None
def elaborate(self, platform):
if self.i is None:
pin = Pin(len(self.target), dir="oe")
pin.o = self.o
pin.oe = self.oe
return platform.get_tristate(pin, self.target, attrs={}, invert=None)
else:
pin = Pin(len(self.target), dir="io")
pin.o = self.o
pin.oe = self.oe
pin.i = self.i
return platform.get_input_output(pin, self.target, attrs={}, invert=None)
m = Module()
if self.i is not None:
m.d.comb += self.i.eq(self.target)
m.submodules += Instance("$tribuf",
p_WIDTH=len(self.target),
i_EN=self.oe,
i_A=self.o,
o_Y=self.target,
)
f = m.elaborate(platform)
f.flatten = True
return f
(READ_FIRST, WRITE_FIRST, NO_CHANGE) = range(3)
class _MemoryPort(CompatModule):
def __init__(self, adr, dat_r, we=None, dat_w=None, async_read=False, re=None,
we_granularity=0, mode=WRITE_FIRST, clock_domain="sync"):
self.adr = adr
self.dat_r = dat_r
self.we = we
self.dat_w = dat_w
self.async_read = async_read
self.re = re
self.we_granularity = we_granularity
self.mode = mode
self.clock = ClockSignal(clock_domain)
class CompatMemory(NativeMemory, Elaboratable):
def __init__(self, width, depth, init=None, name=None):
super().__init__(width=width, depth=depth, init=init, name=name)
@deprecated("instead of `get_port()`, use `read_port()` and `write_port()`")
def get_port(self, write_capable=False, async_read=False, has_re=False, we_granularity=0,
mode=WRITE_FIRST, clock_domain="sync"):
if we_granularity >= self.width:
warnings.warn("do not specify `we_granularity` greater than memory width, as it "
"is a hard error in non-compatibility mode",
DeprecationWarning, stacklevel=1)
we_granularity = 0
if we_granularity == 0:
warnings.warn("instead of `we_granularity=0`, use `we_granularity=None` or avoid "
"specifying it at all, as it is a hard error in non-compatibility mode",
DeprecationWarning, stacklevel=1)
we_granularity = None
assert mode != NO_CHANGE
rdport = self.read_port(domain="comb" if async_read else clock_domain,
transparent=mode == WRITE_FIRST)
rdport.addr.name = f"{self.name}_addr"
adr = rdport.addr
dat_r = rdport.data
if write_capable:
wrport = self.write_port(domain=clock_domain, granularity=we_granularity)
wrport.addr = rdport.addr
we = wrport.en
dat_w = wrport.data
else:
we = None
dat_w = None
if has_re:
if mode == READ_FIRST:
re = rdport.en
else:
warnings.warn("the combination of `has_re=True` and `mode=WRITE_FIRST` has "
"surprising behavior: keeping `re` low would merely latch "
"the address, while the data will change with changing memory "
"contents; avoid using `re` with transparent ports as it is a hard "
"error in non-compatibility mode",
DeprecationWarning, stacklevel=1)
re = Signal()
else:
re = None
mp = _MemoryPort(adr, dat_r, we, dat_w,
async_read, re, we_granularity, mode,
clock_domain)
mp.submodules.rdport = rdport
if write_capable:
mp.submodules.wrport = wrport
return mp
Memory = CompatMemory

View file

@ -1,188 +0,0 @@
import builtins
import warnings
from collections import OrderedDict
from ...utils import bits_for
from ..._utils import deprecated, extend
from ...hdl import ast
from ...hdl.ast import (DUID,
Shape, signed, unsigned,
Value, Const, C, Mux, Slice as _Slice, Part, Cat, Repl,
Signal as NativeSignal,
ClockSignal, ResetSignal,
Array, ArrayProxy as _ArrayProxy)
from ...hdl.cd import ClockDomain
__all__ = ["DUID", "wrap", "Mux", "Cat", "Replicate", "Constant", "C", "Signal", "ClockSignal",
"ResetSignal", "If", "Case", "Array", "ClockDomain"]
@deprecated("instead of `wrap`, use `Value.cast`")
def wrap(v):
return Value.cast(v)
class CompatSignal(NativeSignal):
def __init__(self, bits_sign=None, name=None, variable=False, reset=0,
reset_less=False, name_override=None, min=None, max=None,
related=None, attr=None, src_loc_at=0, **kwargs):
if min is not None or max is not None:
warnings.warn("instead of `Signal(min={min}, max={max})`, "
"use `Signal(range({min}, {max}))`"
.format(min=min or 0, max=max or 2),
DeprecationWarning, stacklevel=2 + src_loc_at)
if bits_sign is None:
if min is None:
min = 0
if max is None:
max = 2
max -= 1 # make both bounds inclusive
if min > max:
raise ValueError("Lower bound {} should be less or equal to higher bound {}"
.format(min, max + 1))
sign = min < 0 or max < 0
if min == max:
bits = 0
else:
bits = builtins.max(bits_for(min, sign), bits_for(max, sign))
shape = signed(bits) if sign else unsigned(bits)
else:
if not (min is None and max is None):
raise ValueError("Only one of bits/signedness or bounds may be specified")
if isinstance(bits_sign, tuple):
shape = Shape(*bits_sign)
else:
shape = Shape.cast(bits_sign)
super().__init__(shape=shape, name=name_override or name,
reset=reset, reset_less=reset_less,
attrs=attr, src_loc_at=1 + src_loc_at, **kwargs)
Signal = CompatSignal
@deprecated("instead of `Constant`, use `Const`")
def Constant(value, bits_sign=None):
return Const(value, bits_sign)
@deprecated("instead of `Replicate(v, n)`, use `v.replicate(n)`")
def Replicate(v, n):
return v.replicate(n)
@extend(Const)
@property
@deprecated("instead of `.nbits`, use `.width`")
def nbits(self):
return self.width
@extend(NativeSignal)
@property
@deprecated("instead of `.nbits`, use `.width`")
def nbits(self):
return self.width
@extend(NativeSignal)
@NativeSignal.nbits.setter
@deprecated("instead of `.nbits = x`, use `.width = x`")
def nbits(self, value):
self.width = value
@extend(NativeSignal)
@deprecated("instead of `.part`, use `.bit_select`")
def part(self, offset, width):
return Part(self, offset, width, src_loc_at=2)
@extend(Cat)
@property
@deprecated("instead of `.l`, use `.parts`")
def l(self):
return self.parts
@extend(ast.Operator)
@property
@deprecated("instead of `.op`, use `.operator`")
def op(self):
return self.operator
@extend(_ArrayProxy)
@property
@deprecated("instead `_ArrayProxy.choices`, use `ArrayProxy.elems`")
def choices(self):
return self.elems
class If(ast.Switch):
@deprecated("instead of `If(cond, ...)`, use `with m.If(cond): ...`")
def __init__(self, cond, *stmts):
cond = Value.cast(cond)
if len(cond) != 1:
cond = cond.bool()
super().__init__(cond, {("1",): ast.Statement.cast(stmts)})
@deprecated("instead of `.Elif(cond, ...)`, use `with m.Elif(cond): ...`")
def Elif(self, cond, *stmts):
cond = Value.cast(cond)
if len(cond) != 1:
cond = cond.bool()
self.cases = OrderedDict((("-" + k,), v) for (k,), v in self.cases.items())
self.cases[("1" + "-" * len(self.test),)] = ast.Statement.cast(stmts)
self.test = Cat(self.test, cond)
return self
@deprecated("instead of `.Else(...)`, use `with m.Else(): ...`")
def Else(self, *stmts):
self.cases[()] = ast.Statement.cast(stmts)
return self
class Case(ast.Switch):
@deprecated("instead of `Case(test, { value: stmts })`, use `with m.Switch(test):` and "
"`with m.Case(value): stmts`; instead of `\"default\": stmts`, use "
"`with m.Case(): stmts`")
def __init__(self, test, cases):
new_cases = []
default = None
for k, v in cases.items():
if isinstance(k, (bool, int)):
k = Const(k)
if (not isinstance(k, Const)
and not (isinstance(k, str) and k == "default")):
raise TypeError("Case object is not a Migen constant")
if isinstance(k, str) and k == "default":
default = v
continue
else:
k = k.value
new_cases.append((k, v))
if default is not None:
new_cases.append((None, default))
super().__init__(test, OrderedDict(new_cases))
@deprecated("instead of `Case(...).makedefault()`, use an explicit default case: "
"`with m.Case(): ...`")
def makedefault(self, key=None):
if key is None:
for choice in self.cases.keys():
if (key is None
or (isinstance(choice, str) and choice == "default")
or choice > key):
key = choice
elif isinstance(key, str) and key == "default":
key = ()
else:
key = ("{:0{}b}".format(ast.Value.cast(key).value, len(self.test)),)
stmts = self.cases[key]
del self.cases[key]
self.cases[()] = stmts
return self

View file

@ -1,35 +0,0 @@
import warnings
from ...hdl.ir import Fragment
from ...hdl.cd import ClockDomain
from ...back import verilog
from .conv_output import ConvOutput
from .module import Module
def convert(fi, ios=None, name="top", special_overrides=dict(),
attr_translate=None, create_clock_domains=True,
display_run=False):
if display_run:
warnings.warn("`display_run=True` support has been removed",
DeprecationWarning, stacklevel=1)
if special_overrides:
warnings.warn("`special_overrides` support as well as `Special` has been removed",
DeprecationWarning, stacklevel=1)
# TODO: attr_translate
if isinstance(fi, Module):
fi = fi.get_fragment()
def missing_domain(name):
if create_clock_domains:
return ClockDomain(name)
v_output = verilog.convert(
elaboratable=fi,
name=name,
ports=ios or (),
missing_domain=missing_domain
)
output = ConvOutput()
output.set_main_source(v_output)
return output

View file

@ -1,74 +0,0 @@
import warnings
from ..._utils import deprecated
from ...lib.cdc import FFSynchronizer as NativeFFSynchronizer
from ...lib.cdc import PulseSynchronizer as NativePulseSynchronizer
from ...hdl.ast import *
from ..fhdl.module import CompatModule
from ..fhdl.structure import If
__all__ = ["MultiReg", "PulseSynchronizer", "GrayCounter", "GrayDecoder"]
class MultiReg(NativeFFSynchronizer):
def __init__(self, i, o, odomain="sync", n=2, reset=0):
old_opts = []
new_opts = []
if odomain != "sync":
old_opts.append(f", odomain={odomain!r}")
new_opts.append(f", o_domain={odomain!r}")
if n != 2:
old_opts.append(f", n={n!r}")
new_opts.append(f", stages={n!r}")
warnings.warn("instead of `MultiReg(...{})`, use `FFSynchronizer(...{})`"
.format("".join(old_opts), "".join(new_opts)),
DeprecationWarning, stacklevel=2)
super().__init__(i, o, o_domain=odomain, stages=n, reset=reset)
self.odomain = odomain
@deprecated("instead of `migen.genlib.cdc.PulseSynchronizer`, use `amaranth.lib.cdc.PulseSynchronizer`")
class PulseSynchronizer(NativePulseSynchronizer):
def __init__(self, idomain, odomain):
super().__init__(i_domain=idomain, o_domain=odomain)
@deprecated("instead of `migen.genlib.cdc.GrayCounter`, use `amaranth.lib.coding.GrayEncoder`")
class GrayCounter(CompatModule):
def __init__(self, width):
self.ce = Signal()
self.q = Signal(width)
self.q_next = Signal(width)
self.q_binary = Signal(width)
self.q_next_binary = Signal(width)
###
self.comb += [
If(self.ce,
self.q_next_binary.eq(self.q_binary + 1)
).Else(
self.q_next_binary.eq(self.q_binary)
),
self.q_next.eq(self.q_next_binary ^ self.q_next_binary[1:])
]
self.sync += [
self.q_binary.eq(self.q_next_binary),
self.q.eq(self.q_next)
]
@deprecated("instead of `migen.genlib.cdc.GrayDecoder`, use `amaranth.lib.coding.GrayDecoder`")
class GrayDecoder(CompatModule):
def __init__(self, width):
self.i = Signal(width)
self.o = Signal(width, reset_less=True)
# # #
o_comb = Signal(width)
self.comb += o_comb[-1].eq(self.i[-1])
for i in reversed(range(width-1)):
self.comb += o_comb[i].eq(o_comb[i+1] ^ self.i[i])
self.sync += self.o.eq(o_comb)

View file

@ -1,4 +0,0 @@
from ...lib.coding import *
__all__ = ["Encoder", "PriorityEncoder", "Decoder", "PriorityDecoder"]

View file

@ -1,147 +0,0 @@
from ..._utils import deprecated, extend
from ...lib.fifo import (FIFOInterface as NativeFIFOInterface,
SyncFIFO as NativeSyncFIFO, SyncFIFOBuffered as NativeSyncFIFOBuffered,
AsyncFIFO as NativeAsyncFIFO, AsyncFIFOBuffered as NativeAsyncFIFOBuffered)
__all__ = ["_FIFOInterface", "SyncFIFO", "SyncFIFOBuffered", "AsyncFIFO", "AsyncFIFOBuffered"]
class CompatFIFOInterface(NativeFIFOInterface):
@deprecated("attribute `fwft` must be provided to FIFOInterface constructor")
def __init__(self, width, depth):
super().__init__(width=width, depth=depth, fwft=False)
del self.fwft
@extend(NativeFIFOInterface)
@property
@deprecated("instead of `fifo.din`, use `fifo.w_data`")
def din(self):
return self.w_data
@extend(NativeFIFOInterface)
@NativeFIFOInterface.din.setter
@deprecated("instead of `fifo.din = x`, use `fifo.w_data = x`")
def din(self, w_data):
self.w_data = w_data
@extend(NativeFIFOInterface)
@property
@deprecated("instead of `fifo.writable`, use `fifo.w_rdy`")
def writable(self):
return self.w_rdy
@extend(NativeFIFOInterface)
@NativeFIFOInterface.writable.setter
@deprecated("instead of `fifo.writable = x`, use `fifo.w_rdy = x`")
def writable(self, w_rdy):
self.w_rdy = w_rdy
@extend(NativeFIFOInterface)
@property
@deprecated("instead of `fifo.we`, use `fifo.w_en`")
def we(self):
return self.w_en
@extend(NativeFIFOInterface)
@NativeFIFOInterface.we.setter
@deprecated("instead of `fifo.we = x`, use `fifo.w_en = x`")
def we(self, w_en):
self.w_en = w_en
@extend(NativeFIFOInterface)
@property
@deprecated("instead of `fifo.dout`, use `fifo.r_data`")
def dout(self):
return self.r_data
@extend(NativeFIFOInterface)
@NativeFIFOInterface.dout.setter
@deprecated("instead of `fifo.dout = x`, use `fifo.r_data = x`")
def dout(self, r_data):
self.r_data = r_data
@extend(NativeFIFOInterface)
@property
@deprecated("instead of `fifo.readable`, use `fifo.r_rdy`")
def readable(self):
return self.r_rdy
@extend(NativeFIFOInterface)
@NativeFIFOInterface.readable.setter
@deprecated("instead of `fifo.readable = x`, use `fifo.r_rdy = x`")
def readable(self, r_rdy):
self.r_rdy = r_rdy
@extend(NativeFIFOInterface)
@property
@deprecated("instead of `fifo.re`, use `fifo.r_en`")
def re(self):
return self.r_en
@extend(NativeFIFOInterface)
@NativeFIFOInterface.re.setter
@deprecated("instead of `fifo.re = x`, use `fifo.r_en = x`")
def re(self, r_en):
self.r_en = r_en
@extend(NativeFIFOInterface)
def read(self):
"""Read method for simulation."""
assert (yield self.r_rdy)
value = (yield self.r_data)
yield self.r_en.eq(1)
yield
yield self.r_en.eq(0)
yield
return value
@extend(NativeFIFOInterface)
def write(self, data):
"""Write method for simulation."""
assert (yield self.w_rdy)
yield self.w_data.eq(data)
yield self.w_en.eq(1)
yield
yield self.w_en.eq(0)
yield
class CompatSyncFIFO(NativeSyncFIFO):
def __init__(self, width, depth, fwft=True):
super().__init__(width=width, depth=depth, fwft=fwft)
class CompatSyncFIFOBuffered(NativeSyncFIFOBuffered):
def __init__(self, width, depth):
super().__init__(width=width, depth=depth)
class CompatAsyncFIFO(NativeAsyncFIFO):
def __init__(self, width, depth):
super().__init__(width=width, depth=depth)
class CompatAsyncFIFOBuffered(NativeAsyncFIFOBuffered):
def __init__(self, width, depth):
super().__init__(width=width, depth=depth)
_FIFOInterface = CompatFIFOInterface
SyncFIFO = CompatSyncFIFO
SyncFIFOBuffered = CompatSyncFIFOBuffered
AsyncFIFO = CompatAsyncFIFO
AsyncFIFOBuffered = CompatAsyncFIFOBuffered

View file

@ -1,193 +0,0 @@
from collections import OrderedDict
from ..._utils import deprecated, _ignore_deprecated
from ...hdl.xfrm import ValueTransformer, StatementTransformer
from ...hdl.ast import *
from ...hdl.ast import Signal as NativeSignal
from ..fhdl.module import CompatModule, CompatFinalizeError
from ..fhdl.structure import Signal, If, Case
__all__ = ["AnonymousState", "NextState", "NextValue", "FSM"]
class AnonymousState:
pass
class NextState(Statement):
def __init__(self, state):
super().__init__()
self.state = state
class NextValue(Statement):
def __init__(self, target, value):
super().__init__()
self.target = target
self.value = value
def _target_eq(a, b):
if type(a) != type(b):
return False
ty = type(a)
if ty == Const:
return a.value == b.value
elif ty == NativeSignal or ty == Signal:
return a is b
elif ty == Cat:
return all(_target_eq(x, y) for x, y in zip(a.l, b.l))
elif ty == Slice:
return (_target_eq(a.value, b.value)
and a.start == b.start
and a.stop == b.stop)
elif ty == Part:
return (_target_eq(a.value, b.value)
and _target_eq(a.offset == b.offset)
and a.width == b.width)
elif ty == ArrayProxy:
return (all(_target_eq(x, y) for x, y in zip(a.choices, b.choices))
and _target_eq(a.key, b.key))
else:
raise ValueError("NextValue cannot be used with target type '{}'"
.format(ty))
class _LowerNext(ValueTransformer, StatementTransformer):
def __init__(self, next_state_signal, encoding, aliases):
self.next_state_signal = next_state_signal
self.encoding = encoding
self.aliases = aliases
# (target, next_value_ce, next_value)
self.registers = []
def _get_register_control(self, target):
for x in self.registers:
if _target_eq(target, x[0]):
return x[1], x[2]
raise KeyError
def on_unknown_statement(self, node):
if isinstance(node, NextState):
try:
actual_state = self.aliases[node.state]
except KeyError:
actual_state = node.state
return self.next_state_signal.eq(self.encoding[actual_state])
elif isinstance(node, NextValue):
try:
next_value_ce, next_value = self._get_register_control(node.target)
except KeyError:
related = node.target if isinstance(node.target, Signal) else None
next_value = Signal(node.target.shape(),
name=None if related is None else f"{related.name}_fsm_next")
next_value_ce = Signal(
name=None if related is None else f"{related.name}_fsm_next_ce")
self.registers.append((node.target, next_value_ce, next_value))
return next_value.eq(node.value), next_value_ce.eq(1)
else:
return node
@deprecated("instead of `migen.genlib.fsm.FSM()`, use `with m.FSM():`; note that there is no "
"replacement for `{before,after}_{entering,leaving}` and `delayed_enter` methods")
class FSM(CompatModule):
def __init__(self, reset_state=None):
self.actions = OrderedDict()
self.state_aliases = dict()
self.reset_state = reset_state
self.before_entering_signals = OrderedDict()
self.before_leaving_signals = OrderedDict()
self.after_entering_signals = OrderedDict()
self.after_leaving_signals = OrderedDict()
def act(self, state, *statements):
if self.finalized:
raise CompatFinalizeError
if self.reset_state is None:
self.reset_state = state
if state not in self.actions:
self.actions[state] = []
self.actions[state] += statements
def delayed_enter(self, name, target, delay):
if self.finalized:
raise CompatFinalizeError
if delay > 0:
state = name
for i in range(delay):
if i == delay - 1:
next_state = target
else:
next_state = AnonymousState()
self.act(state, NextState(next_state))
state = next_state
else:
self.state_aliases[name] = target
def ongoing(self, state):
is_ongoing = Signal()
self.act(state, is_ongoing.eq(1))
return is_ongoing
def _get_signal(self, d, state):
if state not in self.actions:
self.actions[state] = []
try:
return d[state]
except KeyError:
is_el = Signal()
d[state] = is_el
return is_el
def before_entering(self, state):
return self._get_signal(self.before_entering_signals, state)
def before_leaving(self, state):
return self._get_signal(self.before_leaving_signals, state)
def after_entering(self, state):
signal = self._get_signal(self.after_entering_signals, state)
self.sync += signal.eq(self.before_entering(state))
return signal
def after_leaving(self, state):
signal = self._get_signal(self.after_leaving_signals, state)
self.sync += signal.eq(self.before_leaving(state))
return signal
@_ignore_deprecated
def do_finalize(self):
nstates = len(self.actions)
self.encoding = {s: n for n, s in enumerate(self.actions.keys())}
self.decoding = {n: s for s, n in self.encoding.items()}
decoder = lambda n: f"{self.decoding[n]}/{n}"
self.state = Signal(range(nstates), reset=self.encoding[self.reset_state], decoder=decoder)
self.next_state = Signal.like(self.state)
for state, signal in self.before_leaving_signals.items():
encoded = self.encoding[state]
self.comb += signal.eq((self.state == encoded) & ~(self.next_state == encoded))
if self.reset_state in self.after_entering_signals:
self.after_entering_signals[self.reset_state].reset = 1
for state, signal in self.before_entering_signals.items():
encoded = self.encoding[state]
self.comb += signal.eq(~(self.state == encoded) & (self.next_state == encoded))
self._finalize_sync(self._lower_controls())
def _lower_controls(self):
return _LowerNext(self.next_state, self.encoding, self.state_aliases)
def _finalize_sync(self, ls):
cases = {self.encoding[k]: ls.on_statement(v) for k, v in self.actions.items() if v}
self.comb += [
self.next_state.eq(self.state),
Case(self.state, cases).makedefault(self.encoding[self.reset_state])
]
self.sync += self.state.eq(self.next_state)
for register, next_value_ce, next_value in ls.registers:
self.sync += If(next_value_ce, register.eq(next_value))

View file

@ -1,195 +0,0 @@
from ...tracer import *
from ..fhdl.structure import *
from functools import reduce
from operator import or_
(DIR_NONE, DIR_S_TO_M, DIR_M_TO_S) = range(3)
# Possible layout elements:
# 1. (name, size)
# 2. (name, size, direction)
# 3. (name, sublayout)
# size can be an int, or a (int, bool) tuple for signed numbers
# sublayout must be a list
def set_layout_parameters(layout, **layout_dict):
def resolve(p):
if isinstance(p, str):
try:
return layout_dict[p]
except KeyError:
return p
else:
return p
r = []
for f in layout:
if isinstance(f[1], (int, tuple, str)): # cases 1/2
if len(f) == 3:
r.append((f[0], resolve(f[1]), f[2]))
else:
r.append((f[0], resolve(f[1])))
elif isinstance(f[1], list): # case 3
r.append((f[0], set_layout_parameters(f[1], **layout_dict)))
else:
raise TypeError
return r
def layout_len(layout):
r = 0
for f in layout:
if isinstance(f[1], (int, tuple)): # cases 1/2
if len(f) == 3:
fname, fsize, fdirection = f
else:
fname, fsize = f
elif isinstance(f[1], list): # case 3
fname, fsublayout = f
fsize = layout_len(fsublayout)
else:
raise TypeError
if isinstance(fsize, tuple):
r += fsize[0]
else:
r += fsize
return r
def layout_get(layout, name):
for f in layout:
if f[0] == name:
return f
raise KeyError(name)
def layout_partial(layout, *elements):
r = []
for path in elements:
path_s = path.split("/")
last = path_s.pop()
copy_ref = layout
insert_ref = r
for hop in path_s:
name, copy_ref = layout_get(copy_ref, hop)
try:
name, insert_ref = layout_get(insert_ref, hop)
except KeyError:
new_insert_ref = []
insert_ref.append((hop, new_insert_ref))
insert_ref = new_insert_ref
insert_ref.append(layout_get(copy_ref, last))
return r
class Record:
def __init__(self, layout, name=None, **kwargs):
try:
self.name = get_var_name()
except NameNotFound:
self.name = ""
self.layout = layout
if self.name:
prefix = self.name + "_"
else:
prefix = ""
for f in self.layout:
if isinstance(f[1], (int, tuple)): # cases 1/2
if(len(f) == 3):
fname, fsize, fdirection = f
else:
fname, fsize = f
finst = Signal(fsize, name=prefix + fname, **kwargs)
elif isinstance(f[1], list): # case 3
fname, fsublayout = f
finst = Record(fsublayout, prefix + fname, **kwargs)
else:
raise TypeError
setattr(self, fname, finst)
def eq(self, other):
return [getattr(self, f[0]).eq(getattr(other, f[0]))
for f in self.layout if hasattr(other, f[0])]
def iter_flat(self):
for f in self.layout:
e = getattr(self, f[0])
if isinstance(e, Signal):
if len(f) == 3:
yield e, f[2]
else:
yield e, DIR_NONE
elif isinstance(e, Record):
yield from e.iter_flat()
else:
raise TypeError
def flatten(self):
return [signal for signal, direction in self.iter_flat()]
def raw_bits(self):
return Cat(*self.flatten())
def connect(self, *slaves, keep=None, omit=None):
if keep is None:
_keep = {f[0] for f in self.layout}
elif isinstance(keep, list):
_keep = set(keep)
else:
_keep = keep
if omit is None:
_omit = set()
elif isinstance(omit, list):
_omit = set(omit)
else:
_omit = omit
_keep = _keep - _omit
r = []
for f in self.layout:
field = f[0]
self_e = getattr(self, field)
if isinstance(self_e, Signal):
if field in _keep:
direction = f[2]
if direction == DIR_M_TO_S:
r += [getattr(slave, field).eq(self_e) for slave in slaves]
elif direction == DIR_S_TO_M:
r.append(self_e.eq(reduce(or_, [getattr(slave, field) for slave in slaves])))
else:
raise TypeError
else:
for slave in slaves:
r += self_e.connect(getattr(slave, field), keep=keep, omit=omit)
return r
def connect_flat(self, *slaves):
r = []
iter_slaves = [slave.iter_flat() for slave in slaves]
for m_signal, m_direction in self.iter_flat():
if m_direction == DIR_M_TO_S:
for iter_slave in iter_slaves:
s_signal, s_direction = next(iter_slave)
assert(s_direction == DIR_M_TO_S)
r.append(s_signal.eq(m_signal))
elif m_direction == DIR_S_TO_M:
s_signals = []
for iter_slave in iter_slaves:
s_signal, s_direction = next(iter_slave)
assert(s_direction == DIR_S_TO_M)
s_signals.append(s_signal)
r.append(m_signal.eq(reduce(or_, s_signals)))
else:
raise TypeError
return r
def __len__(self):
return layout_len(self.layout)
def __repr__(self):
return "<Record " + ":".join(f[0] for f in self.layout) + " at " + hex(id(self)) + ">"

View file

@ -1,16 +0,0 @@
from ..._utils import deprecated
from ...lib.cdc import ResetSynchronizer as NativeResetSynchronizer
__all__ = ["AsyncResetSynchronizer"]
@deprecated("instead of `migen.genlib.resetsync.AsyncResetSynchronizer`, "
"use `amaranth.lib.cdc.ResetSynchronizer`; note that ResetSynchronizer accepts "
"a clock domain name as an argument, not a clock domain object")
class CompatResetSynchronizer(NativeResetSynchronizer):
def __init__(self, cd, async_reset):
super().__init__(async_reset, domain=cd.name)
AsyncResetSynchronizer = CompatResetSynchronizer

View file

@ -1,58 +0,0 @@
import warnings
from ..fhdl.structure import Signal, If, Case
from ..fhdl.module import CompatModule
__all__ = ["RoundRobin", "SP_WITHDRAW", "SP_CE"]
(SP_WITHDRAW, SP_CE) = range(2)
class CompatRoundRobin(CompatModule):
def __init__(self, n, switch_policy=SP_WITHDRAW):
self.request = Signal(n)
self.grant = Signal(max=max(2, n))
self.switch_policy = switch_policy
if self.switch_policy == SP_CE:
warnings.warn("instead of `migen.genlib.roundrobin.RoundRobin`, "
"use `amaranth.lib.scheduler.RoundRobin`; note that RoundRobin does not "
"require a policy anymore but to get the same behavior as SP_CE you"
"should use an EnableInserter",
DeprecationWarning, stacklevel=1)
self.ce = Signal()
else:
warnings.warn("instead of `migen.genlib.roundrobin.RoundRobin`, "
"use `amaranth.lib.scheduler.RoundRobin`; note that RoundRobin does not "
"require a policy anymore",
DeprecationWarning, stacklevel=1)
###
if n > 1:
cases = {}
for i in range(n):
switch = []
for j in reversed(range(i+1, i+n)):
t = j % n
switch = [
If(self.request[t],
self.grant.eq(t)
).Else(
*switch
)
]
if self.switch_policy == SP_WITHDRAW:
case = [If(~self.request[i], *switch)]
else:
case = switch
cases[i] = case
statement = Case(self.grant, cases)
if self.switch_policy == SP_CE:
statement = If(self.ce, statement)
self.sync += statement
else:
self.comb += self.grant.eq(0)
RoundRobin = CompatRoundRobin

View file

@ -1,54 +0,0 @@
import functools
import inspect
from collections.abc import Iterable
from ...hdl.cd import ClockDomain
from ...hdl.ir import Fragment
from ...sim import *
__all__ = ["run_simulation", "passive"]
def run_simulation(fragment_or_module, generators, clocks={"sync": 10}, vcd_name=None,
special_overrides={}):
assert not special_overrides
if hasattr(fragment_or_module, "get_fragment"):
fragment = fragment_or_module.get_fragment()
else:
fragment = fragment_or_module
fragment = Fragment.get(fragment, platform=None)
if not isinstance(generators, dict):
generators = {"sync": generators}
if "sync" not in fragment.domains:
fragment.add_domains(ClockDomain("sync"))
sim = Simulator(fragment)
for domain, period in clocks.items():
sim.add_clock(period / 1e9, domain=domain)
for domain, processes in generators.items():
def wrap(process):
def wrapper():
yield from process
return wrapper
if isinstance(processes, Iterable) and not inspect.isgenerator(processes):
for process in processes:
sim.add_sync_process(wrap(process), domain=domain)
else:
sim.add_sync_process(wrap(processes), domain=domain)
if vcd_name is not None:
with sim.write_vcd(vcd_name):
sim.run()
else:
sim.run()
def passive(generator):
@functools.wraps(generator)
def wrapper(*args, **kwargs):
yield Passive()
yield from generator(*args, **kwargs)
return wrapper

View file

@ -1060,7 +1060,7 @@ class _SignalMeta(ABCMeta):
return signal
# @final
@final
class Signal(Value, DUID, metaclass=_SignalMeta):
"""A varying integer value.
@ -1213,9 +1213,8 @@ class Signal(Value, DUID, metaclass=_SignalMeta):
return str(value)
self._decoder = enum_decoder
# Not a @classmethod because amaranth.compat requires it.
@staticmethod
def like(other, *, name=None, name_suffix=None, src_loc_at=0, **kwargs):
@classmethod
def like(cls, other, *, name=None, name_suffix=None, src_loc_at=0, **kwargs):
"""Create Signal based on another.
Parameters
@ -1238,7 +1237,7 @@ class Signal(Value, DUID, metaclass=_SignalMeta):
kw.update(reset=other.reset, reset_less=other.reset_less,
attrs=other.attrs, decoder=other.decoder)
kw.update(kwargs)
return Signal(**kw, src_loc_at=1 + src_loc_at)
return cls(**kw, src_loc_at=1 + src_loc_at)
def shape(self):
return Shape(self.width, self.signed)
@ -1655,7 +1654,7 @@ class Cover(Property):
_kind = "cover"
# @final
@final
class Switch(Statement):
def __init__(self, test, cases, *, src_loc=None, src_loc_at=0, case_src_locs={}):
if src_loc is None:

View file

@ -474,7 +474,7 @@ class Module(_ModuleBuilderRoot, Elaboratable):
src_loc=src_loc, case_src_locs={fsm_encoding[name]: fsm_state_src_locs[name]
for name in fsm_states}))
def _add_statement(self, assigns, domain, depth, compat_mode=False):
def _add_statement(self, assigns, domain, depth):
def domain_name(domain):
if domain is None:
return "comb"
@ -485,7 +485,7 @@ class Module(_ModuleBuilderRoot, Elaboratable):
self._pop_ctrl()
for stmt in Statement.cast(assigns):
if not compat_mode and not isinstance(stmt, (Assign, Assert, Assume, Cover)):
if not isinstance(stmt, (Assign, Assert, Assume, Cover)):
raise SyntaxError(
"Only assignments and property checks may be appended to d.{}"
.format(domain_name(domain)))

View file

@ -81,8 +81,7 @@ class ValueVisitor(metaclass=ABCMeta):
new_value = self.on_AnyConst(value)
elif type(value) is AnySeq:
new_value = self.on_AnySeq(value)
elif isinstance(value, Signal):
# Uses `isinstance()` and not `type() is` because amaranth.compat requires it.
elif type(value) is Signal:
new_value = self.on_Signal(value)
elif type(value) is ClockSignal:
new_value = self.on_ClockSignal(value)
@ -190,8 +189,7 @@ class StatementVisitor(metaclass=ABCMeta):
new_stmt = self.on_Assume(stmt)
elif type(stmt) is Cover:
new_stmt = self.on_Cover(stmt)
elif isinstance(stmt, Switch):
# Uses `isinstance()` and not `type() is` because amaranth.compat requires it.
elif type(stmt) is Switch:
new_stmt = self.on_Switch(stmt)
elif isinstance(stmt, Iterable):
new_stmt = self.on_statements(stmt)

View file

@ -7,6 +7,9 @@ This document describes changes to the public interfaces in the Amaranth languag
Version 0.5 (unreleased)
========================
The Migen compatibility layer has been removed.
Language changes
----------------

View file

@ -1,16 +0,0 @@
from amaranth.compat import *
from amaranth.compat.fhdl import verilog
from amaranth._utils import _ignore_deprecated
class SimCase:
def setUp(self, *args, **kwargs):
with _ignore_deprecated():
self.tb = self.TestBench(*args, **kwargs)
def test_to_verilog(self):
verilog.convert(self.tb)
def run_with(self, generator):
with _ignore_deprecated():
run_simulation(self.tb, generator)

View file

@ -1,116 +0,0 @@
# amaranth: UnusedElaboratable=no
import unittest
from amaranth.compat import *
from amaranth.compat.genlib.coding import *
from .support import SimCase
class EncCase(SimCase, unittest.TestCase):
class TestBench(Module):
def __init__(self):
self.submodules.dut = Encoder(8)
def test_sizes(self):
self.assertEqual(len(self.tb.dut.i), 8)
self.assertEqual(len(self.tb.dut.o), 3)
self.assertEqual(len(self.tb.dut.n), 1)
def test_run_sequence(self):
seq = list(range(1<<8))
def gen():
for _ in range(256):
if seq:
yield self.tb.dut.i.eq(seq.pop(0))
yield
if (yield self.tb.dut.n):
self.assertNotIn((yield self.tb.dut.i), [1<<i for i in range(8)])
else:
self.assertEqual((yield self.tb.dut.i), 1<<(yield self.tb.dut.o))
self.run_with(gen())
class PrioEncCase(SimCase, unittest.TestCase):
class TestBench(Module):
def __init__(self):
self.submodules.dut = PriorityEncoder(8)
def test_sizes(self):
self.assertEqual(len(self.tb.dut.i), 8)
self.assertEqual(len(self.tb.dut.o), 3)
self.assertEqual(len(self.tb.dut.n), 1)
def test_run_sequence(self):
seq = list(range(1<<8))
def gen():
for _ in range(256):
if seq:
yield self.tb.dut.i.eq(seq.pop(0))
yield
i = yield self.tb.dut.i
if (yield self.tb.dut.n):
self.assertEqual(i, 0)
else:
o = yield self.tb.dut.o
if o > 0:
self.assertEqual(i & 1<<(o - 1), 0)
self.assertGreaterEqual(i, 1<<o)
self.run_with(gen())
class DecCase(SimCase, unittest.TestCase):
class TestBench(Module):
def __init__(self):
self.submodules.dut = Decoder(8)
def test_sizes(self):
self.assertEqual(len(self.tb.dut.i), 3)
self.assertEqual(len(self.tb.dut.o), 8)
self.assertEqual(len(self.tb.dut.n), 1)
def test_run_sequence(self):
seq = list(range(8*2))
def gen():
for _ in range(256):
if seq:
i = seq.pop()
yield self.tb.dut.i.eq(i//2)
yield self.tb.dut.n.eq(i%2)
yield
i = yield self.tb.dut.i
o = yield self.tb.dut.o
if (yield self.tb.dut.n):
self.assertEqual(o, 0)
else:
self.assertEqual(o, 1<<i)
self.run_with(gen())
class SmallPrioEncCase(SimCase, unittest.TestCase):
class TestBench(Module):
def __init__(self):
self.submodules.dut = PriorityEncoder(1)
def test_sizes(self):
self.assertEqual(len(self.tb.dut.i), 1)
self.assertEqual(len(self.tb.dut.o), 1)
self.assertEqual(len(self.tb.dut.n), 1)
def test_run_sequence(self):
seq = list(range(1))
def gen():
for _ in range(5):
if seq:
yield self.tb.dut.i.eq(seq.pop(0))
yield
i = yield self.tb.dut.i
if (yield self.tb.dut.n):
self.assertEqual(i, 0)
else:
o = yield self.tb.dut.o
if o > 0:
self.assertEqual(i & 1<<(o - 1), 0)
self.assertGreaterEqual(i, 1<<o)
self.run_with(gen())

View file

@ -1,30 +0,0 @@
import unittest
from amaranth.compat import *
from .support import SimCase
class ConstantCase(SimCase, unittest.TestCase):
class TestBench(Module):
def __init__(self):
self.sigs = [
(Signal(3), Constant(0), 0),
(Signal(3), Constant(5), 5),
(Signal(3), Constant(1, 2), 1),
(Signal(3), Constant(-1, 7), 7),
(Signal(3), Constant(0b10101)[:3], 0b101),
(Signal(3), Constant(0b10101)[1:4], 0b10),
(Signal(4), Constant(0b1100)[::-1], 0b0011),
]
self.comb += [a.eq(b) for a, b, c in self.sigs]
def test_comparisons(self):
def gen():
for s, l, v in self.tb.sigs:
s = yield s
self.assertEqual(
s, int(v),
"got {}, want {} from literal {}".format(
s, v, l))
self.run_with(gen())

View file

@ -1,38 +0,0 @@
import unittest
from itertools import count
from amaranth.compat import *
from amaranth.compat.genlib.fifo import SyncFIFO
from .support import SimCase
class SyncFIFOCase(SimCase, unittest.TestCase):
class TestBench(Module):
def __init__(self):
self.submodules.dut = SyncFIFO(64, 2)
self.sync += [
If(self.dut.we & self.dut.writable,
self.dut.din[:32].eq(self.dut.din[:32] + 1),
self.dut.din[32:].eq(self.dut.din[32:] + 2)
)
]
def test_run_sequence(self):
seq = list(range(20))
def gen():
for cycle in count():
# fire re and we at "random"
yield self.tb.dut.we.eq(cycle % 2 == 0)
yield self.tb.dut.re.eq(cycle % 3 == 0)
# the output if valid must be correct
if (yield self.tb.dut.readable) and (yield self.tb.dut.re):
try:
i = seq.pop(0)
except IndexError:
break
self.assertEqual((yield self.tb.dut.dout[:32]), i)
self.assertEqual((yield self.tb.dut.dout[32:]), i*2)
yield
self.run_with(gen())

View file

@ -1,86 +0,0 @@
import unittest
from amaranth.compat import *
from amaranth.compat.genlib.fsm import FSM
from .support import SimCase
class FSMCase(SimCase, unittest.TestCase):
class TestBench(Module):
def __init__(self):
self.ctrl = Signal()
self.data = Signal()
self.status = Signal(8)
self.submodules.dut = FSM()
self.dut.act("IDLE",
If(self.ctrl,
NextState("START")
)
)
self.dut.act("START",
If(self.data,
NextState("SET-STATUS-LOW")
).Else(
NextState("SET-STATUS")
)
)
self.dut.act("SET-STATUS",
NextValue(self.status, 0xaa),
NextState("IDLE")
)
self.dut.act("SET-STATUS-LOW",
NextValue(self.status[:4], 0xb),
NextState("IDLE")
)
def assertState(self, fsm, state):
self.assertEqual(fsm.decoding[(yield fsm.state)], state)
def test_next_state(self):
def gen():
yield from self.assertState(self.tb.dut, "IDLE")
yield
yield from self.assertState(self.tb.dut, "IDLE")
yield self.tb.ctrl.eq(1)
yield
yield from self.assertState(self.tb.dut, "IDLE")
yield self.tb.ctrl.eq(0)
yield
yield from self.assertState(self.tb.dut, "START")
yield
yield from self.assertState(self.tb.dut, "SET-STATUS")
yield self.tb.ctrl.eq(1)
yield
yield from self.assertState(self.tb.dut, "IDLE")
yield self.tb.ctrl.eq(0)
yield self.tb.data.eq(1)
yield
yield from self.assertState(self.tb.dut, "START")
yield self.tb.data.eq(0)
yield
yield from self.assertState(self.tb.dut, "SET-STATUS-LOW")
self.run_with(gen())
def test_next_value(self):
def gen():
self.assertEqual((yield self.tb.status), 0x00)
yield self.tb.ctrl.eq(1)
yield
yield self.tb.ctrl.eq(0)
yield
yield
yield from self.assertState(self.tb.dut, "SET-STATUS")
yield self.tb.ctrl.eq(1)
yield
self.assertEqual((yield self.tb.status), 0xaa)
yield self.tb.ctrl.eq(0)
yield self.tb.data.eq(1)
yield
yield self.tb.data.eq(0)
yield
yield from self.assertState(self.tb.dut, "SET-STATUS-LOW")
yield
self.assertEqual((yield self.tb.status), 0xab)
self.run_with(gen())

View file

@ -1,23 +0,0 @@
import unittest
from amaranth.compat import *
class PassiveCase(unittest.TestCase):
def test_terminates_correctly(self):
n = 5
count = 0
@passive
def counter():
nonlocal count
while True:
yield
count += 1
def terminator():
for i in range(n):
yield
run_simulation(Module(), [counter(), terminator()])
self.assertEqual(count, n)

View file

@ -1,28 +0,0 @@
import unittest
from amaranth import Signal, Module, Elaboratable
from .support import SimCase
class RunSimulation(SimCase, unittest.TestCase):
""" test for https://github.com/amaranth-lang/amaranth/issues/344 """
class TestBench(Elaboratable):
def __init__(self):
self.a = Signal()
def elaborate(self, platform):
m = Module()
m.d.sync += self.a.eq(~self.a)
return m
def test_run_simulation(self):
def gen():
yield
for i in range(10):
yield
a = (yield self.tb.a)
self.assertEqual(a, i % 2)
self.run_with(gen())

View file

@ -1,43 +0,0 @@
import unittest
from amaranth.compat import *
from .support import SimCase
class SignedCase(SimCase, unittest.TestCase):
class TestBench(Module):
def __init__(self):
self.a = Signal((3, True))
self.b = Signal((4, True))
comps = [
lambda p, q: p > q,
lambda p, q: p >= q,
lambda p, q: p < q,
lambda p, q: p <= q,
lambda p, q: p == q,
lambda p, q: p != q,
]
self.vals = []
for asign in 1, -1:
for bsign in 1, -1:
for f in comps:
r = Signal()
r0 = f(asign*self.a, bsign*self.b)
self.comb += r.eq(r0)
self.vals.append((asign, bsign, f, r, r0.op))
def test_comparisons(self):
def gen():
for i in range(-4, 4):
yield self.tb.a.eq(i)
yield self.tb.b.eq(i)
yield
a = yield self.tb.a
b = yield self.tb.b
for asign, bsign, f, r, op in self.tb.vals:
r, r0 = (yield r), f(asign*a, bsign*b)
self.assertEqual(r, int(r0),
"got {}, want {}*{} {} {}*{} = {}".format(
r, asign, a, op, bsign, b, r0))
self.run_with(gen())

View file

@ -1,21 +0,0 @@
import unittest
from amaranth._utils import _ignore_deprecated
from amaranth.compat import *
def _same_slices(a, b):
return a.value is b.value and a.start == b.start and a.stop == b.stop
class SignalSizeCase(unittest.TestCase):
def setUp(self):
self.i = C(0xaa)
self.j = C(-127)
with _ignore_deprecated():
self.s = Signal((13, True))
def test_len(self):
self.assertEqual(len(self.s), 13)
self.assertEqual(len(self.i), 8)
self.assertEqual(len(self.j), 8)

View file

@ -1,10 +0,0 @@
from amaranth.hdl.ir import Fragment
from amaranth.compat import *
from .utils import *
class CompatTestCase(FHDLTestCase):
def test_fragment_get(self):
m = Module()
f = Fragment.get(m, platform=None)