lib.io: Implement *Port from RFC 55.

This commit is contained in:
Wanda 2024-03-18 22:00:07 +01:00 committed by Catherine
parent 744576011f
commit 598cf8db28
4 changed files with 421 additions and 26 deletions

View file

@ -14,17 +14,6 @@ class ResourceError(Exception):
pass
class SingleEndedPort:
def __init__(self, io):
self.io = io
class DifferentialPort:
def __init__(self, p, n):
self.p = p
self.n = n
class PortGroup:
pass
@ -138,26 +127,23 @@ class ResourceManager:
elif isinstance(resource.ios[0], (Pins, DiffPairs)):
phys = resource.ios[0]
# The flow is `In` below regardless of requested pin direction. The flow should
# never be used as it's not used internally and anyone using `dir="-"` should
# ignore it as well.
if phys.dir == "oe":
direction = "o"
else:
direction = phys.dir
if isinstance(phys, Pins):
phys_names = phys.names
io = IOPort(len(phys), name="__".join(path) + "__io")
port = SingleEndedPort(io)
port = SingleEndedPort(io, invert=phys.invert, direction=direction)
if isinstance(phys, DiffPairs):
phys_names = []
p = IOPort(len(phys), name="__".join(path) + "__p")
n = IOPort(len(phys), name="__".join(path) + "__n")
if not self.should_skip_port_component(None, attrs, "p"):
p = IOPort(len(phys), name="__".join(path) + "__p")
phys_names += phys.p.names
else:
p = None
if not self.should_skip_port_component(None, attrs, "n"):
n = IOPort(len(phys), name="__".join(path) + "__n")
phys_names += phys.n.names
else:
n = None
port = DifferentialPort(p, n)
port = DifferentialPort(p, n, invert=phys.invert, direction=direction)
if dir == "-":
pin = None
else:

View file

@ -1,10 +1,256 @@
from .. import *
import enum
from collections.abc import Iterable
from ..hdl import *
from ..lib import wiring
from ..lib.wiring import In, Out
from .. import tracer
__all__ = ["Pin"]
__all__ = ["Direction", "SingleEndedPort", "DifferentialPort", "Pin"]
class Direction(enum.Enum):
"""Represents a direction of an I/O port, or of an I/O buffer."""
#: Input direction (from world to Amaranth design)
Input = "i"
#: Output direction (from Amaranth design to world)
Output = "o"
#: Bidirectional (can be switched between input and output)
Bidir = "io"
def __or__(self, other):
if not isinstance(other, Direction):
return NotImplemented
if self == other:
return self
else:
return Direction.Bidir
def __and__(self, other):
if not isinstance(other, Direction):
return NotImplemented
if self == other:
return self
elif self is Direction.Bidir:
return other
elif other is Direction.Bidir:
return self
else:
raise ValueError("Cannot combine input port with output port")
class SingleEndedPort:
"""Represents a single-ended I/O port with optional inversion.
Parameters
----------
io : IOValue
The raw I/O value being wrapped.
invert : bool or iterable of bool
If true, the electrical state of the physical pin will be opposite from the Amaranth value
(the ``*Buffer`` classes will insert inverters on ``o`` and ``i`` pins, as appropriate).
This can be used for various purposes:
- Normalizing active-low pins (such as ``CS_B``) to be active-high in Amaranth code
- Compensating for boards where an inverting level-shifter (or similar circuitry) was used
on the pin
If the value is a simple :class:`bool`, it is used for all bits of this port. If the value
is an iterable of :class:`bool`, the iterable must have the same length as ``io``, and
the inversion is specified per-bit.
direction : Direction or str
Represents the allowed directions of this port. If equal to :attr:`Direction.Input` or
:attr:`Direction.Output`, this port can only be used with buffers of matching direction.
If equal to :attr:`Direction.Bidir`, this port can be used with buffers of any direction.
If a string is passed, it is cast to :class:`Direction`.
"""
def __init__(self, io, *, invert=False, direction=Direction.Bidir):
self._io = IOValue.cast(io)
if isinstance(invert, bool):
self._invert = (invert,) * len(self._io)
elif isinstance(invert, Iterable):
self._invert = tuple(invert)
if len(self._invert) != len(self._io):
raise ValueError(f"Length of 'invert' ({len(self._invert)}) doesn't match "
f"length of 'io' ({len(self._io)})")
if not all(isinstance(item, bool) for item in self._invert):
raise TypeError(f"'invert' must be a bool or iterable of bool, not {invert!r}")
else:
raise TypeError(f"'invert' must be a bool or iterable of bool, not {invert!r}")
self._direction = Direction(direction)
@property
def io(self):
"""The ``io`` argument passed to the constructor."""
return self._io
@property
def invert(self):
"""The ``invert`` argument passed to the constructor, normalized to a :class:`tuple`
of :class:`bool`."""
return self._invert
@property
def direction(self):
"""The ``direction`` argument passed to the constructor, normalized to :class:`Direction`."""
return self._direction
def __len__(self):
"""Returns the width of this port in bits. Equal to :py:`len(self.io)`."""
return len(self._io)
def __invert__(self):
"""Returns a new :class:`SingleEndedPort` with the opposite value of ``invert``."""
return SingleEndedPort(self._io, invert=tuple(not inv for inv in self._invert),
direction=self._direction)
def __getitem__(self, index):
"""Slices the port, returning another :class:`SingleEndedPort` with a subset
of its bits.
The index can be a :class:`slice` or :class:`int`. If the index is
an :class:`int`, the result is a single-bit :class:`SingleEndedPort`."""
return SingleEndedPort(self._io[index], invert=self._invert[index],
direction=self._direction)
def __add__(self, other):
"""Concatenates two :class:`SingleEndedPort` objects together, returning a new
:class:`SingleEndedPort` object.
When the concatenated ports have different directions, the conflict is resolved as follows:
- If a bidirectional port is concatenated with an input port, the result is an input port.
- If a bidirectional port is concatenated with an output port, the result is an output port.
- If an input port is concatenated with an output port, :exc:`ValueError` is raised.
"""
if not isinstance(other, SingleEndedPort):
return NotImplemented
return SingleEndedPort(Cat(self._io, other._io), invert=self._invert + other._invert,
direction=self._direction | other._direction)
def __repr__(self):
if all(self._invert):
invert = True
elif not any(self._invert):
invert = False
else:
invert = self._invert
return f"SingleEndedPort({self._io!r}, invert={invert!r}, direction={self._direction})"
class DifferentialPort:
"""Represents a differential I/O port with optional inversion.
Parameters
----------
p : IOValue
The raw I/O value used as positive (true) half of the port.
n : IOValue
The raw I/O value used as negative (complemented) half of the port. Must have the same
length as ``p``.
invert : bool or iterable of bool
If true, the electrical state of the physical pin will be opposite from the Amaranth value
(the ``*Buffer`` classes will insert inverters on ``o`` and ``i`` pins, as appropriate).
This can be used for various purposes:
- Normalizing active-low pins (such as ``CS_B``) to be active-high in Amaranth code
- Compensating for boards where the P and N pins are swapped (e.g. for easier routing)
If the value is a simple :class:`bool`, it is used for all bits of this port. If the value
is an iterable of :class:`bool`, the iterable must have the same length as ``io``, and
the inversion is specified per-bit.
direction : Direction or str
Represents the allowed directions of this port. If equal to :attr:`Direction.Input` or
:attr:`Direction.Output`, this port can only be used with buffers of matching direction.
If equal to :attr:`Direction.Bidir`, this port can be used with buffers of any direction.
If a string is passed, it is cast to :class:`Direction`.
"""
def __init__(self, p, n, *, invert=False, direction=Direction.Bidir):
self._p = IOValue.cast(p)
self._n = IOValue.cast(n)
if len(self._p) != len(self._n):
raise ValueError(f"Length of 'p' ({len(self._p)}) doesn't match length of 'n' "
f"({len(self._n)})")
if isinstance(invert, bool):
self._invert = (invert,) * len(self._p)
elif isinstance(invert, Iterable):
self._invert = tuple(invert)
if len(self._invert) != len(self._p):
raise ValueError(f"Length of 'invert' ({len(self._invert)}) doesn't match "
f"length of 'p' ({len(self._p)})")
if not all(isinstance(item, bool) for item in self._invert):
raise TypeError(f"'invert' must be a bool or iterable of bool, not {invert!r}")
else:
raise TypeError(f"'invert' must be a bool or iterable of bool, not {invert!r}")
self._direction = Direction(direction)
@property
def p(self):
"""The ``p`` argument passed to the constructor."""
return self._p
@property
def n(self):
"""The ``n`` argument passed to the constructor."""
return self._n
@property
def invert(self):
"""The ``invert`` argument passed to the constructor, normalized to a :class:`tuple`
of :class:`bool`."""
return self._invert
@property
def direction(self):
"""The ``direction`` argument passed to the constructor, normalized to :class:`Direction`."""
return self._direction
def __len__(self):
"""Returns the width of this port in bits. Equal to :py:`len(self.p)` (and :py:`len(self.n)`)."""
return len(self._p)
def __invert__(self):
"""Returns a new :class:`DifferentialPort` with the opposite value of ``invert``."""
return DifferentialPort(self._p, self._n, invert=tuple(not inv for inv in self._invert),
direction=self._direction)
def __getitem__(self, index):
"""Slices the port, returning another :class:`DifferentialPort` with a subset
of its bits.
The index can be a :class:`slice` or :class:`int`. If the index is
an :class:`int`, the result is a single-bit :class:`DifferentialPort`."""
return DifferentialPort(self._p[index], self._n[index], invert=self._invert[index],
direction=self._direction)
def __add__(self, other):
"""Concatenates two :class:`DifferentialPort` objects together, returning a new
:class:`DifferentialPort` object.
When the concatenated ports have different directions, the conflict is resolved as follows:
- If a bidirectional port is concatenated with an input port, the result is an input port.
- If a bidirectional port is concatenated with an output port, the result is an output port.
- If an input port is concatenated with an output port, :exc:`ValueError` is raised.
"""
if not isinstance(other, DifferentialPort):
return NotImplemented
return DifferentialPort(Cat(self._p, other._p), Cat(self._n, other._n),
invert=self._invert + other._invert,
direction=self._direction | other._direction)
def __repr__(self):
if not any(self._invert):
invert = False
elif all(self._invert):
invert = True
else:
invert = self._invert
return f"DifferentialPort({self._p!r}, {self._n!r}, invert={invert!r}, direction={self._direction})"
class Pin(wiring.PureInterface):

View file

@ -50,6 +50,7 @@ Implemented RFCs
.. _RFC 46: https://amaranth-lang.org/rfcs/0046-shape-range-1.html
.. _RFC 50: https://amaranth-lang.org/rfcs/0050-print.html
.. _RFC 53: https://amaranth-lang.org/rfcs/0053-ioport.html
.. _RFC 55: https://amaranth-lang.org/rfcs/0055-lib-io.html
* `RFC 17`_: Remove ``log2_int``
* `RFC 27`_: Testbench processes for the simulator
@ -93,6 +94,7 @@ Standard library changes
.. currentmodule:: amaranth.lib
* Added: :mod:`amaranth.lib.memory`. (`RFC 45`_)
* Added: :class:`amaranth.lib.io.SingleEndedPort`, :class:`amaranth.lib.io.DifferentialPort`. (`RFC 55`_)
* Removed: (deprecated in 0.4) :mod:`amaranth.lib.scheduler`. (`RFC 19`_)
* Removed: (deprecated in 0.4) :class:`amaranth.lib.fifo.FIFOInterface` with ``fwft=False``. (`RFC 20`_)
* Removed: (deprecated in 0.4) :class:`amaranth.lib.fifo.SyncFIFO` with ``fwft=False``. (`RFC 20`_)

View file

@ -1,5 +1,3 @@
import warnings
from amaranth.hdl import *
from amaranth.sim import *
from amaranth.lib.io import *
@ -8,6 +6,169 @@ from amaranth.lib.wiring import *
from .utils import *
class DirectionTestCase(FHDLTestCase):
def test_or(self):
self.assertIs(Direction.Input | Direction.Input, Direction.Input)
self.assertIs(Direction.Input | Direction.Output, Direction.Bidir)
self.assertIs(Direction.Input | Direction.Bidir, Direction.Bidir)
self.assertIs(Direction.Output | Direction.Input, Direction.Bidir)
self.assertIs(Direction.Output | Direction.Output, Direction.Output)
self.assertIs(Direction.Output | Direction.Bidir, Direction.Bidir)
self.assertIs(Direction.Bidir | Direction.Input, Direction.Bidir)
self.assertIs(Direction.Bidir | Direction.Output, Direction.Bidir)
self.assertIs(Direction.Bidir | Direction.Bidir, Direction.Bidir)
with self.assertRaises(TypeError):
Direction.Bidir | 3
def test_and(self):
self.assertIs(Direction.Input & Direction.Input, Direction.Input)
self.assertIs(Direction.Input & Direction.Bidir, Direction.Input)
self.assertIs(Direction.Output & Direction.Output, Direction.Output)
self.assertIs(Direction.Output & Direction.Bidir, Direction.Output)
self.assertIs(Direction.Bidir & Direction.Input, Direction.Input)
self.assertIs(Direction.Bidir & Direction.Output, Direction.Output)
self.assertIs(Direction.Bidir & Direction.Bidir, Direction.Bidir)
with self.assertRaisesRegex(ValueError,
r"Cannot combine input port with output port"):
Direction.Output & Direction.Input
with self.assertRaisesRegex(ValueError,
r"Cannot combine input port with output port"):
Direction.Input & Direction.Output
with self.assertRaises(TypeError):
Direction.Bidir & 3
class SingleEndedPortTestCase(FHDLTestCase):
def test_construct(self):
io = IOPort(4)
port = SingleEndedPort(io)
self.assertIs(port.io, io)
self.assertEqual(port.invert, (False, False, False, False))
self.assertEqual(port.direction, Direction.Bidir)
self.assertEqual(len(port), 4)
self.assertRepr(port, "SingleEndedPort((io-port io), invert=False, direction=Direction.Bidir)")
port = SingleEndedPort(io, invert=True, direction='i')
self.assertEqual(port.invert, (True, True, True, True))
self.assertRepr(port, "SingleEndedPort((io-port io), invert=True, direction=Direction.Input)")
port = SingleEndedPort(io, invert=[True, False, True, False], direction=Direction.Output)
self.assertIsInstance(port.invert, tuple)
self.assertEqual(port.invert, (True, False, True, False))
self.assertRepr(port, "SingleEndedPort((io-port io), invert=(True, False, True, False), direction=Direction.Output)")
def test_construct_wrong(self):
io = IOPort(4)
sig = Signal(4)
with self.assertRaisesRegex(TypeError,
r"^Object \(sig sig\) cannot be converted to an IO value$"):
SingleEndedPort(sig)
with self.assertRaisesRegex(TypeError,
r"^'invert' must be a bool or iterable of bool, not 3$"):
SingleEndedPort(io, invert=3)
with self.assertRaisesRegex(TypeError,
r"^'invert' must be a bool or iterable of bool, not \[1, 2, 3, 4\]$"):
SingleEndedPort(io, invert=[1, 2, 3, 4])
with self.assertRaisesRegex(ValueError,
r"^Length of 'invert' \(5\) doesn't match length of 'io' \(4\)$"):
SingleEndedPort(io, invert=[False, False, False, False, False])
with self.assertRaisesRegex(ValueError,
r"^'bidir' is not a valid Direction$"):
SingleEndedPort(io, direction="bidir")
def test_slice(self):
io = IOPort(8)
port = SingleEndedPort(io, invert=(True, False, False, True, True, False, False, True), direction="o")
self.assertRepr(port[2:5], "SingleEndedPort((io-slice (io-port io) 2:5), invert=(False, True, True), direction=Direction.Output)")
self.assertRepr(port[7], "SingleEndedPort((io-slice (io-port io) 7:8), invert=True, direction=Direction.Output)")
def test_cat(self):
ioa = IOPort(3)
iob = IOPort(2)
porta = SingleEndedPort(ioa, direction=Direction.Input)
portb = SingleEndedPort(iob, invert=True, direction=Direction.Input)
cport = porta + portb
self.assertRepr(cport, "SingleEndedPort((io-cat (io-port ioa) (io-port iob)), invert=(False, False, False, True, True), direction=Direction.Input)")
with self.assertRaises(TypeError):
porta + iob
def test_invert(self):
io = IOPort(4)
port = SingleEndedPort(io, invert=[True, False, True, False], direction=Direction.Output)
iport = ~port
self.assertRepr(iport, "SingleEndedPort((io-port io), invert=(False, True, False, True), direction=Direction.Output)")
class DifferentialPortTestCase(FHDLTestCase):
def test_construct(self):
iop = IOPort(4)
ion = IOPort(4)
port = DifferentialPort(iop, ion)
self.assertIs(port.p, iop)
self.assertIs(port.n, ion)
self.assertEqual(port.invert, (False, False, False, False))
self.assertEqual(port.direction, Direction.Bidir)
self.assertEqual(len(port), 4)
self.assertRepr(port, "DifferentialPort((io-port iop), (io-port ion), invert=False, direction=Direction.Bidir)")
port = DifferentialPort(iop, ion, invert=True, direction='i')
self.assertEqual(port.invert, (True, True, True, True))
self.assertRepr(port, "DifferentialPort((io-port iop), (io-port ion), invert=True, direction=Direction.Input)")
port = DifferentialPort(iop, ion, invert=[True, False, True, False], direction=Direction.Output)
self.assertIsInstance(port.invert, tuple)
self.assertEqual(port.invert, (True, False, True, False))
self.assertRepr(port, "DifferentialPort((io-port iop), (io-port ion), invert=(True, False, True, False), direction=Direction.Output)")
def test_construct_wrong(self):
iop = IOPort(4)
ion = IOPort(4)
sig = Signal(4)
with self.assertRaisesRegex(TypeError,
r"^Object \(sig sig\) cannot be converted to an IO value$"):
DifferentialPort(iop, sig)
with self.assertRaisesRegex(TypeError,
r"^Object \(sig sig\) cannot be converted to an IO value$"):
DifferentialPort(sig, ion)
with self.assertRaisesRegex(ValueError,
r"^Length of 'p' \(4\) doesn't match length of 'n' \(3\)$"):
DifferentialPort(iop, ion[:3])
with self.assertRaisesRegex(TypeError,
r"^'invert' must be a bool or iterable of bool, not 3$"):
DifferentialPort(iop, ion, invert=3)
with self.assertRaisesRegex(TypeError,
r"^'invert' must be a bool or iterable of bool, not \[1, 2, 3, 4\]$"):
DifferentialPort(iop, ion, invert=[1, 2, 3, 4])
with self.assertRaisesRegex(ValueError,
r"^Length of 'invert' \(5\) doesn't match length of 'p' \(4\)$"):
DifferentialPort(iop, ion, invert=[False, False, False, False, False])
with self.assertRaisesRegex(ValueError,
r"^'bidir' is not a valid Direction$"):
DifferentialPort(iop, ion, direction="bidir")
def test_slice(self):
iop = IOPort(8)
ion = IOPort(8)
port = DifferentialPort(iop, ion, invert=(True, False, False, True, True, False, False, True), direction="o")
self.assertRepr(port[2:5], "DifferentialPort((io-slice (io-port iop) 2:5), (io-slice (io-port ion) 2:5), invert=(False, True, True), direction=Direction.Output)")
self.assertRepr(port[7], "DifferentialPort((io-slice (io-port iop) 7:8), (io-slice (io-port ion) 7:8), invert=True, direction=Direction.Output)")
def test_cat(self):
ioap = IOPort(3)
ioan = IOPort(3)
iobp = IOPort(2)
iobn = IOPort(2)
porta = DifferentialPort(ioap, ioan, direction=Direction.Input)
portb = DifferentialPort(iobp, iobn, invert=True, direction=Direction.Input)
cport = porta + portb
self.assertRepr(cport, "DifferentialPort((io-cat (io-port ioap) (io-port iobp)), (io-cat (io-port ioan) (io-port iobn)), invert=(False, False, False, True, True), direction=Direction.Input)")
with self.assertRaises(TypeError):
porta + SingleEndedPort(ioap)
def test_invert(self):
iop = IOPort(4)
ion = IOPort(4)
port = DifferentialPort(iop, ion, invert=[True, False, True, False], direction=Direction.Output)
iport = ~port
self.assertRepr(iport, "DifferentialPort((io-port iop), (io-port ion), invert=(False, True, False, True), direction=Direction.Output)")
class PinSignatureTestCase(FHDLTestCase):
def assertSignatureEqual(self, signature, expected):
self.assertEqual(signature.members, Signature(expected).members)