Skip to content

Commit

Permalink
Convert AwaitedAction to and from raw bytes
Browse files Browse the repository at this point in the history
Makes AwaitedAction Serialize/Deserialize and adds ability to convert to
and from raw bytes. This will be used to communicate operation data to
and external data storage such as Redis through the store API.
  • Loading branch information
zbirenbaum committed Jul 29, 2024
1 parent d8c407a commit 83e4905
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 3 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions nativelink-scheduler/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ rust_library(
"@crates//:parking_lot",
"@crates//:rand",
"@crates//:scopeguard",
"@crates//:serde",
"@crates//:serde_json",
"@crates//:static_assertions",
"@crates//:tokio",
"@crates//:tokio-stream",
Expand Down
2 changes: 2 additions & 0 deletions nativelink-scheduler/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ tokio = "1.38.0"
tokio-stream = { version = "0.1.15", default-features = false }
tonic = { version = "0.12.0", default-features = false }
tracing = { version = "0.1.40", default-features = false }
serde = { version = "1.0.203", features = ["rc"] }
serde_json = "1.0.120"
static_assertions = "1.1.0"

[dev-dependencies]
Expand Down
26 changes: 23 additions & 3 deletions nativelink-scheduler/src/awaited_action_db/awaited_action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,20 @@
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};

use nativelink_error::{make_input_err, Error, ResultExt};
use nativelink_metric::{
MetricFieldData, MetricKind, MetricPublishKnownKindData, MetricsComponent,
};
use nativelink_util::action_messages::{
ActionInfo, ActionStage, ActionState, OperationId, WorkerId,
};
use serde::{Deserialize, Serialize};
use static_assertions::{assert_eq_size, const_assert, const_assert_eq};

/// The version of the awaited action.
/// This number will always increment by one each time
/// the action is updated.
#[derive(Debug, Clone, Copy)]
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
struct AwaitedActionVersion(u64);

impl MetricsComponent for AwaitedActionVersion {
Expand All @@ -40,7 +42,7 @@ impl MetricsComponent for AwaitedActionVersion {
}

/// An action that is being awaited on and last known state.
#[derive(Debug, Clone, MetricsComponent)]
#[derive(Debug, Clone, MetricsComponent, Serialize, Deserialize)]
pub struct AwaitedAction {
/// The current version of the action.
#[metric(help = "The version of the AwaitedAction")]
Expand Down Expand Up @@ -146,13 +148,31 @@ impl AwaitedAction {
}
}

impl TryInto<Vec<u8>> for AwaitedAction {
type Error = Error;
fn try_into(self) -> Result<Vec<u8>, Self::Error> {
serde_json::to_vec(&self)
.map_err(|e| make_input_err!("{}", e.to_string()))
.err_tip(|| "In AwaitedAction::TryInto::<Vec<u8>>")
}
}

impl TryFrom<&[u8]> for AwaitedAction {
type Error = Error;
fn try_from(value: &[u8]) -> Result<Self, Self::Error> {
serde_json::from_slice(value)
.map_err(|e| make_input_err!("{}", e.to_string()))
.err_tip(|| "In AwaitedAction::TryFrom::&[u8]")
}
}

/// The key used to sort the awaited actions.
///
/// The rules for sorting are as follows:
/// 1. priority of the action
/// 2. insert order of the action (lower = higher priority)
/// 3. (mostly random hash based on the action info)
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
#[repr(transparent)]
pub struct AwaitedActionSortKey(u64);

Expand Down

0 comments on commit 83e4905

Please sign in to comment.