diff --git a/src/daemon.rs b/src/daemon.rs index 7fe7898..6543d3a 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -10,7 +10,7 @@ use std::{ use iddqd::{BiHashMap, IdOrdMap}; -use mio::{Events, Interest, Poll, Token, net::UnixListener, unix::SourceFd}; +use mio::{Events, Interest, Poll, Token, event::Event, net::UnixListener, unix::SourceFd}; use rustix::{buffer::spare_capacity, net::SocketFlags, process::Uid}; @@ -53,7 +53,8 @@ fn next_token() -> Token { // If the increment wrapped to 0, then we just increment it again. if tok == 0 { - return Token(NEXT_TOKEN_NUMBER.fetch_add(1, Ordering::SeqCst)); + warn!("File descriptor token wrapped. That's... a lot."); + return next_token(); } Token(tok) @@ -274,6 +275,7 @@ impl Daemon { self.fd.as_fd() } } +const DAEMON: Token = Token(0); /// Private helpers. impl Daemon { @@ -368,7 +370,6 @@ impl Daemon { ); } let mut daemon_source = SourceFd(&raw_fd); - const DAEMON: Token = Token(0); self.tokfd .insert_unique(TokenFd { token: DAEMON, @@ -385,103 +386,123 @@ impl Daemon { loop { if tracing::enabled!(tracing::Level::DEBUG) { - // trace!("Daemon loop iteration, with file descriptors: "); for info in &self.fd_info { trace!("- {}", info.display()); } } - match self.poller.poll(&mut events, TIMEOUT_NEVER) { - Ok(_) => { - trace!( - "mio::Poller::poll() got events: {:?}", - events.iter().size_hint().0, - ); - if events.is_empty() { - unreachable!( - "epoll_wait() with a \"forever\" timeout should never give empty events", - ); - } - - let _ = self.fd_error_pop(self.poller.as_raw_fd()); - }, - Err(e) if e.kind() == IoErrorKind::Interrupted => { - // EINTR is silly. - continue; - }, - Err(e) => { - if let Some(Errno::BADF) = e.raw_os_error().map(Errno::from_raw_os_error) { - unreachable!("EBADF on poller fd; IO safety violation?"); - } - warn!("mio Poll::poll() error: {e}"); - self.fd_error_push(self.poller.as_raw_fd(), e) - .tap_err(|e| { - error!("accumulated too many errors for mio::Poll::poll(): {e}") - })?; - }, - } - - for event in &events { - trace!("event is {event:?}"); - match event.token() { - DAEMON => { - let is_sock = self.main_fd_info().kind == FdKind::Socket; - if !is_sock { - // SAFETY: oh boy: disjoint borrows with extra steps. - let file_fd = unsafe { BorrowedFd::borrow_raw(self.fd.as_raw_fd()) }; - self.read_cmd(&file_fd).unwrap(); - continue; - } - - // Accept, first. - let flags = SocketFlags::NONBLOCK | SocketFlags::CLOEXEC; - let stream_fd = match rustix::net::accept_with(&self.fd, flags) { - Ok(stream) => { - debug!( - "Accepted connection from socket {:?} as stream {:?}", - self.fd, stream, - ); - stream - }, - Err(e) => { - error!("accept4 on daemon socket failed: {e}"); - self.fd_error_push(self.fd.as_raw_fd(), e.into()) - .tap_err(|e| { - error!( - "Accumulated too many errors for daemon fd {:?}: {e}", - self.fd - ) - })?; - continue; - }, - }; - - // Add this stream to our poll interest list. - // NOTE: `stream_fd` is now effectively `ManuallyDrop`. - let stream_fd = stream_fd.into_raw_fd(); - let _token = self.register(stream_fd, FdKind::SockStream); - - // Wait for the next poll to handle. - }, - other_token => { - // This must be a stream fd. - let stream_fd = self.fd_for_token(other_token).unwrap_or_else(|| { - unreachable!("tried to get fd for no-existent token? {other_token:?}") - }); - - if event.is_read_closed() { - self.deregister(stream_fd); - } else { - // SAFETY: oh boy. - let stream_fd = unsafe { BorrowedFd::borrow_raw(stream_fd) }; - self.read_cmd(&stream_fd).unwrap(); - } - }, - } - } + let poll_result = self.poller.poll(&mut events, TIMEOUT_NEVER); + self.handle_poll(poll_result, &events)?; } } + + fn handle_poll( + &mut self, + poll_result: Result<(), IoError>, + events: &Events, + ) -> Result<(), IoError> { + match poll_result { + Ok(()) => { + trace!( + "mio::Poller::poll() got events: {:?}", + events.iter().size_hint().0, + ); + if events.is_empty() { + unreachable!( + "epoll_wait() with a \"forever\" timeout should never give empty events", + ); + } + + let _ = self.fd_error_pop(self.poller.as_raw_fd()); + }, + Err(e) if e.kind() == IoErrorKind::Interrupted => { + // EINTR is silly. + // Return early, and poll() again. + return Ok(()); + }, + Err(e) => { + if let Some(Errno::BADF) = e.raw_os_error().map(Errno::from_raw_os_error) { + panic!("EBADF on poller fd; IO safety violation?"); + } + warn!("mio Poll::poll() error: {e}"); + self.fd_error_push(self.poller.as_raw_fd(), e) + .tap_err(|e| { + error!("accumulated too many errors for mio::Poll::poll(): {e}") + })?; + }, + } + + for event in events { + self.handle_event(event)?; + } + + Ok(()) + } + + fn handle_event(&mut self, event: &Event) -> Result<(), IoError> { + trace!("Handling event {event:?}"); + + match event.token() { + DAEMON => { + let is_sock = self.main_fd_info().kind == FdKind::Socket; + if !is_sock { + // SAFETY: oh boy: disjoint borrows with extra steps. + let file_fd = unsafe { BorrowedFd::borrow_raw(self.fd.as_raw_fd()) }; + self.read_cmd(&file_fd).unwrap(); + + return Ok(()); + } + + // Accept, first. + let flags = SocketFlags::NONBLOCK | SocketFlags::CLOEXEC; + let stream_fd = match rustix::net::accept_with(&self.fd, flags) { + Ok(stream) => { + debug!( + "Accepted connection from socket {:?} as stream {:?}", + self.fd, stream, + ); + stream + }, + Err(e) => { + error!("accept4 on daemon socket failed: {e}"); + self.fd_error_push(self.fd.as_raw_fd(), e.into()) + .tap_err(|e| { + error!( + "Accumulated too many errors for daemon fd {:?}: {e}", + self.fd + ) + })?; + + return Ok(()); + }, + }; + + // Add this stream to our poll interest list. + // NOTE: `stream_fd` is now effectively `ManuallyDrop`. + let stream_fd = stream_fd.into_raw_fd(); + let _token = self.register(stream_fd, FdKind::SockStream); + + // Wait for the next poll to handle. + }, + other_token => { + // This must be a stream fd. + let stream_fd = self.fd_for_token(other_token).unwrap_or_else(|| { + unreachable!("tried to get fd for non-existent token? {other_token:?}") + }); + + if event.is_read_closed() { + self.deregister(stream_fd); + } else { + // SAFETY: oh boy. + let stream_fd = unsafe { BorrowedFd::borrow_raw(stream_fd) }; + self.read_cmd(&stream_fd).unwrap(); + } + }, + } + + Ok(()) + } } impl Drop for Daemon {