diff --git a/Cargo.lock b/Cargo.lock index e641a9a..fabdb82 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -160,6 +160,23 @@ dependencies = [ "utf8-command", ] +[[package]] +name = "const-str" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18f12cc9948ed9604230cdddc7c86e270f9401ccbe3c2e98a4378c5e7632212f" + +[[package]] +name = "displaydoc" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "dyn-clone" version = "1.0.20" @@ -175,6 +192,8 @@ dependencies = [ "circular-buffer", "clap", "command-error", + "const-str", + "displaydoc", "fs-err", "itertools", "libc", diff --git a/Cargo.toml b/Cargo.toml index b63a6a9..ed6bd37 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,13 +27,16 @@ bstr = "1.12.1" circular-buffer = "1.2.0" clap = { version = "4.5.54", features = ["color", "derive"] } command-error = "0.8.0" +const-str = "1.1.0" +displaydoc = "0.2.5" fs-err = "3.2.2" itertools = "0.14.0" libc = { version = "0.2.180", features = ["extra_traits"] } -mio = { version = "1.1.1", features = ["os-ext", "os-poll"] } +#macro_rules_attribute = { version = "0.2.2", features = ["better-docs", "verbose-expansions"] } +mio = { version = "1.1.1", features = ["os-ext", "os-poll", "net"] } regex = { version = "1.12.3", optional = true } regex-lite = { version = "0.1.9", optional = true } -rustix = { version = "1.1.4", features = ["event", "fs", "termios"] } +rustix = { version = "1.1.4", features = ["event", "fs", "net", "process", "termios"] } serde = { version = "1.0.228", features = ["derive", "rc"] } serde_json = "1.0.149" sync-fd = "0.1.0" diff --git a/src/args.rs b/src/args.rs index 65232a7..5c9bb73 100644 --- a/src/args.rs +++ b/src/args.rs @@ -25,7 +25,16 @@ pub struct AppendCmd { #[derive(Debug, Clone, PartialEq, clap::Parser)] #[command(long_about = None)] pub struct DaemonCmd { - // FIXME: support Unix sockets. + /// Read from stdin instead of a Unix socket. + #[arg(long)] + pub stdin: bool, + + /// Manually specify the full alternative path to the server socket. + /// + /// If not specified and `--stdin` is not specified, defaults to $XDG_RUNTIME_DIR/dynix.sock + #[arg(long)] + #[arg(conflicts_with = "stdin")] + pub socket: Option, } #[derive(Debug, Clone, PartialEq, clap::Subcommand)] diff --git a/src/boxext.rs b/src/boxext.rs new file mode 100644 index 0000000..03200ec --- /dev/null +++ b/src/boxext.rs @@ -0,0 +1,27 @@ +use std::{ffi::OsStr, path::Path, ptr}; + +/// Until [`Box::map()`] is stable, there's no way to do this that is safe, stable, and +/// allocationless. +pub trait BoxedPathExt { + fn into_boxed_os_str(self) -> Box; +} + +impl BoxedPathExt for Box { + fn into_boxed_os_str(self) -> Box { + let path_mut_ptr: *mut Path = Box::into_raw(self); + + // SAFETY: we just got `path_mut_ptr` from a valid Box. + let os_str_mut_ptr: *mut OsStr = unsafe { + let os_str_mut_ref = path_mut_ptr + .as_mut() + .expect("avoided undefined behavior") + .as_mut_os_str(); + + ptr::from_mut(os_str_mut_ref) + }; + + // SAFETY: allocation came from `Box::into_raw()`, and we are not extending + // its lifetime. There are also no shared or exclusive references. + unsafe { Box::::from_raw(os_str_mut_ptr) } + } +} diff --git a/src/daemon.rs b/src/daemon.rs index 48498b9..506527a 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -1,18 +1,21 @@ use std::{ - io, + env, io, ops::Deref, - os::fd::{AsFd, BorrowedFd, OwnedFd}, + os::fd::{AsFd, BorrowedFd, FromRawFd, IntoRawFd, OwnedFd, RawFd}, + sync::LazyLock, time::Duration, }; use circular_buffer::CircularBuffer; -use mio::{Events, Interest, Poll, Token, unix::SourceFd}; +use mio::{Events, Interest, Poll, Token, net::UnixListener, unix::SourceFd}; use rustix::{ buffer::{Buffer, spare_capacity}, - fs::{OFlags, fcntl_setfl}, + fs::{FileType, OFlags, Stat, fcntl_setfl}, io::Errno, + net::SocketFlags, + process::Uid, termios::{ ControlModes, InputModes, LocalModes, OptionalActions, OutputModes, SpecialCodeIndex, SpecialCodes, Termios, tcgetattr, tcsetattr, @@ -26,6 +29,23 @@ use crate::prelude::*; use crate::OwnedFdWithFlags; +pub static UID: LazyLock = LazyLock::new(|| rustix::process::getuid()); + +pub static USER_SOCKET_DIR: LazyLock<&'static Path> = LazyLock::new(|| { + let dir: Box = env::var_os("XDG_RUNTIME_DIR") + .map(PathBuf::from) + .unwrap_or_else(|| ["/", "run", "user", &UID.to_string()].into_iter().collect()) + .into_boxed_path(); + + Box::leak(dir) +}); + +pub static TMPDIR: LazyLock<&'static Path> = LazyLock::new(|| { + let dir: Box = env::temp_dir().into_boxed_path(); + + Box::leak(dir) +}); + #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] #[derive(Deserialize, Serialize)] #[serde(untagged)] @@ -85,6 +105,7 @@ const TIMEOUT_NEVER: Option = None; #[derive(Debug)] pub struct Daemon { fd: OwnedFdWithFlags, + path: Box, poll_error_buffer: CircularBuffer, fd_error_buffer: CircularBuffer, @@ -93,17 +114,17 @@ pub struct Daemon { } impl Daemon { - /// Panics if we cannot make `fd` non-blocking. - pub fn new(fd: OwnedFd) -> Self { + pub fn new(fd: OwnedFd, name_or_path: Box) -> Self { let fd = OwnedFdWithFlags::new_with_fallback(fd); - // Make non-blocking. - //fcntl_setfl(fd.as_fd(), fd.oflags().union(OFlags::NONBLOCK)) - // .tap_err(|e| error!("F_SETFL O_NONBLOCK failed on fd {:?}: {e}", fd.as_fd())) - // .unwrap(); + debug!( + "opened daemon to {} file descriptor {fd:?}", + name_or_path.display(), + ); Self { fd, + path: name_or_path, poll_error_buffer: Default::default(), fd_error_buffer: Default::default(), cmd_buffer: Vec::with_capacity(1024), @@ -111,6 +132,48 @@ impl Daemon { } } + pub fn from_unix_socket_path(path: &Path) -> Result { + // We unconditionally unlink() `path` before binding, but completely ignore the result. + let _ = rustix::fs::unlink(path); + let listener = UnixListener::bind(path) + .tap_err(|e| error!("failed to bind AF_UNIX socket at {}: {e}", path.display()))?; + //let (stream, _addr) = listener.accept().unwrap_or_else(|e| todo!("error: {e}")); + //warn!("stream is: {stream:?} ({})", stream.as_raw_fd()); + let listener_owned_fd = OwnedFd::from(listener); + rustix::net::sockopt::set_socket_keepalive(&listener_owned_fd, true).unwrap(); + let path: Box = path.to_path_buf().into_boxed_path().into_boxed_os_str(); + + Ok(Self::new(listener_owned_fd, path)) + } + + pub fn open_default_socket() -> Result { + use IoErrorKind::*; + let preferred = USER_SOCKET_DIR.join("dynix.sock").into_boxed_path(); + let constructed = match Self::from_unix_socket_path(&preferred) { + Ok(v) => v, + Err(e) if e.kind() == Unsupported => { + // + return Err(e); + }, + Err(e) => { + warn!( + "failed binding AF_UNIX socket at {}: {e}; trying elsewhere", + preferred.display() + ); + + let fallback = TMPDIR.join("dynix.sock").into_boxed_path(); + Self::from_unix_socket_path(&fallback).tap_err(|e| { + error!( + "failed binding AF_UNIX socket at {}: {e}", + fallback.display() + ) + })? + }, + }; + + Ok(constructed) + } + /// This panics if stdin cannot be opened. /// /// If you want to handle that error, use [`Daemon::from_raw_parts()`]. @@ -121,9 +184,7 @@ impl Daemon { .try_clone_to_owned() .expect("dynix daemon could not open stdin; try a Unix socket?"); - debug!("opened daemon to stdin file descriptor {fd:?}"); - - Self::new(fd) + Self::new(fd, Box::from(OsStr::new("«stdin»"))) } //pub unsafe fn from_raw_parts(fd: OwnedFd) -> Self { @@ -141,12 +202,18 @@ const ERROR_BUFFER_LEN: usize = 8; /// Private helpers. impl Daemon { - fn read_cmd(&mut self) -> Result<(), IoError> { + fn read_cmd(&mut self, fd: Option<&dyn AsFd>) -> Result<(), IoError> { + let fd = match fd.as_ref() { + Some(fd) => fd.as_fd(), + None => self.fd.as_fd(), + }; + if self.cmd_buffer.len() == self.cmd_buffer.capacity() { self.cmd_buffer.reserve(1024); } - let _count = rustix::io::read(&self.fd, spare_capacity(&mut self.cmd_buffer))?; + let _count = rustix::io::read(fd, spare_capacity(&mut self.cmd_buffer)) + .tap_err(|e| error!("read() on daemon fd {fd:?} failed: {e}"))?; // The buffer might have existing data from the last read. let deserializer = serde_json::Deserializer::from_slice(&self.cmd_buffer); @@ -165,10 +232,7 @@ impl Daemon { self.cmd_buffer.clear(); // Don't propagate the error unless we have too many. self.fd_error_buffer.try_push_back(e.into()).tap_err(|e| { - error!( - "Accumulated too many errors for daemon fd {:?}: {e}", - self.fd(), - ) + error!("Accumulated too many errors for daemon fd {fd:?}: {e}") })?; return Ok(()); }, @@ -185,6 +249,8 @@ impl Daemon { let raw_fd = self.fd.as_raw_fd(); let mut daemon_source = SourceFd(&raw_fd); const DAEMON: Token = Token(0); + let mut next_token_number: usize = 1; + let mut extra_tokens: Vec<(Token, RawFd)> = Default::default(); let mut poll = Poll::new().unwrap_or_else(|e| unreachable!("creating new mio Poll: {e}")); poll.registry() @@ -234,11 +300,112 @@ impl Daemon { for event in &events { match event.token() { DAEMON => { - self.read_cmd().unwrap(); + let is_sock = rustix::fs::fstat(&self.fd) + .map(|Stat { st_mode, .. }| st_mode) + .map(FileType::from_raw_mode) + .map(FileType::is_socket) + .unwrap_or(false); + + if !is_sock { + self.read_cmd(None).unwrap(); + continue; + } + + // Accept, first. + let flags = SocketFlags::NONBLOCK | SocketFlags::CLOEXEC; + let stream_fd = match rustix::net::accept_with(&self.fd, flags) { + Ok(stream) => { + debug!( + "Accepted connection from socket {:?} as stream {:?}", + self.fd, stream, + ); + stream + }, + Err(e) => { + error!("accept4 on daemon socket failed: {e}"); + self.fd_error_buffer.try_push_back(e.into()).tap_err(|e| { + error!( + "Accumulated too many errors for daemon fd {:?}: {e}", + self.fd + ) + })?; + continue; + }, + }; + + let mut stream_fd = stream_fd.into_raw_fd(); + + // And add this stream to our poll interest list. + if let Err(idx) = + extra_tokens.binary_search_by_key(&stream_fd, |(_, fd)| fd.as_raw_fd()) + { + let token = Token(next_token_number); + extra_tokens.insert(idx, (token, stream_fd)); + next_token_number = next_token_number.wrapping_add(1); + let mut source = SourceFd(&mut stream_fd); + poll.registry() + .register(&mut source, token, Interest::READABLE) + .unwrap(); + } + + // Wait for the next poll to handle. + + //match self.read_cmd(Some(&stream_fd)) { + // Ok(()) => (), + // Err(e) if e.kind() == IoErrorKind::WouldBlock => { + // continue; + // }, + // Err(e) => { + // self.fd_error_buffer.try_push_back(e.into()).tap_err(|e| { + // error!( + // "Accumulated too many errors for fd {:?} {e}", + // &stream_fd, + // ) + // })?; + // }, + //} + }, + other_token => { + let index = match extra_tokens + .binary_search_by_key(&other_token, |&(t, _)| t) + { + Ok(index) => index, + Err(index) => unreachable!( + "tried to get index ({index}) for non-existent token {other_token:?}" + ), + }; + + // This must be a stream fd. + let (_, stream_fd) = extra_tokens[index]; + // SAFETY: oh boy. + let stream_fd = unsafe { BorrowedFd::borrow_raw(stream_fd) }; + self.read_cmd(Some(&stream_fd)).unwrap(); }, - _ => unreachable!(), } } } } + + fn get_fallback_fd_name(fd: Fd) -> Option> { + let dev_fd_path = Path::new("/dev/fd").join(fd.as_raw_fd().to_string()); + + fs_err::read_link(dev_fd_path) + .map(PathBuf::into_os_string) + .map(OsString::into_boxed_os_str) + .ok() + } +} + +impl Drop for Daemon { + fn drop(&mut self) { + let probably_synthetic = self + .path + .to_str() + .map(|p| p.starts_with("«") && p.ends_with("»")) + .unwrap_or(false); + if probably_synthetic { + return; + } + let _ = rustix::fs::unlink(&*self.path); + } } diff --git a/src/daemon_io.rs b/src/daemon_io.rs index 40beed1..891a518 100644 --- a/src/daemon_io.rs +++ b/src/daemon_io.rs @@ -52,7 +52,7 @@ impl Debug for OwnedFdWithFlags { bitflags::parser::to_writer(&self.oflags, &mut flags_str)?; f.debug_struct("OwnedFdWithFlags") - .field("fd", &self.fd) + .field("fd", &self.fd.as_raw_fd()) .field("oflags", &flags_str) .finish() } diff --git a/src/lib.rs b/src/lib.rs index 3997aec..e0211ec 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -38,12 +38,15 @@ pub(crate) mod prelude { pub use tap::{Pipe, Tap, TapFallible}; pub use tracing::{Level, debug, error, info, trace, warn}; + + pub use crate::boxext::BoxedPathExt; } use prelude::*; pub mod args; pub use args::{AppendCmd, Args}; +mod boxext; mod color; pub use color::{_CLI_ENABLE_COLOR, SHOULD_COLOR}; mod daemon; @@ -54,6 +57,7 @@ pub mod line; pub use line::Line; mod nixcmd; pub mod source; +use rustix::fs::Mode; pub use source::{SourceFile, SourceLine}; #[cfg(all(not(feature = "regex-full"), not(feature = "regex-lite")))] @@ -125,10 +129,24 @@ pub fn do_append(args: Arc, append_args: AppendCmd) -> Result<(), BoxDynEr } //#[tracing::instrument(level = "debug")] -pub fn do_daemon(args: Arc, daemon_args: DaemonCmd) -> Result<(), BoxDynError> { - let mut daemon = Daemon::from_stdin(); +pub fn do_daemon(_args: Arc, daemon_args: DaemonCmd) -> Result<(), BoxDynError> { + // FIXME: make configurable? + let _ = rustix::process::umask(Mode::from_bits_retain(0o600).complement()); + + let mut daemon = match daemon_args { + DaemonCmd { stdin: true, .. } => Daemon::from_stdin(), + DaemonCmd { socket: None, .. } => Daemon::open_default_socket()?, + DaemonCmd { + socket: Some(socket), + .. + } => Daemon::from_unix_socket_path(&socket)?, + }; + daemon.enter_loop().unwrap(); - todo!(); + + info!("daemon has exited"); + + Ok(()) } #[derive(Debug, Clone, PartialEq, Hash, Serialize, Deserialize)]