diff --git a/Cargo.lock b/Cargo.lock index fabdb82..64fed08 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11,6 +11,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "allocator-api2" +version = "0.2.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923" + [[package]] name = "anstream" version = "0.6.21" @@ -67,6 +73,12 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" +[[package]] +name = "bimap" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "230c5f1ca6a325a32553f8640d31ac9b49f2411e901e427570154868b46da4f7" + [[package]] name = "bitflags" version = "2.11.0" @@ -187,6 +199,7 @@ checksum = "d0881ea181b1df73ff77ffaaf9c7544ecc11e82fba9b5f27b262a3c73a332555" name = "dynix" version = "0.1.0" dependencies = [ + "bimap", "bitflags", "bstr", "circular-buffer", @@ -195,6 +208,8 @@ dependencies = [ "const-str", "displaydoc", "fs-err", + "humantime", + "iddqd", "itertools", "libc", "mio", @@ -232,6 +247,12 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "foldhash" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77ce24cb58228fbb8aa041425bb1050850ac19177686ea6e0f41a70416f56fdb" + [[package]] name = "fs-err" version = "3.3.0" @@ -246,6 +267,9 @@ name = "hashbrown" version = "0.16.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100" +dependencies = [ + "allocator-api2", +] [[package]] name = "heck" @@ -259,6 +283,25 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc0fef456e4baa96da950455cd02c081ca953b141298e41db3fc7e36b1da849c" +[[package]] +name = "humantime" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "135b12329e5e3ce057a9f972339ea52bc954fe1e9358ef27f95e89716fbc5424" + +[[package]] +name = "iddqd" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b215e67ed1d1a4b1702acd787c487d16e4c977c5dcbcc4587bdb5ea26b6ce06" +dependencies = [ + "allocator-api2", + "equivalent", + "foldhash", + "hashbrown", + "rustc-hash", +] + [[package]] name = "indexmap" version = "2.13.0" @@ -513,6 +556,12 @@ version = "0.8.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc897dd8d9e8bd1ed8cdad82b5966c3e0ecae09fb1907d58efaa013543185d0a" +[[package]] +name = "rustc-hash" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d" + [[package]] name = "rustix" version = "1.1.4" diff --git a/Cargo.toml b/Cargo.toml index ed6bd37..495e9e4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,7 @@ regex-full = ["dep:regex"] regex-lite = ["dep:regex-lite"] [dependencies] +bimap = "0.6.3" bitflags = { version = "2.11.0", features = ["std"] } bstr = "1.12.1" circular-buffer = "1.2.0" @@ -30,6 +31,8 @@ command-error = "0.8.0" const-str = "1.1.0" displaydoc = "0.2.5" fs-err = "3.2.2" +humantime = "2.3.0" +iddqd = "0.3.17" itertools = "0.14.0" libc = { version = "0.2.180", features = ["extra_traits"] } #macro_rules_attribute = { version = "0.2.2", features = ["better-docs", "verbose-expansions"] } diff --git a/src/daemon.rs b/src/daemon.rs index 506527a..df477e5 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -1,33 +1,30 @@ use std::{ - env, io, + env, io, mem, ops::Deref, - os::fd::{AsFd, BorrowedFd, FromRawFd, IntoRawFd, OwnedFd, RawFd}, - sync::LazyLock, + os::fd::{AsFd, BorrowedFd, IntoRawFd, OwnedFd, RawFd}, + sync::{ + Arc, LazyLock, + atomic::{AtomicUsize, Ordering}, + }, time::Duration, }; -use circular_buffer::CircularBuffer; +use iddqd::{BiHashMap, IdOrdMap}; use mio::{Events, Interest, Poll, Token, net::UnixListener, unix::SourceFd}; -use rustix::{ - buffer::{Buffer, spare_capacity}, - fs::{FileType, OFlags, Stat, fcntl_setfl}, - io::Errno, - net::SocketFlags, - process::Uid, - termios::{ - ControlModes, InputModes, LocalModes, OptionalActions, OutputModes, SpecialCodeIndex, - SpecialCodes, Termios, tcgetattr, tcsetattr, - }, -}; +use rustix::{buffer::spare_capacity, net::SocketFlags, process::Uid}; use serde::{Deserialize, Serialize}; use serde_json::StreamDeserializer; -use crate::prelude::*; +use crate::{ + SourceFile, SourceLine, + daemon_tokfd::{FdInfo, FdKind}, + prelude::*, +}; -use crate::OwnedFdWithFlags; +use crate::{OwnedFdWithFlags, TokenFd}; pub static UID: LazyLock = LazyLock::new(|| rustix::process::getuid()); @@ -70,6 +67,17 @@ impl ConvenientAttrPath { let boxed = iter.map(|s| Box::from(s)); Self::Split(Box::from_iter(boxed)) } + + pub fn to_nix_decl(&self) -> Box { + use ConvenientAttrPath::*; + match self { + Dotted(s) => s.clone(), + Split(path) => { + // FIXME: quote if necessary + path.join(".").into_boxed_str() + }, + } + } } impl<'i> FromIterator<&'i str> for ConvenientAttrPath { @@ -93,6 +101,7 @@ impl FromIterator> for ConvenientAttrPath { #[derive(Debug, Clone, PartialEq)] #[derive(serde::Deserialize, serde::Serialize)] #[serde(tag = "action", content = "args", rename_all = "snake_case")] +// FIXME: rename to not confuse with the clap argument type. pub enum DaemonCmd { Append { name: ConvenientAttrPath, @@ -102,54 +111,189 @@ pub enum DaemonCmd { const TIMEOUT_NEVER: Option = None; +static NEXT_TOKEN_NUMBER: AtomicUsize = AtomicUsize::new(1); +fn next_token() -> Token { + let tok = NEXT_TOKEN_NUMBER.fetch_add(1, Ordering::SeqCst); + + // If the increment wrapped to 0, then we just increment it again. + if tok == 0 { + return Token(NEXT_TOKEN_NUMBER.fetch_add(1, Ordering::SeqCst)); + } + + Token(tok) +} + #[derive(Debug)] pub struct Daemon { + config_path: Arc, fd: OwnedFdWithFlags, - path: Box, - poll_error_buffer: CircularBuffer, - fd_error_buffer: CircularBuffer, + path: Option>, + + poller: Poll, + + fd_info: IdOrdMap, + + // Bijective mapping of [`mio::Token`]s to [`RawFd`]s. + tokfd: BiHashMap, cmd_buffer: Vec, next_timeout: Option, } +/// `tokfd` handling. impl Daemon { - pub fn new(fd: OwnedFd, name_or_path: Box) -> Self { - let fd = OwnedFdWithFlags::new_with_fallback(fd); + fn fd_error_pop(&mut self, fd: RawFd) -> Option { + let mut info = self.fd_info.get_mut(&fd).unwrap_or_else(|| { + if let Ok(name) = FdInfo::guess_name(fd) { + panic!( + "tried to pop error for unknown fd {fd} ({})", + name.to_string_lossy(), + ); + } + panic!("tried to pop error for unknown fd {fd}") + }); + info.error_buffer.pop_front().tap_some(|e| { + trace!("Popping error for {}: {e}", info.display()); + }) + } + + fn fd_error_push(&mut self, fd: RawFd, error: IoError) -> Result<(), IoError> { + let mut info = self + .fd_info + .get_mut(&fd) + .unwrap_or_else(|| panic!("tried to push error for unknown fd {fd}")); + trace!("Pushing error for {}: {}", info.display(), error); + info.error_buffer.try_push_back(error) + } + + fn main_fd_info(&self) -> &FdInfo { + self.fd_info.get(&self.fd.as_raw_fd()).unwrap_or_else(|| { + unreachable!( + "Main daemon fd {:?} was not registered with fd_info", + self.fd, + ) + }) + } + + fn register(&mut self, fd: RawFd, kind: FdKind) -> Token { + let token = next_token(); debug!( - "opened daemon to {} file descriptor {fd:?}", - name_or_path.display(), + "Registering new {} FdInfo for {fd} with token {token:?}", + kind.name_str(), ); + self.fd_info.insert_unique(FdInfo::new(fd, kind)).unwrap(); + + self.tokfd + .insert_unique(TokenFd { token, fd }) + .unwrap_or_else(|e| todo!("{e}")); + + let mut source = SourceFd(&fd); + self.poller + .registry() + .register(&mut source, token, Interest::READABLE) + .unwrap_or_else(|e| unreachable!("registering {fd:?} with poller failed: {e}")); + + token + } + + fn deregister(&mut self, fd: RawFd) { + let info = self + .fd_info + .remove(&fd) + .unwrap_or_else(|| unreachable!("tried to unregister unknown fd {fd}")); + debug!("Unregistering fd {}; calling close()", info.display()); + unsafe { rustix::io::close(fd) }; + let res = unsafe { libc::fcntl(fd, libc::F_GETFD, 0) }; + debug_assert_eq!(res, -1); + debug_assert_eq!( + IoError::last_os_error().raw_os_error(), + Some(Errno::BADF.raw_os_error()), + ); + + self.tokfd.remove2(&fd).unwrap_or_else(|| todo!()); + } + + fn fd_for_token(&self, token: Token) -> Option { + self.tokfd + .get1(&token) + .map(|TokenFd { fd, .. }| fd) + .copied() + } + + fn token_for_fd(&self, fd: RawFd) -> Option { + self.tokfd + .get2(&fd) + .map(|TokenFd { token, .. }| token) + .copied() + } +} + +impl Daemon { + pub fn new( + config_path: Arc, + fd: OwnedFd, + kind: FdKind, + name: Option>, + ) -> Self { + let mut fd_info: IdOrdMap = Default::default(); + + // Supposedly, the only possible ways this can fail are EMFILE, ENFILE, and ENOMEM. + // If any of those are the case, we're screwed anyway. + let poller = Poll::new().unwrap_or_else(|e| panic!("can't create new mio::Poll: {e}")); + // Make sure we register the poller in `fd_info`, so we can keep track of its errors. + fd_info + .insert_unique(FdInfo::new(poller.as_raw_fd(), FdKind::Poller)) + .unwrap_or_else(|e| unreachable!("{e}")); + + let fd = OwnedFdWithFlags::new_with_fallback(fd); + + fd_info + .insert_unique(FdInfo::new(fd.as_raw_fd(), kind)) + .unwrap_or_else(|e| unreachable!("{e}")); + + debug!("opened daemon to {:?} file descriptor {fd:?}", name); + + let path = match &name { + Some(name) => Some(PathBuf::from(name).into_boxed_path()), + None => None, + }; + Self { + config_path, fd, - path: name_or_path, - poll_error_buffer: Default::default(), - fd_error_buffer: Default::default(), + path, + poller, + fd_info, + tokfd: Default::default(), cmd_buffer: Vec::with_capacity(1024), next_timeout: TIMEOUT_NEVER, } } - pub fn from_unix_socket_path(path: &Path) -> Result { + pub fn from_unix_socket_path(config_path: Arc, path: &Path) -> Result { // We unconditionally unlink() `path` before binding, but completely ignore the result. let _ = rustix::fs::unlink(path); let listener = UnixListener::bind(path) .tap_err(|e| error!("failed to bind AF_UNIX socket at {}: {e}", path.display()))?; - //let (stream, _addr) = listener.accept().unwrap_or_else(|e| todo!("error: {e}")); - //warn!("stream is: {stream:?} ({})", stream.as_raw_fd()); let listener_owned_fd = OwnedFd::from(listener); + // FIXME: should we KEEP_ALIVE? rustix::net::sockopt::set_socket_keepalive(&listener_owned_fd, true).unwrap(); let path: Box = path.to_path_buf().into_boxed_path().into_boxed_os_str(); - Ok(Self::new(listener_owned_fd, path)) + Ok(Self::new( + config_path, + listener_owned_fd, + FdKind::Socket, + Some(path), + )) } - pub fn open_default_socket() -> Result { + pub fn open_default_socket(config_path: Arc) -> Result { use IoErrorKind::*; let preferred = USER_SOCKET_DIR.join("dynix.sock").into_boxed_path(); - let constructed = match Self::from_unix_socket_path(&preferred) { + let constructed = match Self::from_unix_socket_path(config_path.clone(), &preferred) { Ok(v) => v, Err(e) if e.kind() == Unsupported => { // @@ -158,14 +302,14 @@ impl Daemon { Err(e) => { warn!( "failed binding AF_UNIX socket at {}: {e}; trying elsewhere", - preferred.display() + preferred.display(), ); let fallback = TMPDIR.join("dynix.sock").into_boxed_path(); - Self::from_unix_socket_path(&fallback).tap_err(|e| { + Self::from_unix_socket_path(config_path, &fallback).tap_err(|e| { error!( "failed binding AF_UNIX socket at {}: {e}", - fallback.display() + fallback.display(), ) })? }, @@ -177,14 +321,14 @@ impl Daemon { /// This panics if stdin cannot be opened. /// /// If you want to handle that error, use [`Daemon::from_raw_parts()`]. - pub fn from_stdin() -> Self { + pub fn from_stdin(config_path: Arc) -> Self { let stdin = io::stdin(); let fd = stdin .as_fd() .try_clone_to_owned() .expect("dynix daemon could not open stdin; try a Unix socket?"); - Self::new(fd, Box::from(OsStr::new("«stdin»"))) + Self::new(config_path, fd, FdKind::File, None) } //pub unsafe fn from_raw_parts(fd: OwnedFd) -> Self { @@ -198,16 +342,9 @@ impl Daemon { } } -const ERROR_BUFFER_LEN: usize = 8; - /// Private helpers. impl Daemon { - fn read_cmd(&mut self, fd: Option<&dyn AsFd>) -> Result<(), IoError> { - let fd = match fd.as_ref() { - Some(fd) => fd.as_fd(), - None => self.fd.as_fd(), - }; - + fn read_cmd(&mut self, fd: &BorrowedFd) -> Result<(), IoError> { if self.cmd_buffer.len() == self.cmd_buffer.capacity() { self.cmd_buffer.reserve(1024); } @@ -215,8 +352,11 @@ impl Daemon { let _count = rustix::io::read(fd, spare_capacity(&mut self.cmd_buffer)) .tap_err(|e| error!("read() on daemon fd {fd:?} failed: {e}"))?; + // So that the loop doesn't borrow from `self`. + let mut cmd_buffer = mem::take(&mut self.cmd_buffer); + // The buffer might have existing data from the last read. - let deserializer = serde_json::Deserializer::from_slice(&self.cmd_buffer); + let deserializer = serde_json::Deserializer::from_slice(&cmd_buffer); let stream: StreamDeserializer<_, DaemonCmd> = deserializer.into_iter(); for cmd in stream { let cmd = match cmd { @@ -224,90 +364,156 @@ impl Daemon { Err(e) if e.is_eof() => { self.next_timeout = Some(Duration::from_secs(4)); warn!("Didn't get a valid daemon command; giving the other side 4 seconds..."); + let _ = mem::replace(&mut self.cmd_buffer, cmd_buffer); return Ok(()); }, Err(e) => { warn!("error deserializing command: {e}"); - warn!("error count: {}", self.fd_error_buffer.len()); - self.cmd_buffer.clear(); + debug!("command buffer was: {:?}", cmd_buffer.as_bstr()); + cmd_buffer.clear(); + let _ = mem::replace(&mut self.cmd_buffer, cmd_buffer); // Don't propagate the error unless we have too many. - self.fd_error_buffer.try_push_back(e.into()).tap_err(|e| { + self.fd_error_push(fd.as_raw_fd(), e.into()).tap_err(|e| { error!("Accumulated too many errors for daemon fd {fd:?}: {e}") })?; return Ok(()); }, }; debug!("got cmd: {cmd:?}"); + let _ = rustix::io::write(fd, b""); + info!("dispatching command"); + self.dispatch_cmd(cmd).unwrap_or_else(|e| todo!("{e}")); } - self.cmd_buffer.clear(); + cmd_buffer.clear(); + let _ = mem::replace(&mut self.cmd_buffer, cmd_buffer); + + Ok(()) + } + + fn dispatch_cmd(&mut self, cmd: DaemonCmd) -> Result<(), IoError> { + let (name, value) = match cmd { + DaemonCmd::Append { name, value } => (name, value), + }; + let mut opts = File::options(); + opts.read(true) + .write(true) + .create(false) + .custom_flags(libc::O_CLOEXEC); + let source_file = SourceFile::open_from(self.config_path.clone(), opts)?; + let pri = crate::get_where(source_file.clone()).unwrap_or_else(|e| todo!("{e}")); + let new_pri = pri - 1; + //let new_pri_line = + // crate::get_next_prio_line(source_file.clone(), Arc::from(name), Arc::from(value)); + // Get next priority line. + let source_lines = source_file.lines()?; + let penultimate = source_lines.get(source_lines.len() - 2); + // FIXME: don't rely on whitespace lol + debug_assert_eq!(penultimate.map(SourceLine::text).as_deref(), Some(" ];")); + let penultimate = penultimate.unwrap(); + let new_generation = 0 - new_pri; + let new_line = SourceLine { + line: penultimate.line, + path: source_file.path(), + text: Arc::from(format!( + " {} = lib.mkOverride ({}) ({}); # DYNIX GENERATION {}", + name.to_nix_decl(), + new_pri, + value, + new_generation, + )), + }; + + drop(source_lines); + + crate::write_next_prio(source_file, new_line).unwrap_or_else(|e| todo!("{e}")); Ok(()) } pub(crate) fn enter_loop(&mut self) -> Result, IoError> { let raw_fd = self.fd.as_raw_fd(); + if cfg!(debug_assertions) { + assert!( + self.fd_info.contains_key(&raw_fd), + "we should know about daemon fd {raw_fd}", + ); + assert!( + self.fd_info.contains_key(&self.poller.as_raw_fd()), + "we should know about poller fd {}", + self.poller.as_raw_fd(), + ); + } let mut daemon_source = SourceFd(&raw_fd); const DAEMON: Token = Token(0); - let mut next_token_number: usize = 1; - let mut extra_tokens: Vec<(Token, RawFd)> = Default::default(); - let mut poll = Poll::new().unwrap_or_else(|e| unreachable!("creating new mio Poll: {e}")); + self.tokfd + .insert_unique(TokenFd { + token: DAEMON, + fd: raw_fd, + }) + .unwrap(); - poll.registry() + self.poller + .registry() .register(&mut daemon_source, DAEMON, Interest::READABLE) .unwrap_or_else(|e| unreachable!("registering mio Poll for daemon fd {raw_fd:?}: {e}")); let mut events = Events::with_capacity(1024); - let example = DaemonCmd::Append { - //name: ConvenientAttrPath::Dotted(Box::from( - // "services.gotosocial.settings.application-name", - //)), - //name: ConvenientAttrPath::Split(Box::from([ - // Box::from("services"), - // Box::from("gotosocial"), - //])), - name: ConvenientAttrPath::clone_from_split(&[ - "services", - "gotosocial", - "settings", - "application-name", - ]), - value: Box::from("foo"), - }; - - //let example_as_json = serde_json::to_string_pretty(&example).unwrap(); - //info!("{}", example_as_json); - loop { - match poll.poll(&mut events, self.next_timeout.take()) { + if tracing::enabled!(tracing::Level::DEBUG) { + // + trace!("Daemon loop iteration, with file descriptors: "); + for info in &self.fd_info { + trace!("- {}", info.display()); + } + } + + if let Some(timeout) = self.next_timeout { + debug!( + "epoll_wait() with a timeout: {}", + humantime::format_duration(timeout), + ); + } + + match self.poller.poll(&mut events, self.next_timeout.take()) { Ok(_) => { + trace!( + "mio::Poller::poll() got events: {:?}", + events.iter().size_hint().0, + ); if events.is_empty() { warn!("timeout expired"); self.cmd_buffer.clear(); } else { - let _ = self.poll_error_buffer.pop_front(); + let _ = self.fd_error_pop(self.poller.as_raw_fd()); } }, + Err(e) if e.kind() == IoErrorKind::Interrupted => { + // EINTR is silly. + continue; + }, Err(e) => { + if let Some(Errno::BADF) = e.raw_os_error().map(Errno::from_raw_os_error) { + unreachable!("EBADF on poller fd; IO safety violation?"); + } warn!("mio Poll::poll() error: {e}"); - self.poll_error_buffer.try_push_back(e).tap_err(|e| { - error!("accumulated too many errors for mio::Poll::poll(): {e}") - })?; + self.fd_error_push(self.poller.as_raw_fd(), e) + .tap_err(|e| { + error!("accumulated too many errors for mio::Poll::poll(): {e}") + })?; }, } for event in &events { + trace!("event is {event:?}"); match event.token() { DAEMON => { - let is_sock = rustix::fs::fstat(&self.fd) - .map(|Stat { st_mode, .. }| st_mode) - .map(FileType::from_raw_mode) - .map(FileType::is_socket) - .unwrap_or(false); - + let is_sock = self.main_fd_info().kind == FdKind::Socket; if !is_sock { - self.read_cmd(None).unwrap(); + // SAFETY: oh boy: disjoint borrows with extra steps. + let file_fd = unsafe { BorrowedFd::borrow_raw(self.fd.as_raw_fd()) }; + self.read_cmd(&file_fd).unwrap(); continue; } @@ -323,89 +529,48 @@ impl Daemon { }, Err(e) => { error!("accept4 on daemon socket failed: {e}"); - self.fd_error_buffer.try_push_back(e.into()).tap_err(|e| { - error!( - "Accumulated too many errors for daemon fd {:?}: {e}", - self.fd - ) - })?; + self.fd_error_push(self.fd.as_raw_fd(), e.into()) + .tap_err(|e| { + error!( + "Accumulated too many errors for daemon fd {:?}: {e}", + self.fd + ) + })?; continue; }, }; - let mut stream_fd = stream_fd.into_raw_fd(); - - // And add this stream to our poll interest list. - if let Err(idx) = - extra_tokens.binary_search_by_key(&stream_fd, |(_, fd)| fd.as_raw_fd()) - { - let token = Token(next_token_number); - extra_tokens.insert(idx, (token, stream_fd)); - next_token_number = next_token_number.wrapping_add(1); - let mut source = SourceFd(&mut stream_fd); - poll.registry() - .register(&mut source, token, Interest::READABLE) - .unwrap(); - } + // Add this stream to our poll interest list. + // NOTE: `stream_fd` is now effectively `ManuallyDrop`. + let stream_fd = stream_fd.into_raw_fd(); + let _token = self.register(stream_fd, FdKind::SockStream); // Wait for the next poll to handle. - - //match self.read_cmd(Some(&stream_fd)) { - // Ok(()) => (), - // Err(e) if e.kind() == IoErrorKind::WouldBlock => { - // continue; - // }, - // Err(e) => { - // self.fd_error_buffer.try_push_back(e.into()).tap_err(|e| { - // error!( - // "Accumulated too many errors for fd {:?} {e}", - // &stream_fd, - // ) - // })?; - // }, - //} }, other_token => { - let index = match extra_tokens - .binary_search_by_key(&other_token, |&(t, _)| t) - { - Ok(index) => index, - Err(index) => unreachable!( - "tried to get index ({index}) for non-existent token {other_token:?}" - ), - }; - // This must be a stream fd. - let (_, stream_fd) = extra_tokens[index]; - // SAFETY: oh boy. - let stream_fd = unsafe { BorrowedFd::borrow_raw(stream_fd) }; - self.read_cmd(Some(&stream_fd)).unwrap(); + let stream_fd = self.fd_for_token(other_token).unwrap_or_else(|| { + unreachable!("tried to get fd for no-existent token? {other_token:?}") + }); + + if event.is_read_closed() { + self.deregister(stream_fd); + } else { + // SAFETY: oh boy. + let stream_fd = unsafe { BorrowedFd::borrow_raw(stream_fd) }; + self.read_cmd(&stream_fd).unwrap(); + } }, } } } } - - fn get_fallback_fd_name(fd: Fd) -> Option> { - let dev_fd_path = Path::new("/dev/fd").join(fd.as_raw_fd().to_string()); - - fs_err::read_link(dev_fd_path) - .map(PathBuf::into_os_string) - .map(OsString::into_boxed_os_str) - .ok() - } } impl Drop for Daemon { fn drop(&mut self) { - let probably_synthetic = self - .path - .to_str() - .map(|p| p.starts_with("«") && p.ends_with("»")) - .unwrap_or(false); - if probably_synthetic { - return; + if let Some(path) = self.path.as_deref() { + let _ = rustix::fs::unlink(path); } - let _ = rustix::fs::unlink(&*self.path); } } diff --git a/src/daemon_tokfd.rs b/src/daemon_tokfd.rs new file mode 100644 index 0000000..05d5ec9 --- /dev/null +++ b/src/daemon_tokfd.rs @@ -0,0 +1,169 @@ +use std::{os::fd::RawFd, sync::OnceLock}; + +use circular_buffer::CircularBuffer; +use iddqd::{BiHashItem, IdOrdItem}; +use mio::Token; + +use crate::prelude::*; + +const ERROR_BUFFER_LEN: usize = 8; + +#[derive(Debug)] +pub struct FdInfo { + pub fd: RawFd, + pub kind: FdKind, + pub name: OnceLock>, + pub error_buffer: CircularBuffer, +} + +impl FdInfo { + pub fn new(fd: Fd, kind: FdKind) -> Self { + Self { + fd: fd.as_raw_fd(), + kind, + name: Default::default(), + error_buffer: Default::default(), + } + } + + pub fn new_with_name(fd: Fd, kind: FdKind, name: Box) -> Self { + Self { + fd: fd.as_raw_fd(), + kind, + name: OnceLock::from(name), + error_buffer: Default::default(), + } + } +} + +impl FdInfo { + pub(crate) fn guess_name(fd: Fd) -> Result, IoError> { + let dev_fd_path = Path::new("/dev/fd").join(fd.as_raw_fd().to_string()); + + fs_err::read_link(dev_fd_path) + .map(PathBuf::into_os_string) + .map(OsString::into_boxed_os_str) + } + + pub fn name(&self) -> &OsStr { + if let Some(name) = self.name.get() { + return name; + } + + match Self::guess_name(self.fd) { + Ok(name) => { + let prev = self.name.set(Box::from(name)); + debug_assert_eq!(prev, Ok(())); + }, + Err(e) => { + warn!( + "can't read link for {} /dev/fd/{}: {e}", + self.kind.name_str(), + self.fd, + ); + return OsStr::new("«unknown»"); + }, + } + + self.name.get().unwrap_or_else(|| unreachable!()) + } + + pub fn display(&self) -> FdInfoDisplay<'_> { + FdInfoDisplay { inner: self } + } +} + +impl IdOrdItem for FdInfo { + type Key<'a> = &'a RawFd; + + fn key(&self) -> &RawFd { + &self.fd + } + + iddqd::id_upcast!(); +} + +#[derive(Debug)] +pub struct FdInfoDisplay<'a> { + inner: &'a FdInfo, +} +impl<'a> Display for FdInfoDisplay<'a> { + fn fmt(&self, f: &mut Formatter) -> FmtResult { + write!( + f, + "{} fd {} ({})", + self.inner.kind.name_str(), + self.inner.fd, + self.inner.name().to_string_lossy(), + )?; + if !self.inner.error_buffer.is_empty() { + write!(f, "; with errors: {}", self.inner.error_buffer.len())?; + } + + Ok(()) + } +} + +#[derive(Copy)] +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +#[non_exhaustive] +pub enum FdKind { + File, + Socket, + SockStream, + Poller, + Unknown, +} + +impl FdKind { + pub fn name_str(self) -> &'static str { + use FdKind::*; + match self { + File => "file", + Socket => "socket", + SockStream => "socket stream", + Poller => "poller", + Unknown => "«unknown»", + } + } +} + +impl Default for FdKind { + fn default() -> FdKind { + FdKind::Unknown + } +} + +#[derive(Copy)] +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct TokenFd { + pub token: Token, + pub fd: RawFd, +} + +impl BiHashItem for TokenFd { + type K1<'a> = Token; + type K2<'a> = RawFd; + + fn key1(&self) -> Token { + self.token + } + + fn key2(&self) -> RawFd { + self.fd + } + + iddqd::bi_upcast!(); +} + +impl From for (Token, RawFd) { + fn from(TokenFd { token, fd }: TokenFd) -> (Token, RawFd) { + (token, fd) + } +} + +impl From<(Token, RawFd)> for TokenFd { + fn from((token, fd): (Token, RawFd)) -> TokenFd { + TokenFd { token, fd } + } +} diff --git a/src/lib.rs b/src/lib.rs index e0211ec..40f983b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -35,7 +35,11 @@ pub(crate) mod prelude { pub use bstr::ByteSlice; - pub use tap::{Pipe, Tap, TapFallible}; + pub use itertools::Itertools; + + pub use rustix::io::Errno; + + pub use tap::{Pipe, Tap, TapFallible, TapOptional}; pub use tracing::{Level, debug, error, info, trace, warn}; @@ -53,6 +57,8 @@ mod daemon; pub use daemon::Daemon; mod daemon_io; pub use daemon_io::OwnedFdWithFlags; +mod daemon_tokfd; +pub(crate) use daemon_tokfd::TokenFd; pub mod line; pub use line::Line; mod nixcmd; @@ -129,17 +135,27 @@ pub fn do_append(args: Arc, append_args: AppendCmd) -> Result<(), BoxDynEr } //#[tracing::instrument(level = "debug")] -pub fn do_daemon(_args: Arc, daemon_args: DaemonCmd) -> Result<(), BoxDynError> { +pub fn do_daemon(args: Arc, daemon_args: DaemonCmd) -> Result<(), BoxDynError> { + let config_file = Path::new(&args.file); + let config_file: PathBuf = if config_file.is_relative() && !config_file.starts_with("./") { + iter::once(OsStr::new("./")) + .chain(config_file.iter()) + .collect() + } else { + config_file.to_path_buf() + }; + let config_file: Arc = Arc::from(config_file); + // FIXME: make configurable? let _ = rustix::process::umask(Mode::from_bits_retain(0o600).complement()); let mut daemon = match daemon_args { - DaemonCmd { stdin: true, .. } => Daemon::from_stdin(), - DaemonCmd { socket: None, .. } => Daemon::open_default_socket()?, + DaemonCmd { stdin: true, .. } => Daemon::from_stdin(config_file), + DaemonCmd { socket: None, .. } => Daemon::open_default_socket(config_file)?, DaemonCmd { socket: Some(socket), .. - } => Daemon::from_unix_socket_path(&socket)?, + } => Daemon::from_unix_socket_path(config_file, &socket)?, }; daemon.enter_loop().unwrap();