lib.fifo: use model equivalence to simplify formal specification.
This is unfortunately slow, and should probably be using theory of arrays.
This commit is contained in:
		
							parent
							
								
									38b3c4af31
								
							
						
					
					
						commit
						6ea0a12dd4
					
				|  | @ -32,10 +32,102 @@ class FIFOSmokeTestCase(FHDLTestCase): | |||
|         self.assertSyncFIFOWorks(SyncFIFOBuffered(width=8, depth=4)) | ||||
| 
 | ||||
| 
 | ||||
| class FIFOContract: | ||||
|     def __init__(self, fifo, fwft, bound): | ||||
| class FIFOModel(FIFOInterface): | ||||
|     """ | ||||
|     Non-synthesizable first-in first-out queue, implemented naively as a chain of registers. | ||||
|     """ | ||||
|     def __init__(self, width, depth, fwft): | ||||
|         super().__init__(width, depth) | ||||
| 
 | ||||
|         self.fwft    = fwft | ||||
| 
 | ||||
|         self.replace = Signal() | ||||
|         self.level   = Signal(max=self.depth + 1) | ||||
| 
 | ||||
|     def get_fragment(self, platform): | ||||
|         m = Module() | ||||
| 
 | ||||
|         storage = Array(Signal(self.width, reset_less=True, name="storage<{}>".format(n)) | ||||
|                         for n in range(self.depth)) | ||||
| 
 | ||||
|         m.d.comb += self.readable.eq(self.level > 0) | ||||
|         with m.If(self.readable): | ||||
|             with m.If(self.re): | ||||
|                 for (entry0, entry1) in zip(storage[:], storage[1:]): | ||||
|                     m.d.sync += entry0.eq(entry1) | ||||
|             if self.fwft: | ||||
|                 m.d.comb += self.dout.eq(storage[0]) | ||||
|             else: | ||||
|                 with m.If(self.re): | ||||
|                     m.d.sync += self.dout.eq(storage[0]) | ||||
| 
 | ||||
|         m.d.comb += self.writable.eq(self.level < self.depth) | ||||
|         write_at = self.level - (self.readable & self.re) | ||||
|         with m.If(self.we): | ||||
|             with m.If(~self.replace & self.writable): | ||||
|                 m.d.sync += storage[write_at].eq(self.din) | ||||
|             with m.If(self.replace): | ||||
|                 # The result of trying to replace an element in an empty queue is irrelevant. | ||||
|                 # The result of trying to replace the element that is currently being read | ||||
|                 # is undefined. | ||||
|                 m.d.comb += Assume(write_at > 0) | ||||
|                 m.d.sync += storage[write_at - 1].eq(self.din) | ||||
| 
 | ||||
|         m.d.sync += self.level.eq(self.level | ||||
|             + (self.writable & self.we & ~self.replace) | ||||
|             - (self.readable & self.re)) | ||||
| 
 | ||||
|         return m.lower(platform) | ||||
| 
 | ||||
| 
 | ||||
| class FIFOModelEquivalenceSpec: | ||||
|     """ | ||||
|     The first-in first-out queue model equivalence specification: for any inputs and control | ||||
|     signals, the behavior of the implementation under test exactly matches the ideal model, | ||||
|     except for behavior not defined by the model. | ||||
|     """ | ||||
|     def __init__(self, fifo): | ||||
|         self.fifo = fifo | ||||
| 
 | ||||
|     def get_fragment(self, platform): | ||||
|         m = Module() | ||||
|         m.submodules.dut  = dut  = self.fifo | ||||
|         m.submodules.gold = gold = FIFOModel(dut.width, dut.depth, dut.fwft) | ||||
| 
 | ||||
|         m.d.comb += [ | ||||
|             gold.re.eq(dut.readable & dut.re), | ||||
|             gold.we.eq(dut.we), | ||||
|             gold.din.eq(dut.din), | ||||
|         ] | ||||
|         if hasattr(dut, "replace"): | ||||
|             m.d.comb += gold.replace.eq(dut.replace) | ||||
|         else: | ||||
|             m.d.comb += gold.replace.eq(0) | ||||
| 
 | ||||
|         m.d.comb += Assert(dut.readable.implies(gold.readable)) | ||||
|         if dut.fwft: | ||||
|             m.d.comb += Assert(dut.readable | ||||
|                                .implies(dut.dout == gold.dout)) | ||||
|         else: | ||||
|             m.d.comb += Assert((Past(dut.readable) & Past(dut.re)) | ||||
|                                .implies(dut.dout == gold.dout)) | ||||
| 
 | ||||
|         m.d.comb += Assert(dut.writable == gold.writable) | ||||
| 
 | ||||
|         if hasattr(dut, "level"): | ||||
|             m.d.comb += Assert(dut.level == gold.level) | ||||
| 
 | ||||
|         return m.lower(platform) | ||||
| 
 | ||||
| 
 | ||||
| class FIFOContractSpec: | ||||
|     """ | ||||
|     The first-in first-out queue contract specification: if two elements are written to the queue | ||||
|     consecutively, they must be read out consecutively at some later point, no matter all other | ||||
|     circumstances, with the exception of reset. | ||||
|     """ | ||||
|     def __init__(self, fifo, bound): | ||||
|         self.fifo  = fifo | ||||
|         self.fwft  = fwft | ||||
|         self.bound = bound | ||||
| 
 | ||||
|     def get_fragment(self, platform): | ||||
|  | @ -89,123 +181,30 @@ class FIFOContract: | |||
|         return m.lower(platform) | ||||
| 
 | ||||
| 
 | ||||
| class SyncFIFOInvariants: | ||||
|     def __init__(self, fifo): | ||||
|         self.fifo = fifo | ||||
| 
 | ||||
|     def get_fragment(self, platform): | ||||
|         m = Module() | ||||
|         m.submodules.dut = fifo = self.fifo | ||||
| 
 | ||||
|         with m.If(Fell(ResetSignal())): | ||||
|             m.d.comb += [ | ||||
|                 Assert(fifo.level == 0), | ||||
|                 Assert(~fifo.readable), | ||||
|                 Assert(fifo.writable), | ||||
|             ] | ||||
|         with m.Elif(~ResetSignal()): | ||||
|             m.d.comb += Assert(fifo.level == (Past(fifo.level) | ||||
|                                 + (Past(fifo.writable) & Past(fifo.we) & ~Past(fifo.replace)) | ||||
|                                 - (Past(fifo.readable) & Past(fifo.re)))) | ||||
|             with m.If(fifo.level != 0): | ||||
|                 m.d.comb += Assert(fifo.readable) | ||||
|             with m.If(fifo.level != fifo.depth): | ||||
|                 m.d.comb += Assert(fifo.writable) | ||||
| 
 | ||||
|             with m.If(Past(1)): | ||||
|                 with m.If(~Past(fifo.re)): | ||||
|                     # Unless `re` is asserted, output should not change, other than for the case of | ||||
|                     # an empty FWFT queue, or an FWFT queue with a single entry being replaced. | ||||
|                     if fifo.fwft: | ||||
|                         with m.If((fifo.level == 1) & Past(fifo.we) & | ||||
|                                   (Past(fifo.writable) | Past(fifo.replace))): | ||||
|                             m.d.comb += Assert(fifo.dout == Past(fifo.din)) | ||||
|                         with m.Else(): | ||||
|                             m.d.comb += Assert(fifo.dout == Past(fifo.dout)) | ||||
|                     else: | ||||
|                         m.d.comb += Assert(fifo.dout == Past(fifo.dout)) | ||||
| 
 | ||||
|         return m.lower(platform) | ||||
| 
 | ||||
| 
 | ||||
| class SyncFIFOBufferedInvariants: | ||||
|     def __init__(self, fifo): | ||||
|         self.fifo = fifo | ||||
| 
 | ||||
|     def get_fragment(self, platform): | ||||
|         m = Module() | ||||
|         m.submodules.dut = fifo = self.fifo | ||||
| 
 | ||||
|         with m.If(Fell(ResetSignal())): | ||||
|             m.d.comb += [ | ||||
|                 Assert(fifo.level == 0), | ||||
|                 Assert(~fifo.readable), | ||||
|                 Assert(fifo.writable), | ||||
|             ] | ||||
|         with m.Elif(~ResetSignal()): | ||||
|             m.d.comb += Assert(fifo.level == (Past(fifo.level) | ||||
|                                 + (Past(fifo.writable) & Past(fifo.we)) | ||||
|                                 - (Past(fifo.readable) & Past(fifo.re)))) | ||||
|             with m.If(fifo.level == 0): | ||||
|                 m.d.comb += Assert(~fifo.readable) | ||||
|             with m.If(fifo.level == 1): | ||||
|                 # When there is one entry in the queue, it might be either in the inner unbuffered | ||||
|                 # queue memory, or in its output register. | ||||
|                 with m.If(Past(fifo.readable)): | ||||
|                     # On the previous cycle, there was an entry in output register. | ||||
|                     with m.If(Past(fifo.level) == 1): | ||||
|                         # It was the only entry in the queue, so it's only there if it was | ||||
|                         # not read. | ||||
|                         m.d.comb += Assert(fifo.readable == ~Past(fifo.re)) | ||||
|                     with m.Else(): | ||||
|                         # There were more entries in the queue, which would refil the output | ||||
|                         # register, if necessary. | ||||
|                         m.d.comb += Assert(fifo.readable) | ||||
|                 with m.Elif(~Fell(ResetSignal(), 1)): | ||||
|                     # On the previous cycle, there was no entry in the output register, so there is | ||||
|                     # one only if it was written two cycles back. | ||||
|                     m.d.comb += Assert(fifo.readable == Past(fifo.we, 2)) | ||||
|             with m.If(fifo.level > 1): | ||||
|                 m.d.comb += Assert(fifo.readable) | ||||
|             with m.If(fifo.level != fifo.depth): | ||||
|                 m.d.comb += Assert(fifo.writable) | ||||
| 
 | ||||
|             with m.If(~Past(fifo.re)): | ||||
|                 # Unless `re` is asserted, output should not change, other than for the case of | ||||
|                 # an empty FWFT queue, where it changes with latency 1. | ||||
|                 with m.If(fifo.readable & ~Past(fifo.readable) & | ||||
|                           Past(fifo.we, 2) & Past(fifo.writable, 2)): | ||||
|                     m.d.comb += Assert(fifo.dout == Past(fifo.din, 2)) | ||||
|                 with m.Else(): | ||||
|                     m.d.comb += Assert(fifo.dout == Past(fifo.dout)) | ||||
| 
 | ||||
|         return m.lower(platform) | ||||
| 
 | ||||
| 
 | ||||
| class FIFOFormalCase(FHDLTestCase): | ||||
|     def check_fifo(self, fifo, invariants_cls): | ||||
|         self.assertFormal(FIFOContract(fifo, fwft=fifo.fwft, bound=fifo.depth * 2 + 1), | ||||
|     def check_fifo(self, fifo): | ||||
|         self.assertFormal(FIFOModelEquivalenceSpec(fifo), | ||||
|                           mode="bmc", depth=fifo.depth * 2) | ||||
|         self.assertFormal(FIFOContractSpec(fifo, bound=fifo.depth * 2 + 1), | ||||
|                           mode="hybrid", depth=fifo.depth * 2 + 1) | ||||
|         self.assertFormal(invariants_cls(fifo), | ||||
|                           mode="prove", depth=fifo.depth * 2) | ||||
| 
 | ||||
|     def test_sync_fwft_pot(self): | ||||
|         self.check_fifo(SyncFIFO(width=8, depth=4, fwft=True), SyncFIFOInvariants) | ||||
|         self.check_fifo(SyncFIFO(width=8, depth=4, fwft=True)) | ||||
| 
 | ||||
|     def test_sync_fwft_npot(self): | ||||
|         self.check_fifo(SyncFIFO(width=8, depth=5, fwft=True), SyncFIFOInvariants) | ||||
|         self.check_fifo(SyncFIFO(width=8, depth=5, fwft=True)) | ||||
| 
 | ||||
|     def test_sync_not_fwft_pot(self): | ||||
|         self.check_fifo(SyncFIFO(width=8, depth=4, fwft=False), SyncFIFOInvariants) | ||||
|         self.check_fifo(SyncFIFO(width=8, depth=4, fwft=False)) | ||||
| 
 | ||||
|     def test_sync_not_fwft_npot(self): | ||||
|         self.check_fifo(SyncFIFO(width=8, depth=5, fwft=False), SyncFIFOInvariants) | ||||
|         self.check_fifo(SyncFIFO(width=8, depth=5, fwft=False)) | ||||
| 
 | ||||
|     def test_sync_buffered_pot(self): | ||||
|         self.check_fifo(SyncFIFOBuffered(width=8, depth=4), SyncFIFOBufferedInvariants) | ||||
|         self.check_fifo(SyncFIFOBuffered(width=8, depth=4)) | ||||
| 
 | ||||
|     def test_sync_buffered_potp1(self): | ||||
|         self.check_fifo(SyncFIFOBuffered(width=8, depth=5), SyncFIFOBufferedInvariants) | ||||
|         self.check_fifo(SyncFIFOBuffered(width=8, depth=5)) | ||||
| 
 | ||||
|     def test_sync_buffered_potm1(self): | ||||
|         self.check_fifo(SyncFIFOBuffered(width=8, depth=3), SyncFIFOBufferedInvariants) | ||||
|         self.check_fifo(SyncFIFOBuffered(width=8, depth=3)) | ||||
|  |  | |||
		Loading…
	
		Reference in a new issue
	
	 whitequark
						whitequark