Compare commits
No commits in common. "b0fc0debc99964f7741f6d61c9ed3de7e28353ca" and "ccd37ee16921b1a96a6d4ade3be85a7890843baa" have entirely different histories.
b0fc0debc9
...
ccd37ee169
16 changed files with 1099 additions and 1433 deletions
788
Cargo.lock
generated
788
Cargo.lock
generated
File diff suppressed because it is too large
Load diff
|
|
@ -22,7 +22,6 @@ regex-full = ["dep:regex"]
|
|||
regex-lite = ["dep:regex-lite"]
|
||||
|
||||
[dependencies]
|
||||
axum = { version = "0.8.8", features = ["macros"] }
|
||||
bimap = "0.6.3"
|
||||
bitflags = { version = "2.11.0", features = ["std"] }
|
||||
bstr = "1.12.1"
|
||||
|
|
@ -30,17 +29,14 @@ circular-buffer = "1.2.0"
|
|||
clap = { version = "4.5.54", features = ["color", "derive"] }
|
||||
command-error = "0.8.0"
|
||||
const-str = "1.1.0"
|
||||
const_format = { version = "0.2.35", features = ["fmt"] }
|
||||
displaydoc = "0.2.5"
|
||||
fs-err = "3.2.2"
|
||||
humantime = "2.3.0"
|
||||
iddqd = "0.3.17"
|
||||
indoc = "2.0.7"
|
||||
itertools = "0.14.0"
|
||||
libc = { version = "0.2.180", features = ["extra_traits"] }
|
||||
#macro_rules_attribute = { version = "0.2.2", features = ["better-docs", "verbose-expansions"] }
|
||||
mio = { version = "1.1.1", features = ["os-ext", "os-poll", "net"] }
|
||||
parking_lot = { version = "0.12.5", features = ["arc_lock", "deadlock_detection", "hardware-lock-elision", "serde"] }
|
||||
regex = { version = "1.12.3", optional = true }
|
||||
regex-lite = { version = "0.1.9", optional = true }
|
||||
rustix = { version = "1.1.4", features = ["event", "fs", "net", "process", "termios"] }
|
||||
|
|
@ -48,12 +44,9 @@ serde = { version = "1.0.228", features = ["derive", "rc"] }
|
|||
serde_json = "1.0.149"
|
||||
sync-fd = "0.1.0"
|
||||
tap = "1.0.1"
|
||||
tokio = { version = "1.50.0", features = ["full", "mio", "tracing"] }
|
||||
tracing = { version = "0.1.44", features = ["attributes"] }
|
||||
tracing-human-layer = "0.2.1"
|
||||
tracing-subscriber = { version = "0.3.22", default-features = false, features = ["std", "env-filter", "fmt", "ansi", "registry", "parking_lot"] }
|
||||
utoipa = { version = "5.4.0", features = ["axum_extras", "config", "debug", "indexmap", "preserve_order", "preserve_path_order", "repr", "time", "url"] }
|
||||
utoipa-axum = { version = "0.2.0", features = ["debug"] }
|
||||
which = "8.0.2"
|
||||
|
||||
[profile.dev]
|
||||
|
|
|
|||
|
|
@ -46,7 +46,6 @@
|
|||
packages = extraVersions // {
|
||||
default = dynix;
|
||||
inherit dynix;
|
||||
tests = self.packages.${system}.dynix.allTests;
|
||||
};
|
||||
|
||||
devShells = extraDevShells // {
|
||||
|
|
|
|||
41
package.nix
41
package.nix
|
|
@ -67,9 +67,7 @@ in {
|
|||
inherit (self) strictDeps __structuredAttrs;
|
||||
inherit (self) doCheck doInstallCheck;
|
||||
|
||||
outputs = [ "out" ];
|
||||
# "Fake" doc output, since it's actually built as a separate derivation.
|
||||
passthru.doc = self.dynixCrateDocs;
|
||||
outputs = [ "out" "doc" ];
|
||||
|
||||
src = lib.fileset.toSource {
|
||||
root = ./.;
|
||||
|
|
@ -84,6 +82,12 @@ in {
|
|||
lockFile = ./Cargo.lock;
|
||||
};
|
||||
|
||||
postInstall = ''
|
||||
cargo doc --document-private-items
|
||||
mkdir -p "$doc"
|
||||
cp -r ./target/doc/* "$doc/"
|
||||
'';
|
||||
|
||||
nativeBuildInputs = rustHooks.asList ++ [
|
||||
cargo
|
||||
];
|
||||
|
|
@ -99,37 +103,6 @@ in {
|
|||
};
|
||||
};
|
||||
|
||||
# Run `cargo doc` as a separate derivation, so it can be run in parallel while the main
|
||||
# `cargo build` of `dynixCommand`.
|
||||
# This does effectively run `cargo check` twice, but that's by far the fastest part of the build,
|
||||
# so it's fine imo.
|
||||
dynixCrateDocs = stdenv.mkDerivation {
|
||||
pname = "${self.pname}-crate-docs";
|
||||
inherit (self) version;
|
||||
inherit (self) strictDeps __structuredAttrs;
|
||||
inherit (self.dynixCommand) src cargoDeps nativeBuildInputs;
|
||||
|
||||
phases = [ "unpackPhase" "patchPhase" "cargoDocPhase" "installPhase" ];
|
||||
|
||||
cargoDocPhase = ''
|
||||
runHook preCargoDoc
|
||||
cargo doc --document-private-items
|
||||
runHook postCargoDoc
|
||||
'';
|
||||
|
||||
installPhase = ''
|
||||
runHook preInstall
|
||||
mkdir -p "$out"
|
||||
cp -r ./target/doc/* "$out"
|
||||
runHook postInstall
|
||||
'';
|
||||
|
||||
meta = {
|
||||
description = "Crate Rustdoc for Dynix";
|
||||
inherit (self.meta) license;
|
||||
};
|
||||
};
|
||||
|
||||
dynixModules = stdenv.mkDerivation {
|
||||
pname = "${self.pname}-modules";
|
||||
inherit (self) version;
|
||||
|
|
|
|||
78
src/args.rs
78
src/args.rs
|
|
@ -3,9 +3,7 @@
|
|||
// SPDX-License-Identifier: EUPL-1.1
|
||||
|
||||
use std::{
|
||||
env, iter,
|
||||
net::SocketAddr,
|
||||
ops::Deref,
|
||||
env,
|
||||
sync::{Arc, LazyLock},
|
||||
};
|
||||
|
||||
|
|
@ -27,28 +25,25 @@ pub struct AppendCmd {
|
|||
#[derive(Debug, Clone, PartialEq, clap::Parser)]
|
||||
#[command(long_about = None)]
|
||||
pub struct DaemonCmd {
|
||||
/// Specify the bind address.
|
||||
#[arg(long, default_value = "0.0.0.0:42420")]
|
||||
pub bind: SocketAddr,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, clap::Parser)]
|
||||
#[command(long_about = None)]
|
||||
pub struct InitCmd {
|
||||
/// Overwrite existing files.
|
||||
/// Read from stdin instead of a Unix socket.
|
||||
#[arg(long)]
|
||||
pub force: bool,
|
||||
pub stdin: bool,
|
||||
|
||||
/// Manually specify the full alternative path to the server socket.
|
||||
///
|
||||
/// If not specified and `--stdin` is not specified, defaults to $XDG_RUNTIME_DIR/dynix.sock
|
||||
#[arg(long)]
|
||||
#[arg(conflicts_with = "stdin")]
|
||||
pub socket: Option<PathBuf>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, clap::Subcommand)]
|
||||
pub enum Subcommand {
|
||||
Append(AppendCmd),
|
||||
Init(InitCmd),
|
||||
Daemon(DaemonCmd),
|
||||
OpenApiDocs,
|
||||
}
|
||||
|
||||
pub static DEFAULT_PATH: LazyLock<Arc<Path>> = LazyLock::new(|| {
|
||||
static DEFAULT_PATH: LazyLock<Box<OsStr>> = LazyLock::new(|| {
|
||||
// This has to be in a let binding to keep the storage around.
|
||||
let nixos_config = env::var_os("NIXOS_CONFIG");
|
||||
let nixos_config = nixos_config
|
||||
|
|
@ -56,7 +51,7 @@ pub static DEFAULT_PATH: LazyLock<Arc<Path>> = LazyLock::new(|| {
|
|||
.map(Path::new)
|
||||
.unwrap_or(Path::new("/etc/nixos/configuration.nix"));
|
||||
|
||||
let boxed = nixos_config
|
||||
nixos_config
|
||||
.parent()
|
||||
.unwrap_or_else(|| {
|
||||
error!(
|
||||
|
|
@ -65,48 +60,11 @@ pub static DEFAULT_PATH: LazyLock<Arc<Path>> = LazyLock::new(|| {
|
|||
);
|
||||
Path::new("/etc/nixos")
|
||||
})
|
||||
.join("dynamic.nix");
|
||||
|
||||
Arc::from(boxed)
|
||||
.join("dynamic.nix")
|
||||
.into_os_string()
|
||||
.into_boxed_os_str()
|
||||
});
|
||||
|
||||
#[repr(transparent)]
|
||||
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
|
||||
pub struct DynamicDotNix(pub Arc<Path>);
|
||||
impl Default for DynamicDotNix {
|
||||
fn default() -> Self {
|
||||
DynamicDotNix(Arc::clone(&*DEFAULT_PATH))
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&str> for DynamicDotNix {
|
||||
fn from(s: &str) -> DynamicDotNix {
|
||||
let path = Path::new(s);
|
||||
let path: PathBuf = if path.is_relative() && !path.starts_with("./") {
|
||||
iter::once(OsStr::new("./")).chain(path.iter()).collect()
|
||||
} else {
|
||||
path.to_path_buf()
|
||||
};
|
||||
|
||||
DynamicDotNix(Arc::from(path))
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for DynamicDotNix {
|
||||
fn fmt(&self, f: &mut Formatter) -> FmtResult {
|
||||
write!(f, "{}", self.0.display())?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Deref for DynamicDotNix {
|
||||
type Target = Arc<Path>;
|
||||
|
||||
fn deref(&self) -> &Arc<Path> {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, clap::Parser)]
|
||||
#[command(version, about, author)]
|
||||
#[command(arg_required_else_help(true), args_override_self(true))]
|
||||
|
|
@ -117,11 +75,9 @@ pub struct Args {
|
|||
|
||||
/// The .nix file with dynamic overrides to modify.
|
||||
/// [default: $(dirname ${NIXOS_CONFIG-/etc/nixos/configuration.nix})/dynamic.nix]
|
||||
#[arg(long, global(true))]
|
||||
#[arg(default_value_t)]
|
||||
#[arg(long, global(true), default_value = &**DEFAULT_PATH)]
|
||||
#[arg(hide_default_value(true))]
|
||||
//#[arg(value_parser = clap::value_parser!(PathBuf))]
|
||||
pub file: DynamicDotNix,
|
||||
pub file: Arc<OsStr>,
|
||||
|
||||
#[command(subcommand)]
|
||||
pub subcommand: Subcommand,
|
||||
|
|
|
|||
847
src/daemon.rs
847
src/daemon.rs
|
|
@ -1,42 +1,68 @@
|
|||
use std::{
|
||||
net::SocketAddr,
|
||||
process::{Output, Stdio},
|
||||
sync::LazyLock,
|
||||
env, io,
|
||||
os::fd::{AsFd, BorrowedFd, IntoRawFd, OwnedFd, RawFd},
|
||||
process::{Command, Stdio},
|
||||
sync::{
|
||||
Arc, LazyLock,
|
||||
atomic::{AtomicUsize, Ordering},
|
||||
},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use axum::{
|
||||
Json, Router,
|
||||
extract::State,
|
||||
http::{self, StatusCode, header::HeaderMap},
|
||||
routing::post,
|
||||
use iddqd::{BiHashMap, IdOrdMap};
|
||||
|
||||
use mio::{Events, Interest, Poll, Token, event::Event, net::UnixListener, unix::SourceFd};
|
||||
|
||||
use rustix::{
|
||||
buffer::spare_capacity,
|
||||
net::SocketFlags,
|
||||
process::{Pid, PidfdFlags, Uid, WaitId, WaitIdOptions},
|
||||
};
|
||||
use tokio::{net::TcpListener, process::Command};
|
||||
//use utoipa::{OpenApi as _, ToSchema, openapi::OpenApi};
|
||||
//use utoipa_axum::router::{OpenApiRouter, UtoipaMethodRouterExt};
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
mod rustix {
|
||||
pub use rustix::process::{getuid, pidfd_open, waitid};
|
||||
pub use rustix::*;
|
||||
}
|
||||
|
||||
use crate::{SourceFile, prelude::*};
|
||||
//mod rustix_prelude {
|
||||
// pub use rustix::process::{getuid, pidfd_open, waitid};
|
||||
//}
|
||||
|
||||
use serde_json::StreamDeserializer;
|
||||
|
||||
use crate::prelude::*;
|
||||
|
||||
pub mod api;
|
||||
use api::{ConvenientAttrPath, NixLiteral};
|
||||
use api::DaemonCmd;
|
||||
|
||||
//pub static UID: LazyLock<Uid> = LazyLock::new(rustix::process::getuid);
|
||||
use crate::daemon_tokfd::{FdInfo, FdKind};
|
||||
|
||||
//pub static USER_SOCKET_DIR: LazyLock<&'static Path> = LazyLock::new(|| {
|
||||
// let dir: Box<Path> = env::var_os("XDG_RUNTIME_DIR")
|
||||
// .map(PathBuf::from)
|
||||
// .unwrap_or_else(|| ["/", "run", "user", &UID.to_string()].into_iter().collect())
|
||||
// .into_boxed_path();
|
||||
//
|
||||
// Box::leak(dir)
|
||||
//});
|
||||
use crate::{OwnedFdWithFlags, TokenFd};
|
||||
|
||||
//pub static TMPDIR: LazyLock<&'static Path> = LazyLock::new(|| {
|
||||
// let dir: Box<Path> = env::temp_dir().into_boxed_path();
|
||||
//
|
||||
// Box::leak(dir)
|
||||
//});
|
||||
pub static UID: LazyLock<Uid> = LazyLock::new(rustix::process::getuid);
|
||||
|
||||
pub static USER_SOCKET_DIR: LazyLock<&'static Path> = LazyLock::new(|| {
|
||||
let dir: Box<Path> = env::var_os("XDG_RUNTIME_DIR")
|
||||
.map(PathBuf::from)
|
||||
.unwrap_or_else(|| ["/", "run", "user", &UID.to_string()].into_iter().collect())
|
||||
.into_boxed_path();
|
||||
|
||||
Box::leak(dir)
|
||||
});
|
||||
|
||||
pub static TMPDIR: LazyLock<&'static Path> = LazyLock::new(|| {
|
||||
let dir: Box<Path> = env::temp_dir().into_boxed_path();
|
||||
|
||||
Box::leak(dir)
|
||||
});
|
||||
|
||||
pub static NIXOS_REBUILD: LazyLock<&'static Path> = LazyLock::new(|| {
|
||||
which::which("nixos-rebuild")
|
||||
.inspect_err(|e| error!("couldn't find `nixos-rebuild` in PATH: {e}"))
|
||||
.map(PathBuf::into_boxed_path)
|
||||
.map(|boxed| &*Box::leak(boxed))
|
||||
.unwrap_or(Path::new("/run/current-system/sw/bin/nixos-rebuild"))
|
||||
});
|
||||
|
||||
pub static NIX: LazyLock<&'static Path> = LazyLock::new(|| {
|
||||
which::which("nix")
|
||||
|
|
@ -46,154 +72,402 @@ pub static NIX: LazyLock<&'static Path> = LazyLock::new(|| {
|
|||
.unwrap_or(Path::new("/run/current-system/sw/bin/nix"))
|
||||
});
|
||||
|
||||
pub async fn run(config: Config) {
|
||||
let addr = config.addr.clone();
|
||||
let router = Router::new()
|
||||
.route("/set", post(ep_set_post))
|
||||
// `.with_state()` has to be last for the type inference to work.
|
||||
.with_state(config);
|
||||
//let (router, api): (Router, OpenApi) = OpenApiRouter::with_openapi(ApiDoc::openapi())
|
||||
// .routes(utoipa_axum::routes!(ep_set_post))
|
||||
// // `.with_state()` has to be last for the type inference works.
|
||||
// .with_state(config)
|
||||
// .split_for_parts();
|
||||
const TIMEOUT_NEVER: Option<Duration> = None;
|
||||
|
||||
let listener = TcpListener::bind(addr).await.unwrap();
|
||||
static NEXT_TOKEN_NUMBER: AtomicUsize = AtomicUsize::new(1);
|
||||
fn next_token() -> Token {
|
||||
let tok = NEXT_TOKEN_NUMBER.fetch_add(1, Ordering::SeqCst);
|
||||
|
||||
axum::serve(listener, router).await.unwrap();
|
||||
// If the increment wrapped to 0, then we just increment it again.
|
||||
if tok == 0 {
|
||||
warn!("File descriptor token wrapped. That's... a lot.");
|
||||
return next_token();
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Config {
|
||||
pub config_file: SourceFile,
|
||||
pub addr: SocketAddr,
|
||||
pub token: Option<String>,
|
||||
Token(tok)
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, PartialOrd)]
|
||||
#[derive(Deserialize, Serialize)]
|
||||
//#[derive(ToSchema)]
|
||||
pub struct SetParams {
|
||||
pub name: ConvenientAttrPath,
|
||||
pub value: NixLiteral,
|
||||
trait EventExt {
|
||||
type Display;
|
||||
|
||||
fn display(&self) -> Self::Display;
|
||||
}
|
||||
|
||||
#[derive(Copy)]
|
||||
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
|
||||
#[derive(Deserialize, Serialize)]
|
||||
//#[derive(ToSchema)]
|
||||
pub struct SetResponse {
|
||||
/// Will be 0 if everything is okay.
|
||||
///
|
||||
/// Will be -1 for an error with no code.
|
||||
pub status: i64,
|
||||
pub msg: Option<String>,
|
||||
struct EventDisplay {
|
||||
token: Token,
|
||||
error: bool,
|
||||
writable: bool,
|
||||
write_closed: bool,
|
||||
readable: bool,
|
||||
read_closed: bool,
|
||||
}
|
||||
impl EventExt for Event {
|
||||
type Display = EventDisplay;
|
||||
|
||||
#[axum::debug_handler]
|
||||
//#[utoipa::path(
|
||||
// post,
|
||||
// path = "/set",
|
||||
// responses(
|
||||
// (status = 200, description = "Request was valid", body = SetResponse)
|
||||
// ),
|
||||
//)]
|
||||
async fn ep_set_post(
|
||||
State(config): State<Config>,
|
||||
headers: HeaderMap,
|
||||
Json(SetParams { name, value }): Json<SetParams>,
|
||||
) -> Result<Json<SetResponse>, StatusCode> {
|
||||
debug!("POST /set with name={name:?}, value={value:?}");
|
||||
|
||||
if let Some(token) = &config.token {
|
||||
let Some(auth) = headers.get(http::header::AUTHORIZATION) else {
|
||||
// FIXME: technically RFC9110 requires us to respond with a
|
||||
// `WWW-Authenticate` header.
|
||||
error!("token specified in config but not provided in request");
|
||||
return Err(StatusCode::UNAUTHORIZED);
|
||||
};
|
||||
// No need to go through UTF-8 decoding here.
|
||||
if auth.as_bytes() != token.as_bytes() {
|
||||
error!("token provided in request does not match configured token");
|
||||
return Err(StatusCode::UNAUTHORIZED);
|
||||
fn display(&self) -> Self::Display {
|
||||
EventDisplay {
|
||||
token: self.token(),
|
||||
error: self.is_error(),
|
||||
writable: self.is_writable(),
|
||||
write_closed: self.is_write_closed(),
|
||||
readable: self.is_readable(),
|
||||
read_closed: self.is_read_closed(),
|
||||
}
|
||||
}
|
||||
}
|
||||
impl Display for EventDisplay {
|
||||
fn fmt(&self, f: &mut Formatter) -> FmtResult {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
let file = config.config_file.clone();
|
||||
#[derive(Debug)]
|
||||
pub struct Daemon {
|
||||
config_path: Arc<Path>,
|
||||
fd: OwnedFdWithFlags,
|
||||
path: Option<Box<Path>>,
|
||||
|
||||
let prio = crate::get_where(file.clone());
|
||||
let new_prio = prio - 1;
|
||||
poller: Poll,
|
||||
|
||||
let opt_name = name.to_nix_decl();
|
||||
let opt_val = value.to_nix_source();
|
||||
let new_line = crate::get_next_prio_line(file.clone(), &opt_name, new_prio, &opt_val);
|
||||
fd_info: IdOrdMap<FdInfo>,
|
||||
|
||||
match crate::write_next_prio(file.clone(), new_line) {
|
||||
Ok(()) => (),
|
||||
Err(e) => {
|
||||
error!("Couldn't write next generation to {}: {e}", file.display());
|
||||
let status = e.raw_os_error().map(i64::from).unwrap_or(-1);
|
||||
return Ok(Json(SetResponse {
|
||||
status,
|
||||
msg: Some(format!("{e}")),
|
||||
}));
|
||||
},
|
||||
};
|
||||
// Bijective mapping of [`mio::Token`]s to [`RawFd`]s.
|
||||
tokfd: BiHashMap<TokenFd>,
|
||||
}
|
||||
|
||||
let child_status = match nix_run_apply(&config).await {
|
||||
/// `tokfd` handling.
|
||||
impl Daemon {
|
||||
fn fd_error_pop(&mut self, fd: RawFd) -> Option<IoError> {
|
||||
let mut info = self.fd_info.get_mut(&fd).unwrap_or_else(|| {
|
||||
if let Ok(name) = FdInfo::guess_name(fd) {
|
||||
panic!(
|
||||
"tried to pop error for unknown fd {fd} ({})",
|
||||
name.to_string_lossy(),
|
||||
);
|
||||
}
|
||||
panic!("tried to pop error for unknown fd {fd}")
|
||||
});
|
||||
info.error_buffer.pop_front().tap_some(|e| {
|
||||
trace!("Popping error for {}: {e}", info.display());
|
||||
})
|
||||
}
|
||||
|
||||
fn fd_error_push(&mut self, fd: RawFd, error: IoError) -> Result<(), IoError> {
|
||||
let mut info = self
|
||||
.fd_info
|
||||
.get_mut(&fd)
|
||||
.unwrap_or_else(|| panic!("tried to push error for unknown fd {fd}"));
|
||||
trace!("Pushing error for {}: {}", info.display(), error);
|
||||
info.error_buffer.try_push_back(error)
|
||||
}
|
||||
|
||||
fn main_fd_info(&self) -> &FdInfo {
|
||||
self.fd_info.get(&self.fd.as_raw_fd()).unwrap_or_else(|| {
|
||||
unreachable!(
|
||||
"Main daemon fd {:?} was not registered with fd_info",
|
||||
self.fd,
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
fn register(&mut self, fd: RawFd, kind: FdKind) -> Token {
|
||||
let token = next_token();
|
||||
|
||||
debug!(
|
||||
"Registering new {} FdInfo for {fd} with token {token:?}",
|
||||
kind.name_str(),
|
||||
);
|
||||
|
||||
self.fd_info.insert_unique(FdInfo::new(fd, kind)).unwrap();
|
||||
|
||||
self.tokfd
|
||||
.insert_unique(TokenFd { token, fd })
|
||||
.unwrap_or_else(|e| todo!("{e}"));
|
||||
|
||||
let mut source = SourceFd(&fd);
|
||||
self.poller
|
||||
.registry()
|
||||
.register(&mut source, token, Interest::READABLE)
|
||||
.unwrap_or_else(|e| unreachable!("registering {fd:?} with poller failed: {e}"));
|
||||
|
||||
token
|
||||
}
|
||||
|
||||
#[expect(dead_code)]
|
||||
fn register_with_name<S>(&mut self, fd: RawFd, kind: FdKind, name: Box<OsStr>) -> Token {
|
||||
let token = next_token();
|
||||
|
||||
debug!(
|
||||
"Registering new {} FdInfo for {fd} ({}) with token {token:?}",
|
||||
name.to_string_lossy(),
|
||||
kind.name_str(),
|
||||
);
|
||||
|
||||
self.fd_info
|
||||
.insert_unique(FdInfo::new_with_name(fd, kind, name))
|
||||
.unwrap();
|
||||
|
||||
self.tokfd
|
||||
.insert_unique(TokenFd { token, fd })
|
||||
.unwrap_or_else(|e| todo!("{e}"));
|
||||
|
||||
let mut source = SourceFd(&fd);
|
||||
self.poller
|
||||
.registry()
|
||||
.register(&mut source, token, Interest::READABLE)
|
||||
.unwrap_or_else(|e| unreachable!("registering {fd:?} with poller failed: {e}"));
|
||||
|
||||
token
|
||||
}
|
||||
|
||||
fn deregister(&mut self, fd: RawFd) {
|
||||
let info = self
|
||||
.fd_info
|
||||
.remove(&fd)
|
||||
.unwrap_or_else(|| unreachable!("tried to unregister unknown fd {fd}"));
|
||||
debug!("Unregistering fd {}; calling close()", info.display());
|
||||
unsafe { rustix::io::close(fd) };
|
||||
let res = unsafe { libc::fcntl(fd, libc::F_GETFD, 0) };
|
||||
debug_assert_eq!(res, -1);
|
||||
debug_assert_eq!(
|
||||
IoError::last_os_error().raw_os_error(),
|
||||
Some(Errno::BADF.raw_os_error()),
|
||||
);
|
||||
|
||||
self.tokfd.remove2(&fd).unwrap_or_else(|| todo!());
|
||||
}
|
||||
|
||||
fn fd_for_token(&self, token: Token) -> Option<RawFd> {
|
||||
self.tokfd
|
||||
.get1(&token)
|
||||
.map(|TokenFd { fd, .. }| fd)
|
||||
.copied()
|
||||
}
|
||||
|
||||
/// Not currently used, but here for completeness.
|
||||
#[expect(dead_code)]
|
||||
fn token_for_fd(&self, fd: RawFd) -> Option<Token> {
|
||||
self.tokfd
|
||||
.get2(&fd)
|
||||
.map(|TokenFd { token, .. }| token)
|
||||
.copied()
|
||||
}
|
||||
}
|
||||
|
||||
impl Daemon {
|
||||
pub fn new(
|
||||
config_path: Arc<Path>,
|
||||
fd: OwnedFd,
|
||||
kind: FdKind,
|
||||
name: Option<Box<OsStr>>,
|
||||
) -> Self {
|
||||
let mut fd_info: IdOrdMap<FdInfo> = Default::default();
|
||||
|
||||
// Supposedly, the only possible ways this can fail are EMFILE, ENFILE, and ENOMEM.
|
||||
// If any of those are the case, we're screwed anyway.
|
||||
let poller = Poll::new().unwrap_or_else(|e| panic!("can't create new mio::Poll: {e}"));
|
||||
// Make sure we register the poller in `fd_info`, so we can keep track of its errors.
|
||||
fd_info
|
||||
.insert_unique(FdInfo::new(poller.as_raw_fd(), FdKind::Poller))
|
||||
.unwrap_or_else(|e| unreachable!("{e}"));
|
||||
|
||||
let fd = OwnedFdWithFlags::new_with_fallback(fd);
|
||||
|
||||
fd_info
|
||||
.insert_unique(FdInfo::new(fd.as_raw_fd(), kind))
|
||||
.unwrap_or_else(|e| unreachable!("{e}"));
|
||||
|
||||
debug!("opened daemon to {:?} file descriptor {fd:?}", name);
|
||||
|
||||
let path = name
|
||||
.as_ref()
|
||||
.map(PathBuf::from)
|
||||
.map(PathBuf::into_boxed_path);
|
||||
|
||||
Self {
|
||||
config_path,
|
||||
fd,
|
||||
path,
|
||||
poller,
|
||||
fd_info,
|
||||
tokfd: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn from_unix_socket_path(config_path: Arc<Path>, path: &Path) -> Result<Self, IoError> {
|
||||
// We unconditionally unlink() `path` before binding, but completely ignore the result.
|
||||
let _ = rustix::fs::unlink(path);
|
||||
let listener = UnixListener::bind(path)
|
||||
.tap_err(|e| error!("failed to bind AF_UNIX socket at {}: {e}", path.display()))?;
|
||||
let listener_owned_fd = OwnedFd::from(listener);
|
||||
// FIXME: should we KEEP_ALIVE?
|
||||
rustix::net::sockopt::set_socket_keepalive(&listener_owned_fd, true).unwrap();
|
||||
let path: Box<OsStr> = path.to_path_buf().into_boxed_path().into_boxed_os_str();
|
||||
|
||||
Ok(Self::new(
|
||||
config_path,
|
||||
listener_owned_fd,
|
||||
FdKind::Socket,
|
||||
Some(path),
|
||||
))
|
||||
}
|
||||
|
||||
pub fn open_default_socket(config_path: Arc<Path>) -> Result<Self, IoError> {
|
||||
use IoErrorKind::*;
|
||||
let preferred = USER_SOCKET_DIR.join("dynix.sock").into_boxed_path();
|
||||
let constructed = match Self::from_unix_socket_path(config_path.clone(), &preferred) {
|
||||
Ok(v) => v,
|
||||
Err(e) if e.kind() == Unsupported => {
|
||||
//
|
||||
return Err(e);
|
||||
},
|
||||
Err(e) => {
|
||||
let status = e.raw_os_error().map(i64::from).unwrap_or(-1);
|
||||
return Ok(Json(SetResponse {
|
||||
status,
|
||||
msg: Some(format!("{e}")),
|
||||
}));
|
||||
warn!(
|
||||
"failed binding AF_UNIX socket at {}: {e}; trying elsewhere",
|
||||
preferred.display(),
|
||||
);
|
||||
|
||||
let fallback = TMPDIR.join("dynix.sock").into_boxed_path();
|
||||
Self::from_unix_socket_path(config_path, &fallback).tap_err(|e| {
|
||||
error!(
|
||||
"failed binding AF_UNIX socket at {}: {e}",
|
||||
fallback.display(),
|
||||
)
|
||||
})?
|
||||
},
|
||||
};
|
||||
|
||||
let Output {
|
||||
status,
|
||||
stdout,
|
||||
stderr,
|
||||
} = child_status;
|
||||
|
||||
if status.code() != Some(0) {
|
||||
error!(
|
||||
"Child `nix run` process returned non-zero code {:?}",
|
||||
status.code(),
|
||||
);
|
||||
error!("Child stdout: {}", stdout.as_bstr());
|
||||
error!("Child stderr: {}", stderr.as_bstr());
|
||||
Ok(constructed)
|
||||
}
|
||||
|
||||
let status = status.code().map(i64::from).unwrap_or(-1);
|
||||
let msg = format!(
|
||||
"Stdout: {}\nStderr: {}\n",
|
||||
stdout.as_bstr(),
|
||||
stderr.as_bstr()
|
||||
);
|
||||
/// This panics if stdin cannot be opened.
|
||||
///
|
||||
/// If you want to handle that error, use [`Daemon::from_raw_parts()`].
|
||||
pub fn from_stdin(config_path: Arc<Path>) -> Self {
|
||||
let stdin = io::stdin();
|
||||
let fd = stdin
|
||||
.as_fd()
|
||||
.try_clone_to_owned()
|
||||
.expect("dynix daemon could not open stdin; try a Unix socket?");
|
||||
|
||||
Ok(Json(SetResponse {
|
||||
status,
|
||||
msg: Some(msg),
|
||||
}))
|
||||
Self::new(config_path, fd, FdKind::File, None)
|
||||
}
|
||||
|
||||
async fn nix_run_apply(config: &Config) -> Result<Output, IoError> {
|
||||
let configuration_nix = config
|
||||
.config_file
|
||||
.path()
|
||||
.parent()
|
||||
.unwrap()
|
||||
.join("configuration.nix");
|
||||
let configuration_nix = configuration_nix
|
||||
.to_str()
|
||||
.expect("specified NixOS config file is not a UTF-8 path");
|
||||
let expr = format!(
|
||||
"(import <nixpkgs/nixos> {{ configuration = {}; }})\
|
||||
.config.dynamicism.applyDynamicConfiguration {{ baseConfiguration = {}; }}",
|
||||
configuration_nix, configuration_nix,
|
||||
);
|
||||
//pub unsafe fn from_raw_parts(fd: OwnedFd) -> Self {
|
||||
// Self {
|
||||
// fd: OwnedFdWithFlags::new_with_fallback(fd),
|
||||
// }
|
||||
//}
|
||||
|
||||
pub fn fd(&self) -> BorrowedFd<'_> {
|
||||
self.fd.as_fd()
|
||||
}
|
||||
}
|
||||
const DAEMON: Token = Token(0);
|
||||
|
||||
/// Private helpers.
|
||||
impl Daemon {
|
||||
fn proxy_stdio(&mut self, fd: &BorrowedFd) -> Result<(), IoError> {
|
||||
let info = self.fd_info.get(&fd.as_raw_fd()).unwrap();
|
||||
let label = match info.kind {
|
||||
FdKind::ChildStdout => "stdout",
|
||||
FdKind::ChildStderr => "stderr",
|
||||
other => unreachable!("child stdio cannot have kind {other:?}"),
|
||||
};
|
||||
// FIXME: don't use a new allocation every time.
|
||||
let mut buffer: Vec<u8> = Vec::with_capacity(1024);
|
||||
// FIXME: handle line buffering correctly.
|
||||
loop {
|
||||
let count = rustix::io::read(fd, spare_capacity(&mut buffer))
|
||||
.inspect_err(|e| error!("read() on child stdio fd {fd:?} failed: {e}"))?;
|
||||
|
||||
if count == 0 {
|
||||
break;
|
||||
}
|
||||
|
||||
for line in buffer.lines() {
|
||||
info!("[child {label}]: {}", line.as_bstr())
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn read_cmd(&mut self, fd: &BorrowedFd) -> Result<(), IoError> {
|
||||
// FIXME: don't use a new allocation every time.
|
||||
let mut cmd_buffer: Vec<u8> = Vec::with_capacity(1024);
|
||||
|
||||
let _count = rustix::io::read(fd, spare_capacity(&mut cmd_buffer))
|
||||
.tap_err(|e| error!("read() on daemon fd {fd:?} failed: {e}"))?;
|
||||
|
||||
// The buffer might have existing data from the last read.
|
||||
let deserializer = serde_json::Deserializer::from_slice(&cmd_buffer);
|
||||
let stream: StreamDeserializer<_, DaemonCmd> = deserializer.into_iter();
|
||||
for cmd in stream {
|
||||
let cmd = match cmd {
|
||||
Ok(cmd) => cmd,
|
||||
Err(e) if e.is_eof() => {
|
||||
warn!("Got EOF before a valid command");
|
||||
debug!("command buffer was: {:?}", cmd_buffer.as_bstr());
|
||||
return Ok(());
|
||||
},
|
||||
Err(e) => {
|
||||
warn!("error deserializing command: {e}");
|
||||
debug!("command buffer was: {:?}", cmd_buffer.as_bstr());
|
||||
// Don't propagate the error unless we have too many.
|
||||
self.fd_error_push(fd.as_raw_fd(), e.into()).tap_err(|e| {
|
||||
error!("Accumulated too many errors for daemon fd {fd:?}: {e}")
|
||||
})?;
|
||||
return Ok(());
|
||||
},
|
||||
};
|
||||
debug!("got cmd: {cmd:?}");
|
||||
let _ = rustix::io::write(fd, b"");
|
||||
info!("dispatching command");
|
||||
self.dispatch_cmd(cmd).unwrap_or_else(|e| todo!("{e}"));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn dispatch_cmd(&mut self, cmd: DaemonCmd) -> Result<(), IoError> {
|
||||
// Write the new file...
|
||||
let (name, value) = match cmd {
|
||||
DaemonCmd::Append { name, value } => (name, value),
|
||||
};
|
||||
let source_file = crate::open_source_file(self.config_path.clone())?;
|
||||
let pri = crate::get_where(source_file.clone()).unwrap_or_else(|e| todo!("{e}"));
|
||||
let new_pri = pri - 1;
|
||||
// Get next priority line.
|
||||
let opt_name = name.to_nix_decl();
|
||||
let new_line = crate::get_next_prio_line(
|
||||
source_file.clone(),
|
||||
&opt_name,
|
||||
new_pri,
|
||||
&value.to_nix_source(),
|
||||
)
|
||||
.unwrap_or_else(|e| panic!("someone is holding a reference to source.lines(): {e}"));
|
||||
|
||||
crate::write_next_prio(source_file, new_line).unwrap_or_else(|e| todo!("{e}"));
|
||||
|
||||
// Rebuild and switch.
|
||||
// FIXME: allow passing additional args.
|
||||
//let child = Command::new(*NIXOS_REBUILD)
|
||||
// .arg("switch")
|
||||
// .arg("--log-format")
|
||||
// .arg("raw-with-logs")
|
||||
// .arg("--no-reexec")
|
||||
// .arg("-v")
|
||||
// .stdout(Stdio::piped())
|
||||
// .stderr(Stdio::piped())
|
||||
// .spawn()
|
||||
// .inspect_err(|e| {
|
||||
// error!("failed to spawn `nixos-rebuild` command: {e}");
|
||||
// })?;
|
||||
|
||||
let expr = "(import <nixpkgs/nixos> { }).config.dynamicism.applyDynamicConfiguration { }";
|
||||
let child = Command::new(*NIX)
|
||||
.arg("run")
|
||||
.arg("--show-trace")
|
||||
|
|
@ -204,28 +478,269 @@ async fn nix_run_apply(config: &Config) -> Result<Output, IoError> {
|
|||
.arg(expr)
|
||||
.stdout(Stdio::piped())
|
||||
.stderr(Stdio::piped())
|
||||
.tap(|cmd| {
|
||||
if tracing::enabled!(Level::DEBUG) {
|
||||
let args = cmd
|
||||
.as_std()
|
||||
.get_args()
|
||||
.map(OsStr::to_string_lossy)
|
||||
.join(" ");
|
||||
debug!("Spawning command: `nix {args}`");
|
||||
}
|
||||
})
|
||||
.spawn()
|
||||
.inspect_err(|e| error!("error spawning command: {e}"))?;
|
||||
.inspect_err(|e| error!("failed to spawn `nix run` command: {e}"))?;
|
||||
|
||||
let output = child.wait_with_output().await.inspect_err(|e| {
|
||||
error!("couldn't wait for spawned child process: {e}");
|
||||
debug!("Spanwed child process {}", child.id());
|
||||
|
||||
let pid = Pid::from_child(&child);
|
||||
|
||||
let stdout = child.stdout.unwrap_or_else(|| {
|
||||
unreachable!("`child` is given `.stdout(Stdio::piped())`");
|
||||
});
|
||||
let stderr = child.stderr.unwrap_or_else(|| {
|
||||
unreachable!("`child` is given `.stderr(Stdio::piped())`");
|
||||
});
|
||||
|
||||
let _token = self.register(stdout.into_raw_fd(), FdKind::ChildStdout);
|
||||
let _token = self.register(stderr.into_raw_fd(), FdKind::ChildStderr);
|
||||
|
||||
match rustix::process::pidfd_open(pid, PidfdFlags::NONBLOCK) {
|
||||
Ok(pidfd) => {
|
||||
debug!("Opened pidfd {pidfd:?}, for process {pid}");
|
||||
self.register(pidfd.into_raw_fd(), FdKind::Pid(pid));
|
||||
},
|
||||
Err(e) if e.kind() == IoErrorKind::NotFound => {
|
||||
warn!("child {pid} not found; died before we could open it?");
|
||||
},
|
||||
Err(e) => {
|
||||
error!("Error opening pidfd for child {pid}: {e}");
|
||||
return Err(e)?;
|
||||
},
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn enter_loop(&mut self) -> Result<Option<()>, IoError> {
|
||||
let raw_fd = self.fd.as_raw_fd();
|
||||
if cfg!(debug_assertions) {
|
||||
assert!(
|
||||
self.fd_info.contains_key(&raw_fd),
|
||||
"we should know about daemon fd {raw_fd}",
|
||||
);
|
||||
assert!(
|
||||
self.fd_info.contains_key(&self.poller.as_raw_fd()),
|
||||
"we should know about poller fd {}",
|
||||
self.poller.as_raw_fd(),
|
||||
);
|
||||
}
|
||||
let mut daemon_source = SourceFd(&raw_fd);
|
||||
self.tokfd
|
||||
.insert_unique(TokenFd {
|
||||
token: DAEMON,
|
||||
fd: raw_fd,
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
self.poller
|
||||
.registry()
|
||||
.register(&mut daemon_source, DAEMON, Interest::READABLE)
|
||||
.unwrap_or_else(|e| unreachable!("registering mio Poll for daemon fd {raw_fd:?}: {e}"));
|
||||
|
||||
let mut events = Events::with_capacity(1024);
|
||||
|
||||
loop {
|
||||
if tracing::enabled!(tracing::Level::DEBUG) {
|
||||
debug!("Daemon loop iteration, with file descriptors: ");
|
||||
for info in &self.fd_info {
|
||||
debug!("- {}", info.display());
|
||||
}
|
||||
}
|
||||
|
||||
let poll_result = self.poller.poll(&mut events, TIMEOUT_NEVER);
|
||||
self.handle_poll(poll_result, &events)?;
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_poll(
|
||||
&mut self,
|
||||
poll_result: Result<(), IoError>,
|
||||
events: &Events,
|
||||
) -> Result<(), IoError> {
|
||||
match poll_result {
|
||||
Ok(()) => {
|
||||
trace!(
|
||||
"mio::Poller::poll() got events: {:?}",
|
||||
events.iter().size_hint().0,
|
||||
);
|
||||
if events.is_empty() {
|
||||
unreachable!(
|
||||
"epoll_wait() with a \"forever\" timeout should never give empty events",
|
||||
);
|
||||
}
|
||||
|
||||
let _ = self.fd_error_pop(self.poller.as_raw_fd());
|
||||
},
|
||||
Err(e) if e.kind() == IoErrorKind::Interrupted => {
|
||||
// EINTR is silly.
|
||||
// Return early, and poll() again.
|
||||
return Ok(());
|
||||
},
|
||||
Err(e) => {
|
||||
if let Some(Errno::BADF) = e.raw_os_error().map(Errno::from_raw_os_error) {
|
||||
panic!("EBADF on poller fd; IO safety violation?");
|
||||
}
|
||||
warn!("mio Poll::poll() error: {e}");
|
||||
self.fd_error_push(self.poller.as_raw_fd(), e)
|
||||
.tap_err(|e| {
|
||||
error!("accumulated too many errors for mio::Poll::poll(): {e}")
|
||||
})?;
|
||||
},
|
||||
}
|
||||
|
||||
for event in events {
|
||||
self.handle_event(event)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn handle_event(&mut self, event: &Event) -> Result<(), IoError> {
|
||||
trace!("Handling event {event:#?}");
|
||||
|
||||
match event.token() {
|
||||
DAEMON => {
|
||||
let is_sock = self.main_fd_info().kind == FdKind::Socket;
|
||||
if !is_sock {
|
||||
// SAFETY: oh boy: disjoint borrows with extra steps.
|
||||
let file_fd = unsafe { BorrowedFd::borrow_raw(self.fd.as_raw_fd()) };
|
||||
self.read_cmd(&file_fd).unwrap();
|
||||
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Accept, first.
|
||||
let flags = SocketFlags::NONBLOCK | SocketFlags::CLOEXEC;
|
||||
let stream_fd = match rustix::net::accept_with(&self.fd, flags) {
|
||||
Ok(stream) => {
|
||||
debug!(
|
||||
"Accepted connection from socket {:?} as stream {:?}",
|
||||
self.fd, stream,
|
||||
);
|
||||
stream
|
||||
},
|
||||
Err(e) => {
|
||||
error!("accept4 on daemon socket failed: {e}");
|
||||
self.fd_error_push(self.fd.as_raw_fd(), e.into())
|
||||
.tap_err(|e| {
|
||||
error!(
|
||||
"Accumulated too many errors for daemon fd {:?}: {e}",
|
||||
self.fd
|
||||
)
|
||||
})?;
|
||||
|
||||
Ok(output)
|
||||
return Ok(());
|
||||
},
|
||||
};
|
||||
|
||||
// Add this stream to our poll interest list.
|
||||
// NOTE: `stream_fd` is now effectively `ManuallyDrop`.
|
||||
let stream_fd = stream_fd.into_raw_fd();
|
||||
let _token = self.register(stream_fd, FdKind::SockStream);
|
||||
|
||||
// Wait for the next poll to handle.
|
||||
},
|
||||
other_token => {
|
||||
// This must be a stream fd.
|
||||
let fd = self.fd_for_token(other_token).unwrap_or_else(|| {
|
||||
unreachable!("tried to get fd for non-existent token? {other_token:?}")
|
||||
});
|
||||
let Some(info) = self.fd_info.get(&fd) else {
|
||||
panic!("Received an event on an unregistered fd {fd}; IO-safety violation?");
|
||||
};
|
||||
|
||||
let either_available = event.is_readable() || event.is_writable();
|
||||
if !either_available {
|
||||
info!(
|
||||
"File descriptor {} r:{}, w:{}",
|
||||
info.display(),
|
||||
event.is_readable(),
|
||||
event.is_writable(),
|
||||
);
|
||||
// FIXME: code duplication
|
||||
if event.is_read_closed() {
|
||||
self.deregister(fd);
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
//#[derive(Copy)]
|
||||
//#[derive(Debug, Clone, PartialEq)]
|
||||
//#[derive(utoipa::OpenApi)]
|
||||
//#[openapi(paths(ep_set_post))]
|
||||
//pub struct ApiDoc;
|
||||
match info.kind {
|
||||
FdKind::Pid(pid) => {
|
||||
debug!("Reaping child process {pid}");
|
||||
// SAFETY: `fd` cannot have been closed yet, since that's what we do here.
|
||||
let pidfd = unsafe { BorrowedFd::borrow_raw(fd) };
|
||||
let status = rustix::waitid(WaitId::PidFd(pidfd), WaitIdOptions::EXITED)
|
||||
.unwrap_or_else(|e| {
|
||||
todo!("waitid() can fail? on pid {pid}: {e}");
|
||||
})
|
||||
.unwrap_or_else(|| {
|
||||
todo!("waitid() returned None? for pid {pid}");
|
||||
});
|
||||
|
||||
debug!("waitid() for pid {pid} returned status: {status:?}");
|
||||
let is_dead = status.exited() || status.killed() || status.dumped();
|
||||
if !is_dead {
|
||||
todo!("Handle process {pid} events that aren't death: {status:?}");
|
||||
}
|
||||
let Some(exit_code) = status.exit_status() else {
|
||||
unreachable!("Process {pid} died with no exit code at all? {status:?}");
|
||||
};
|
||||
debug!("Child process {pid} exited with code {exit_code}");
|
||||
|
||||
// Close the pidfd.
|
||||
self.deregister(fd);
|
||||
|
||||
let stream = self
|
||||
.fd_info
|
||||
.iter()
|
||||
.find_map(|info| (info.kind == FdKind::SockStream).then_some(info));
|
||||
if let Some(stream) = stream {
|
||||
// SAFETY: fixme.
|
||||
let stream_fd = unsafe { BorrowedFd::borrow_raw(stream.fd) };
|
||||
let payload = format!("{{ \"status\": {exit_code} }}\n");
|
||||
if let Err(e) = rustix::io::write(stream_fd, payload.as_bytes()) {
|
||||
error!("couldn't write reply to stream fd {stream_fd:?}: {e}");
|
||||
}
|
||||
}
|
||||
},
|
||||
FdKind::ChildStdout => {
|
||||
warn!("got stdout");
|
||||
// SAFETY: oh boy.
|
||||
let stdout = unsafe { BorrowedFd::borrow_raw(fd) };
|
||||
self.proxy_stdio(&stdout)
|
||||
.unwrap_or_else(|e| error!("failed to proxy child stdout: {e}"));
|
||||
},
|
||||
FdKind::ChildStderr => {
|
||||
warn!("got stderr");
|
||||
// SAFETY: oh boy.
|
||||
let stderr = unsafe { BorrowedFd::borrow_raw(fd) };
|
||||
self.proxy_stdio(&stderr)
|
||||
.unwrap_or_else(|e| error!("failed to proxy child stderr: {e}"));
|
||||
},
|
||||
FdKind::SockStream => {
|
||||
// SAFETY: oh boy.
|
||||
let stream_fd = unsafe { BorrowedFd::borrow_raw(fd) };
|
||||
self.read_cmd(&stream_fd).unwrap();
|
||||
},
|
||||
kind => todo!("{kind:?}"),
|
||||
};
|
||||
|
||||
if event.is_read_closed() {
|
||||
self.deregister(fd);
|
||||
return Ok(());
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Daemon {
|
||||
fn drop(&mut self) {
|
||||
if let Some(path) = self.path.as_deref() {
|
||||
let _ = rustix::fs::unlink(path);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -3,7 +3,6 @@ use std::ops::Deref;
|
|||
use crate::prelude::*;
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
use utoipa::ToSchema;
|
||||
|
||||
mod impls;
|
||||
|
||||
|
|
@ -12,7 +11,6 @@ mod impls;
|
|||
/// This type does not provide a [`Default`] impl, however.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
|
||||
#[derive(Deserialize, Serialize)]
|
||||
#[derive(ToSchema)]
|
||||
#[serde(untagged)]
|
||||
pub enum ConvenientAttrPath {
|
||||
Dotted(Box<str>),
|
||||
|
|
@ -49,9 +47,8 @@ impl ConvenientAttrPath {
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, PartialOrd)]
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
#[derive(Deserialize, Serialize)]
|
||||
#[derive(ToSchema)]
|
||||
#[serde(untagged)]
|
||||
pub enum NixLiteral {
|
||||
String(String),
|
||||
|
|
@ -73,7 +70,7 @@ impl NixLiteral {
|
|||
#[serde(tag = "action", content = "args", rename_all = "snake_case")]
|
||||
// FIXME: rename to not confuse with the clap argument type.
|
||||
pub enum DaemonCmd {
|
||||
Set {
|
||||
Append {
|
||||
name: ConvenientAttrPath,
|
||||
value: Box<NixLiteral>,
|
||||
},
|
||||
|
|
|
|||
171
src/daemon_tokfd.rs
Normal file
171
src/daemon_tokfd.rs
Normal file
|
|
@ -0,0 +1,171 @@
|
|||
use std::{os::fd::RawFd, sync::OnceLock};
|
||||
|
||||
use circular_buffer::CircularBuffer;
|
||||
use iddqd::{BiHashItem, IdOrdItem};
|
||||
use mio::Token;
|
||||
use rustix::process::Pid;
|
||||
|
||||
use crate::prelude::*;
|
||||
|
||||
const ERROR_BUFFER_LEN: usize = 8;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct FdInfo {
|
||||
pub fd: RawFd,
|
||||
pub kind: FdKind,
|
||||
pub name: OnceLock<Box<OsStr>>,
|
||||
pub error_buffer: CircularBuffer<ERROR_BUFFER_LEN, IoError>,
|
||||
}
|
||||
|
||||
impl FdInfo {
|
||||
pub fn new<Fd: AsRawFd>(fd: Fd, kind: FdKind) -> Self {
|
||||
Self {
|
||||
fd: fd.as_raw_fd(),
|
||||
kind,
|
||||
name: Default::default(),
|
||||
error_buffer: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_with_name<Fd: AsRawFd>(fd: Fd, kind: FdKind, name: Box<OsStr>) -> Self {
|
||||
Self {
|
||||
fd: fd.as_raw_fd(),
|
||||
kind,
|
||||
name: OnceLock::from(name),
|
||||
error_buffer: Default::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl FdInfo {
|
||||
pub(crate) fn guess_name<Fd: AsRawFd>(fd: Fd) -> Result<Box<OsStr>, IoError> {
|
||||
let dev_fd_path = Path::new("/dev/fd").join(fd.as_raw_fd().to_string());
|
||||
|
||||
fs_err::read_link(dev_fd_path)
|
||||
.map(PathBuf::into_os_string)
|
||||
.map(OsString::into_boxed_os_str)
|
||||
}
|
||||
|
||||
pub fn name(&self) -> &OsStr {
|
||||
if let Some(name) = self.name.get() {
|
||||
return name;
|
||||
}
|
||||
|
||||
match Self::guess_name(self.fd) {
|
||||
Ok(name) => {
|
||||
let prev = self.name.set(name);
|
||||
debug_assert_eq!(prev, Ok(()));
|
||||
},
|
||||
Err(e) => {
|
||||
warn!(
|
||||
"can't read link for {} /dev/fd/{}: {e}",
|
||||
self.kind.name_str(),
|
||||
self.fd,
|
||||
);
|
||||
return OsStr::new("«unknown»");
|
||||
},
|
||||
}
|
||||
|
||||
self.name.get().unwrap_or_else(|| unreachable!())
|
||||
}
|
||||
|
||||
pub fn display(&self) -> FdInfoDisplay<'_> {
|
||||
FdInfoDisplay { inner: self }
|
||||
}
|
||||
}
|
||||
|
||||
impl IdOrdItem for FdInfo {
|
||||
type Key<'a> = &'a RawFd;
|
||||
|
||||
iddqd::id_upcast!();
|
||||
|
||||
fn key(&self) -> &RawFd {
|
||||
&self.fd
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct FdInfoDisplay<'a> {
|
||||
inner: &'a FdInfo,
|
||||
}
|
||||
impl<'a> Display for FdInfoDisplay<'a> {
|
||||
fn fmt(&self, f: &mut Formatter) -> FmtResult {
|
||||
write!(
|
||||
f,
|
||||
"{} fd {} ({})",
|
||||
self.inner.kind.name_str(),
|
||||
self.inner.fd,
|
||||
self.inner.name().to_string_lossy(),
|
||||
)?;
|
||||
if !self.inner.error_buffer.is_empty() {
|
||||
write!(f, "; with errors: {}", self.inner.error_buffer.len())?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Copy)]
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
|
||||
#[non_exhaustive]
|
||||
pub enum FdKind {
|
||||
File,
|
||||
Socket,
|
||||
SockStream,
|
||||
Poller,
|
||||
ChildStdout,
|
||||
ChildStderr,
|
||||
Pid(Pid),
|
||||
#[default]
|
||||
Unknown,
|
||||
}
|
||||
|
||||
impl FdKind {
|
||||
pub fn name_str(self) -> &'static str {
|
||||
use FdKind::*;
|
||||
match self {
|
||||
File => "file",
|
||||
Socket => "socket",
|
||||
SockStream => "socket stream",
|
||||
Poller => "poller",
|
||||
ChildStdout => "child stdout",
|
||||
ChildStderr => "child stderr",
|
||||
Pid(_) => "pidfd",
|
||||
Unknown => "«unknown»",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Copy)]
|
||||
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
|
||||
pub struct TokenFd {
|
||||
pub token: Token,
|
||||
pub fd: RawFd,
|
||||
}
|
||||
|
||||
impl BiHashItem for TokenFd {
|
||||
type K1<'a> = Token;
|
||||
type K2<'a> = RawFd;
|
||||
|
||||
iddqd::bi_upcast!();
|
||||
|
||||
fn key1(&self) -> Token {
|
||||
self.token
|
||||
}
|
||||
|
||||
fn key2(&self) -> RawFd {
|
||||
self.fd
|
||||
}
|
||||
}
|
||||
|
||||
impl From<TokenFd> for (Token, RawFd) {
|
||||
fn from(TokenFd { token, fd }: TokenFd) -> (Token, RawFd) {
|
||||
(token, fd)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<(Token, RawFd)> for TokenFd {
|
||||
fn from((token, fd): (Token, RawFd)) -> TokenFd {
|
||||
TokenFd { token, fd }
|
||||
}
|
||||
}
|
||||
126
src/lib.rs
126
src/lib.rs
|
|
@ -7,7 +7,7 @@ use std::{
|
|||
sync::{Arc, LazyLock},
|
||||
};
|
||||
|
||||
pub mod prelude {
|
||||
pub(crate) mod prelude {
|
||||
#![allow(unused_imports)]
|
||||
|
||||
pub use std::{
|
||||
|
|
@ -49,14 +49,17 @@ pub mod prelude {
|
|||
use prelude::*;
|
||||
|
||||
pub mod args;
|
||||
pub use args::{AppendCmd, Args, InitCmd};
|
||||
pub use args::{AppendCmd, Args};
|
||||
mod boxext;
|
||||
mod color;
|
||||
pub use color::{_CLI_ENABLE_COLOR, SHOULD_COLOR};
|
||||
mod daemon;
|
||||
//pub use daemon::ApiDoc;
|
||||
pub use daemon::Daemon;
|
||||
pub use daemon::api as daemon_api;
|
||||
mod daemon_io;
|
||||
pub use daemon_io::OwnedFdWithFlags;
|
||||
mod daemon_tokfd;
|
||||
pub(crate) use daemon_tokfd::TokenFd;
|
||||
pub mod line;
|
||||
pub use line::Line;
|
||||
mod nixcmd;
|
||||
|
|
@ -107,9 +110,14 @@ pub(crate) fn open_source_file(path: Arc<Path>) -> Result<SourceFile, IoError> {
|
|||
.tap_err(|e| error!("couldn't open source file at {}: {e}", path.display()))
|
||||
}
|
||||
|
||||
pub(crate) fn get_line_to_insert() -> SourceLine {
|
||||
//
|
||||
todo!();
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "debug")]
|
||||
pub fn do_append(args: Arc<Args>, append_args: AppendCmd) -> Result<(), BoxDynError> {
|
||||
let filepath = &args.file;
|
||||
let filepath = Path::new(&args.file);
|
||||
let filepath: PathBuf = if filepath.is_relative() && !filepath.starts_with("./") {
|
||||
iter::once(OsStr::new("./"))
|
||||
.chain(filepath.iter())
|
||||
|
|
@ -119,7 +127,7 @@ pub fn do_append(args: Arc<Args>, append_args: AppendCmd) -> Result<(), BoxDynEr
|
|||
};
|
||||
|
||||
let source_file = open_source_file(Arc::from(filepath))?;
|
||||
let pri = get_where(source_file.clone());
|
||||
let pri = get_where(source_file.clone())?;
|
||||
|
||||
let new_pri = pri - 1;
|
||||
|
||||
|
|
@ -128,7 +136,7 @@ pub fn do_append(args: Arc<Args>, append_args: AppendCmd) -> Result<(), BoxDynEr
|
|||
&append_args.name,
|
||||
new_pri,
|
||||
&append_args.value,
|
||||
);
|
||||
)?;
|
||||
|
||||
debug!("new_pri_line={new_pri_line}");
|
||||
|
||||
|
|
@ -139,87 +147,31 @@ pub fn do_append(args: Arc<Args>, append_args: AppendCmd) -> Result<(), BoxDynEr
|
|||
|
||||
//#[tracing::instrument(level = "debug")]
|
||||
pub fn do_daemon(args: Arc<Args>, daemon_args: DaemonCmd) -> Result<(), BoxDynError> {
|
||||
let config_file: Arc<Path> = Arc::clone(&args.file);
|
||||
let config_file = Path::new(&args.file);
|
||||
let config_file: PathBuf = if config_file.is_relative() && !config_file.starts_with("./") {
|
||||
iter::once(OsStr::new("./"))
|
||||
.chain(config_file.iter())
|
||||
.collect()
|
||||
} else {
|
||||
config_file.to_path_buf()
|
||||
};
|
||||
let config_file: Arc<Path> = Arc::from(config_file);
|
||||
|
||||
// FIXME: make configurable?
|
||||
let _ = rustix::process::umask(Mode::from_bits_retain(0o600).complement());
|
||||
|
||||
let rt = tokio::runtime::Runtime::new().expect("couldn't start tokio runtime");
|
||||
|
||||
let config = daemon::Config {
|
||||
config_file: SourceFile::new(config_file).unwrap(),
|
||||
addr: daemon_args.bind,
|
||||
// FIXME
|
||||
token: None,
|
||||
let mut daemon = match daemon_args {
|
||||
DaemonCmd { stdin: true, .. } => Daemon::from_stdin(config_file),
|
||||
DaemonCmd { socket: None, .. } => Daemon::open_default_socket(config_file)?,
|
||||
DaemonCmd {
|
||||
socket: Some(socket),
|
||||
..
|
||||
} => Daemon::from_unix_socket_path(config_file, &socket)?,
|
||||
};
|
||||
|
||||
rt.block_on(async move {
|
||||
daemon::run(config).await;
|
||||
});
|
||||
daemon.enter_loop().unwrap();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
const DYNAMIC_NIX_INIT: &str = indoc::indoc! {"
|
||||
/** GENERATED BY DYNIX */
|
||||
{ lib, ... }:
|
||||
|
||||
{
|
||||
imports = [
|
||||
./configuration.nix
|
||||
];
|
||||
config = lib.mkMerge [
|
||||
];
|
||||
}
|
||||
"};
|
||||
|
||||
fn open_for_init(path: &Path, init_args: &InitCmd) -> Result<File, IoError> {
|
||||
let mut opts = File::options();
|
||||
opts.write(true).create_new(true);
|
||||
let file = match opts.open(path) {
|
||||
Ok(file) => file,
|
||||
Err(e) if e.kind() == IoErrorKind::AlreadyExists => {
|
||||
if !init_args.force {
|
||||
error!(
|
||||
"Refusing to overwrite existing file at '{}' without --force",
|
||||
path.display(),
|
||||
);
|
||||
return Err(e);
|
||||
}
|
||||
info!("Overwriting existing file at '{}'", path.display());
|
||||
File::options()
|
||||
.write(true)
|
||||
.create(true)
|
||||
.open(path)
|
||||
.inspect_err(|e| {
|
||||
error!(
|
||||
"Failed to open path at '{}' for overwriting: {e}",
|
||||
path.display(),
|
||||
)
|
||||
})?
|
||||
},
|
||||
Err(e) => {
|
||||
error!(
|
||||
"Failed to open path at '{}' for creation: {e}",
|
||||
path.display()
|
||||
);
|
||||
return Err(e);
|
||||
},
|
||||
};
|
||||
|
||||
Ok(file)
|
||||
}
|
||||
|
||||
pub fn do_init(args: Arc<Args>, init_args: InitCmd) -> Result<(), BoxDynError> {
|
||||
let path = &*args.file;
|
||||
let mut file = open_for_init(path, &init_args)?;
|
||||
|
||||
write!(&mut file, "{}", DYNAMIC_NIX_INIT)
|
||||
.inspect_err(|e| error!("Failed to write to file at '{}': {e}", path.display()))?;
|
||||
file.flush()
|
||||
.inspect_err(|e| warn!("Failed to flush file at '{}': {e}", path.display()))?;
|
||||
|
||||
info!("Initialized Dynix file at '{}'", path.display());
|
||||
info!("daemon has exited");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
@ -254,8 +206,8 @@ fn maybe_extract_prio_from_line(line: &SourceLine) -> Option<i64> {
|
|||
})
|
||||
}
|
||||
|
||||
pub fn get_where(dynamic_nix: SourceFile) -> i64 {
|
||||
let lines = dynamic_nix.lines();
|
||||
pub fn get_where(dynamic_nix: SourceFile) -> Result<i64, BoxDynError> {
|
||||
let lines = dynamic_nix.lines()?;
|
||||
let prio = lines
|
||||
.iter()
|
||||
.filter_map(maybe_extract_prio_from_line)
|
||||
|
|
@ -263,7 +215,7 @@ pub fn get_where(dynamic_nix: SourceFile) -> i64 {
|
|||
.next() // Priorities with lower integer values are "stronger" priorities.
|
||||
.unwrap_or(0);
|
||||
|
||||
prio
|
||||
Ok(prio)
|
||||
}
|
||||
|
||||
pub fn get_next_prio_line(
|
||||
|
|
@ -271,8 +223,8 @@ pub fn get_next_prio_line(
|
|||
option_name: &str,
|
||||
new_prio: i64,
|
||||
new_value: &str,
|
||||
) -> SourceLine {
|
||||
let source_lines = source.lines();
|
||||
) -> Result<SourceLine, BoxDynError> {
|
||||
let source_lines = source.lines()?;
|
||||
let penultimate = source_lines.get(source_lines.len() - 2);
|
||||
// FIXME: don't rely on whitespace lol
|
||||
debug_assert_eq!(penultimate.map(SourceLine::text).as_deref(), Some(" ];"));
|
||||
|
|
@ -290,10 +242,10 @@ pub fn get_next_prio_line(
|
|||
)),
|
||||
};
|
||||
|
||||
new_line
|
||||
Ok(new_line)
|
||||
}
|
||||
|
||||
pub fn write_next_prio(mut source: SourceFile, new_line: SourceLine) -> Result<(), IoError> {
|
||||
pub fn write_next_prio(mut source: SourceFile, new_line: SourceLine) -> Result<(), BoxDynError> {
|
||||
let new_mod_start = SourceLine {
|
||||
line: new_line.line.prev(),
|
||||
path: source.path(),
|
||||
|
|
|
|||
25
src/line.rs
25
src/line.rs
|
|
@ -37,33 +37,8 @@ impl Line {
|
|||
self.0
|
||||
}
|
||||
|
||||
#[cfg(target_pointer_width = "64")]
|
||||
pub const fn index_usize(self) -> usize {
|
||||
self.0 as usize
|
||||
}
|
||||
|
||||
/// 1-indexed
|
||||
pub const fn linenr(self) -> u64 {
|
||||
self.0 + 1
|
||||
}
|
||||
}
|
||||
|
||||
/// [Alternate](Formatter::alternate) flag capitalizes "line".
|
||||
///
|
||||
/// ```
|
||||
/// # use dynix::Line;
|
||||
/// let line = Line::from_index(22);
|
||||
/// assert_eq!(format!("{line}"), "line 23");
|
||||
/// assert_eq!(format!("{line:#}"), "Line 23");
|
||||
/// ```
|
||||
impl Display for Line {
|
||||
fn fmt(&self, f: &mut Formatter) -> FmtResult {
|
||||
if f.alternate() {
|
||||
write!(f, "Line {}", self.linenr())?;
|
||||
} else {
|
||||
write!(f, "line {}", self.linenr())?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -11,13 +11,12 @@ use clap::{ColorChoice, Parser as _};
|
|||
use tracing_human_layer::HumanLayer;
|
||||
use tracing_subscriber::util::SubscriberInitExt;
|
||||
use tracing_subscriber::{EnvFilter, layer::SubscriberExt};
|
||||
use utoipa::OpenApi as _;
|
||||
|
||||
fn main_wrapped() -> Result<(), Box<dyn StdError + Send + Sync + 'static>> {
|
||||
// Default RUST_LOG to warn if it's not specified.
|
||||
if env::var_os("RUST_LOG").is_none() {
|
||||
unsafe {
|
||||
env::set_var("RUST_LOG", "info");
|
||||
env::set_var("RUST_LOG", "warn");
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -44,11 +43,6 @@ fn main_wrapped() -> Result<(), Box<dyn StdError + Send + Sync + 'static>> {
|
|||
match &args.subcommand {
|
||||
Append(append_args) => dynix::do_append(args.clone(), append_args.clone())?,
|
||||
Daemon(daemon_args) => dynix::do_daemon(args.clone(), daemon_args.clone())?,
|
||||
Init(init_args) => dynix::do_init(args.clone(), init_args.clone())?,
|
||||
OpenApiDocs => {
|
||||
//let api = dynix::ApiDoc::openapi();
|
||||
//dbg!(api);
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
|
|
|
|||
183
src/source.rs
183
src/source.rs
|
|
@ -3,10 +3,13 @@
|
|||
// SPDX-License-Identifier: EUPL-1.1
|
||||
|
||||
use std::{
|
||||
cell::{Ref, RefCell},
|
||||
hash::Hash,
|
||||
io::{BufRead, BufReader, BufWriter},
|
||||
mem::{self, MaybeUninit},
|
||||
ops::Deref,
|
||||
ptr,
|
||||
sync::{Arc, LazyLock},
|
||||
sync::{Arc, Mutex, OnceLock},
|
||||
};
|
||||
|
||||
use crate::Line;
|
||||
|
|
@ -16,17 +19,6 @@ use crate::prelude::*;
|
|||
|
||||
use fs_err::OpenOptions;
|
||||
use itertools::Itertools;
|
||||
// parking_lot's RwLock has RwLockReadGuard::map() in stable Rust.
|
||||
use parking_lot::{MappedRwLockReadGuard, RwLock, RwLockReadGuard};
|
||||
|
||||
pub(crate) static DEFAULT_OPEN_OPTIONS: LazyLock<OpenOptions> = LazyLock::new(|| {
|
||||
let mut opts = fs_err::OpenOptions::new();
|
||||
opts.read(true)
|
||||
.write(true)
|
||||
.create(false)
|
||||
.custom_flags(libc::O_CLOEXEC);
|
||||
opts
|
||||
});
|
||||
|
||||
pub fn replace_file<'a>(
|
||||
path: &Path,
|
||||
|
|
@ -106,17 +98,35 @@ impl Display for SourceLine {
|
|||
#[derive(Debug, Clone)]
|
||||
pub struct SourceFile {
|
||||
path: Arc<Path>,
|
||||
lines: Arc<RwLock<Vec<SourceLine>>>,
|
||||
file: Arc<Mutex<File>>,
|
||||
/// References to `SourceFile` do not prevent mutating `lines`.
|
||||
/// Also `lines` is lazily initialized.
|
||||
lines: Arc<OnceLock<RefCell<Vec<SourceLine>>>>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
#[repr(transparent)]
|
||||
pub struct OpaqueDerefSourceLines<'s>(Ref<'s, [SourceLine]>);
|
||||
impl<'s> Deref for OpaqueDerefSourceLines<'s> {
|
||||
type Target = [SourceLine];
|
||||
|
||||
fn deref(&self) -> &[SourceLine] {
|
||||
&*self.0
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
#[repr(transparent)]
|
||||
pub struct OpaqueDerefSourceLine<'s>(Ref<'s, SourceLine>);
|
||||
impl<'s> Deref for OpaqueDerefSourceLine<'s> {
|
||||
type Target = SourceLine;
|
||||
|
||||
fn deref(&self) -> &SourceLine {
|
||||
&*self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl SourceFile {
|
||||
pub fn new<P>(path: P) -> Result<Self, IoError>
|
||||
where
|
||||
P: Into<Arc<Path>>,
|
||||
{
|
||||
Self::open_from(path.into(), DEFAULT_OPEN_OPTIONS.clone())
|
||||
}
|
||||
|
||||
/// Panics if `path` is a directory path instead of a file path.
|
||||
pub fn open_from(path: Arc<Path>, options: OpenOptions) -> Result<Self, IoError> {
|
||||
trace!(
|
||||
|
|
@ -126,43 +136,69 @@ impl SourceFile {
|
|||
);
|
||||
assert!(path.file_name().is_some());
|
||||
|
||||
let mut file = options
|
||||
.open(&*path)
|
||||
.inspect_err(|e| error!("Failed to open path at {}: {e}", path.display()))?;
|
||||
trace!("File opened to {file:?} ({})", file.as_raw_fd());
|
||||
let reader = BufReader::new(&mut file);
|
||||
let lines = reader
|
||||
let file = Arc::new(Mutex::new(options.open(&*path)?));
|
||||
|
||||
Ok(Self {
|
||||
path,
|
||||
file,
|
||||
lines: Default::default(),
|
||||
})
|
||||
}
|
||||
|
||||
pub fn buf_reader(&mut self) -> Result<BufReader<&mut File>, IoError> {
|
||||
let file_mut = Arc::get_mut(&mut self.file)
|
||||
.unwrap_or_else(|| panic!("'File' for {} has existing handle", self.path.display()))
|
||||
.get_mut()
|
||||
.unwrap_or_else(|e| {
|
||||
panic!("'File' for {} was mutex-poisoned: {e}", self.path.display())
|
||||
});
|
||||
|
||||
let reader = BufReader::new(file_mut);
|
||||
|
||||
Ok(reader)
|
||||
}
|
||||
|
||||
fn _lines(&self) -> Result<Ref<'_, [SourceLine]>, IoError> {
|
||||
if let Some(lines) = self.lines.get() {
|
||||
let as_slice = Ref::map(lines.borrow(), |lines| lines.as_slice());
|
||||
return Ok(as_slice);
|
||||
}
|
||||
let lines = BufReader::new(&*self.file.lock().unwrap())
|
||||
.lines()
|
||||
.enumerate()
|
||||
.map(|(index, line_res)| {
|
||||
let line = Line::from_index(index as u64);
|
||||
line_res
|
||||
.map(|contents| SourceLine {
|
||||
line,
|
||||
path: Arc::clone(&path),
|
||||
text: Arc::from(contents),
|
||||
})
|
||||
.inspect_err(|e| {
|
||||
error!("Failed to read line {line} of {}: {e}", path.display());
|
||||
line_res.map(|line| SourceLine {
|
||||
line: Line::from_index(index as u64),
|
||||
path: Arc::clone(&self.path),
|
||||
text: Arc::from(line),
|
||||
})
|
||||
})
|
||||
.collect::<Result<Vec<SourceLine>, IoError>>()
|
||||
.inspect_err(|e| {
|
||||
error!("Failed to read source file at {}: {e}", path.display());
|
||||
})?;
|
||||
.collect::<Result<Vec<SourceLine>, IoError>>()?;
|
||||
// Mutex should have dropped by now.
|
||||
debug_assert!(self.file.try_lock().is_ok());
|
||||
|
||||
let lines = Arc::new(RwLock::new(lines));
|
||||
self.lines.set(RefCell::new(lines)).unwrap();
|
||||
|
||||
Ok(Self { path, lines })
|
||||
Ok(self._lines_slice())
|
||||
}
|
||||
|
||||
pub fn lines(&self) -> _detail::OpaqueSourceLines<'_> {
|
||||
_detail::OpaqueSourceLines(self._lines())
|
||||
pub fn lines(&self) -> Result<OpaqueDerefSourceLines<'_>, IoError> {
|
||||
let lines = self._lines()?;
|
||||
|
||||
Ok(OpaqueDerefSourceLines(lines))
|
||||
}
|
||||
|
||||
/// Panics if `line` is out of range.
|
||||
pub fn line(&self, line: Line) -> _detail::OpaqueSourceLine<'_> {
|
||||
_detail::OpaqueSourceLine(self._line(line))
|
||||
pub fn line(&self, line: Line) -> Result<OpaqueDerefSourceLine<'_>, IoError> {
|
||||
let lines_lock = self._lines()?;
|
||||
let line = Ref::map(lines_lock, |lines| &lines[line.index() as usize]);
|
||||
|
||||
Ok(OpaqueDerefSourceLine(line))
|
||||
}
|
||||
|
||||
/// `lines` but already be initialized.
|
||||
fn _lines_slice(&self) -> Ref<'_, [SourceLine]> {
|
||||
debug_assert!(self.lines.get().is_some());
|
||||
Ref::map(self.lines.get().unwrap().borrow(), |lines| lines.as_slice())
|
||||
}
|
||||
|
||||
/// With debug assertions, panics if `lines` are not contiguous.
|
||||
|
|
@ -175,7 +211,7 @@ impl SourceFile {
|
|||
debug_assert!(new_lines.is_sorted_by(|lhs, rhs| lhs.line.next() == rhs.line));
|
||||
|
||||
let path = self.path();
|
||||
let cur_lines = self.lines();
|
||||
let cur_lines = self.lines()?;
|
||||
let first_half = cur_lines
|
||||
.iter()
|
||||
.take(num_lines_before_new)
|
||||
|
|
@ -201,9 +237,7 @@ impl SourceFile {
|
|||
debug_assert!(final_lines.is_sorted_by(|lhs, rhs| lhs.line.next() == rhs.line));
|
||||
debug_assert_eq!(cur_lines.len() + new_lines.len(), final_lines.len());
|
||||
|
||||
// Stop locking self.lines or we won't be able to write to it.
|
||||
drop(cur_lines);
|
||||
debug_assert!(!self.lines.is_locked());
|
||||
|
||||
let data = final_lines
|
||||
.iter()
|
||||
|
|
@ -211,11 +245,8 @@ impl SourceFile {
|
|||
.pipe(|iterator| Itertools::intersperse(iterator, b"\n"));
|
||||
replace_file(&path, data)?;
|
||||
|
||||
debug_assert_ne!(self.lines.read().len(), final_lines.len());
|
||||
|
||||
// Finally, update state.
|
||||
let mut lines_guard = self.lines.write();
|
||||
*lines_guard = final_lines;
|
||||
self.lines.get().unwrap().replace(final_lines);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
@ -223,22 +254,6 @@ impl SourceFile {
|
|||
pub fn path(&self) -> Arc<Path> {
|
||||
Arc::clone(&self.path)
|
||||
}
|
||||
|
||||
pub fn display(&self) -> std::path::Display<'_> {
|
||||
self.path.display()
|
||||
}
|
||||
}
|
||||
|
||||
impl SourceFile {
|
||||
fn _lines(&self) -> MappedRwLockReadGuard<'_, [SourceLine]> {
|
||||
let lines = RwLock::read(&self.lines);
|
||||
RwLockReadGuard::map(lines, Vec::as_slice)
|
||||
}
|
||||
|
||||
fn _line(&self, line: Line) -> MappedRwLockReadGuard<'_, SourceLine> {
|
||||
let lines = self._lines();
|
||||
MappedRwLockReadGuard::map(lines, |lines| lines.get(line.index() as usize).unwrap())
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialEq for SourceFile {
|
||||
|
|
@ -246,33 +261,3 @@ impl PartialEq for SourceFile {
|
|||
*self.path == *other.path
|
||||
}
|
||||
}
|
||||
|
||||
/// Types in this module are conceptually opaque, but their concrete implementations
|
||||
/// are provided because opaque types in Rust are difficult to work with.
|
||||
pub mod _detail {
|
||||
use std::ops::Deref;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[derive(Debug)]
|
||||
#[repr(transparent)]
|
||||
pub struct OpaqueSourceLines<'s>(pub(crate) MappedRwLockReadGuard<'s, [SourceLine]>);
|
||||
impl<'s> Deref for OpaqueSourceLines<'s> {
|
||||
type Target = [SourceLine];
|
||||
|
||||
fn deref(&self) -> &[SourceLine] {
|
||||
&*self.0
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
#[repr(transparent)]
|
||||
pub struct OpaqueSourceLine<'s>(pub(crate) MappedRwLockReadGuard<'s, SourceLine>);
|
||||
impl<'s> Deref for OpaqueSourceLine<'s> {
|
||||
type Target = SourceLine;
|
||||
|
||||
fn deref(&self) -> &SourceLine {
|
||||
&*self.0
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -19,7 +19,7 @@ if TYPE_CHECKING:
|
|||
assert machine.shell is not None
|
||||
|
||||
ls = "eza -lah --color=always --group-directories-first"
|
||||
testing_client = "/root/.nix-profile/libexec/dynix-testing-client.py"
|
||||
|
||||
indent = functools.partial(textwrap.indent, prefix=' ')
|
||||
|
||||
@beartype
|
||||
|
|
@ -49,19 +49,6 @@ parser.add_argument("--log-level", type=str)
|
|||
#parser.add_argument("--stats", action="store_true")
|
||||
#parser.add_argument("--stats-port", type=int)
|
||||
|
||||
@beartype
|
||||
def parse_systemd_exec(prop: str) -> str:
|
||||
# idk why, but systemd exec lines are secretly DBus dictionaries,
|
||||
# which `systemctl show -p` represents as an equals-delimited, semicolon-separated,
|
||||
# key-value pair, all in curly braces. e.g.:
|
||||
# { path=/nix/store/… ; argv[]=foobar ; ignore_errors=no ; … }
|
||||
inside = prop.removeprefix('{ ').removesuffix(' }')
|
||||
pairs = inside.split(';')
|
||||
# FIXME: don't assume `path` is the first one.
|
||||
# In case systemd ever changes that.
|
||||
_key, path = pairs[0].split('=')
|
||||
return path.strip()
|
||||
|
||||
@beartype
|
||||
def get_cli_args() -> argparse.Namespace:
|
||||
machine.wait_for_unit("distccd.service")
|
||||
|
|
@ -69,21 +56,27 @@ def get_cli_args() -> argparse.Namespace:
|
|||
machine.log(f"{mainpid=}")
|
||||
pidtext = machine.succeed(f"pgrep -P {mainpid}")
|
||||
machine.log(f"{pidtext=}")
|
||||
|
||||
pid = int(pidtext.splitlines()[0])
|
||||
cmdline_args = machine.succeed(rf"cat /proc/{pid}/cmdline | tr '\0' '\n'").splitlines()
|
||||
machine.log(f"{pid=}")
|
||||
execstart = machine.get_unit_property("distccd.service", "ExecStart")
|
||||
print(f"{execstart=}")
|
||||
|
||||
cmdline = machine.succeed(f"cat /proc/{pid}/cmdline")
|
||||
cmdline_args = cmdline.split("\0")
|
||||
machine.log(f"{cmdline_args=}")
|
||||
print(f"{cmdline_args=}")
|
||||
|
||||
args, rest = parser.parse_known_args(cmdline_args)
|
||||
return args
|
||||
|
||||
@beartype
|
||||
def dynix_append_cli(option: str, value: Any):
|
||||
value = f'"{value}"' if isinstance(value, str) else value
|
||||
def dynix_append(option: str, value: Any):
|
||||
machine.succeed(f'''
|
||||
dynix append {shlex.quote(option)} {shlex.quote(str(value))}
|
||||
'''.strip())
|
||||
|
||||
@beartype
|
||||
def do_apply():
|
||||
expr = textwrap.dedent("""
|
||||
(import <nixpkgs/nixos> { }).config.dynamicism.applyDynamicConfiguration { }
|
||||
""").strip()
|
||||
|
|
@ -92,31 +85,12 @@ def dynix_append_cli(option: str, value: Any):
|
|||
nix run --show-trace --log-format raw-with-logs --impure -E {shlex.quote(expr)}
|
||||
""".strip())
|
||||
|
||||
@beartype
|
||||
def dynix_append_daemon(option: str, value: Any):
|
||||
import json
|
||||
payload = json.dumps(dict(
|
||||
name=option,
|
||||
value=value,
|
||||
))
|
||||
|
||||
#status = machine.succeed(f"echo '{payload}' | {testing_client} /run/user/0/dynix.sock")
|
||||
status = machine.succeed(f"echo '{payload}' | curl localhost:42420/set --json @-")
|
||||
machine.log(f"daemon replied with status {status}")
|
||||
machine.wait_for_unit("distccd.service")
|
||||
|
||||
@beartype
|
||||
def run_all_tests(machine: Machine, *, use_daemon: bool):
|
||||
dynix_append = dynix_append_daemon if use_daemon else dynix_append_cli
|
||||
machine.wait_for_unit("default.target")
|
||||
machine.wait_for_unit("install-dynix.service")
|
||||
|
||||
dynix_out = machine.succeed("dynix --version")
|
||||
assert "dynix" in dynix_out, f"dynix not in {dynix_out=}"
|
||||
|
||||
machine.succeed("systemctl start user@0.service")
|
||||
machine.wait_for_unit("user@0.service")
|
||||
machine.succeed("systemctl start dynix-daemon.service")
|
||||
machine.wait_for_unit("dynix-daemon.service")
|
||||
|
||||
# Config should have our initial values.
|
||||
args = get_cli_args()
|
||||
assert args.jobs == 12, f'{args.jobs=} != 12'
|
||||
|
|
@ -134,6 +108,7 @@ def run_all_tests(machine: Machine, *, use_daemon: bool):
|
|||
|
||||
new_jobs = 4
|
||||
dynix_append("services.distccd.maxJobs", new_jobs)
|
||||
do_apply()
|
||||
|
||||
# Only jobs should have changed. The others should still be default.
|
||||
args = get_cli_args()
|
||||
|
|
@ -142,7 +117,8 @@ def run_all_tests(machine: Machine, *, use_daemon: bool):
|
|||
assert args.log_level == 'warning', f'{args.log_level=} != warning'
|
||||
|
||||
new_log_level = 'error'
|
||||
dynix_append("services.distccd.logLevel", f'{new_log_level}')
|
||||
dynix_append("services.distccd.logLevel", f'"{new_log_level}"')
|
||||
do_apply()
|
||||
|
||||
args = get_cli_args()
|
||||
assert args.jobs == new_jobs, f'{args.jobs=} != {new_jobs=}'
|
||||
|
|
@ -155,22 +131,3 @@ def run_all_tests(machine: Machine, *, use_daemon: bool):
|
|||
assert args.jobs == 12, f'{args.jobs=} != 12'
|
||||
assert args.job_lifetime == 900, f'{args.job_lifetime} != 900'
|
||||
assert args.log_level == 'warning', f'{args.log_level=} != warning'
|
||||
|
||||
machine.start(allow_reboot=True)
|
||||
machine.wait_for_unit("default.target")
|
||||
machine.wait_for_unit("install-dynix.service")
|
||||
try:
|
||||
run_all_tests(machine, use_daemon=False)
|
||||
except Exception as e:
|
||||
machine.logger.error(f"ERROR during CLI tests: {e}")
|
||||
raise
|
||||
|
||||
machine.reboot()
|
||||
|
||||
machine.wait_for_unit("default.target")
|
||||
machine.wait_for_unit("install-dynix.service")
|
||||
try:
|
||||
run_all_tests(machine, use_daemon=True)
|
||||
except Exception as e:
|
||||
machine.logger.error(f"ERROR during DAEMON tests: {e}")
|
||||
raise
|
||||
|
|
|
|||
|
|
@ -69,7 +69,7 @@ in
|
|||
path = [ config.nix.package ];
|
||||
serviceConfig = {
|
||||
Environment = [
|
||||
"RUST_LOG=info,dynix=debug"
|
||||
"RUST_LOG=trace"
|
||||
];
|
||||
ExecSearchPath = [ "/run/current-system/sw/bin" ];
|
||||
SuccessExitStatus = [ "0" "2" ];
|
||||
|
|
@ -90,6 +90,5 @@ in
|
|||
netcat.nc
|
||||
socat
|
||||
python3
|
||||
curl
|
||||
];
|
||||
}
|
||||
|
|
|
|||
|
|
@ -41,34 +41,15 @@ def run_log(machine: Machine, *commands: str, timeout: int | None = 60) -> str:
|
|||
|
||||
return output
|
||||
|
||||
@beartype
|
||||
def parse_systemd_exec(prop: str) -> str:
|
||||
# idk why, but systemd exec lines are secretly DBus dictionaries,
|
||||
# which `systemctl show -p` represents as an equals-delimited, semicolon-separated,
|
||||
# key-value pair, all in curly braces. e.g.:
|
||||
# { path=/nix/store/… ; argv[]=foobar ; ignore_errors=no ; … }
|
||||
inside = prop.removeprefix('{ ').removesuffix(' }')
|
||||
|
||||
as_dict = dict()
|
||||
for pair in inside.split(';'):
|
||||
k, v = pair.split('=', maxsplit=1)
|
||||
# They have whitespace around the equals but I don't think that's guaranteed.
|
||||
as_dict[k.strip()] = v.strip()
|
||||
|
||||
# If argv is non-empty, use that entirely.
|
||||
if argv := as_dict["argv[]"]:
|
||||
return argv
|
||||
# Otherwise, just use the path, I guess?
|
||||
return as_dict["path"]
|
||||
|
||||
@beartype
|
||||
def get_config_file() -> str:
|
||||
machine.wait_for_unit("gotosocial.service")
|
||||
gotosocial_pid = int(machine.get_unit_property("gotosocial.service", "MainPID"))
|
||||
|
||||
cmdline = machine.succeed(f"cat /proc/{gotosocial_pid}/cmdline")
|
||||
cmdline_args = cmdline.split("\0")
|
||||
|
||||
execstart = machine.get_unit_property("gotosocial.service", "ExecStart")
|
||||
cmdline_args = parse_systemd_exec(execstart).split()
|
||||
config_file_idx = cmdline_args.index("--config-path") + 1
|
||||
|
||||
config_file = Path(cmdline_args[config_file_idx])
|
||||
|
||||
machine.log(f"copying from VM: {config_file=}")
|
||||
|
|
@ -104,11 +85,14 @@ def dynix_append_cli(option: str, value: Any):
|
|||
def dynix_append_daemon(option: str, value: Any):
|
||||
import json
|
||||
payload = json.dumps(dict(
|
||||
action="append",
|
||||
args=dict(
|
||||
name=option,
|
||||
value=value,
|
||||
),
|
||||
))
|
||||
status = machine.succeed(f"echo '{payload}' | curl localhost:42420/set --json @-")
|
||||
machine.log(f"daemon replied with status {status}")
|
||||
|
||||
machine.succeed(f"echo '{payload}' | {testing_client} /run/user/0/dynix.sock")
|
||||
|
||||
@beartype
|
||||
def run_all_tests(machine: Machine, *, use_daemon: bool):
|
||||
|
|
|
|||
|
|
@ -43,13 +43,11 @@ def run_log(machine: Machine, *commands: str, timeout: int | None = 60) -> str:
|
|||
@beartype
|
||||
def get_config_file() -> dict[str, Any]:
|
||||
machine.wait_for_unit("harmonia.service")
|
||||
|
||||
# FIXME: this doesn't work if any of the environment variables have spaces,
|
||||
# but idk what else to do.
|
||||
systemd_environ = machine.get_unit_property("harmonia.service", "Environment").split(" ")
|
||||
|
||||
pairs: list[list[str]] = [elem.split("=", maxsplit=1) for elem in systemd_environ]
|
||||
pid = int(machine.get_unit_property("harmonia.service", "MainPID"))
|
||||
env_lines: list[str] = machine.succeed(f"cat /proc/{pid}/environ").replace("\0", "\n").splitlines()
|
||||
pairs: list[list[str]] = [line.split("=", maxsplit=1) for line in env_lines]
|
||||
env = dict(pairs)
|
||||
|
||||
config_file = Path(env["CONFIG_FILE"])
|
||||
|
||||
machine.log(f"copying from VM: {config_file=}")
|
||||
|
|
@ -69,12 +67,14 @@ def get_config_file() -> dict[str, Any]:
|
|||
@beartype
|
||||
def dynix_append_daemon(option: str, value: Any):
|
||||
payload = json.dumps(dict(
|
||||
action="append",
|
||||
args=dict(
|
||||
name=option,
|
||||
value=value,
|
||||
),
|
||||
))
|
||||
|
||||
status = machine.succeed(f"echo '{payload}' | curl -v localhost:42420/set --json @- 2>&1")
|
||||
machine.log(f"daemon replied with status {status}")
|
||||
machine.succeed(f"echo '{payload}' | {testing_client} /run/user/0/dynix.sock")
|
||||
|
||||
@beartype
|
||||
def dynix_append_cli(option: str, value: Any):
|
||||
|
|
@ -105,6 +105,8 @@ def run_all_tests(machine: Machine, *, use_daemon: bool):
|
|||
run_log(machine, "systemctl start dynix-daemon.service")
|
||||
machine.wait_for_unit("dynix-daemon.service")
|
||||
|
||||
machine.log("Checking initial harmonia.service conditions")
|
||||
|
||||
# Config should have our initial values.
|
||||
config_toml = get_config_file()
|
||||
assert int(config_toml['workers']) == 4, f"{config_toml['workers']=} != 4"
|
||||
|
|
@ -125,6 +127,8 @@ def run_all_tests(machine: Machine, *, use_daemon: bool):
|
|||
machine.log("Testing that workers, but not max_connection_rate, changed")
|
||||
# Workers, but not max connection rate, should have changed.
|
||||
config_toml = get_config_file()
|
||||
from pprint import pformat
|
||||
machine.log(pformat(config_toml))
|
||||
assert int(config_toml['workers']) == new_workers, f"{config_toml['workers']=} != {new_workers}"
|
||||
assert int(config_toml['max_connection_rate']) == 256, f"{config_toml['max_connection_rate']=} != 256"
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue