133 lines
4.5 KiB
Python
133 lines
4.5 KiB
Python
# SPDX-FileCopyrightText: 2026 Qyriad <qyriad@qyriad.me>
|
|
#
|
|
# SPDX-License-Identifier: EUPL-1.1
|
|
|
|
import argparse
|
|
import functools
|
|
import shlex
|
|
import textwrap
|
|
from typing import Any, cast, TYPE_CHECKING
|
|
|
|
from beartype import beartype
|
|
|
|
from test_driver.machine import Machine
|
|
from test_driver.errors import RequestedAssertionFailed
|
|
|
|
if TYPE_CHECKING:
|
|
global machine
|
|
machine = cast(Machine, ...)
|
|
assert machine.shell is not None
|
|
|
|
ls = "eza -lah --color=always --group-directories-first"
|
|
|
|
indent = functools.partial(textwrap.indent, prefix=' ')
|
|
|
|
@beartype
|
|
def run_log(machine: Machine, *commands: str, timeout: int | None = 60) -> str:
|
|
output = ""
|
|
for command in commands:
|
|
with machine.nested(f"must succeed: {command}"):
|
|
(status, out) = machine.execute(f"{command} | tee /dev/stderr", timeout=timeout)
|
|
if status != 0:
|
|
machine.log(f"output: {out}")
|
|
raise RequestedAssertionFailed(
|
|
f"command `{command}` failed (exit code {status})",
|
|
)
|
|
output += out
|
|
|
|
return output
|
|
|
|
parser = argparse.ArgumentParser()
|
|
#parser.add_argument("--no-detach", action="store_true")
|
|
#parser.add_argument("--daemon", action="store_true")
|
|
#parser.add_argument("--enable-tcp-insecure", action="store_true")
|
|
#parser.add_argument("--port", type=int)
|
|
parser.add_argument("--jobs", type=int)
|
|
parser.add_argument("--job-lifetime", type=int)
|
|
parser.add_argument("--log-level", type=str)
|
|
#parser.add_argument("--nice", type=int)
|
|
#parser.add_argument("--stats", action="store_true")
|
|
#parser.add_argument("--stats-port", type=int)
|
|
|
|
@beartype
|
|
def get_cli_args() -> argparse.Namespace:
|
|
machine.wait_for_unit("distccd.service")
|
|
mainpid = int(machine.get_unit_property("distccd.service", "MainPID"))
|
|
machine.log(f"{mainpid=}")
|
|
pidtext = machine.succeed(f"pgrep -P {mainpid}")
|
|
machine.log(f"{pidtext=}")
|
|
pid = int(pidtext.splitlines()[0])
|
|
machine.log(f"{pid=}")
|
|
execstart = machine.get_unit_property("distccd.service", "ExecStart")
|
|
print(f"{execstart=}")
|
|
|
|
cmdline = machine.succeed(f"cat /proc/{pid}/cmdline")
|
|
cmdline_args = cmdline.split("\0")
|
|
machine.log(f"{cmdline_args=}")
|
|
print(f"{cmdline_args=}")
|
|
|
|
args, rest = parser.parse_known_args(cmdline_args)
|
|
return args
|
|
|
|
@beartype
|
|
def dynix_append(option: str, value: Any):
|
|
machine.succeed(f'''
|
|
dynix append {shlex.quote(option)} {shlex.quote(str(value))}
|
|
'''.strip())
|
|
|
|
@beartype
|
|
def do_apply():
|
|
expr = textwrap.dedent("""
|
|
(import <nixpkgs/nixos> { }).config.dynamicism.applyDynamicConfiguration { }
|
|
""").strip()
|
|
|
|
machine.succeed(rf"""
|
|
nix run --show-trace --log-format raw-with-logs --impure -E {shlex.quote(expr)}
|
|
""".strip())
|
|
|
|
machine.wait_for_unit("default.target")
|
|
machine.wait_for_unit("install-dynix.service")
|
|
|
|
dynix_out = machine.succeed("dynix --version")
|
|
assert "dynix" in dynix_out, f"dynix not in {dynix_out=}"
|
|
|
|
# Config should have our initial values.
|
|
args = get_cli_args()
|
|
assert args.jobs == 12, f'{args.jobs=} != 12'
|
|
assert args.job_lifetime == 900, f'{args.job_lifetime} != 900'
|
|
assert args.log_level == 'warning', f'{args.log_level=} != warning'
|
|
|
|
with machine.nested("must succeed: initial nixos-rebuild switch"):
|
|
machine.succeed("env PAGER= nixos-rebuild switch --log-format raw-with-logs --no-reexec --fallback")
|
|
|
|
# Config should not have changed.
|
|
args = get_cli_args()
|
|
assert args.jobs == 12, f'{args.jobs=} != 12'
|
|
assert args.job_lifetime == 900, f'{args.job_lifetime} != 900'
|
|
assert args.log_level == 'warning', f'{args.log_level=} != warning'
|
|
|
|
new_jobs = 4
|
|
dynix_append("services.distccd.maxJobs", new_jobs)
|
|
do_apply()
|
|
|
|
# Only jobs should have changed. The others should still be default.
|
|
args = get_cli_args()
|
|
assert args.jobs == new_jobs, f'{args.jobs=} != {new_jobs=}'
|
|
assert args.job_lifetime == 900, f'{args.job_lifetime} != 900'
|
|
assert args.log_level == 'warning', f'{args.log_level=} != warning'
|
|
|
|
new_log_level = 'error'
|
|
dynix_append("services.distccd.logLevel", f'"{new_log_level}"')
|
|
do_apply()
|
|
|
|
args = get_cli_args()
|
|
assert args.jobs == new_jobs, f'{args.jobs=} != {new_jobs=}'
|
|
assert args.job_lifetime == 900, f'{args.job_lifetime} != 900'
|
|
assert args.log_level == new_log_level, f'{args.log_level=} != {new_log_level=}'
|
|
|
|
# And this should set everything back.
|
|
machine.succeed("env PAGER= nixos-rebuild switch --log-format raw-with-logs --no-reexec --fallback")
|
|
args = get_cli_args()
|
|
assert args.jobs == 12, f'{args.jobs=} != 12'
|
|
assert args.job_lifetime == 900, f'{args.job_lifetime} != 900'
|
|
assert args.log_level == 'warning', f'{args.log_level=} != warning'
|