use std::{ env, io, ops::Deref, os::fd::{AsFd, BorrowedFd, FromRawFd, IntoRawFd, OwnedFd, RawFd}, sync::LazyLock, time::Duration, }; use circular_buffer::CircularBuffer; use iddqd::BiHashMap; use mio::{Events, Interest, Poll, Token, net::UnixListener, unix::SourceFd}; use rustix::{ buffer::spare_capacity, fs::{FileType, Stat}, net::SocketFlags, process::Uid, }; use serde::{Deserialize, Serialize}; use serde_json::StreamDeserializer; use crate::prelude::*; use crate::{OwnedFdWithFlags, TokenFd}; pub static UID: LazyLock = LazyLock::new(|| rustix::process::getuid()); pub static USER_SOCKET_DIR: LazyLock<&'static Path> = LazyLock::new(|| { let dir: Box = env::var_os("XDG_RUNTIME_DIR") .map(PathBuf::from) .unwrap_or_else(|| ["/", "run", "user", &UID.to_string()].into_iter().collect()) .into_boxed_path(); Box::leak(dir) }); pub static TMPDIR: LazyLock<&'static Path> = LazyLock::new(|| { let dir: Box = env::temp_dir().into_boxed_path(); Box::leak(dir) }); #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] #[derive(Deserialize, Serialize)] #[serde(untagged)] pub enum ConvenientAttrPath { Dotted(Box), Split(Box<[Box]>), } impl ConvenientAttrPath { pub fn clone_from_dotted(s: &str) -> Self { Self::Dotted(Box::from(s)) } pub fn clone_from_split(s: &[&str]) -> Self { Self::from_str_iter(s.into_iter().map(Deref::deref)) } pub fn from_str_iter<'i, I>(iter: I) -> Self where I: Iterator, { let boxed = iter.map(|s| Box::from(s)); Self::Split(Box::from_iter(boxed)) } } impl<'i> FromIterator<&'i str> for ConvenientAttrPath { fn from_iter(iter: I) -> Self where I: IntoIterator, { Self::from_str_iter(iter.into_iter()) } } impl FromIterator> for ConvenientAttrPath { fn from_iter(iter: I) -> Self where I: IntoIterator>, { Self::Split(Box::from_iter(iter)) } } #[derive(Debug, Clone, PartialEq)] #[derive(serde::Deserialize, serde::Serialize)] #[serde(tag = "action", content = "args", rename_all = "snake_case")] pub enum DaemonCmd { Append { name: ConvenientAttrPath, value: Box, }, } const TIMEOUT_NEVER: Option = None; #[derive(Debug)] pub struct Daemon { fd: OwnedFdWithFlags, path: Box, poll_error_buffer: CircularBuffer, fd_error_buffer: CircularBuffer, // Bijective mapping of [`mio::Token`]s to [`RawFd`]s. tokfd: BiHashMap, cmd_buffer: Vec, next_timeout: Option, } /// `tokfd` handling. impl Daemon { //fn register(&mut self, token: Token, fd: RawFd) { // self.insert_unique //} fn fd_for_token(&self, token: Token) -> Option { self.tokfd .get1(&token) .map(|TokenFd { fd, .. }| fd) .copied() } fn token_for_fd(&self, fd: RawFd) -> Option { self.tokfd .get2(&fd) .map(|TokenFd { token, .. }| token) .copied() } } impl Daemon { pub fn new(fd: OwnedFd, name_or_path: Box) -> Self { let fd = OwnedFdWithFlags::new_with_fallback(fd); debug!( "opened daemon to {} file descriptor {fd:?}", name_or_path.display(), ); Self { fd, path: name_or_path, poll_error_buffer: Default::default(), tokfd: Default::default(), fd_error_buffer: Default::default(), cmd_buffer: Vec::with_capacity(1024), next_timeout: TIMEOUT_NEVER, } } pub fn from_unix_socket_path(path: &Path) -> Result { // We unconditionally unlink() `path` before binding, but completely ignore the result. let _ = rustix::fs::unlink(path); let listener = UnixListener::bind(path) .tap_err(|e| error!("failed to bind AF_UNIX socket at {}: {e}", path.display()))?; let listener_owned_fd = OwnedFd::from(listener); // FIXME: should we KEEP_ALIVE? rustix::net::sockopt::set_socket_keepalive(&listener_owned_fd, true).unwrap(); let path: Box = path.to_path_buf().into_boxed_path().into_boxed_os_str(); Ok(Self::new(listener_owned_fd, path)) } pub fn open_default_socket() -> Result { use IoErrorKind::*; let preferred = USER_SOCKET_DIR.join("dynix.sock").into_boxed_path(); let constructed = match Self::from_unix_socket_path(&preferred) { Ok(v) => v, Err(e) if e.kind() == Unsupported => { // return Err(e); }, Err(e) => { warn!( "failed binding AF_UNIX socket at {}: {e}; trying elsewhere", preferred.display() ); let fallback = TMPDIR.join("dynix.sock").into_boxed_path(); Self::from_unix_socket_path(&fallback).tap_err(|e| { error!( "failed binding AF_UNIX socket at {}: {e}", fallback.display() ) })? }, }; Ok(constructed) } /// This panics if stdin cannot be opened. /// /// If you want to handle that error, use [`Daemon::from_raw_parts()`]. pub fn from_stdin() -> Self { let stdin = io::stdin(); let fd = stdin .as_fd() .try_clone_to_owned() .expect("dynix daemon could not open stdin; try a Unix socket?"); Self::new(fd, Box::from(OsStr::new("«stdin»"))) } //pub unsafe fn from_raw_parts(fd: OwnedFd) -> Self { // Self { // fd: OwnedFdWithFlags::new_with_fallback(fd), // } //} pub fn fd(&self) -> BorrowedFd<'_> { self.fd.as_fd() } } const ERROR_BUFFER_LEN: usize = 8; /// Private helpers. impl Daemon { fn read_cmd(&mut self, fd: Option<&dyn AsFd>) -> Result<(), IoError> { let fd = match fd.as_ref() { Some(fd) => fd.as_fd(), None => self.fd.as_fd(), }; if self.cmd_buffer.len() == self.cmd_buffer.capacity() { self.cmd_buffer.reserve(1024); } let _count = rustix::io::read(fd, spare_capacity(&mut self.cmd_buffer)) .tap_err(|e| error!("read() on daemon fd {fd:?} failed: {e}"))?; // The buffer might have existing data from the last read. let deserializer = serde_json::Deserializer::from_slice(&self.cmd_buffer); let stream: StreamDeserializer<_, DaemonCmd> = deserializer.into_iter(); for cmd in stream { let cmd = match cmd { Ok(cmd) => cmd, Err(e) if e.is_eof() => { self.next_timeout = Some(Duration::from_secs(4)); warn!("Didn't get a valid daemon command; giving the other side 4 seconds..."); return Ok(()); }, Err(e) => { warn!("error deserializing command: {e}"); warn!("error count: {}", self.fd_error_buffer.len()); self.cmd_buffer.clear(); // Don't propagate the error unless we have too many. self.fd_error_buffer.try_push_back(e.into()).tap_err(|e| { error!("Accumulated too many errors for daemon fd {fd:?}: {e}") })?; return Ok(()); }, }; debug!("got cmd: {cmd:?}"); } self.cmd_buffer.clear(); Ok(()) } pub(crate) fn enter_loop(&mut self) -> Result, IoError> { let raw_fd = self.fd.as_raw_fd(); let mut daemon_source = SourceFd(&raw_fd); const DAEMON: Token = Token(0); self.tokfd .insert_unique(TokenFd { token: DAEMON, fd: raw_fd, }) .unwrap(); let mut next_token_number: usize = 1; let mut next_token = || -> Token { let t = Token(next_token_number); next_token_number = next_token_number.saturating_add(1); t }; let mut poll = Poll::new().unwrap_or_else(|e| unreachable!("creating new mio Poll: {e}")); poll.registry() .register(&mut daemon_source, DAEMON, Interest::READABLE) .unwrap_or_else(|e| unreachable!("registering mio Poll for daemon fd {raw_fd:?}: {e}")); let mut events = Events::with_capacity(1024); loop { if let Some(timeout) = self.next_timeout { debug!( "epoll_wait() with a timeout: {}", humantime::format_duration(timeout), ); } match poll.poll(&mut events, self.next_timeout.take()) { Ok(_) => { if events.is_empty() { warn!("timeout expired"); self.cmd_buffer.clear(); } else { let _ = self.poll_error_buffer.pop_front(); } }, Err(e) => { warn!("mio Poll::poll() error: {e}"); self.poll_error_buffer.try_push_back(e).tap_err(|e| { error!("accumulated too many errors for mio::Poll::poll(): {e}") })?; }, } for event in &events { match event.token() { DAEMON => { let is_sock = rustix::fs::fstat(&self.fd) .map(|Stat { st_mode, .. }| st_mode) .map(FileType::from_raw_mode) .map(FileType::is_socket) .unwrap_or(false); if !is_sock { self.read_cmd(None).unwrap(); continue; } // Accept, first. let flags = SocketFlags::NONBLOCK | SocketFlags::CLOEXEC; let stream_fd = match rustix::net::accept_with(&self.fd, flags) { Ok(stream) => { debug!( "Accepted connection from socket {:?} as stream {:?}", self.fd, stream, ); stream }, Err(e) => { error!("accept4 on daemon socket failed: {e}"); self.fd_error_buffer.try_push_back(e.into()).tap_err(|e| { error!( "Accumulated too many errors for daemon fd {:?}: {e}", self.fd ) })?; continue; }, }; // Add this stream to our poll interest list. let mut stream_fd = stream_fd.into_raw_fd(); let token = next_token(); self.tokfd .insert_unique((token, stream_fd).into()) .unwrap_or_else(|e| unreachable!("? {e}")); let mut source = SourceFd(&mut stream_fd); poll.registry() .register(&mut source, token, Interest::READABLE) .unwrap(); // Wait for the next poll to handle. }, other_token => { // This must be a stream fd. let stream_fd = self.fd_for_token(other_token).unwrap_or_else(|| { unreachable!("tried to get fd for no-existent token? {other_token:?}") }); // SAFETY: oh boy. let stream_fd = unsafe { BorrowedFd::borrow_raw(stream_fd) }; self.read_cmd(Some(&stream_fd)).unwrap(); }, } } } } //fn get_fallback_fd_name(fd: Fd) -> Option> { // let dev_fd_path = Path::new("/dev/fd").join(fd.as_raw_fd().to_string()); // // fs_err::read_link(dev_fd_path) // .map(PathBuf::into_os_string) // .map(OsString::into_boxed_os_str) // .ok() //} } impl Drop for Daemon { fn drop(&mut self) { let probably_synthetic = self .path .to_str() .map(|p| p.starts_with("«") && p.ends_with("»")) .unwrap_or(false); if probably_synthetic { return; } let _ = rustix::fs::unlink(&*self.path); } }