Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Drop fd acking in sidecar transport #481

Merged
merged 2 commits into from
Jun 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions ipc/src/platform/unix/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,7 @@ impl Write for Channel {
"failed to write whole buffer",
));
}
Ok(n) => {
self.metadata.defer_close_handles(handles);
buf = &buf[n..]
}
Ok(n) => buf = &buf[n..],
Err(ref e) if e.kind() == ErrorKind::Interrupted => {}
Err(e) => {
self.metadata.reenqueue_for_sending(handles);
Expand Down
9 changes: 1 addition & 8 deletions ipc/src/platform/unix/channel/async_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,7 @@ impl AsyncWrite for AsyncChannel {
if !handles.is_empty() {
let fds: Vec<RawFd> = handles.iter().map(AsRawFd::as_raw_fd).collect();
match project.inner.send_with_fd(buf, &fds) {
Ok(sent) => {
project
.metadata
.lock()
.unwrap()
.defer_close_handles(handles);
Poll::Ready(Ok(sent))
}
Ok(sent) => Poll::Ready(Ok(sent)),
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
project
.metadata
Expand Down
35 changes: 2 additions & 33 deletions ipc/src/platform/unix/channel/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
// SPDX-License-Identifier: Apache-2.0

use std::{
collections::{BTreeMap, VecDeque},
collections::VecDeque,
io,
os::unix::prelude::{AsRawFd, FromRawFd, IntoRawFd, RawFd},
os::unix::prelude::{AsRawFd, FromRawFd, RawFd},
};

use io_lifetimes::OwnedFd;
Expand All @@ -18,8 +18,6 @@ use crate::{
pub struct ChannelMetadata {
fds_to_send: Vec<PlatformHandle<OwnedFd>>,
fds_received: VecDeque<RawFd>,
fds_acked: Vec<RawFd>,
fds_to_close: BTreeMap<RawFd, PlatformHandle<OwnedFd>>,
pid: libc::pid_t, // must always be set to current Process ID
}

Expand All @@ -28,8 +26,6 @@ impl Default for ChannelMetadata {
Self {
fds_to_send: Default::default(),
fds_received: Default::default(),
fds_acked: Default::default(),
fds_to_close: Default::default(),
pid: nix::unistd::getpid().as_raw(),
}
}
Expand All @@ -40,27 +36,6 @@ impl ChannelMetadata {
where
T: TransferHandles,
{
{
let fds_to_close = message
.acked_handles
.into_iter()
.flat_map(|fd| self.fds_to_close.remove(&fd));

// if ACK came from the same PID, it means there is a duplicate PlatformHandle instance
// in the same process. Thus we should leak the handles allowing other
// PlatformHandle's to safely close
if message.pid == self.pid {
for h in fds_to_close {
h.into_owned_handle()
.map(|h| h.into_raw_fd())
.unwrap_or_default();
}
} else {
// drain iterator closing all open file desriptors that were ACKed by the other
// party
fds_to_close.last();
}
}
let mut item = message.item;

item.receive_handles(self)?;
Expand All @@ -75,18 +50,12 @@ impl ChannelMetadata {

let message = Message {
item,
acked_handles: self.fds_acked.drain(..).collect(),
pid: self.pid,
};

Ok(message)
}

pub(crate) fn defer_close_handles<T>(&mut self, handles: Vec<PlatformHandle<T>>) {
let handles = handles.into_iter().map(|h| (h.as_raw_fd(), h.to_untyped()));
self.fds_to_close.extend(handles);
}

pub(crate) fn enqueue_for_sending<T>(&mut self, handle: PlatformHandle<T>) {
self.fds_to_send.push(handle.to_untyped())
}
Expand Down
3 changes: 0 additions & 3 deletions ipc/src/platform/unix/message.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

use std::os::unix::prelude::RawFd;

use serde::{Deserialize, Serialize};

/// sendfd crate's API is not able to resize the received FD container.
Expand All @@ -13,6 +11,5 @@ pub const MAX_FDS: usize = 20;
#[derive(Deserialize, Serialize)]
pub struct Message<Item> {
pub item: Item,
pub acked_handles: Vec<RawFd>,
pub pid: libc::pid_t,
}