track poller and daemon fds like the rest

This commit is contained in:
Qyriad 2026-03-19 20:44:57 +01:00
parent aee8dcb31a
commit b11627d030
3 changed files with 271 additions and 82 deletions

View file

@ -1,13 +1,15 @@
use std::{ use std::{
env, io, env, io,
ops::Deref, ops::Deref,
os::fd::{AsFd, BorrowedFd, FromRawFd, IntoRawFd, OwnedFd, RawFd}, os::fd::{AsFd, BorrowedFd, IntoRawFd, OwnedFd, RawFd},
sync::LazyLock, sync::{
LazyLock,
atomic::{AtomicUsize, Ordering},
},
time::Duration, time::Duration,
}; };
use circular_buffer::CircularBuffer; use iddqd::{BiHashMap, IdOrdMap};
use iddqd::BiHashMap;
use mio::{Events, Interest, Poll, Token, net::UnixListener, unix::SourceFd}; use mio::{Events, Interest, Poll, Token, net::UnixListener, unix::SourceFd};
@ -21,7 +23,10 @@ use rustix::{
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json::StreamDeserializer; use serde_json::StreamDeserializer;
use crate::prelude::*; use crate::{
daemon_tokfd::{FdInfo, FdKind},
prelude::*,
};
use crate::{OwnedFdWithFlags, TokenFd}; use crate::{OwnedFdWithFlags, TokenFd};
@ -98,12 +103,27 @@ pub enum DaemonCmd {
const TIMEOUT_NEVER: Option<Duration> = None; const TIMEOUT_NEVER: Option<Duration> = None;
static NEXT_TOKEN_NUMBER: AtomicUsize = AtomicUsize::new(1);
fn next_token() -> Token {
let tok = NEXT_TOKEN_NUMBER.fetch_add(1, Ordering::SeqCst);
// If the increment wrapped to 0, then we just increment it again.
if tok == 0 {
return Token(NEXT_TOKEN_NUMBER.fetch_add(1, Ordering::SeqCst));
}
Token(tok)
}
#[derive(Debug)] #[derive(Debug)]
pub struct Daemon { pub struct Daemon {
fd: OwnedFdWithFlags, fd: OwnedFdWithFlags,
path: Box<OsStr>, path: Option<Box<Path>>,
poll_error_buffer: CircularBuffer<ERROR_BUFFER_LEN, IoError>,
fd_error_buffer: CircularBuffer<ERROR_BUFFER_LEN, IoError>, poller: Poll,
//fd_error_buffer: CircularBuffer<ERROR_BUFFER_LEN, IoError>,
fd_info: IdOrdMap<FdInfo>,
// Bijective mapping of [`mio::Token`]s to [`RawFd`]s. // Bijective mapping of [`mio::Token`]s to [`RawFd`]s.
tokfd: BiHashMap<TokenFd>, tokfd: BiHashMap<TokenFd>,
@ -114,9 +134,52 @@ pub struct Daemon {
/// `tokfd` handling. /// `tokfd` handling.
impl Daemon { impl Daemon {
//fn register(&mut self, token: Token, fd: RawFd) { fn fd_error_pop(&mut self, fd: RawFd) -> Option<IoError> {
// self.insert_unique self.fd_info
//} .get_mut(&fd)
.unwrap_or_else(|| {
if let Ok(name) = FdInfo::guess_name(fd) {
panic!(
"tried to pop error for unknown fd {fd} ({})",
name.to_string_lossy(),
);
}
panic!("tried to pop error for unknown fd {fd}")
})
.error_buffer
.pop_front()
}
fn fd_error_push(&mut self, fd: RawFd, error: IoError) -> Result<(), IoError> {
self.fd_info
.get_mut(&fd)
.unwrap_or_else(|| panic!("tried to push error for unknown fd {fd}"))
.error_buffer
.try_push_back(error)
}
fn register(&mut self, fd: RawFd) -> Token {
self.register_with_kind(fd, FdKind::Unknown)
}
fn register_with_kind(&mut self, fd: RawFd, kind: FdKind) -> Token {
let token = next_token();
self.fd_info.insert_unique(FdInfo::new(fd, kind)).unwrap();
self.tokfd
.insert_unique(TokenFd { token, fd })
.unwrap_or_else(|e| todo!("{e}"));
let mut source = SourceFd(&fd);
self.poller
.registry()
.register(&mut source, token, Interest::READABLE)
.unwrap_or_else(|e| unreachable!("registering {fd:?} with poller failed: {e}"));
token
}
fn fd_for_token(&self, token: Token) -> Option<RawFd> { fn fd_for_token(&self, token: Token) -> Option<RawFd> {
self.tokfd self.tokfd
.get1(&token) .get1(&token)
@ -133,20 +196,38 @@ impl Daemon {
} }
impl Daemon { impl Daemon {
pub fn new(fd: OwnedFd, name_or_path: Box<OsStr>) -> Self { pub fn new(fd: OwnedFd, kind: FdKind, name: Option<Box<OsStr>>) -> Self {
let mut fd_info: IdOrdMap<FdInfo> = Default::default();
// Supposedly, the only possible ways this can fail are EMFILE, ENFILE, and ENOMEM.
// If any of those are the case, we're screwed anyway.
let poller = Poll::new().unwrap_or_else(|e| panic!("can't create new mio::Poll: {e}"));
// Make sure we register the poller in `fd_info`, so we can keep track of its errors.
fd_info
.insert_unique(FdInfo::new(poller.as_raw_fd(), FdKind::Poller))
.unwrap_or_else(|e| unreachable!("{e}"));
let fd = OwnedFdWithFlags::new_with_fallback(fd); let fd = OwnedFdWithFlags::new_with_fallback(fd);
debug!( fd_info
"opened daemon to {} file descriptor {fd:?}", .insert_unique(FdInfo::new(fd.as_raw_fd(), kind))
name_or_path.display(), .unwrap_or_else(|e| unreachable!("{e}"));
);
debug!("opened daemon to {:?} file descriptor {fd:?}", name);
let path = match &name {
Some(name) => Some(PathBuf::from(name).into_boxed_path()),
None => None,
};
Self { Self {
fd, fd,
path: name_or_path, path,
poll_error_buffer: Default::default(), poller,
//poll_error_buffer: Default::default(),
fd_info,
tokfd: Default::default(), tokfd: Default::default(),
fd_error_buffer: Default::default(), //fd_error_buffer: Default::default(),
cmd_buffer: Vec::with_capacity(1024), cmd_buffer: Vec::with_capacity(1024),
next_timeout: TIMEOUT_NEVER, next_timeout: TIMEOUT_NEVER,
} }
@ -162,7 +243,7 @@ impl Daemon {
rustix::net::sockopt::set_socket_keepalive(&listener_owned_fd, true).unwrap(); rustix::net::sockopt::set_socket_keepalive(&listener_owned_fd, true).unwrap();
let path: Box<OsStr> = path.to_path_buf().into_boxed_path().into_boxed_os_str(); let path: Box<OsStr> = path.to_path_buf().into_boxed_path().into_boxed_os_str();
Ok(Self::new(listener_owned_fd, path)) Ok(Self::new(listener_owned_fd, FdKind::Socket, Some(path)))
} }
pub fn open_default_socket() -> Result<Self, IoError> { pub fn open_default_socket() -> Result<Self, IoError> {
@ -203,7 +284,7 @@ impl Daemon {
.try_clone_to_owned() .try_clone_to_owned()
.expect("dynix daemon could not open stdin; try a Unix socket?"); .expect("dynix daemon could not open stdin; try a Unix socket?");
Self::new(fd, Box::from(OsStr::new("«stdin»"))) Self::new(fd, FdKind::File, None)
} }
//pub unsafe fn from_raw_parts(fd: OwnedFd) -> Self { //pub unsafe fn from_raw_parts(fd: OwnedFd) -> Self {
@ -217,15 +298,13 @@ impl Daemon {
} }
} }
const ERROR_BUFFER_LEN: usize = 8;
/// Private helpers. /// Private helpers.
impl Daemon { impl Daemon {
fn read_cmd(&mut self, fd: Option<&dyn AsFd>) -> Result<(), IoError> { fn read_cmd(&mut self, fd: &BorrowedFd) -> Result<(), IoError> {
let fd = match fd.as_ref() { //let fd = match fd.as_ref() {
Some(fd) => fd.as_fd(), // Some(fd) => fd.as_fd(),
None => self.fd.as_fd(), // None => self.fd.as_fd(),
}; //};
if self.cmd_buffer.len() == self.cmd_buffer.capacity() { if self.cmd_buffer.len() == self.cmd_buffer.capacity() {
self.cmd_buffer.reserve(1024); self.cmd_buffer.reserve(1024);
@ -247,12 +326,16 @@ impl Daemon {
}, },
Err(e) => { Err(e) => {
warn!("error deserializing command: {e}"); warn!("error deserializing command: {e}");
warn!("error count: {}", self.fd_error_buffer.len()); debug!("command buffer was: {}", self.cmd_buffer.as_bstr());
//warn!("error count for fd {fd:?}: {}", self.fd_error_buffer.len());
self.cmd_buffer.clear(); self.cmd_buffer.clear();
// Don't propagate the error unless we have too many. // Don't propagate the error unless we have too many.
self.fd_error_buffer.try_push_back(e.into()).tap_err(|e| { self.fd_error_push(fd.as_raw_fd(), e.into()).tap_err(|e| {
error!("Accumulated too many errors for daemon fd {fd:?}: {e}") error!("Accumulated too many errors for daemon fd {fd:?}: {e}")
})?; })?;
//self.fd_error_buffer.try_push_back(e.into()).tap_err(|e| {
// error!("Accumulated too many errors for daemon fd {fd:?}: {e}")
//})?;
return Ok(()); return Ok(());
}, },
}; };
@ -266,6 +349,17 @@ impl Daemon {
pub(crate) fn enter_loop(&mut self) -> Result<Option<()>, IoError> { pub(crate) fn enter_loop(&mut self) -> Result<Option<()>, IoError> {
let raw_fd = self.fd.as_raw_fd(); let raw_fd = self.fd.as_raw_fd();
if cfg!(debug_assertions) {
assert!(
self.fd_info.contains_key(&raw_fd),
"we should know about daemon fd {raw_fd}",
);
assert!(
self.fd_info.contains_key(&self.poller.as_raw_fd()),
"we should know about poller fd {}",
self.poller.as_raw_fd(),
);
}
let mut daemon_source = SourceFd(&raw_fd); let mut daemon_source = SourceFd(&raw_fd);
const DAEMON: Token = Token(0); const DAEMON: Token = Token(0);
self.tokfd self.tokfd
@ -274,16 +368,9 @@ impl Daemon {
fd: raw_fd, fd: raw_fd,
}) })
.unwrap(); .unwrap();
let mut next_token_number: usize = 1;
let mut next_token = || -> Token {
let t = Token(next_token_number);
next_token_number = next_token_number.saturating_add(1);
t
};
let mut poll = Poll::new().unwrap_or_else(|e| unreachable!("creating new mio Poll: {e}")); self.poller
.registry()
poll.registry()
.register(&mut daemon_source, DAEMON, Interest::READABLE) .register(&mut daemon_source, DAEMON, Interest::READABLE)
.unwrap_or_else(|e| unreachable!("registering mio Poll for daemon fd {raw_fd:?}: {e}")); .unwrap_or_else(|e| unreachable!("registering mio Poll for daemon fd {raw_fd:?}: {e}"));
@ -297,18 +384,20 @@ impl Daemon {
); );
} }
match poll.poll(&mut events, self.next_timeout.take()) { match self.poller.poll(&mut events, self.next_timeout.take()) {
Ok(_) => { Ok(_) => {
if events.is_empty() { if events.is_empty() {
warn!("timeout expired"); warn!("timeout expired");
self.cmd_buffer.clear(); self.cmd_buffer.clear();
} else { } else {
let _ = self.poll_error_buffer.pop_front(); //let _ = self.poll_error_buffer.pop_front();
let _ = self.fd_error_pop(self.poller.as_raw_fd());
} }
}, },
Err(e) => { Err(e) => {
warn!("mio Poll::poll() error: {e}"); warn!("mio Poll::poll() error: {e}");
self.poll_error_buffer.try_push_back(e).tap_err(|e| { self.fd_error_push(self.poller.as_raw_fd(), e)
.tap_err(|e| {
error!("accumulated too many errors for mio::Poll::poll(): {e}") error!("accumulated too many errors for mio::Poll::poll(): {e}")
})?; })?;
}, },
@ -324,7 +413,9 @@ impl Daemon {
.unwrap_or(false); .unwrap_or(false);
if !is_sock { if !is_sock {
self.read_cmd(None).unwrap(); // SAFETY: oh boy: disjoint borrows with extra steps.
let file_fd = unsafe { BorrowedFd::borrow_raw(self.fd.as_raw_fd()) };
self.read_cmd(&file_fd).unwrap();
continue; continue;
} }
@ -340,26 +431,27 @@ impl Daemon {
}, },
Err(e) => { Err(e) => {
error!("accept4 on daemon socket failed: {e}"); error!("accept4 on daemon socket failed: {e}");
self.fd_error_buffer.try_push_back(e.into()).tap_err(|e| { self.fd_error_push(self.fd.as_raw_fd(), e.into())
.tap_err(|e| {
error!( error!(
"Accumulated too many errors for daemon fd {:?}: {e}", "Accumulated too many errors for daemon fd {:?}: {e}",
self.fd self.fd
) )
})?; })?;
//self.fd_error_buffer.try_push_back(e.into()).tap_err(|e| {
// error!(
// "Accumulated too many errors for daemon fd {:?}: {e}",
// self.fd,
// )
//})?;
continue; continue;
}, },
}; };
// Add this stream to our poll interest list. // Add this stream to our poll interest list.
let mut stream_fd = stream_fd.into_raw_fd(); // NOTE: `stream_fd` is now effectively `ManuallyDrop`.
let token = next_token(); let stream_fd = stream_fd.into_raw_fd();
self.tokfd let _token = self.register(stream_fd);
.insert_unique((token, stream_fd).into())
.unwrap_or_else(|e| unreachable!("? {e}"));
let mut source = SourceFd(&mut stream_fd);
poll.registry()
.register(&mut source, token, Interest::READABLE)
.unwrap();
// Wait for the next poll to handle. // Wait for the next poll to handle.
}, },
@ -371,33 +463,26 @@ impl Daemon {
// SAFETY: oh boy. // SAFETY: oh boy.
let stream_fd = unsafe { BorrowedFd::borrow_raw(stream_fd) }; let stream_fd = unsafe { BorrowedFd::borrow_raw(stream_fd) };
self.read_cmd(Some(&stream_fd)).unwrap(); self.read_cmd(&stream_fd).unwrap();
}, },
} }
} }
} }
} }
//fn get_fallback_fd_name<Fd: AsRawFd>(fd: Fd) -> Option<Box<OsStr>> {
// let dev_fd_path = Path::new("/dev/fd").join(fd.as_raw_fd().to_string());
//
// fs_err::read_link(dev_fd_path)
// .map(PathBuf::into_os_string)
// .map(OsString::into_boxed_os_str)
// .ok()
//}
} }
impl Drop for Daemon { impl Drop for Daemon {
fn drop(&mut self) { fn drop(&mut self) {
let probably_synthetic = self //let probably_synthetic = self
.path // .path
.to_str() // .to_str()
.map(|p| p.starts_with("«") && p.ends_with("»")) // .map(|p| p.starts_with("«") && p.ends_with("»"))
.unwrap_or(false); // .unwrap_or(false);
if probably_synthetic { //if probably_synthetic {
return; // return;
} //}
let _ = rustix::fs::unlink(&*self.path); if let Some(path) = self.path.as_deref() {
let _ = rustix::fs::unlink(path);
}
} }
} }

View file

@ -6,8 +6,6 @@ use std::{
}, },
}; };
use iddqd::BiHashItem;
use mio::Token;
use rustix::{ use rustix::{
fs::{OFlags, fcntl_getfl, fcntl_setfl}, fs::{OFlags, fcntl_getfl, fcntl_setfl},
io::Errno, io::Errno,

View file

@ -1,8 +1,114 @@
use std::os::fd::RawFd; use std::{os::fd::RawFd, sync::OnceLock};
use iddqd::BiHashItem; use circular_buffer::CircularBuffer;
use iddqd::{BiHashItem, IdOrdItem};
use mio::Token; use mio::Token;
use crate::prelude::*;
const ERROR_BUFFER_LEN: usize = 8;
#[derive(Debug)]
pub struct FdInfo {
pub fd: RawFd,
pub kind: FdKind,
pub name: OnceLock<Box<OsStr>>,
pub error_buffer: CircularBuffer<ERROR_BUFFER_LEN, IoError>,
}
impl FdInfo {
pub fn new<Fd: AsRawFd>(fd: Fd, kind: FdKind) -> Self {
Self {
fd: fd.as_raw_fd(),
kind,
name: Default::default(),
error_buffer: Default::default(),
}
}
pub fn new_with_name<Fd: AsRawFd>(fd: Fd, kind: FdKind, name: Box<OsStr>) -> Self {
Self {
fd: fd.as_raw_fd(),
kind,
name: OnceLock::from(name),
error_buffer: Default::default(),
}
}
}
impl FdInfo {
pub(crate) fn guess_name<Fd: AsRawFd>(fd: Fd) -> Result<Box<OsStr>, IoError> {
let dev_fd_path = Path::new("/dev/fd").join(fd.as_raw_fd().to_string());
fs_err::read_link(dev_fd_path)
.map(PathBuf::into_os_string)
.map(OsString::into_boxed_os_str)
}
pub fn name(&self) -> &OsStr {
if let Some(name) = self.name.get() {
return name;
}
match Self::guess_name(self.fd) {
Ok(name) => {
let prev = self.name.set(Box::from(name));
debug_assert_eq!(prev, Ok(()));
},
Err(e) => {
warn!(
"can't read link for {} /dev/fd/{}: {e}",
self.kind.name_str(),
self.fd,
);
return OsStr::new("«unknown»");
},
}
self.name.get().unwrap_or_else(|| unreachable!())
}
}
impl IdOrdItem for FdInfo {
type Key<'a> = &'a RawFd;
fn key(&self) -> &RawFd {
&self.fd
}
iddqd::id_upcast!();
}
#[derive(Copy)]
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[non_exhaustive]
pub enum FdKind {
File,
Socket,
SockStream,
Poller,
Unknown,
}
impl FdKind {
pub fn name_str(self) -> &'static str {
use FdKind::*;
match self {
File => "file",
Socket => "socket",
SockStream => "socket stream",
Poller => "poller",
Unknown => "«unknown»",
}
}
}
impl Default for FdKind {
fn default() -> FdKind {
FdKind::Unknown
}
}
#[derive(Copy)] #[derive(Copy)]
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct TokenFd { pub struct TokenFd {