use std::{ env, io, ops::Deref, os::fd::{AsFd, BorrowedFd, FromRawFd, IntoRawFd, OwnedFd, RawFd}, sync::LazyLock, time::Duration, }; use circular_buffer::CircularBuffer; use mio::{Events, Interest, Poll, Token, net::UnixListener, unix::SourceFd}; use rustix::{ buffer::{Buffer, spare_capacity}, fs::{FileType, OFlags, Stat, fcntl_setfl}, io::Errno, net::SocketFlags, process::Uid, termios::{ ControlModes, InputModes, LocalModes, OptionalActions, OutputModes, SpecialCodeIndex, SpecialCodes, Termios, tcgetattr, tcsetattr, }, }; use serde::{Deserialize, Serialize}; use serde_json::StreamDeserializer; use crate::prelude::*; use crate::OwnedFdWithFlags; pub static UID: LazyLock = LazyLock::new(|| rustix::process::getuid()); pub static USER_SOCKET_DIR: LazyLock<&'static Path> = LazyLock::new(|| { let dir: Box = env::var_os("XDG_RUNTIME_DIR") .map(PathBuf::from) .unwrap_or_else(|| ["/", "run", "user", &UID.to_string()].into_iter().collect()) .into_boxed_path(); Box::leak(dir) }); pub static TMPDIR: LazyLock<&'static Path> = LazyLock::new(|| { let dir: Box = env::temp_dir().into_boxed_path(); Box::leak(dir) }); #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] #[derive(Deserialize, Serialize)] #[serde(untagged)] pub enum ConvenientAttrPath { Dotted(Box), Split(Box<[Box]>), } impl ConvenientAttrPath { pub fn clone_from_dotted(s: &str) -> Self { Self::Dotted(Box::from(s)) } pub fn clone_from_split(s: &[&str]) -> Self { Self::from_str_iter(s.into_iter().map(Deref::deref)) } pub fn from_str_iter<'i, I>(iter: I) -> Self where I: Iterator, { let boxed = iter.map(|s| Box::from(s)); Self::Split(Box::from_iter(boxed)) } } impl<'i> FromIterator<&'i str> for ConvenientAttrPath { fn from_iter(iter: I) -> Self where I: IntoIterator, { Self::from_str_iter(iter.into_iter()) } } impl FromIterator> for ConvenientAttrPath { fn from_iter(iter: I) -> Self where I: IntoIterator>, { Self::Split(Box::from_iter(iter)) } } #[derive(Debug, Clone, PartialEq)] #[derive(serde::Deserialize, serde::Serialize)] #[serde(tag = "action", content = "args", rename_all = "snake_case")] pub enum DaemonCmd { Append { name: ConvenientAttrPath, value: Box, }, } const TIMEOUT_NEVER: Option = None; #[derive(Debug)] pub struct Daemon { fd: OwnedFdWithFlags, path: Box, poll_error_buffer: CircularBuffer, fd_error_buffer: CircularBuffer, cmd_buffer: Vec, next_timeout: Option, } impl Daemon { pub fn new(fd: OwnedFd, name_or_path: Box) -> Self { let fd = OwnedFdWithFlags::new_with_fallback(fd); debug!( "opened daemon to {} file descriptor {fd:?}", name_or_path.display(), ); Self { fd, path: name_or_path, poll_error_buffer: Default::default(), fd_error_buffer: Default::default(), cmd_buffer: Vec::with_capacity(1024), next_timeout: TIMEOUT_NEVER, } } pub fn from_unix_socket_path(path: &Path) -> Result { // We unconditionally unlink() `path` before binding, but completely ignore the result. let _ = rustix::fs::unlink(path); let listener = UnixListener::bind(path) .tap_err(|e| error!("failed to bind AF_UNIX socket at {}: {e}", path.display()))?; //let (stream, _addr) = listener.accept().unwrap_or_else(|e| todo!("error: {e}")); //warn!("stream is: {stream:?} ({})", stream.as_raw_fd()); let listener_owned_fd = OwnedFd::from(listener); rustix::net::sockopt::set_socket_keepalive(&listener_owned_fd, true).unwrap(); let path: Box = path.to_path_buf().into_boxed_path().into_boxed_os_str(); Ok(Self::new(listener_owned_fd, path)) } pub fn open_default_socket() -> Result { use IoErrorKind::*; let preferred = USER_SOCKET_DIR.join("dynix.sock").into_boxed_path(); let constructed = match Self::from_unix_socket_path(&preferred) { Ok(v) => v, Err(e) if e.kind() == Unsupported => { // return Err(e); }, Err(e) => { warn!( "failed binding AF_UNIX socket at {}: {e}; trying elsewhere", preferred.display() ); let fallback = TMPDIR.join("dynix.sock").into_boxed_path(); Self::from_unix_socket_path(&fallback).tap_err(|e| { error!( "failed binding AF_UNIX socket at {}: {e}", fallback.display() ) })? }, }; Ok(constructed) } /// This panics if stdin cannot be opened. /// /// If you want to handle that error, use [`Daemon::from_raw_parts()`]. pub fn from_stdin() -> Self { let stdin = io::stdin(); let fd = stdin .as_fd() .try_clone_to_owned() .expect("dynix daemon could not open stdin; try a Unix socket?"); Self::new(fd, Box::from(OsStr::new("«stdin»"))) } //pub unsafe fn from_raw_parts(fd: OwnedFd) -> Self { // Self { // fd: OwnedFdWithFlags::new_with_fallback(fd), // } //} pub fn fd(&self) -> BorrowedFd<'_> { self.fd.as_fd() } } const ERROR_BUFFER_LEN: usize = 8; /// Private helpers. impl Daemon { fn read_cmd(&mut self, fd: Option<&dyn AsFd>) -> Result<(), IoError> { let fd = match fd.as_ref() { Some(fd) => fd.as_fd(), None => self.fd.as_fd(), }; if self.cmd_buffer.len() == self.cmd_buffer.capacity() { self.cmd_buffer.reserve(1024); } let _count = rustix::io::read(fd, spare_capacity(&mut self.cmd_buffer)) .tap_err(|e| error!("read() on daemon fd {fd:?} failed: {e}"))?; // The buffer might have existing data from the last read. let deserializer = serde_json::Deserializer::from_slice(&self.cmd_buffer); let stream: StreamDeserializer<_, DaemonCmd> = deserializer.into_iter(); for cmd in stream { let cmd = match cmd { Ok(cmd) => cmd, Err(e) if e.is_eof() => { self.next_timeout = Some(Duration::from_secs(4)); warn!("Didn't get a valid daemon command; giving the other side 4 seconds..."); return Ok(()); }, Err(e) => { warn!("error deserializing command: {e}"); warn!("error count: {}", self.fd_error_buffer.len()); self.cmd_buffer.clear(); // Don't propagate the error unless we have too many. self.fd_error_buffer.try_push_back(e.into()).tap_err(|e| { error!("Accumulated too many errors for daemon fd {fd:?}: {e}") })?; return Ok(()); }, }; debug!("got cmd: {cmd:?}"); } self.cmd_buffer.clear(); Ok(()) } pub(crate) fn enter_loop(&mut self) -> Result, IoError> { let raw_fd = self.fd.as_raw_fd(); let mut daemon_source = SourceFd(&raw_fd); const DAEMON: Token = Token(0); let mut next_token_number: usize = 1; let mut extra_tokens: Vec<(Token, RawFd)> = Default::default(); let mut poll = Poll::new().unwrap_or_else(|e| unreachable!("creating new mio Poll: {e}")); poll.registry() .register(&mut daemon_source, DAEMON, Interest::READABLE) .unwrap_or_else(|e| unreachable!("registering mio Poll for daemon fd {raw_fd:?}: {e}")); let mut events = Events::with_capacity(1024); let example = DaemonCmd::Append { //name: ConvenientAttrPath::Dotted(Box::from( // "services.gotosocial.settings.application-name", //)), //name: ConvenientAttrPath::Split(Box::from([ // Box::from("services"), // Box::from("gotosocial"), //])), name: ConvenientAttrPath::clone_from_split(&[ "services", "gotosocial", "settings", "application-name", ]), value: Box::from("foo"), }; //let example_as_json = serde_json::to_string_pretty(&example).unwrap(); //info!("{}", example_as_json); loop { match poll.poll(&mut events, self.next_timeout.take()) { Ok(_) => { if events.is_empty() { warn!("timeout expired"); self.cmd_buffer.clear(); } else { let _ = self.poll_error_buffer.pop_front(); } }, Err(e) => { warn!("mio Poll::poll() error: {e}"); self.poll_error_buffer.try_push_back(e).tap_err(|e| { error!("accumulated too many errors for mio::Poll::poll(): {e}") })?; }, } for event in &events { match event.token() { DAEMON => { let is_sock = rustix::fs::fstat(&self.fd) .map(|Stat { st_mode, .. }| st_mode) .map(FileType::from_raw_mode) .map(FileType::is_socket) .unwrap_or(false); if !is_sock { self.read_cmd(None).unwrap(); continue; } // Accept, first. let flags = SocketFlags::NONBLOCK | SocketFlags::CLOEXEC; let stream_fd = match rustix::net::accept_with(&self.fd, flags) { Ok(stream) => { debug!( "Accepted connection from socket {:?} as stream {:?}", self.fd, stream, ); stream }, Err(e) => { error!("accept4 on daemon socket failed: {e}"); self.fd_error_buffer.try_push_back(e.into()).tap_err(|e| { error!( "Accumulated too many errors for daemon fd {:?}: {e}", self.fd ) })?; continue; }, }; let mut stream_fd = stream_fd.into_raw_fd(); // And add this stream to our poll interest list. if let Err(idx) = extra_tokens.binary_search_by_key(&stream_fd, |(_, fd)| fd.as_raw_fd()) { let token = Token(next_token_number); extra_tokens.insert(idx, (token, stream_fd)); next_token_number = next_token_number.wrapping_add(1); let mut source = SourceFd(&mut stream_fd); poll.registry() .register(&mut source, token, Interest::READABLE) .unwrap(); } // Wait for the next poll to handle. //match self.read_cmd(Some(&stream_fd)) { // Ok(()) => (), // Err(e) if e.kind() == IoErrorKind::WouldBlock => { // continue; // }, // Err(e) => { // self.fd_error_buffer.try_push_back(e.into()).tap_err(|e| { // error!( // "Accumulated too many errors for fd {:?} {e}", // &stream_fd, // ) // })?; // }, //} }, other_token => { let index = match extra_tokens .binary_search_by_key(&other_token, |&(t, _)| t) { Ok(index) => index, Err(index) => unreachable!( "tried to get index ({index}) for non-existent token {other_token:?}" ), }; // This must be a stream fd. let (_, stream_fd) = extra_tokens[index]; // SAFETY: oh boy. let stream_fd = unsafe { BorrowedFd::borrow_raw(stream_fd) }; self.read_cmd(Some(&stream_fd)).unwrap(); }, } } } } fn get_fallback_fd_name(fd: Fd) -> Option> { let dev_fd_path = Path::new("/dev/fd").join(fd.as_raw_fd().to_string()); fs_err::read_link(dev_fd_path) .map(PathBuf::into_os_string) .map(OsString::into_boxed_os_str) .ok() } } impl Drop for Daemon { fn drop(&mut self) { let probably_synthetic = self .path .to_str() .map(|p| p.starts_with("«") && p.ends_with("»")) .unwrap_or(false); if probably_synthetic { return; } let _ = rustix::fs::unlink(&*self.path); } }