Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix CI in main branch by running cargo fmt #183

Merged
merged 1 commit into from
Jun 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 24 additions & 24 deletions ipc/tarpc/tarpc/examples/custom_transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,29 +20,29 @@
//
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// let bind_addr = "/tmp/tarpc_on_unix_example.sock";
//
// let _ = std::fs::remove_file(bind_addr);
//
// let listener = UnixListener::bind(bind_addr).unwrap();
// let codec_builder = LengthDelimitedCodec::builder();
// tokio::spawn(async move {
// loop {
// let (conn, _addr) = listener.accept().await.unwrap();
// let framed = codec_builder.new_framed(conn);
// let transport = transport::new(framed, Bincode::default());
//
// let fut = BaseChannel::with_defaults(transport).execute(Service.serve());
// tokio::spawn(fut);
// }
// });
//
// let conn = UnixStream::connect(bind_addr).await?;
// let transport = transport::new(codec_builder.new_framed(conn), Bincode::default());
// PingServiceClient::new(Default::default(), transport)
// .spawn()
// .ping(tarpc::context::current())
// .await?;
//
// let bind_addr = "/tmp/tarpc_on_unix_example.sock";
//
// let _ = std::fs::remove_file(bind_addr);
//
// let listener = UnixListener::bind(bind_addr).unwrap();
// let codec_builder = LengthDelimitedCodec::builder();
// tokio::spawn(async move {
// loop {
// let (conn, _addr) = listener.accept().await.unwrap();
// let framed = codec_builder.new_framed(conn);
// let transport = transport::new(framed, Bincode::default());
//
// let fut = BaseChannel::with_defaults(transport).execute(Service.serve());
// tokio::spawn(fut);
// }
// });
//
// let conn = UnixStream::connect(bind_addr).await?;
// let transport = transport::new(codec_builder.new_framed(conn), Bincode::default());
// PingServiceClient::new(Default::default(), transport)
// .spawn()
// .ping(tarpc::context::current())
// .await?;
//
Ok(())
}
2 changes: 1 addition & 1 deletion ipc/tarpc/tarpc/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ impl Context {
trace_context: trace::Context::try_from(&span)
.unwrap_or_else(|_| trace::Context::default()),
discard_response: false,
deadline: Deadline::default().0
deadline: Deadline::default().0,
}
}

Expand Down
24 changes: 15 additions & 9 deletions ipc/tarpc/tarpc/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,13 @@

//! Provides a server that concurrently handles many connections sending multiplexed requests.

use crate::{cancellations::{cancellations, CanceledRequests, RequestCancellation}, context::{self}, trace, ClientMessage, Request, Response, Transport};
#[cfg(feature = "opentelemetry")]
use crate::context::SpanExt;
use crate::{
cancellations::{cancellations, CanceledRequests, RequestCancellation},
context::{self},
trace, ClientMessage, Request, Response, Transport,
};
use ::tokio::sync::mpsc;
use futures::{
future::{AbortRegistration, Abortable},
Expand Down Expand Up @@ -275,7 +279,8 @@ pub struct TrackedRequest<Req> {
/// created by [`BaseChannel`].
pub trait Channel
where
Self: Transport<RequestResponse<<Self as Channel>::Resp>, TrackedRequest<<Self as Channel>::Req>>,
Self:
Transport<RequestResponse<<Self as Channel>::Resp>, TrackedRequest<<Self as Channel>::Req>>,
{
/// Type of request item.
type Req;
Expand Down Expand Up @@ -481,7 +486,10 @@ where
.map_err(ChannelError::Transport)
}

fn start_send(mut self: Pin<&mut Self>, response: RequestResponse<Resp>) -> Result<(), Self::Error> {
fn start_send(
mut self: Pin<&mut Self>,
response: RequestResponse<Resp>,
) -> Result<(), Self::Error> {
match response {
RequestResponse::Response(response) => {
if let Some(span) = self
Expand All @@ -498,11 +506,11 @@ where
// If the request isn't tracked anymore, there's no need to send the response.
Ok(())
}
},
}
RequestResponse::Discarded { request_id } => {
self.in_flight_requests_mut().remove_request(request_id);
Ok(())
},
}
}
}

Expand Down Expand Up @@ -761,9 +769,7 @@ impl<Req, Res> InFlightRequest<Req, Res> {

tracing::info!("CompleteRequest");
if context.discard_response {
let response = RequestResponse::Discarded {
request_id,
};
let response = RequestResponse::Discarded { request_id };
let _ = response_tx.send(response).await;
tracing::info!("DiscardingResponse");
} else {
Expand Down Expand Up @@ -815,6 +821,7 @@ where
#[cfg(test)]
mod tests {
use super::{in_flight_requests::AlreadyExistsError, BaseChannel, Channel, Config, Requests};
use crate::server::RequestResponse;
use crate::{
context, trace,
transport::channel::{self, UnboundedChannel},
Expand All @@ -828,7 +835,6 @@ mod tests {
};
use futures_test::task::noop_context;
use std::{pin::Pin, task::Poll};
use crate::server::RequestResponse;

fn test_channel<Req, Resp>() -> (
Pin<Box<BaseChannel<Req, Resp, UnboundedChannel<ClientMessage<Req>, Response<Resp>>>>>,
Expand Down
37 changes: 22 additions & 15 deletions ipc/tarpc/tarpc/src/server/limits/requests_per_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@
// license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.

use crate::server::RequestResponse;
use crate::{
server::{Channel, Config},
Response, ServerError,
};
use futures::{prelude::*, ready, task::*};
use pin_project::pin_project;
use std::{io, pin::Pin};
use crate::server::RequestResponse;

/// A [`Channel`] that limits the number of concurrent requests by throttling.
///
Expand Down Expand Up @@ -66,13 +66,14 @@ where
"ThrottleRequest",
);

self.as_mut().start_send(RequestResponse::Response(Response {
request_id: r.request.id,
message: Err(ServerError {
kind: io::ErrorKind::WouldBlock,
detail: "server throttled the request.".into(),
}),
}))?;
self.as_mut()
.start_send(RequestResponse::Response(Response {
request_id: r.request.id,
message: Err(ServerError {
kind: io::ErrorKind::WouldBlock,
detail: "server throttled the request.".into(),
}),
}))?;
}
None => return Poll::Ready(None),
}
Expand Down Expand Up @@ -179,7 +180,10 @@ where
mod tests {
use super::*;

use crate::server::{RequestResponse, testing::{self, FakeChannel, PollExt}, TrackedRequest};
use crate::server::{
testing::{self, FakeChannel, PollExt},
RequestResponse, TrackedRequest,
};
use pin_utils::pin_mut;
use std::{
marker::PhantomData,
Expand Down Expand Up @@ -255,7 +259,7 @@ mod tests {
RequestResponse::Response(resp) => {
assert_eq!(resp.request_id, 1);
assert!(resp.message.is_err());
},
}
_ => unimplemented!(),
}
}
Expand Down Expand Up @@ -342,11 +346,14 @@ mod tests {
assert_eq!(throttler.inner.in_flight_requests.len(), 0);
match throttler.inner.sink.get(0).unwrap() {
RequestResponse::Response(resp) => {
assert_eq!(resp, &Response {
request_id: 0,
message: Ok(1),
});
},
assert_eq!(
resp,
&Response {
request_id: 0,
message: Ok(1),
}
);
}
_ => unimplemented!(),
}
}
Expand Down
10 changes: 7 additions & 3 deletions ipc/tarpc/tarpc/src/server/testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::{
cancellations::{cancellations, CanceledRequests, RequestCancellation},
context,
server::{Channel, Config, RequestResponse, ResponseGuard, TrackedRequest},
Request
Request,
};
use futures::{task::*, Sink, Stream};
use pin_project::pin_project;
Expand Down Expand Up @@ -45,7 +45,10 @@ impl<In, Resp> Sink<RequestResponse<Resp>> for FakeChannel<In, RequestResponse<R
self.project().sink.poll_ready(cx).map_err(|e| match e {})
}

fn start_send(mut self: Pin<&mut Self>, response: RequestResponse<Resp>) -> Result<(), Self::Error> {
fn start_send(
mut self: Pin<&mut Self>,
response: RequestResponse<Resp>,
) -> Result<(), Self::Error> {
if let RequestResponse::Response(ref response) = response {
self.as_mut()
.project()
Expand Down Expand Up @@ -114,7 +117,8 @@ impl<Req, Resp> FakeChannel<io::Result<TrackedRequest<Req>>, RequestResponse<Res
}

impl FakeChannel<(), ()> {
pub fn default<Req, Resp>() -> FakeChannel<io::Result<TrackedRequest<Req>>, RequestResponse<Resp>> {
pub fn default<Req, Resp>(
) -> FakeChannel<io::Result<TrackedRequest<Req>>, RequestResponse<Resp>> {
let (request_cancellation, canceled_requests) = cancellations();
FakeChannel {
stream: Default::default(),
Expand Down