"""Build plans, build products, and platform base classes for toolchain integration."""

from collections import OrderedDict
from abc import ABCMeta, abstractmethod, abstractproperty
import os
import sys
import subprocess
import textwrap
import re
import zipfile
import jinja2

from .. import __version__
from ..hdl.ast import *
from ..hdl.dsl import *
from ..hdl.ir import *
from ..back import rtlil, verilog
from .res import ConstraintManager


__all__ = ["Platform", "TemplatedPlatform"]


class BuildPlan:
    """A set of files plus the base name of the script that builds them.

    ``script`` is the script name without extension; :meth:`execute` appends
    ``.bat`` on Windows and ``.sh`` elsewhere.
    """

    def __init__(self, script):
        self.script = script
        # Maps file name (relative to the build root) to ``str`` or ``bytes`` content.
        self.files = OrderedDict()

    def add_file(self, filename, content):
        """Register ``content`` under ``filename``.

        Asserts that ``filename`` is a string, has not been added before, and does not
        normalize to a path starting with ``..``.
        """
        assert isinstance(filename, str) and filename not in self.files
        # Just to make sure we don't accidentally overwrite anything.
        assert not os.path.normpath(filename).startswith("..")
        self.files[filename] = content

    def execute(self, root="build", run_script=True):
        """Write all files under ``root`` and, if ``run_script`` is true, run the script.

        The working directory is changed to ``root`` for the duration of the call and
        always restored afterwards. ``str`` content is written in text mode, anything
        else in binary mode.

        Returns a :class:`BuildProducts` rooted at the build directory when the script
        was run, and ``None`` otherwise. A failing script raises
        :class:`subprocess.CalledProcessError` (``check=True``).
        """
        os.makedirs(root, exist_ok=True)
        cwd = os.getcwd()
        try:
            os.chdir(root)

            for filename, content in self.files.items():
                dirname = os.path.dirname(filename)
                if dirname:
                    os.makedirs(dirname, exist_ok=True)

                mode = "wt" if isinstance(content, str) else "wb"
                with open(filename, mode) as f:
                    f.write(content)

            if run_script:
                if sys.platform.startswith("win32"):
                    subprocess.run(["cmd", "/c", "{}.bat".format(self.script)], check=True)
                else:
                    subprocess.run(["sh", "{}.sh".format(self.script)], check=True)

                return BuildProducts(os.getcwd())

        finally:
            os.chdir(cwd)

    def archive(self, file):
        """Write all files of this plan into a ZIP archive opened on ``file``."""
        with zipfile.ZipFile(file, "w") as archive:
            # Write archive members in deterministic order and with deterministic timestamp.
            for filename in sorted(self.files):
                archive.writestr(zipfile.ZipInfo(filename), self.files[filename])


class BuildProducts:
    """Read access to files located under a build root directory."""

    def __init__(self, root):
        self._root = root

    def get(self, filename, mode="b"):
        """Return the contents of ``filename`` (relative to the build root).

        ``mode`` must be ``"b"`` (return ``bytes``) or ``"t"`` (return ``str``).
        """
        # A tuple membership test, not a substring test: `mode in "bt"` would also
        # accept "" and "bt".
        assert mode in ("b", "t")
        with open(os.path.join(self._root, filename), "r" + mode) as f:
            return f.read()


class Platform(ConstraintManager, metaclass=ABCMeta):
    """Abstract base for platforms.

    Subclasses provide ``resources``, ``connectors`` and ``clocks``, which are passed
    to the :class:`ConstraintManager` constructor, and implement
    :meth:`toolchain_prepare`.
    """

    resources = abstractproperty()
    connectors = abstractproperty()
    clocks = abstractproperty()

    def __init__(self):
        super().__init__(self.resources, self.connectors, self.clocks)

        # Additional files copied verbatim into the build plan by templated platforms.
        self.extra_files = OrderedDict()

        # Set by `prepare()`, which may run only once per instance.
        self._prepared = False

    def add_file(self, filename, content):
        """Register an extra file to be included in the build.

        ``content`` may be ``str``, ``bytes``, or an object with a ``read()`` method,
        in which case it is read immediately.

        Raises :class:`TypeError` for a non-string file name or unsupported content,
        and :class:`ValueError` if ``filename`` was already added.
        """
        if not isinstance(filename, str):
            raise TypeError("File name must be a string")
        if filename in self.extra_files:
            raise ValueError("File {} already exists"
                             .format(filename))
        if hasattr(content, "read"):
            content = content.read()
        elif not isinstance(content, (str, bytes)):
            raise TypeError("File contents must be str, bytes, or a file-like object")
        self.extra_files[filename] = content

    def build(self, fragment, name="top",
              build_dir="build", do_build=True,
              program_opts=None, do_program=False,
              **kwargs):
        """Prepare, and optionally build and program, ``fragment``.

        Returns the :class:`BuildPlan` if ``do_build`` is false, the build products if
        ``do_program`` is false, and ``None`` after programming otherwise.
        ``program_opts`` is passed to :meth:`toolchain_program` as keyword arguments;
        remaining ``kwargs`` go to :meth:`prepare`.
        """
        plan = self.prepare(fragment, name, **kwargs)
        if not do_build:
            return plan

        products = plan.execute(build_dir)
        if not do_program:
            return products

        self.toolchain_program(products, name, **(program_opts or {}))

    def prepare(self, fragment, name="top", **kwargs):
        """Attach pin buffer subfragments to ``fragment`` and delegate to the toolchain.

        For every pin yielded by ``iter_single_ended_pins()`` and
        ``iter_differential_pins()``, the ``get_*`` method matching the pin direction
        is called and its result added as a subfragment named ``pin_<pin name>``.
        Asserts that it has not been called before on this instance.
        """
        assert not self._prepared
        self._prepared = True

        fragment = Fragment.get(fragment, self)

        def add_pin_fragment(pin, pin_fragment):
            pin_fragment = Fragment.get(pin_fragment, self)
            if not isinstance(pin_fragment, Instance):
                pin_fragment.flatten = True
            fragment.add_subfragment(pin_fragment, name="pin_{}".format(pin.name))

        for pin, port, extras in self.iter_single_ended_pins():
            if pin.dir == "i":
                add_pin_fragment(pin, self.get_input(pin, port, extras))
            elif pin.dir == "o":
                add_pin_fragment(pin, self.get_output(pin, port, extras))
            elif pin.dir == "oe":
                add_pin_fragment(pin, self.get_tristate(pin, port, extras))
            elif pin.dir == "io":
                add_pin_fragment(pin, self.get_input_output(pin, port, extras))

        for pin, p_port, n_port, extras in self.iter_differential_pins():
            if pin.dir == "i":
                add_pin_fragment(pin, self.get_diff_input(pin, p_port, n_port, extras))
            elif pin.dir == "o":
                add_pin_fragment(pin, self.get_diff_output(pin, p_port, n_port, extras))
            elif pin.dir == "oe":
                add_pin_fragment(pin, self.get_diff_tristate(pin, p_port, n_port, extras))
            elif pin.dir == "io":
                add_pin_fragment(pin, self.get_diff_input_output(pin, p_port, n_port, extras))

        return self.toolchain_prepare(fragment, name, **kwargs)

    @abstractmethod
    def toolchain_prepare(self, fragment, name, **kwargs):
        """
        Convert the ``fragment`` and constraints recorded in this :class:`Platform` into
        a :class:`BuildPlan`.
        """
        raise NotImplementedError # :nocov:

    def toolchain_program(self, products, name, **kwargs):
        """
        Extract bitstream for fragment ``name`` from ``products`` and download it to a target.
        """
        raise NotImplementedError("Platform {} does not support programming"
                                  .format(self.__class__.__name__))

    def _check_feature(self, feature, pin, extras, valid_xdrs, valid_extras):
        """Raise :class:`NotImplementedError` if ``feature`` is unsupported for ``pin``.

        The feature is unsupported if ``valid_xdrs`` is empty, if ``pin.xdr`` is not in
        ``valid_xdrs``, or if ``extras`` are given while ``valid_extras`` is falsy.
        """
        if not valid_xdrs:
            raise NotImplementedError("Platform {} does not support {}"
                                      .format(self.__class__.__name__, feature))
        elif pin.xdr not in valid_xdrs:
            raise NotImplementedError("Platform {} does not support {} for XDR {}"
                                      .format(self.__class__.__name__, feature, pin.xdr))

        if not valid_extras and extras:
            raise NotImplementedError("Platform {} does not support extras for {}"
                                      .format(self.__class__.__name__, feature))

    def get_input(self, pin, port, extras):
        """Return a module that combinatorially drives ``pin.i`` from ``port`` (XDR 0 only)."""
        self._check_feature("single-ended input", pin, extras,
                            valid_xdrs=(0,), valid_extras=None)

        m = Module()
        m.d.comb += pin.i.eq(port)
        return m

    def get_output(self, pin, port, extras):
        """Return a module that combinatorially drives ``port`` from ``pin.o`` (XDR 0 only)."""
        self._check_feature("single-ended output", pin, extras,
                            valid_xdrs=(0,), valid_extras=None)

        m = Module()
        m.d.comb += port.eq(pin.o)
        return m

    def get_tristate(self, pin, port, extras):
        """Return a module driving ``port`` from ``pin.o`` through a ``$tribuf`` enabled
        by ``pin.oe`` (XDR 0 only)."""
        self._check_feature("single-ended tristate", pin, extras,
                            valid_xdrs=(0,), valid_extras=None)

        m = Module()
        m.submodules += Instance("$tribuf",
            p_WIDTH=pin.width,
            i_EN=pin.oe,
            i_A=pin.o,
            o_Y=port,
        )
        return m

    def get_input_output(self, pin, port, extras):
        """Return a module combining a ``$tribuf`` output driver with a combinatorial
        read of ``port`` into ``pin.i`` (XDR 0 only)."""
        self._check_feature("single-ended input/output", pin, extras,
                            valid_xdrs=(0,), valid_extras=None)

        m = Module()
        m.submodules += Instance("$tribuf",
            p_WIDTH=pin.width,
            i_EN=pin.oe,
            i_A=pin.o,
            o_Y=port,
        )
        m.d.comb += pin.i.eq(port)
        return m

    # The differential buffers have no generic implementation: with an empty
    # `valid_xdrs`, `_check_feature` always raises NotImplementedError.

    def get_diff_input(self, pin, p_port, n_port, extras):
        """Unsupported by the base platform; always raises :class:`NotImplementedError`."""
        self._check_feature("differential input", pin, extras,
                            valid_xdrs=(), valid_extras=None)

    def get_diff_output(self, pin, p_port, n_port, extras):
        """Unsupported by the base platform; always raises :class:`NotImplementedError`."""
        self._check_feature("differential output", pin, extras,
                            valid_xdrs=(), valid_extras=None)

    def get_diff_tristate(self, pin, p_port, n_port, extras):
        """Unsupported by the base platform; always raises :class:`NotImplementedError`."""
        self._check_feature("differential tristate", pin, extras,
                            valid_xdrs=(), valid_extras=None)

    def get_diff_input_output(self, pin, p_port, n_port, extras):
        """Unsupported by the base platform; always raises :class:`NotImplementedError`."""
        self._check_feature("differential input/output", pin, extras,
                            valid_xdrs=(), valid_extras=None)


class TemplatedPlatform(Platform):
    """Platform whose build plan is produced by rendering Jinja2 templates.

    Subclasses provide ``file_templates`` (a mapping of file name template to content
    template) and ``command_templates`` (an iterable of command templates).
    """

    file_templates = abstractproperty()
    command_templates = abstractproperty()

    # Templates for the build scripts. Each template is dedented and stripped before
    # rendering, so the indentation inside the literals is not significant, but the
    # line breaks are: the first line of each script is a comment.
    build_script_templates = {
        "build_{{name}}.sh": """
            # {{autogenerated}}
            set -e{{verbose("x")}}
            {{emit_commands("sh")}}
        """,
        "build_{{name}}.bat": """
            @rem {{autogenerated}}
            {{emit_commands("bat")}}
        """,
    }

    def toolchain_prepare(self, fragment, name, **kwargs):
        """Render ``file_templates`` into a :class:`BuildPlan` and add ``extra_files``.

        The plan's script is named ``build_<name>``. Templates are rendered with the
        helpers defined below (``emit_design``, ``emit_commands``, ``get_tool``,
        ``get_override``, ``verbose``, ``quiet``) plus ``name``, ``platform`` and
        ``autogenerated``. ``kwargs`` supply override values for ``get_override``.

        A :class:`jinja2.TemplateSyntaxError` is re-raised with the template origin and
        line number appended to its message.
        """
        # This notice serves a dual purpose: to explain that the file is autogenerated,
        # and to incorporate the nMigen version into the generated files.
        autogenerated = "Automatically generated by nMigen {}. Do not edit.".format(__version__)

        def emit_design(backend):
            return {"rtlil": rtlil, "verilog": verilog}[backend].convert(
                fragment, name=name, platform=self, ports=list(self.iter_ports()),
                ensure_sync_exists=False)

        def emit_commands(format):
            commands = []
            for index, command_tpl in enumerate(self.command_templates):
                # Give each command a distinguishable origin so that template syntax
                # errors point at the offending entry of `command_templates`.
                command = render(command_tpl, origin="command #{}".format(index + 1))
                # Collapse the command onto a single line.
                command = re.sub(r"\s+", " ", command)
                if format == "sh":
                    commands.append(command)
                elif format == "bat":
                    # Abort the batch script as soon as any command fails.
                    commands.append(command + " || exit /b")
                else:
                    assert False
            return "\n".join(commands)

        def get_tool(tool):
            # The tool name may be overridden through an environment variable named
            # after the tool, upper-cased and with dashes replaced by underscores.
            tool_env = tool.upper().replace("-", "_")
            return os.environ.get(tool_env, tool)

        def get_override(var):
            # The environment (NMIGEN_<var>) takes precedence over keyword arguments.
            var_env = "NMIGEN_{}".format(var)
            if var_env in os.environ:
                return os.environ[var_env]
            elif var in kwargs:
                return kwargs[var]
            else:
                return jinja2.Undefined(name=var)

        def verbose(arg):
            if "NMIGEN_verbose" in os.environ:
                return arg
            else:
                return jinja2.Undefined(name="verbose")

        def quiet(arg):
            if "NMIGEN_verbose" in os.environ:
                return jinja2.Undefined(name="quiet")
            else:
                return arg

        def render(source, origin):
            try:
                source = textwrap.dedent(source).strip()
                compiled = jinja2.Template(source, trim_blocks=True, lstrip_blocks=True)
            except jinja2.TemplateSyntaxError as e:
                e.args = ("{} (at {}:{})".format(e.message, origin, e.lineno),)
                raise
            return compiled.render({
                "name": name,
                "platform": self,
                "emit_design": emit_design,
                "emit_commands": emit_commands,
                "get_tool": get_tool,
                "get_override": get_override,
                "verbose": verbose,
                "quiet": quiet,
                "autogenerated": autogenerated,
            })

        plan = BuildPlan(script="build_{}".format(name))
        for filename_tpl, content_tpl in self.file_templates.items():
            plan.add_file(render(filename_tpl, origin=filename_tpl),
                          render(content_tpl, origin=filename_tpl))
        for filename, content in self.extra_files.items():
            plan.add_file(filename, content)
        return plan