From aee8dcb31a0e531273ede4b08a8ec92ade380fd5 Mon Sep 17 00:00:00 2001 From: Qyriad Date: Thu, 19 Mar 2026 19:45:09 +0100 Subject: [PATCH 1/4] more reasonable tracking of open FDs --- Cargo.lock | 49 +++++++++++++++ Cargo.toml | 3 + src/daemon.rs | 146 +++++++++++++++++++++----------------------- src/daemon_io.rs | 2 + src/daemon_tokfd.rs | 38 ++++++++++++ src/lib.rs | 2 + 6 files changed, 163 insertions(+), 77 deletions(-) create mode 100644 src/daemon_tokfd.rs diff --git a/Cargo.lock b/Cargo.lock index fabdb82..64fed08 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11,6 +11,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "allocator-api2" +version = "0.2.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923" + [[package]] name = "anstream" version = "0.6.21" @@ -67,6 +73,12 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" +[[package]] +name = "bimap" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "230c5f1ca6a325a32553f8640d31ac9b49f2411e901e427570154868b46da4f7" + [[package]] name = "bitflags" version = "2.11.0" @@ -187,6 +199,7 @@ checksum = "d0881ea181b1df73ff77ffaaf9c7544ecc11e82fba9b5f27b262a3c73a332555" name = "dynix" version = "0.1.0" dependencies = [ + "bimap", "bitflags", "bstr", "circular-buffer", @@ -195,6 +208,8 @@ dependencies = [ "const-str", "displaydoc", "fs-err", + "humantime", + "iddqd", "itertools", "libc", "mio", @@ -232,6 +247,12 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "foldhash" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77ce24cb58228fbb8aa041425bb1050850ac19177686ea6e0f41a70416f56fdb" + [[package]] name = "fs-err" version = "3.3.0" @@ -246,6 +267,9 @@ name = "hashbrown" version = "0.16.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100" +dependencies = [ + "allocator-api2", +] [[package]] name = "heck" @@ -259,6 +283,25 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc0fef456e4baa96da950455cd02c081ca953b141298e41db3fc7e36b1da849c" +[[package]] +name = "humantime" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "135b12329e5e3ce057a9f972339ea52bc954fe1e9358ef27f95e89716fbc5424" + +[[package]] +name = "iddqd" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b215e67ed1d1a4b1702acd787c487d16e4c977c5dcbcc4587bdb5ea26b6ce06" +dependencies = [ + "allocator-api2", + "equivalent", + "foldhash", + "hashbrown", + "rustc-hash", +] + [[package]] name = "indexmap" version = "2.13.0" @@ -513,6 +556,12 @@ version = "0.8.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc897dd8d9e8bd1ed8cdad82b5966c3e0ecae09fb1907d58efaa013543185d0a" +[[package]] +name = "rustc-hash" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d" + [[package]] name = "rustix" version = "1.1.4" diff --git a/Cargo.toml b/Cargo.toml index ed6bd37..495e9e4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,7 @@ regex-full = ["dep:regex"] regex-lite = ["dep:regex-lite"] [dependencies] +bimap = "0.6.3" bitflags = { version = "2.11.0", features = ["std"] } bstr = "1.12.1" circular-buffer = "1.2.0" @@ -30,6 +31,8 @@ command-error = "0.8.0" const-str = "1.1.0" displaydoc = "0.2.5" fs-err = "3.2.2" +humantime = "2.3.0" +iddqd = "0.3.17" itertools = "0.14.0" libc = { version = "0.2.180", features = ["extra_traits"] } #macro_rules_attribute = { version = "0.2.2", features = ["better-docs", "verbose-expansions"] } diff --git a/src/daemon.rs b/src/daemon.rs index 506527a..f48b827 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -7,19 +7,15 @@ use std::{ }; use circular_buffer::CircularBuffer; +use iddqd::BiHashMap; use mio::{Events, Interest, Poll, Token, net::UnixListener, unix::SourceFd}; use rustix::{ - buffer::{Buffer, spare_capacity}, - fs::{FileType, OFlags, Stat, fcntl_setfl}, - io::Errno, + buffer::spare_capacity, + fs::{FileType, Stat}, net::SocketFlags, process::Uid, - termios::{ - ControlModes, InputModes, LocalModes, OptionalActions, OutputModes, SpecialCodeIndex, - SpecialCodes, Termios, tcgetattr, tcsetattr, - }, }; use serde::{Deserialize, Serialize}; @@ -27,7 +23,7 @@ use serde_json::StreamDeserializer; use crate::prelude::*; -use crate::OwnedFdWithFlags; +use crate::{OwnedFdWithFlags, TokenFd}; pub static UID: LazyLock = LazyLock::new(|| rustix::process::getuid()); @@ -109,10 +105,33 @@ pub struct Daemon { poll_error_buffer: CircularBuffer, fd_error_buffer: CircularBuffer, + // Bijective mapping of [`mio::Token`]s to [`RawFd`]s. + tokfd: BiHashMap, + cmd_buffer: Vec, next_timeout: Option, } +/// `tokfd` handling. +impl Daemon { + //fn register(&mut self, token: Token, fd: RawFd) { + // self.insert_unique + //} + fn fd_for_token(&self, token: Token) -> Option { + self.tokfd + .get1(&token) + .map(|TokenFd { fd, .. }| fd) + .copied() + } + + fn token_for_fd(&self, fd: RawFd) -> Option { + self.tokfd + .get2(&fd) + .map(|TokenFd { token, .. }| token) + .copied() + } +} + impl Daemon { pub fn new(fd: OwnedFd, name_or_path: Box) -> Self { let fd = OwnedFdWithFlags::new_with_fallback(fd); @@ -126,6 +145,7 @@ impl Daemon { fd, path: name_or_path, poll_error_buffer: Default::default(), + tokfd: Default::default(), fd_error_buffer: Default::default(), cmd_buffer: Vec::with_capacity(1024), next_timeout: TIMEOUT_NEVER, @@ -137,9 +157,8 @@ impl Daemon { let _ = rustix::fs::unlink(path); let listener = UnixListener::bind(path) .tap_err(|e| error!("failed to bind AF_UNIX socket at {}: {e}", path.display()))?; - //let (stream, _addr) = listener.accept().unwrap_or_else(|e| todo!("error: {e}")); - //warn!("stream is: {stream:?} ({})", stream.as_raw_fd()); let listener_owned_fd = OwnedFd::from(listener); + // FIXME: should we KEEP_ALIVE? rustix::net::sockopt::set_socket_keepalive(&listener_owned_fd, true).unwrap(); let path: Box = path.to_path_buf().into_boxed_path().into_boxed_os_str(); @@ -249,8 +268,19 @@ impl Daemon { let raw_fd = self.fd.as_raw_fd(); let mut daemon_source = SourceFd(&raw_fd); const DAEMON: Token = Token(0); + self.tokfd + .insert_unique(TokenFd { + token: DAEMON, + fd: raw_fd, + }) + .unwrap(); let mut next_token_number: usize = 1; - let mut extra_tokens: Vec<(Token, RawFd)> = Default::default(); + let mut next_token = || -> Token { + let t = Token(next_token_number); + next_token_number = next_token_number.saturating_add(1); + t + }; + let mut poll = Poll::new().unwrap_or_else(|e| unreachable!("creating new mio Poll: {e}")); poll.registry() @@ -259,27 +289,14 @@ impl Daemon { let mut events = Events::with_capacity(1024); - let example = DaemonCmd::Append { - //name: ConvenientAttrPath::Dotted(Box::from( - // "services.gotosocial.settings.application-name", - //)), - //name: ConvenientAttrPath::Split(Box::from([ - // Box::from("services"), - // Box::from("gotosocial"), - //])), - name: ConvenientAttrPath::clone_from_split(&[ - "services", - "gotosocial", - "settings", - "application-name", - ]), - value: Box::from("foo"), - }; - - //let example_as_json = serde_json::to_string_pretty(&example).unwrap(); - //info!("{}", example_as_json); - loop { + if let Some(timeout) = self.next_timeout { + debug!( + "epoll_wait() with a timeout: {}", + humantime::format_duration(timeout), + ); + } + match poll.poll(&mut events, self.next_timeout.take()) { Ok(_) => { if events.is_empty() { @@ -333,50 +350,25 @@ impl Daemon { }, }; + // Add this stream to our poll interest list. let mut stream_fd = stream_fd.into_raw_fd(); - - // And add this stream to our poll interest list. - if let Err(idx) = - extra_tokens.binary_search_by_key(&stream_fd, |(_, fd)| fd.as_raw_fd()) - { - let token = Token(next_token_number); - extra_tokens.insert(idx, (token, stream_fd)); - next_token_number = next_token_number.wrapping_add(1); - let mut source = SourceFd(&mut stream_fd); - poll.registry() - .register(&mut source, token, Interest::READABLE) - .unwrap(); - } + let token = next_token(); + self.tokfd + .insert_unique((token, stream_fd).into()) + .unwrap_or_else(|e| unreachable!("? {e}")); + let mut source = SourceFd(&mut stream_fd); + poll.registry() + .register(&mut source, token, Interest::READABLE) + .unwrap(); // Wait for the next poll to handle. - - //match self.read_cmd(Some(&stream_fd)) { - // Ok(()) => (), - // Err(e) if e.kind() == IoErrorKind::WouldBlock => { - // continue; - // }, - // Err(e) => { - // self.fd_error_buffer.try_push_back(e.into()).tap_err(|e| { - // error!( - // "Accumulated too many errors for fd {:?} {e}", - // &stream_fd, - // ) - // })?; - // }, - //} }, other_token => { - let index = match extra_tokens - .binary_search_by_key(&other_token, |&(t, _)| t) - { - Ok(index) => index, - Err(index) => unreachable!( - "tried to get index ({index}) for non-existent token {other_token:?}" - ), - }; - // This must be a stream fd. - let (_, stream_fd) = extra_tokens[index]; + let stream_fd = self.fd_for_token(other_token).unwrap_or_else(|| { + unreachable!("tried to get fd for no-existent token? {other_token:?}") + }); + // SAFETY: oh boy. let stream_fd = unsafe { BorrowedFd::borrow_raw(stream_fd) }; self.read_cmd(Some(&stream_fd)).unwrap(); @@ -386,14 +378,14 @@ impl Daemon { } } - fn get_fallback_fd_name(fd: Fd) -> Option> { - let dev_fd_path = Path::new("/dev/fd").join(fd.as_raw_fd().to_string()); - - fs_err::read_link(dev_fd_path) - .map(PathBuf::into_os_string) - .map(OsString::into_boxed_os_str) - .ok() - } + //fn get_fallback_fd_name(fd: Fd) -> Option> { + // let dev_fd_path = Path::new("/dev/fd").join(fd.as_raw_fd().to_string()); + // + // fs_err::read_link(dev_fd_path) + // .map(PathBuf::into_os_string) + // .map(OsString::into_boxed_os_str) + // .ok() + //} } impl Drop for Daemon { diff --git a/src/daemon_io.rs b/src/daemon_io.rs index 891a518..a1caa97 100644 --- a/src/daemon_io.rs +++ b/src/daemon_io.rs @@ -6,6 +6,8 @@ use std::{ }, }; +use iddqd::BiHashItem; +use mio::Token; use rustix::{ fs::{OFlags, fcntl_getfl, fcntl_setfl}, io::Errno, diff --git a/src/daemon_tokfd.rs b/src/daemon_tokfd.rs new file mode 100644 index 0000000..008c88d --- /dev/null +++ b/src/daemon_tokfd.rs @@ -0,0 +1,38 @@ +use std::os::fd::RawFd; + +use iddqd::BiHashItem; +use mio::Token; + +#[derive(Copy)] +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct TokenFd { + pub token: Token, + pub fd: RawFd, +} + +impl BiHashItem for TokenFd { + type K1<'a> = Token; + type K2<'a> = RawFd; + + fn key1(&self) -> Token { + self.token + } + + fn key2(&self) -> RawFd { + self.fd + } + + iddqd::bi_upcast!(); +} + +impl From for (Token, RawFd) { + fn from(TokenFd { token, fd }: TokenFd) -> (Token, RawFd) { + (token, fd) + } +} + +impl From<(Token, RawFd)> for TokenFd { + fn from((token, fd): (Token, RawFd)) -> TokenFd { + TokenFd { token, fd } + } +} diff --git a/src/lib.rs b/src/lib.rs index e0211ec..d5d53e5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -53,6 +53,8 @@ mod daemon; pub use daemon::Daemon; mod daemon_io; pub use daemon_io::OwnedFdWithFlags; +mod daemon_tokfd; +pub(crate) use daemon_tokfd::TokenFd; pub mod line; pub use line::Line; mod nixcmd; From b11627d0305659a43b789c82692404285d89ee47 Mon Sep 17 00:00:00 2001 From: Qyriad Date: Thu, 19 Mar 2026 20:44:57 +0100 Subject: [PATCH 2/4] track poller and daemon fds like the rest --- src/daemon.rs | 241 ++++++++++++++++++++++++++++++-------------- src/daemon_io.rs | 2 - src/daemon_tokfd.rs | 110 +++++++++++++++++++- 3 files changed, 271 insertions(+), 82 deletions(-) diff --git a/src/daemon.rs b/src/daemon.rs index f48b827..9f4856e 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -1,13 +1,15 @@ use std::{ env, io, ops::Deref, - os::fd::{AsFd, BorrowedFd, FromRawFd, IntoRawFd, OwnedFd, RawFd}, - sync::LazyLock, + os::fd::{AsFd, BorrowedFd, IntoRawFd, OwnedFd, RawFd}, + sync::{ + LazyLock, + atomic::{AtomicUsize, Ordering}, + }, time::Duration, }; -use circular_buffer::CircularBuffer; -use iddqd::BiHashMap; +use iddqd::{BiHashMap, IdOrdMap}; use mio::{Events, Interest, Poll, Token, net::UnixListener, unix::SourceFd}; @@ -21,7 +23,10 @@ use rustix::{ use serde::{Deserialize, Serialize}; use serde_json::StreamDeserializer; -use crate::prelude::*; +use crate::{ + daemon_tokfd::{FdInfo, FdKind}, + prelude::*, +}; use crate::{OwnedFdWithFlags, TokenFd}; @@ -98,12 +103,27 @@ pub enum DaemonCmd { const TIMEOUT_NEVER: Option = None; +static NEXT_TOKEN_NUMBER: AtomicUsize = AtomicUsize::new(1); +fn next_token() -> Token { + let tok = NEXT_TOKEN_NUMBER.fetch_add(1, Ordering::SeqCst); + + // If the increment wrapped to 0, then we just increment it again. + if tok == 0 { + return Token(NEXT_TOKEN_NUMBER.fetch_add(1, Ordering::SeqCst)); + } + + Token(tok) +} + #[derive(Debug)] pub struct Daemon { fd: OwnedFdWithFlags, - path: Box, - poll_error_buffer: CircularBuffer, - fd_error_buffer: CircularBuffer, + path: Option>, + + poller: Poll, + + //fd_error_buffer: CircularBuffer, + fd_info: IdOrdMap, // Bijective mapping of [`mio::Token`]s to [`RawFd`]s. tokfd: BiHashMap, @@ -114,9 +134,52 @@ pub struct Daemon { /// `tokfd` handling. impl Daemon { - //fn register(&mut self, token: Token, fd: RawFd) { - // self.insert_unique - //} + fn fd_error_pop(&mut self, fd: RawFd) -> Option { + self.fd_info + .get_mut(&fd) + .unwrap_or_else(|| { + if let Ok(name) = FdInfo::guess_name(fd) { + panic!( + "tried to pop error for unknown fd {fd} ({})", + name.to_string_lossy(), + ); + } + panic!("tried to pop error for unknown fd {fd}") + }) + .error_buffer + .pop_front() + } + + fn fd_error_push(&mut self, fd: RawFd, error: IoError) -> Result<(), IoError> { + self.fd_info + .get_mut(&fd) + .unwrap_or_else(|| panic!("tried to push error for unknown fd {fd}")) + .error_buffer + .try_push_back(error) + } + + fn register(&mut self, fd: RawFd) -> Token { + self.register_with_kind(fd, FdKind::Unknown) + } + + fn register_with_kind(&mut self, fd: RawFd, kind: FdKind) -> Token { + let token = next_token(); + + self.fd_info.insert_unique(FdInfo::new(fd, kind)).unwrap(); + + self.tokfd + .insert_unique(TokenFd { token, fd }) + .unwrap_or_else(|e| todo!("{e}")); + + let mut source = SourceFd(&fd); + self.poller + .registry() + .register(&mut source, token, Interest::READABLE) + .unwrap_or_else(|e| unreachable!("registering {fd:?} with poller failed: {e}")); + + token + } + fn fd_for_token(&self, token: Token) -> Option { self.tokfd .get1(&token) @@ -133,20 +196,38 @@ impl Daemon { } impl Daemon { - pub fn new(fd: OwnedFd, name_or_path: Box) -> Self { + pub fn new(fd: OwnedFd, kind: FdKind, name: Option>) -> Self { + let mut fd_info: IdOrdMap = Default::default(); + + // Supposedly, the only possible ways this can fail are EMFILE, ENFILE, and ENOMEM. + // If any of those are the case, we're screwed anyway. + let poller = Poll::new().unwrap_or_else(|e| panic!("can't create new mio::Poll: {e}")); + // Make sure we register the poller in `fd_info`, so we can keep track of its errors. + fd_info + .insert_unique(FdInfo::new(poller.as_raw_fd(), FdKind::Poller)) + .unwrap_or_else(|e| unreachable!("{e}")); + let fd = OwnedFdWithFlags::new_with_fallback(fd); - debug!( - "opened daemon to {} file descriptor {fd:?}", - name_or_path.display(), - ); + fd_info + .insert_unique(FdInfo::new(fd.as_raw_fd(), kind)) + .unwrap_or_else(|e| unreachable!("{e}")); + + debug!("opened daemon to {:?} file descriptor {fd:?}", name); + + let path = match &name { + Some(name) => Some(PathBuf::from(name).into_boxed_path()), + None => None, + }; Self { fd, - path: name_or_path, - poll_error_buffer: Default::default(), + path, + poller, + //poll_error_buffer: Default::default(), + fd_info, tokfd: Default::default(), - fd_error_buffer: Default::default(), + //fd_error_buffer: Default::default(), cmd_buffer: Vec::with_capacity(1024), next_timeout: TIMEOUT_NEVER, } @@ -162,7 +243,7 @@ impl Daemon { rustix::net::sockopt::set_socket_keepalive(&listener_owned_fd, true).unwrap(); let path: Box = path.to_path_buf().into_boxed_path().into_boxed_os_str(); - Ok(Self::new(listener_owned_fd, path)) + Ok(Self::new(listener_owned_fd, FdKind::Socket, Some(path))) } pub fn open_default_socket() -> Result { @@ -203,7 +284,7 @@ impl Daemon { .try_clone_to_owned() .expect("dynix daemon could not open stdin; try a Unix socket?"); - Self::new(fd, Box::from(OsStr::new("«stdin»"))) + Self::new(fd, FdKind::File, None) } //pub unsafe fn from_raw_parts(fd: OwnedFd) -> Self { @@ -217,15 +298,13 @@ impl Daemon { } } -const ERROR_BUFFER_LEN: usize = 8; - /// Private helpers. impl Daemon { - fn read_cmd(&mut self, fd: Option<&dyn AsFd>) -> Result<(), IoError> { - let fd = match fd.as_ref() { - Some(fd) => fd.as_fd(), - None => self.fd.as_fd(), - }; + fn read_cmd(&mut self, fd: &BorrowedFd) -> Result<(), IoError> { + //let fd = match fd.as_ref() { + // Some(fd) => fd.as_fd(), + // None => self.fd.as_fd(), + //}; if self.cmd_buffer.len() == self.cmd_buffer.capacity() { self.cmd_buffer.reserve(1024); @@ -247,12 +326,16 @@ impl Daemon { }, Err(e) => { warn!("error deserializing command: {e}"); - warn!("error count: {}", self.fd_error_buffer.len()); + debug!("command buffer was: {}", self.cmd_buffer.as_bstr()); + //warn!("error count for fd {fd:?}: {}", self.fd_error_buffer.len()); self.cmd_buffer.clear(); // Don't propagate the error unless we have too many. - self.fd_error_buffer.try_push_back(e.into()).tap_err(|e| { + self.fd_error_push(fd.as_raw_fd(), e.into()).tap_err(|e| { error!("Accumulated too many errors for daemon fd {fd:?}: {e}") })?; + //self.fd_error_buffer.try_push_back(e.into()).tap_err(|e| { + // error!("Accumulated too many errors for daemon fd {fd:?}: {e}") + //})?; return Ok(()); }, }; @@ -266,6 +349,17 @@ impl Daemon { pub(crate) fn enter_loop(&mut self) -> Result, IoError> { let raw_fd = self.fd.as_raw_fd(); + if cfg!(debug_assertions) { + assert!( + self.fd_info.contains_key(&raw_fd), + "we should know about daemon fd {raw_fd}", + ); + assert!( + self.fd_info.contains_key(&self.poller.as_raw_fd()), + "we should know about poller fd {}", + self.poller.as_raw_fd(), + ); + } let mut daemon_source = SourceFd(&raw_fd); const DAEMON: Token = Token(0); self.tokfd @@ -274,16 +368,9 @@ impl Daemon { fd: raw_fd, }) .unwrap(); - let mut next_token_number: usize = 1; - let mut next_token = || -> Token { - let t = Token(next_token_number); - next_token_number = next_token_number.saturating_add(1); - t - }; - let mut poll = Poll::new().unwrap_or_else(|e| unreachable!("creating new mio Poll: {e}")); - - poll.registry() + self.poller + .registry() .register(&mut daemon_source, DAEMON, Interest::READABLE) .unwrap_or_else(|e| unreachable!("registering mio Poll for daemon fd {raw_fd:?}: {e}")); @@ -297,20 +384,22 @@ impl Daemon { ); } - match poll.poll(&mut events, self.next_timeout.take()) { + match self.poller.poll(&mut events, self.next_timeout.take()) { Ok(_) => { if events.is_empty() { warn!("timeout expired"); self.cmd_buffer.clear(); } else { - let _ = self.poll_error_buffer.pop_front(); + //let _ = self.poll_error_buffer.pop_front(); + let _ = self.fd_error_pop(self.poller.as_raw_fd()); } }, Err(e) => { warn!("mio Poll::poll() error: {e}"); - self.poll_error_buffer.try_push_back(e).tap_err(|e| { - error!("accumulated too many errors for mio::Poll::poll(): {e}") - })?; + self.fd_error_push(self.poller.as_raw_fd(), e) + .tap_err(|e| { + error!("accumulated too many errors for mio::Poll::poll(): {e}") + })?; }, } @@ -324,7 +413,9 @@ impl Daemon { .unwrap_or(false); if !is_sock { - self.read_cmd(None).unwrap(); + // SAFETY: oh boy: disjoint borrows with extra steps. + let file_fd = unsafe { BorrowedFd::borrow_raw(self.fd.as_raw_fd()) }; + self.read_cmd(&file_fd).unwrap(); continue; } @@ -340,26 +431,27 @@ impl Daemon { }, Err(e) => { error!("accept4 on daemon socket failed: {e}"); - self.fd_error_buffer.try_push_back(e.into()).tap_err(|e| { - error!( - "Accumulated too many errors for daemon fd {:?}: {e}", - self.fd - ) - })?; + self.fd_error_push(self.fd.as_raw_fd(), e.into()) + .tap_err(|e| { + error!( + "Accumulated too many errors for daemon fd {:?}: {e}", + self.fd + ) + })?; + //self.fd_error_buffer.try_push_back(e.into()).tap_err(|e| { + // error!( + // "Accumulated too many errors for daemon fd {:?}: {e}", + // self.fd, + // ) + //})?; continue; }, }; // Add this stream to our poll interest list. - let mut stream_fd = stream_fd.into_raw_fd(); - let token = next_token(); - self.tokfd - .insert_unique((token, stream_fd).into()) - .unwrap_or_else(|e| unreachable!("? {e}")); - let mut source = SourceFd(&mut stream_fd); - poll.registry() - .register(&mut source, token, Interest::READABLE) - .unwrap(); + // NOTE: `stream_fd` is now effectively `ManuallyDrop`. + let stream_fd = stream_fd.into_raw_fd(); + let _token = self.register(stream_fd); // Wait for the next poll to handle. }, @@ -371,33 +463,26 @@ impl Daemon { // SAFETY: oh boy. let stream_fd = unsafe { BorrowedFd::borrow_raw(stream_fd) }; - self.read_cmd(Some(&stream_fd)).unwrap(); + self.read_cmd(&stream_fd).unwrap(); }, } } } } - - //fn get_fallback_fd_name(fd: Fd) -> Option> { - // let dev_fd_path = Path::new("/dev/fd").join(fd.as_raw_fd().to_string()); - // - // fs_err::read_link(dev_fd_path) - // .map(PathBuf::into_os_string) - // .map(OsString::into_boxed_os_str) - // .ok() - //} } impl Drop for Daemon { fn drop(&mut self) { - let probably_synthetic = self - .path - .to_str() - .map(|p| p.starts_with("«") && p.ends_with("»")) - .unwrap_or(false); - if probably_synthetic { - return; + //let probably_synthetic = self + // .path + // .to_str() + // .map(|p| p.starts_with("«") && p.ends_with("»")) + // .unwrap_or(false); + //if probably_synthetic { + // return; + //} + if let Some(path) = self.path.as_deref() { + let _ = rustix::fs::unlink(path); } - let _ = rustix::fs::unlink(&*self.path); } } diff --git a/src/daemon_io.rs b/src/daemon_io.rs index a1caa97..891a518 100644 --- a/src/daemon_io.rs +++ b/src/daemon_io.rs @@ -6,8 +6,6 @@ use std::{ }, }; -use iddqd::BiHashItem; -use mio::Token; use rustix::{ fs::{OFlags, fcntl_getfl, fcntl_setfl}, io::Errno, diff --git a/src/daemon_tokfd.rs b/src/daemon_tokfd.rs index 008c88d..b5e52bd 100644 --- a/src/daemon_tokfd.rs +++ b/src/daemon_tokfd.rs @@ -1,8 +1,114 @@ -use std::os::fd::RawFd; +use std::{os::fd::RawFd, sync::OnceLock}; -use iddqd::BiHashItem; +use circular_buffer::CircularBuffer; +use iddqd::{BiHashItem, IdOrdItem}; use mio::Token; +use crate::prelude::*; + +const ERROR_BUFFER_LEN: usize = 8; + +#[derive(Debug)] +pub struct FdInfo { + pub fd: RawFd, + pub kind: FdKind, + pub name: OnceLock>, + pub error_buffer: CircularBuffer, +} + +impl FdInfo { + pub fn new(fd: Fd, kind: FdKind) -> Self { + Self { + fd: fd.as_raw_fd(), + kind, + name: Default::default(), + error_buffer: Default::default(), + } + } + + pub fn new_with_name(fd: Fd, kind: FdKind, name: Box) -> Self { + Self { + fd: fd.as_raw_fd(), + kind, + name: OnceLock::from(name), + error_buffer: Default::default(), + } + } +} + +impl FdInfo { + pub(crate) fn guess_name(fd: Fd) -> Result, IoError> { + let dev_fd_path = Path::new("/dev/fd").join(fd.as_raw_fd().to_string()); + + fs_err::read_link(dev_fd_path) + .map(PathBuf::into_os_string) + .map(OsString::into_boxed_os_str) + } + + pub fn name(&self) -> &OsStr { + if let Some(name) = self.name.get() { + return name; + } + + match Self::guess_name(self.fd) { + Ok(name) => { + let prev = self.name.set(Box::from(name)); + debug_assert_eq!(prev, Ok(())); + }, + Err(e) => { + warn!( + "can't read link for {} /dev/fd/{}: {e}", + self.kind.name_str(), + self.fd, + ); + return OsStr::new("«unknown»"); + }, + } + + self.name.get().unwrap_or_else(|| unreachable!()) + } +} + +impl IdOrdItem for FdInfo { + type Key<'a> = &'a RawFd; + + fn key(&self) -> &RawFd { + &self.fd + } + + iddqd::id_upcast!(); +} + +#[derive(Copy)] +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +#[non_exhaustive] +pub enum FdKind { + File, + Socket, + SockStream, + Poller, + Unknown, +} + +impl FdKind { + pub fn name_str(self) -> &'static str { + use FdKind::*; + match self { + File => "file", + Socket => "socket", + SockStream => "socket stream", + Poller => "poller", + Unknown => "«unknown»", + } + } +} + +impl Default for FdKind { + fn default() -> FdKind { + FdKind::Unknown + } +} + #[derive(Copy)] #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct TokenFd { From 16f2649334a77bf1afc67f3955106818bffbd7f6 Mon Sep 17 00:00:00 2001 From: Qyriad Date: Thu, 19 Mar 2026 21:39:11 +0100 Subject: [PATCH 3/4] close() fds that have nothing more to read --- src/daemon.rs | 146 ++++++++++++++++++++++++-------------------- src/daemon_tokfd.rs | 25 ++++++++ src/lib.rs | 4 +- 3 files changed, 108 insertions(+), 67 deletions(-) diff --git a/src/daemon.rs b/src/daemon.rs index 9f4856e..9492f04 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -13,12 +13,7 @@ use iddqd::{BiHashMap, IdOrdMap}; use mio::{Events, Interest, Poll, Token, net::UnixListener, unix::SourceFd}; -use rustix::{ - buffer::spare_capacity, - fs::{FileType, Stat}, - net::SocketFlags, - process::Uid, -}; +use rustix::{buffer::spare_capacity, net::SocketFlags, process::Uid}; use serde::{Deserialize, Serialize}; use serde_json::StreamDeserializer; @@ -122,7 +117,6 @@ pub struct Daemon { poller: Poll, - //fd_error_buffer: CircularBuffer, fd_info: IdOrdMap, // Bijective mapping of [`mio::Token`]s to [`RawFd`]s. @@ -135,36 +129,46 @@ pub struct Daemon { /// `tokfd` handling. impl Daemon { fn fd_error_pop(&mut self, fd: RawFd) -> Option { - self.fd_info - .get_mut(&fd) - .unwrap_or_else(|| { - if let Ok(name) = FdInfo::guess_name(fd) { - panic!( - "tried to pop error for unknown fd {fd} ({})", - name.to_string_lossy(), - ); - } - panic!("tried to pop error for unknown fd {fd}") - }) - .error_buffer - .pop_front() + let mut info = self.fd_info.get_mut(&fd).unwrap_or_else(|| { + if let Ok(name) = FdInfo::guess_name(fd) { + panic!( + "tried to pop error for unknown fd {fd} ({})", + name.to_string_lossy(), + ); + } + panic!("tried to pop error for unknown fd {fd}") + }); + info.error_buffer.pop_front().tap_some(|e| { + trace!("Popping error for {}: {e}", info.display()); + }) } fn fd_error_push(&mut self, fd: RawFd, error: IoError) -> Result<(), IoError> { - self.fd_info + let mut info = self + .fd_info .get_mut(&fd) - .unwrap_or_else(|| panic!("tried to push error for unknown fd {fd}")) - .error_buffer - .try_push_back(error) + .unwrap_or_else(|| panic!("tried to push error for unknown fd {fd}")); + trace!("Pushing error for {}: {}", info.display(), error); + info.error_buffer.try_push_back(error) } - fn register(&mut self, fd: RawFd) -> Token { - self.register_with_kind(fd, FdKind::Unknown) + fn main_fd_info(&self) -> &FdInfo { + self.fd_info.get(&self.fd.as_raw_fd()).unwrap_or_else(|| { + unreachable!( + "Main daemon fd {:?} was not registered with fd_info", + self.fd, + ) + }) } - fn register_with_kind(&mut self, fd: RawFd, kind: FdKind) -> Token { + fn register(&mut self, fd: RawFd, kind: FdKind) -> Token { let token = next_token(); + debug!( + "Registering new {} FdInfo for {fd} with token {token:?}", + kind.name_str(), + ); + self.fd_info.insert_unique(FdInfo::new(fd, kind)).unwrap(); self.tokfd @@ -180,6 +184,21 @@ impl Daemon { token } + fn deregister(&mut self, fd: RawFd) { + let info = self + .fd_info + .remove(&fd) + .unwrap_or_else(|| unreachable!("tried to unregister unknown fd {fd}")); + debug!("Unregistering fd {}; calling close()", info.display()); + unsafe { rustix::io::close(fd) }; + let res = unsafe { libc::fcntl(fd, libc::F_GETFD, 0) }; + debug_assert_eq!(res, -1); + debug_assert_eq!( + IoError::last_os_error().raw_os_error(), + Some(Errno::BADF.raw_os_error()), + ); + } + fn fd_for_token(&self, token: Token) -> Option { self.tokfd .get1(&token) @@ -224,10 +243,8 @@ impl Daemon { fd, path, poller, - //poll_error_buffer: Default::default(), fd_info, tokfd: Default::default(), - //fd_error_buffer: Default::default(), cmd_buffer: Vec::with_capacity(1024), next_timeout: TIMEOUT_NEVER, } @@ -258,14 +275,14 @@ impl Daemon { Err(e) => { warn!( "failed binding AF_UNIX socket at {}: {e}; trying elsewhere", - preferred.display() + preferred.display(), ); let fallback = TMPDIR.join("dynix.sock").into_boxed_path(); Self::from_unix_socket_path(&fallback).tap_err(|e| { error!( "failed binding AF_UNIX socket at {}: {e}", - fallback.display() + fallback.display(), ) })? }, @@ -301,11 +318,6 @@ impl Daemon { /// Private helpers. impl Daemon { fn read_cmd(&mut self, fd: &BorrowedFd) -> Result<(), IoError> { - //let fd = match fd.as_ref() { - // Some(fd) => fd.as_fd(), - // None => self.fd.as_fd(), - //}; - if self.cmd_buffer.len() == self.cmd_buffer.capacity() { self.cmd_buffer.reserve(1024); } @@ -326,20 +338,18 @@ impl Daemon { }, Err(e) => { warn!("error deserializing command: {e}"); - debug!("command buffer was: {}", self.cmd_buffer.as_bstr()); - //warn!("error count for fd {fd:?}: {}", self.fd_error_buffer.len()); + debug!("command buffer was: {:?}", self.cmd_buffer.as_bstr()); self.cmd_buffer.clear(); // Don't propagate the error unless we have too many. self.fd_error_push(fd.as_raw_fd(), e.into()).tap_err(|e| { error!("Accumulated too many errors for daemon fd {fd:?}: {e}") })?; - //self.fd_error_buffer.try_push_back(e.into()).tap_err(|e| { - // error!("Accumulated too many errors for daemon fd {fd:?}: {e}") - //})?; return Ok(()); }, }; debug!("got cmd: {cmd:?}"); + let _ = rustix::io::write(fd, b""); + info!("dispatching command"); } self.cmd_buffer.clear(); @@ -377,6 +387,14 @@ impl Daemon { let mut events = Events::with_capacity(1024); loop { + if tracing::enabled!(tracing::Level::DEBUG) { + // + trace!("Daemon loop iteration, with file descriptors: "); + for info in &self.fd_info { + trace!("- {}", info.display()); + } + } + if let Some(timeout) = self.next_timeout { debug!( "epoll_wait() with a timeout: {}", @@ -386,15 +404,25 @@ impl Daemon { match self.poller.poll(&mut events, self.next_timeout.take()) { Ok(_) => { + trace!( + "mio::Poller::poll() got events: {:?}", + events.iter().size_hint().0, + ); if events.is_empty() { warn!("timeout expired"); self.cmd_buffer.clear(); } else { - //let _ = self.poll_error_buffer.pop_front(); let _ = self.fd_error_pop(self.poller.as_raw_fd()); } }, + Err(e) if e.kind() == IoErrorKind::Interrupted => { + // EINTR is silly. + continue; + }, Err(e) => { + if let Some(Errno::BADF) = e.raw_os_error().map(Errno::from_raw_os_error) { + unreachable!("EBADF on poller fd; IO safety violation?"); + } warn!("mio Poll::poll() error: {e}"); self.fd_error_push(self.poller.as_raw_fd(), e) .tap_err(|e| { @@ -404,14 +432,10 @@ impl Daemon { } for event in &events { + trace!("event is {event:?}"); match event.token() { DAEMON => { - let is_sock = rustix::fs::fstat(&self.fd) - .map(|Stat { st_mode, .. }| st_mode) - .map(FileType::from_raw_mode) - .map(FileType::is_socket) - .unwrap_or(false); - + let is_sock = self.main_fd_info().kind == FdKind::Socket; if !is_sock { // SAFETY: oh boy: disjoint borrows with extra steps. let file_fd = unsafe { BorrowedFd::borrow_raw(self.fd.as_raw_fd()) }; @@ -438,12 +462,6 @@ impl Daemon { self.fd ) })?; - //self.fd_error_buffer.try_push_back(e.into()).tap_err(|e| { - // error!( - // "Accumulated too many errors for daemon fd {:?}: {e}", - // self.fd, - // ) - //})?; continue; }, }; @@ -451,7 +469,7 @@ impl Daemon { // Add this stream to our poll interest list. // NOTE: `stream_fd` is now effectively `ManuallyDrop`. let stream_fd = stream_fd.into_raw_fd(); - let _token = self.register(stream_fd); + let _token = self.register(stream_fd, FdKind::SockStream); // Wait for the next poll to handle. }, @@ -461,9 +479,13 @@ impl Daemon { unreachable!("tried to get fd for no-existent token? {other_token:?}") }); - // SAFETY: oh boy. - let stream_fd = unsafe { BorrowedFd::borrow_raw(stream_fd) }; - self.read_cmd(&stream_fd).unwrap(); + if event.is_read_closed() { + self.deregister(stream_fd); + } else { + // SAFETY: oh boy. + let stream_fd = unsafe { BorrowedFd::borrow_raw(stream_fd) }; + self.read_cmd(&stream_fd).unwrap(); + } }, } } @@ -473,14 +495,6 @@ impl Daemon { impl Drop for Daemon { fn drop(&mut self) { - //let probably_synthetic = self - // .path - // .to_str() - // .map(|p| p.starts_with("«") && p.ends_with("»")) - // .unwrap_or(false); - //if probably_synthetic { - // return; - //} if let Some(path) = self.path.as_deref() { let _ = rustix::fs::unlink(path); } diff --git a/src/daemon_tokfd.rs b/src/daemon_tokfd.rs index b5e52bd..05d5ec9 100644 --- a/src/daemon_tokfd.rs +++ b/src/daemon_tokfd.rs @@ -67,6 +67,10 @@ impl FdInfo { self.name.get().unwrap_or_else(|| unreachable!()) } + + pub fn display(&self) -> FdInfoDisplay<'_> { + FdInfoDisplay { inner: self } + } } impl IdOrdItem for FdInfo { @@ -79,6 +83,27 @@ impl IdOrdItem for FdInfo { iddqd::id_upcast!(); } +#[derive(Debug)] +pub struct FdInfoDisplay<'a> { + inner: &'a FdInfo, +} +impl<'a> Display for FdInfoDisplay<'a> { + fn fmt(&self, f: &mut Formatter) -> FmtResult { + write!( + f, + "{} fd {} ({})", + self.inner.kind.name_str(), + self.inner.fd, + self.inner.name().to_string_lossy(), + )?; + if !self.inner.error_buffer.is_empty() { + write!(f, "; with errors: {}", self.inner.error_buffer.len())?; + } + + Ok(()) + } +} + #[derive(Copy)] #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] #[non_exhaustive] diff --git a/src/lib.rs b/src/lib.rs index d5d53e5..426fc44 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -35,7 +35,9 @@ pub(crate) mod prelude { pub use bstr::ByteSlice; - pub use tap::{Pipe, Tap, TapFallible}; + pub use rustix::io::Errno; + + pub use tap::{Pipe, Tap, TapFallible, TapOptional}; pub use tracing::{Level, debug, error, info, trace, warn}; From ac53850fd17068a21617db184bda04df75c750b3 Mon Sep 17 00:00:00 2001 From: Qyriad Date: Fri, 20 Mar 2026 20:43:12 +0100 Subject: [PATCH 4/4] PoC daemon edition --- src/daemon.rs | 102 +++++++++++++++++++++++++++++++++++++++++++------- src/lib.rs | 20 ++++++++-- 2 files changed, 104 insertions(+), 18 deletions(-) diff --git a/src/daemon.rs b/src/daemon.rs index 9492f04..df477e5 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -1,9 +1,9 @@ use std::{ - env, io, + env, io, mem, ops::Deref, os::fd::{AsFd, BorrowedFd, IntoRawFd, OwnedFd, RawFd}, sync::{ - LazyLock, + Arc, LazyLock, atomic::{AtomicUsize, Ordering}, }, time::Duration, @@ -19,6 +19,7 @@ use serde::{Deserialize, Serialize}; use serde_json::StreamDeserializer; use crate::{ + SourceFile, SourceLine, daemon_tokfd::{FdInfo, FdKind}, prelude::*, }; @@ -66,6 +67,17 @@ impl ConvenientAttrPath { let boxed = iter.map(|s| Box::from(s)); Self::Split(Box::from_iter(boxed)) } + + pub fn to_nix_decl(&self) -> Box { + use ConvenientAttrPath::*; + match self { + Dotted(s) => s.clone(), + Split(path) => { + // FIXME: quote if necessary + path.join(".").into_boxed_str() + }, + } + } } impl<'i> FromIterator<&'i str> for ConvenientAttrPath { @@ -89,6 +101,7 @@ impl FromIterator> for ConvenientAttrPath { #[derive(Debug, Clone, PartialEq)] #[derive(serde::Deserialize, serde::Serialize)] #[serde(tag = "action", content = "args", rename_all = "snake_case")] +// FIXME: rename to not confuse with the clap argument type. pub enum DaemonCmd { Append { name: ConvenientAttrPath, @@ -112,6 +125,7 @@ fn next_token() -> Token { #[derive(Debug)] pub struct Daemon { + config_path: Arc, fd: OwnedFdWithFlags, path: Option>, @@ -197,6 +211,8 @@ impl Daemon { IoError::last_os_error().raw_os_error(), Some(Errno::BADF.raw_os_error()), ); + + self.tokfd.remove2(&fd).unwrap_or_else(|| todo!()); } fn fd_for_token(&self, token: Token) -> Option { @@ -215,7 +231,12 @@ impl Daemon { } impl Daemon { - pub fn new(fd: OwnedFd, kind: FdKind, name: Option>) -> Self { + pub fn new( + config_path: Arc, + fd: OwnedFd, + kind: FdKind, + name: Option>, + ) -> Self { let mut fd_info: IdOrdMap = Default::default(); // Supposedly, the only possible ways this can fail are EMFILE, ENFILE, and ENOMEM. @@ -240,6 +261,7 @@ impl Daemon { }; Self { + config_path, fd, path, poller, @@ -250,7 +272,7 @@ impl Daemon { } } - pub fn from_unix_socket_path(path: &Path) -> Result { + pub fn from_unix_socket_path(config_path: Arc, path: &Path) -> Result { // We unconditionally unlink() `path` before binding, but completely ignore the result. let _ = rustix::fs::unlink(path); let listener = UnixListener::bind(path) @@ -260,13 +282,18 @@ impl Daemon { rustix::net::sockopt::set_socket_keepalive(&listener_owned_fd, true).unwrap(); let path: Box = path.to_path_buf().into_boxed_path().into_boxed_os_str(); - Ok(Self::new(listener_owned_fd, FdKind::Socket, Some(path))) + Ok(Self::new( + config_path, + listener_owned_fd, + FdKind::Socket, + Some(path), + )) } - pub fn open_default_socket() -> Result { + pub fn open_default_socket(config_path: Arc) -> Result { use IoErrorKind::*; let preferred = USER_SOCKET_DIR.join("dynix.sock").into_boxed_path(); - let constructed = match Self::from_unix_socket_path(&preferred) { + let constructed = match Self::from_unix_socket_path(config_path.clone(), &preferred) { Ok(v) => v, Err(e) if e.kind() == Unsupported => { // @@ -279,7 +306,7 @@ impl Daemon { ); let fallback = TMPDIR.join("dynix.sock").into_boxed_path(); - Self::from_unix_socket_path(&fallback).tap_err(|e| { + Self::from_unix_socket_path(config_path, &fallback).tap_err(|e| { error!( "failed binding AF_UNIX socket at {}: {e}", fallback.display(), @@ -294,14 +321,14 @@ impl Daemon { /// This panics if stdin cannot be opened. /// /// If you want to handle that error, use [`Daemon::from_raw_parts()`]. - pub fn from_stdin() -> Self { + pub fn from_stdin(config_path: Arc) -> Self { let stdin = io::stdin(); let fd = stdin .as_fd() .try_clone_to_owned() .expect("dynix daemon could not open stdin; try a Unix socket?"); - Self::new(fd, FdKind::File, None) + Self::new(config_path, fd, FdKind::File, None) } //pub unsafe fn from_raw_parts(fd: OwnedFd) -> Self { @@ -325,8 +352,11 @@ impl Daemon { let _count = rustix::io::read(fd, spare_capacity(&mut self.cmd_buffer)) .tap_err(|e| error!("read() on daemon fd {fd:?} failed: {e}"))?; + // So that the loop doesn't borrow from `self`. + let mut cmd_buffer = mem::take(&mut self.cmd_buffer); + // The buffer might have existing data from the last read. - let deserializer = serde_json::Deserializer::from_slice(&self.cmd_buffer); + let deserializer = serde_json::Deserializer::from_slice(&cmd_buffer); let stream: StreamDeserializer<_, DaemonCmd> = deserializer.into_iter(); for cmd in stream { let cmd = match cmd { @@ -334,12 +364,14 @@ impl Daemon { Err(e) if e.is_eof() => { self.next_timeout = Some(Duration::from_secs(4)); warn!("Didn't get a valid daemon command; giving the other side 4 seconds..."); + let _ = mem::replace(&mut self.cmd_buffer, cmd_buffer); return Ok(()); }, Err(e) => { warn!("error deserializing command: {e}"); - debug!("command buffer was: {:?}", self.cmd_buffer.as_bstr()); - self.cmd_buffer.clear(); + debug!("command buffer was: {:?}", cmd_buffer.as_bstr()); + cmd_buffer.clear(); + let _ = mem::replace(&mut self.cmd_buffer, cmd_buffer); // Don't propagate the error unless we have too many. self.fd_error_push(fd.as_raw_fd(), e.into()).tap_err(|e| { error!("Accumulated too many errors for daemon fd {fd:?}: {e}") @@ -350,9 +382,51 @@ impl Daemon { debug!("got cmd: {cmd:?}"); let _ = rustix::io::write(fd, b""); info!("dispatching command"); + self.dispatch_cmd(cmd).unwrap_or_else(|e| todo!("{e}")); } - self.cmd_buffer.clear(); + cmd_buffer.clear(); + let _ = mem::replace(&mut self.cmd_buffer, cmd_buffer); + + Ok(()) + } + + fn dispatch_cmd(&mut self, cmd: DaemonCmd) -> Result<(), IoError> { + let (name, value) = match cmd { + DaemonCmd::Append { name, value } => (name, value), + }; + let mut opts = File::options(); + opts.read(true) + .write(true) + .create(false) + .custom_flags(libc::O_CLOEXEC); + let source_file = SourceFile::open_from(self.config_path.clone(), opts)?; + let pri = crate::get_where(source_file.clone()).unwrap_or_else(|e| todo!("{e}")); + let new_pri = pri - 1; + //let new_pri_line = + // crate::get_next_prio_line(source_file.clone(), Arc::from(name), Arc::from(value)); + // Get next priority line. + let source_lines = source_file.lines()?; + let penultimate = source_lines.get(source_lines.len() - 2); + // FIXME: don't rely on whitespace lol + debug_assert_eq!(penultimate.map(SourceLine::text).as_deref(), Some(" ];")); + let penultimate = penultimate.unwrap(); + let new_generation = 0 - new_pri; + let new_line = SourceLine { + line: penultimate.line, + path: source_file.path(), + text: Arc::from(format!( + " {} = lib.mkOverride ({}) ({}); # DYNIX GENERATION {}", + name.to_nix_decl(), + new_pri, + value, + new_generation, + )), + }; + + drop(source_lines); + + crate::write_next_prio(source_file, new_line).unwrap_or_else(|e| todo!("{e}")); Ok(()) } diff --git a/src/lib.rs b/src/lib.rs index 426fc44..40f983b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -35,6 +35,8 @@ pub(crate) mod prelude { pub use bstr::ByteSlice; + pub use itertools::Itertools; + pub use rustix::io::Errno; pub use tap::{Pipe, Tap, TapFallible, TapOptional}; @@ -133,17 +135,27 @@ pub fn do_append(args: Arc, append_args: AppendCmd) -> Result<(), BoxDynEr } //#[tracing::instrument(level = "debug")] -pub fn do_daemon(_args: Arc, daemon_args: DaemonCmd) -> Result<(), BoxDynError> { +pub fn do_daemon(args: Arc, daemon_args: DaemonCmd) -> Result<(), BoxDynError> { + let config_file = Path::new(&args.file); + let config_file: PathBuf = if config_file.is_relative() && !config_file.starts_with("./") { + iter::once(OsStr::new("./")) + .chain(config_file.iter()) + .collect() + } else { + config_file.to_path_buf() + }; + let config_file: Arc = Arc::from(config_file); + // FIXME: make configurable? let _ = rustix::process::umask(Mode::from_bits_retain(0o600).complement()); let mut daemon = match daemon_args { - DaemonCmd { stdin: true, .. } => Daemon::from_stdin(), - DaemonCmd { socket: None, .. } => Daemon::open_default_socket()?, + DaemonCmd { stdin: true, .. } => Daemon::from_stdin(config_file), + DaemonCmd { socket: None, .. } => Daemon::open_default_socket(config_file)?, DaemonCmd { socket: Some(socket), .. - } => Daemon::from_unix_socket_path(&socket)?, + } => Daemon::from_unix_socket_path(config_file, &socket)?, }; daemon.enter_loop().unwrap();