refactor daemon polling and event handling
This commit is contained in:
parent
d4b40e8cc1
commit
2ecd987be6
1 changed files with 113 additions and 92 deletions
205
src/daemon.rs
205
src/daemon.rs
|
|
@ -10,7 +10,7 @@ use std::{
|
||||||
|
|
||||||
use iddqd::{BiHashMap, IdOrdMap};
|
use iddqd::{BiHashMap, IdOrdMap};
|
||||||
|
|
||||||
use mio::{Events, Interest, Poll, Token, net::UnixListener, unix::SourceFd};
|
use mio::{Events, Interest, Poll, Token, event::Event, net::UnixListener, unix::SourceFd};
|
||||||
|
|
||||||
use rustix::{buffer::spare_capacity, net::SocketFlags, process::Uid};
|
use rustix::{buffer::spare_capacity, net::SocketFlags, process::Uid};
|
||||||
|
|
||||||
|
|
@ -53,7 +53,8 @@ fn next_token() -> Token {
|
||||||
|
|
||||||
// If the increment wrapped to 0, then we just increment it again.
|
// If the increment wrapped to 0, then we just increment it again.
|
||||||
if tok == 0 {
|
if tok == 0 {
|
||||||
return Token(NEXT_TOKEN_NUMBER.fetch_add(1, Ordering::SeqCst));
|
warn!("File descriptor token wrapped. That's... a lot.");
|
||||||
|
return next_token();
|
||||||
}
|
}
|
||||||
|
|
||||||
Token(tok)
|
Token(tok)
|
||||||
|
|
@ -274,6 +275,7 @@ impl Daemon {
|
||||||
self.fd.as_fd()
|
self.fd.as_fd()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
const DAEMON: Token = Token(0);
|
||||||
|
|
||||||
/// Private helpers.
|
/// Private helpers.
|
||||||
impl Daemon {
|
impl Daemon {
|
||||||
|
|
@ -368,7 +370,6 @@ impl Daemon {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
let mut daemon_source = SourceFd(&raw_fd);
|
let mut daemon_source = SourceFd(&raw_fd);
|
||||||
const DAEMON: Token = Token(0);
|
|
||||||
self.tokfd
|
self.tokfd
|
||||||
.insert_unique(TokenFd {
|
.insert_unique(TokenFd {
|
||||||
token: DAEMON,
|
token: DAEMON,
|
||||||
|
|
@ -385,103 +386,123 @@ impl Daemon {
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
if tracing::enabled!(tracing::Level::DEBUG) {
|
if tracing::enabled!(tracing::Level::DEBUG) {
|
||||||
//
|
|
||||||
trace!("Daemon loop iteration, with file descriptors: ");
|
trace!("Daemon loop iteration, with file descriptors: ");
|
||||||
for info in &self.fd_info {
|
for info in &self.fd_info {
|
||||||
trace!("- {}", info.display());
|
trace!("- {}", info.display());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
match self.poller.poll(&mut events, TIMEOUT_NEVER) {
|
let poll_result = self.poller.poll(&mut events, TIMEOUT_NEVER);
|
||||||
Ok(_) => {
|
self.handle_poll(poll_result, &events)?;
|
||||||
trace!(
|
|
||||||
"mio::Poller::poll() got events: {:?}",
|
|
||||||
events.iter().size_hint().0,
|
|
||||||
);
|
|
||||||
if events.is_empty() {
|
|
||||||
unreachable!(
|
|
||||||
"epoll_wait() with a \"forever\" timeout should never give empty events",
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
let _ = self.fd_error_pop(self.poller.as_raw_fd());
|
|
||||||
},
|
|
||||||
Err(e) if e.kind() == IoErrorKind::Interrupted => {
|
|
||||||
// EINTR is silly.
|
|
||||||
continue;
|
|
||||||
},
|
|
||||||
Err(e) => {
|
|
||||||
if let Some(Errno::BADF) = e.raw_os_error().map(Errno::from_raw_os_error) {
|
|
||||||
unreachable!("EBADF on poller fd; IO safety violation?");
|
|
||||||
}
|
|
||||||
warn!("mio Poll::poll() error: {e}");
|
|
||||||
self.fd_error_push(self.poller.as_raw_fd(), e)
|
|
||||||
.tap_err(|e| {
|
|
||||||
error!("accumulated too many errors for mio::Poll::poll(): {e}")
|
|
||||||
})?;
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
for event in &events {
|
|
||||||
trace!("event is {event:?}");
|
|
||||||
match event.token() {
|
|
||||||
DAEMON => {
|
|
||||||
let is_sock = self.main_fd_info().kind == FdKind::Socket;
|
|
||||||
if !is_sock {
|
|
||||||
// SAFETY: oh boy: disjoint borrows with extra steps.
|
|
||||||
let file_fd = unsafe { BorrowedFd::borrow_raw(self.fd.as_raw_fd()) };
|
|
||||||
self.read_cmd(&file_fd).unwrap();
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Accept, first.
|
|
||||||
let flags = SocketFlags::NONBLOCK | SocketFlags::CLOEXEC;
|
|
||||||
let stream_fd = match rustix::net::accept_with(&self.fd, flags) {
|
|
||||||
Ok(stream) => {
|
|
||||||
debug!(
|
|
||||||
"Accepted connection from socket {:?} as stream {:?}",
|
|
||||||
self.fd, stream,
|
|
||||||
);
|
|
||||||
stream
|
|
||||||
},
|
|
||||||
Err(e) => {
|
|
||||||
error!("accept4 on daemon socket failed: {e}");
|
|
||||||
self.fd_error_push(self.fd.as_raw_fd(), e.into())
|
|
||||||
.tap_err(|e| {
|
|
||||||
error!(
|
|
||||||
"Accumulated too many errors for daemon fd {:?}: {e}",
|
|
||||||
self.fd
|
|
||||||
)
|
|
||||||
})?;
|
|
||||||
continue;
|
|
||||||
},
|
|
||||||
};
|
|
||||||
|
|
||||||
// Add this stream to our poll interest list.
|
|
||||||
// NOTE: `stream_fd` is now effectively `ManuallyDrop`.
|
|
||||||
let stream_fd = stream_fd.into_raw_fd();
|
|
||||||
let _token = self.register(stream_fd, FdKind::SockStream);
|
|
||||||
|
|
||||||
// Wait for the next poll to handle.
|
|
||||||
},
|
|
||||||
other_token => {
|
|
||||||
// This must be a stream fd.
|
|
||||||
let stream_fd = self.fd_for_token(other_token).unwrap_or_else(|| {
|
|
||||||
unreachable!("tried to get fd for no-existent token? {other_token:?}")
|
|
||||||
});
|
|
||||||
|
|
||||||
if event.is_read_closed() {
|
|
||||||
self.deregister(stream_fd);
|
|
||||||
} else {
|
|
||||||
// SAFETY: oh boy.
|
|
||||||
let stream_fd = unsafe { BorrowedFd::borrow_raw(stream_fd) };
|
|
||||||
self.read_cmd(&stream_fd).unwrap();
|
|
||||||
}
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn handle_poll(
|
||||||
|
&mut self,
|
||||||
|
poll_result: Result<(), IoError>,
|
||||||
|
events: &Events,
|
||||||
|
) -> Result<(), IoError> {
|
||||||
|
match poll_result {
|
||||||
|
Ok(()) => {
|
||||||
|
trace!(
|
||||||
|
"mio::Poller::poll() got events: {:?}",
|
||||||
|
events.iter().size_hint().0,
|
||||||
|
);
|
||||||
|
if events.is_empty() {
|
||||||
|
unreachable!(
|
||||||
|
"epoll_wait() with a \"forever\" timeout should never give empty events",
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
let _ = self.fd_error_pop(self.poller.as_raw_fd());
|
||||||
|
},
|
||||||
|
Err(e) if e.kind() == IoErrorKind::Interrupted => {
|
||||||
|
// EINTR is silly.
|
||||||
|
// Return early, and poll() again.
|
||||||
|
return Ok(());
|
||||||
|
},
|
||||||
|
Err(e) => {
|
||||||
|
if let Some(Errno::BADF) = e.raw_os_error().map(Errno::from_raw_os_error) {
|
||||||
|
panic!("EBADF on poller fd; IO safety violation?");
|
||||||
|
}
|
||||||
|
warn!("mio Poll::poll() error: {e}");
|
||||||
|
self.fd_error_push(self.poller.as_raw_fd(), e)
|
||||||
|
.tap_err(|e| {
|
||||||
|
error!("accumulated too many errors for mio::Poll::poll(): {e}")
|
||||||
|
})?;
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for event in events {
|
||||||
|
self.handle_event(event)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handle_event(&mut self, event: &Event) -> Result<(), IoError> {
|
||||||
|
trace!("Handling event {event:?}");
|
||||||
|
|
||||||
|
match event.token() {
|
||||||
|
DAEMON => {
|
||||||
|
let is_sock = self.main_fd_info().kind == FdKind::Socket;
|
||||||
|
if !is_sock {
|
||||||
|
// SAFETY: oh boy: disjoint borrows with extra steps.
|
||||||
|
let file_fd = unsafe { BorrowedFd::borrow_raw(self.fd.as_raw_fd()) };
|
||||||
|
self.read_cmd(&file_fd).unwrap();
|
||||||
|
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Accept, first.
|
||||||
|
let flags = SocketFlags::NONBLOCK | SocketFlags::CLOEXEC;
|
||||||
|
let stream_fd = match rustix::net::accept_with(&self.fd, flags) {
|
||||||
|
Ok(stream) => {
|
||||||
|
debug!(
|
||||||
|
"Accepted connection from socket {:?} as stream {:?}",
|
||||||
|
self.fd, stream,
|
||||||
|
);
|
||||||
|
stream
|
||||||
|
},
|
||||||
|
Err(e) => {
|
||||||
|
error!("accept4 on daemon socket failed: {e}");
|
||||||
|
self.fd_error_push(self.fd.as_raw_fd(), e.into())
|
||||||
|
.tap_err(|e| {
|
||||||
|
error!(
|
||||||
|
"Accumulated too many errors for daemon fd {:?}: {e}",
|
||||||
|
self.fd
|
||||||
|
)
|
||||||
|
})?;
|
||||||
|
|
||||||
|
return Ok(());
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
// Add this stream to our poll interest list.
|
||||||
|
// NOTE: `stream_fd` is now effectively `ManuallyDrop`.
|
||||||
|
let stream_fd = stream_fd.into_raw_fd();
|
||||||
|
let _token = self.register(stream_fd, FdKind::SockStream);
|
||||||
|
|
||||||
|
// Wait for the next poll to handle.
|
||||||
|
},
|
||||||
|
other_token => {
|
||||||
|
// This must be a stream fd.
|
||||||
|
let stream_fd = self.fd_for_token(other_token).unwrap_or_else(|| {
|
||||||
|
unreachable!("tried to get fd for non-existent token? {other_token:?}")
|
||||||
|
});
|
||||||
|
|
||||||
|
if event.is_read_closed() {
|
||||||
|
self.deregister(stream_fd);
|
||||||
|
} else {
|
||||||
|
// SAFETY: oh boy.
|
||||||
|
let stream_fd = unsafe { BorrowedFd::borrow_raw(stream_fd) };
|
||||||
|
self.read_cmd(&stream_fd).unwrap();
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Drop for Daemon {
|
impl Drop for Daemon {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue