fhdl.ir: implement clock domain propagation.

This commit is contained in:
whitequark 2018-12-13 11:01:03 +00:00
parent fde2471963
commit 72257b6935
12 changed files with 324 additions and 46 deletions

View file

@ -1,5 +1,3 @@
from contextlib import contextmanager
from ..fhdl.ast import *
from ..fhdl.dsl import *
from .tools import *

View file

@ -1,4 +1,5 @@
from ..fhdl.ast import *
from ..fhdl.cd import *
from ..fhdl.ir import *
from .tools import *
@ -18,6 +19,7 @@ class FragmentPortsTestCase(FHDLTestCase):
self.c1.eq(self.s1),
self.s1.eq(self.c1)
)
ins, outs = f._propagate_ports(ports=())
self.assertEqual(ins, ValueSet())
self.assertEqual(outs, ValueSet())
@ -28,6 +30,7 @@ class FragmentPortsTestCase(FHDLTestCase):
f.add_statements(
self.c1.eq(self.s1)
)
ins, outs = f._propagate_ports(ports=())
self.assertEqual(ins, ValueSet((self.s1,)))
self.assertEqual(outs, ValueSet())
@ -38,6 +41,7 @@ class FragmentPortsTestCase(FHDLTestCase):
f.add_statements(
self.c1.eq(self.s1)
)
ins, outs = f._propagate_ports(ports=(self.c1,))
self.assertEqual(ins, ValueSet((self.s1,)))
self.assertEqual(outs, ValueSet((self.c1,)))
@ -69,8 +73,150 @@ class FragmentPortsTestCase(FHDLTestCase):
self.c2.eq(1)
)
f1.add_subfragment(f2)
ins, outs = f1._propagate_ports(ports=(self.c2,))
self.assertEqual(ins, ValueSet())
self.assertEqual(outs, ValueSet((self.c2,)))
self.assertEqual(f1.ports, ValueSet((self.c2,)))
self.assertEqual(f2.ports, ValueSet((self.c2,)))
def test_input_cd(self):
sync = ClockDomain()
f = Fragment()
f.add_statements(
self.c1.eq(self.s1)
)
f.add_domains(sync)
f.drive(self.c1, "sync")
ins, outs = f._propagate_ports(ports=())
self.assertEqual(ins, ValueSet((self.s1, sync.clk, sync.rst)))
self.assertEqual(outs, ValueSet(()))
self.assertEqual(f.ports, ValueSet((self.s1, sync.clk, sync.rst)))
def test_input_cd_reset_less(self):
sync = ClockDomain(reset_less=True)
f = Fragment()
f.add_statements(
self.c1.eq(self.s1)
)
f.add_domains(sync)
f.drive(self.c1, "sync")
ins, outs = f._propagate_ports(ports=())
self.assertEqual(ins, ValueSet((self.s1, sync.clk)))
self.assertEqual(outs, ValueSet(()))
self.assertEqual(f.ports, ValueSet((self.s1, sync.clk)))
class FragmentDomainsTestCase(FHDLTestCase):
def test_propagate_up(self):
cd = ClockDomain()
f1 = Fragment()
f2 = Fragment()
f1.add_subfragment(f2)
f2.add_domains(cd)
f1._propagate_domains_up()
self.assertEqual(f1.domains, {"cd": cd})
def test_domain_conflict(self):
cda = ClockDomain("sync")
cdb = ClockDomain("sync")
fa = Fragment()
fa.add_domains(cda)
fb = Fragment()
fb.add_domains(cdb)
f = Fragment()
f.add_subfragment(fa, "a")
f.add_subfragment(fb, "b")
f._propagate_domains_up()
self.assertEqual(f.domains, {"a_sync": cda, "b_sync": cdb})
(fa, _), (fb, _) = f.subfragments
self.assertEqual(fa.domains, {"a_sync": cda})
self.assertEqual(fb.domains, {"b_sync": cdb})
def test_domain_conflict_anon(self):
cda = ClockDomain("sync")
cdb = ClockDomain("sync")
fa = Fragment()
fa.add_domains(cda)
fb = Fragment()
fb.add_domains(cdb)
f = Fragment()
f.add_subfragment(fa, "a")
f.add_subfragment(fb)
with self.assertRaises(DomainError,
msg="Domain 'sync' is defined by subfragments 'a', <unnamed #1> of fragment "
"'top'; it is necessary to either rename subfragment domains explicitly, "
"or give names to subfragments"):
f._propagate_domains_up()
def test_domain_conflict_name(self):
cda = ClockDomain("sync")
cdb = ClockDomain("sync")
fa = Fragment()
fa.add_domains(cda)
fb = Fragment()
fb.add_domains(cdb)
f = Fragment()
f.add_subfragment(fa, "x")
f.add_subfragment(fb, "x")
with self.assertRaises(DomainError,
msg="Domain 'sync' is defined by subfragments #0, #1 of fragment 'top', some "
"of which have identical names; it is necessary to either rename subfragment "
"domains explicitly, or give distinct names to subfragments"):
f._propagate_domains_up()
def test_propagate_down(self):
cd = ClockDomain()
f1 = Fragment()
f2 = Fragment()
f1.add_domains(cd)
f1.add_subfragment(f2)
f1._propagate_domains_down()
self.assertEqual(f2.domains, {"cd": cd})
def test_propagate_down_idempotent(self):
cd = ClockDomain()
f1 = Fragment()
f1.add_domains(cd)
f2 = Fragment()
f2.add_domains(cd)
f1.add_subfragment(f2)
f1._propagate_domains_down()
self.assertEqual(f1.domains, {"cd": cd})
self.assertEqual(f2.domains, {"cd": cd})
def test_propagate(self):
cd = ClockDomain()
f1 = Fragment()
f2 = Fragment()
f1.add_domains(cd)
f1.add_subfragment(f2)
f1._propagate_domains()
self.assertEqual(f1.domains, {"cd": cd})
self.assertEqual(f2.domains, {"cd": cd})
def test_propagate_default(self):
f1 = Fragment()
f2 = Fragment()
f1.add_subfragment(f2)
f1._propagate_domains(ensure_sync_exists=True)
self.assertEqual(f1.domains.keys(), {"sync"})
self.assertEqual(f2.domains.keys(), {"sync"})
self.assertEqual(f1.domains["sync"], f2.domains["sync"])

View file

@ -1,4 +1,5 @@
from ..fhdl.ast import *
from ..fhdl.cd import *
from ..fhdl.ir import *
from ..fhdl.xfrm import *
from .tools import *
@ -56,6 +57,37 @@ class DomainRenamerTestCase(FHDLTestCase):
)
""")
def test_rename_cd(self):
cd_sync = ClockDomain()
cd_pix = ClockDomain()
f = Fragment()
f.add_domains(cd_sync, cd_pix)
f = DomainRenamer("ext")(f)
self.assertEqual(cd_sync.name, "ext")
self.assertEqual(f.domains, {
"ext": cd_sync,
"pix": cd_pix,
})
def test_rename_cd_subfragment(self):
cd_sync = ClockDomain()
cd_pix = ClockDomain()
f1 = Fragment()
f1.add_domains(cd_sync, cd_pix)
f2 = Fragment()
f2.add_domains(cd_sync)
f1.add_subfragment(f2)
f1 = DomainRenamer("ext")(f1)
self.assertEqual(cd_sync.name, "ext")
self.assertEqual(f1.domains, {
"ext": cd_sync,
"pix": cd_pix,
})
class ResetInserterTestCase(FHDLTestCase):
def setUp(self):
@ -87,6 +119,7 @@ class ResetInserterTestCase(FHDLTestCase):
self.s1.eq(1),
self.s2.eq(0),
)
f.add_domains(ClockDomain("sync"))
f.drive(self.s1, "sync")
f.drive(self.s2, "pix")

View file

@ -1,5 +1,6 @@
import re
import unittest
from contextlib import contextmanager
from ..fhdl.ast import *
@ -14,3 +15,11 @@ class FHDLTestCase(unittest.TestCase):
repr_str = re.sub(r"\( (?=\()", "(", repr_str)
repr_str = re.sub(r"\) (?=\))", ")", repr_str)
self.assertEqual(repr(obj), repr_str.strip())
@contextmanager
def assertRaises(self, exception, msg=None):
with super().assertRaises(exception) as cm:
yield
if msg is not None:
# WTF? unittest.assertRaises is completely broken.
self.assertEqual(str(cm.exception), msg)