Skip to content

Commit

Permalink
server/os_i_o: Undo IPC channel extension
Browse files Browse the repository at this point in the history
via `Vec` and drastically increase the IPC message queue size instead.
Measurements didn't discover a drastic increase in RAM caused by this,
and it is a much easier fix for the problem at hand.
  • Loading branch information
har7an committed Jan 22, 2023
1 parent 5712598 commit 894b5e6
Showing 1 changed file with 9 additions and 60 deletions.
69 changes: 9 additions & 60 deletions zellij-server/src/os_input_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ use zellij_utils::{
};

use std::{
cell::RefCell,
collections::{BTreeMap, HashMap, HashSet, VecDeque},
collections::{BTreeMap, HashMap, HashSet},
env,
fs::File,
io::Write,
Expand All @@ -41,10 +40,6 @@ use std::{
sync::{Arc, Mutex},
};

// Dynamically allocated retry queue for ServerToClient instructions. This is allocated on demand
// when the regular message queue can't hold all the messages trickling in.
const CLIENT_RETRY_QUEUE_MAX_LEN: usize = 1000;

pub use async_trait::async_trait;
pub use nix::unistd::Pid;

Expand Down Expand Up @@ -345,12 +340,11 @@ fn spawn_terminal(
struct ClientSender {
client_id: ClientId,
client_buffer_sender: channels::Sender<ServerToClientMsg>,
client_retry_queue: RefCell<VecDeque<ServerToClientMsg>>,
}

impl ClientSender {
pub fn new(client_id: ClientId, mut sender: IpcSenderWithContext<ServerToClientMsg>) -> Self {
let (client_buffer_sender, client_buffer_receiver) = channels::bounded(50);
let (client_buffer_sender, client_buffer_receiver) = channels::bounded(5000);
std::thread::spawn(move || {
let err_context = || format!("failed to send message to client {client_id}");
for msg in client_buffer_receiver.iter() {
Expand All @@ -362,7 +356,6 @@ impl ClientSender {
ClientSender {
client_id,
client_buffer_sender,
client_retry_queue: RefCell::new(VecDeque::new()),
}
}
pub fn send_or_buffer(&self, msg: ServerToClientMsg) -> Result<()> {
Expand All @@ -373,58 +366,14 @@ impl ClientSender {
)
};

if self.client_retry_queue.borrow().is_empty() {
if let Err(err) = self.client_buffer_sender.try_send(msg) {
match err {
TrySendError::Full(msg) => {
// Try again later
self.client_retry_queue.borrow_mut().push_back(msg);
log::warn!(
"client {} is processing server messages too slow",
self.client_id
);
},
_ => return Err(err).with_context(err_context),
self.client_buffer_sender.try_send(msg)
.or_else(|err| {
if let TrySendError::Full(_) = err {
log::warn!("client {} is processing server messages too slow", self.client_id);
}
}
} else {
// Retry queue not empty, add this message to the back of it
self.client_retry_queue.borrow_mut().push_back(msg);
if self.client_retry_queue.borrow().len() >= CLIENT_RETRY_QUEUE_MAX_LEN {
// Just give up, there's no point in this...
return Err(ZellijError::ClientTooSlow {
client_id: self.client_id,
})
.with_context(err_context);
}

loop {
// Get the next message from the queue
let msg = if let Some(msg) = self.client_retry_queue.borrow_mut().pop_front() {
msg
} else {
// Queue is empty, all messages have been sent out
log::info!(
"client {} caught up processing server messages",
self.client_id
);
return Ok(());
};

// Send all the messages away we get
if let Err(err) = self.client_buffer_sender.try_send(msg) {
match err {
TrySendError::Full(msg) => {
// Put the message back where we found it to maintain the order
self.client_retry_queue.borrow_mut().push_front(msg);
return Ok(());
},
_ => return Err(err).with_context(err_context),
}
}
}
}
Ok(())
Err(err)
})
.with_context(err_context)
}
}

Expand Down

0 comments on commit 894b5e6

Please sign in to comment.