Compare commits
No commits in common. "ac53850fd17068a21617db184bda04df75c750b3" and "398fccc5d0971e99c8e2fffdcafde0cb016b7c9a" have entirely different histories.
ac53850fd1
...
398fccc5d0
5 changed files with 158 additions and 560 deletions
49
Cargo.lock
generated
49
Cargo.lock
generated
|
|
@ -11,12 +11,6 @@ dependencies = [
|
|||
"memchr",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "allocator-api2"
|
||||
version = "0.2.21"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923"
|
||||
|
||||
[[package]]
|
||||
name = "anstream"
|
||||
version = "0.6.21"
|
||||
|
|
@ -73,12 +67,6 @@ version = "1.5.0"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8"
|
||||
|
||||
[[package]]
|
||||
name = "bimap"
|
||||
version = "0.6.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "230c5f1ca6a325a32553f8640d31ac9b49f2411e901e427570154868b46da4f7"
|
||||
|
||||
[[package]]
|
||||
name = "bitflags"
|
||||
version = "2.11.0"
|
||||
|
|
@ -199,7 +187,6 @@ checksum = "d0881ea181b1df73ff77ffaaf9c7544ecc11e82fba9b5f27b262a3c73a332555"
|
|||
name = "dynix"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"bimap",
|
||||
"bitflags",
|
||||
"bstr",
|
||||
"circular-buffer",
|
||||
|
|
@ -208,8 +195,6 @@ dependencies = [
|
|||
"const-str",
|
||||
"displaydoc",
|
||||
"fs-err",
|
||||
"humantime",
|
||||
"iddqd",
|
||||
"itertools",
|
||||
"libc",
|
||||
"mio",
|
||||
|
|
@ -247,12 +232,6 @@ dependencies = [
|
|||
"windows-sys 0.61.2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "foldhash"
|
||||
version = "0.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "77ce24cb58228fbb8aa041425bb1050850ac19177686ea6e0f41a70416f56fdb"
|
||||
|
||||
[[package]]
|
||||
name = "fs-err"
|
||||
version = "3.3.0"
|
||||
|
|
@ -267,9 +246,6 @@ name = "hashbrown"
|
|||
version = "0.16.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100"
|
||||
dependencies = [
|
||||
"allocator-api2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "heck"
|
||||
|
|
@ -283,25 +259,6 @@ version = "0.5.2"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "fc0fef456e4baa96da950455cd02c081ca953b141298e41db3fc7e36b1da849c"
|
||||
|
||||
[[package]]
|
||||
name = "humantime"
|
||||
version = "2.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "135b12329e5e3ce057a9f972339ea52bc954fe1e9358ef27f95e89716fbc5424"
|
||||
|
||||
[[package]]
|
||||
name = "iddqd"
|
||||
version = "0.3.17"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6b215e67ed1d1a4b1702acd787c487d16e4c977c5dcbcc4587bdb5ea26b6ce06"
|
||||
dependencies = [
|
||||
"allocator-api2",
|
||||
"equivalent",
|
||||
"foldhash",
|
||||
"hashbrown",
|
||||
"rustc-hash",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "indexmap"
|
||||
version = "2.13.0"
|
||||
|
|
@ -556,12 +513,6 @@ version = "0.8.10"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "dc897dd8d9e8bd1ed8cdad82b5966c3e0ecae09fb1907d58efaa013543185d0a"
|
||||
|
||||
[[package]]
|
||||
name = "rustc-hash"
|
||||
version = "2.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d"
|
||||
|
||||
[[package]]
|
||||
name = "rustix"
|
||||
version = "1.1.4"
|
||||
|
|
|
|||
|
|
@ -22,7 +22,6 @@ regex-full = ["dep:regex"]
|
|||
regex-lite = ["dep:regex-lite"]
|
||||
|
||||
[dependencies]
|
||||
bimap = "0.6.3"
|
||||
bitflags = { version = "2.11.0", features = ["std"] }
|
||||
bstr = "1.12.1"
|
||||
circular-buffer = "1.2.0"
|
||||
|
|
@ -31,8 +30,6 @@ command-error = "0.8.0"
|
|||
const-str = "1.1.0"
|
||||
displaydoc = "0.2.5"
|
||||
fs-err = "3.2.2"
|
||||
humantime = "2.3.0"
|
||||
iddqd = "0.3.17"
|
||||
itertools = "0.14.0"
|
||||
libc = { version = "0.2.180", features = ["extra_traits"] }
|
||||
#macro_rules_attribute = { version = "0.2.2", features = ["better-docs", "verbose-expansions"] }
|
||||
|
|
|
|||
471
src/daemon.rs
471
src/daemon.rs
|
|
@ -1,30 +1,33 @@
|
|||
use std::{
|
||||
env, io, mem,
|
||||
env, io,
|
||||
ops::Deref,
|
||||
os::fd::{AsFd, BorrowedFd, IntoRawFd, OwnedFd, RawFd},
|
||||
sync::{
|
||||
Arc, LazyLock,
|
||||
atomic::{AtomicUsize, Ordering},
|
||||
},
|
||||
os::fd::{AsFd, BorrowedFd, FromRawFd, IntoRawFd, OwnedFd, RawFd},
|
||||
sync::LazyLock,
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use iddqd::{BiHashMap, IdOrdMap};
|
||||
use circular_buffer::CircularBuffer;
|
||||
|
||||
use mio::{Events, Interest, Poll, Token, net::UnixListener, unix::SourceFd};
|
||||
|
||||
use rustix::{buffer::spare_capacity, net::SocketFlags, process::Uid};
|
||||
use rustix::{
|
||||
buffer::{Buffer, spare_capacity},
|
||||
fs::{FileType, OFlags, Stat, fcntl_setfl},
|
||||
io::Errno,
|
||||
net::SocketFlags,
|
||||
process::Uid,
|
||||
termios::{
|
||||
ControlModes, InputModes, LocalModes, OptionalActions, OutputModes, SpecialCodeIndex,
|
||||
SpecialCodes, Termios, tcgetattr, tcsetattr,
|
||||
},
|
||||
};
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::StreamDeserializer;
|
||||
|
||||
use crate::{
|
||||
SourceFile, SourceLine,
|
||||
daemon_tokfd::{FdInfo, FdKind},
|
||||
prelude::*,
|
||||
};
|
||||
use crate::prelude::*;
|
||||
|
||||
use crate::{OwnedFdWithFlags, TokenFd};
|
||||
use crate::OwnedFdWithFlags;
|
||||
|
||||
pub static UID: LazyLock<Uid> = LazyLock::new(|| rustix::process::getuid());
|
||||
|
||||
|
|
@ -67,17 +70,6 @@ impl ConvenientAttrPath {
|
|||
let boxed = iter.map(|s| Box::from(s));
|
||||
Self::Split(Box::from_iter(boxed))
|
||||
}
|
||||
|
||||
pub fn to_nix_decl(&self) -> Box<str> {
|
||||
use ConvenientAttrPath::*;
|
||||
match self {
|
||||
Dotted(s) => s.clone(),
|
||||
Split(path) => {
|
||||
// FIXME: quote if necessary
|
||||
path.join(".").into_boxed_str()
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'i> FromIterator<&'i str> for ConvenientAttrPath {
|
||||
|
|
@ -101,7 +93,6 @@ impl FromIterator<Box<str>> for ConvenientAttrPath {
|
|||
#[derive(Debug, Clone, PartialEq)]
|
||||
#[derive(serde::Deserialize, serde::Serialize)]
|
||||
#[serde(tag = "action", content = "args", rename_all = "snake_case")]
|
||||
// FIXME: rename to not confuse with the clap argument type.
|
||||
pub enum DaemonCmd {
|
||||
Append {
|
||||
name: ConvenientAttrPath,
|
||||
|
|
@ -111,189 +102,54 @@ pub enum DaemonCmd {
|
|||
|
||||
const TIMEOUT_NEVER: Option<Duration> = None;
|
||||
|
||||
static NEXT_TOKEN_NUMBER: AtomicUsize = AtomicUsize::new(1);
|
||||
fn next_token() -> Token {
|
||||
let tok = NEXT_TOKEN_NUMBER.fetch_add(1, Ordering::SeqCst);
|
||||
|
||||
// If the increment wrapped to 0, then we just increment it again.
|
||||
if tok == 0 {
|
||||
return Token(NEXT_TOKEN_NUMBER.fetch_add(1, Ordering::SeqCst));
|
||||
}
|
||||
|
||||
Token(tok)
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Daemon {
|
||||
config_path: Arc<Path>,
|
||||
fd: OwnedFdWithFlags,
|
||||
path: Option<Box<Path>>,
|
||||
|
||||
poller: Poll,
|
||||
|
||||
fd_info: IdOrdMap<FdInfo>,
|
||||
|
||||
// Bijective mapping of [`mio::Token`]s to [`RawFd`]s.
|
||||
tokfd: BiHashMap<TokenFd>,
|
||||
path: Box<OsStr>,
|
||||
poll_error_buffer: CircularBuffer<ERROR_BUFFER_LEN, IoError>,
|
||||
fd_error_buffer: CircularBuffer<ERROR_BUFFER_LEN, IoError>,
|
||||
|
||||
cmd_buffer: Vec<u8>,
|
||||
next_timeout: Option<Duration>,
|
||||
}
|
||||
|
||||
/// `tokfd` handling.
|
||||
impl Daemon {
|
||||
fn fd_error_pop(&mut self, fd: RawFd) -> Option<IoError> {
|
||||
let mut info = self.fd_info.get_mut(&fd).unwrap_or_else(|| {
|
||||
if let Ok(name) = FdInfo::guess_name(fd) {
|
||||
panic!(
|
||||
"tried to pop error for unknown fd {fd} ({})",
|
||||
name.to_string_lossy(),
|
||||
);
|
||||
}
|
||||
panic!("tried to pop error for unknown fd {fd}")
|
||||
});
|
||||
info.error_buffer.pop_front().tap_some(|e| {
|
||||
trace!("Popping error for {}: {e}", info.display());
|
||||
})
|
||||
}
|
||||
|
||||
fn fd_error_push(&mut self, fd: RawFd, error: IoError) -> Result<(), IoError> {
|
||||
let mut info = self
|
||||
.fd_info
|
||||
.get_mut(&fd)
|
||||
.unwrap_or_else(|| panic!("tried to push error for unknown fd {fd}"));
|
||||
trace!("Pushing error for {}: {}", info.display(), error);
|
||||
info.error_buffer.try_push_back(error)
|
||||
}
|
||||
|
||||
fn main_fd_info(&self) -> &FdInfo {
|
||||
self.fd_info.get(&self.fd.as_raw_fd()).unwrap_or_else(|| {
|
||||
unreachable!(
|
||||
"Main daemon fd {:?} was not registered with fd_info",
|
||||
self.fd,
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
fn register(&mut self, fd: RawFd, kind: FdKind) -> Token {
|
||||
let token = next_token();
|
||||
|
||||
debug!(
|
||||
"Registering new {} FdInfo for {fd} with token {token:?}",
|
||||
kind.name_str(),
|
||||
);
|
||||
|
||||
self.fd_info.insert_unique(FdInfo::new(fd, kind)).unwrap();
|
||||
|
||||
self.tokfd
|
||||
.insert_unique(TokenFd { token, fd })
|
||||
.unwrap_or_else(|e| todo!("{e}"));
|
||||
|
||||
let mut source = SourceFd(&fd);
|
||||
self.poller
|
||||
.registry()
|
||||
.register(&mut source, token, Interest::READABLE)
|
||||
.unwrap_or_else(|e| unreachable!("registering {fd:?} with poller failed: {e}"));
|
||||
|
||||
token
|
||||
}
|
||||
|
||||
fn deregister(&mut self, fd: RawFd) {
|
||||
let info = self
|
||||
.fd_info
|
||||
.remove(&fd)
|
||||
.unwrap_or_else(|| unreachable!("tried to unregister unknown fd {fd}"));
|
||||
debug!("Unregistering fd {}; calling close()", info.display());
|
||||
unsafe { rustix::io::close(fd) };
|
||||
let res = unsafe { libc::fcntl(fd, libc::F_GETFD, 0) };
|
||||
debug_assert_eq!(res, -1);
|
||||
debug_assert_eq!(
|
||||
IoError::last_os_error().raw_os_error(),
|
||||
Some(Errno::BADF.raw_os_error()),
|
||||
);
|
||||
|
||||
self.tokfd.remove2(&fd).unwrap_or_else(|| todo!());
|
||||
}
|
||||
|
||||
fn fd_for_token(&self, token: Token) -> Option<RawFd> {
|
||||
self.tokfd
|
||||
.get1(&token)
|
||||
.map(|TokenFd { fd, .. }| fd)
|
||||
.copied()
|
||||
}
|
||||
|
||||
fn token_for_fd(&self, fd: RawFd) -> Option<Token> {
|
||||
self.tokfd
|
||||
.get2(&fd)
|
||||
.map(|TokenFd { token, .. }| token)
|
||||
.copied()
|
||||
}
|
||||
}
|
||||
|
||||
impl Daemon {
|
||||
pub fn new(
|
||||
config_path: Arc<Path>,
|
||||
fd: OwnedFd,
|
||||
kind: FdKind,
|
||||
name: Option<Box<OsStr>>,
|
||||
) -> Self {
|
||||
let mut fd_info: IdOrdMap<FdInfo> = Default::default();
|
||||
|
||||
// Supposedly, the only possible ways this can fail are EMFILE, ENFILE, and ENOMEM.
|
||||
// If any of those are the case, we're screwed anyway.
|
||||
let poller = Poll::new().unwrap_or_else(|e| panic!("can't create new mio::Poll: {e}"));
|
||||
// Make sure we register the poller in `fd_info`, so we can keep track of its errors.
|
||||
fd_info
|
||||
.insert_unique(FdInfo::new(poller.as_raw_fd(), FdKind::Poller))
|
||||
.unwrap_or_else(|e| unreachable!("{e}"));
|
||||
|
||||
pub fn new(fd: OwnedFd, name_or_path: Box<OsStr>) -> Self {
|
||||
let fd = OwnedFdWithFlags::new_with_fallback(fd);
|
||||
|
||||
fd_info
|
||||
.insert_unique(FdInfo::new(fd.as_raw_fd(), kind))
|
||||
.unwrap_or_else(|e| unreachable!("{e}"));
|
||||
|
||||
debug!("opened daemon to {:?} file descriptor {fd:?}", name);
|
||||
|
||||
let path = match &name {
|
||||
Some(name) => Some(PathBuf::from(name).into_boxed_path()),
|
||||
None => None,
|
||||
};
|
||||
debug!(
|
||||
"opened daemon to {} file descriptor {fd:?}",
|
||||
name_or_path.display(),
|
||||
);
|
||||
|
||||
Self {
|
||||
config_path,
|
||||
fd,
|
||||
path,
|
||||
poller,
|
||||
fd_info,
|
||||
tokfd: Default::default(),
|
||||
path: name_or_path,
|
||||
poll_error_buffer: Default::default(),
|
||||
fd_error_buffer: Default::default(),
|
||||
cmd_buffer: Vec::with_capacity(1024),
|
||||
next_timeout: TIMEOUT_NEVER,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn from_unix_socket_path(config_path: Arc<Path>, path: &Path) -> Result<Self, IoError> {
|
||||
pub fn from_unix_socket_path(path: &Path) -> Result<Self, IoError> {
|
||||
// We unconditionally unlink() `path` before binding, but completely ignore the result.
|
||||
let _ = rustix::fs::unlink(path);
|
||||
let listener = UnixListener::bind(path)
|
||||
.tap_err(|e| error!("failed to bind AF_UNIX socket at {}: {e}", path.display()))?;
|
||||
//let (stream, _addr) = listener.accept().unwrap_or_else(|e| todo!("error: {e}"));
|
||||
//warn!("stream is: {stream:?} ({})", stream.as_raw_fd());
|
||||
let listener_owned_fd = OwnedFd::from(listener);
|
||||
// FIXME: should we KEEP_ALIVE?
|
||||
rustix::net::sockopt::set_socket_keepalive(&listener_owned_fd, true).unwrap();
|
||||
let path: Box<OsStr> = path.to_path_buf().into_boxed_path().into_boxed_os_str();
|
||||
|
||||
Ok(Self::new(
|
||||
config_path,
|
||||
listener_owned_fd,
|
||||
FdKind::Socket,
|
||||
Some(path),
|
||||
))
|
||||
Ok(Self::new(listener_owned_fd, path))
|
||||
}
|
||||
|
||||
pub fn open_default_socket(config_path: Arc<Path>) -> Result<Self, IoError> {
|
||||
pub fn open_default_socket() -> Result<Self, IoError> {
|
||||
use IoErrorKind::*;
|
||||
let preferred = USER_SOCKET_DIR.join("dynix.sock").into_boxed_path();
|
||||
let constructed = match Self::from_unix_socket_path(config_path.clone(), &preferred) {
|
||||
let constructed = match Self::from_unix_socket_path(&preferred) {
|
||||
Ok(v) => v,
|
||||
Err(e) if e.kind() == Unsupported => {
|
||||
//
|
||||
|
|
@ -302,14 +158,14 @@ impl Daemon {
|
|||
Err(e) => {
|
||||
warn!(
|
||||
"failed binding AF_UNIX socket at {}: {e}; trying elsewhere",
|
||||
preferred.display(),
|
||||
preferred.display()
|
||||
);
|
||||
|
||||
let fallback = TMPDIR.join("dynix.sock").into_boxed_path();
|
||||
Self::from_unix_socket_path(config_path, &fallback).tap_err(|e| {
|
||||
Self::from_unix_socket_path(&fallback).tap_err(|e| {
|
||||
error!(
|
||||
"failed binding AF_UNIX socket at {}: {e}",
|
||||
fallback.display(),
|
||||
fallback.display()
|
||||
)
|
||||
})?
|
||||
},
|
||||
|
|
@ -321,14 +177,14 @@ impl Daemon {
|
|||
/// This panics if stdin cannot be opened.
|
||||
///
|
||||
/// If you want to handle that error, use [`Daemon::from_raw_parts()`].
|
||||
pub fn from_stdin(config_path: Arc<Path>) -> Self {
|
||||
pub fn from_stdin() -> Self {
|
||||
let stdin = io::stdin();
|
||||
let fd = stdin
|
||||
.as_fd()
|
||||
.try_clone_to_owned()
|
||||
.expect("dynix daemon could not open stdin; try a Unix socket?");
|
||||
|
||||
Self::new(config_path, fd, FdKind::File, None)
|
||||
Self::new(fd, Box::from(OsStr::new("«stdin»")))
|
||||
}
|
||||
|
||||
//pub unsafe fn from_raw_parts(fd: OwnedFd) -> Self {
|
||||
|
|
@ -342,9 +198,16 @@ impl Daemon {
|
|||
}
|
||||
}
|
||||
|
||||
const ERROR_BUFFER_LEN: usize = 8;
|
||||
|
||||
/// Private helpers.
|
||||
impl Daemon {
|
||||
fn read_cmd(&mut self, fd: &BorrowedFd) -> Result<(), IoError> {
|
||||
fn read_cmd(&mut self, fd: Option<&dyn AsFd>) -> Result<(), IoError> {
|
||||
let fd = match fd.as_ref() {
|
||||
Some(fd) => fd.as_fd(),
|
||||
None => self.fd.as_fd(),
|
||||
};
|
||||
|
||||
if self.cmd_buffer.len() == self.cmd_buffer.capacity() {
|
||||
self.cmd_buffer.reserve(1024);
|
||||
}
|
||||
|
|
@ -352,11 +215,8 @@ impl Daemon {
|
|||
let _count = rustix::io::read(fd, spare_capacity(&mut self.cmd_buffer))
|
||||
.tap_err(|e| error!("read() on daemon fd {fd:?} failed: {e}"))?;
|
||||
|
||||
// So that the loop doesn't borrow from `self`.
|
||||
let mut cmd_buffer = mem::take(&mut self.cmd_buffer);
|
||||
|
||||
// The buffer might have existing data from the last read.
|
||||
let deserializer = serde_json::Deserializer::from_slice(&cmd_buffer);
|
||||
let deserializer = serde_json::Deserializer::from_slice(&self.cmd_buffer);
|
||||
let stream: StreamDeserializer<_, DaemonCmd> = deserializer.into_iter();
|
||||
for cmd in stream {
|
||||
let cmd = match cmd {
|
||||
|
|
@ -364,156 +224,90 @@ impl Daemon {
|
|||
Err(e) if e.is_eof() => {
|
||||
self.next_timeout = Some(Duration::from_secs(4));
|
||||
warn!("Didn't get a valid daemon command; giving the other side 4 seconds...");
|
||||
let _ = mem::replace(&mut self.cmd_buffer, cmd_buffer);
|
||||
return Ok(());
|
||||
},
|
||||
Err(e) => {
|
||||
warn!("error deserializing command: {e}");
|
||||
debug!("command buffer was: {:?}", cmd_buffer.as_bstr());
|
||||
cmd_buffer.clear();
|
||||
let _ = mem::replace(&mut self.cmd_buffer, cmd_buffer);
|
||||
warn!("error count: {}", self.fd_error_buffer.len());
|
||||
self.cmd_buffer.clear();
|
||||
// Don't propagate the error unless we have too many.
|
||||
self.fd_error_push(fd.as_raw_fd(), e.into()).tap_err(|e| {
|
||||
self.fd_error_buffer.try_push_back(e.into()).tap_err(|e| {
|
||||
error!("Accumulated too many errors for daemon fd {fd:?}: {e}")
|
||||
})?;
|
||||
return Ok(());
|
||||
},
|
||||
};
|
||||
debug!("got cmd: {cmd:?}");
|
||||
let _ = rustix::io::write(fd, b"");
|
||||
info!("dispatching command");
|
||||
self.dispatch_cmd(cmd).unwrap_or_else(|e| todo!("{e}"));
|
||||
}
|
||||
|
||||
cmd_buffer.clear();
|
||||
let _ = mem::replace(&mut self.cmd_buffer, cmd_buffer);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn dispatch_cmd(&mut self, cmd: DaemonCmd) -> Result<(), IoError> {
|
||||
let (name, value) = match cmd {
|
||||
DaemonCmd::Append { name, value } => (name, value),
|
||||
};
|
||||
let mut opts = File::options();
|
||||
opts.read(true)
|
||||
.write(true)
|
||||
.create(false)
|
||||
.custom_flags(libc::O_CLOEXEC);
|
||||
let source_file = SourceFile::open_from(self.config_path.clone(), opts)?;
|
||||
let pri = crate::get_where(source_file.clone()).unwrap_or_else(|e| todo!("{e}"));
|
||||
let new_pri = pri - 1;
|
||||
//let new_pri_line =
|
||||
// crate::get_next_prio_line(source_file.clone(), Arc::from(name), Arc::from(value));
|
||||
// Get next priority line.
|
||||
let source_lines = source_file.lines()?;
|
||||
let penultimate = source_lines.get(source_lines.len() - 2);
|
||||
// FIXME: don't rely on whitespace lol
|
||||
debug_assert_eq!(penultimate.map(SourceLine::text).as_deref(), Some(" ];"));
|
||||
let penultimate = penultimate.unwrap();
|
||||
let new_generation = 0 - new_pri;
|
||||
let new_line = SourceLine {
|
||||
line: penultimate.line,
|
||||
path: source_file.path(),
|
||||
text: Arc::from(format!(
|
||||
" {} = lib.mkOverride ({}) ({}); # DYNIX GENERATION {}",
|
||||
name.to_nix_decl(),
|
||||
new_pri,
|
||||
value,
|
||||
new_generation,
|
||||
)),
|
||||
};
|
||||
|
||||
drop(source_lines);
|
||||
|
||||
crate::write_next_prio(source_file, new_line).unwrap_or_else(|e| todo!("{e}"));
|
||||
self.cmd_buffer.clear();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn enter_loop(&mut self) -> Result<Option<()>, IoError> {
|
||||
let raw_fd = self.fd.as_raw_fd();
|
||||
if cfg!(debug_assertions) {
|
||||
assert!(
|
||||
self.fd_info.contains_key(&raw_fd),
|
||||
"we should know about daemon fd {raw_fd}",
|
||||
);
|
||||
assert!(
|
||||
self.fd_info.contains_key(&self.poller.as_raw_fd()),
|
||||
"we should know about poller fd {}",
|
||||
self.poller.as_raw_fd(),
|
||||
);
|
||||
}
|
||||
let mut daemon_source = SourceFd(&raw_fd);
|
||||
const DAEMON: Token = Token(0);
|
||||
self.tokfd
|
||||
.insert_unique(TokenFd {
|
||||
token: DAEMON,
|
||||
fd: raw_fd,
|
||||
})
|
||||
.unwrap();
|
||||
let mut next_token_number: usize = 1;
|
||||
let mut extra_tokens: Vec<(Token, RawFd)> = Default::default();
|
||||
let mut poll = Poll::new().unwrap_or_else(|e| unreachable!("creating new mio Poll: {e}"));
|
||||
|
||||
self.poller
|
||||
.registry()
|
||||
poll.registry()
|
||||
.register(&mut daemon_source, DAEMON, Interest::READABLE)
|
||||
.unwrap_or_else(|e| unreachable!("registering mio Poll for daemon fd {raw_fd:?}: {e}"));
|
||||
|
||||
let mut events = Events::with_capacity(1024);
|
||||
|
||||
let example = DaemonCmd::Append {
|
||||
//name: ConvenientAttrPath::Dotted(Box::from(
|
||||
// "services.gotosocial.settings.application-name",
|
||||
//)),
|
||||
//name: ConvenientAttrPath::Split(Box::from([
|
||||
// Box::from("services"),
|
||||
// Box::from("gotosocial"),
|
||||
//])),
|
||||
name: ConvenientAttrPath::clone_from_split(&[
|
||||
"services",
|
||||
"gotosocial",
|
||||
"settings",
|
||||
"application-name",
|
||||
]),
|
||||
value: Box::from("foo"),
|
||||
};
|
||||
|
||||
//let example_as_json = serde_json::to_string_pretty(&example).unwrap();
|
||||
//info!("{}", example_as_json);
|
||||
|
||||
loop {
|
||||
if tracing::enabled!(tracing::Level::DEBUG) {
|
||||
//
|
||||
trace!("Daemon loop iteration, with file descriptors: ");
|
||||
for info in &self.fd_info {
|
||||
trace!("- {}", info.display());
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(timeout) = self.next_timeout {
|
||||
debug!(
|
||||
"epoll_wait() with a timeout: {}",
|
||||
humantime::format_duration(timeout),
|
||||
);
|
||||
}
|
||||
|
||||
match self.poller.poll(&mut events, self.next_timeout.take()) {
|
||||
match poll.poll(&mut events, self.next_timeout.take()) {
|
||||
Ok(_) => {
|
||||
trace!(
|
||||
"mio::Poller::poll() got events: {:?}",
|
||||
events.iter().size_hint().0,
|
||||
);
|
||||
if events.is_empty() {
|
||||
warn!("timeout expired");
|
||||
self.cmd_buffer.clear();
|
||||
} else {
|
||||
let _ = self.fd_error_pop(self.poller.as_raw_fd());
|
||||
let _ = self.poll_error_buffer.pop_front();
|
||||
}
|
||||
},
|
||||
Err(e) if e.kind() == IoErrorKind::Interrupted => {
|
||||
// EINTR is silly.
|
||||
continue;
|
||||
},
|
||||
Err(e) => {
|
||||
if let Some(Errno::BADF) = e.raw_os_error().map(Errno::from_raw_os_error) {
|
||||
unreachable!("EBADF on poller fd; IO safety violation?");
|
||||
}
|
||||
warn!("mio Poll::poll() error: {e}");
|
||||
self.fd_error_push(self.poller.as_raw_fd(), e)
|
||||
.tap_err(|e| {
|
||||
error!("accumulated too many errors for mio::Poll::poll(): {e}")
|
||||
})?;
|
||||
self.poll_error_buffer.try_push_back(e).tap_err(|e| {
|
||||
error!("accumulated too many errors for mio::Poll::poll(): {e}")
|
||||
})?;
|
||||
},
|
||||
}
|
||||
|
||||
for event in &events {
|
||||
trace!("event is {event:?}");
|
||||
match event.token() {
|
||||
DAEMON => {
|
||||
let is_sock = self.main_fd_info().kind == FdKind::Socket;
|
||||
let is_sock = rustix::fs::fstat(&self.fd)
|
||||
.map(|Stat { st_mode, .. }| st_mode)
|
||||
.map(FileType::from_raw_mode)
|
||||
.map(FileType::is_socket)
|
||||
.unwrap_or(false);
|
||||
|
||||
if !is_sock {
|
||||
// SAFETY: oh boy: disjoint borrows with extra steps.
|
||||
let file_fd = unsafe { BorrowedFd::borrow_raw(self.fd.as_raw_fd()) };
|
||||
self.read_cmd(&file_fd).unwrap();
|
||||
self.read_cmd(None).unwrap();
|
||||
continue;
|
||||
}
|
||||
|
||||
|
|
@ -529,48 +323,89 @@ impl Daemon {
|
|||
},
|
||||
Err(e) => {
|
||||
error!("accept4 on daemon socket failed: {e}");
|
||||
self.fd_error_push(self.fd.as_raw_fd(), e.into())
|
||||
.tap_err(|e| {
|
||||
error!(
|
||||
"Accumulated too many errors for daemon fd {:?}: {e}",
|
||||
self.fd
|
||||
)
|
||||
})?;
|
||||
self.fd_error_buffer.try_push_back(e.into()).tap_err(|e| {
|
||||
error!(
|
||||
"Accumulated too many errors for daemon fd {:?}: {e}",
|
||||
self.fd
|
||||
)
|
||||
})?;
|
||||
continue;
|
||||
},
|
||||
};
|
||||
|
||||
// Add this stream to our poll interest list.
|
||||
// NOTE: `stream_fd` is now effectively `ManuallyDrop`.
|
||||
let stream_fd = stream_fd.into_raw_fd();
|
||||
let _token = self.register(stream_fd, FdKind::SockStream);
|
||||
let mut stream_fd = stream_fd.into_raw_fd();
|
||||
|
||||
// And add this stream to our poll interest list.
|
||||
if let Err(idx) =
|
||||
extra_tokens.binary_search_by_key(&stream_fd, |(_, fd)| fd.as_raw_fd())
|
||||
{
|
||||
let token = Token(next_token_number);
|
||||
extra_tokens.insert(idx, (token, stream_fd));
|
||||
next_token_number = next_token_number.wrapping_add(1);
|
||||
let mut source = SourceFd(&mut stream_fd);
|
||||
poll.registry()
|
||||
.register(&mut source, token, Interest::READABLE)
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
// Wait for the next poll to handle.
|
||||
|
||||
//match self.read_cmd(Some(&stream_fd)) {
|
||||
// Ok(()) => (),
|
||||
// Err(e) if e.kind() == IoErrorKind::WouldBlock => {
|
||||
// continue;
|
||||
// },
|
||||
// Err(e) => {
|
||||
// self.fd_error_buffer.try_push_back(e.into()).tap_err(|e| {
|
||||
// error!(
|
||||
// "Accumulated too many errors for fd {:?} {e}",
|
||||
// &stream_fd,
|
||||
// )
|
||||
// })?;
|
||||
// },
|
||||
//}
|
||||
},
|
||||
other_token => {
|
||||
// This must be a stream fd.
|
||||
let stream_fd = self.fd_for_token(other_token).unwrap_or_else(|| {
|
||||
unreachable!("tried to get fd for no-existent token? {other_token:?}")
|
||||
});
|
||||
let index = match extra_tokens
|
||||
.binary_search_by_key(&other_token, |&(t, _)| t)
|
||||
{
|
||||
Ok(index) => index,
|
||||
Err(index) => unreachable!(
|
||||
"tried to get index ({index}) for non-existent token {other_token:?}"
|
||||
),
|
||||
};
|
||||
|
||||
if event.is_read_closed() {
|
||||
self.deregister(stream_fd);
|
||||
} else {
|
||||
// SAFETY: oh boy.
|
||||
let stream_fd = unsafe { BorrowedFd::borrow_raw(stream_fd) };
|
||||
self.read_cmd(&stream_fd).unwrap();
|
||||
}
|
||||
// This must be a stream fd.
|
||||
let (_, stream_fd) = extra_tokens[index];
|
||||
// SAFETY: oh boy.
|
||||
let stream_fd = unsafe { BorrowedFd::borrow_raw(stream_fd) };
|
||||
self.read_cmd(Some(&stream_fd)).unwrap();
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn get_fallback_fd_name<Fd: AsRawFd>(fd: Fd) -> Option<Box<OsStr>> {
|
||||
let dev_fd_path = Path::new("/dev/fd").join(fd.as_raw_fd().to_string());
|
||||
|
||||
fs_err::read_link(dev_fd_path)
|
||||
.map(PathBuf::into_os_string)
|
||||
.map(OsString::into_boxed_os_str)
|
||||
.ok()
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Daemon {
|
||||
fn drop(&mut self) {
|
||||
if let Some(path) = self.path.as_deref() {
|
||||
let _ = rustix::fs::unlink(path);
|
||||
let probably_synthetic = self
|
||||
.path
|
||||
.to_str()
|
||||
.map(|p| p.starts_with("«") && p.ends_with("»"))
|
||||
.unwrap_or(false);
|
||||
if probably_synthetic {
|
||||
return;
|
||||
}
|
||||
let _ = rustix::fs::unlink(&*self.path);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,169 +0,0 @@
|
|||
use std::{os::fd::RawFd, sync::OnceLock};
|
||||
|
||||
use circular_buffer::CircularBuffer;
|
||||
use iddqd::{BiHashItem, IdOrdItem};
|
||||
use mio::Token;
|
||||
|
||||
use crate::prelude::*;
|
||||
|
||||
const ERROR_BUFFER_LEN: usize = 8;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct FdInfo {
|
||||
pub fd: RawFd,
|
||||
pub kind: FdKind,
|
||||
pub name: OnceLock<Box<OsStr>>,
|
||||
pub error_buffer: CircularBuffer<ERROR_BUFFER_LEN, IoError>,
|
||||
}
|
||||
|
||||
impl FdInfo {
|
||||
pub fn new<Fd: AsRawFd>(fd: Fd, kind: FdKind) -> Self {
|
||||
Self {
|
||||
fd: fd.as_raw_fd(),
|
||||
kind,
|
||||
name: Default::default(),
|
||||
error_buffer: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_with_name<Fd: AsRawFd>(fd: Fd, kind: FdKind, name: Box<OsStr>) -> Self {
|
||||
Self {
|
||||
fd: fd.as_raw_fd(),
|
||||
kind,
|
||||
name: OnceLock::from(name),
|
||||
error_buffer: Default::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl FdInfo {
|
||||
pub(crate) fn guess_name<Fd: AsRawFd>(fd: Fd) -> Result<Box<OsStr>, IoError> {
|
||||
let dev_fd_path = Path::new("/dev/fd").join(fd.as_raw_fd().to_string());
|
||||
|
||||
fs_err::read_link(dev_fd_path)
|
||||
.map(PathBuf::into_os_string)
|
||||
.map(OsString::into_boxed_os_str)
|
||||
}
|
||||
|
||||
pub fn name(&self) -> &OsStr {
|
||||
if let Some(name) = self.name.get() {
|
||||
return name;
|
||||
}
|
||||
|
||||
match Self::guess_name(self.fd) {
|
||||
Ok(name) => {
|
||||
let prev = self.name.set(Box::from(name));
|
||||
debug_assert_eq!(prev, Ok(()));
|
||||
},
|
||||
Err(e) => {
|
||||
warn!(
|
||||
"can't read link for {} /dev/fd/{}: {e}",
|
||||
self.kind.name_str(),
|
||||
self.fd,
|
||||
);
|
||||
return OsStr::new("«unknown»");
|
||||
},
|
||||
}
|
||||
|
||||
self.name.get().unwrap_or_else(|| unreachable!())
|
||||
}
|
||||
|
||||
pub fn display(&self) -> FdInfoDisplay<'_> {
|
||||
FdInfoDisplay { inner: self }
|
||||
}
|
||||
}
|
||||
|
||||
impl IdOrdItem for FdInfo {
|
||||
type Key<'a> = &'a RawFd;
|
||||
|
||||
fn key(&self) -> &RawFd {
|
||||
&self.fd
|
||||
}
|
||||
|
||||
iddqd::id_upcast!();
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct FdInfoDisplay<'a> {
|
||||
inner: &'a FdInfo,
|
||||
}
|
||||
impl<'a> Display for FdInfoDisplay<'a> {
|
||||
fn fmt(&self, f: &mut Formatter) -> FmtResult {
|
||||
write!(
|
||||
f,
|
||||
"{} fd {} ({})",
|
||||
self.inner.kind.name_str(),
|
||||
self.inner.fd,
|
||||
self.inner.name().to_string_lossy(),
|
||||
)?;
|
||||
if !self.inner.error_buffer.is_empty() {
|
||||
write!(f, "; with errors: {}", self.inner.error_buffer.len())?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Copy)]
|
||||
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
|
||||
#[non_exhaustive]
|
||||
pub enum FdKind {
|
||||
File,
|
||||
Socket,
|
||||
SockStream,
|
||||
Poller,
|
||||
Unknown,
|
||||
}
|
||||
|
||||
impl FdKind {
|
||||
pub fn name_str(self) -> &'static str {
|
||||
use FdKind::*;
|
||||
match self {
|
||||
File => "file",
|
||||
Socket => "socket",
|
||||
SockStream => "socket stream",
|
||||
Poller => "poller",
|
||||
Unknown => "«unknown»",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for FdKind {
|
||||
fn default() -> FdKind {
|
||||
FdKind::Unknown
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Copy)]
|
||||
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
|
||||
pub struct TokenFd {
|
||||
pub token: Token,
|
||||
pub fd: RawFd,
|
||||
}
|
||||
|
||||
impl BiHashItem for TokenFd {
|
||||
type K1<'a> = Token;
|
||||
type K2<'a> = RawFd;
|
||||
|
||||
fn key1(&self) -> Token {
|
||||
self.token
|
||||
}
|
||||
|
||||
fn key2(&self) -> RawFd {
|
||||
self.fd
|
||||
}
|
||||
|
||||
iddqd::bi_upcast!();
|
||||
}
|
||||
|
||||
impl From<TokenFd> for (Token, RawFd) {
|
||||
fn from(TokenFd { token, fd }: TokenFd) -> (Token, RawFd) {
|
||||
(token, fd)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<(Token, RawFd)> for TokenFd {
|
||||
fn from((token, fd): (Token, RawFd)) -> TokenFd {
|
||||
TokenFd { token, fd }
|
||||
}
|
||||
}
|
||||
26
src/lib.rs
26
src/lib.rs
|
|
@ -35,11 +35,7 @@ pub(crate) mod prelude {
|
|||
|
||||
pub use bstr::ByteSlice;
|
||||
|
||||
pub use itertools::Itertools;
|
||||
|
||||
pub use rustix::io::Errno;
|
||||
|
||||
pub use tap::{Pipe, Tap, TapFallible, TapOptional};
|
||||
pub use tap::{Pipe, Tap, TapFallible};
|
||||
|
||||
pub use tracing::{Level, debug, error, info, trace, warn};
|
||||
|
||||
|
|
@ -57,8 +53,6 @@ mod daemon;
|
|||
pub use daemon::Daemon;
|
||||
mod daemon_io;
|
||||
pub use daemon_io::OwnedFdWithFlags;
|
||||
mod daemon_tokfd;
|
||||
pub(crate) use daemon_tokfd::TokenFd;
|
||||
pub mod line;
|
||||
pub use line::Line;
|
||||
mod nixcmd;
|
||||
|
|
@ -135,27 +129,17 @@ pub fn do_append(args: Arc<Args>, append_args: AppendCmd) -> Result<(), BoxDynEr
|
|||
}
|
||||
|
||||
//#[tracing::instrument(level = "debug")]
|
||||
pub fn do_daemon(args: Arc<Args>, daemon_args: DaemonCmd) -> Result<(), BoxDynError> {
|
||||
let config_file = Path::new(&args.file);
|
||||
let config_file: PathBuf = if config_file.is_relative() && !config_file.starts_with("./") {
|
||||
iter::once(OsStr::new("./"))
|
||||
.chain(config_file.iter())
|
||||
.collect()
|
||||
} else {
|
||||
config_file.to_path_buf()
|
||||
};
|
||||
let config_file: Arc<Path> = Arc::from(config_file);
|
||||
|
||||
pub fn do_daemon(_args: Arc<Args>, daemon_args: DaemonCmd) -> Result<(), BoxDynError> {
|
||||
// FIXME: make configurable?
|
||||
let _ = rustix::process::umask(Mode::from_bits_retain(0o600).complement());
|
||||
|
||||
let mut daemon = match daemon_args {
|
||||
DaemonCmd { stdin: true, .. } => Daemon::from_stdin(config_file),
|
||||
DaemonCmd { socket: None, .. } => Daemon::open_default_socket(config_file)?,
|
||||
DaemonCmd { stdin: true, .. } => Daemon::from_stdin(),
|
||||
DaemonCmd { socket: None, .. } => Daemon::open_default_socket()?,
|
||||
DaemonCmd {
|
||||
socket: Some(socket),
|
||||
..
|
||||
} => Daemon::from_unix_socket_path(config_file, &socket)?,
|
||||
} => Daemon::from_unix_socket_path(&socket)?,
|
||||
};
|
||||
|
||||
daemon.enter_loop().unwrap();
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue