Skip to content

Commit

Permalink
Fix clippy errors
Browse files Browse the repository at this point in the history
  • Loading branch information
akoshelev committed Mar 25, 2024
1 parent d029c6f commit b215d35
Show file tree
Hide file tree
Showing 18 changed files with 75 additions and 167 deletions.
91 changes: 8 additions & 83 deletions ipa-core/src/app.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{future::Future, sync::Mutex};
use std::sync::Mutex;

use async_trait::async_trait;

Expand All @@ -7,14 +7,11 @@ use crate::{
query::{PrepareQuery, QueryConfig, QueryInput},
routing::{Addr, RouteId},
ApiError, BodyStream, HelperIdentity, HelperResponse, RequestHandler, Transport,
TransportIdentity, TransportImpl,
TransportImpl,
},
hpke::{KeyPair, KeyRegistry},
protocol::QueryId,
query::{
NewQueryError, QueryCompletionError, QueryInputError, QueryProcessor, QueryStatus,
QueryStatusError,
},
query::{NewQueryError, QueryProcessor, QueryStatus},
sync::Arc,
};

Expand All @@ -31,79 +28,7 @@ pub struct HelperApp {
transport: TransportImpl,
}

#[async_trait]
impl RequestHandler for HelperApp {
type Identity = HelperIdentity;
async fn handle(
&self,
req: Addr<Self::Identity>,
data: BodyStream,
) -> Result<HelperResponse, ApiError> {
fn ext_query_id(req: &Addr<HelperIdentity>) -> Result<QueryId, ApiError> {
req.query_id.ok_or_else(|| {
ApiError::BadRequest("Query input is missing query_id argument".into())
})
}

let transport = self.transport.clone();
let qp = Arc::clone(&self.query_processor);

Ok(match req.route {
RouteId::Records => HelperResponse::ok(),
RouteId::ReceiveQuery => {
let req = req.into::<QueryConfig>()?;
HelperResponse::from(qp.new_query(transport, req).await?)
}
RouteId::PrepareQuery => {
let req = req.into::<PrepareQuery>()?;
HelperResponse::from(qp.prepare(&transport, req)?)
}
RouteId::QueryInput => {
let query_id = ext_query_id(&req)?;
HelperResponse::from(qp.receive_inputs(
transport,
QueryInput {
query_id,
input_stream: data,
},
)?)
}
RouteId::QueryStatus => {
let query_id = ext_query_id(&req)?;
HelperResponse::from(qp.query_status(query_id)?)
}
RouteId::CompleteQuery => {
let query_id = ext_query_id(&req)?;
HelperResponse::from(qp.complete(query_id).await?)
}
})
// receive_query: Box::new(move |transport: TransportImpl, receive_query| {
// let processor = Arc::clone(&rqp);
// Box::pin(async move {
// let r = processor.new_query(transport, receive_query).await?;
//
// Ok(r.query_id)
// })
// }),
// prepare_query: Box::new(move |transport: TransportImpl, prepare_query| {
// let processor = Arc::clone(&pqp);
// Box::pin(async move { processor.prepare(&transport, prepare_query) })
// }),
// query_input: Box::new(move |transport: TransportImpl, query_input| {
// let processor = Arc::clone(&iqp);
// Box::pin(async move { processor.receive_inputs(transport, query_input) })
// }),
// query_status: Box::new(move |_transport: TransportImpl, query_id| {
// let processor = Arc::clone(&sqp);
// Box::pin(async move { processor.query_status(query_id) })
// }),
// complete_query: Box::new(move |_transport: TransportImpl, query_id| {
// let processor = Arc::clone(&cqp);
// Box::pin(async move { processor.complete(query_id).await })
// }),
}
}

/// This handles requests to initiate and control IPA queries.
struct QueryRequestHandler {
qp: Arc<QueryProcessor>,
transport: Arc<Mutex<Option<TransportImpl>>>,
Expand Down Expand Up @@ -133,17 +58,17 @@ impl RequestHandler for QueryRequestHandler {
}
RouteId::ReceiveQuery => {
let req = req.into::<QueryConfig>()?;
let transport = self.transport.lock().unwrap().as_ref().unwrap().clone();
let transport = Clone::clone(self.transport.lock().unwrap().as_ref().unwrap());
HelperResponse::from(qp.new_query(transport, req).await?)
}
RouteId::PrepareQuery => {
let req = req.into::<PrepareQuery>()?;
let transport = self.transport.lock().unwrap().as_ref().unwrap().clone();
let transport = Clone::clone(self.transport.lock().unwrap().as_ref().unwrap());
HelperResponse::from(qp.prepare(&transport, req)?)
}
RouteId::QueryInput => {
let query_id = ext_query_id(&req)?;
let transport = self.transport.lock().unwrap().as_ref().unwrap().clone();
let transport = Clone::clone(self.transport.lock().unwrap().as_ref().unwrap());
HelperResponse::from(qp.receive_inputs(
transport,
QueryInput {
Expand Down Expand Up @@ -212,7 +137,7 @@ impl Setup {

/// Instantiate [`HelperApp`] by connecting it to the provided transport implementation
pub fn connect(self, transport: TransportImpl) -> HelperApp {
self.handler_setup.finish(transport.clone());
self.handler_setup.finish(Clone::clone(&transport));
HelperApp::new(transport, self.query_processor)
}
}
Expand Down
4 changes: 2 additions & 2 deletions ipa-core/src/helpers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ pub use prss_protocol::negotiate as negotiate_prss;
#[cfg(feature = "web-app")]
pub use transport::WrappedAxumBodyStream;
pub use transport::{
callbacks::*, make_boxed_handler, query, routing, ApiError, BodyStream, BytesStream,
HelperResponse, Identity as TransportIdentity, LengthDelimitedStream, LogErrors, NoQueryId,
make_boxed_handler, query, routing, ApiError, BodyStream, BytesStream, HelperResponse,
Identity as TransportIdentity, LengthDelimitedStream, LogErrors, NoQueryId,
NoResourceIdentifier, NoStep, PanickingHandler, QueryIdBinding, ReceiveRecords, RecordsStream,
RequestHandler, RouteParams, StepBinding, StreamCollection, StreamKey, Transport,
WrappedBoxBodyStream,
Expand Down
29 changes: 14 additions & 15 deletions ipa-core/src/helpers/transport/handler.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,19 @@
use std::{
any::type_name,
fmt::{Debug, Formatter},
future::Future,
iter::once,
marker::PhantomData,
};

use async_trait::async_trait;
use bytes::Bytes;
use futures::{stream, Stream};
use futures_util::StreamExt;
use serde::{de::DeserializeOwned, Deserialize};
use serde::de::DeserializeOwned;
use serde_json::json;

use crate::{
error::BoxError,
helpers::{
query::PrepareQuery,
transport::{routing::Addr, stream::BoxBytesStream},
BodyStream, BytesStream, HelperIdentity, TransportIdentity,
query::PrepareQuery, transport::routing::Addr, BodyStream, HelperIdentity,
TransportIdentity,
},
protocol::QueryId,
query::{
NewQueryError, PrepareQueryError, ProtocolResult, QueryCompletionError, QueryInputError,
QueryStatus, QueryStatusError,
Expand All @@ -42,21 +35,27 @@ pub struct HelperResponse {
}

impl Debug for HelperResponse {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
fn fmt(&self, _f: &mut Formatter<'_>) -> std::fmt::Result {
todo!()
}
}

impl HelperResponse {
/// Returns
/// Returns an empty response that indicates that incoming request has been processed successfully
#[must_use]
pub fn ok() -> Self {
Self { body: Vec::new() }
}

/// Consumes [`Self`] and returns the body of the response.
#[must_use]
pub fn into_body(self) -> Vec<u8> {
self.body
}

/// Attempts to interpret [`Self`] body as JSON-serialized `T`.
/// ## Errors
/// if `T` cannot be deserialized from response body.
pub fn try_into_owned<T: DeserializeOwned>(self) -> Result<T, serde_json::Error> {
serde_json::from_slice(&self.body)
}
Expand All @@ -70,7 +69,7 @@ impl From<PrepareQuery> for HelperResponse {
}

impl From<()> for HelperResponse {
fn from(value: ()) -> Self {
fn from(_value: ()) -> Self {
Self::ok()
}
}
Expand Down Expand Up @@ -175,7 +174,7 @@ where
})
}

pub struct PanickingHandler<I: TransportIdentity> {
pub struct PanickingHandler<I: TransportIdentity = HelperIdentity> {
phantom: PhantomData<I>,
}

Expand All @@ -194,7 +193,7 @@ impl<I: TransportIdentity> RequestHandler for PanickingHandler<I> {
async fn handle(
&self,
req: Addr<Self::Identity>,
data: BodyStream,
_data: BodyStream,
) -> Result<HelperResponse, Error> {
panic!("unexpected call: {req:?}");
}
Expand Down
17 changes: 7 additions & 10 deletions ipa-core/src/helpers/transport/in_memory/transport.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::{
collections::HashMap,
convert,
fmt::{Debug, Formatter},
io,
pin::Pin,
Expand All @@ -22,12 +21,10 @@ use crate::{
error::BoxError,
helpers::{
transport::routing::{Addr, RouteId},
ApiError, BodyStream, HelperIdentity, HelperResponse, NoResourceIdentifier, QueryIdBinding,
ReceiveRecords, RequestHandler, RouteParams, StepBinding, StreamCollection, Transport,
TransportIdentity,
ApiError, BodyStream, HelperResponse, NoResourceIdentifier, QueryIdBinding, ReceiveRecords,
RequestHandler, RouteParams, StepBinding, StreamCollection, Transport, TransportIdentity,
},
protocol::{step::Gate, QueryId},
sharding::ShardIndex,
sync::{Arc, Weak},
};

Expand Down Expand Up @@ -96,7 +93,7 @@ impl<I: TransportIdentity> InMemoryTransport<I> {
tokio::spawn(
{
let streams = self.record_streams.clone();
let this = Arc::downgrade(self);
let _this = Arc::downgrade(self);
async move {
while let Some((addr, stream, ack)) = rx.recv().await {
tracing::trace!("received new message: {addr:?}");
Expand Down Expand Up @@ -355,7 +352,7 @@ impl<I: TransportIdentity> Setup<I> {
mod tests {
use std::{
collections::HashMap,
convert, io,
io,
io::ErrorKind,
num::NonZeroUsize,
panic::AssertUnwindSafe,
Expand Down Expand Up @@ -415,7 +412,7 @@ mod tests {
// }),
// ..Default::default()
// }
Setup::new(HelperIdentity::ONE).into_active_conn(Some(Box::new(move |addr: Addr<HelperIdentity>, stream| {
Setup::new(HelperIdentity::ONE).into_active_conn(Some(Box::new(move |addr: Addr<HelperIdentity>, _stream| {
let RouteId::ReceiveQuery = addr.route else {
panic!("unexpected call: {addr:?}")
};
Expand All @@ -427,7 +424,7 @@ mod tests {
.unwrap()
.take()
.expect("query callback invoked more than once")
.send(query_config.clone())
.send(query_config)
.unwrap();
Ok(HelperResponse::from(PrepareQuery {
query_id: QueryId,
Expand All @@ -440,7 +437,7 @@ mod tests {

send_and_ack(
&tx,
Addr::from_route(Some(HelperIdentity::TWO), &expected),
Addr::from_route(Some(HelperIdentity::TWO), expected),
InMemoryStream::empty(),
)
.await;
Expand Down
1 change: 0 additions & 1 deletion ipa-core/src/helpers/transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use std::{borrow::Borrow, fmt::Debug, hash::Hash};

use async_trait::async_trait;
use futures::Stream;
use serde::{Deserialize, Serialize};

use crate::{
helpers::HelperIdentity,
Expand Down
2 changes: 1 addition & 1 deletion ipa-core/src/helpers/transport/routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{borrow::Borrow, fmt::Debug};
use serde::de::DeserializeOwned;

use crate::{
helpers::{query::QueryConfig, QueryIdBinding, RouteParams, StepBinding, TransportIdentity},
helpers::{QueryIdBinding, RouteParams, StepBinding, TransportIdentity},
protocol::{step::Gate, QueryId},
};

Expand Down
1 change: 1 addition & 0 deletions ipa-core/src/helpers/transport/stream/axum_body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ impl WrappedAxumBodyStream {
Self(inner.map_err(axum::Error::into_inner as fn(axum::Error) -> BoxError))
}

#[must_use]
pub fn empty() -> Self {
Self::from_body(Bytes::new())
}
Expand Down
1 change: 1 addition & 0 deletions ipa-core/src/helpers/transport/stream/box_body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ impl WrappedBoxBodyStream {
Self(Box::pin(input.map(Bytes::from).map(Ok)))
}

#[must_use]
pub fn empty() -> Self {
WrappedBoxBodyStream(Box::pin(futures::stream::empty()))
}
Expand Down
12 changes: 4 additions & 8 deletions ipa-core/src/net/server/handlers/query/create.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
use std::future::Future;

use axum::{routing::post, Extension, Json, Router};
use hyper::StatusCode;

use crate::{
helpers::{ApiError::NewQuery, BodyStream, HelperResponse, Transport},
helpers::{ApiError::NewQuery, BodyStream, Transport},
net::{http_serde, Error, HttpTransport},
query::NewQueryError,
sync::Arc,
Expand All @@ -21,9 +19,7 @@ async fn handler(
.dispatch(req.query_config, BodyStream::empty())
.await
{
Ok(resp) => {
Ok(Json(resp.try_into()?))
},
Ok(resp) => Ok(Json(resp.try_into()?)),
Err(err @ NewQuery(NewQueryError::State { .. })) => {
Err(Error::application(StatusCode::CONFLICT, err))
}
Expand All @@ -39,7 +35,7 @@ pub fn router(transport: Arc<HttpTransport>) -> Router {

#[cfg(all(test, unit_test))]
mod tests {
use std::{future::ready, num::NonZeroU32};
use std::num::NonZeroU32;

use axum::http::Request;
use hyper::{
Expand All @@ -65,7 +61,7 @@ mod tests {
async fn create_test(expected_query_config: QueryConfig) {
let TestServer { server, .. } = TestServer::builder()
.with_request_handler(Box::new(
move |addr: Addr<HelperIdentity>, data: BodyStream| {
move |addr: Addr<HelperIdentity>, _data: BodyStream| {
let RouteId::ReceiveQuery = addr.route else {
panic!("unexpected call");
};
Expand Down
5 changes: 2 additions & 3 deletions ipa-core/src/net/server/handlers/query/prepare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ pub fn router(transport: Arc<HttpTransport>) -> Router {

#[cfg(all(test, unit_test))]
mod tests {
use std::future::ready;

use axum::{http::Request, Extension};
use hyper::{Body, StatusCode};
Expand Down Expand Up @@ -72,10 +71,10 @@ mod tests {
config: QueryConfig::new(TestMultiply, FieldType::Fp31, 1).unwrap(),
roles: RoleAssignment::new(HelperIdentity::make_three()),
});
let expected_prepare_query = req.data.clone();
let expected_prepare_query = req.data;
let TestServer { transport, .. } = TestServer::builder()
.with_request_handler(Box::new(
move |addr: Addr<HelperIdentity>, data: BodyStream| {
move |addr: Addr<HelperIdentity>, _data: BodyStream| {
let RouteId::PrepareQuery = addr.route else {
panic!("unexpected call");
};
Expand Down
Loading

0 comments on commit b215d35

Please sign in to comment.