some semblance of an event loop
This commit is contained in:
parent
a72a48f92b
commit
74c2eaf66d
8 changed files with 384 additions and 14 deletions
244
src/daemon.rs
Normal file
244
src/daemon.rs
Normal file
|
|
@ -0,0 +1,244 @@
|
|||
use std::{
|
||||
io,
|
||||
ops::Deref,
|
||||
os::fd::{AsFd, BorrowedFd, OwnedFd},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use circular_buffer::CircularBuffer;
|
||||
|
||||
use mio::{Events, Interest, Poll, Token, unix::SourceFd};
|
||||
|
||||
use rustix::{
|
||||
buffer::{Buffer, spare_capacity},
|
||||
fs::{OFlags, fcntl_setfl},
|
||||
io::Errno,
|
||||
termios::{
|
||||
ControlModes, InputModes, LocalModes, OptionalActions, OutputModes, SpecialCodeIndex,
|
||||
SpecialCodes, Termios, tcgetattr, tcsetattr,
|
||||
},
|
||||
};
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::StreamDeserializer;
|
||||
|
||||
use crate::prelude::*;
|
||||
|
||||
use crate::OwnedFdWithFlags;
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
|
||||
#[derive(Deserialize, Serialize)]
|
||||
#[serde(untagged)]
|
||||
pub enum ConvenientAttrPath {
|
||||
Dotted(Box<str>),
|
||||
Split(Box<[Box<str>]>),
|
||||
}
|
||||
|
||||
impl ConvenientAttrPath {
|
||||
pub fn clone_from_dotted(s: &str) -> Self {
|
||||
Self::Dotted(Box::from(s))
|
||||
}
|
||||
|
||||
pub fn clone_from_split(s: &[&str]) -> Self {
|
||||
Self::from_str_iter(s.into_iter().map(Deref::deref))
|
||||
}
|
||||
|
||||
pub fn from_str_iter<'i, I>(iter: I) -> Self
|
||||
where
|
||||
I: Iterator<Item = &'i str>,
|
||||
{
|
||||
let boxed = iter.map(|s| Box::from(s));
|
||||
Self::Split(Box::from_iter(boxed))
|
||||
}
|
||||
}
|
||||
|
||||
impl<'i> FromIterator<&'i str> for ConvenientAttrPath {
|
||||
fn from_iter<I>(iter: I) -> Self
|
||||
where
|
||||
I: IntoIterator<Item = &'i str>,
|
||||
{
|
||||
Self::from_str_iter(iter.into_iter())
|
||||
}
|
||||
}
|
||||
|
||||
impl FromIterator<Box<str>> for ConvenientAttrPath {
|
||||
fn from_iter<I>(iter: I) -> Self
|
||||
where
|
||||
I: IntoIterator<Item = Box<str>>,
|
||||
{
|
||||
Self::Split(Box::from_iter(iter))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
#[derive(serde::Deserialize, serde::Serialize)]
|
||||
#[serde(tag = "action", content = "args", rename_all = "snake_case")]
|
||||
pub enum DaemonCmd {
|
||||
Append {
|
||||
name: ConvenientAttrPath,
|
||||
value: Box<str>,
|
||||
},
|
||||
}
|
||||
|
||||
const TIMEOUT_NEVER: Option<Duration> = None;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Daemon {
|
||||
fd: OwnedFdWithFlags,
|
||||
poll_error_buffer: CircularBuffer<ERROR_BUFFER_LEN, IoError>,
|
||||
fd_error_buffer: CircularBuffer<ERROR_BUFFER_LEN, IoError>,
|
||||
|
||||
cmd_buffer: Vec<u8>,
|
||||
next_timeout: Option<Duration>,
|
||||
}
|
||||
|
||||
impl Daemon {
|
||||
/// Panics if we cannot make `fd` non-blocking.
|
||||
pub fn new(fd: OwnedFd) -> Self {
|
||||
let fd = OwnedFdWithFlags::new_with_fallback(fd);
|
||||
|
||||
// Make non-blocking.
|
||||
//fcntl_setfl(fd.as_fd(), fd.oflags().union(OFlags::NONBLOCK))
|
||||
// .tap_err(|e| error!("F_SETFL O_NONBLOCK failed on fd {:?}: {e}", fd.as_fd()))
|
||||
// .unwrap();
|
||||
|
||||
Self {
|
||||
fd,
|
||||
poll_error_buffer: Default::default(),
|
||||
fd_error_buffer: Default::default(),
|
||||
cmd_buffer: Vec::with_capacity(1024),
|
||||
next_timeout: TIMEOUT_NEVER,
|
||||
}
|
||||
}
|
||||
|
||||
/// This panics if stdin cannot be opened.
|
||||
///
|
||||
/// If you want to handle that error, use [`Daemon::from_raw_parts()`].
|
||||
pub fn from_stdin() -> Self {
|
||||
let stdin = io::stdin();
|
||||
let fd = stdin
|
||||
.as_fd()
|
||||
.try_clone_to_owned()
|
||||
.expect("dynix daemon could not open stdin; try a Unix socket?");
|
||||
|
||||
debug!("opened daemon to stdin file descriptor {fd:?}");
|
||||
|
||||
Self::new(fd)
|
||||
}
|
||||
|
||||
//pub unsafe fn from_raw_parts(fd: OwnedFd) -> Self {
|
||||
// Self {
|
||||
// fd: OwnedFdWithFlags::new_with_fallback(fd),
|
||||
// }
|
||||
//}
|
||||
|
||||
pub fn fd(&self) -> BorrowedFd<'_> {
|
||||
self.fd.as_fd()
|
||||
}
|
||||
}
|
||||
|
||||
const ERROR_BUFFER_LEN: usize = 8;
|
||||
|
||||
/// Private helpers.
|
||||
impl Daemon {
|
||||
fn read_cmd(&mut self) -> Result<(), IoError> {
|
||||
if self.cmd_buffer.len() == self.cmd_buffer.capacity() {
|
||||
self.cmd_buffer.reserve(1024);
|
||||
}
|
||||
|
||||
let _count = rustix::io::read(&self.fd, spare_capacity(&mut self.cmd_buffer))?;
|
||||
|
||||
// The buffer might have existing data from the last read.
|
||||
let deserializer = serde_json::Deserializer::from_slice(&self.cmd_buffer);
|
||||
let stream: StreamDeserializer<_, DaemonCmd> = deserializer.into_iter();
|
||||
for cmd in stream {
|
||||
let cmd = match cmd {
|
||||
Ok(cmd) => cmd,
|
||||
Err(e) if e.is_eof() => {
|
||||
self.next_timeout = Some(Duration::from_secs(4));
|
||||
warn!("Didn't get a valid daemon command; giving the other side 4 seconds...");
|
||||
return Ok(());
|
||||
},
|
||||
Err(e) => {
|
||||
warn!("error deserializing command: {e}");
|
||||
warn!("error count: {}", self.fd_error_buffer.len());
|
||||
self.cmd_buffer.clear();
|
||||
// Don't propagate the error unless we have too many.
|
||||
self.fd_error_buffer.try_push_back(e.into()).tap_err(|e| {
|
||||
error!(
|
||||
"Accumulated too many errors for daemon fd {:?}: {e}",
|
||||
self.fd(),
|
||||
)
|
||||
})?;
|
||||
return Ok(());
|
||||
},
|
||||
};
|
||||
debug!("got cmd: {cmd:?}");
|
||||
}
|
||||
|
||||
self.cmd_buffer.clear();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn enter_loop(&mut self) -> Result<Option<()>, IoError> {
|
||||
let raw_fd = self.fd.as_raw_fd();
|
||||
let mut daemon_source = SourceFd(&raw_fd);
|
||||
const DAEMON: Token = Token(0);
|
||||
let mut poll = Poll::new().unwrap_or_else(|e| unreachable!("creating new mio Poll: {e}"));
|
||||
|
||||
poll.registry()
|
||||
.register(&mut daemon_source, DAEMON, Interest::READABLE)
|
||||
.unwrap_or_else(|e| unreachable!("registering mio Poll for daemon fd {raw_fd:?}: {e}"));
|
||||
|
||||
let mut events = Events::with_capacity(1024);
|
||||
|
||||
let example = DaemonCmd::Append {
|
||||
//name: ConvenientAttrPath::Dotted(Box::from(
|
||||
// "services.gotosocial.settings.application-name",
|
||||
//)),
|
||||
//name: ConvenientAttrPath::Split(Box::from([
|
||||
// Box::from("services"),
|
||||
// Box::from("gotosocial"),
|
||||
//])),
|
||||
name: ConvenientAttrPath::clone_from_split(&[
|
||||
"services",
|
||||
"gotosocial",
|
||||
"settings",
|
||||
"application-name",
|
||||
]),
|
||||
value: Box::from("foo"),
|
||||
};
|
||||
|
||||
//let example_as_json = serde_json::to_string_pretty(&example).unwrap();
|
||||
//info!("{}", example_as_json);
|
||||
|
||||
loop {
|
||||
match poll.poll(&mut events, self.next_timeout.take()) {
|
||||
Ok(_) => {
|
||||
if events.is_empty() {
|
||||
warn!("timeout expired");
|
||||
self.cmd_buffer.clear();
|
||||
} else {
|
||||
let _ = self.poll_error_buffer.pop_front();
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
warn!("mio Poll::poll() error: {e}");
|
||||
self.poll_error_buffer.try_push_back(e).tap_err(|e| {
|
||||
error!("accumulated too many errors for mio::Poll::poll(): {e}")
|
||||
})?;
|
||||
},
|
||||
}
|
||||
|
||||
for event in &events {
|
||||
match event.token() {
|
||||
DAEMON => {
|
||||
self.read_cmd().unwrap();
|
||||
},
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue