From 16f2649334a77bf1afc67f3955106818bffbd7f6 Mon Sep 17 00:00:00 2001 From: Qyriad Date: Thu, 19 Mar 2026 21:39:11 +0100 Subject: [PATCH] close() fds that have nothing more to read --- src/daemon.rs | 146 ++++++++++++++++++++++++-------------------- src/daemon_tokfd.rs | 25 ++++++++ src/lib.rs | 4 +- 3 files changed, 108 insertions(+), 67 deletions(-) diff --git a/src/daemon.rs b/src/daemon.rs index 9f4856e..9492f04 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -13,12 +13,7 @@ use iddqd::{BiHashMap, IdOrdMap}; use mio::{Events, Interest, Poll, Token, net::UnixListener, unix::SourceFd}; -use rustix::{ - buffer::spare_capacity, - fs::{FileType, Stat}, - net::SocketFlags, - process::Uid, -}; +use rustix::{buffer::spare_capacity, net::SocketFlags, process::Uid}; use serde::{Deserialize, Serialize}; use serde_json::StreamDeserializer; @@ -122,7 +117,6 @@ pub struct Daemon { poller: Poll, - //fd_error_buffer: CircularBuffer, fd_info: IdOrdMap, // Bijective mapping of [`mio::Token`]s to [`RawFd`]s. @@ -135,36 +129,46 @@ pub struct Daemon { /// `tokfd` handling. impl Daemon { fn fd_error_pop(&mut self, fd: RawFd) -> Option { - self.fd_info - .get_mut(&fd) - .unwrap_or_else(|| { - if let Ok(name) = FdInfo::guess_name(fd) { - panic!( - "tried to pop error for unknown fd {fd} ({})", - name.to_string_lossy(), - ); - } - panic!("tried to pop error for unknown fd {fd}") - }) - .error_buffer - .pop_front() + let mut info = self.fd_info.get_mut(&fd).unwrap_or_else(|| { + if let Ok(name) = FdInfo::guess_name(fd) { + panic!( + "tried to pop error for unknown fd {fd} ({})", + name.to_string_lossy(), + ); + } + panic!("tried to pop error for unknown fd {fd}") + }); + info.error_buffer.pop_front().tap_some(|e| { + trace!("Popping error for {}: {e}", info.display()); + }) } fn fd_error_push(&mut self, fd: RawFd, error: IoError) -> Result<(), IoError> { - self.fd_info + let mut info = self + .fd_info .get_mut(&fd) - .unwrap_or_else(|| panic!("tried to push error for unknown fd {fd}")) - .error_buffer - .try_push_back(error) + .unwrap_or_else(|| panic!("tried to push error for unknown fd {fd}")); + trace!("Pushing error for {}: {}", info.display(), error); + info.error_buffer.try_push_back(error) } - fn register(&mut self, fd: RawFd) -> Token { - self.register_with_kind(fd, FdKind::Unknown) + fn main_fd_info(&self) -> &FdInfo { + self.fd_info.get(&self.fd.as_raw_fd()).unwrap_or_else(|| { + unreachable!( + "Main daemon fd {:?} was not registered with fd_info", + self.fd, + ) + }) } - fn register_with_kind(&mut self, fd: RawFd, kind: FdKind) -> Token { + fn register(&mut self, fd: RawFd, kind: FdKind) -> Token { let token = next_token(); + debug!( + "Registering new {} FdInfo for {fd} with token {token:?}", + kind.name_str(), + ); + self.fd_info.insert_unique(FdInfo::new(fd, kind)).unwrap(); self.tokfd @@ -180,6 +184,21 @@ impl Daemon { token } + fn deregister(&mut self, fd: RawFd) { + let info = self + .fd_info + .remove(&fd) + .unwrap_or_else(|| unreachable!("tried to unregister unknown fd {fd}")); + debug!("Unregistering fd {}; calling close()", info.display()); + unsafe { rustix::io::close(fd) }; + let res = unsafe { libc::fcntl(fd, libc::F_GETFD, 0) }; + debug_assert_eq!(res, -1); + debug_assert_eq!( + IoError::last_os_error().raw_os_error(), + Some(Errno::BADF.raw_os_error()), + ); + } + fn fd_for_token(&self, token: Token) -> Option { self.tokfd .get1(&token) @@ -224,10 +243,8 @@ impl Daemon { fd, path, poller, - //poll_error_buffer: Default::default(), fd_info, tokfd: Default::default(), - //fd_error_buffer: Default::default(), cmd_buffer: Vec::with_capacity(1024), next_timeout: TIMEOUT_NEVER, } @@ -258,14 +275,14 @@ impl Daemon { Err(e) => { warn!( "failed binding AF_UNIX socket at {}: {e}; trying elsewhere", - preferred.display() + preferred.display(), ); let fallback = TMPDIR.join("dynix.sock").into_boxed_path(); Self::from_unix_socket_path(&fallback).tap_err(|e| { error!( "failed binding AF_UNIX socket at {}: {e}", - fallback.display() + fallback.display(), ) })? }, @@ -301,11 +318,6 @@ impl Daemon { /// Private helpers. impl Daemon { fn read_cmd(&mut self, fd: &BorrowedFd) -> Result<(), IoError> { - //let fd = match fd.as_ref() { - // Some(fd) => fd.as_fd(), - // None => self.fd.as_fd(), - //}; - if self.cmd_buffer.len() == self.cmd_buffer.capacity() { self.cmd_buffer.reserve(1024); } @@ -326,20 +338,18 @@ impl Daemon { }, Err(e) => { warn!("error deserializing command: {e}"); - debug!("command buffer was: {}", self.cmd_buffer.as_bstr()); - //warn!("error count for fd {fd:?}: {}", self.fd_error_buffer.len()); + debug!("command buffer was: {:?}", self.cmd_buffer.as_bstr()); self.cmd_buffer.clear(); // Don't propagate the error unless we have too many. self.fd_error_push(fd.as_raw_fd(), e.into()).tap_err(|e| { error!("Accumulated too many errors for daemon fd {fd:?}: {e}") })?; - //self.fd_error_buffer.try_push_back(e.into()).tap_err(|e| { - // error!("Accumulated too many errors for daemon fd {fd:?}: {e}") - //})?; return Ok(()); }, }; debug!("got cmd: {cmd:?}"); + let _ = rustix::io::write(fd, b""); + info!("dispatching command"); } self.cmd_buffer.clear(); @@ -377,6 +387,14 @@ impl Daemon { let mut events = Events::with_capacity(1024); loop { + if tracing::enabled!(tracing::Level::DEBUG) { + // + trace!("Daemon loop iteration, with file descriptors: "); + for info in &self.fd_info { + trace!("- {}", info.display()); + } + } + if let Some(timeout) = self.next_timeout { debug!( "epoll_wait() with a timeout: {}", @@ -386,15 +404,25 @@ impl Daemon { match self.poller.poll(&mut events, self.next_timeout.take()) { Ok(_) => { + trace!( + "mio::Poller::poll() got events: {:?}", + events.iter().size_hint().0, + ); if events.is_empty() { warn!("timeout expired"); self.cmd_buffer.clear(); } else { - //let _ = self.poll_error_buffer.pop_front(); let _ = self.fd_error_pop(self.poller.as_raw_fd()); } }, + Err(e) if e.kind() == IoErrorKind::Interrupted => { + // EINTR is silly. + continue; + }, Err(e) => { + if let Some(Errno::BADF) = e.raw_os_error().map(Errno::from_raw_os_error) { + unreachable!("EBADF on poller fd; IO safety violation?"); + } warn!("mio Poll::poll() error: {e}"); self.fd_error_push(self.poller.as_raw_fd(), e) .tap_err(|e| { @@ -404,14 +432,10 @@ impl Daemon { } for event in &events { + trace!("event is {event:?}"); match event.token() { DAEMON => { - let is_sock = rustix::fs::fstat(&self.fd) - .map(|Stat { st_mode, .. }| st_mode) - .map(FileType::from_raw_mode) - .map(FileType::is_socket) - .unwrap_or(false); - + let is_sock = self.main_fd_info().kind == FdKind::Socket; if !is_sock { // SAFETY: oh boy: disjoint borrows with extra steps. let file_fd = unsafe { BorrowedFd::borrow_raw(self.fd.as_raw_fd()) }; @@ -438,12 +462,6 @@ impl Daemon { self.fd ) })?; - //self.fd_error_buffer.try_push_back(e.into()).tap_err(|e| { - // error!( - // "Accumulated too many errors for daemon fd {:?}: {e}", - // self.fd, - // ) - //})?; continue; }, }; @@ -451,7 +469,7 @@ impl Daemon { // Add this stream to our poll interest list. // NOTE: `stream_fd` is now effectively `ManuallyDrop`. let stream_fd = stream_fd.into_raw_fd(); - let _token = self.register(stream_fd); + let _token = self.register(stream_fd, FdKind::SockStream); // Wait for the next poll to handle. }, @@ -461,9 +479,13 @@ impl Daemon { unreachable!("tried to get fd for no-existent token? {other_token:?}") }); - // SAFETY: oh boy. - let stream_fd = unsafe { BorrowedFd::borrow_raw(stream_fd) }; - self.read_cmd(&stream_fd).unwrap(); + if event.is_read_closed() { + self.deregister(stream_fd); + } else { + // SAFETY: oh boy. + let stream_fd = unsafe { BorrowedFd::borrow_raw(stream_fd) }; + self.read_cmd(&stream_fd).unwrap(); + } }, } } @@ -473,14 +495,6 @@ impl Daemon { impl Drop for Daemon { fn drop(&mut self) { - //let probably_synthetic = self - // .path - // .to_str() - // .map(|p| p.starts_with("«") && p.ends_with("»")) - // .unwrap_or(false); - //if probably_synthetic { - // return; - //} if let Some(path) = self.path.as_deref() { let _ = rustix::fs::unlink(path); } diff --git a/src/daemon_tokfd.rs b/src/daemon_tokfd.rs index b5e52bd..05d5ec9 100644 --- a/src/daemon_tokfd.rs +++ b/src/daemon_tokfd.rs @@ -67,6 +67,10 @@ impl FdInfo { self.name.get().unwrap_or_else(|| unreachable!()) } + + pub fn display(&self) -> FdInfoDisplay<'_> { + FdInfoDisplay { inner: self } + } } impl IdOrdItem for FdInfo { @@ -79,6 +83,27 @@ impl IdOrdItem for FdInfo { iddqd::id_upcast!(); } +#[derive(Debug)] +pub struct FdInfoDisplay<'a> { + inner: &'a FdInfo, +} +impl<'a> Display for FdInfoDisplay<'a> { + fn fmt(&self, f: &mut Formatter) -> FmtResult { + write!( + f, + "{} fd {} ({})", + self.inner.kind.name_str(), + self.inner.fd, + self.inner.name().to_string_lossy(), + )?; + if !self.inner.error_buffer.is_empty() { + write!(f, "; with errors: {}", self.inner.error_buffer.len())?; + } + + Ok(()) + } +} + #[derive(Copy)] #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] #[non_exhaustive] diff --git a/src/lib.rs b/src/lib.rs index d5d53e5..426fc44 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -35,7 +35,9 @@ pub(crate) mod prelude { pub use bstr::ByteSlice; - pub use tap::{Pipe, Tap, TapFallible}; + pub use rustix::io::Errno; + + pub use tap::{Pipe, Tap, TapFallible, TapOptional}; pub use tracing::{Level, debug, error, info, trace, warn};