oh right this was supposed to be http
This commit is contained in:
parent
1ca5aa2e97
commit
adaa020029
14 changed files with 1187 additions and 1040 deletions
878
src/daemon.rs
878
src/daemon.rs
|
|
@ -1,67 +1,42 @@
|
|||
use std::{
|
||||
borrow::Cow,
|
||||
env, io,
|
||||
net::SocketAddr,
|
||||
os::fd::{AsFd, BorrowedFd, IntoRawFd, OwnedFd, RawFd},
|
||||
process::{Command, Stdio},
|
||||
sync::{
|
||||
Arc, LazyLock,
|
||||
atomic::{AtomicUsize, Ordering},
|
||||
},
|
||||
time::Duration,
|
||||
process::{Output, Stdio},
|
||||
sync::LazyLock,
|
||||
};
|
||||
|
||||
use iddqd::{BiHashMap, IdOrdMap};
|
||||
|
||||
use mio::{
|
||||
Events, Interest, Poll, Token,
|
||||
event::Event,
|
||||
net::{TcpListener, UnixListener},
|
||||
unix::SourceFd,
|
||||
use axum::{
|
||||
Json, Router,
|
||||
extract::State,
|
||||
http::{self, StatusCode, header::HeaderMap},
|
||||
routing::post,
|
||||
};
|
||||
use tokio::{net::TcpListener, process::Command};
|
||||
//use utoipa::{OpenApi as _, ToSchema, openapi::OpenApi};
|
||||
//use utoipa_axum::router::{OpenApiRouter, UtoipaMethodRouterExt};
|
||||
|
||||
use rustix::{
|
||||
buffer::spare_capacity,
|
||||
net::SocketFlags,
|
||||
process::{Pid, PidfdFlags, Uid, WaitId, WaitIdOptions},
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
mod rustix {
|
||||
pub use rustix::process::waitid;
|
||||
pub use rustix::*;
|
||||
}
|
||||
|
||||
//mod rustix_prelude {
|
||||
// pub use rustix::process::{getuid, pidfd_open, waitid};
|
||||
//}
|
||||
|
||||
use serde_json::StreamDeserializer;
|
||||
|
||||
use crate::prelude::*;
|
||||
use crate::{SourceFile, prelude::*};
|
||||
|
||||
pub mod api;
|
||||
use api::DaemonCmd;
|
||||
use api::{ConvenientAttrPath, NixLiteral};
|
||||
|
||||
use crate::daemon_tokfd::{FdInfo, FdKind};
|
||||
//pub static UID: LazyLock<Uid> = LazyLock::new(rustix::process::getuid);
|
||||
|
||||
use crate::{OwnedFdWithFlags, TokenFd};
|
||||
//pub static USER_SOCKET_DIR: LazyLock<&'static Path> = LazyLock::new(|| {
|
||||
// let dir: Box<Path> = env::var_os("XDG_RUNTIME_DIR")
|
||||
// .map(PathBuf::from)
|
||||
// .unwrap_or_else(|| ["/", "run", "user", &UID.to_string()].into_iter().collect())
|
||||
// .into_boxed_path();
|
||||
//
|
||||
// Box::leak(dir)
|
||||
//});
|
||||
|
||||
pub static UID: LazyLock<Uid> = LazyLock::new(rustix::process::getuid);
|
||||
|
||||
pub static USER_SOCKET_DIR: LazyLock<&'static Path> = LazyLock::new(|| {
|
||||
let dir: Box<Path> = env::var_os("XDG_RUNTIME_DIR")
|
||||
.map(PathBuf::from)
|
||||
.unwrap_or_else(|| ["/", "run", "user", &UID.to_string()].into_iter().collect())
|
||||
.into_boxed_path();
|
||||
|
||||
Box::leak(dir)
|
||||
});
|
||||
|
||||
pub static TMPDIR: LazyLock<&'static Path> = LazyLock::new(|| {
|
||||
let dir: Box<Path> = env::temp_dir().into_boxed_path();
|
||||
|
||||
Box::leak(dir)
|
||||
});
|
||||
//pub static TMPDIR: LazyLock<&'static Path> = LazyLock::new(|| {
|
||||
// let dir: Box<Path> = env::temp_dir().into_boxed_path();
|
||||
//
|
||||
// Box::leak(dir)
|
||||
//});
|
||||
|
||||
pub static NIX: LazyLock<&'static Path> = LazyLock::new(|| {
|
||||
which::which("nix")
|
||||
|
|
@ -71,667 +46,186 @@ pub static NIX: LazyLock<&'static Path> = LazyLock::new(|| {
|
|||
.unwrap_or(Path::new("/run/current-system/sw/bin/nix"))
|
||||
});
|
||||
|
||||
const TIMEOUT_NEVER: Option<Duration> = None;
|
||||
pub async fn run(config: Config) {
|
||||
let addr = config.addr.clone();
|
||||
let router = Router::new()
|
||||
.route("/set", post(ep_set_post))
|
||||
// `.with_state()` has to be last for the type inference to work.
|
||||
.with_state(config);
|
||||
//let (router, api): (Router, OpenApi) = OpenApiRouter::with_openapi(ApiDoc::openapi())
|
||||
// .routes(utoipa_axum::routes!(ep_set_post))
|
||||
// // `.with_state()` has to be last for the type inference works.
|
||||
// .with_state(config)
|
||||
// .split_for_parts();
|
||||
|
||||
static NEXT_TOKEN_NUMBER: AtomicUsize = AtomicUsize::new(1);
|
||||
fn next_token() -> Token {
|
||||
let tok = NEXT_TOKEN_NUMBER.fetch_add(1, Ordering::SeqCst);
|
||||
let listener = TcpListener::bind(addr).await.unwrap();
|
||||
|
||||
// If the increment wrapped to 0, then we just increment it again.
|
||||
if tok == 0 {
|
||||
warn!("File descriptor token wrapped. That's... a lot.");
|
||||
return next_token();
|
||||
}
|
||||
|
||||
Token(tok)
|
||||
axum::serve(listener, router).await.unwrap();
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Daemon {
|
||||
config_path: Arc<Path>,
|
||||
fd: OwnedFdWithFlags,
|
||||
path: Option<Box<Path>>,
|
||||
|
||||
poller: Poll,
|
||||
|
||||
fd_info: IdOrdMap<FdInfo>,
|
||||
|
||||
// Bijective mapping of [`mio::Token`]s to [`RawFd`]s.
|
||||
tokfd: BiHashMap<TokenFd>,
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Config {
|
||||
pub config_file: SourceFile,
|
||||
pub addr: SocketAddr,
|
||||
pub token: Option<String>,
|
||||
}
|
||||
|
||||
/// `tokfd` handling.
|
||||
impl Daemon {
|
||||
fn fd_error_pop(&mut self, fd: RawFd) -> Option<IoError> {
|
||||
let mut info = self.fd_info.get_mut(&fd).unwrap_or_else(|| {
|
||||
if let Ok(name) = FdInfo::guess_name(fd) {
|
||||
panic!(
|
||||
"tried to pop error for unknown fd {fd} ({})",
|
||||
name.to_string_lossy(),
|
||||
);
|
||||
}
|
||||
panic!("tried to pop error for unknown fd {fd}")
|
||||
});
|
||||
info.error_buffer.pop_front().tap_some(|e| {
|
||||
trace!("Popping error for {}: {e}", info.display());
|
||||
})
|
||||
}
|
||||
|
||||
fn fd_error_push(&mut self, fd: RawFd, error: IoError) -> Result<(), IoError> {
|
||||
let mut info = self
|
||||
.fd_info
|
||||
.get_mut(&fd)
|
||||
.unwrap_or_else(|| panic!("tried to push error for unknown fd {fd}"));
|
||||
trace!("Pushing error for {}: {}", info.display(), error);
|
||||
info.error_buffer.try_push_back(error)
|
||||
}
|
||||
|
||||
fn main_fd_info(&self) -> &FdInfo {
|
||||
self.fd_info.get(&self.fd.as_raw_fd()).unwrap_or_else(|| {
|
||||
unreachable!(
|
||||
"Main daemon fd {:?} was not registered with fd_info",
|
||||
self.fd,
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
fn register(&mut self, fd: RawFd, kind: FdKind) -> Token {
|
||||
let token = next_token();
|
||||
|
||||
debug!(
|
||||
"Registering new {} FdInfo for {fd} with token {token:?}",
|
||||
kind.name_str(),
|
||||
);
|
||||
|
||||
self.fd_info.insert_unique(FdInfo::new(fd, kind)).unwrap();
|
||||
|
||||
self.tokfd
|
||||
.insert_unique(TokenFd { token, fd })
|
||||
.unwrap_or_else(|e| todo!("{e}"));
|
||||
|
||||
let mut source = SourceFd(&fd);
|
||||
self.poller
|
||||
.registry()
|
||||
.register(&mut source, token, Interest::READABLE)
|
||||
.unwrap_or_else(|e| unreachable!("registering {fd:?} with poller failed: {e}"));
|
||||
|
||||
token
|
||||
}
|
||||
|
||||
#[expect(dead_code)]
|
||||
fn register_with_name<S>(&mut self, fd: RawFd, kind: FdKind, name: Box<OsStr>) -> Token {
|
||||
let token = next_token();
|
||||
|
||||
debug!(
|
||||
"Registering new {} FdInfo for {fd} ({}) with token {token:?}",
|
||||
name.to_string_lossy(),
|
||||
kind.name_str(),
|
||||
);
|
||||
|
||||
self.fd_info
|
||||
.insert_unique(FdInfo::new_with_name(fd, kind, name))
|
||||
.unwrap();
|
||||
|
||||
self.tokfd
|
||||
.insert_unique(TokenFd { token, fd })
|
||||
.unwrap_or_else(|e| todo!("{e}"));
|
||||
|
||||
let mut source = SourceFd(&fd);
|
||||
self.poller
|
||||
.registry()
|
||||
.register(&mut source, token, Interest::READABLE)
|
||||
.unwrap_or_else(|e| unreachable!("registering {fd:?} with poller failed: {e}"));
|
||||
|
||||
token
|
||||
}
|
||||
|
||||
fn deregister(&mut self, fd: RawFd) {
|
||||
let info = self
|
||||
.fd_info
|
||||
.remove(&fd)
|
||||
.unwrap_or_else(|| unreachable!("tried to unregister unknown fd {fd}"));
|
||||
debug!("Unregistering fd {}; calling close()", info.display());
|
||||
unsafe { rustix::io::close(fd) };
|
||||
let res = unsafe { libc::fcntl(fd, libc::F_GETFD, 0) };
|
||||
debug_assert_eq!(res, -1);
|
||||
debug_assert_eq!(
|
||||
IoError::last_os_error().raw_os_error(),
|
||||
Some(Errno::BADF.raw_os_error()),
|
||||
);
|
||||
|
||||
self.tokfd.remove2(&fd).unwrap_or_else(|| todo!());
|
||||
}
|
||||
|
||||
fn fd_for_token(&self, token: Token) -> Option<RawFd> {
|
||||
self.tokfd
|
||||
.get1(&token)
|
||||
.map(|TokenFd { fd, .. }| fd)
|
||||
.copied()
|
||||
}
|
||||
|
||||
/// Not currently used, but here for completeness.
|
||||
#[expect(dead_code)]
|
||||
fn token_for_fd(&self, fd: RawFd) -> Option<Token> {
|
||||
self.tokfd
|
||||
.get2(&fd)
|
||||
.map(|TokenFd { token, .. }| token)
|
||||
.copied()
|
||||
}
|
||||
#[derive(Debug, Clone, PartialEq, PartialOrd)]
|
||||
#[derive(Deserialize, Serialize)]
|
||||
//#[derive(ToSchema)]
|
||||
pub struct SetParams {
|
||||
pub name: ConvenientAttrPath,
|
||||
pub value: NixLiteral,
|
||||
}
|
||||
|
||||
impl Daemon {
|
||||
pub fn new(
|
||||
config_path: Arc<Path>,
|
||||
fd: OwnedFd,
|
||||
kind: FdKind,
|
||||
name: Option<Box<OsStr>>,
|
||||
) -> Self {
|
||||
let mut fd_info: IdOrdMap<FdInfo> = Default::default();
|
||||
|
||||
// Supposedly, the only possible ways this can fail are EMFILE, ENFILE, and ENOMEM.
|
||||
// If any of those are the case, we're screwed anyway.
|
||||
let poller = Poll::new().unwrap_or_else(|e| panic!("can't create new mio::Poll: {e}"));
|
||||
// Make sure we register the poller in `fd_info`, so we can keep track of its errors.
|
||||
fd_info
|
||||
.insert_unique(FdInfo::new(poller.as_raw_fd(), FdKind::Poller))
|
||||
.unwrap_or_else(|e| unreachable!("{e}"));
|
||||
|
||||
let fd = OwnedFdWithFlags::new_with_fallback(fd);
|
||||
|
||||
fd_info
|
||||
.insert_unique(FdInfo::new(fd.as_raw_fd(), kind))
|
||||
.unwrap_or_else(|e| unreachable!("{e}"));
|
||||
|
||||
if let Some(name) = &name {
|
||||
info!("opened daemon to {}", name.to_string_lossy());
|
||||
} else {
|
||||
debug!("opened daemon to {name:?} (fd {fd:?})");
|
||||
}
|
||||
|
||||
let path = name
|
||||
.as_ref()
|
||||
.map(PathBuf::from)
|
||||
.map(PathBuf::into_boxed_path);
|
||||
|
||||
Self {
|
||||
config_path,
|
||||
fd,
|
||||
path,
|
||||
poller,
|
||||
fd_info,
|
||||
tokfd: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn from_tcp_socket_addr(config_path: Arc<Path>, addr: SocketAddr) -> Result<Self, IoError> {
|
||||
let listener = TcpListener::bind(addr.clone())
|
||||
.inspect_err(|e| error!("failed to bind to '{addr}': {e}"))?;
|
||||
|
||||
let listener_owned_fd = OwnedFd::from(listener);
|
||||
// FIXME: should we KEEP_ALIVE?
|
||||
rustix::net::sockopt::set_socket_keepalive(&listener_owned_fd, true).unwrap();
|
||||
|
||||
let name = OsString::from(addr.to_string()).into_boxed_os_str();
|
||||
|
||||
Ok(Self::new(
|
||||
config_path,
|
||||
listener_owned_fd,
|
||||
FdKind::Socket,
|
||||
Some(name),
|
||||
))
|
||||
}
|
||||
|
||||
pub fn from_unix_socket_path(config_path: Arc<Path>, path: &Path) -> Result<Self, IoError> {
|
||||
// We unconditionally unlink() `path` before binding, but completely ignore the result.
|
||||
let _ = rustix::fs::unlink(path);
|
||||
let listener = UnixListener::bind(path)
|
||||
.tap_err(|e| error!("failed to bind AF_UNIX socket at {}: {e}", path.display()))?;
|
||||
let listener_owned_fd = OwnedFd::from(listener);
|
||||
// FIXME: should we KEEP_ALIVE?
|
||||
rustix::net::sockopt::set_socket_keepalive(&listener_owned_fd, true).unwrap();
|
||||
let path: Box<OsStr> = path.to_path_buf().into_boxed_path().into_boxed_os_str();
|
||||
|
||||
Ok(Self::new(
|
||||
config_path,
|
||||
listener_owned_fd,
|
||||
FdKind::Socket,
|
||||
Some(path),
|
||||
))
|
||||
}
|
||||
|
||||
pub fn open_default_socket(config_path: Arc<Path>) -> Result<Self, IoError> {
|
||||
use IoErrorKind::*;
|
||||
let preferred = USER_SOCKET_DIR.join("dynix.sock").into_boxed_path();
|
||||
let constructed = match Self::from_unix_socket_path(config_path.clone(), &preferred) {
|
||||
Ok(v) => v,
|
||||
Err(e) if e.kind() == Unsupported => {
|
||||
//
|
||||
return Err(e);
|
||||
},
|
||||
Err(e) => {
|
||||
warn!(
|
||||
"failed binding AF_UNIX socket at {}: {e}; trying elsewhere",
|
||||
preferred.display(),
|
||||
);
|
||||
|
||||
let fallback = TMPDIR.join("dynix.sock").into_boxed_path();
|
||||
Self::from_unix_socket_path(config_path, &fallback).tap_err(|e| {
|
||||
error!(
|
||||
"failed binding AF_UNIX socket at {}: {e}",
|
||||
fallback.display(),
|
||||
)
|
||||
})?
|
||||
},
|
||||
};
|
||||
|
||||
Ok(constructed)
|
||||
}
|
||||
|
||||
/// This panics if stdin cannot be opened.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
|
||||
#[derive(Deserialize, Serialize)]
|
||||
//#[derive(ToSchema)]
|
||||
pub struct SetResponse {
|
||||
/// Will be 0 if everything is okay.
|
||||
///
|
||||
/// If you want to handle that error, use [`Daemon::from_raw_parts()`].
|
||||
pub fn from_stdin(config_path: Arc<Path>) -> Self {
|
||||
let stdin = io::stdin();
|
||||
let fd = stdin
|
||||
.as_fd()
|
||||
.try_clone_to_owned()
|
||||
.expect("dynix daemon could not open stdin; try a Unix socket?");
|
||||
|
||||
Self::new(config_path, fd, FdKind::File, None)
|
||||
}
|
||||
|
||||
//pub unsafe fn from_raw_parts(fd: OwnedFd) -> Self {
|
||||
// Self {
|
||||
// fd: OwnedFdWithFlags::new_with_fallback(fd),
|
||||
// }
|
||||
//}
|
||||
|
||||
pub fn fd(&self) -> BorrowedFd<'_> {
|
||||
self.fd.as_fd()
|
||||
}
|
||||
/// Will be -1 for an error with no code.
|
||||
pub status: i64,
|
||||
pub msg: Option<String>,
|
||||
}
|
||||
const DAEMON: Token = Token(0);
|
||||
|
||||
/// Private helpers.
|
||||
impl Daemon {
|
||||
fn proxy_stdio(&mut self, fd: &BorrowedFd) -> Result<(), IoError> {
|
||||
let info = self.fd_info.get(&fd.as_raw_fd()).unwrap();
|
||||
let label = match info.kind {
|
||||
FdKind::ChildStdout(pid) => format!("stdout[{pid}]"),
|
||||
FdKind::ChildStderr(pid) => format!("stderr[{pid}]"),
|
||||
other => unreachable!("child stdio cannot have kind {other:?}"),
|
||||
#[axum::debug_handler]
|
||||
//#[utoipa::path(
|
||||
// post,
|
||||
// path = "/set",
|
||||
// responses(
|
||||
// (status = 200, description = "Request was valid", body = SetResponse)
|
||||
// ),
|
||||
//)]
|
||||
async fn ep_set_post(
|
||||
State(config): State<Config>,
|
||||
headers: HeaderMap,
|
||||
Json(SetParams { name, value }): Json<SetParams>,
|
||||
) -> Result<Json<SetResponse>, StatusCode> {
|
||||
debug!("POST /set with name={name:?}, value={value:?}");
|
||||
|
||||
if let Some(token) = &config.token {
|
||||
let Some(auth) = headers.get(http::header::AUTHORIZATION) else {
|
||||
// FIXME: technically RFC9110 requires us to respond with a
|
||||
// `WWW-Authenticate` header.
|
||||
error!("token specified in config but not provided in request");
|
||||
return Err(StatusCode::UNAUTHORIZED);
|
||||
};
|
||||
// FIXME: don't use a new allocation every time.
|
||||
let mut buffer: Vec<u8> = Vec::with_capacity(1024);
|
||||
// FIXME: handle line buffering correctly.
|
||||
loop {
|
||||
let count = rustix::io::read(fd, spare_capacity(&mut buffer))
|
||||
.inspect_err(|e| error!("read() on child stdio fd {fd:?} failed: {e}"))?;
|
||||
|
||||
if count == 0 {
|
||||
break;
|
||||
}
|
||||
|
||||
for line in buffer.lines() {
|
||||
info!("[child {label}]: {}", line.as_bstr())
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn read_cmd(&mut self, fd: &BorrowedFd) -> Result<(), IoError> {
|
||||
// FIXME: don't use a new allocation every time.
|
||||
let mut cmd_buffer: Vec<u8> = Vec::with_capacity(1024);
|
||||
|
||||
let _count = rustix::io::read(fd, spare_capacity(&mut cmd_buffer))
|
||||
.tap_err(|e| error!("read() on daemon fd {fd:?} failed: {e}"))?;
|
||||
|
||||
// The buffer might have existing data from the last read.
|
||||
let deserializer = serde_json::Deserializer::from_slice(&cmd_buffer);
|
||||
let stream: StreamDeserializer<_, DaemonCmd> = deserializer.into_iter();
|
||||
for cmd in stream {
|
||||
let cmd = match cmd {
|
||||
Ok(cmd) => cmd,
|
||||
Err(e) if e.is_eof() => {
|
||||
warn!("Got EOF before a valid command");
|
||||
debug!("command buffer was: {:?}", cmd_buffer.as_bstr());
|
||||
return Ok(());
|
||||
},
|
||||
Err(e) => {
|
||||
warn!("error deserializing command: {e}");
|
||||
debug!("command buffer was: {:?}", cmd_buffer.as_bstr());
|
||||
// Don't propagate the error unless we have too many.
|
||||
self.fd_error_push(fd.as_raw_fd(), e.into()).tap_err(|e| {
|
||||
error!("Accumulated too many errors for daemon fd {fd:?}: {e}")
|
||||
})?;
|
||||
return Ok(());
|
||||
},
|
||||
};
|
||||
debug!("got cmd: {cmd:?}");
|
||||
let _ = rustix::io::write(fd, b"");
|
||||
info!("dispatching command {cmd:?}");
|
||||
self.dispatch_cmd(cmd).unwrap_or_else(|e| todo!("{e}"));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn dispatch_cmd(&mut self, cmd: DaemonCmd) -> Result<(), IoError> {
|
||||
// Write the new file...
|
||||
let (name, value) = match cmd {
|
||||
DaemonCmd::Append { name, value } => (name, value),
|
||||
};
|
||||
let source_file = crate::open_source_file(self.config_path.clone())?;
|
||||
let pri = crate::get_where(source_file.clone()).unwrap_or_else(|e| todo!("{e}"));
|
||||
let new_pri = pri - 1;
|
||||
// Get next priority line.
|
||||
let opt_name = name.to_nix_decl();
|
||||
let new_line = crate::get_next_prio_line(
|
||||
source_file.clone(),
|
||||
&opt_name,
|
||||
new_pri,
|
||||
&value.to_nix_source(),
|
||||
)
|
||||
.unwrap_or_else(|e| panic!("someone is holding a reference to source.lines(): {e}"));
|
||||
|
||||
crate::write_next_prio(source_file, new_line).unwrap_or_else(|e| todo!("{e}"));
|
||||
|
||||
// Rebuild and switch.
|
||||
// FIXME: allow passing additional args.
|
||||
//let child = Command::new(*NIXOS_REBUILD)
|
||||
// .arg("switch")
|
||||
// .arg("--log-format")
|
||||
// .arg("raw-with-logs")
|
||||
// .arg("--no-reexec")
|
||||
// .arg("-v")
|
||||
// .stdout(Stdio::piped())
|
||||
// .stderr(Stdio::piped())
|
||||
// .spawn()
|
||||
// .inspect_err(|e| {
|
||||
// error!("failed to spawn `nixos-rebuild` command: {e}");
|
||||
// })?;
|
||||
|
||||
let expr = "(import <nixpkgs/nixos> { }).config.dynamicism.applyDynamicConfiguration { }";
|
||||
let child = Command::new(*NIX)
|
||||
.arg("run")
|
||||
.arg("--show-trace")
|
||||
.arg("--log-format")
|
||||
.arg("raw-with-logs")
|
||||
.arg("--impure")
|
||||
.arg("-E")
|
||||
.arg(expr)
|
||||
.tap(|cmd| {
|
||||
if !tracing::enabled!(Level::INFO) {
|
||||
return;
|
||||
}
|
||||
let args: Vec<Cow<'_, str>> = cmd.get_args().map(OsStr::to_string_lossy).collect();
|
||||
let separated = args.join(" ");
|
||||
info!("Spawning subprocess: [{separated}]");
|
||||
})
|
||||
.stdout(Stdio::piped())
|
||||
.stderr(Stdio::piped())
|
||||
.spawn()
|
||||
.inspect_err(|e| error!("failed to spawn `nix run` command: {e}"))?;
|
||||
|
||||
debug!("Spanwed child process {}", child.id());
|
||||
|
||||
let pid = Pid::from_child(&child);
|
||||
|
||||
let stdout = child.stdout.unwrap_or_else(|| {
|
||||
unreachable!("`child` is given `.stdout(Stdio::piped())`");
|
||||
});
|
||||
let stderr = child.stderr.unwrap_or_else(|| {
|
||||
unreachable!("`child` is given `.stderr(Stdio::piped())`");
|
||||
});
|
||||
|
||||
let _token = self.register(stdout.into_raw_fd(), FdKind::ChildStdout(pid));
|
||||
let _token = self.register(stderr.into_raw_fd(), FdKind::ChildStderr(pid));
|
||||
|
||||
match rustix::process::pidfd_open(pid, PidfdFlags::NONBLOCK) {
|
||||
Ok(pidfd) => {
|
||||
debug!("Opened pidfd {pidfd:?}, for process {pid}");
|
||||
self.register(pidfd.into_raw_fd(), FdKind::Pid(pid));
|
||||
},
|
||||
Err(e) if e.kind() == IoErrorKind::NotFound => {
|
||||
warn!("child {pid} not found; died before we could open it?");
|
||||
},
|
||||
Err(e) => {
|
||||
error!("Error opening pidfd for child {pid}: {e}");
|
||||
return Err(e)?;
|
||||
},
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn enter_loop(&mut self) -> Result<Option<()>, IoError> {
|
||||
let raw_fd = self.fd.as_raw_fd();
|
||||
if cfg!(debug_assertions) {
|
||||
assert!(
|
||||
self.fd_info.contains_key(&raw_fd),
|
||||
"we should know about daemon fd {raw_fd}",
|
||||
);
|
||||
assert!(
|
||||
self.fd_info.contains_key(&self.poller.as_raw_fd()),
|
||||
"we should know about poller fd {}",
|
||||
self.poller.as_raw_fd(),
|
||||
);
|
||||
}
|
||||
let mut daemon_source = SourceFd(&raw_fd);
|
||||
self.tokfd
|
||||
.insert_unique(TokenFd {
|
||||
token: DAEMON,
|
||||
fd: raw_fd,
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
self.poller
|
||||
.registry()
|
||||
.register(&mut daemon_source, DAEMON, Interest::READABLE)
|
||||
.unwrap_or_else(|e| unreachable!("registering mio Poll for daemon fd {raw_fd:?}: {e}"));
|
||||
|
||||
let mut events = Events::with_capacity(1024);
|
||||
|
||||
loop {
|
||||
if tracing::enabled!(tracing::Level::DEBUG) {
|
||||
debug!("Daemon loop iteration, with file descriptors: ");
|
||||
for info in &self.fd_info {
|
||||
debug!("- {}", info.display());
|
||||
}
|
||||
}
|
||||
|
||||
let poll_result = self.poller.poll(&mut events, TIMEOUT_NEVER);
|
||||
self.handle_poll(poll_result, &events)?;
|
||||
// No need to go through UTF-8 decoding here.
|
||||
if auth.as_bytes() != token.as_bytes() {
|
||||
error!("token provided in request does not match configured token");
|
||||
return Err(StatusCode::UNAUTHORIZED);
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_poll(
|
||||
&mut self,
|
||||
poll_result: Result<(), IoError>,
|
||||
events: &Events,
|
||||
) -> Result<(), IoError> {
|
||||
match poll_result {
|
||||
Ok(()) => {
|
||||
trace!(
|
||||
"mio::Poller::poll() got events: {:?}",
|
||||
events.iter().size_hint().0,
|
||||
);
|
||||
if events.is_empty() {
|
||||
unreachable!(
|
||||
"epoll_wait() with a \"forever\" timeout should never give empty events",
|
||||
);
|
||||
}
|
||||
let file = config.config_file.clone();
|
||||
|
||||
let _ = self.fd_error_pop(self.poller.as_raw_fd());
|
||||
},
|
||||
Err(e) if e.kind() == IoErrorKind::Interrupted => {
|
||||
// EINTR is silly.
|
||||
// Return early, and poll() again.
|
||||
return Ok(());
|
||||
},
|
||||
Err(e) => {
|
||||
if let Some(Errno::BADF) = e.raw_os_error().map(Errno::from_raw_os_error) {
|
||||
panic!("EBADF on poller fd; IO safety violation?");
|
||||
}
|
||||
warn!("mio Poll::poll() error: {e}");
|
||||
self.fd_error_push(self.poller.as_raw_fd(), e)
|
||||
.tap_err(|e| {
|
||||
error!("accumulated too many errors for mio::Poll::poll(): {e}")
|
||||
})?;
|
||||
},
|
||||
}
|
||||
let prio = crate::get_where(file.clone());
|
||||
let new_prio = prio - 1;
|
||||
|
||||
for event in events {
|
||||
self.handle_event(event)?;
|
||||
}
|
||||
let opt_name = name.to_nix_decl();
|
||||
let opt_val = value.to_nix_source();
|
||||
let new_line = crate::get_next_prio_line(file.clone(), &opt_name, new_prio, &opt_val);
|
||||
|
||||
Ok(())
|
||||
match crate::write_next_prio(file.clone(), new_line) {
|
||||
Ok(()) => (),
|
||||
Err(e) => {
|
||||
error!("Couldn't write next generation to {}: {e}", file.display());
|
||||
let status = e.raw_os_error().map(i64::from).unwrap_or(-1);
|
||||
return Ok(Json(SetResponse {
|
||||
status,
|
||||
msg: Some(format!("{e}")),
|
||||
}));
|
||||
},
|
||||
};
|
||||
|
||||
let child_status = match nix_run_apply(&config).await {
|
||||
Ok(v) => v,
|
||||
Err(e) => {
|
||||
let status = e.raw_os_error().map(i64::from).unwrap_or(-1);
|
||||
return Ok(Json(SetResponse {
|
||||
status,
|
||||
msg: Some(format!("{e}")),
|
||||
}));
|
||||
},
|
||||
};
|
||||
|
||||
let Output {
|
||||
status,
|
||||
stdout,
|
||||
stderr,
|
||||
} = child_status;
|
||||
|
||||
if status.code() != Some(0) {
|
||||
error!(
|
||||
"Child `nix run` process returned non-zero code {:?}",
|
||||
status.code(),
|
||||
);
|
||||
error!("Child stdout: {}", stdout.as_bstr());
|
||||
error!("Child stderr: {}", stderr.as_bstr());
|
||||
}
|
||||
|
||||
fn handle_event(&mut self, event: &Event) -> Result<(), IoError> {
|
||||
trace!("Handling event {event:#?}");
|
||||
let status = status.code().map(i64::from).unwrap_or(-1);
|
||||
let msg = format!(
|
||||
"Stdout: {}\nStderr: {}\n",
|
||||
stdout.as_bstr(),
|
||||
stderr.as_bstr()
|
||||
);
|
||||
|
||||
match event.token() {
|
||||
DAEMON => {
|
||||
let is_sock = self.main_fd_info().kind == FdKind::Socket;
|
||||
if !is_sock {
|
||||
// SAFETY: oh boy: disjoint borrows with extra steps.
|
||||
let file_fd = unsafe { BorrowedFd::borrow_raw(self.fd.as_raw_fd()) };
|
||||
self.read_cmd(&file_fd).unwrap();
|
||||
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Accept, first.
|
||||
let flags = SocketFlags::NONBLOCK | SocketFlags::CLOEXEC;
|
||||
let stream_fd = match rustix::net::accept_with(&self.fd, flags) {
|
||||
Ok(stream) => {
|
||||
debug!(
|
||||
"Accepted connection from socket {:?} as stream {:?}",
|
||||
self.fd, stream,
|
||||
);
|
||||
stream
|
||||
},
|
||||
Err(e) => {
|
||||
error!("accept4 on daemon socket failed: {e}");
|
||||
self.fd_error_push(self.fd.as_raw_fd(), e.into())
|
||||
.tap_err(|e| {
|
||||
error!(
|
||||
"Accumulated too many errors for daemon fd {:?}: {e}",
|
||||
self.fd
|
||||
)
|
||||
})?;
|
||||
|
||||
return Ok(());
|
||||
},
|
||||
};
|
||||
|
||||
// Add this stream to our poll interest list.
|
||||
// NOTE: `stream_fd` is now effectively `ManuallyDrop`.
|
||||
let stream_fd = stream_fd.into_raw_fd();
|
||||
let _token = self.register(stream_fd, FdKind::SockStream);
|
||||
|
||||
// Wait for the next poll to handle.
|
||||
},
|
||||
other_token => {
|
||||
// This must be a stream fd.
|
||||
let fd = self.fd_for_token(other_token).unwrap_or_else(|| {
|
||||
unreachable!("tried to get fd for non-existent token? {other_token:?}")
|
||||
});
|
||||
let Some(info) = self.fd_info.get(&fd) else {
|
||||
panic!("Received an event on an unregistered fd {fd}; IO-safety violation?");
|
||||
};
|
||||
|
||||
let either_available = event.is_readable() || event.is_writable();
|
||||
if !either_available {
|
||||
info!(
|
||||
"Got EVENT for file descriptor '{}': r={}, w={}",
|
||||
info.display(),
|
||||
event.is_readable(),
|
||||
event.is_writable(),
|
||||
);
|
||||
// FIXME: code duplication
|
||||
if event.is_read_closed() {
|
||||
self.deregister(fd);
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
match info.kind {
|
||||
FdKind::Pid(pid) => {
|
||||
debug!("Reaping child process {pid}");
|
||||
// SAFETY: `fd` cannot have been closed yet, since that's what we do here.
|
||||
let pidfd = unsafe { BorrowedFd::borrow_raw(fd) };
|
||||
let status = rustix::waitid(WaitId::PidFd(pidfd), WaitIdOptions::EXITED)
|
||||
.unwrap_or_else(|e| {
|
||||
todo!("waitid() can fail? on pid {pid}: {e}");
|
||||
})
|
||||
.unwrap_or_else(|| {
|
||||
todo!("waitid() returned None? for pid {pid}");
|
||||
});
|
||||
|
||||
trace!("waitid() for pid {pid} returned status: {status:?}");
|
||||
let is_dead = status.exited() || status.killed() || status.dumped();
|
||||
if !is_dead {
|
||||
todo!("Handle process {pid} events that aren't death: {status:?}");
|
||||
}
|
||||
let Some(exit_code) = status.exit_status() else {
|
||||
unreachable!("Process {pid} died with no exit code at all? {status:?}");
|
||||
};
|
||||
debug!("Child process {pid} exited with code {exit_code}");
|
||||
|
||||
// Close the pidfd.
|
||||
self.deregister(fd);
|
||||
|
||||
let stream = self
|
||||
.fd_info
|
||||
.iter()
|
||||
.find_map(|info| (info.kind == FdKind::SockStream).then_some(info));
|
||||
if let Some(stream) = stream {
|
||||
// SAFETY: fixme.
|
||||
let stream_fd = unsafe { BorrowedFd::borrow_raw(stream.fd) };
|
||||
let payload = format!("{{ \"status\": {exit_code} }}\n");
|
||||
if let Err(e) = rustix::io::write(stream_fd, payload.as_bytes()) {
|
||||
error!("couldn't write reply to stream fd {stream_fd:?}: {e}");
|
||||
}
|
||||
}
|
||||
},
|
||||
FdKind::ChildStdout(_pid) => {
|
||||
// SAFETY: oh boy.
|
||||
let stdout = unsafe { BorrowedFd::borrow_raw(fd) };
|
||||
self.proxy_stdio(&stdout)
|
||||
.unwrap_or_else(|e| error!("failed to proxy child stdout: {e}"));
|
||||
},
|
||||
FdKind::ChildStderr(_pid) => {
|
||||
// SAFETY: oh boy.
|
||||
let stderr = unsafe { BorrowedFd::borrow_raw(fd) };
|
||||
self.proxy_stdio(&stderr)
|
||||
.unwrap_or_else(|e| error!("failed to proxy child stderr: {e}"));
|
||||
},
|
||||
FdKind::SockStream => {
|
||||
// SAFETY: oh boy.
|
||||
let stream_fd = unsafe { BorrowedFd::borrow_raw(fd) };
|
||||
self.read_cmd(&stream_fd).unwrap();
|
||||
},
|
||||
kind => todo!("{kind:?}"),
|
||||
};
|
||||
|
||||
if event.is_read_closed() {
|
||||
self.deregister(fd);
|
||||
return Ok(());
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
Ok(Json(SetResponse {
|
||||
status,
|
||||
msg: Some(msg),
|
||||
}))
|
||||
}
|
||||
|
||||
impl Drop for Daemon {
|
||||
fn drop(&mut self) {
|
||||
if let Some(path) = self.path.as_deref() {
|
||||
let _ = rustix::fs::unlink(path);
|
||||
}
|
||||
}
|
||||
async fn nix_run_apply(config: &Config) -> Result<Output, IoError> {
|
||||
let configuration_nix = config
|
||||
.config_file
|
||||
.path()
|
||||
.parent()
|
||||
.unwrap()
|
||||
.join("configuration.nix");
|
||||
let configuration_nix = configuration_nix
|
||||
.to_str()
|
||||
.expect("specified NixOS config file is not a UTF-8 path");
|
||||
let expr = format!(
|
||||
"(import <nixpkgs/nixos> {{ configuration = {}; }})\
|
||||
.config.dynamicism.applyDynamicConfiguration {{ baseConfiguration = {}; }}",
|
||||
configuration_nix, configuration_nix,
|
||||
);
|
||||
|
||||
let child = Command::new(*NIX)
|
||||
.arg("run")
|
||||
.arg("--show-trace")
|
||||
.arg("--log-format")
|
||||
.arg("raw-with-logs")
|
||||
.arg("--impure")
|
||||
.arg("-E")
|
||||
.arg(expr)
|
||||
.stdout(Stdio::piped())
|
||||
.stderr(Stdio::piped())
|
||||
.tap(|cmd| {
|
||||
if tracing::enabled!(Level::DEBUG) {
|
||||
let args = cmd
|
||||
.as_std()
|
||||
.get_args()
|
||||
.map(OsStr::to_string_lossy)
|
||||
.join(" ");
|
||||
debug!("Spawning command: `nix {args}`");
|
||||
}
|
||||
})
|
||||
.spawn()
|
||||
.inspect_err(|e| error!("error spawning command: {e}"))?;
|
||||
|
||||
let output = child.wait_with_output().await.inspect_err(|e| {
|
||||
error!("couldn't wait for spawned child process: {e}");
|
||||
})?;
|
||||
|
||||
Ok(output)
|
||||
}
|
||||
|
||||
//#[derive(Copy)]
|
||||
//#[derive(Debug, Clone, PartialEq)]
|
||||
//#[derive(utoipa::OpenApi)]
|
||||
//#[openapi(paths(ep_set_post))]
|
||||
//pub struct ApiDoc;
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue