hdl.dsl: implement FSM.
This commit is contained in:
		
							parent
							
								
									b4fbef65ca
								
							
						
					
					
						commit
						54e3195dcb
					
				|  | @ -2,7 +2,7 @@ from collections import OrderedDict | ||||||
| from collections.abc import Iterable | from collections.abc import Iterable | ||||||
| from contextlib import contextmanager | from contextlib import contextmanager | ||||||
| 
 | 
 | ||||||
| from ..tools import flatten | from ..tools import flatten, bits_for | ||||||
| from .ast import * | from .ast import * | ||||||
| from .ir import * | from .ir import * | ||||||
| from .xfrm import * | from .xfrm import * | ||||||
|  | @ -211,6 +211,62 @@ class Module(_ModuleBuilderRoot): | ||||||
|             self._ctrl_context = "Switch" |             self._ctrl_context = "Switch" | ||||||
|             self._statements = _outer_case |             self._statements = _outer_case | ||||||
| 
 | 
 | ||||||
|  |     @contextmanager | ||||||
|  |     def FSM(self, reset=None, domain="sync", name="fsm"): | ||||||
|  |         self._check_context("FSM", context=None) | ||||||
|  |         fsm_data = self._set_ctrl("FSM", { | ||||||
|  |             "signal":   Signal(name="{}_state".format(name)), | ||||||
|  |             "domain":   domain, | ||||||
|  |             "encoding": OrderedDict(), | ||||||
|  |             "states":   OrderedDict(), | ||||||
|  |         }) | ||||||
|  |         if reset is not None: | ||||||
|  |             fsm_data["encoding"][reset] = 0 | ||||||
|  |         try: | ||||||
|  |             self._ctrl_context = "FSM" | ||||||
|  |             self.domain._depth += 1 | ||||||
|  |             yield | ||||||
|  |         finally: | ||||||
|  |             self.domain._depth -= 1 | ||||||
|  |             self._ctrl_context = None | ||||||
|  |         self._pop_ctrl() | ||||||
|  | 
 | ||||||
|  |     @contextmanager | ||||||
|  |     def State(self, name): | ||||||
|  |         self._check_context("FSM State", context="FSM") | ||||||
|  |         fsm_data = self._get_ctrl("FSM") | ||||||
|  |         if name in fsm_data["states"]: | ||||||
|  |             raise SyntaxError("FSM state '{}' is already defined".format(name)) | ||||||
|  |         if name not in fsm_data["encoding"]: | ||||||
|  |             fsm_data["encoding"][name] = len(fsm_data["encoding"]) | ||||||
|  |         try: | ||||||
|  |             _outer_case, self._statements = self._statements, [] | ||||||
|  |             self._ctrl_context = None | ||||||
|  |             yield | ||||||
|  |             self._flush_ctrl() | ||||||
|  |             fsm_data["states"][name] = self._statements | ||||||
|  |         finally: | ||||||
|  |             self._ctrl_context = "FSM" | ||||||
|  |             self._statements = _outer_case | ||||||
|  | 
 | ||||||
|  |     @property | ||||||
|  |     def next(self): | ||||||
|  |         raise SyntaxError("Only assignment to `m.next` is permitted") | ||||||
|  | 
 | ||||||
|  |     @next.setter | ||||||
|  |     def next(self, name): | ||||||
|  |         for ctrl_name, ctrl_data in reversed(self._ctrl_stack): | ||||||
|  |             if ctrl_name == "FSM": | ||||||
|  |                 if name not in ctrl_data["encoding"]: | ||||||
|  |                     ctrl_data["encoding"][name] = len(ctrl_data["encoding"]) | ||||||
|  |                 self._add_statement( | ||||||
|  |                     assigns=[ctrl_data["signal"].eq(ctrl_data["encoding"][name])], | ||||||
|  |                     domain=ctrl_data["domain"], | ||||||
|  |                     depth=len(self._ctrl_stack)) | ||||||
|  |                 break | ||||||
|  |         else: | ||||||
|  |             raise SyntaxError("`m.next = <...>` is only permitted inside an FSM") | ||||||
|  | 
 | ||||||
|     def _pop_ctrl(self): |     def _pop_ctrl(self): | ||||||
|         name, data = self._ctrl_stack.pop() |         name, data = self._ctrl_stack.pop() | ||||||
| 
 | 
 | ||||||
|  | @ -238,6 +294,13 @@ class Module(_ModuleBuilderRoot): | ||||||
| 
 | 
 | ||||||
|             self._statements.append(Switch(switch_test, switch_cases)) |             self._statements.append(Switch(switch_test, switch_cases)) | ||||||
| 
 | 
 | ||||||
|  |         if name == "FSM": | ||||||
|  |             fsm_signal, fsm_encoding, fsm_states = data["signal"], data["encoding"], data["states"] | ||||||
|  |             fsm_signal.nbits = bits_for(len(fsm_encoding) - 1) | ||||||
|  |             # The FSM is encoded such that the state with encoding 0 is always the reset state. | ||||||
|  |             self._statements.append(Switch(fsm_signal, | ||||||
|  |                 OrderedDict((fsm_encoding[name], stmts) for name, stmts in fsm_states.items()))) | ||||||
|  | 
 | ||||||
|     def _add_statement(self, assigns, domain, depth, compat_mode=False): |     def _add_statement(self, assigns, domain, depth, compat_mode=False): | ||||||
|         def domain_name(domain): |         def domain_name(domain): | ||||||
|             if domain is None: |             if domain is None: | ||||||
|  |  | ||||||
|  | @ -301,6 +301,86 @@ class DSLTestCase(FHDLTestCase): | ||||||
|                 with m.If(self.s2): |                 with m.If(self.s2): | ||||||
|                     pass |                     pass | ||||||
| 
 | 
 | ||||||
|  |     def test_FSM_basic(self): | ||||||
|  |         a = Signal() | ||||||
|  |         b = Signal() | ||||||
|  |         c = Signal() | ||||||
|  |         m = Module() | ||||||
|  |         with m.FSM(): | ||||||
|  |             with m.State("FIRST"): | ||||||
|  |                 m.d.comb += a.eq(1) | ||||||
|  |                 m.next = "SECOND" | ||||||
|  |             with m.State("SECOND"): | ||||||
|  |                 m.d.sync += b.eq(~b) | ||||||
|  |                 with m.If(c): | ||||||
|  |                     m.next = "FIRST" | ||||||
|  |         m._flush() | ||||||
|  |         self.assertRepr(m._statements, """ | ||||||
|  |         ( | ||||||
|  |             (switch (sig fsm_state) | ||||||
|  |                 (case 0 | ||||||
|  |                     (eq (sig a) (const 1'd1)) | ||||||
|  |                     (eq (sig fsm_state) (const 1'd1)) | ||||||
|  |                 ) | ||||||
|  |                 (case 1 | ||||||
|  |                     (eq (sig b) (~ (sig b))) | ||||||
|  |                     (switch (cat (sig c)) | ||||||
|  |                         (case 1 | ||||||
|  |                             (eq (sig fsm_state) (const 1'd0))) | ||||||
|  |                     ) | ||||||
|  |                 ) | ||||||
|  |             ) | ||||||
|  |         ) | ||||||
|  |         """) | ||||||
|  |         self.assertEqual({repr(k): v for k, v in m._driving.items()}, { | ||||||
|  |             "(sig a)": None, | ||||||
|  |             "(sig fsm_state)": "sync", | ||||||
|  |             "(sig b)": "sync", | ||||||
|  |         }) | ||||||
|  | 
 | ||||||
|  |     def test_FSM_reset(self): | ||||||
|  |         a = Signal() | ||||||
|  |         m = Module() | ||||||
|  |         with m.FSM(reset="SECOND"): | ||||||
|  |             with m.State("FIRST"): | ||||||
|  |                 m.d.comb += a.eq(0) | ||||||
|  |                 m.next = "SECOND" | ||||||
|  |             with m.State("SECOND"): | ||||||
|  |                 m.next = "FIRST" | ||||||
|  |         m._flush() | ||||||
|  |         self.assertRepr(m._statements, """ | ||||||
|  |         ( | ||||||
|  |             (switch (sig fsm_state) | ||||||
|  |                 (case 1 | ||||||
|  |                     (eq (sig a) (const 1'd0)) | ||||||
|  |                     (eq (sig fsm_state) (const 1'd0)) | ||||||
|  |                 ) | ||||||
|  |                 (case 0 | ||||||
|  |                     (eq (sig fsm_state) (const 1'd1)) | ||||||
|  |                 ) | ||||||
|  |             ) | ||||||
|  |         ) | ||||||
|  |         """) | ||||||
|  | 
 | ||||||
|  |     def test_FSM_wrong_redefined(self): | ||||||
|  |         m = Module() | ||||||
|  |         with m.FSM(): | ||||||
|  |             with m.State("FOO"): | ||||||
|  |                 pass | ||||||
|  |             with self.assertRaises(SyntaxError, | ||||||
|  |                     msg="FSM state 'FOO' is already defined"): | ||||||
|  |                 with m.State("FOO"): | ||||||
|  |                     pass | ||||||
|  | 
 | ||||||
|  |     def test_FSM_wrong_next(self): | ||||||
|  |         m = Module() | ||||||
|  |         with self.assertRaises(SyntaxError, | ||||||
|  |                 msg="Only assignment to `m.next` is permitted"): | ||||||
|  |             m.next | ||||||
|  |         with self.assertRaises(SyntaxError, | ||||||
|  |                 msg="`m.next = <...>` is only permitted inside an FSM"): | ||||||
|  |             m.next = "FOO" | ||||||
|  | 
 | ||||||
|     def test_auto_pop_ctrl(self): |     def test_auto_pop_ctrl(self): | ||||||
|         m = Module() |         m = Module() | ||||||
|         with m.If(self.w1): |         with m.If(self.w1): | ||||||
|  |  | ||||||
		Loading…
	
		Reference in a new issue
	
	 whitequark
						whitequark