close() fds that have nothing more to read

This commit is contained in:
Qyriad 2026-03-19 21:39:11 +01:00
parent b11627d030
commit 16f2649334
3 changed files with 108 additions and 67 deletions

View file

@ -13,12 +13,7 @@ use iddqd::{BiHashMap, IdOrdMap};
use mio::{Events, Interest, Poll, Token, net::UnixListener, unix::SourceFd}; use mio::{Events, Interest, Poll, Token, net::UnixListener, unix::SourceFd};
use rustix::{ use rustix::{buffer::spare_capacity, net::SocketFlags, process::Uid};
buffer::spare_capacity,
fs::{FileType, Stat},
net::SocketFlags,
process::Uid,
};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json::StreamDeserializer; use serde_json::StreamDeserializer;
@ -122,7 +117,6 @@ pub struct Daemon {
poller: Poll, poller: Poll,
//fd_error_buffer: CircularBuffer<ERROR_BUFFER_LEN, IoError>,
fd_info: IdOrdMap<FdInfo>, fd_info: IdOrdMap<FdInfo>,
// Bijective mapping of [`mio::Token`]s to [`RawFd`]s. // Bijective mapping of [`mio::Token`]s to [`RawFd`]s.
@ -135,36 +129,46 @@ pub struct Daemon {
/// `tokfd` handling. /// `tokfd` handling.
impl Daemon { impl Daemon {
fn fd_error_pop(&mut self, fd: RawFd) -> Option<IoError> { fn fd_error_pop(&mut self, fd: RawFd) -> Option<IoError> {
self.fd_info let mut info = self.fd_info.get_mut(&fd).unwrap_or_else(|| {
.get_mut(&fd) if let Ok(name) = FdInfo::guess_name(fd) {
.unwrap_or_else(|| { panic!(
if let Ok(name) = FdInfo::guess_name(fd) { "tried to pop error for unknown fd {fd} ({})",
panic!( name.to_string_lossy(),
"tried to pop error for unknown fd {fd} ({})", );
name.to_string_lossy(), }
); panic!("tried to pop error for unknown fd {fd}")
} });
panic!("tried to pop error for unknown fd {fd}") info.error_buffer.pop_front().tap_some(|e| {
}) trace!("Popping error for {}: {e}", info.display());
.error_buffer })
.pop_front()
} }
fn fd_error_push(&mut self, fd: RawFd, error: IoError) -> Result<(), IoError> { fn fd_error_push(&mut self, fd: RawFd, error: IoError) -> Result<(), IoError> {
self.fd_info let mut info = self
.fd_info
.get_mut(&fd) .get_mut(&fd)
.unwrap_or_else(|| panic!("tried to push error for unknown fd {fd}")) .unwrap_or_else(|| panic!("tried to push error for unknown fd {fd}"));
.error_buffer trace!("Pushing error for {}: {}", info.display(), error);
.try_push_back(error) info.error_buffer.try_push_back(error)
} }
fn register(&mut self, fd: RawFd) -> Token { fn main_fd_info(&self) -> &FdInfo {
self.register_with_kind(fd, FdKind::Unknown) self.fd_info.get(&self.fd.as_raw_fd()).unwrap_or_else(|| {
unreachable!(
"Main daemon fd {:?} was not registered with fd_info",
self.fd,
)
})
} }
fn register_with_kind(&mut self, fd: RawFd, kind: FdKind) -> Token { fn register(&mut self, fd: RawFd, kind: FdKind) -> Token {
let token = next_token(); let token = next_token();
debug!(
"Registering new {} FdInfo for {fd} with token {token:?}",
kind.name_str(),
);
self.fd_info.insert_unique(FdInfo::new(fd, kind)).unwrap(); self.fd_info.insert_unique(FdInfo::new(fd, kind)).unwrap();
self.tokfd self.tokfd
@ -180,6 +184,21 @@ impl Daemon {
token token
} }
fn deregister(&mut self, fd: RawFd) {
let info = self
.fd_info
.remove(&fd)
.unwrap_or_else(|| unreachable!("tried to unregister unknown fd {fd}"));
debug!("Unregistering fd {}; calling close()", info.display());
unsafe { rustix::io::close(fd) };
let res = unsafe { libc::fcntl(fd, libc::F_GETFD, 0) };
debug_assert_eq!(res, -1);
debug_assert_eq!(
IoError::last_os_error().raw_os_error(),
Some(Errno::BADF.raw_os_error()),
);
}
fn fd_for_token(&self, token: Token) -> Option<RawFd> { fn fd_for_token(&self, token: Token) -> Option<RawFd> {
self.tokfd self.tokfd
.get1(&token) .get1(&token)
@ -224,10 +243,8 @@ impl Daemon {
fd, fd,
path, path,
poller, poller,
//poll_error_buffer: Default::default(),
fd_info, fd_info,
tokfd: Default::default(), tokfd: Default::default(),
//fd_error_buffer: Default::default(),
cmd_buffer: Vec::with_capacity(1024), cmd_buffer: Vec::with_capacity(1024),
next_timeout: TIMEOUT_NEVER, next_timeout: TIMEOUT_NEVER,
} }
@ -258,14 +275,14 @@ impl Daemon {
Err(e) => { Err(e) => {
warn!( warn!(
"failed binding AF_UNIX socket at {}: {e}; trying elsewhere", "failed binding AF_UNIX socket at {}: {e}; trying elsewhere",
preferred.display() preferred.display(),
); );
let fallback = TMPDIR.join("dynix.sock").into_boxed_path(); let fallback = TMPDIR.join("dynix.sock").into_boxed_path();
Self::from_unix_socket_path(&fallback).tap_err(|e| { Self::from_unix_socket_path(&fallback).tap_err(|e| {
error!( error!(
"failed binding AF_UNIX socket at {}: {e}", "failed binding AF_UNIX socket at {}: {e}",
fallback.display() fallback.display(),
) )
})? })?
}, },
@ -301,11 +318,6 @@ impl Daemon {
/// Private helpers. /// Private helpers.
impl Daemon { impl Daemon {
fn read_cmd(&mut self, fd: &BorrowedFd) -> Result<(), IoError> { fn read_cmd(&mut self, fd: &BorrowedFd) -> Result<(), IoError> {
//let fd = match fd.as_ref() {
// Some(fd) => fd.as_fd(),
// None => self.fd.as_fd(),
//};
if self.cmd_buffer.len() == self.cmd_buffer.capacity() { if self.cmd_buffer.len() == self.cmd_buffer.capacity() {
self.cmd_buffer.reserve(1024); self.cmd_buffer.reserve(1024);
} }
@ -326,20 +338,18 @@ impl Daemon {
}, },
Err(e) => { Err(e) => {
warn!("error deserializing command: {e}"); warn!("error deserializing command: {e}");
debug!("command buffer was: {}", self.cmd_buffer.as_bstr()); debug!("command buffer was: {:?}", self.cmd_buffer.as_bstr());
//warn!("error count for fd {fd:?}: {}", self.fd_error_buffer.len());
self.cmd_buffer.clear(); self.cmd_buffer.clear();
// Don't propagate the error unless we have too many. // Don't propagate the error unless we have too many.
self.fd_error_push(fd.as_raw_fd(), e.into()).tap_err(|e| { self.fd_error_push(fd.as_raw_fd(), e.into()).tap_err(|e| {
error!("Accumulated too many errors for daemon fd {fd:?}: {e}") error!("Accumulated too many errors for daemon fd {fd:?}: {e}")
})?; })?;
//self.fd_error_buffer.try_push_back(e.into()).tap_err(|e| {
// error!("Accumulated too many errors for daemon fd {fd:?}: {e}")
//})?;
return Ok(()); return Ok(());
}, },
}; };
debug!("got cmd: {cmd:?}"); debug!("got cmd: {cmd:?}");
let _ = rustix::io::write(fd, b"");
info!("dispatching command");
} }
self.cmd_buffer.clear(); self.cmd_buffer.clear();
@ -377,6 +387,14 @@ impl Daemon {
let mut events = Events::with_capacity(1024); let mut events = Events::with_capacity(1024);
loop { loop {
if tracing::enabled!(tracing::Level::DEBUG) {
//
trace!("Daemon loop iteration, with file descriptors: ");
for info in &self.fd_info {
trace!("- {}", info.display());
}
}
if let Some(timeout) = self.next_timeout { if let Some(timeout) = self.next_timeout {
debug!( debug!(
"epoll_wait() with a timeout: {}", "epoll_wait() with a timeout: {}",
@ -386,15 +404,25 @@ impl Daemon {
match self.poller.poll(&mut events, self.next_timeout.take()) { match self.poller.poll(&mut events, self.next_timeout.take()) {
Ok(_) => { Ok(_) => {
trace!(
"mio::Poller::poll() got events: {:?}",
events.iter().size_hint().0,
);
if events.is_empty() { if events.is_empty() {
warn!("timeout expired"); warn!("timeout expired");
self.cmd_buffer.clear(); self.cmd_buffer.clear();
} else { } else {
//let _ = self.poll_error_buffer.pop_front();
let _ = self.fd_error_pop(self.poller.as_raw_fd()); let _ = self.fd_error_pop(self.poller.as_raw_fd());
} }
}, },
Err(e) if e.kind() == IoErrorKind::Interrupted => {
// EINTR is silly.
continue;
},
Err(e) => { Err(e) => {
if let Some(Errno::BADF) = e.raw_os_error().map(Errno::from_raw_os_error) {
unreachable!("EBADF on poller fd; IO safety violation?");
}
warn!("mio Poll::poll() error: {e}"); warn!("mio Poll::poll() error: {e}");
self.fd_error_push(self.poller.as_raw_fd(), e) self.fd_error_push(self.poller.as_raw_fd(), e)
.tap_err(|e| { .tap_err(|e| {
@ -404,14 +432,10 @@ impl Daemon {
} }
for event in &events { for event in &events {
trace!("event is {event:?}");
match event.token() { match event.token() {
DAEMON => { DAEMON => {
let is_sock = rustix::fs::fstat(&self.fd) let is_sock = self.main_fd_info().kind == FdKind::Socket;
.map(|Stat { st_mode, .. }| st_mode)
.map(FileType::from_raw_mode)
.map(FileType::is_socket)
.unwrap_or(false);
if !is_sock { if !is_sock {
// SAFETY: oh boy: disjoint borrows with extra steps. // SAFETY: oh boy: disjoint borrows with extra steps.
let file_fd = unsafe { BorrowedFd::borrow_raw(self.fd.as_raw_fd()) }; let file_fd = unsafe { BorrowedFd::borrow_raw(self.fd.as_raw_fd()) };
@ -438,12 +462,6 @@ impl Daemon {
self.fd self.fd
) )
})?; })?;
//self.fd_error_buffer.try_push_back(e.into()).tap_err(|e| {
// error!(
// "Accumulated too many errors for daemon fd {:?}: {e}",
// self.fd,
// )
//})?;
continue; continue;
}, },
}; };
@ -451,7 +469,7 @@ impl Daemon {
// Add this stream to our poll interest list. // Add this stream to our poll interest list.
// NOTE: `stream_fd` is now effectively `ManuallyDrop`. // NOTE: `stream_fd` is now effectively `ManuallyDrop`.
let stream_fd = stream_fd.into_raw_fd(); let stream_fd = stream_fd.into_raw_fd();
let _token = self.register(stream_fd); let _token = self.register(stream_fd, FdKind::SockStream);
// Wait for the next poll to handle. // Wait for the next poll to handle.
}, },
@ -461,9 +479,13 @@ impl Daemon {
unreachable!("tried to get fd for no-existent token? {other_token:?}") unreachable!("tried to get fd for no-existent token? {other_token:?}")
}); });
// SAFETY: oh boy. if event.is_read_closed() {
let stream_fd = unsafe { BorrowedFd::borrow_raw(stream_fd) }; self.deregister(stream_fd);
self.read_cmd(&stream_fd).unwrap(); } else {
// SAFETY: oh boy.
let stream_fd = unsafe { BorrowedFd::borrow_raw(stream_fd) };
self.read_cmd(&stream_fd).unwrap();
}
}, },
} }
} }
@ -473,14 +495,6 @@ impl Daemon {
impl Drop for Daemon { impl Drop for Daemon {
fn drop(&mut self) { fn drop(&mut self) {
//let probably_synthetic = self
// .path
// .to_str()
// .map(|p| p.starts_with("«") && p.ends_with("»"))
// .unwrap_or(false);
//if probably_synthetic {
// return;
//}
if let Some(path) = self.path.as_deref() { if let Some(path) = self.path.as_deref() {
let _ = rustix::fs::unlink(path); let _ = rustix::fs::unlink(path);
} }

View file

@ -67,6 +67,10 @@ impl FdInfo {
self.name.get().unwrap_or_else(|| unreachable!()) self.name.get().unwrap_or_else(|| unreachable!())
} }
pub fn display(&self) -> FdInfoDisplay<'_> {
FdInfoDisplay { inner: self }
}
} }
impl IdOrdItem for FdInfo { impl IdOrdItem for FdInfo {
@ -79,6 +83,27 @@ impl IdOrdItem for FdInfo {
iddqd::id_upcast!(); iddqd::id_upcast!();
} }
#[derive(Debug)]
pub struct FdInfoDisplay<'a> {
inner: &'a FdInfo,
}
impl<'a> Display for FdInfoDisplay<'a> {
fn fmt(&self, f: &mut Formatter) -> FmtResult {
write!(
f,
"{} fd {} ({})",
self.inner.kind.name_str(),
self.inner.fd,
self.inner.name().to_string_lossy(),
)?;
if !self.inner.error_buffer.is_empty() {
write!(f, "; with errors: {}", self.inner.error_buffer.len())?;
}
Ok(())
}
}
#[derive(Copy)] #[derive(Copy)]
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[non_exhaustive] #[non_exhaustive]

View file

@ -35,7 +35,9 @@ pub(crate) mod prelude {
pub use bstr::ByteSlice; pub use bstr::ByteSlice;
pub use tap::{Pipe, Tap, TapFallible}; pub use rustix::io::Errno;
pub use tap::{Pipe, Tap, TapFallible, TapOptional};
pub use tracing::{Level, debug, error, info, trace, warn}; pub use tracing::{Level, debug, error, info, trace, warn};