use std::{ io, ops::Deref, os::fd::{AsFd, BorrowedFd, OwnedFd}, time::Duration, }; use circular_buffer::CircularBuffer; use mio::{Events, Interest, Poll, Token, unix::SourceFd}; use rustix::{ buffer::{Buffer, spare_capacity}, fs::{OFlags, fcntl_setfl}, io::Errno, termios::{ ControlModes, InputModes, LocalModes, OptionalActions, OutputModes, SpecialCodeIndex, SpecialCodes, Termios, tcgetattr, tcsetattr, }, }; use serde::{Deserialize, Serialize}; use serde_json::StreamDeserializer; use crate::prelude::*; use crate::OwnedFdWithFlags; #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] #[derive(Deserialize, Serialize)] #[serde(untagged)] pub enum ConvenientAttrPath { Dotted(Box), Split(Box<[Box]>), } impl ConvenientAttrPath { pub fn clone_from_dotted(s: &str) -> Self { Self::Dotted(Box::from(s)) } pub fn clone_from_split(s: &[&str]) -> Self { Self::from_str_iter(s.into_iter().map(Deref::deref)) } pub fn from_str_iter<'i, I>(iter: I) -> Self where I: Iterator, { let boxed = iter.map(|s| Box::from(s)); Self::Split(Box::from_iter(boxed)) } } impl<'i> FromIterator<&'i str> for ConvenientAttrPath { fn from_iter(iter: I) -> Self where I: IntoIterator, { Self::from_str_iter(iter.into_iter()) } } impl FromIterator> for ConvenientAttrPath { fn from_iter(iter: I) -> Self where I: IntoIterator>, { Self::Split(Box::from_iter(iter)) } } #[derive(Debug, Clone, PartialEq)] #[derive(serde::Deserialize, serde::Serialize)] #[serde(tag = "action", content = "args", rename_all = "snake_case")] pub enum DaemonCmd { Append { name: ConvenientAttrPath, value: Box, }, } const TIMEOUT_NEVER: Option = None; #[derive(Debug)] pub struct Daemon { fd: OwnedFdWithFlags, poll_error_buffer: CircularBuffer, fd_error_buffer: CircularBuffer, cmd_buffer: Vec, next_timeout: Option, } impl Daemon { /// Panics if we cannot make `fd` non-blocking. pub fn new(fd: OwnedFd) -> Self { let fd = OwnedFdWithFlags::new_with_fallback(fd); // Make non-blocking. //fcntl_setfl(fd.as_fd(), fd.oflags().union(OFlags::NONBLOCK)) // .tap_err(|e| error!("F_SETFL O_NONBLOCK failed on fd {:?}: {e}", fd.as_fd())) // .unwrap(); Self { fd, poll_error_buffer: Default::default(), fd_error_buffer: Default::default(), cmd_buffer: Vec::with_capacity(1024), next_timeout: TIMEOUT_NEVER, } } /// This panics if stdin cannot be opened. /// /// If you want to handle that error, use [`Daemon::from_raw_parts()`]. pub fn from_stdin() -> Self { let stdin = io::stdin(); let fd = stdin .as_fd() .try_clone_to_owned() .expect("dynix daemon could not open stdin; try a Unix socket?"); debug!("opened daemon to stdin file descriptor {fd:?}"); Self::new(fd) } //pub unsafe fn from_raw_parts(fd: OwnedFd) -> Self { // Self { // fd: OwnedFdWithFlags::new_with_fallback(fd), // } //} pub fn fd(&self) -> BorrowedFd<'_> { self.fd.as_fd() } } const ERROR_BUFFER_LEN: usize = 8; /// Private helpers. impl Daemon { fn read_cmd(&mut self) -> Result<(), IoError> { if self.cmd_buffer.len() == self.cmd_buffer.capacity() { self.cmd_buffer.reserve(1024); } let _count = rustix::io::read(&self.fd, spare_capacity(&mut self.cmd_buffer))?; // The buffer might have existing data from the last read. let deserializer = serde_json::Deserializer::from_slice(&self.cmd_buffer); let stream: StreamDeserializer<_, DaemonCmd> = deserializer.into_iter(); for cmd in stream { let cmd = match cmd { Ok(cmd) => cmd, Err(e) if e.is_eof() => { self.next_timeout = Some(Duration::from_secs(4)); warn!("Didn't get a valid daemon command; giving the other side 4 seconds..."); return Ok(()); }, Err(e) => { warn!("error deserializing command: {e}"); warn!("error count: {}", self.fd_error_buffer.len()); self.cmd_buffer.clear(); // Don't propagate the error unless we have too many. self.fd_error_buffer.try_push_back(e.into()).tap_err(|e| { error!( "Accumulated too many errors for daemon fd {:?}: {e}", self.fd(), ) })?; return Ok(()); }, }; debug!("got cmd: {cmd:?}"); } self.cmd_buffer.clear(); Ok(()) } pub(crate) fn enter_loop(&mut self) -> Result, IoError> { let raw_fd = self.fd.as_raw_fd(); let mut daemon_source = SourceFd(&raw_fd); const DAEMON: Token = Token(0); let mut poll = Poll::new().unwrap_or_else(|e| unreachable!("creating new mio Poll: {e}")); poll.registry() .register(&mut daemon_source, DAEMON, Interest::READABLE) .unwrap_or_else(|e| unreachable!("registering mio Poll for daemon fd {raw_fd:?}: {e}")); let mut events = Events::with_capacity(1024); let example = DaemonCmd::Append { //name: ConvenientAttrPath::Dotted(Box::from( // "services.gotosocial.settings.application-name", //)), //name: ConvenientAttrPath::Split(Box::from([ // Box::from("services"), // Box::from("gotosocial"), //])), name: ConvenientAttrPath::clone_from_split(&[ "services", "gotosocial", "settings", "application-name", ]), value: Box::from("foo"), }; //let example_as_json = serde_json::to_string_pretty(&example).unwrap(); //info!("{}", example_as_json); loop { match poll.poll(&mut events, self.next_timeout.take()) { Ok(_) => { if events.is_empty() { warn!("timeout expired"); self.cmd_buffer.clear(); } else { let _ = self.poll_error_buffer.pop_front(); } }, Err(e) => { warn!("mio Poll::poll() error: {e}"); self.poll_error_buffer.try_push_back(e).tap_err(|e| { error!("accumulated too many errors for mio::Poll::poll(): {e}") })?; }, } for event in &events { match event.token() { DAEMON => { self.read_cmd().unwrap(); }, _ => unreachable!(), } } } } }