amaranth/nmigen/build/plat.py
2019-06-01 16:43:27 +00:00

293 lines
10 KiB
Python

from collections import OrderedDict
from abc import ABCMeta, abstractmethod, abstractproperty
import os
import sys
import subprocess
import textwrap
import re
import zipfile
import jinja2
from .. import __version__
from ..hdl.ast import *
from ..hdl.dsl import *
from ..hdl.ir import *
from ..back import rtlil, verilog
from .res import ConstraintManager
__all__ = ["Platform", "TemplatedPlatform"]
class BuildPlan:
def __init__(self, script):
self.script = script
self.files = OrderedDict()
def add_file(self, filename, content):
assert isinstance(filename, str) and filename not in self.files
# Just to make sure we don't accidentally overwrite anything.
assert not os.path.normpath(filename).startswith("..")
self.files[filename] = content
def execute(self, root="build", run_script=True):
os.makedirs(root, exist_ok=True)
cwd = os.getcwd()
try:
os.chdir(root)
for filename, content in self.files.items():
dirname = os.path.dirname(filename)
if dirname:
os.makedirs(dirname, exist_ok=True)
mode = "wt" if isinstance(content, str) else "wb"
with open(filename, mode) as f:
f.write(content)
if run_script:
if sys.platform.startswith("win32"):
subprocess.run(["cmd", "/c", "{}.bat".format(self.script)], check=True)
else:
subprocess.run(["sh", "{}.sh".format(self.script)], check=True)
return BuildProducts(os.getcwd())
finally:
os.chdir(cwd)
def archive(self, file):
with zipfile.ZipFile(file, "w") as archive:
# Write archive members in deterministic order and with deterministic timestamp.
for filename in sorted(self.files):
archive.writestr(zipfile.ZipInfo(filename), self.files[filename])
class BuildProducts:
def __init__(self, root):
self._root = root
def get(self, filename, mode="b"):
assert mode in "bt"
with open(os.path.join(self._root, filename), "r" + mode) as f:
return f.read()
class Platform(ConstraintManager, metaclass=ABCMeta):
resources = abstractproperty()
clocks = abstractproperty()
def __init__(self):
super().__init__(self.resources, self.clocks)
self.extra_files = OrderedDict()
self._prepared = False
def add_file(self, filename, content):
if not isinstance(filename, str):
raise TypeError("File name must be a string")
if filename in self.extra_files:
raise ValueError("File {} already exists"
.format(filename))
if hasattr(content, "read"):
content = content.read()
elif not isinstance(content, (str, bytes)):
raise TypeError("File contents must be str, bytes, or a file-like object")
self.extra_files[filename] = content
def build(self, fragment, name="top",
build_dir="build", do_build=True,
program_opts=None, do_program=False,
**kwargs):
plan = self.prepare(fragment, name, **kwargs)
if not do_build:
return plan
products = plan.execute(build_dir)
if not do_program:
return products
self.toolchain_program(products, name, **(program_opts or {}))
def prepare(self, fragment, name="top", **kwargs):
assert not self._prepared
self._prepared = True
fragment = Fragment.get(fragment, self)
pin_fragments = []
for pin, port in self._se_pins:
if pin.dir == "i":
pin_fragments.append((pin.name, self.get_input(pin, port)))
if pin.dir == "o":
pin_fragments.append((pin.name, self.get_output(pin, port)))
if pin.dir == "io":
pin_fragments.append((pin.name, self.get_tristate(pin, port)))
for pin, p_port, n_port in self._dp_pins:
if pin.dir == "i":
pin_fragments.append((pin.name, self.get_diff_input(pin, p_port, n_port)))
if pin.dir == "o":
pin_fragments.append((pin.name, self.get_diff_output(pin, p_port, n_port)))
if pin.dir == "io":
pin_fragments.append((pin.name, self.get_diff_tristate(pin, p_port, n_port)))
for pin_name, pin_fragment in pin_fragments:
pin_fragment = Fragment.get(pin_fragment, self)
if not isinstance(pin_fragment, Instance):
pin_fragment.flatten = True
fragment.add_subfragment(pin_fragment, name="pin_{}".format(pin_name))
return self.toolchain_prepare(fragment, name, **kwargs)
@abstractmethod
def toolchain_prepare(self, fragment, name, **kwargs):
"""
Convert the ``fragment`` and constraints recorded in this :class:`Platform` into
a :class:`BuildPlan`.
"""
raise NotImplementedError # :nocov:
def toolchain_program(self, products, name, **kwargs):
"""
Extract bitstream for fragment ``name`` from ``products`` and download it to a target.
"""
raise NotImplementedError("Platform {} does not support programming"
.format(self.__class__.__name__))
def _check_feature(self, feature, pin, xdrs):
if not xdrs:
raise NotImplementedError("Platform {} does not support {}"
.format(self.__class__.__name__, feature))
elif pin.xdr not in xdrs:
raise NotImplementedError("Platform {} does not support {} for XDR {}"
.format(self.__class__.__name__, feature, pin.xdr))
def get_input(self, pin, port):
self._check_feature("single-ended input", pin, xdrs=(1,))
m = Module()
m.d.comb += pin.i.eq(port)
return m
def get_output(self, pin, port):
self._check_feature("single-ended output", pin, xdrs=(1,))
m = Module()
m.d.comb += port.eq(pin.o)
return m
def get_tristate(self, pin, port):
self._check_feature("single-ended tristate", pin, xdrs=(1,))
m = Module()
m.submodules += Instance("$tribuf",
p_WIDTH=pin.width,
i_EN=pin.oe,
i_A=pin.o,
o_Y=port,
)
m.d.comb += pin.i.eq(port)
return m
def get_diff_input(self, pin, p_port, n_port):
self._check_feature("differential input", pin, xdrs=())
def get_diff_output(self, pin, p_port, n_port):
self._check_feature("differential output", pin, xdrs=())
def get_diff_tristate(self, pin, p_port, n_port):
self._check_feature("differential tristate", pin, xdrs=())
class TemplatedPlatform(Platform):
file_templates = abstractproperty()
command_templates = abstractproperty()
build_script_templates = {
"build_{{name}}.sh": """
# {{autogenerated}}
set -e{{verbose("x")}}
{{emit_commands("sh")}}
""",
"build_{{name}}.bat": """
@rem {{autogenerated}}
{{emit_commands("bat")}}
""",
}
def toolchain_prepare(self, fragment, name, **kwargs):
# This notice serves a dual purpose: to explain that the file is autogenerated,
# and to incorporate
autogenerated = "Automatically generated by nMigen {}. Do not edit.".format(__version__)
def emit_design(backend):
return {"rtlil": rtlil, "verilog": verilog}[backend].convert(
fragment, name=name, platform=self, ports=list(self.iter_ports()),
ensure_sync_exists=False)
def emit_commands(format):
commands = []
for index, command_tpl in enumerate(self.command_templates):
command = render(command_tpl, origin="<command#{}>".format(index + 1))
command = re.sub(r"\s+", " ", command)
if format == "sh":
commands.append(command)
elif format == "bat":
commands.append(command + " || exit /b")
else:
assert False
return "\n".join(commands)
def get_tool(tool):
tool_env = tool.upper().replace("-", "_")
return os.environ.get(tool_env, tool)
def get_override(var):
var_env = "NMIGEN_{}".format(var)
if var_env in os.environ:
return os.environ[var_env]
elif var in kwargs:
return kwargs[var]
else:
return jinja2.Undefined(name=var)
def verbose(arg):
if "NMIGEN_verbose" in os.environ:
return arg
else:
return jinja2.Undefined(name="quiet")
def quiet(arg):
if "NMIGEN_verbose" in os.environ:
return jinja2.Undefined(name="quiet")
else:
return arg
def render(source, origin):
try:
source = textwrap.dedent(source).strip()
compiled = jinja2.Template(source, trim_blocks=True, lstrip_blocks=True)
except jinja2.TemplateSyntaxError as e:
e.args = ("{} (at {}:{})".format(e.message, origin, e.lineno),)
raise
return compiled.render({
"name": name,
"platform": self,
"emit_design": emit_design,
"emit_commands": emit_commands,
"get_tool": get_tool,
"get_override": get_override,
"verbose": verbose,
"quiet": quiet,
"autogenerated": autogenerated,
})
plan = BuildPlan(script="build_{}".format(name))
for filename_tpl, content_tpl in self.file_templates.items():
plan.add_file(render(filename_tpl, origin=filename_tpl),
render(content_tpl, origin=filename_tpl))
for filename, content in self.extra_files.items():
plan.add_file(filename, content)
return plan