remove the persistent buffer until a refactor makes it per-connection
This commit is contained in:
parent
bd3ec3a904
commit
88be53cd2f
1 changed files with 14 additions and 33 deletions
|
|
@ -1,5 +1,5 @@
|
||||||
use std::{
|
use std::{
|
||||||
env, io, mem,
|
env, io,
|
||||||
ops::Deref,
|
ops::Deref,
|
||||||
os::fd::{AsFd, BorrowedFd, IntoRawFd, OwnedFd, RawFd},
|
os::fd::{AsFd, BorrowedFd, IntoRawFd, OwnedFd, RawFd},
|
||||||
sync::{
|
sync::{
|
||||||
|
|
@ -135,9 +135,6 @@ pub struct Daemon {
|
||||||
|
|
||||||
// Bijective mapping of [`mio::Token`]s to [`RawFd`]s.
|
// Bijective mapping of [`mio::Token`]s to [`RawFd`]s.
|
||||||
tokfd: BiHashMap<TokenFd>,
|
tokfd: BiHashMap<TokenFd>,
|
||||||
|
|
||||||
cmd_buffer: Vec<u8>,
|
|
||||||
next_timeout: Option<Duration>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// `tokfd` handling.
|
/// `tokfd` handling.
|
||||||
|
|
@ -222,6 +219,8 @@ impl Daemon {
|
||||||
.copied()
|
.copied()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Not currently used, but here for completeness.
|
||||||
|
#[expect(dead_code)]
|
||||||
fn token_for_fd(&self, fd: RawFd) -> Option<Token> {
|
fn token_for_fd(&self, fd: RawFd) -> Option<Token> {
|
||||||
self.tokfd
|
self.tokfd
|
||||||
.get2(&fd)
|
.get2(&fd)
|
||||||
|
|
@ -267,8 +266,6 @@ impl Daemon {
|
||||||
poller,
|
poller,
|
||||||
fd_info,
|
fd_info,
|
||||||
tokfd: Default::default(),
|
tokfd: Default::default(),
|
||||||
cmd_buffer: Vec::with_capacity(1024),
|
|
||||||
next_timeout: TIMEOUT_NEVER,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -345,16 +342,12 @@ impl Daemon {
|
||||||
/// Private helpers.
|
/// Private helpers.
|
||||||
impl Daemon {
|
impl Daemon {
|
||||||
fn read_cmd(&mut self, fd: &BorrowedFd) -> Result<(), IoError> {
|
fn read_cmd(&mut self, fd: &BorrowedFd) -> Result<(), IoError> {
|
||||||
if self.cmd_buffer.len() == self.cmd_buffer.capacity() {
|
// FIXME: don't use a new allocation every time.
|
||||||
self.cmd_buffer.reserve(1024);
|
let mut cmd_buffer: Vec<u8> = Vec::with_capacity(1024);
|
||||||
}
|
|
||||||
|
|
||||||
let _count = rustix::io::read(fd, spare_capacity(&mut self.cmd_buffer))
|
let _count = rustix::io::read(fd, spare_capacity(&mut cmd_buffer))
|
||||||
.tap_err(|e| error!("read() on daemon fd {fd:?} failed: {e}"))?;
|
.tap_err(|e| error!("read() on daemon fd {fd:?} failed: {e}"))?;
|
||||||
|
|
||||||
// So that the loop doesn't borrow from `self`.
|
|
||||||
let mut cmd_buffer = mem::take(&mut self.cmd_buffer);
|
|
||||||
|
|
||||||
// The buffer might have existing data from the last read.
|
// The buffer might have existing data from the last read.
|
||||||
let deserializer = serde_json::Deserializer::from_slice(&cmd_buffer);
|
let deserializer = serde_json::Deserializer::from_slice(&cmd_buffer);
|
||||||
let stream: StreamDeserializer<_, DaemonCmd> = deserializer.into_iter();
|
let stream: StreamDeserializer<_, DaemonCmd> = deserializer.into_iter();
|
||||||
|
|
@ -362,16 +355,13 @@ impl Daemon {
|
||||||
let cmd = match cmd {
|
let cmd = match cmd {
|
||||||
Ok(cmd) => cmd,
|
Ok(cmd) => cmd,
|
||||||
Err(e) if e.is_eof() => {
|
Err(e) if e.is_eof() => {
|
||||||
self.next_timeout = Some(Duration::from_secs(4));
|
warn!("Got EOF before a valid command");
|
||||||
warn!("Didn't get a valid daemon command; giving the other side 4 seconds...");
|
debug!("command buffer was: {:?}", cmd_buffer.as_bstr());
|
||||||
let _ = mem::replace(&mut self.cmd_buffer, cmd_buffer);
|
|
||||||
return Ok(());
|
return Ok(());
|
||||||
},
|
},
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
warn!("error deserializing command: {e}");
|
warn!("error deserializing command: {e}");
|
||||||
debug!("command buffer was: {:?}", cmd_buffer.as_bstr());
|
debug!("command buffer was: {:?}", cmd_buffer.as_bstr());
|
||||||
cmd_buffer.clear();
|
|
||||||
let _ = mem::replace(&mut self.cmd_buffer, cmd_buffer);
|
|
||||||
// Don't propagate the error unless we have too many.
|
// Don't propagate the error unless we have too many.
|
||||||
self.fd_error_push(fd.as_raw_fd(), e.into()).tap_err(|e| {
|
self.fd_error_push(fd.as_raw_fd(), e.into()).tap_err(|e| {
|
||||||
error!("Accumulated too many errors for daemon fd {fd:?}: {e}")
|
error!("Accumulated too many errors for daemon fd {fd:?}: {e}")
|
||||||
|
|
@ -385,9 +375,6 @@ impl Daemon {
|
||||||
self.dispatch_cmd(cmd).unwrap_or_else(|e| todo!("{e}"));
|
self.dispatch_cmd(cmd).unwrap_or_else(|e| todo!("{e}"));
|
||||||
}
|
}
|
||||||
|
|
||||||
cmd_buffer.clear();
|
|
||||||
let _ = mem::replace(&mut self.cmd_buffer, cmd_buffer);
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -469,25 +456,19 @@ impl Daemon {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(timeout) = self.next_timeout {
|
match self.poller.poll(&mut events, TIMEOUT_NEVER) {
|
||||||
debug!(
|
|
||||||
"epoll_wait() with a timeout: {}",
|
|
||||||
humantime::format_duration(timeout),
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
match self.poller.poll(&mut events, self.next_timeout.take()) {
|
|
||||||
Ok(_) => {
|
Ok(_) => {
|
||||||
trace!(
|
trace!(
|
||||||
"mio::Poller::poll() got events: {:?}",
|
"mio::Poller::poll() got events: {:?}",
|
||||||
events.iter().size_hint().0,
|
events.iter().size_hint().0,
|
||||||
);
|
);
|
||||||
if events.is_empty() {
|
if events.is_empty() {
|
||||||
warn!("timeout expired");
|
unreachable!(
|
||||||
self.cmd_buffer.clear();
|
"epoll_wait() with a \"forever\" timeout should never give empty events",
|
||||||
} else {
|
);
|
||||||
let _ = self.fd_error_pop(self.poller.as_raw_fd());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let _ = self.fd_error_pop(self.poller.as_raw_fd());
|
||||||
},
|
},
|
||||||
Err(e) if e.kind() == IoErrorKind::Interrupted => {
|
Err(e) if e.kind() == IoErrorKind::Interrupted => {
|
||||||
// EINTR is silly.
|
// EINTR is silly.
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue