diff --git a/src/daemon.rs b/src/daemon.rs index f48b827..9f4856e 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -1,13 +1,15 @@ use std::{ env, io, ops::Deref, - os::fd::{AsFd, BorrowedFd, FromRawFd, IntoRawFd, OwnedFd, RawFd}, - sync::LazyLock, + os::fd::{AsFd, BorrowedFd, IntoRawFd, OwnedFd, RawFd}, + sync::{ + LazyLock, + atomic::{AtomicUsize, Ordering}, + }, time::Duration, }; -use circular_buffer::CircularBuffer; -use iddqd::BiHashMap; +use iddqd::{BiHashMap, IdOrdMap}; use mio::{Events, Interest, Poll, Token, net::UnixListener, unix::SourceFd}; @@ -21,7 +23,10 @@ use rustix::{ use serde::{Deserialize, Serialize}; use serde_json::StreamDeserializer; -use crate::prelude::*; +use crate::{ + daemon_tokfd::{FdInfo, FdKind}, + prelude::*, +}; use crate::{OwnedFdWithFlags, TokenFd}; @@ -98,12 +103,27 @@ pub enum DaemonCmd { const TIMEOUT_NEVER: Option = None; +static NEXT_TOKEN_NUMBER: AtomicUsize = AtomicUsize::new(1); +fn next_token() -> Token { + let tok = NEXT_TOKEN_NUMBER.fetch_add(1, Ordering::SeqCst); + + // If the increment wrapped to 0, then we just increment it again. + if tok == 0 { + return Token(NEXT_TOKEN_NUMBER.fetch_add(1, Ordering::SeqCst)); + } + + Token(tok) +} + #[derive(Debug)] pub struct Daemon { fd: OwnedFdWithFlags, - path: Box, - poll_error_buffer: CircularBuffer, - fd_error_buffer: CircularBuffer, + path: Option>, + + poller: Poll, + + //fd_error_buffer: CircularBuffer, + fd_info: IdOrdMap, // Bijective mapping of [`mio::Token`]s to [`RawFd`]s. tokfd: BiHashMap, @@ -114,9 +134,52 @@ pub struct Daemon { /// `tokfd` handling. impl Daemon { - //fn register(&mut self, token: Token, fd: RawFd) { - // self.insert_unique - //} + fn fd_error_pop(&mut self, fd: RawFd) -> Option { + self.fd_info + .get_mut(&fd) + .unwrap_or_else(|| { + if let Ok(name) = FdInfo::guess_name(fd) { + panic!( + "tried to pop error for unknown fd {fd} ({})", + name.to_string_lossy(), + ); + } + panic!("tried to pop error for unknown fd {fd}") + }) + .error_buffer + .pop_front() + } + + fn fd_error_push(&mut self, fd: RawFd, error: IoError) -> Result<(), IoError> { + self.fd_info + .get_mut(&fd) + .unwrap_or_else(|| panic!("tried to push error for unknown fd {fd}")) + .error_buffer + .try_push_back(error) + } + + fn register(&mut self, fd: RawFd) -> Token { + self.register_with_kind(fd, FdKind::Unknown) + } + + fn register_with_kind(&mut self, fd: RawFd, kind: FdKind) -> Token { + let token = next_token(); + + self.fd_info.insert_unique(FdInfo::new(fd, kind)).unwrap(); + + self.tokfd + .insert_unique(TokenFd { token, fd }) + .unwrap_or_else(|e| todo!("{e}")); + + let mut source = SourceFd(&fd); + self.poller + .registry() + .register(&mut source, token, Interest::READABLE) + .unwrap_or_else(|e| unreachable!("registering {fd:?} with poller failed: {e}")); + + token + } + fn fd_for_token(&self, token: Token) -> Option { self.tokfd .get1(&token) @@ -133,20 +196,38 @@ impl Daemon { } impl Daemon { - pub fn new(fd: OwnedFd, name_or_path: Box) -> Self { + pub fn new(fd: OwnedFd, kind: FdKind, name: Option>) -> Self { + let mut fd_info: IdOrdMap = Default::default(); + + // Supposedly, the only possible ways this can fail are EMFILE, ENFILE, and ENOMEM. + // If any of those are the case, we're screwed anyway. + let poller = Poll::new().unwrap_or_else(|e| panic!("can't create new mio::Poll: {e}")); + // Make sure we register the poller in `fd_info`, so we can keep track of its errors. + fd_info + .insert_unique(FdInfo::new(poller.as_raw_fd(), FdKind::Poller)) + .unwrap_or_else(|e| unreachable!("{e}")); + let fd = OwnedFdWithFlags::new_with_fallback(fd); - debug!( - "opened daemon to {} file descriptor {fd:?}", - name_or_path.display(), - ); + fd_info + .insert_unique(FdInfo::new(fd.as_raw_fd(), kind)) + .unwrap_or_else(|e| unreachable!("{e}")); + + debug!("opened daemon to {:?} file descriptor {fd:?}", name); + + let path = match &name { + Some(name) => Some(PathBuf::from(name).into_boxed_path()), + None => None, + }; Self { fd, - path: name_or_path, - poll_error_buffer: Default::default(), + path, + poller, + //poll_error_buffer: Default::default(), + fd_info, tokfd: Default::default(), - fd_error_buffer: Default::default(), + //fd_error_buffer: Default::default(), cmd_buffer: Vec::with_capacity(1024), next_timeout: TIMEOUT_NEVER, } @@ -162,7 +243,7 @@ impl Daemon { rustix::net::sockopt::set_socket_keepalive(&listener_owned_fd, true).unwrap(); let path: Box = path.to_path_buf().into_boxed_path().into_boxed_os_str(); - Ok(Self::new(listener_owned_fd, path)) + Ok(Self::new(listener_owned_fd, FdKind::Socket, Some(path))) } pub fn open_default_socket() -> Result { @@ -203,7 +284,7 @@ impl Daemon { .try_clone_to_owned() .expect("dynix daemon could not open stdin; try a Unix socket?"); - Self::new(fd, Box::from(OsStr::new("«stdin»"))) + Self::new(fd, FdKind::File, None) } //pub unsafe fn from_raw_parts(fd: OwnedFd) -> Self { @@ -217,15 +298,13 @@ impl Daemon { } } -const ERROR_BUFFER_LEN: usize = 8; - /// Private helpers. impl Daemon { - fn read_cmd(&mut self, fd: Option<&dyn AsFd>) -> Result<(), IoError> { - let fd = match fd.as_ref() { - Some(fd) => fd.as_fd(), - None => self.fd.as_fd(), - }; + fn read_cmd(&mut self, fd: &BorrowedFd) -> Result<(), IoError> { + //let fd = match fd.as_ref() { + // Some(fd) => fd.as_fd(), + // None => self.fd.as_fd(), + //}; if self.cmd_buffer.len() == self.cmd_buffer.capacity() { self.cmd_buffer.reserve(1024); @@ -247,12 +326,16 @@ impl Daemon { }, Err(e) => { warn!("error deserializing command: {e}"); - warn!("error count: {}", self.fd_error_buffer.len()); + debug!("command buffer was: {}", self.cmd_buffer.as_bstr()); + //warn!("error count for fd {fd:?}: {}", self.fd_error_buffer.len()); self.cmd_buffer.clear(); // Don't propagate the error unless we have too many. - self.fd_error_buffer.try_push_back(e.into()).tap_err(|e| { + self.fd_error_push(fd.as_raw_fd(), e.into()).tap_err(|e| { error!("Accumulated too many errors for daemon fd {fd:?}: {e}") })?; + //self.fd_error_buffer.try_push_back(e.into()).tap_err(|e| { + // error!("Accumulated too many errors for daemon fd {fd:?}: {e}") + //})?; return Ok(()); }, }; @@ -266,6 +349,17 @@ impl Daemon { pub(crate) fn enter_loop(&mut self) -> Result, IoError> { let raw_fd = self.fd.as_raw_fd(); + if cfg!(debug_assertions) { + assert!( + self.fd_info.contains_key(&raw_fd), + "we should know about daemon fd {raw_fd}", + ); + assert!( + self.fd_info.contains_key(&self.poller.as_raw_fd()), + "we should know about poller fd {}", + self.poller.as_raw_fd(), + ); + } let mut daemon_source = SourceFd(&raw_fd); const DAEMON: Token = Token(0); self.tokfd @@ -274,16 +368,9 @@ impl Daemon { fd: raw_fd, }) .unwrap(); - let mut next_token_number: usize = 1; - let mut next_token = || -> Token { - let t = Token(next_token_number); - next_token_number = next_token_number.saturating_add(1); - t - }; - let mut poll = Poll::new().unwrap_or_else(|e| unreachable!("creating new mio Poll: {e}")); - - poll.registry() + self.poller + .registry() .register(&mut daemon_source, DAEMON, Interest::READABLE) .unwrap_or_else(|e| unreachable!("registering mio Poll for daemon fd {raw_fd:?}: {e}")); @@ -297,20 +384,22 @@ impl Daemon { ); } - match poll.poll(&mut events, self.next_timeout.take()) { + match self.poller.poll(&mut events, self.next_timeout.take()) { Ok(_) => { if events.is_empty() { warn!("timeout expired"); self.cmd_buffer.clear(); } else { - let _ = self.poll_error_buffer.pop_front(); + //let _ = self.poll_error_buffer.pop_front(); + let _ = self.fd_error_pop(self.poller.as_raw_fd()); } }, Err(e) => { warn!("mio Poll::poll() error: {e}"); - self.poll_error_buffer.try_push_back(e).tap_err(|e| { - error!("accumulated too many errors for mio::Poll::poll(): {e}") - })?; + self.fd_error_push(self.poller.as_raw_fd(), e) + .tap_err(|e| { + error!("accumulated too many errors for mio::Poll::poll(): {e}") + })?; }, } @@ -324,7 +413,9 @@ impl Daemon { .unwrap_or(false); if !is_sock { - self.read_cmd(None).unwrap(); + // SAFETY: oh boy: disjoint borrows with extra steps. + let file_fd = unsafe { BorrowedFd::borrow_raw(self.fd.as_raw_fd()) }; + self.read_cmd(&file_fd).unwrap(); continue; } @@ -340,26 +431,27 @@ impl Daemon { }, Err(e) => { error!("accept4 on daemon socket failed: {e}"); - self.fd_error_buffer.try_push_back(e.into()).tap_err(|e| { - error!( - "Accumulated too many errors for daemon fd {:?}: {e}", - self.fd - ) - })?; + self.fd_error_push(self.fd.as_raw_fd(), e.into()) + .tap_err(|e| { + error!( + "Accumulated too many errors for daemon fd {:?}: {e}", + self.fd + ) + })?; + //self.fd_error_buffer.try_push_back(e.into()).tap_err(|e| { + // error!( + // "Accumulated too many errors for daemon fd {:?}: {e}", + // self.fd, + // ) + //})?; continue; }, }; // Add this stream to our poll interest list. - let mut stream_fd = stream_fd.into_raw_fd(); - let token = next_token(); - self.tokfd - .insert_unique((token, stream_fd).into()) - .unwrap_or_else(|e| unreachable!("? {e}")); - let mut source = SourceFd(&mut stream_fd); - poll.registry() - .register(&mut source, token, Interest::READABLE) - .unwrap(); + // NOTE: `stream_fd` is now effectively `ManuallyDrop`. + let stream_fd = stream_fd.into_raw_fd(); + let _token = self.register(stream_fd); // Wait for the next poll to handle. }, @@ -371,33 +463,26 @@ impl Daemon { // SAFETY: oh boy. let stream_fd = unsafe { BorrowedFd::borrow_raw(stream_fd) }; - self.read_cmd(Some(&stream_fd)).unwrap(); + self.read_cmd(&stream_fd).unwrap(); }, } } } } - - //fn get_fallback_fd_name(fd: Fd) -> Option> { - // let dev_fd_path = Path::new("/dev/fd").join(fd.as_raw_fd().to_string()); - // - // fs_err::read_link(dev_fd_path) - // .map(PathBuf::into_os_string) - // .map(OsString::into_boxed_os_str) - // .ok() - //} } impl Drop for Daemon { fn drop(&mut self) { - let probably_synthetic = self - .path - .to_str() - .map(|p| p.starts_with("«") && p.ends_with("»")) - .unwrap_or(false); - if probably_synthetic { - return; + //let probably_synthetic = self + // .path + // .to_str() + // .map(|p| p.starts_with("«") && p.ends_with("»")) + // .unwrap_or(false); + //if probably_synthetic { + // return; + //} + if let Some(path) = self.path.as_deref() { + let _ = rustix::fs::unlink(path); } - let _ = rustix::fs::unlink(&*self.path); } } diff --git a/src/daemon_io.rs b/src/daemon_io.rs index a1caa97..891a518 100644 --- a/src/daemon_io.rs +++ b/src/daemon_io.rs @@ -6,8 +6,6 @@ use std::{ }, }; -use iddqd::BiHashItem; -use mio::Token; use rustix::{ fs::{OFlags, fcntl_getfl, fcntl_setfl}, io::Errno, diff --git a/src/daemon_tokfd.rs b/src/daemon_tokfd.rs index 008c88d..b5e52bd 100644 --- a/src/daemon_tokfd.rs +++ b/src/daemon_tokfd.rs @@ -1,8 +1,114 @@ -use std::os::fd::RawFd; +use std::{os::fd::RawFd, sync::OnceLock}; -use iddqd::BiHashItem; +use circular_buffer::CircularBuffer; +use iddqd::{BiHashItem, IdOrdItem}; use mio::Token; +use crate::prelude::*; + +const ERROR_BUFFER_LEN: usize = 8; + +#[derive(Debug)] +pub struct FdInfo { + pub fd: RawFd, + pub kind: FdKind, + pub name: OnceLock>, + pub error_buffer: CircularBuffer, +} + +impl FdInfo { + pub fn new(fd: Fd, kind: FdKind) -> Self { + Self { + fd: fd.as_raw_fd(), + kind, + name: Default::default(), + error_buffer: Default::default(), + } + } + + pub fn new_with_name(fd: Fd, kind: FdKind, name: Box) -> Self { + Self { + fd: fd.as_raw_fd(), + kind, + name: OnceLock::from(name), + error_buffer: Default::default(), + } + } +} + +impl FdInfo { + pub(crate) fn guess_name(fd: Fd) -> Result, IoError> { + let dev_fd_path = Path::new("/dev/fd").join(fd.as_raw_fd().to_string()); + + fs_err::read_link(dev_fd_path) + .map(PathBuf::into_os_string) + .map(OsString::into_boxed_os_str) + } + + pub fn name(&self) -> &OsStr { + if let Some(name) = self.name.get() { + return name; + } + + match Self::guess_name(self.fd) { + Ok(name) => { + let prev = self.name.set(Box::from(name)); + debug_assert_eq!(prev, Ok(())); + }, + Err(e) => { + warn!( + "can't read link for {} /dev/fd/{}: {e}", + self.kind.name_str(), + self.fd, + ); + return OsStr::new("«unknown»"); + }, + } + + self.name.get().unwrap_or_else(|| unreachable!()) + } +} + +impl IdOrdItem for FdInfo { + type Key<'a> = &'a RawFd; + + fn key(&self) -> &RawFd { + &self.fd + } + + iddqd::id_upcast!(); +} + +#[derive(Copy)] +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +#[non_exhaustive] +pub enum FdKind { + File, + Socket, + SockStream, + Poller, + Unknown, +} + +impl FdKind { + pub fn name_str(self) -> &'static str { + use FdKind::*; + match self { + File => "file", + Socket => "socket", + SockStream => "socket stream", + Poller => "poller", + Unknown => "«unknown»", + } + } +} + +impl Default for FdKind { + fn default() -> FdKind { + FdKind::Unknown + } +} + #[derive(Copy)] #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct TokenFd {