use std::{ env, io, ops::Deref, os::fd::{AsFd, BorrowedFd, IntoRawFd, OwnedFd, RawFd}, sync::{ Arc, LazyLock, atomic::{AtomicUsize, Ordering}, }, time::Duration, }; use iddqd::{BiHashMap, IdOrdMap}; use mio::{Events, Interest, Poll, Token, net::UnixListener, unix::SourceFd}; use rustix::{buffer::spare_capacity, net::SocketFlags, process::Uid}; use serde::{Deserialize, Serialize}; use serde_json::StreamDeserializer; use crate::{ SourceFile, SourceLine, daemon_tokfd::{FdInfo, FdKind}, prelude::*, }; use crate::{OwnedFdWithFlags, TokenFd}; pub static UID: LazyLock = LazyLock::new(|| rustix::process::getuid()); pub static USER_SOCKET_DIR: LazyLock<&'static Path> = LazyLock::new(|| { let dir: Box = env::var_os("XDG_RUNTIME_DIR") .map(PathBuf::from) .unwrap_or_else(|| ["/", "run", "user", &UID.to_string()].into_iter().collect()) .into_boxed_path(); Box::leak(dir) }); pub static TMPDIR: LazyLock<&'static Path> = LazyLock::new(|| { let dir: Box = env::temp_dir().into_boxed_path(); Box::leak(dir) }); #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] #[derive(Deserialize, Serialize)] #[serde(untagged)] pub enum ConvenientAttrPath { Dotted(Box), Split(Box<[Box]>), } impl ConvenientAttrPath { pub fn clone_from_dotted(s: &str) -> Self { Self::Dotted(Box::from(s)) } pub fn clone_from_split(s: &[&str]) -> Self { Self::from_str_iter(s.into_iter().map(Deref::deref)) } pub fn from_str_iter<'i, I>(iter: I) -> Self where I: Iterator, { let boxed = iter.map(|s| Box::from(s)); Self::Split(Box::from_iter(boxed)) } pub fn to_nix_decl(&self) -> Box { use ConvenientAttrPath::*; match self { Dotted(s) => s.clone(), Split(path) => { // FIXME: quote if necessary path.join(".").into_boxed_str() }, } } } impl<'i> FromIterator<&'i str> for ConvenientAttrPath { fn from_iter(iter: I) -> Self where I: IntoIterator, { Self::from_str_iter(iter.into_iter()) } } impl FromIterator> for ConvenientAttrPath { fn from_iter(iter: I) -> Self where I: IntoIterator>, { Self::Split(Box::from_iter(iter)) } } #[derive(Debug, Clone, PartialEq)] #[derive(serde::Deserialize, serde::Serialize)] #[serde(tag = "action", content = "args", rename_all = "snake_case")] // FIXME: rename to not confuse with the clap argument type. pub enum DaemonCmd { Append { name: ConvenientAttrPath, value: Box, }, } const TIMEOUT_NEVER: Option = None; static NEXT_TOKEN_NUMBER: AtomicUsize = AtomicUsize::new(1); fn next_token() -> Token { let tok = NEXT_TOKEN_NUMBER.fetch_add(1, Ordering::SeqCst); // If the increment wrapped to 0, then we just increment it again. if tok == 0 { return Token(NEXT_TOKEN_NUMBER.fetch_add(1, Ordering::SeqCst)); } Token(tok) } #[derive(Debug)] pub struct Daemon { config_path: Arc, fd: OwnedFdWithFlags, path: Option>, poller: Poll, fd_info: IdOrdMap, // Bijective mapping of [`mio::Token`]s to [`RawFd`]s. tokfd: BiHashMap, } /// `tokfd` handling. impl Daemon { fn fd_error_pop(&mut self, fd: RawFd) -> Option { let mut info = self.fd_info.get_mut(&fd).unwrap_or_else(|| { if let Ok(name) = FdInfo::guess_name(fd) { panic!( "tried to pop error for unknown fd {fd} ({})", name.to_string_lossy(), ); } panic!("tried to pop error for unknown fd {fd}") }); info.error_buffer.pop_front().tap_some(|e| { trace!("Popping error for {}: {e}", info.display()); }) } fn fd_error_push(&mut self, fd: RawFd, error: IoError) -> Result<(), IoError> { let mut info = self .fd_info .get_mut(&fd) .unwrap_or_else(|| panic!("tried to push error for unknown fd {fd}")); trace!("Pushing error for {}: {}", info.display(), error); info.error_buffer.try_push_back(error) } fn main_fd_info(&self) -> &FdInfo { self.fd_info.get(&self.fd.as_raw_fd()).unwrap_or_else(|| { unreachable!( "Main daemon fd {:?} was not registered with fd_info", self.fd, ) }) } fn register(&mut self, fd: RawFd, kind: FdKind) -> Token { let token = next_token(); debug!( "Registering new {} FdInfo for {fd} with token {token:?}", kind.name_str(), ); self.fd_info.insert_unique(FdInfo::new(fd, kind)).unwrap(); self.tokfd .insert_unique(TokenFd { token, fd }) .unwrap_or_else(|e| todo!("{e}")); let mut source = SourceFd(&fd); self.poller .registry() .register(&mut source, token, Interest::READABLE) .unwrap_or_else(|e| unreachable!("registering {fd:?} with poller failed: {e}")); token } fn deregister(&mut self, fd: RawFd) { let info = self .fd_info .remove(&fd) .unwrap_or_else(|| unreachable!("tried to unregister unknown fd {fd}")); debug!("Unregistering fd {}; calling close()", info.display()); unsafe { rustix::io::close(fd) }; let res = unsafe { libc::fcntl(fd, libc::F_GETFD, 0) }; debug_assert_eq!(res, -1); debug_assert_eq!( IoError::last_os_error().raw_os_error(), Some(Errno::BADF.raw_os_error()), ); self.tokfd.remove2(&fd).unwrap_or_else(|| todo!()); } fn fd_for_token(&self, token: Token) -> Option { self.tokfd .get1(&token) .map(|TokenFd { fd, .. }| fd) .copied() } /// Not currently used, but here for completeness. #[expect(dead_code)] fn token_for_fd(&self, fd: RawFd) -> Option { self.tokfd .get2(&fd) .map(|TokenFd { token, .. }| token) .copied() } } impl Daemon { pub fn new( config_path: Arc, fd: OwnedFd, kind: FdKind, name: Option>, ) -> Self { let mut fd_info: IdOrdMap = Default::default(); // Supposedly, the only possible ways this can fail are EMFILE, ENFILE, and ENOMEM. // If any of those are the case, we're screwed anyway. let poller = Poll::new().unwrap_or_else(|e| panic!("can't create new mio::Poll: {e}")); // Make sure we register the poller in `fd_info`, so we can keep track of its errors. fd_info .insert_unique(FdInfo::new(poller.as_raw_fd(), FdKind::Poller)) .unwrap_or_else(|e| unreachable!("{e}")); let fd = OwnedFdWithFlags::new_with_fallback(fd); fd_info .insert_unique(FdInfo::new(fd.as_raw_fd(), kind)) .unwrap_or_else(|e| unreachable!("{e}")); debug!("opened daemon to {:?} file descriptor {fd:?}", name); let path = match &name { Some(name) => Some(PathBuf::from(name).into_boxed_path()), None => None, }; Self { config_path, fd, path, poller, fd_info, tokfd: Default::default(), } } pub fn from_unix_socket_path(config_path: Arc, path: &Path) -> Result { // We unconditionally unlink() `path` before binding, but completely ignore the result. let _ = rustix::fs::unlink(path); let listener = UnixListener::bind(path) .tap_err(|e| error!("failed to bind AF_UNIX socket at {}: {e}", path.display()))?; let listener_owned_fd = OwnedFd::from(listener); // FIXME: should we KEEP_ALIVE? rustix::net::sockopt::set_socket_keepalive(&listener_owned_fd, true).unwrap(); let path: Box = path.to_path_buf().into_boxed_path().into_boxed_os_str(); Ok(Self::new( config_path, listener_owned_fd, FdKind::Socket, Some(path), )) } pub fn open_default_socket(config_path: Arc) -> Result { use IoErrorKind::*; let preferred = USER_SOCKET_DIR.join("dynix.sock").into_boxed_path(); let constructed = match Self::from_unix_socket_path(config_path.clone(), &preferred) { Ok(v) => v, Err(e) if e.kind() == Unsupported => { // return Err(e); }, Err(e) => { warn!( "failed binding AF_UNIX socket at {}: {e}; trying elsewhere", preferred.display(), ); let fallback = TMPDIR.join("dynix.sock").into_boxed_path(); Self::from_unix_socket_path(config_path, &fallback).tap_err(|e| { error!( "failed binding AF_UNIX socket at {}: {e}", fallback.display(), ) })? }, }; Ok(constructed) } /// This panics if stdin cannot be opened. /// /// If you want to handle that error, use [`Daemon::from_raw_parts()`]. pub fn from_stdin(config_path: Arc) -> Self { let stdin = io::stdin(); let fd = stdin .as_fd() .try_clone_to_owned() .expect("dynix daemon could not open stdin; try a Unix socket?"); Self::new(config_path, fd, FdKind::File, None) } //pub unsafe fn from_raw_parts(fd: OwnedFd) -> Self { // Self { // fd: OwnedFdWithFlags::new_with_fallback(fd), // } //} pub fn fd(&self) -> BorrowedFd<'_> { self.fd.as_fd() } } /// Private helpers. impl Daemon { fn read_cmd(&mut self, fd: &BorrowedFd) -> Result<(), IoError> { // FIXME: don't use a new allocation every time. let mut cmd_buffer: Vec = Vec::with_capacity(1024); let _count = rustix::io::read(fd, spare_capacity(&mut cmd_buffer)) .tap_err(|e| error!("read() on daemon fd {fd:?} failed: {e}"))?; // The buffer might have existing data from the last read. let deserializer = serde_json::Deserializer::from_slice(&cmd_buffer); let stream: StreamDeserializer<_, DaemonCmd> = deserializer.into_iter(); for cmd in stream { let cmd = match cmd { Ok(cmd) => cmd, Err(e) if e.is_eof() => { warn!("Got EOF before a valid command"); debug!("command buffer was: {:?}", cmd_buffer.as_bstr()); return Ok(()); }, Err(e) => { warn!("error deserializing command: {e}"); debug!("command buffer was: {:?}", cmd_buffer.as_bstr()); // Don't propagate the error unless we have too many. self.fd_error_push(fd.as_raw_fd(), e.into()).tap_err(|e| { error!("Accumulated too many errors for daemon fd {fd:?}: {e}") })?; return Ok(()); }, }; debug!("got cmd: {cmd:?}"); let _ = rustix::io::write(fd, b""); info!("dispatching command"); self.dispatch_cmd(cmd).unwrap_or_else(|e| todo!("{e}")); } Ok(()) } fn dispatch_cmd(&mut self, cmd: DaemonCmd) -> Result<(), IoError> { let (name, value) = match cmd { DaemonCmd::Append { name, value } => (name, value), }; let mut opts = File::options(); opts.read(true) .write(true) .create(false) .custom_flags(libc::O_CLOEXEC); let source_file = SourceFile::open_from(self.config_path.clone(), opts)?; let pri = crate::get_where(source_file.clone()).unwrap_or_else(|e| todo!("{e}")); let new_pri = pri - 1; //let new_pri_line = // crate::get_next_prio_line(source_file.clone(), Arc::from(name), Arc::from(value)); // Get next priority line. let source_lines = source_file.lines()?; let penultimate = source_lines.get(source_lines.len() - 2); // FIXME: don't rely on whitespace lol debug_assert_eq!(penultimate.map(SourceLine::text).as_deref(), Some(" ];")); let penultimate = penultimate.unwrap(); let new_generation = 0 - new_pri; let new_line = SourceLine { line: penultimate.line, path: source_file.path(), text: Arc::from(format!( " {} = lib.mkOverride ({}) ({}); # DYNIX GENERATION {}", name.to_nix_decl(), new_pri, value, new_generation, )), }; drop(source_lines); crate::write_next_prio(source_file, new_line).unwrap_or_else(|e| todo!("{e}")); Ok(()) } pub(crate) fn enter_loop(&mut self) -> Result, IoError> { let raw_fd = self.fd.as_raw_fd(); if cfg!(debug_assertions) { assert!( self.fd_info.contains_key(&raw_fd), "we should know about daemon fd {raw_fd}", ); assert!( self.fd_info.contains_key(&self.poller.as_raw_fd()), "we should know about poller fd {}", self.poller.as_raw_fd(), ); } let mut daemon_source = SourceFd(&raw_fd); const DAEMON: Token = Token(0); self.tokfd .insert_unique(TokenFd { token: DAEMON, fd: raw_fd, }) .unwrap(); self.poller .registry() .register(&mut daemon_source, DAEMON, Interest::READABLE) .unwrap_or_else(|e| unreachable!("registering mio Poll for daemon fd {raw_fd:?}: {e}")); let mut events = Events::with_capacity(1024); loop { if tracing::enabled!(tracing::Level::DEBUG) { // trace!("Daemon loop iteration, with file descriptors: "); for info in &self.fd_info { trace!("- {}", info.display()); } } match self.poller.poll(&mut events, TIMEOUT_NEVER) { Ok(_) => { trace!( "mio::Poller::poll() got events: {:?}", events.iter().size_hint().0, ); if events.is_empty() { unreachable!( "epoll_wait() with a \"forever\" timeout should never give empty events", ); } let _ = self.fd_error_pop(self.poller.as_raw_fd()); }, Err(e) if e.kind() == IoErrorKind::Interrupted => { // EINTR is silly. continue; }, Err(e) => { if let Some(Errno::BADF) = e.raw_os_error().map(Errno::from_raw_os_error) { unreachable!("EBADF on poller fd; IO safety violation?"); } warn!("mio Poll::poll() error: {e}"); self.fd_error_push(self.poller.as_raw_fd(), e) .tap_err(|e| { error!("accumulated too many errors for mio::Poll::poll(): {e}") })?; }, } for event in &events { trace!("event is {event:?}"); match event.token() { DAEMON => { let is_sock = self.main_fd_info().kind == FdKind::Socket; if !is_sock { // SAFETY: oh boy: disjoint borrows with extra steps. let file_fd = unsafe { BorrowedFd::borrow_raw(self.fd.as_raw_fd()) }; self.read_cmd(&file_fd).unwrap(); continue; } // Accept, first. let flags = SocketFlags::NONBLOCK | SocketFlags::CLOEXEC; let stream_fd = match rustix::net::accept_with(&self.fd, flags) { Ok(stream) => { debug!( "Accepted connection from socket {:?} as stream {:?}", self.fd, stream, ); stream }, Err(e) => { error!("accept4 on daemon socket failed: {e}"); self.fd_error_push(self.fd.as_raw_fd(), e.into()) .tap_err(|e| { error!( "Accumulated too many errors for daemon fd {:?}: {e}", self.fd ) })?; continue; }, }; // Add this stream to our poll interest list. // NOTE: `stream_fd` is now effectively `ManuallyDrop`. let stream_fd = stream_fd.into_raw_fd(); let _token = self.register(stream_fd, FdKind::SockStream); // Wait for the next poll to handle. }, other_token => { // This must be a stream fd. let stream_fd = self.fd_for_token(other_token).unwrap_or_else(|| { unreachable!("tried to get fd for no-existent token? {other_token:?}") }); if event.is_read_closed() { self.deregister(stream_fd); } else { // SAFETY: oh boy. let stream_fd = unsafe { BorrowedFd::borrow_raw(stream_fd) }; self.read_cmd(&stream_fd).unwrap(); } }, } } } } } impl Drop for Daemon { fn drop(&mut self) { if let Some(path) = self.path.as_deref() { let _ = rustix::fs::unlink(path); } } }