diff --git a/Cargo.lock b/Cargo.lock index fabdb82..64fed08 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11,6 +11,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "allocator-api2" +version = "0.2.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923" + [[package]] name = "anstream" version = "0.6.21" @@ -67,6 +73,12 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" +[[package]] +name = "bimap" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "230c5f1ca6a325a32553f8640d31ac9b49f2411e901e427570154868b46da4f7" + [[package]] name = "bitflags" version = "2.11.0" @@ -187,6 +199,7 @@ checksum = "d0881ea181b1df73ff77ffaaf9c7544ecc11e82fba9b5f27b262a3c73a332555" name = "dynix" version = "0.1.0" dependencies = [ + "bimap", "bitflags", "bstr", "circular-buffer", @@ -195,6 +208,8 @@ dependencies = [ "const-str", "displaydoc", "fs-err", + "humantime", + "iddqd", "itertools", "libc", "mio", @@ -232,6 +247,12 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "foldhash" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77ce24cb58228fbb8aa041425bb1050850ac19177686ea6e0f41a70416f56fdb" + [[package]] name = "fs-err" version = "3.3.0" @@ -246,6 +267,9 @@ name = "hashbrown" version = "0.16.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100" +dependencies = [ + "allocator-api2", +] [[package]] name = "heck" @@ -259,6 +283,25 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc0fef456e4baa96da950455cd02c081ca953b141298e41db3fc7e36b1da849c" +[[package]] +name = "humantime" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "135b12329e5e3ce057a9f972339ea52bc954fe1e9358ef27f95e89716fbc5424" + +[[package]] +name = "iddqd" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b215e67ed1d1a4b1702acd787c487d16e4c977c5dcbcc4587bdb5ea26b6ce06" +dependencies = [ + "allocator-api2", + "equivalent", + "foldhash", + "hashbrown", + "rustc-hash", +] + [[package]] name = "indexmap" version = "2.13.0" @@ -513,6 +556,12 @@ version = "0.8.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc897dd8d9e8bd1ed8cdad82b5966c3e0ecae09fb1907d58efaa013543185d0a" +[[package]] +name = "rustc-hash" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d" + [[package]] name = "rustix" version = "1.1.4" diff --git a/Cargo.toml b/Cargo.toml index ed6bd37..495e9e4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,7 @@ regex-full = ["dep:regex"] regex-lite = ["dep:regex-lite"] [dependencies] +bimap = "0.6.3" bitflags = { version = "2.11.0", features = ["std"] } bstr = "1.12.1" circular-buffer = "1.2.0" @@ -30,6 +31,8 @@ command-error = "0.8.0" const-str = "1.1.0" displaydoc = "0.2.5" fs-err = "3.2.2" +humantime = "2.3.0" +iddqd = "0.3.17" itertools = "0.14.0" libc = { version = "0.2.180", features = ["extra_traits"] } #macro_rules_attribute = { version = "0.2.2", features = ["better-docs", "verbose-expansions"] } diff --git a/src/daemon.rs b/src/daemon.rs index 506527a..f48b827 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -7,19 +7,15 @@ use std::{ }; use circular_buffer::CircularBuffer; +use iddqd::BiHashMap; use mio::{Events, Interest, Poll, Token, net::UnixListener, unix::SourceFd}; use rustix::{ - buffer::{Buffer, spare_capacity}, - fs::{FileType, OFlags, Stat, fcntl_setfl}, - io::Errno, + buffer::spare_capacity, + fs::{FileType, Stat}, net::SocketFlags, process::Uid, - termios::{ - ControlModes, InputModes, LocalModes, OptionalActions, OutputModes, SpecialCodeIndex, - SpecialCodes, Termios, tcgetattr, tcsetattr, - }, }; use serde::{Deserialize, Serialize}; @@ -27,7 +23,7 @@ use serde_json::StreamDeserializer; use crate::prelude::*; -use crate::OwnedFdWithFlags; +use crate::{OwnedFdWithFlags, TokenFd}; pub static UID: LazyLock = LazyLock::new(|| rustix::process::getuid()); @@ -109,10 +105,33 @@ pub struct Daemon { poll_error_buffer: CircularBuffer, fd_error_buffer: CircularBuffer, + // Bijective mapping of [`mio::Token`]s to [`RawFd`]s. + tokfd: BiHashMap, + cmd_buffer: Vec, next_timeout: Option, } +/// `tokfd` handling. +impl Daemon { + //fn register(&mut self, token: Token, fd: RawFd) { + // self.insert_unique + //} + fn fd_for_token(&self, token: Token) -> Option { + self.tokfd + .get1(&token) + .map(|TokenFd { fd, .. }| fd) + .copied() + } + + fn token_for_fd(&self, fd: RawFd) -> Option { + self.tokfd + .get2(&fd) + .map(|TokenFd { token, .. }| token) + .copied() + } +} + impl Daemon { pub fn new(fd: OwnedFd, name_or_path: Box) -> Self { let fd = OwnedFdWithFlags::new_with_fallback(fd); @@ -126,6 +145,7 @@ impl Daemon { fd, path: name_or_path, poll_error_buffer: Default::default(), + tokfd: Default::default(), fd_error_buffer: Default::default(), cmd_buffer: Vec::with_capacity(1024), next_timeout: TIMEOUT_NEVER, @@ -137,9 +157,8 @@ impl Daemon { let _ = rustix::fs::unlink(path); let listener = UnixListener::bind(path) .tap_err(|e| error!("failed to bind AF_UNIX socket at {}: {e}", path.display()))?; - //let (stream, _addr) = listener.accept().unwrap_or_else(|e| todo!("error: {e}")); - //warn!("stream is: {stream:?} ({})", stream.as_raw_fd()); let listener_owned_fd = OwnedFd::from(listener); + // FIXME: should we KEEP_ALIVE? rustix::net::sockopt::set_socket_keepalive(&listener_owned_fd, true).unwrap(); let path: Box = path.to_path_buf().into_boxed_path().into_boxed_os_str(); @@ -249,8 +268,19 @@ impl Daemon { let raw_fd = self.fd.as_raw_fd(); let mut daemon_source = SourceFd(&raw_fd); const DAEMON: Token = Token(0); + self.tokfd + .insert_unique(TokenFd { + token: DAEMON, + fd: raw_fd, + }) + .unwrap(); let mut next_token_number: usize = 1; - let mut extra_tokens: Vec<(Token, RawFd)> = Default::default(); + let mut next_token = || -> Token { + let t = Token(next_token_number); + next_token_number = next_token_number.saturating_add(1); + t + }; + let mut poll = Poll::new().unwrap_or_else(|e| unreachable!("creating new mio Poll: {e}")); poll.registry() @@ -259,27 +289,14 @@ impl Daemon { let mut events = Events::with_capacity(1024); - let example = DaemonCmd::Append { - //name: ConvenientAttrPath::Dotted(Box::from( - // "services.gotosocial.settings.application-name", - //)), - //name: ConvenientAttrPath::Split(Box::from([ - // Box::from("services"), - // Box::from("gotosocial"), - //])), - name: ConvenientAttrPath::clone_from_split(&[ - "services", - "gotosocial", - "settings", - "application-name", - ]), - value: Box::from("foo"), - }; - - //let example_as_json = serde_json::to_string_pretty(&example).unwrap(); - //info!("{}", example_as_json); - loop { + if let Some(timeout) = self.next_timeout { + debug!( + "epoll_wait() with a timeout: {}", + humantime::format_duration(timeout), + ); + } + match poll.poll(&mut events, self.next_timeout.take()) { Ok(_) => { if events.is_empty() { @@ -333,50 +350,25 @@ impl Daemon { }, }; + // Add this stream to our poll interest list. let mut stream_fd = stream_fd.into_raw_fd(); - - // And add this stream to our poll interest list. - if let Err(idx) = - extra_tokens.binary_search_by_key(&stream_fd, |(_, fd)| fd.as_raw_fd()) - { - let token = Token(next_token_number); - extra_tokens.insert(idx, (token, stream_fd)); - next_token_number = next_token_number.wrapping_add(1); - let mut source = SourceFd(&mut stream_fd); - poll.registry() - .register(&mut source, token, Interest::READABLE) - .unwrap(); - } + let token = next_token(); + self.tokfd + .insert_unique((token, stream_fd).into()) + .unwrap_or_else(|e| unreachable!("? {e}")); + let mut source = SourceFd(&mut stream_fd); + poll.registry() + .register(&mut source, token, Interest::READABLE) + .unwrap(); // Wait for the next poll to handle. - - //match self.read_cmd(Some(&stream_fd)) { - // Ok(()) => (), - // Err(e) if e.kind() == IoErrorKind::WouldBlock => { - // continue; - // }, - // Err(e) => { - // self.fd_error_buffer.try_push_back(e.into()).tap_err(|e| { - // error!( - // "Accumulated too many errors for fd {:?} {e}", - // &stream_fd, - // ) - // })?; - // }, - //} }, other_token => { - let index = match extra_tokens - .binary_search_by_key(&other_token, |&(t, _)| t) - { - Ok(index) => index, - Err(index) => unreachable!( - "tried to get index ({index}) for non-existent token {other_token:?}" - ), - }; - // This must be a stream fd. - let (_, stream_fd) = extra_tokens[index]; + let stream_fd = self.fd_for_token(other_token).unwrap_or_else(|| { + unreachable!("tried to get fd for no-existent token? {other_token:?}") + }); + // SAFETY: oh boy. let stream_fd = unsafe { BorrowedFd::borrow_raw(stream_fd) }; self.read_cmd(Some(&stream_fd)).unwrap(); @@ -386,14 +378,14 @@ impl Daemon { } } - fn get_fallback_fd_name(fd: Fd) -> Option> { - let dev_fd_path = Path::new("/dev/fd").join(fd.as_raw_fd().to_string()); - - fs_err::read_link(dev_fd_path) - .map(PathBuf::into_os_string) - .map(OsString::into_boxed_os_str) - .ok() - } + //fn get_fallback_fd_name(fd: Fd) -> Option> { + // let dev_fd_path = Path::new("/dev/fd").join(fd.as_raw_fd().to_string()); + // + // fs_err::read_link(dev_fd_path) + // .map(PathBuf::into_os_string) + // .map(OsString::into_boxed_os_str) + // .ok() + //} } impl Drop for Daemon { diff --git a/src/daemon_io.rs b/src/daemon_io.rs index 891a518..a1caa97 100644 --- a/src/daemon_io.rs +++ b/src/daemon_io.rs @@ -6,6 +6,8 @@ use std::{ }, }; +use iddqd::BiHashItem; +use mio::Token; use rustix::{ fs::{OFlags, fcntl_getfl, fcntl_setfl}, io::Errno, diff --git a/src/daemon_tokfd.rs b/src/daemon_tokfd.rs new file mode 100644 index 0000000..008c88d --- /dev/null +++ b/src/daemon_tokfd.rs @@ -0,0 +1,38 @@ +use std::os::fd::RawFd; + +use iddqd::BiHashItem; +use mio::Token; + +#[derive(Copy)] +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct TokenFd { + pub token: Token, + pub fd: RawFd, +} + +impl BiHashItem for TokenFd { + type K1<'a> = Token; + type K2<'a> = RawFd; + + fn key1(&self) -> Token { + self.token + } + + fn key2(&self) -> RawFd { + self.fd + } + + iddqd::bi_upcast!(); +} + +impl From for (Token, RawFd) { + fn from(TokenFd { token, fd }: TokenFd) -> (Token, RawFd) { + (token, fd) + } +} + +impl From<(Token, RawFd)> for TokenFd { + fn from((token, fd): (Token, RawFd)) -> TokenFd { + TokenFd { token, fd } + } +} diff --git a/src/lib.rs b/src/lib.rs index e0211ec..d5d53e5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -53,6 +53,8 @@ mod daemon; pub use daemon::Daemon; mod daemon_io; pub use daemon_io::OwnedFdWithFlags; +mod daemon_tokfd; +pub(crate) use daemon_tokfd::TokenFd; pub mod line; pub use line::Line; mod nixcmd;