use std::{ env, io, os::fd::{AsFd, BorrowedFd, IntoRawFd, OwnedFd, RawFd}, process::{Command, Stdio}, sync::{ Arc, LazyLock, atomic::{AtomicUsize, Ordering}, }, time::Duration, }; use iddqd::{BiHashMap, IdOrdMap}; use mio::{Events, Interest, Poll, Token, event::Event, net::UnixListener, unix::SourceFd}; use rustix::{ buffer::spare_capacity, net::SocketFlags, process::{Pid, PidfdFlags, Uid, WaitId, WaitIdOptions}, }; mod rustix { pub use rustix::process::{getuid, pidfd_open, waitid}; pub use rustix::*; } //mod rustix_prelude { // pub use rustix::process::{getuid, pidfd_open, waitid}; //} use serde_json::StreamDeserializer; use crate::prelude::*; pub mod api; use api::DaemonCmd; use crate::daemon_tokfd::{FdInfo, FdKind}; use crate::{OwnedFdWithFlags, TokenFd}; pub static UID: LazyLock = LazyLock::new(rustix::process::getuid); pub static USER_SOCKET_DIR: LazyLock<&'static Path> = LazyLock::new(|| { let dir: Box = env::var_os("XDG_RUNTIME_DIR") .map(PathBuf::from) .unwrap_or_else(|| ["/", "run", "user", &UID.to_string()].into_iter().collect()) .into_boxed_path(); Box::leak(dir) }); pub static TMPDIR: LazyLock<&'static Path> = LazyLock::new(|| { let dir: Box = env::temp_dir().into_boxed_path(); Box::leak(dir) }); pub static NIXOS_REBUILD: LazyLock<&'static Path> = LazyLock::new(|| { which::which("nixos-rebuild") .inspect_err(|e| error!("couldn't find `nixos-rebuild` in PATH: {e}")) .map(PathBuf::into_boxed_path) .map(|boxed| &*Box::leak(boxed)) .unwrap_or(Path::new("/run/current-system/sw/bin/nixos-rebuild")) }); const TIMEOUT_NEVER: Option = None; static NEXT_TOKEN_NUMBER: AtomicUsize = AtomicUsize::new(1); fn next_token() -> Token { let tok = NEXT_TOKEN_NUMBER.fetch_add(1, Ordering::SeqCst); // If the increment wrapped to 0, then we just increment it again. if tok == 0 { warn!("File descriptor token wrapped. That's... a lot."); return next_token(); } Token(tok) } trait EventExt { type Display; fn display(&self) -> Self::Display; } #[derive(Copy)] #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] struct EventDisplay { token: Token, error: bool, writable: bool, write_closed: bool, readable: bool, read_closed: bool, } impl EventExt for Event { type Display = EventDisplay; fn display(&self) -> Self::Display { EventDisplay { token: self.token(), error: self.is_error(), writable: self.is_writable(), write_closed: self.is_write_closed(), readable: self.is_readable(), read_closed: self.is_read_closed(), } } } impl Display for EventDisplay { fn fmt(&self, f: &mut Formatter) -> FmtResult { todo!() } } #[derive(Debug)] pub struct Daemon { config_path: Arc, fd: OwnedFdWithFlags, path: Option>, poller: Poll, fd_info: IdOrdMap, // Bijective mapping of [`mio::Token`]s to [`RawFd`]s. tokfd: BiHashMap, } /// `tokfd` handling. impl Daemon { fn fd_error_pop(&mut self, fd: RawFd) -> Option { let mut info = self.fd_info.get_mut(&fd).unwrap_or_else(|| { if let Ok(name) = FdInfo::guess_name(fd) { panic!( "tried to pop error for unknown fd {fd} ({})", name.to_string_lossy(), ); } panic!("tried to pop error for unknown fd {fd}") }); info.error_buffer.pop_front().tap_some(|e| { trace!("Popping error for {}: {e}", info.display()); }) } fn fd_error_push(&mut self, fd: RawFd, error: IoError) -> Result<(), IoError> { let mut info = self .fd_info .get_mut(&fd) .unwrap_or_else(|| panic!("tried to push error for unknown fd {fd}")); trace!("Pushing error for {}: {}", info.display(), error); info.error_buffer.try_push_back(error) } fn main_fd_info(&self) -> &FdInfo { self.fd_info.get(&self.fd.as_raw_fd()).unwrap_or_else(|| { unreachable!( "Main daemon fd {:?} was not registered with fd_info", self.fd, ) }) } fn register(&mut self, fd: RawFd, kind: FdKind) -> Token { let token = next_token(); debug!( "Registering new {} FdInfo for {fd} with token {token:?}", kind.name_str(), ); self.fd_info.insert_unique(FdInfo::new(fd, kind)).unwrap(); self.tokfd .insert_unique(TokenFd { token, fd }) .unwrap_or_else(|e| todo!("{e}")); let mut source = SourceFd(&fd); self.poller .registry() .register(&mut source, token, Interest::READABLE) .unwrap_or_else(|e| unreachable!("registering {fd:?} with poller failed: {e}")); token } fn register_with_name(&mut self, fd: RawFd, kind: FdKind, name: Box) -> Token { let token = next_token(); debug!( "Registering new {} FdInfo for {fd} ({}) with token {token:?}", name.to_string_lossy(), kind.name_str(), ); self.fd_info .insert_unique(FdInfo::new_with_name(fd, kind, name)) .unwrap(); self.tokfd .insert_unique(TokenFd { token, fd }) .unwrap_or_else(|e| todo!("{e}")); let mut source = SourceFd(&fd); self.poller .registry() .register(&mut source, token, Interest::READABLE) .unwrap_or_else(|e| unreachable!("registering {fd:?} with poller failed: {e}")); token } fn deregister(&mut self, fd: RawFd) { let info = self .fd_info .remove(&fd) .unwrap_or_else(|| unreachable!("tried to unregister unknown fd {fd}")); debug!("Unregistering fd {}; calling close()", info.display()); unsafe { rustix::io::close(fd) }; let res = unsafe { libc::fcntl(fd, libc::F_GETFD, 0) }; debug_assert_eq!(res, -1); debug_assert_eq!( IoError::last_os_error().raw_os_error(), Some(Errno::BADF.raw_os_error()), ); self.tokfd.remove2(&fd).unwrap_or_else(|| todo!()); } fn fd_for_token(&self, token: Token) -> Option { self.tokfd .get1(&token) .map(|TokenFd { fd, .. }| fd) .copied() } /// Not currently used, but here for completeness. #[expect(dead_code)] fn token_for_fd(&self, fd: RawFd) -> Option { self.tokfd .get2(&fd) .map(|TokenFd { token, .. }| token) .copied() } } impl Daemon { pub fn new( config_path: Arc, fd: OwnedFd, kind: FdKind, name: Option>, ) -> Self { let mut fd_info: IdOrdMap = Default::default(); // Supposedly, the only possible ways this can fail are EMFILE, ENFILE, and ENOMEM. // If any of those are the case, we're screwed anyway. let poller = Poll::new().unwrap_or_else(|e| panic!("can't create new mio::Poll: {e}")); // Make sure we register the poller in `fd_info`, so we can keep track of its errors. fd_info .insert_unique(FdInfo::new(poller.as_raw_fd(), FdKind::Poller)) .unwrap_or_else(|e| unreachable!("{e}")); let fd = OwnedFdWithFlags::new_with_fallback(fd); fd_info .insert_unique(FdInfo::new(fd.as_raw_fd(), kind)) .unwrap_or_else(|e| unreachable!("{e}")); debug!("opened daemon to {:?} file descriptor {fd:?}", name); let path = name .as_ref() .map(PathBuf::from) .map(PathBuf::into_boxed_path); Self { config_path, fd, path, poller, fd_info, tokfd: Default::default(), } } pub fn from_unix_socket_path(config_path: Arc, path: &Path) -> Result { // We unconditionally unlink() `path` before binding, but completely ignore the result. let _ = rustix::fs::unlink(path); let listener = UnixListener::bind(path) .tap_err(|e| error!("failed to bind AF_UNIX socket at {}: {e}", path.display()))?; let listener_owned_fd = OwnedFd::from(listener); // FIXME: should we KEEP_ALIVE? rustix::net::sockopt::set_socket_keepalive(&listener_owned_fd, true).unwrap(); let path: Box = path.to_path_buf().into_boxed_path().into_boxed_os_str(); Ok(Self::new( config_path, listener_owned_fd, FdKind::Socket, Some(path), )) } pub fn open_default_socket(config_path: Arc) -> Result { use IoErrorKind::*; let preferred = USER_SOCKET_DIR.join("dynix.sock").into_boxed_path(); let constructed = match Self::from_unix_socket_path(config_path.clone(), &preferred) { Ok(v) => v, Err(e) if e.kind() == Unsupported => { // return Err(e); }, Err(e) => { warn!( "failed binding AF_UNIX socket at {}: {e}; trying elsewhere", preferred.display(), ); let fallback = TMPDIR.join("dynix.sock").into_boxed_path(); Self::from_unix_socket_path(config_path, &fallback).tap_err(|e| { error!( "failed binding AF_UNIX socket at {}: {e}", fallback.display(), ) })? }, }; Ok(constructed) } /// This panics if stdin cannot be opened. /// /// If you want to handle that error, use [`Daemon::from_raw_parts()`]. pub fn from_stdin(config_path: Arc) -> Self { let stdin = io::stdin(); let fd = stdin .as_fd() .try_clone_to_owned() .expect("dynix daemon could not open stdin; try a Unix socket?"); Self::new(config_path, fd, FdKind::File, None) } //pub unsafe fn from_raw_parts(fd: OwnedFd) -> Self { // Self { // fd: OwnedFdWithFlags::new_with_fallback(fd), // } //} pub fn fd(&self) -> BorrowedFd<'_> { self.fd.as_fd() } } const DAEMON: Token = Token(0); /// Private helpers. impl Daemon { //fn proxy_stdio(&mut self, fd: &BorrowedFd) -> Result<(), IoError> { // let info = self.fd_info.get(&fd.as_raw_fd()).unwrap(); // let label = match info.kind { // FdKind::ChildStdout => "stdout", // FdKind::ChildStderr => "stderr", // other => unreachable!("child stdio cannot have kind {other:?}"), // }; // // FIXME: don't use a new allocation every time. // let mut buffer: Vec = Vec::with_capacity(1024); // // FIXME: handle line buffering correctly. // loop { // let count = rustix::io::read(fd, spare_capacity(&mut buffer)) // .inspect_err(|e| error!("read() on child stdio fd {fd:?} failed: {e}"))?; // // if count == 0 { // break; // } // // for line in buffer.lines() { // debug!("[child {label}]: {}", line.as_bstr()) // } // } // // Ok(()) //} fn read_cmd(&mut self, fd: &BorrowedFd) -> Result<(), IoError> { // FIXME: don't use a new allocation every time. let mut cmd_buffer: Vec = Vec::with_capacity(1024); let _count = rustix::io::read(fd, spare_capacity(&mut cmd_buffer)) .tap_err(|e| error!("read() on daemon fd {fd:?} failed: {e}"))?; // The buffer might have existing data from the last read. let deserializer = serde_json::Deserializer::from_slice(&cmd_buffer); let stream: StreamDeserializer<_, DaemonCmd> = deserializer.into_iter(); for cmd in stream { let cmd = match cmd { Ok(cmd) => cmd, Err(e) if e.is_eof() => { warn!("Got EOF before a valid command"); debug!("command buffer was: {:?}", cmd_buffer.as_bstr()); return Ok(()); }, Err(e) => { warn!("error deserializing command: {e}"); debug!("command buffer was: {:?}", cmd_buffer.as_bstr()); // Don't propagate the error unless we have too many. self.fd_error_push(fd.as_raw_fd(), e.into()).tap_err(|e| { error!("Accumulated too many errors for daemon fd {fd:?}: {e}") })?; return Ok(()); }, }; debug!("got cmd: {cmd:?}"); let _ = rustix::io::write(fd, b""); info!("dispatching command"); self.dispatch_cmd(cmd).unwrap_or_else(|e| todo!("{e}")); } Ok(()) } fn dispatch_cmd(&mut self, cmd: DaemonCmd) -> Result<(), IoError> { // Write the new file... let (name, value) = match cmd { DaemonCmd::Append { name, value } => (name, value), }; let source_file = crate::open_source_file(self.config_path.clone())?; let pri = crate::get_where(source_file.clone()).unwrap_or_else(|e| todo!("{e}")); let new_pri = pri - 1; // Get next priority line. let opt_name = name.to_nix_decl(); let new_line = crate::get_next_prio_line(source_file.clone(), &opt_name, new_pri, &value) .unwrap_or_else(|e| panic!("someone is holding a reference to source.lines(): {e}")); crate::write_next_prio(source_file, new_line).unwrap_or_else(|e| todo!("{e}")); // Rebuild and switch. // FIXME: allow passing additional args. let child = Command::new(*NIXOS_REBUILD) .arg("switch") .arg("--log-format") .arg("raw-with-logs") .arg("--no-reexec") .arg("-v") .stdout(Stdio::piped()) .stderr(Stdio::piped()) .spawn() .inspect_err(|e| { error!("failed to spawn `nixos-rebuild` command: {e}"); })?; debug!("Spanwed child process {}", child.id()); let pid = Pid::from_child(&child); let stdout = child.stdout.unwrap_or_else(|| { unreachable!("`child` is given `.stdout(Stdio::piped())`"); }); let stderr = child.stderr.unwrap_or_else(|| { unreachable!("`child` is given `.stderr(Stdio::piped())`"); }); let _token = self.register(stdout.into_raw_fd(), FdKind::ChildStdout); let _token = self.register(stderr.into_raw_fd(), FdKind::ChildStderr); match rustix::process::pidfd_open(pid, PidfdFlags::NONBLOCK) { Ok(pidfd) => { debug!("Opened pidfd {pidfd:?}, for process {pid}"); self.register(pidfd.into_raw_fd(), FdKind::Pid(pid)); }, Err(e) if e.kind() == IoErrorKind::NotFound => { warn!("child {pid} not found; died before we could open it?"); }, Err(e) => { error!("Error opening pidfd for child {pid}: {e}"); return Err(e)?; }, } Ok(()) } pub(crate) fn enter_loop(&mut self) -> Result, IoError> { let raw_fd = self.fd.as_raw_fd(); if cfg!(debug_assertions) { assert!( self.fd_info.contains_key(&raw_fd), "we should know about daemon fd {raw_fd}", ); assert!( self.fd_info.contains_key(&self.poller.as_raw_fd()), "we should know about poller fd {}", self.poller.as_raw_fd(), ); } let mut daemon_source = SourceFd(&raw_fd); self.tokfd .insert_unique(TokenFd { token: DAEMON, fd: raw_fd, }) .unwrap(); self.poller .registry() .register(&mut daemon_source, DAEMON, Interest::READABLE) .unwrap_or_else(|e| unreachable!("registering mio Poll for daemon fd {raw_fd:?}: {e}")); let mut events = Events::with_capacity(1024); loop { if tracing::enabled!(tracing::Level::DEBUG) { debug!("Daemon loop iteration, with file descriptors: "); for info in &self.fd_info { debug!("- {}", info.display()); } } let poll_result = self.poller.poll(&mut events, TIMEOUT_NEVER); self.handle_poll(poll_result, &events)?; } } fn handle_poll( &mut self, poll_result: Result<(), IoError>, events: &Events, ) -> Result<(), IoError> { match poll_result { Ok(()) => { trace!( "mio::Poller::poll() got events: {:?}", events.iter().size_hint().0, ); if events.is_empty() { unreachable!( "epoll_wait() with a \"forever\" timeout should never give empty events", ); } let _ = self.fd_error_pop(self.poller.as_raw_fd()); }, Err(e) if e.kind() == IoErrorKind::Interrupted => { // EINTR is silly. // Return early, and poll() again. return Ok(()); }, Err(e) => { if let Some(Errno::BADF) = e.raw_os_error().map(Errno::from_raw_os_error) { panic!("EBADF on poller fd; IO safety violation?"); } warn!("mio Poll::poll() error: {e}"); self.fd_error_push(self.poller.as_raw_fd(), e) .tap_err(|e| { error!("accumulated too many errors for mio::Poll::poll(): {e}") })?; }, } for event in events { self.handle_event(event)?; } Ok(()) } fn handle_event(&mut self, event: &Event) -> Result<(), IoError> { trace!("Handling event {event:#?}"); match event.token() { DAEMON => { let is_sock = self.main_fd_info().kind == FdKind::Socket; if !is_sock { // SAFETY: oh boy: disjoint borrows with extra steps. let file_fd = unsafe { BorrowedFd::borrow_raw(self.fd.as_raw_fd()) }; self.read_cmd(&file_fd).unwrap(); return Ok(()); } // Accept, first. let flags = SocketFlags::NONBLOCK | SocketFlags::CLOEXEC; let stream_fd = match rustix::net::accept_with(&self.fd, flags) { Ok(stream) => { debug!( "Accepted connection from socket {:?} as stream {:?}", self.fd, stream, ); stream }, Err(e) => { error!("accept4 on daemon socket failed: {e}"); self.fd_error_push(self.fd.as_raw_fd(), e.into()) .tap_err(|e| { error!( "Accumulated too many errors for daemon fd {:?}: {e}", self.fd ) })?; return Ok(()); }, }; // Add this stream to our poll interest list. // NOTE: `stream_fd` is now effectively `ManuallyDrop`. let stream_fd = stream_fd.into_raw_fd(); let _token = self.register(stream_fd, FdKind::SockStream); // Wait for the next poll to handle. }, other_token => { // This must be a stream fd. let fd = self.fd_for_token(other_token).unwrap_or_else(|| { unreachable!("tried to get fd for non-existent token? {other_token:?}") }); let Some(info) = self.fd_info.get(&fd) else { panic!("Received an event on an unregistered fd {fd}; IO-safety violation?"); }; if event.is_read_closed() { self.deregister(fd); return Ok(()); } match info.kind { FdKind::Pid(pid) => { debug!("Reaping child process {pid}"); // SAFETY: `fd` cannot have been closed yet, since that's what we do here. let pidfd = unsafe { BorrowedFd::borrow_raw(fd) }; let status = rustix::waitid(WaitId::PidFd(pidfd), WaitIdOptions::EXITED) .unwrap_or_else(|e| { todo!("waitid() can fail? on pid {pid}: {e}"); }) .unwrap_or_else(|| { todo!("waitid() returned None? for pid {pid}"); }); debug!("waitid() for pid {pid} returned status: {status:?}"); let is_dead = status.exited() || status.killed() || status.dumped(); if !is_dead { todo!("Handle process {pid} events that aren't death: {status:?}"); } let Some(exit_code) = status.exit_status() else { unreachable!("Process {pid} died with no exit code at all? {status:?}"); }; debug!("Child process {pid} exited with code {exit_code}"); // Close the pidfd. self.deregister(fd); }, //FdKind::ChildStdout => { // warn!("got stdout"); // todo!(); //}, //FdKind::ChildStderr => { // warn!("got stderr"); // // SAFETY: oh boy. // let stderr = unsafe { BorrowedFd::borrow_raw(fd) }; // self.proxy_stdio(&stderr).unwrap(); //}, FdKind::SockStream => { // SAFETY: oh boy. let stream_fd = unsafe { BorrowedFd::borrow_raw(fd) }; self.read_cmd(&stream_fd).unwrap(); }, kind => todo!("{kind:?}"), }; }, } Ok(()) } } impl Drop for Daemon { fn drop(&mut self) { if let Some(path) = self.path.as_deref() { let _ = rustix::fs::unlink(path); } } }