Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IPC framework based on tarpc #20

Merged
merged 12 commits into from
Nov 14, 2022
Merged

IPC framework based on tarpc #20

merged 12 commits into from
Nov 14, 2022

Conversation

pawelchcki
Copy link
Contributor

@pawelchcki pawelchcki commented Jun 1, 2022

What does this PR do?

Implement a msgpack based RPC / IPC. Where a server is spawned in an async context, and clients can be either async or sync. In addition the RPC can be used in a 1 way style. Where messages are simply sent onto a pipe, without requiring reading of any responses.

This way we can optimize for cases where data transmission is simply in a single direction.

This RPC framework uses tarpc crate and adds to it possibility to submit RPC requests without using tokio, and also transport FileDescriptors that can be used to safely delegate processing to the sidecar process.

See example use here:
https://github.com/DataDog/libdatadog/pull/20/files#diff-b5cbb946eecfad576b88206a0a05bf1a78331d41fcd5842db73e9b902831a202R26

Some quick benchmarks

write only interface    time:   [1.8545 µs 1.9404 µs 2.0843 µs]                                  
                        change: [-0.9627% +12.268% +31.181%] (p = 0.13 > 0.05)
                        No change in performance detected.
Found 4 outliers among 100 measurements (4.00%)
  4 (4.00%) high severe

two way interface       time:   [18.007 µs 19.087 µs 20.356 µs]                               
                        change: [-15.207% -4.5682% +5.7143%] (p = 0.47 > 0.05)
                        No change in performance detected.
Found 11 outliers among 100 measurements (11.00%)
  6 (6.00%) low mild
  4 (4.00%) high mild
  1 (1.00%) high severe

Motivation

In some cases the context of data we process needs to live longer than a os process that produces it. For these cases we can spawn a subprocess that will receive the data from producers - and later transform and send to the backend/agent.

For efficient operation we need an optimized RPC, and a method to coordinate sidecar lifecycle.

@ivoanjo
Copy link
Member

ivoanjo commented Oct 6, 2022

@pawelchcki is this PR still relevant, or can we close it? (It looks like #45 is a superset)

@pawelchcki
Copy link
Contributor Author

@ivoanjo sorry I missed the notification :|

@pawelchcki is this PR still relevant, or can we close it? (It looks like #45 is a superset)

This PR is actually a superset of #45 😅

@pawelchcki pawelchcki changed the title Example implementation of simple IPC, using either named or anonymous… IPC framework based on tarpc Oct 21, 2022
@paullegranddc
Copy link
Contributor

This PR is actually a superset of #45 😅

Would be easier for review to make the PR into #45 and then change that to master once #45 has been merged

Includes the ability to transfer file descriptors between processes, and
provides both an async (tokio based) interface as well as synchronous
interface
@pawelchcki pawelchcki marked this pull request as ready for review October 27, 2022 10:48
@pawelchcki pawelchcki requested review from a team as code owners October 27, 2022 10:48
let mut fds = [0; MAX_FDS];

unsafe {
let b = &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This unsafe behavior. You're casting uninitialiased memory to a &mut [u8]. You should call initialize_unfilled instead which will take care of zeroing the memory before.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think filling with zero's would be safer, and more secure.
I basically followed what tokio AsyncRead was doing here.

This is part of their optimization - since its a buffer that will be written to - then the contents of memory don't matter as much. But I'd prefer to play on the safe side here

ddtelemetry/src/worker.rs Outdated Show resolved Hide resolved
self.send_request(req).await;
self.deadlines.schedule_next_heartbeat();
self.data.started = true;
if !self.data.started {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Comment on lines +92 to +94
let b = &mut *(buf_window.unfilled_mut()
as *mut [std::mem::MaybeUninit<u8>]
as *mut [u8]);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment for the &mut [u8]

@pawelchcki
Copy link
Contributor Author

Thanks @paullegranddc for the review 🙇! 🎉

@pawelchcki pawelchcki merged commit 1b69c42 into main Nov 14, 2022
@pawelchcki pawelchcki deleted the pawel/fork_poc branch November 14, 2022 13:22
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants