From a22e437e44d63686fb5819fb370c75f51b9dd513 Mon Sep 17 00:00:00 2001 From: Chris Staite Date: Tue, 18 Jul 2023 13:32:59 +0000 Subject: [PATCH] Fix incorrect type check. When re-factoring the GrpcScheduler change, a dodgy copy-paste made it in. Fix up the type check to the correct type. Also fixes an issue in the action_name() implementation that was detected when writing a test. The size is not encoded but is expected for decoding. --- cas/grpc_service/cas_server.rs | 2 +- cas/scheduler/action_messages.rs | 85 ++++++++++++++------- cas/scheduler/simple_scheduler.rs | 8 +- cas/scheduler/tests/action_messages_test.rs | 14 ++-- cas/store/dedup_store.rs | 2 +- cas/store/filesystem_store.rs | 4 +- cas/store/grpc_store.rs | 9 ++- cas/store/memory_store.rs | 2 +- cas/store/s3_store.rs | 4 +- cas/store/tests/filesystem_store_test.rs | 6 +- util/common.rs | 8 +- util/evicting_map.rs | 2 +- 12 files changed, 91 insertions(+), 55 deletions(-) diff --git a/cas/grpc_service/cas_server.rs b/cas/grpc_service/cas_server.rs index 0bb005591..751943255 100644 --- a/cas/grpc_service/cas_server.rs +++ b/cas/grpc_service/cas_server.rs @@ -93,7 +93,7 @@ impl CasServer { log::error!( "Error during .has() call in .find_missing_blobs() : {:?} - {}", err, - digest_info.str() + digest_info.hash_str() ); Some(Ok(digest)) } diff --git a/cas/scheduler/action_messages.rs b/cas/scheduler/action_messages.rs index fe1f21d98..ee59b930b 100644 --- a/cas/scheduler/action_messages.rs +++ b/cas/scheduler/action_messages.rs @@ -71,7 +71,13 @@ impl ActionInfoHashKey { /// Returns the salt used for cache busting/hashing. #[inline] pub fn action_name(&self) -> String { - format!("{}/{}/{:X}", self.instance_name, self.digest.str(), self.salt) + format!( + "{}/{}-{}/{:X}", + self.instance_name, + self.digest.hash_str(), + self.digest.size_bytes, + self.salt + ) } } @@ -758,18 +764,49 @@ impl TryFrom for ActionStage { } } +// TODO: Should be able to remove this after tokio-rs/prost#299 +trait TypeUrl: Message { + const TYPE_URL: &'static str; +} + +impl TypeUrl for ExecuteResponse { + const TYPE_URL: &'static str = "type.googleapis.com/build.bazel.remote.execution.v2.ExecuteResponse"; +} + +impl TypeUrl for ExecuteOperationMetadata { + const TYPE_URL: &'static str = "type.googleapis.com/build.bazel.remote.execution.v2.ExecuteOperationMetadata"; +} + +fn from_any(message: &Any) -> Result +where + T: TypeUrl + Default, +{ + error_if!( + message.type_url != T::TYPE_URL, + "Incorrect type when decoding Any. {} != {}", + message.type_url, + T::TYPE_URL.to_string() + ); + Ok(T::decode(message.value.as_slice())?) +} + +fn to_any(message: &T) -> Any +where + T: TypeUrl, +{ + Any { + type_url: T::TYPE_URL.to_string(), + value: message.encode_to_vec(), + } +} + impl TryFrom for ActionState { type Error = Error; fn try_from(operation: Operation) -> Result { - let metadata_data = operation.metadata.err_tip(|| "No metadata in upstream operation")?; - error_if!( - metadata_data.type_url != "type.googleapis.com/build.bazel.remote.execution.v2.ExecuteResponse", - "Incorrect metadata structure in upstream operation. {} != type.googleapis.com/build.bazel.remote.execution.v2.ExecuteResponse", - metadata_data.type_url - ); - let metadata = ExecuteOperationMetadata::decode(metadata_data.value.as_slice()) - .err_tip(|| "Could not decode metadata in upstream operation")?; + let metadata = + from_any::(&operation.metadata.err_tip(|| "No metadata in upstream operation")?) + .err_tip(|| "Could not decode metadata in upstream operation")?; let action_digest = metadata .action_digest @@ -792,12 +829,9 @@ impl TryFrom for ActionState { LongRunningResult::Error(error) => ActionStage::Error((error.into(), ActionResult::default())), LongRunningResult::Response(response) => { // Could be Completed, CompletedFromCache or Error. - error_if!( - response.type_url != "type.googleapis.com/build.bazel.remote.execution.v2.ExecuteResponse", - "Incorrect result structure for completed upstream action. {} != type.googleapis.com/build.bazel.remote.execution.v2.ExecuteResponse", - response.type_url - ); - ExecuteResponse::decode(response.value.as_slice())?.try_into()? + from_any::(&response) + .err_tip(|| "Could not decode result structure for completed upstream action")? + .try_into()? } } } @@ -842,14 +876,13 @@ impl ActionState { impl From for Operation { fn from(val: ActionState) -> Self { - let has_action_result = val.stage.has_action_result(); let stage = Into::::into(&val.stage) as i32; - let execute_response: ExecuteResponse = val.stage.into(); - let serialized_response = if has_action_result { - execute_response.encode_to_vec() + let result = if val.stage.has_action_result() { + let execute_response: ExecuteResponse = val.stage.into(); + Some(LongRunningResult::Response(to_any(&execute_response))) } else { - vec![] + None }; let metadata = ExecuteOperationMetadata { @@ -862,15 +895,9 @@ impl From for Operation { Self { name: val.unique_qualifier.action_name(), - metadata: Some(Any { - type_url: "type.googleapis.com/build.bazel.remote.execution.v2.ExecuteOperationMetadata".to_string(), - value: metadata.encode_to_vec(), - }), - done: has_action_result, - result: Some(LongRunningResult::Response(Any { - type_url: "type.googleapis.com/build.bazel.remote.execution.v2.ExecuteResponse".to_string(), - value: serialized_response, - })), + metadata: Some(to_any(&metadata)), + done: result.is_some(), + result, } } } diff --git a/cas/scheduler/simple_scheduler.rs b/cas/scheduler/simple_scheduler.rs index 6272f9e03..d41fd44b7 100644 --- a/cas/scheduler/simple_scheduler.rs +++ b/cas/scheduler/simple_scheduler.rs @@ -341,7 +341,7 @@ impl SimpleSchedulerImpl { // again. log::warn!( "Action {} has no more listeners during evict_worker()", - action_info.digest().str() + action_info.digest().hash_str() ); } } @@ -380,7 +380,7 @@ impl SimpleSchedulerImpl { let Some(awaited_action) = self.queued_actions.get(action_info.as_ref()) else { log::error!( "queued_actions out of sync with itself for action {}", - action_info.digest().str() + action_info.digest().hash_str() ); continue; }; @@ -415,7 +415,7 @@ impl SimpleSchedulerImpl { // again. log::warn!( "Action {} has no more listeners", - awaited_action.action_info.digest().str() + awaited_action.action_info.digest().hash_str() ); } awaited_action.attempts += 1; @@ -520,7 +520,7 @@ impl SimpleSchedulerImpl { if send_result.is_err() { log::warn!( "Action {} has no more listeners during update_action()", - action_info.digest().str() + action_info.digest().hash_str() ); } // If the operation is not finished it means the worker is still working on it, so put it diff --git a/cas/scheduler/tests/action_messages_test.rs b/cas/scheduler/tests/action_messages_test.rs index b36fcebd2..035aab368 100644 --- a/cas/scheduler/tests/action_messages_test.rs +++ b/cas/scheduler/tests/action_messages_test.rs @@ -39,17 +39,18 @@ mod action_messages_tests { #[tokio::test] async fn action_state_any_url_test() -> Result<(), Error> { - let operation: Operation = ActionState { + let action_state = ActionState { unique_qualifier: ActionInfoHashKey { instance_name: "foo_instance".to_string(), digest: DigestInfo::new([1u8; 32], 5), salt: 0, }, - stage: ActionStage::Unknown, - } - .into(); + // Result is only populated if has_action_result. + stage: ActionStage::Completed(ActionResult::default()), + }; + let operation: Operation = action_state.clone().into(); - match operation.result { + match &operation.result { Some(operation::Result::Response(any)) => assert_eq!( any.type_url, "type.googleapis.com/build.bazel.remote.execution.v2.ExecuteResponse" @@ -57,6 +58,9 @@ mod action_messages_tests { other => panic!("Expected Some(Result(Any)), got: {other:?}"), } + let action_state_round_trip: ActionState = operation.try_into()?; + assert_eq!(action_state, action_state_round_trip); + Ok(()) } diff --git a/cas/store/dedup_store.rs b/cas/store/dedup_store.rs index 704deaf14..1993ec7cf 100644 --- a/cas/store/dedup_store.rs +++ b/cas/store/dedup_store.rs @@ -123,7 +123,7 @@ impl StoreTrait for DedupStore { Err(e) => { log::warn!( "Failed to deserialize index in dedup store : {} - {:?}", - digest.str(), + digest.hash_str(), e ); // We return the equivalent of NotFound here so the client is happy. diff --git a/cas/store/filesystem_store.rs b/cas/store/filesystem_store.rs index 53ee2451d..105600592 100644 --- a/cas/store/filesystem_store.rs +++ b/cas/store/filesystem_store.rs @@ -108,7 +108,7 @@ impl Drop for EncodedFilePath { #[inline] fn to_full_path_from_digest(folder: &str, digest: &DigestInfo) -> String { - format!("{}/{}-{}", folder, digest.str(), digest.size_bytes) + format!("{}/{}-{}", folder, digest.hash_str(), digest.size_bytes) } #[async_trait] @@ -301,7 +301,7 @@ impl LenEntry for FileEntryImpl { log::info!( "\x1b[0;31mFilesystem Store\x1b[0m: Unref {}, moving file {} to {}", - encoded_file_path.digest.str(), + encoded_file_path.digest.hash_str(), &from_path, &to_path ); diff --git a/cas/store/grpc_store.rs b/cas/store/grpc_store.rs index 6e6c5f3ff..1fe231e07 100644 --- a/cas/store/grpc_store.rs +++ b/cas/store/grpc_store.rs @@ -401,7 +401,7 @@ impl StoreTrait for GrpcStore { "{}/uploads/{}/blobs/{}/{}", &self.instance_name, Uuid::new_v4().hyphenated().encode_lower(&mut buf), - digest.str(), + digest.hash_str(), digest.size_bytes, ); @@ -467,7 +467,12 @@ impl StoreTrait for GrpcStore { return self.get_action_result_as_part(digest, writer, offset, length).await; } - let resource_name = format!("{}/blobs/{}/{}", &self.instance_name, digest.str(), digest.size_bytes,); + let resource_name = format!( + "{}/blobs/{}/{}", + &self.instance_name, + digest.hash_str(), + digest.size_bytes, + ); let mut stream = self .read(Request::new(ReadRequest { diff --git a/cas/store/memory_store.rs b/cas/store/memory_store.rs index fa2bb9ec4..233953b51 100644 --- a/cas/store/memory_store.rs +++ b/cas/store/memory_store.rs @@ -111,7 +111,7 @@ impl StoreTrait for MemoryStore { .map .get(&digest) .await - .err_tip_with_code(|_| (Code::NotFound, format!("Hash {} not found", digest.str())))?; + .err_tip_with_code(|_| (Code::NotFound, format!("Hash {} not found", digest.hash_str())))?; let default_len = value.len() - offset; let length = length.unwrap_or(default_len).min(default_len); diff --git a/cas/store/s3_store.rs b/cas/store/s3_store.rs index cd8fe97c1..0e10cc7eb 100644 --- a/cas/store/s3_store.rs +++ b/cas/store/s3_store.rs @@ -182,7 +182,7 @@ impl S3Store { } fn make_s3_path(&self, digest: &DigestInfo) -> String { - format!("{}{}-{}", self.key_prefix, digest.str(), digest.size_bytes) + format!("{}{}-{}", self.key_prefix, digest.hash_str(), digest.size_bytes) } } @@ -267,7 +267,7 @@ impl StoreTrait for S3Store { error_if!( write_buf.len() > max_size, "More data than provided max_size in s3_store {}", - digest.str() + digest.hash_str() ); let content_length = write_buf.len(); ( diff --git a/cas/store/tests/filesystem_store_test.rs b/cas/store/tests/filesystem_store_test.rs index 4454cd2ce..33c57ad85 100644 --- a/cas/store/tests/filesystem_store_test.rs +++ b/cas/store/tests/filesystem_store_test.rs @@ -261,7 +261,7 @@ mod filesystem_store_tests { // Insert data into store. store.as_ref().update_oneshot(digest1, VALUE1.into()).await?; - let expected_file_name = format!("{}/{}-{}", content_path, digest1.str(), digest1.size_bytes); + let expected_file_name = format!("{}/{}-{}", content_path, digest1.hash_str(), digest1.size_bytes); { // Check to ensure our file exists where it should and content matches. let data = read_file_contents(&expected_file_name).await?; @@ -551,8 +551,8 @@ mod filesystem_store_tests { fs::create_dir_all(&content_path).await?; // Make the two files on disk before loading the store. - let file1 = format!("{}/{}-{}", content_path, digest1.str(), digest1.size_bytes); - let file2 = format!("{}/{}-{}", content_path, digest2.str(), digest2.size_bytes); + let file1 = format!("{}/{}-{}", content_path, digest1.hash_str(), digest1.size_bytes); + let file2 = format!("{}/{}-{}", content_path, digest2.hash_str(), digest2.size_bytes); write_file(&file1, VALUE1.as_bytes()).await?; write_file(&file2, VALUE2.as_bytes()).await?; set_file_atime(&file1, FileTime::from_unix_time(0, 0))?; diff --git a/util/common.rs b/util/common.rs index e9682b90e..64cee102b 100644 --- a/util/common.rs +++ b/util/common.rs @@ -63,7 +63,7 @@ impl DigestInfo { }) } - pub fn str(&self) -> String { + pub fn hash_str(&self) -> String { hex::encode(self.packed_hash) } @@ -83,7 +83,7 @@ impl fmt::Debug for DigestInfo { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("DigestInfo") .field("size_bytes", &self.size_bytes) - .field("hash", &self.str()) + .field("hash", &self.hash_str()) .finish() } } @@ -104,7 +104,7 @@ impl TryFrom for DigestInfo { impl From for Digest { fn from(val: DigestInfo) -> Self { Digest { - hash: val.str(), + hash: val.hash_str(), size_bytes: val.size_bytes, } } @@ -113,7 +113,7 @@ impl From for Digest { impl From<&DigestInfo> for Digest { fn from(val: &DigestInfo) -> Self { Digest { - hash: val.str(), + hash: val.hash_str(), size_bytes: val.size_bytes, } } diff --git a/util/evicting_map.rs b/util/evicting_map.rs index ed810afdc..7a1da862e 100644 --- a/util/evicting_map.rs +++ b/util/evicting_map.rs @@ -207,7 +207,7 @@ where state.sum_store_size -= eviction_item.data.len() as u64; // Note: See comment in `unref()` requring global lock of insert/remove. eviction_item.data.unref().await; - log::info!("\x1b[0;31mEvicting Map\x1b[0m: Evicting {}", key.str()); + log::info!("\x1b[0;31mEvicting Map\x1b[0m: Evicting {}", key.hash_str()); peek_entry = if let Some((_, entry)) = state.lru.peek_lru() { entry