Implement RFC 36.

This feature does not exactly follow the RFC because the RFC as written
is not implementable; the treatment of async resets in `tick()` triggers
had to be changed. In addition, iterating a trigger was made to watch
for missed events, in case the body of the `async for` awaited for too
long.

Co-authored-by: Wanda <wanda-phi@users.noreply.github.com>
This commit is contained in:
Catherine 2024-05-01 03:57:52 +00:00
parent 5e59189c2b
commit 994fa81599
10 changed files with 1222 additions and 352 deletions

View file

@ -35,26 +35,25 @@ class SimulatorUnitTestCase(FHDLTestCase):
frag.add_statements("comb", stmt)
sim = Simulator(frag)
def process():
async def process(ctx):
for isig, input in zip(isigs, inputs):
yield isig.eq(input)
self.assertEqual((yield osig), output.value)
ctx.set(isig, ctx.get(input))
self.assertEqual(ctx.get(osig), output.value)
sim.add_testbench(process)
with sim.write_vcd("test.vcd", "test.gtkw", traces=[*isigs, osig]):
sim.run()
frag = Fragment()
sim = Simulator(frag)
def process():
async def process(ctx):
for isig, input in zip(isigs, inputs):
yield isig.eq(input)
yield Delay(0)
ctx.set(isig, ctx.get(input))
if isinstance(stmt, Assign):
yield stmt
ctx.set(stmt.lhs, ctx.get(stmt.rhs))
else:
yield from stmt
yield Delay(0)
self.assertEqual((yield osig), output.value)
for s in stmt:
ctx.set(s.lhs, ctx.get(s.rhs))
self.assertEqual(ctx.get(osig), output.value)
sim.add_testbench(process)
with sim.write_vcd("test.vcd", "test.gtkw", traces=[*isigs, osig]):
sim.run()
@ -597,18 +596,18 @@ class SimulatorIntegrationTestCase(FHDLTestCase):
self.setUp_alu()
with self.assertSimulation(self.m) as sim:
sim.add_clock(1e-6)
def process():
yield self.a.eq(5)
yield self.b.eq(1)
self.assertEqual((yield self.x), 4)
yield Tick()
self.assertEqual((yield self.o), 6)
yield self.s.eq(1)
yield Tick()
self.assertEqual((yield self.o), 4)
yield self.s.eq(2)
yield Tick()
self.assertEqual((yield self.o), 0)
async def process(ctx):
ctx.set(self.a, 5)
ctx.set(self.b, 1)
self.assertEqual(ctx.get(self.x), 4)
await ctx.tick()
self.assertEqual(ctx.get(self.o), 6)
ctx.set(self.s, 1)
await ctx.tick()
self.assertEqual(ctx.get(self.o), 4)
ctx.set(self.s, 2)
await ctx.tick()
self.assertEqual(ctx.get(self.o), 0)
sim.add_testbench(process)
def setUp_clock_phase(self):
@ -636,7 +635,7 @@ class SimulatorIntegrationTestCase(FHDLTestCase):
sim.add_clock(period, phase=2*period/4, domain="phase180")
sim.add_clock(period, phase=3*period/4, domain="phase270")
def proc():
async def proc(ctx):
clocks = [
self.phase0.clk,
self.phase90.clk,
@ -644,9 +643,9 @@ class SimulatorIntegrationTestCase(FHDLTestCase):
self.phase270.clk
]
for i in range(16):
yield Tick("check")
await ctx.tick("check")
for j, c in enumerate(clocks):
self.assertEqual((yield c), self.expected[j][i])
self.assertEqual(ctx.get(c), self.expected[j][i])
sim.add_process(proc)
@ -663,16 +662,15 @@ class SimulatorIntegrationTestCase(FHDLTestCase):
sim.add_clock(1e-6, domain="sys")
sim.add_clock(0.3e-6, domain="pix")
def sys_process():
yield Passive()
yield Tick("sys")
yield Tick("sys")
async def sys_process(ctx):
await ctx.tick("sys")
await ctx.tick("sys")
self.fail()
def pix_process():
yield Tick("pix")
yield Tick("pix")
yield Tick("pix")
sim.add_testbench(sys_process)
async def pix_process(ctx):
await ctx.tick("pix")
await ctx.tick("pix")
await ctx.tick("pix")
sim.add_testbench(sys_process, background=True)
sim.add_testbench(pix_process)
def setUp_lhs_rhs(self):
@ -698,9 +696,9 @@ class SimulatorIntegrationTestCase(FHDLTestCase):
m.d.sync += s.eq(0)
with self.assertSimulation(m, deadline=100e-6) as sim:
sim.add_clock(1e-6)
def process():
async def process(ctx):
for _ in range(101):
yield Delay(1e-6)
await ctx.delay(1e-6)
self.fail()
sim.add_testbench(process)
@ -710,12 +708,12 @@ class SimulatorIntegrationTestCase(FHDLTestCase):
m.d.sync += s.eq(0)
with self.assertRaises(AssertionError):
with self.assertSimulation(m, deadline=100e-6) as sim:
sim.add_clock(1e-6)
def process():
for _ in range(99):
yield Delay(1e-6)
self.fail()
sim.add_testbench(process)
sim.add_clock(1e-6)
async def process(ctx):
for _ in range(99):
await ctx.delay(1e-6)
self.fail()
sim.add_testbench(process)
def test_add_process_wrong(self):
with self.assertSimulation(Module()) as sim:
@ -818,13 +816,13 @@ class SimulatorIntegrationTestCase(FHDLTestCase):
def test_memory_init(self):
self.setUp_memory()
with self.assertSimulation(self.m) as sim:
def process():
yield self.rdport.addr.eq(1)
yield Tick()
self.assertEqual((yield self.rdport.data), 0x55)
yield self.rdport.addr.eq(2)
yield Tick()
self.assertEqual((yield self.rdport.data), 0x00)
async def process(ctx):
ctx.set(self.rdport.addr, 1)
await ctx.tick()
self.assertEqual(ctx.get(self.rdport.data), 0x55)
ctx.set(self.rdport.addr, 2)
await ctx.tick()
self.assertEqual(ctx.get(self.rdport.data), 0x00)
sim.add_clock(1e-6)
sim.add_testbench(process)
@ -1443,3 +1441,497 @@ class SimulatorRegressionTestCase(FHDLTestCase):
yield c.eq(0)
sim.add_testbench(testbench)
sim.run()
def test_sample(self):
m = Module()
m.domains.sync = cd_sync = ClockDomain()
a = Signal(4)
b = Signal(4)
sim = Simulator(m)
async def bench_a(ctx):
_, _, av, bv = await ctx.tick().sample(a, b)
ctx.set(a, 5)
self.assertEqual(av, 1)
self.assertEqual(bv, 2)
async def bench_b(ctx):
_, _, av, bv = await ctx.tick().sample(a, b)
ctx.set(b, 6)
self.assertEqual(av, 1)
self.assertEqual(bv, 2)
async def bench_c(ctx):
ctx.set(a, 1)
ctx.set(b, 2)
ctx.set(cd_sync.clk, 1)
ctx.set(a, 3)
ctx.set(b, 4)
sim.add_testbench(bench_a)
sim.add_testbench(bench_b)
sim.add_testbench(bench_c)
sim.run()
def test_latch(self):
q = Signal(4)
d = Signal(4)
g = Signal()
async def latch(ctx):
async for dv, gv in ctx.changed(d, g):
if gv:
ctx.set(q, dv)
async def testbench(ctx):
ctx.set(d, 1)
self.assertEqual(ctx.get(q), 0)
ctx.set(g, 1)
self.assertEqual(ctx.get(q), 1)
ctx.set(d, 2)
self.assertEqual(ctx.get(q), 2)
ctx.set(g, 0)
self.assertEqual(ctx.get(q), 2)
ctx.set(d, 3)
self.assertEqual(ctx.get(q), 2)
sim = Simulator(Module())
sim.add_process(latch)
sim.add_testbench(testbench)
sim.run()
def test_edge(self):
a = Signal(4)
b = Signal(4)
log = []
async def monitor(ctx):
async for res in ctx.posedge(a[0]).negedge(a[1]).sample(b):
log.append(res)
async def testbench(ctx):
ctx.set(b, 8)
ctx.set(a, 0)
ctx.set(b, 9)
ctx.set(a, 1)
ctx.set(b, 10)
ctx.set(a, 2)
ctx.set(b, 11)
ctx.set(a, 3)
ctx.set(b, 12)
ctx.set(a, 4)
ctx.set(b, 13)
ctx.set(a, 6)
ctx.set(b, 14)
ctx.set(a, 5)
sim = Simulator(Module())
sim.add_process(monitor)
sim.add_testbench(testbench)
sim.run()
self.assertEqual(log, [
(True, False, 9),
(True, False, 11),
(False, True, 12),
(True, True, 14)
])
def test_delay(self):
log = []
async def monitor(ctx):
async for res in ctx.delay(1).delay(2).delay(1):
log.append(res)
async def testbench(ctx):
await ctx.delay(4)
sim = Simulator(Module())
sim.add_process(monitor)
sim.add_testbench(testbench)
sim.run()
self.assertEqual(log, [
(True, False, True),
(True, False, True),
(True, False, True),
(True, False, True),
])
def test_timeout(self):
a = Signal()
log = []
async def monitor(ctx):
async for res in ctx.posedge(a).delay(1.5):
log.append(res)
async def testbench(ctx):
await ctx.delay(0.5)
ctx.set(a, 1)
await ctx.delay(0.5)
ctx.set(a, 0)
await ctx.delay(0.5)
ctx.set(a, 1)
await ctx.delay(1)
ctx.set(a, 0)
await ctx.delay(1)
ctx.set(a, 1)
sim = Simulator(Module())
sim.add_process(monitor)
sim.add_testbench(testbench)
sim.run()
self.assertEqual(log, [
(True, False),
(True, False),
(False, True),
(True, False),
])
def test_struct(self):
class MyStruct(data.Struct):
x: unsigned(4)
y: signed(4)
a = Signal(MyStruct)
b = Signal(MyStruct)
m = Module()
m.domains.sync = ClockDomain()
log = []
async def adder(ctx):
async for av, in ctx.changed(a):
ctx.set(b, {
"x": av.y,
"y": av.x
})
async def monitor(ctx):
async for _, _, bv in ctx.tick().sample(b):
log.append(bv)
async def testbench(ctx):
ctx.set(a.x, 1)
ctx.set(a.y, 2)
self.assertEqual(ctx.get(b.x), 2)
self.assertEqual(ctx.get(b.y), 1)
self.assertEqual(ctx.get(b), MyStruct.const({"x": 2, "y": 1}))
await ctx.tick()
ctx.set(a, MyStruct.const({"x": 3, "y": 4}))
await ctx.tick()
sim = Simulator(m)
sim.add_process(adder)
sim.add_process(monitor)
sim.add_testbench(testbench)
sim.add_clock(1e-6)
sim.run()
self.assertEqual(log, [
MyStruct.const({"x": 2, "y": 1}),
MyStruct.const({"x": 4, "y": 3}),
])
def test_valuecastable(self):
a = Signal(4)
b = Signal(4)
t = Signal()
idx = Signal()
arr = Array([a, b])
async def process(ctx):
async for _ in ctx.posedge(t):
ctx.set(arr[idx], 1)
async def testbench(ctx):
self.assertEqual(ctx.get(arr[idx]), 0)
ctx.set(t, 1)
self.assertEqual(ctx.get(a), 1)
ctx.set(idx, 1)
ctx.set(arr[idx], 2)
self.assertEqual(ctx.get(b), 2)
sim = Simulator(Module())
sim.add_process(process)
sim.add_testbench(testbench)
sim.run()
def test_tick_repeat_until(self):
ctr = Signal(4)
m = Module()
m.domains.sync = cd_sync = ClockDomain()
m.d.sync += ctr.eq(ctr + 1)
async def testbench(ctx):
_, _, val, = await ctx.tick(cd_sync).sample(ctr)
self.assertEqual(val, 0)
self.assertEqual(ctx.get(ctr), 1)
val, = await ctx.tick(cd_sync).sample(ctr).until(ctr == 4)
self.assertEqual(val, 4)
self.assertEqual(ctx.get(ctr), 5)
val, = await ctx.tick(cd_sync).sample(ctr).repeat(3)
self.assertEqual(val, 7)
self.assertEqual(ctx.get(ctr), 8)
sim = Simulator(m)
sim.add_testbench(testbench)
sim.add_clock(1e-6)
sim.run()
def test_critical(self):
ctr = Signal(4)
m = Module()
m.domains.sync = cd_sync = ClockDomain()
m.d.sync += ctr.eq(ctr + 1)
last_ctr = 0
async def testbench(ctx):
await ctx.tick().repeat(7)
async def bgbench(ctx):
nonlocal last_ctr
while True:
await ctx.tick()
with ctx.critical():
await ctx.tick().repeat(2)
last_ctr = ctx.get(ctr)
sim = Simulator(m)
sim.add_testbench(testbench)
sim.add_testbench(bgbench, background=True)
sim.add_clock(1e-6)
sim.run()
self.assertEqual(last_ctr, 9)
def test_async_reset(self):
ctr = Signal(4)
m = Module()
m.domains.sync = cd_sync = ClockDomain(async_reset=True)
m.d.sync += ctr.eq(ctr + 1)
log = []
async def monitor(ctx):
async for res in ctx.tick().sample(ctr):
log.append(res)
async def testbench(ctx):
await ctx.posedge(cd_sync.clk)
await ctx.posedge(cd_sync.clk)
await ctx.negedge(cd_sync.clk)
ctx.set(cd_sync.rst, True)
await ctx.negedge(cd_sync.clk)
ctx.set(cd_sync.rst, False)
await ctx.posedge(cd_sync.clk)
await ctx.posedge(cd_sync.clk)
async def repeat_bench(ctx):
with self.assertRaises(DomainReset):
await ctx.tick().repeat(4)
async def until_bench(ctx):
with self.assertRaises(DomainReset):
await ctx.tick().until(ctr == 3)
sim = Simulator(m)
sim.add_process(monitor)
sim.add_testbench(testbench)
sim.add_testbench(repeat_bench)
sim.add_testbench(until_bench)
sim.add_clock(1e-6)
sim.run()
self.assertEqual(log, [
(True, False, 0),
(True, False, 1),
(False, True, 2),
(True, True, 0),
(True, False, 0),
(True, False, 1),
])
def test_sync_reset(self):
ctr = Signal(4)
m = Module()
m.domains.sync = cd_sync = ClockDomain()
m.d.sync += ctr.eq(ctr + 1)
log = []
async def monitor(ctx):
async for res in ctx.tick().sample(ctr):
log.append(res)
async def testbench(ctx):
await ctx.posedge(cd_sync.clk)
await ctx.posedge(cd_sync.clk)
await ctx.negedge(cd_sync.clk)
ctx.set(cd_sync.rst, True)
await ctx.negedge(cd_sync.clk)
ctx.set(cd_sync.rst, False)
await ctx.posedge(cd_sync.clk)
await ctx.posedge(cd_sync.clk)
sim = Simulator(m)
sim.add_process(monitor)
sim.add_testbench(testbench)
sim.add_clock(1e-6)
sim.run()
self.assertEqual(log, [
(True, False, 0),
(True, False, 1),
(True, True, 2),
(True, False, 0),
(True, False, 1),
])
def test_broken_multiedge(self):
a = Signal()
broken_trigger_hit = False
async def testbench(ctx):
await ctx.delay(1)
ctx.set(a, 1)
ctx.set(a, 0)
ctx.set(a, 1)
ctx.set(a, 0)
await ctx.delay(1)
async def monitor(ctx):
nonlocal broken_trigger_hit
try:
async for _ in ctx.edge(a, 1):
pass
except BrokenTrigger:
broken_trigger_hit = True
sim = Simulator(Module())
sim.add_testbench(testbench)
sim.add_testbench(monitor, background=True)
sim.run()
self.assertTrue(broken_trigger_hit)
def test_broken_other_trigger(self):
m = Module()
m.domains.sync = ClockDomain()
async def testbench(ctx):
with self.assertRaises(BrokenTrigger):
async for _ in ctx.tick():
await ctx.delay(2)
sim = Simulator(m)
sim.add_testbench(testbench)
sim.add_clock(1)
sim.run()
def test_abandon_delay(self):
ctr = Signal(4)
m = Module()
m.domains.sync = ClockDomain()
m.d.sync += ctr.eq(ctr + 1)
async def testbench(ctx):
async for _ in ctx.delay(1).delay(1):
break
await ctx.tick()
await ctx.tick()
self.assertEqual(ctx.get(ctr), 2)
sim = Simulator(m)
sim.add_testbench(testbench)
sim.add_clock(4)
sim.run()
def test_abandon_changed(self):
ctr = Signal(4)
a = Signal()
m = Module()
m.domains.sync = ClockDomain()
m.d.sync += ctr.eq(ctr + 1)
async def testbench(ctx):
async for _ in ctx.changed(a):
break
await ctx.tick()
await ctx.tick()
self.assertEqual(ctx.get(ctr), 2)
async def change(ctx):
await ctx.delay(1)
ctx.set(a, 1)
await ctx.delay(1)
ctx.set(a, 0)
await ctx.delay(1)
ctx.set(a, 1)
sim = Simulator(m)
sim.add_testbench(testbench)
sim.add_testbench(change)
sim.add_clock(4)
sim.run()
def test_trigger_wrong(self):
a = Signal(4)
m = Module()
m.domains.sync = cd_sync = ClockDomain()
reached_tb = False
reached_proc = False
async def process(ctx):
nonlocal reached_proc
with self.assertRaisesRegex(TypeError,
r"^`\.get\(\)` cannot be used to sample values in simulator processes; "
r"use `\.sample\(\)` on a trigger object instead$"):
ctx.get(a)
reached_proc = True
async def testbench(ctx):
nonlocal reached_tb
with self.assertRaisesRegex(TypeError,
r"^Change trigger can only be used with a signal, not \(~ \(sig a\)\)$"):
await ctx.changed(~a)
with self.assertRaisesRegex(TypeError,
r"^Edge trigger can only be used with a single-bit signal or "
r"a single-bit slice of a signal, not \(sig a\)$"):
await ctx.posedge(a)
with self.assertRaisesRegex(ValueError,
r"^Edge trigger polarity must be 0 or 1, not 2$"):
await ctx.edge(a[0], 2)
with self.assertRaisesRegex(TypeError,
r"^Condition must be a value-like object, not 'meow'$"):
await ctx.tick().until("meow")
with self.assertRaisesRegex(ValueError,
r"^Repeat count must be a positive integer, not 0$"):
await ctx.tick().repeat(0)
with self.assertRaisesRegex(ValueError,
r"^Combinational domain does not have a clock$"):
await ctx.tick("comb")
with self.assertRaisesRegex(ValueError,
r"^Context cannot be provided if a clock domain is specified directly$"):
await ctx.tick(cd_sync, context=m)
reached_tb = True
sim = Simulator(m)
sim.add_process(process)
sim.add_testbench(testbench)
sim.run()
self.assertTrue(reached_tb)
self.assertTrue(reached_proc)