diff --git a/Cargo.lock b/Cargo.lock index 64fed08..fabdb82 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11,12 +11,6 @@ dependencies = [ "memchr", ] -[[package]] -name = "allocator-api2" -version = "0.2.21" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923" - [[package]] name = "anstream" version = "0.6.21" @@ -73,12 +67,6 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" -[[package]] -name = "bimap" -version = "0.6.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "230c5f1ca6a325a32553f8640d31ac9b49f2411e901e427570154868b46da4f7" - [[package]] name = "bitflags" version = "2.11.0" @@ -199,7 +187,6 @@ checksum = "d0881ea181b1df73ff77ffaaf9c7544ecc11e82fba9b5f27b262a3c73a332555" name = "dynix" version = "0.1.0" dependencies = [ - "bimap", "bitflags", "bstr", "circular-buffer", @@ -208,8 +195,6 @@ dependencies = [ "const-str", "displaydoc", "fs-err", - "humantime", - "iddqd", "itertools", "libc", "mio", @@ -247,12 +232,6 @@ dependencies = [ "windows-sys 0.61.2", ] -[[package]] -name = "foldhash" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77ce24cb58228fbb8aa041425bb1050850ac19177686ea6e0f41a70416f56fdb" - [[package]] name = "fs-err" version = "3.3.0" @@ -267,9 +246,6 @@ name = "hashbrown" version = "0.16.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100" -dependencies = [ - "allocator-api2", -] [[package]] name = "heck" @@ -283,25 +259,6 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc0fef456e4baa96da950455cd02c081ca953b141298e41db3fc7e36b1da849c" -[[package]] -name = "humantime" -version = "2.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "135b12329e5e3ce057a9f972339ea52bc954fe1e9358ef27f95e89716fbc5424" - -[[package]] -name = "iddqd" -version = "0.3.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6b215e67ed1d1a4b1702acd787c487d16e4c977c5dcbcc4587bdb5ea26b6ce06" -dependencies = [ - "allocator-api2", - "equivalent", - "foldhash", - "hashbrown", - "rustc-hash", -] - [[package]] name = "indexmap" version = "2.13.0" @@ -556,12 +513,6 @@ version = "0.8.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc897dd8d9e8bd1ed8cdad82b5966c3e0ecae09fb1907d58efaa013543185d0a" -[[package]] -name = "rustc-hash" -version = "2.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d" - [[package]] name = "rustix" version = "1.1.4" diff --git a/Cargo.toml b/Cargo.toml index 495e9e4..ed6bd37 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,7 +22,6 @@ regex-full = ["dep:regex"] regex-lite = ["dep:regex-lite"] [dependencies] -bimap = "0.6.3" bitflags = { version = "2.11.0", features = ["std"] } bstr = "1.12.1" circular-buffer = "1.2.0" @@ -31,8 +30,6 @@ command-error = "0.8.0" const-str = "1.1.0" displaydoc = "0.2.5" fs-err = "3.2.2" -humantime = "2.3.0" -iddqd = "0.3.17" itertools = "0.14.0" libc = { version = "0.2.180", features = ["extra_traits"] } #macro_rules_attribute = { version = "0.2.2", features = ["better-docs", "verbose-expansions"] } diff --git a/src/daemon.rs b/src/daemon.rs index df477e5..506527a 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -1,30 +1,33 @@ use std::{ - env, io, mem, + env, io, ops::Deref, - os::fd::{AsFd, BorrowedFd, IntoRawFd, OwnedFd, RawFd}, - sync::{ - Arc, LazyLock, - atomic::{AtomicUsize, Ordering}, - }, + os::fd::{AsFd, BorrowedFd, FromRawFd, IntoRawFd, OwnedFd, RawFd}, + sync::LazyLock, time::Duration, }; -use iddqd::{BiHashMap, IdOrdMap}; +use circular_buffer::CircularBuffer; use mio::{Events, Interest, Poll, Token, net::UnixListener, unix::SourceFd}; -use rustix::{buffer::spare_capacity, net::SocketFlags, process::Uid}; +use rustix::{ + buffer::{Buffer, spare_capacity}, + fs::{FileType, OFlags, Stat, fcntl_setfl}, + io::Errno, + net::SocketFlags, + process::Uid, + termios::{ + ControlModes, InputModes, LocalModes, OptionalActions, OutputModes, SpecialCodeIndex, + SpecialCodes, Termios, tcgetattr, tcsetattr, + }, +}; use serde::{Deserialize, Serialize}; use serde_json::StreamDeserializer; -use crate::{ - SourceFile, SourceLine, - daemon_tokfd::{FdInfo, FdKind}, - prelude::*, -}; +use crate::prelude::*; -use crate::{OwnedFdWithFlags, TokenFd}; +use crate::OwnedFdWithFlags; pub static UID: LazyLock = LazyLock::new(|| rustix::process::getuid()); @@ -67,17 +70,6 @@ impl ConvenientAttrPath { let boxed = iter.map(|s| Box::from(s)); Self::Split(Box::from_iter(boxed)) } - - pub fn to_nix_decl(&self) -> Box { - use ConvenientAttrPath::*; - match self { - Dotted(s) => s.clone(), - Split(path) => { - // FIXME: quote if necessary - path.join(".").into_boxed_str() - }, - } - } } impl<'i> FromIterator<&'i str> for ConvenientAttrPath { @@ -101,7 +93,6 @@ impl FromIterator> for ConvenientAttrPath { #[derive(Debug, Clone, PartialEq)] #[derive(serde::Deserialize, serde::Serialize)] #[serde(tag = "action", content = "args", rename_all = "snake_case")] -// FIXME: rename to not confuse with the clap argument type. pub enum DaemonCmd { Append { name: ConvenientAttrPath, @@ -111,189 +102,54 @@ pub enum DaemonCmd { const TIMEOUT_NEVER: Option = None; -static NEXT_TOKEN_NUMBER: AtomicUsize = AtomicUsize::new(1); -fn next_token() -> Token { - let tok = NEXT_TOKEN_NUMBER.fetch_add(1, Ordering::SeqCst); - - // If the increment wrapped to 0, then we just increment it again. - if tok == 0 { - return Token(NEXT_TOKEN_NUMBER.fetch_add(1, Ordering::SeqCst)); - } - - Token(tok) -} - #[derive(Debug)] pub struct Daemon { - config_path: Arc, fd: OwnedFdWithFlags, - path: Option>, - - poller: Poll, - - fd_info: IdOrdMap, - - // Bijective mapping of [`mio::Token`]s to [`RawFd`]s. - tokfd: BiHashMap, + path: Box, + poll_error_buffer: CircularBuffer, + fd_error_buffer: CircularBuffer, cmd_buffer: Vec, next_timeout: Option, } -/// `tokfd` handling. impl Daemon { - fn fd_error_pop(&mut self, fd: RawFd) -> Option { - let mut info = self.fd_info.get_mut(&fd).unwrap_or_else(|| { - if let Ok(name) = FdInfo::guess_name(fd) { - panic!( - "tried to pop error for unknown fd {fd} ({})", - name.to_string_lossy(), - ); - } - panic!("tried to pop error for unknown fd {fd}") - }); - info.error_buffer.pop_front().tap_some(|e| { - trace!("Popping error for {}: {e}", info.display()); - }) - } - - fn fd_error_push(&mut self, fd: RawFd, error: IoError) -> Result<(), IoError> { - let mut info = self - .fd_info - .get_mut(&fd) - .unwrap_or_else(|| panic!("tried to push error for unknown fd {fd}")); - trace!("Pushing error for {}: {}", info.display(), error); - info.error_buffer.try_push_back(error) - } - - fn main_fd_info(&self) -> &FdInfo { - self.fd_info.get(&self.fd.as_raw_fd()).unwrap_or_else(|| { - unreachable!( - "Main daemon fd {:?} was not registered with fd_info", - self.fd, - ) - }) - } - - fn register(&mut self, fd: RawFd, kind: FdKind) -> Token { - let token = next_token(); - - debug!( - "Registering new {} FdInfo for {fd} with token {token:?}", - kind.name_str(), - ); - - self.fd_info.insert_unique(FdInfo::new(fd, kind)).unwrap(); - - self.tokfd - .insert_unique(TokenFd { token, fd }) - .unwrap_or_else(|e| todo!("{e}")); - - let mut source = SourceFd(&fd); - self.poller - .registry() - .register(&mut source, token, Interest::READABLE) - .unwrap_or_else(|e| unreachable!("registering {fd:?} with poller failed: {e}")); - - token - } - - fn deregister(&mut self, fd: RawFd) { - let info = self - .fd_info - .remove(&fd) - .unwrap_or_else(|| unreachable!("tried to unregister unknown fd {fd}")); - debug!("Unregistering fd {}; calling close()", info.display()); - unsafe { rustix::io::close(fd) }; - let res = unsafe { libc::fcntl(fd, libc::F_GETFD, 0) }; - debug_assert_eq!(res, -1); - debug_assert_eq!( - IoError::last_os_error().raw_os_error(), - Some(Errno::BADF.raw_os_error()), - ); - - self.tokfd.remove2(&fd).unwrap_or_else(|| todo!()); - } - - fn fd_for_token(&self, token: Token) -> Option { - self.tokfd - .get1(&token) - .map(|TokenFd { fd, .. }| fd) - .copied() - } - - fn token_for_fd(&self, fd: RawFd) -> Option { - self.tokfd - .get2(&fd) - .map(|TokenFd { token, .. }| token) - .copied() - } -} - -impl Daemon { - pub fn new( - config_path: Arc, - fd: OwnedFd, - kind: FdKind, - name: Option>, - ) -> Self { - let mut fd_info: IdOrdMap = Default::default(); - - // Supposedly, the only possible ways this can fail are EMFILE, ENFILE, and ENOMEM. - // If any of those are the case, we're screwed anyway. - let poller = Poll::new().unwrap_or_else(|e| panic!("can't create new mio::Poll: {e}")); - // Make sure we register the poller in `fd_info`, so we can keep track of its errors. - fd_info - .insert_unique(FdInfo::new(poller.as_raw_fd(), FdKind::Poller)) - .unwrap_or_else(|e| unreachable!("{e}")); - + pub fn new(fd: OwnedFd, name_or_path: Box) -> Self { let fd = OwnedFdWithFlags::new_with_fallback(fd); - fd_info - .insert_unique(FdInfo::new(fd.as_raw_fd(), kind)) - .unwrap_or_else(|e| unreachable!("{e}")); - - debug!("opened daemon to {:?} file descriptor {fd:?}", name); - - let path = match &name { - Some(name) => Some(PathBuf::from(name).into_boxed_path()), - None => None, - }; + debug!( + "opened daemon to {} file descriptor {fd:?}", + name_or_path.display(), + ); Self { - config_path, fd, - path, - poller, - fd_info, - tokfd: Default::default(), + path: name_or_path, + poll_error_buffer: Default::default(), + fd_error_buffer: Default::default(), cmd_buffer: Vec::with_capacity(1024), next_timeout: TIMEOUT_NEVER, } } - pub fn from_unix_socket_path(config_path: Arc, path: &Path) -> Result { + pub fn from_unix_socket_path(path: &Path) -> Result { // We unconditionally unlink() `path` before binding, but completely ignore the result. let _ = rustix::fs::unlink(path); let listener = UnixListener::bind(path) .tap_err(|e| error!("failed to bind AF_UNIX socket at {}: {e}", path.display()))?; + //let (stream, _addr) = listener.accept().unwrap_or_else(|e| todo!("error: {e}")); + //warn!("stream is: {stream:?} ({})", stream.as_raw_fd()); let listener_owned_fd = OwnedFd::from(listener); - // FIXME: should we KEEP_ALIVE? rustix::net::sockopt::set_socket_keepalive(&listener_owned_fd, true).unwrap(); let path: Box = path.to_path_buf().into_boxed_path().into_boxed_os_str(); - Ok(Self::new( - config_path, - listener_owned_fd, - FdKind::Socket, - Some(path), - )) + Ok(Self::new(listener_owned_fd, path)) } - pub fn open_default_socket(config_path: Arc) -> Result { + pub fn open_default_socket() -> Result { use IoErrorKind::*; let preferred = USER_SOCKET_DIR.join("dynix.sock").into_boxed_path(); - let constructed = match Self::from_unix_socket_path(config_path.clone(), &preferred) { + let constructed = match Self::from_unix_socket_path(&preferred) { Ok(v) => v, Err(e) if e.kind() == Unsupported => { // @@ -302,14 +158,14 @@ impl Daemon { Err(e) => { warn!( "failed binding AF_UNIX socket at {}: {e}; trying elsewhere", - preferred.display(), + preferred.display() ); let fallback = TMPDIR.join("dynix.sock").into_boxed_path(); - Self::from_unix_socket_path(config_path, &fallback).tap_err(|e| { + Self::from_unix_socket_path(&fallback).tap_err(|e| { error!( "failed binding AF_UNIX socket at {}: {e}", - fallback.display(), + fallback.display() ) })? }, @@ -321,14 +177,14 @@ impl Daemon { /// This panics if stdin cannot be opened. /// /// If you want to handle that error, use [`Daemon::from_raw_parts()`]. - pub fn from_stdin(config_path: Arc) -> Self { + pub fn from_stdin() -> Self { let stdin = io::stdin(); let fd = stdin .as_fd() .try_clone_to_owned() .expect("dynix daemon could not open stdin; try a Unix socket?"); - Self::new(config_path, fd, FdKind::File, None) + Self::new(fd, Box::from(OsStr::new("«stdin»"))) } //pub unsafe fn from_raw_parts(fd: OwnedFd) -> Self { @@ -342,9 +198,16 @@ impl Daemon { } } +const ERROR_BUFFER_LEN: usize = 8; + /// Private helpers. impl Daemon { - fn read_cmd(&mut self, fd: &BorrowedFd) -> Result<(), IoError> { + fn read_cmd(&mut self, fd: Option<&dyn AsFd>) -> Result<(), IoError> { + let fd = match fd.as_ref() { + Some(fd) => fd.as_fd(), + None => self.fd.as_fd(), + }; + if self.cmd_buffer.len() == self.cmd_buffer.capacity() { self.cmd_buffer.reserve(1024); } @@ -352,11 +215,8 @@ impl Daemon { let _count = rustix::io::read(fd, spare_capacity(&mut self.cmd_buffer)) .tap_err(|e| error!("read() on daemon fd {fd:?} failed: {e}"))?; - // So that the loop doesn't borrow from `self`. - let mut cmd_buffer = mem::take(&mut self.cmd_buffer); - // The buffer might have existing data from the last read. - let deserializer = serde_json::Deserializer::from_slice(&cmd_buffer); + let deserializer = serde_json::Deserializer::from_slice(&self.cmd_buffer); let stream: StreamDeserializer<_, DaemonCmd> = deserializer.into_iter(); for cmd in stream { let cmd = match cmd { @@ -364,156 +224,90 @@ impl Daemon { Err(e) if e.is_eof() => { self.next_timeout = Some(Duration::from_secs(4)); warn!("Didn't get a valid daemon command; giving the other side 4 seconds..."); - let _ = mem::replace(&mut self.cmd_buffer, cmd_buffer); return Ok(()); }, Err(e) => { warn!("error deserializing command: {e}"); - debug!("command buffer was: {:?}", cmd_buffer.as_bstr()); - cmd_buffer.clear(); - let _ = mem::replace(&mut self.cmd_buffer, cmd_buffer); + warn!("error count: {}", self.fd_error_buffer.len()); + self.cmd_buffer.clear(); // Don't propagate the error unless we have too many. - self.fd_error_push(fd.as_raw_fd(), e.into()).tap_err(|e| { + self.fd_error_buffer.try_push_back(e.into()).tap_err(|e| { error!("Accumulated too many errors for daemon fd {fd:?}: {e}") })?; return Ok(()); }, }; debug!("got cmd: {cmd:?}"); - let _ = rustix::io::write(fd, b""); - info!("dispatching command"); - self.dispatch_cmd(cmd).unwrap_or_else(|e| todo!("{e}")); } - cmd_buffer.clear(); - let _ = mem::replace(&mut self.cmd_buffer, cmd_buffer); - - Ok(()) - } - - fn dispatch_cmd(&mut self, cmd: DaemonCmd) -> Result<(), IoError> { - let (name, value) = match cmd { - DaemonCmd::Append { name, value } => (name, value), - }; - let mut opts = File::options(); - opts.read(true) - .write(true) - .create(false) - .custom_flags(libc::O_CLOEXEC); - let source_file = SourceFile::open_from(self.config_path.clone(), opts)?; - let pri = crate::get_where(source_file.clone()).unwrap_or_else(|e| todo!("{e}")); - let new_pri = pri - 1; - //let new_pri_line = - // crate::get_next_prio_line(source_file.clone(), Arc::from(name), Arc::from(value)); - // Get next priority line. - let source_lines = source_file.lines()?; - let penultimate = source_lines.get(source_lines.len() - 2); - // FIXME: don't rely on whitespace lol - debug_assert_eq!(penultimate.map(SourceLine::text).as_deref(), Some(" ];")); - let penultimate = penultimate.unwrap(); - let new_generation = 0 - new_pri; - let new_line = SourceLine { - line: penultimate.line, - path: source_file.path(), - text: Arc::from(format!( - " {} = lib.mkOverride ({}) ({}); # DYNIX GENERATION {}", - name.to_nix_decl(), - new_pri, - value, - new_generation, - )), - }; - - drop(source_lines); - - crate::write_next_prio(source_file, new_line).unwrap_or_else(|e| todo!("{e}")); + self.cmd_buffer.clear(); Ok(()) } pub(crate) fn enter_loop(&mut self) -> Result, IoError> { let raw_fd = self.fd.as_raw_fd(); - if cfg!(debug_assertions) { - assert!( - self.fd_info.contains_key(&raw_fd), - "we should know about daemon fd {raw_fd}", - ); - assert!( - self.fd_info.contains_key(&self.poller.as_raw_fd()), - "we should know about poller fd {}", - self.poller.as_raw_fd(), - ); - } let mut daemon_source = SourceFd(&raw_fd); const DAEMON: Token = Token(0); - self.tokfd - .insert_unique(TokenFd { - token: DAEMON, - fd: raw_fd, - }) - .unwrap(); + let mut next_token_number: usize = 1; + let mut extra_tokens: Vec<(Token, RawFd)> = Default::default(); + let mut poll = Poll::new().unwrap_or_else(|e| unreachable!("creating new mio Poll: {e}")); - self.poller - .registry() + poll.registry() .register(&mut daemon_source, DAEMON, Interest::READABLE) .unwrap_or_else(|e| unreachable!("registering mio Poll for daemon fd {raw_fd:?}: {e}")); let mut events = Events::with_capacity(1024); + let example = DaemonCmd::Append { + //name: ConvenientAttrPath::Dotted(Box::from( + // "services.gotosocial.settings.application-name", + //)), + //name: ConvenientAttrPath::Split(Box::from([ + // Box::from("services"), + // Box::from("gotosocial"), + //])), + name: ConvenientAttrPath::clone_from_split(&[ + "services", + "gotosocial", + "settings", + "application-name", + ]), + value: Box::from("foo"), + }; + + //let example_as_json = serde_json::to_string_pretty(&example).unwrap(); + //info!("{}", example_as_json); + loop { - if tracing::enabled!(tracing::Level::DEBUG) { - // - trace!("Daemon loop iteration, with file descriptors: "); - for info in &self.fd_info { - trace!("- {}", info.display()); - } - } - - if let Some(timeout) = self.next_timeout { - debug!( - "epoll_wait() with a timeout: {}", - humantime::format_duration(timeout), - ); - } - - match self.poller.poll(&mut events, self.next_timeout.take()) { + match poll.poll(&mut events, self.next_timeout.take()) { Ok(_) => { - trace!( - "mio::Poller::poll() got events: {:?}", - events.iter().size_hint().0, - ); if events.is_empty() { warn!("timeout expired"); self.cmd_buffer.clear(); } else { - let _ = self.fd_error_pop(self.poller.as_raw_fd()); + let _ = self.poll_error_buffer.pop_front(); } }, - Err(e) if e.kind() == IoErrorKind::Interrupted => { - // EINTR is silly. - continue; - }, Err(e) => { - if let Some(Errno::BADF) = e.raw_os_error().map(Errno::from_raw_os_error) { - unreachable!("EBADF on poller fd; IO safety violation?"); - } warn!("mio Poll::poll() error: {e}"); - self.fd_error_push(self.poller.as_raw_fd(), e) - .tap_err(|e| { - error!("accumulated too many errors for mio::Poll::poll(): {e}") - })?; + self.poll_error_buffer.try_push_back(e).tap_err(|e| { + error!("accumulated too many errors for mio::Poll::poll(): {e}") + })?; }, } for event in &events { - trace!("event is {event:?}"); match event.token() { DAEMON => { - let is_sock = self.main_fd_info().kind == FdKind::Socket; + let is_sock = rustix::fs::fstat(&self.fd) + .map(|Stat { st_mode, .. }| st_mode) + .map(FileType::from_raw_mode) + .map(FileType::is_socket) + .unwrap_or(false); + if !is_sock { - // SAFETY: oh boy: disjoint borrows with extra steps. - let file_fd = unsafe { BorrowedFd::borrow_raw(self.fd.as_raw_fd()) }; - self.read_cmd(&file_fd).unwrap(); + self.read_cmd(None).unwrap(); continue; } @@ -529,48 +323,89 @@ impl Daemon { }, Err(e) => { error!("accept4 on daemon socket failed: {e}"); - self.fd_error_push(self.fd.as_raw_fd(), e.into()) - .tap_err(|e| { - error!( - "Accumulated too many errors for daemon fd {:?}: {e}", - self.fd - ) - })?; + self.fd_error_buffer.try_push_back(e.into()).tap_err(|e| { + error!( + "Accumulated too many errors for daemon fd {:?}: {e}", + self.fd + ) + })?; continue; }, }; - // Add this stream to our poll interest list. - // NOTE: `stream_fd` is now effectively `ManuallyDrop`. - let stream_fd = stream_fd.into_raw_fd(); - let _token = self.register(stream_fd, FdKind::SockStream); + let mut stream_fd = stream_fd.into_raw_fd(); + + // And add this stream to our poll interest list. + if let Err(idx) = + extra_tokens.binary_search_by_key(&stream_fd, |(_, fd)| fd.as_raw_fd()) + { + let token = Token(next_token_number); + extra_tokens.insert(idx, (token, stream_fd)); + next_token_number = next_token_number.wrapping_add(1); + let mut source = SourceFd(&mut stream_fd); + poll.registry() + .register(&mut source, token, Interest::READABLE) + .unwrap(); + } // Wait for the next poll to handle. + + //match self.read_cmd(Some(&stream_fd)) { + // Ok(()) => (), + // Err(e) if e.kind() == IoErrorKind::WouldBlock => { + // continue; + // }, + // Err(e) => { + // self.fd_error_buffer.try_push_back(e.into()).tap_err(|e| { + // error!( + // "Accumulated too many errors for fd {:?} {e}", + // &stream_fd, + // ) + // })?; + // }, + //} }, other_token => { - // This must be a stream fd. - let stream_fd = self.fd_for_token(other_token).unwrap_or_else(|| { - unreachable!("tried to get fd for no-existent token? {other_token:?}") - }); + let index = match extra_tokens + .binary_search_by_key(&other_token, |&(t, _)| t) + { + Ok(index) => index, + Err(index) => unreachable!( + "tried to get index ({index}) for non-existent token {other_token:?}" + ), + }; - if event.is_read_closed() { - self.deregister(stream_fd); - } else { - // SAFETY: oh boy. - let stream_fd = unsafe { BorrowedFd::borrow_raw(stream_fd) }; - self.read_cmd(&stream_fd).unwrap(); - } + // This must be a stream fd. + let (_, stream_fd) = extra_tokens[index]; + // SAFETY: oh boy. + let stream_fd = unsafe { BorrowedFd::borrow_raw(stream_fd) }; + self.read_cmd(Some(&stream_fd)).unwrap(); }, } } } } + + fn get_fallback_fd_name(fd: Fd) -> Option> { + let dev_fd_path = Path::new("/dev/fd").join(fd.as_raw_fd().to_string()); + + fs_err::read_link(dev_fd_path) + .map(PathBuf::into_os_string) + .map(OsString::into_boxed_os_str) + .ok() + } } impl Drop for Daemon { fn drop(&mut self) { - if let Some(path) = self.path.as_deref() { - let _ = rustix::fs::unlink(path); + let probably_synthetic = self + .path + .to_str() + .map(|p| p.starts_with("«") && p.ends_with("»")) + .unwrap_or(false); + if probably_synthetic { + return; } + let _ = rustix::fs::unlink(&*self.path); } } diff --git a/src/daemon_tokfd.rs b/src/daemon_tokfd.rs deleted file mode 100644 index 05d5ec9..0000000 --- a/src/daemon_tokfd.rs +++ /dev/null @@ -1,169 +0,0 @@ -use std::{os::fd::RawFd, sync::OnceLock}; - -use circular_buffer::CircularBuffer; -use iddqd::{BiHashItem, IdOrdItem}; -use mio::Token; - -use crate::prelude::*; - -const ERROR_BUFFER_LEN: usize = 8; - -#[derive(Debug)] -pub struct FdInfo { - pub fd: RawFd, - pub kind: FdKind, - pub name: OnceLock>, - pub error_buffer: CircularBuffer, -} - -impl FdInfo { - pub fn new(fd: Fd, kind: FdKind) -> Self { - Self { - fd: fd.as_raw_fd(), - kind, - name: Default::default(), - error_buffer: Default::default(), - } - } - - pub fn new_with_name(fd: Fd, kind: FdKind, name: Box) -> Self { - Self { - fd: fd.as_raw_fd(), - kind, - name: OnceLock::from(name), - error_buffer: Default::default(), - } - } -} - -impl FdInfo { - pub(crate) fn guess_name(fd: Fd) -> Result, IoError> { - let dev_fd_path = Path::new("/dev/fd").join(fd.as_raw_fd().to_string()); - - fs_err::read_link(dev_fd_path) - .map(PathBuf::into_os_string) - .map(OsString::into_boxed_os_str) - } - - pub fn name(&self) -> &OsStr { - if let Some(name) = self.name.get() { - return name; - } - - match Self::guess_name(self.fd) { - Ok(name) => { - let prev = self.name.set(Box::from(name)); - debug_assert_eq!(prev, Ok(())); - }, - Err(e) => { - warn!( - "can't read link for {} /dev/fd/{}: {e}", - self.kind.name_str(), - self.fd, - ); - return OsStr::new("«unknown»"); - }, - } - - self.name.get().unwrap_or_else(|| unreachable!()) - } - - pub fn display(&self) -> FdInfoDisplay<'_> { - FdInfoDisplay { inner: self } - } -} - -impl IdOrdItem for FdInfo { - type Key<'a> = &'a RawFd; - - fn key(&self) -> &RawFd { - &self.fd - } - - iddqd::id_upcast!(); -} - -#[derive(Debug)] -pub struct FdInfoDisplay<'a> { - inner: &'a FdInfo, -} -impl<'a> Display for FdInfoDisplay<'a> { - fn fmt(&self, f: &mut Formatter) -> FmtResult { - write!( - f, - "{} fd {} ({})", - self.inner.kind.name_str(), - self.inner.fd, - self.inner.name().to_string_lossy(), - )?; - if !self.inner.error_buffer.is_empty() { - write!(f, "; with errors: {}", self.inner.error_buffer.len())?; - } - - Ok(()) - } -} - -#[derive(Copy)] -#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] -#[non_exhaustive] -pub enum FdKind { - File, - Socket, - SockStream, - Poller, - Unknown, -} - -impl FdKind { - pub fn name_str(self) -> &'static str { - use FdKind::*; - match self { - File => "file", - Socket => "socket", - SockStream => "socket stream", - Poller => "poller", - Unknown => "«unknown»", - } - } -} - -impl Default for FdKind { - fn default() -> FdKind { - FdKind::Unknown - } -} - -#[derive(Copy)] -#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] -pub struct TokenFd { - pub token: Token, - pub fd: RawFd, -} - -impl BiHashItem for TokenFd { - type K1<'a> = Token; - type K2<'a> = RawFd; - - fn key1(&self) -> Token { - self.token - } - - fn key2(&self) -> RawFd { - self.fd - } - - iddqd::bi_upcast!(); -} - -impl From for (Token, RawFd) { - fn from(TokenFd { token, fd }: TokenFd) -> (Token, RawFd) { - (token, fd) - } -} - -impl From<(Token, RawFd)> for TokenFd { - fn from((token, fd): (Token, RawFd)) -> TokenFd { - TokenFd { token, fd } - } -} diff --git a/src/lib.rs b/src/lib.rs index 40f983b..e0211ec 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -35,11 +35,7 @@ pub(crate) mod prelude { pub use bstr::ByteSlice; - pub use itertools::Itertools; - - pub use rustix::io::Errno; - - pub use tap::{Pipe, Tap, TapFallible, TapOptional}; + pub use tap::{Pipe, Tap, TapFallible}; pub use tracing::{Level, debug, error, info, trace, warn}; @@ -57,8 +53,6 @@ mod daemon; pub use daemon::Daemon; mod daemon_io; pub use daemon_io::OwnedFdWithFlags; -mod daemon_tokfd; -pub(crate) use daemon_tokfd::TokenFd; pub mod line; pub use line::Line; mod nixcmd; @@ -135,27 +129,17 @@ pub fn do_append(args: Arc, append_args: AppendCmd) -> Result<(), BoxDynEr } //#[tracing::instrument(level = "debug")] -pub fn do_daemon(args: Arc, daemon_args: DaemonCmd) -> Result<(), BoxDynError> { - let config_file = Path::new(&args.file); - let config_file: PathBuf = if config_file.is_relative() && !config_file.starts_with("./") { - iter::once(OsStr::new("./")) - .chain(config_file.iter()) - .collect() - } else { - config_file.to_path_buf() - }; - let config_file: Arc = Arc::from(config_file); - +pub fn do_daemon(_args: Arc, daemon_args: DaemonCmd) -> Result<(), BoxDynError> { // FIXME: make configurable? let _ = rustix::process::umask(Mode::from_bits_retain(0o600).complement()); let mut daemon = match daemon_args { - DaemonCmd { stdin: true, .. } => Daemon::from_stdin(config_file), - DaemonCmd { socket: None, .. } => Daemon::open_default_socket(config_file)?, + DaemonCmd { stdin: true, .. } => Daemon::from_stdin(), + DaemonCmd { socket: None, .. } => Daemon::open_default_socket()?, DaemonCmd { socket: Some(socket), .. - } => Daemon::from_unix_socket_path(config_file, &socket)?, + } => Daemon::from_unix_socket_path(&socket)?, }; daemon.enter_loop().unwrap();