Skip to content

Commit

Permalink
Move to 120 character line limit
Browse files Browse the repository at this point in the history
  • Loading branch information
allada committed Jan 10, 2021
1 parent c27685c commit feb392d
Show file tree
Hide file tree
Showing 14 changed files with 62 additions and 168 deletions.
1 change: 1 addition & 0 deletions .rustfmt.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
max_width = 120
33 changes: 8 additions & 25 deletions cas/grpc_service/ac_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use prost::Message;
use tonic::{Request, Response, Status};

use proto::build::bazel::remote::execution::v2::{
action_cache_server::ActionCache, action_cache_server::ActionCacheServer as Server,
ActionResult, GetActionResultRequest, UpdateActionResultRequest,
action_cache_server::ActionCache, action_cache_server::ActionCacheServer as Server, ActionResult,
GetActionResultRequest, UpdateActionResultRequest,
};

use common::{log, DigestInfo};
Expand Down Expand Up @@ -52,19 +52,11 @@ impl AcServer {
let mut cursor = Cursor::new(&mut store_data);
self.ac_store.get(&digest, &mut cursor).await?;

let action_result =
ActionResult::decode(Cursor::new(&store_data)).err_tip_with_code(|e| {
(
Code::NotFound,
format!("Stored value appears to be corrupt: {}", e),
)
})?;
let action_result = ActionResult::decode(Cursor::new(&store_data))
.err_tip_with_code(|e| (Code::NotFound, format!("Stored value appears to be corrupt: {}", e)))?;

if store_data.len() != digest.size_bytes as usize {
return Err(make_err!(
Code::NotFound,
"Found item, but size does not match"
));
return Err(make_err!(Code::NotFound, "Found item, but size does not match"));
}
Ok(Response::new(action_result))
}
Expand Down Expand Up @@ -92,9 +84,7 @@ impl AcServer {
.encode(&mut store_data)
.err_tip(|| "Provided ActionResult could not be serialized")?;

self.ac_store
.update(&digest, Box::new(Cursor::new(store_data)))
.await?;
self.ac_store.update(&digest, Box::new(Cursor::new(store_data))).await?;
Ok(Response::new(action_result))
}
}
Expand All @@ -106,10 +96,7 @@ impl ActionCache for AcServer {
grpc_request: Request<GetActionResultRequest>,
) -> Result<Response<ActionResult>, Status> {
let now = Instant::now();
log::info!(
"\x1b[0;31mget_action_result Req\x1b[0m: {:?}",
grpc_request.get_ref()
);
log::info!("\x1b[0;31mget_action_result Req\x1b[0m: {:?}", grpc_request.get_ref());
let resp = self.inner_get_action_result(grpc_request).await;
let d = now.elapsed().as_secs_f32();
log::info!("\x1b[0;31mget_action_result Resp\x1b[0m: {} {:?}", d, resp);
Expand All @@ -127,11 +114,7 @@ impl ActionCache for AcServer {
);
let resp = self.inner_update_action_result(grpc_request).await;
let d = now.elapsed().as_secs_f32();
log::info!(
"\x1b[0;31mupdate_action_result Resp\x1b[0m: {} {:?}",
d,
resp
);
log::info!("\x1b[0;31mupdate_action_result Resp\x1b[0m: {} {:?}", d, resp);
return resp.map_err(|e| e.into());
}
}
22 changes: 6 additions & 16 deletions cas/grpc_service/bytestream_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tonic::{Request, Response, Status, Streaming};

use proto::google::bytestream::{
byte_stream_server::ByteStream, byte_stream_server::ByteStreamServer as Server,
QueryWriteStatusRequest, QueryWriteStatusResponse, ReadRequest, ReadResponse, WriteRequest,
WriteResponse,
byte_stream_server::ByteStream, byte_stream_server::ByteStreamServer as Server, QueryWriteStatusRequest,
QueryWriteStatusResponse, ReadRequest, ReadResponse, WriteRequest, WriteResponse,
};

use common::{log, DigestInfo};
Expand Down Expand Up @@ -55,9 +54,7 @@ impl ByteStreamServer {
let expected_size = stream.expected_size;
tokio::spawn(async move {
let rx = Box::new(rx.take(expected_size as u64));
store
.update(&DigestInfo::try_new(&hash, expected_size)?, rx)
.await
store.update(&DigestInfo::try_new(&hash, expected_size)?, rx).await
})
};

Expand Down Expand Up @@ -190,20 +187,13 @@ impl WriteRequestStreamWrapper {

#[tonic::async_trait]
impl ByteStream for ByteStreamServer {
type ReadStream =
Pin<Box<dyn Stream<Item = Result<ReadResponse, Status>> + Send + Sync + 'static>>;
async fn read(
&self,
_grpc_request: Request<ReadRequest>,
) -> Result<Response<Self::ReadStream>, Status> {
type ReadStream = Pin<Box<dyn Stream<Item = Result<ReadResponse, Status>> + Send + Sync + 'static>>;
async fn read(&self, _grpc_request: Request<ReadRequest>) -> Result<Response<Self::ReadStream>, Status> {
log::info!("\x1b[0;31mread\x1b[0m {:?}", _grpc_request.get_ref());
Err(Status::unimplemented(""))
}

async fn write(
&self,
grpc_request: Request<Streaming<WriteRequest>>,
) -> Result<Response<WriteResponse>, Status> {
async fn write(&self, grpc_request: Request<Streaming<WriteRequest>>) -> Result<Response<WriteResponse>, Status> {
log::info!("\x1b[0;31mWrite Req\x1b[0m: {:?}", grpc_request.get_ref());
let now = Instant::now();
let resp = self
Expand Down
7 changes: 2 additions & 5 deletions cas/grpc_service/capabilities_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ use tonic::{Request, Response, Status};

use proto::build::bazel::remote::execution::v2::{
capabilities_server::Capabilities, capabilities_server::CapabilitiesServer as Server,
digest_function::Value as DigestFunction,
symlink_absolute_path_strategy::Value as SymlinkAbsolutePathStrategy,
digest_function::Value as DigestFunction, symlink_absolute_path_strategy::Value as SymlinkAbsolutePathStrategy,
ActionCacheUpdateCapabilities, CacheCapabilities, GetCapabilitiesRequest, ServerCapabilities,
};

Expand All @@ -29,9 +28,7 @@ impl Capabilities for CapabilitiesServer {
let resp = ServerCapabilities {
cache_capabilities: Some(CacheCapabilities {
digest_function: vec![DigestFunction::Sha256.into()],
action_cache_update_capabilities: Some(ActionCacheUpdateCapabilities {
update_enabled: true,
}),
action_cache_update_capabilities: Some(ActionCacheUpdateCapabilities { update_enabled: true }),
cache_priority_capabilities: None,
max_batch_total_size_bytes: 0,
symlink_absolute_path_strategy: SymlinkAbsolutePathStrategy::Disallowed.into(),
Expand Down
54 changes: 16 additions & 38 deletions cas/grpc_service/cas_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,9 @@ use tonic::{Request, Response, Status};
use proto::build::bazel::remote::execution::v2::{
batch_read_blobs_response, batch_update_blobs_response,
content_addressable_storage_server::ContentAddressableStorage,
content_addressable_storage_server::ContentAddressableStorageServer as Server,
BatchReadBlobsRequest, BatchReadBlobsResponse, BatchUpdateBlobsRequest,
BatchUpdateBlobsResponse, FindMissingBlobsRequest, FindMissingBlobsResponse, GetTreeRequest,
GetTreeResponse,
content_addressable_storage_server::ContentAddressableStorageServer as Server, BatchReadBlobsRequest,
BatchReadBlobsResponse, BatchUpdateBlobsRequest, BatchUpdateBlobsResponse, FindMissingBlobsRequest,
FindMissingBlobsResponse, GetTreeRequest, GetTreeResponse,
};
use proto::google::rpc::Status as GrpcStatus;

Expand Down Expand Up @@ -46,10 +45,10 @@ impl CasServer {
let store = self.store.clone();
let digest: DigestInfo = digest.try_into()?;
futures.push(tokio::spawn(async move {
store.has(&digest).await.map_or_else(
|_| None,
|success| if success { None } else { Some(digest) },
)
store
.has(&digest)
.await
.map_or_else(|_| None, |success| if success { None } else { Some(digest) })
}));
}
let mut responses = Vec::with_capacity(futures.len());
Expand All @@ -70,10 +69,7 @@ impl CasServer {
) -> Result<Response<BatchUpdateBlobsResponse>, Error> {
let mut futures = futures::stream::FuturesOrdered::new();
for request in grpc_request.into_inner().requests {
let digest: DigestInfo = request
.digest
.err_tip(|| "Digest not found in request")?
.try_into()?;
let digest: DigestInfo = request.digest.err_tip(|| "Digest not found in request")?.try_into()?;
let digest_copy = digest.clone();
let store = self.store.clone();
let request_data = request.data;
Expand Down Expand Up @@ -103,9 +99,7 @@ impl CasServer {
while let Some(result) = futures.next().await {
responses.push(result.err_tip(|| "Internal error joining future")?);
}
Ok(Response::new(BatchUpdateBlobsResponse {
responses: responses,
}))
Ok(Response::new(BatchUpdateBlobsResponse { responses: responses }))
}

async fn inner_batch_read_blobs(
Expand All @@ -132,8 +126,7 @@ impl CasServer {
Ok(store_data)
}
.map(|result: Result<Vec<u8>, Error>| {
let (status, data) =
result.map_or_else(|e| (e.into(), vec![]), |v| (GrpcStatus::default(), v));
let (status, data) = result.map_or_else(|e| (e.into(), vec![]), |v| (GrpcStatus::default(), v));
batch_read_blobs_response::Response {
status: Some(status),
digest: Some(digest.into()),
Expand All @@ -147,9 +140,7 @@ impl CasServer {
while let Some(result) = futures.next().await {
responses.push(result.err_tip(|| "Internal error joining future")?);
}
Ok(Response::new(BatchReadBlobsResponse {
responses: responses,
}))
Ok(Response::new(BatchReadBlobsResponse { responses: responses }))
}
}

Expand All @@ -159,10 +150,7 @@ impl ContentAddressableStorage for CasServer {
&self,
grpc_request: Request<FindMissingBlobsRequest>,
) -> Result<Response<FindMissingBlobsResponse>, Status> {
log::info!(
"\x1b[0;31mfind_missing_blobs Req\x1b[0m: {:?}",
grpc_request.get_ref()
);
log::info!("\x1b[0;31mfind_missing_blobs Req\x1b[0m: {:?}", grpc_request.get_ref());
let now = Instant::now();
let resp = self
.inner_find_missing_blobs(grpc_request)
Expand All @@ -178,10 +166,7 @@ impl ContentAddressableStorage for CasServer {
&self,
grpc_request: Request<BatchUpdateBlobsRequest>,
) -> Result<Response<BatchUpdateBlobsResponse>, Status> {
log::info!(
"\x1b[0;31mbatch_update_blobs Req\x1b[0m: {:?}",
grpc_request.get_ref()
);
log::info!("\x1b[0;31mbatch_update_blobs Req\x1b[0m: {:?}", grpc_request.get_ref());
let now = Instant::now();
let resp = self
.inner_batch_update_blobs(grpc_request)
Expand All @@ -197,10 +182,7 @@ impl ContentAddressableStorage for CasServer {
&self,
grpc_request: Request<BatchReadBlobsRequest>,
) -> Result<Response<BatchReadBlobsResponse>, Status> {
log::info!(
"\x1b[0;31mbatch_read_blobs Req\x1b[0m: {:?}",
grpc_request.get_ref()
);
log::info!("\x1b[0;31mbatch_read_blobs Req\x1b[0m: {:?}", grpc_request.get_ref());
let now = Instant::now();
let resp = self
.inner_batch_read_blobs(grpc_request)
Expand All @@ -212,12 +194,8 @@ impl ContentAddressableStorage for CasServer {
resp
}

type GetTreeStream =
Pin<Box<dyn Stream<Item = Result<GetTreeResponse, Status>> + Send + Sync + 'static>>;
async fn get_tree(
&self,
_request: Request<GetTreeRequest>,
) -> Result<Response<Self::GetTreeStream>, Status> {
type GetTreeStream = Pin<Box<dyn Stream<Item = Result<GetTreeResponse, Status>> + Send + Sync + 'static>>;
async fn get_tree(&self, _request: Request<GetTreeRequest>) -> Result<Response<Self::GetTreeStream>, Status> {
use stdext::function_name;
let output = format!("{} not yet implemented", function_name!());
log::info!("\x1b[0;31mget_tree\x1b[0m: {:?}", output);
Expand Down
14 changes: 4 additions & 10 deletions cas/grpc_service/execution_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ use futures::Stream;
use tonic::{Request, Response, Status};

use proto::build::bazel::remote::execution::v2::{
execution_server::Execution, execution_server::ExecutionServer as Server, ExecuteRequest,
WaitExecutionRequest,
execution_server::Execution, execution_server::ExecutionServer as Server, ExecuteRequest, WaitExecutionRequest,
};
use proto::google::longrunning::Operation;

Expand All @@ -22,20 +21,15 @@ impl ExecutionServer {

#[tonic::async_trait]
impl Execution for ExecutionServer {
type ExecuteStream =
Pin<Box<dyn Stream<Item = Result<Operation, Status>> + Send + Sync + 'static>>;
async fn execute(
&self,
_request: Request<ExecuteRequest>,
) -> Result<Response<Self::ExecuteStream>, Status> {
type ExecuteStream = Pin<Box<dyn Stream<Item = Result<Operation, Status>> + Send + Sync + 'static>>;
async fn execute(&self, _request: Request<ExecuteRequest>) -> Result<Response<Self::ExecuteStream>, Status> {
use stdext::function_name;
let output = format!("{} not yet implemented", function_name!());
println!("{}", output);
Err(Status::unimplemented(output))
}

type WaitExecutionStream =
Pin<Box<dyn Stream<Item = Result<Operation, Status>> + Send + Sync + 'static>>;
type WaitExecutionStream = Pin<Box<dyn Stream<Item = Result<Operation, Status>> + Send + Sync + 'static>>;
async fn wait_execution(
&self,
_request: Request<WaitExecutionRequest>,
Expand Down
30 changes: 6 additions & 24 deletions cas/grpc_service/tests/ac_server_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ use std::io::Cursor;
use tonic::{Code, Request, Response, Status};

use prost::Message;
use proto::build::bazel::remote::execution::v2::{
action_cache_server::ActionCache, ActionResult, Digest,
};
use proto::build::bazel::remote::execution::v2::{action_cache_server::ActionCache, ActionResult, Digest};

use ac_server::AcServer;
use common::DigestInfo;
Expand All @@ -24,9 +22,7 @@ async fn insert_into_store<T: Message>(
let mut store_data = Vec::new();
action_result.encode(&mut store_data)?;
let digest = DigestInfo::try_new(&hash, store_data.len() as i64)?;
store
.update(&digest, Box::new(Cursor::new(store_data)))
.await?;
store.update(&digest, Box::new(Cursor::new(store_data))).await?;
Ok(digest.size_bytes as i64)
}

Expand All @@ -37,11 +33,7 @@ mod get_action_results {

use proto::build::bazel::remote::execution::v2::GetActionResultRequest;

async fn get_action_result(
ac_server: &AcServer,
hash: &str,
size: i64,
) -> Result<Response<ActionResult>, Status> {
async fn get_action_result(ac_server: &AcServer, hash: &str, size: i64) -> Result<Response<ActionResult>, Status> {
ac_server
.get_action_result(Request::new(GetActionResultRequest {
instance_name: INSTANCE_NAME.to_string(),
Expand Down Expand Up @@ -101,11 +93,7 @@ mod get_action_results {
let digest_size = insert_into_store(ac_store.as_ref(), &HASH1, &action_result).await?;
let raw_response = get_action_result(&ac_server, HASH1, digest_size).await;

assert!(
raw_response.is_ok(),
"Expected value, got error {:?}",
raw_response
);
assert!(raw_response.is_ok(), "Expected value, got error {:?}", raw_response);
assert_eq!(raw_response.unwrap().into_inner(), action_result);
Ok(())
}
Expand Down Expand Up @@ -195,18 +183,12 @@ mod update_action_result {
)
.await;

assert!(
raw_response.is_ok(),
"Expected success, got error {:?}",
raw_response
);
assert!(raw_response.is_ok(), "Expected success, got error {:?}", raw_response);
assert_eq!(raw_response.unwrap().into_inner(), action_result);

let mut raw_data = Vec::new();
let digest = DigestInfo::try_new(&HASH1, size_bytes)?;
ac_store
.get(&digest, &mut Cursor::new(&mut raw_data))
.await?;
ac_store.get(&digest, &mut Cursor::new(&mut raw_data)).await?;

let decoded_action_result = ActionResult::decode(Cursor::new(&raw_data))?;
assert_eq!(decoded_action_result, action_result);
Expand Down
8 changes: 3 additions & 5 deletions cas/grpc_service/tests/bytestream_server_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,14 +121,12 @@ pub mod write_tests {
{
// One for spawn() future and one for result.
let server_result = join_handle.await??;
let committed_size = usize::try_from(server_result.into_inner().committed_size)
.or(Err("Cant convert i64 to usize"))?;
let committed_size =
usize::try_from(server_result.into_inner().committed_size).or(Err("Cant convert i64 to usize"))?;
assert_eq!(committed_size as usize, raw_data.len());

// Now lets check our store to ensure it was written with proper data.
store
.has(&DigestInfo::try_new(&HASH1, raw_data.len())?)
.await?;
store.has(&DigestInfo::try_new(&HASH1, raw_data.len())?).await?;
let mut store_data = Vec::new();
store
.get(
Expand Down
Loading

0 comments on commit feb392d

Please sign in to comment.