Skip to content

Commit

Permalink
Fix incorrect type check.
Browse files Browse the repository at this point in the history
While refactoring the GrpcScheduler change, a dodgy copy-paste slipped in.  Fix up the type check to use the correct type.

Also fixes an issue in the action_name() implementation that was
detected while writing a test: the digest size was not encoded into the
name, even though it is expected when decoding.
  • Loading branch information
chrisstaite-menlo committed Jul 18, 2023
1 parent e9ab61e commit a22e437
Show file tree
Hide file tree
Showing 12 changed files with 91 additions and 55 deletions.
2 changes: 1 addition & 1 deletion cas/grpc_service/cas_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ impl CasServer {
log::error!(
"Error during .has() call in .find_missing_blobs() : {:?} - {}",
err,
digest_info.str()
digest_info.hash_str()
);
Some(Ok(digest))
}
Expand Down
85 changes: 56 additions & 29 deletions cas/scheduler/action_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,13 @@ impl ActionInfoHashKey {
/// Returns the salt used for cache busting/hashing.
#[inline]
pub fn action_name(&self) -> String {
format!("{}/{}/{:X}", self.instance_name, self.digest.str(), self.salt)
format!(
"{}/{}-{}/{:X}",
self.instance_name,
self.digest.hash_str(),
self.digest.size_bytes,
self.salt
)
}
}

Expand Down Expand Up @@ -758,18 +764,49 @@ impl TryFrom<ExecuteResponse> for ActionStage {
}
}

// TODO: Should be able to remove this after tokio-rs/prost#299
/// Associates a prost `Message` with the fully-qualified type URL used when
/// packing it into, or unpacking it from, a `google.protobuf.Any`.
trait TypeUrl: Message {
/// Fully-qualified type URL for this message type
/// (e.g. `type.googleapis.com/<package>.<MessageName>`).
const TYPE_URL: &'static str;
}

/// Type URL for `ExecuteResponse` as defined by the Remote Execution API v2.
impl TypeUrl for ExecuteResponse {
const TYPE_URL: &'static str = "type.googleapis.com/build.bazel.remote.execution.v2.ExecuteResponse";
}

/// Type URL for `ExecuteOperationMetadata` as defined by the Remote Execution API v2.
impl TypeUrl for ExecuteOperationMetadata {
const TYPE_URL: &'static str = "type.googleapis.com/build.bazel.remote.execution.v2.ExecuteOperationMetadata";
}

/// Decodes an `Any` into the concrete message type `T`, first verifying that
/// the `Any`'s `type_url` matches the URL registered for `T` via [`TypeUrl`].
///
/// Returns an error if the type URL does not match or the payload fails to
/// decode.
fn from_any<T>(message: &Any) -> Result<T, Error>
where
    T: TypeUrl + Default,
{
    error_if!(
        message.type_url != T::TYPE_URL,
        "Incorrect type when decoding Any. {} != {}",
        message.type_url,
        // A &'static str formats directly; no need to allocate a String here.
        T::TYPE_URL
    );
    Ok(T::decode(message.value.as_slice())?)
}

/// Packs `message` into a `google.protobuf.Any`, stamping it with the type
/// URL registered for `T` via [`TypeUrl`].
fn to_any<T: TypeUrl>(message: &T) -> Any {
    let type_url = T::TYPE_URL.to_string();
    let value = message.encode_to_vec();
    Any { type_url, value }
}

impl TryFrom<Operation> for ActionState {
type Error = Error;

fn try_from(operation: Operation) -> Result<ActionState, Error> {
let metadata_data = operation.metadata.err_tip(|| "No metadata in upstream operation")?;
error_if!(
metadata_data.type_url != "type.googleapis.com/build.bazel.remote.execution.v2.ExecuteResponse",
"Incorrect metadata structure in upstream operation. {} != type.googleapis.com/build.bazel.remote.execution.v2.ExecuteResponse",
metadata_data.type_url
);
let metadata = ExecuteOperationMetadata::decode(metadata_data.value.as_slice())
.err_tip(|| "Could not decode metadata in upstream operation")?;
let metadata =
from_any::<ExecuteOperationMetadata>(&operation.metadata.err_tip(|| "No metadata in upstream operation")?)
.err_tip(|| "Could not decode metadata in upstream operation")?;

let action_digest = metadata
.action_digest
Expand All @@ -792,12 +829,9 @@ impl TryFrom<Operation> for ActionState {
LongRunningResult::Error(error) => ActionStage::Error((error.into(), ActionResult::default())),
LongRunningResult::Response(response) => {
// Could be Completed, CompletedFromCache or Error.
error_if!(
response.type_url != "type.googleapis.com/build.bazel.remote.execution.v2.ExecuteResponse",
"Incorrect result structure for completed upstream action. {} != type.googleapis.com/build.bazel.remote.execution.v2.ExecuteResponse",
response.type_url
);
ExecuteResponse::decode(response.value.as_slice())?.try_into()?
from_any::<ExecuteResponse>(&response)
.err_tip(|| "Could not decode result structure for completed upstream action")?
.try_into()?
}
}
}
Expand Down Expand Up @@ -842,14 +876,13 @@ impl ActionState {

impl From<ActionState> for Operation {
fn from(val: ActionState) -> Self {
let has_action_result = val.stage.has_action_result();
let stage = Into::<execution_stage::Value>::into(&val.stage) as i32;
let execute_response: ExecuteResponse = val.stage.into();

let serialized_response = if has_action_result {
execute_response.encode_to_vec()
let result = if val.stage.has_action_result() {
let execute_response: ExecuteResponse = val.stage.into();
Some(LongRunningResult::Response(to_any(&execute_response)))
} else {
vec![]
None
};

let metadata = ExecuteOperationMetadata {
Expand All @@ -862,15 +895,9 @@ impl From<ActionState> for Operation {

Self {
name: val.unique_qualifier.action_name(),
metadata: Some(Any {
type_url: "type.googleapis.com/build.bazel.remote.execution.v2.ExecuteOperationMetadata".to_string(),
value: metadata.encode_to_vec(),
}),
done: has_action_result,
result: Some(LongRunningResult::Response(Any {
type_url: "type.googleapis.com/build.bazel.remote.execution.v2.ExecuteResponse".to_string(),
value: serialized_response,
})),
metadata: Some(to_any(&metadata)),
done: result.is_some(),
result,
}
}
}
8 changes: 4 additions & 4 deletions cas/scheduler/simple_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ impl SimpleSchedulerImpl {
// again.
log::warn!(
"Action {} has no more listeners during evict_worker()",
action_info.digest().str()
action_info.digest().hash_str()
);
}
}
Expand Down Expand Up @@ -380,7 +380,7 @@ impl SimpleSchedulerImpl {
let Some(awaited_action) = self.queued_actions.get(action_info.as_ref()) else {
log::error!(
"queued_actions out of sync with itself for action {}",
action_info.digest().str()
action_info.digest().hash_str()
);
continue;
};
Expand Down Expand Up @@ -415,7 +415,7 @@ impl SimpleSchedulerImpl {
// again.
log::warn!(
"Action {} has no more listeners",
awaited_action.action_info.digest().str()
awaited_action.action_info.digest().hash_str()
);
}
awaited_action.attempts += 1;
Expand Down Expand Up @@ -520,7 +520,7 @@ impl SimpleSchedulerImpl {
if send_result.is_err() {
log::warn!(
"Action {} has no more listeners during update_action()",
action_info.digest().str()
action_info.digest().hash_str()
);
}
// If the operation is not finished it means the worker is still working on it, so put it
Expand Down
14 changes: 9 additions & 5 deletions cas/scheduler/tests/action_messages_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,24 +39,28 @@ mod action_messages_tests {

#[tokio::test]
async fn action_state_any_url_test() -> Result<(), Error> {
let operation: Operation = ActionState {
let action_state = ActionState {
unique_qualifier: ActionInfoHashKey {
instance_name: "foo_instance".to_string(),
digest: DigestInfo::new([1u8; 32], 5),
salt: 0,
},
stage: ActionStage::Unknown,
}
.into();
// Result is only populated if has_action_result.
stage: ActionStage::Completed(ActionResult::default()),
};
let operation: Operation = action_state.clone().into();

match operation.result {
match &operation.result {
Some(operation::Result::Response(any)) => assert_eq!(
any.type_url,
"type.googleapis.com/build.bazel.remote.execution.v2.ExecuteResponse"
),
other => panic!("Expected Some(Result(Any)), got: {other:?}"),
}

let action_state_round_trip: ActionState = operation.try_into()?;
assert_eq!(action_state, action_state_round_trip);

Ok(())
}

Expand Down
2 changes: 1 addition & 1 deletion cas/store/dedup_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ impl StoreTrait for DedupStore {
Err(e) => {
log::warn!(
"Failed to deserialize index in dedup store : {} - {:?}",
digest.str(),
digest.hash_str(),
e
);
// We return the equivalent of NotFound here so the client is happy.
Expand Down
4 changes: 2 additions & 2 deletions cas/store/filesystem_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ impl Drop for EncodedFilePath {

#[inline]
fn to_full_path_from_digest(folder: &str, digest: &DigestInfo) -> String {
format!("{}/{}-{}", folder, digest.str(), digest.size_bytes)
format!("{}/{}-{}", folder, digest.hash_str(), digest.size_bytes)
}

#[async_trait]
Expand Down Expand Up @@ -301,7 +301,7 @@ impl LenEntry for FileEntryImpl {

log::info!(
"\x1b[0;31mFilesystem Store\x1b[0m: Unref {}, moving file {} to {}",
encoded_file_path.digest.str(),
encoded_file_path.digest.hash_str(),
&from_path,
&to_path
);
Expand Down
9 changes: 7 additions & 2 deletions cas/store/grpc_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ impl StoreTrait for GrpcStore {
"{}/uploads/{}/blobs/{}/{}",
&self.instance_name,
Uuid::new_v4().hyphenated().encode_lower(&mut buf),
digest.str(),
digest.hash_str(),
digest.size_bytes,
);

Expand Down Expand Up @@ -467,7 +467,12 @@ impl StoreTrait for GrpcStore {
return self.get_action_result_as_part(digest, writer, offset, length).await;
}

let resource_name = format!("{}/blobs/{}/{}", &self.instance_name, digest.str(), digest.size_bytes,);
let resource_name = format!(
"{}/blobs/{}/{}",
&self.instance_name,
digest.hash_str(),
digest.size_bytes,
);

let mut stream = self
.read(Request::new(ReadRequest {
Expand Down
2 changes: 1 addition & 1 deletion cas/store/memory_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ impl StoreTrait for MemoryStore {
.map
.get(&digest)
.await
.err_tip_with_code(|_| (Code::NotFound, format!("Hash {} not found", digest.str())))?;
.err_tip_with_code(|_| (Code::NotFound, format!("Hash {} not found", digest.hash_str())))?;

let default_len = value.len() - offset;
let length = length.unwrap_or(default_len).min(default_len);
Expand Down
4 changes: 2 additions & 2 deletions cas/store/s3_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ impl S3Store {
}

fn make_s3_path(&self, digest: &DigestInfo) -> String {
format!("{}{}-{}", self.key_prefix, digest.str(), digest.size_bytes)
format!("{}{}-{}", self.key_prefix, digest.hash_str(), digest.size_bytes)
}
}

Expand Down Expand Up @@ -267,7 +267,7 @@ impl StoreTrait for S3Store {
error_if!(
write_buf.len() > max_size,
"More data than provided max_size in s3_store {}",
digest.str()
digest.hash_str()
);
let content_length = write_buf.len();
(
Expand Down
6 changes: 3 additions & 3 deletions cas/store/tests/filesystem_store_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ mod filesystem_store_tests {
// Insert data into store.
store.as_ref().update_oneshot(digest1, VALUE1.into()).await?;

let expected_file_name = format!("{}/{}-{}", content_path, digest1.str(), digest1.size_bytes);
let expected_file_name = format!("{}/{}-{}", content_path, digest1.hash_str(), digest1.size_bytes);
{
// Check to ensure our file exists where it should and content matches.
let data = read_file_contents(&expected_file_name).await?;
Expand Down Expand Up @@ -551,8 +551,8 @@ mod filesystem_store_tests {
fs::create_dir_all(&content_path).await?;

// Make the two files on disk before loading the store.
let file1 = format!("{}/{}-{}", content_path, digest1.str(), digest1.size_bytes);
let file2 = format!("{}/{}-{}", content_path, digest2.str(), digest2.size_bytes);
let file1 = format!("{}/{}-{}", content_path, digest1.hash_str(), digest1.size_bytes);
let file2 = format!("{}/{}-{}", content_path, digest2.hash_str(), digest2.size_bytes);
write_file(&file1, VALUE1.as_bytes()).await?;
write_file(&file2, VALUE2.as_bytes()).await?;
set_file_atime(&file1, FileTime::from_unix_time(0, 0))?;
Expand Down
8 changes: 4 additions & 4 deletions util/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ impl DigestInfo {
})
}

pub fn str(&self) -> String {
pub fn hash_str(&self) -> String {
hex::encode(self.packed_hash)
}

Expand All @@ -83,7 +83,7 @@ impl fmt::Debug for DigestInfo {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("DigestInfo")
.field("size_bytes", &self.size_bytes)
.field("hash", &self.str())
.field("hash", &self.hash_str())
.finish()
}
}
Expand All @@ -104,7 +104,7 @@ impl TryFrom<Digest> for DigestInfo {
impl From<DigestInfo> for Digest {
fn from(val: DigestInfo) -> Self {
Digest {
hash: val.str(),
hash: val.hash_str(),
size_bytes: val.size_bytes,
}
}
Expand All @@ -113,7 +113,7 @@ impl From<DigestInfo> for Digest {
impl From<&DigestInfo> for Digest {
fn from(val: &DigestInfo) -> Self {
Digest {
hash: val.str(),
hash: val.hash_str(),
size_bytes: val.size_bytes,
}
}
Expand Down
2 changes: 1 addition & 1 deletion util/evicting_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ where
state.sum_store_size -= eviction_item.data.len() as u64;
// Note: See comment in `unref()` requiring global lock of insert/remove.
eviction_item.data.unref().await;
log::info!("\x1b[0;31mEvicting Map\x1b[0m: Evicting {}", key.str());
log::info!("\x1b[0;31mEvicting Map\x1b[0m: Evicting {}", key.hash_str());

peek_entry = if let Some((_, entry)) = state.lru.peek_lru() {
entry
Expand Down

0 comments on commit a22e437

Please sign in to comment.