Skip to content

Commit

Permalink
Merge pull request #58 from kaskada-ai/move-registry-to-header
Browse files Browse the repository at this point in the history
ref: Move static info to flight record header
  • Loading branch information
bjchambers authored Mar 1, 2023
2 parents 5f94220 + 2ad99ed commit 026f473
Show file tree
Hide file tree
Showing 9 changed files with 161 additions and 141 deletions.
14 changes: 6 additions & 8 deletions crates/sparrow-main/src/serve/compute_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,18 +157,16 @@ async fn execute_impl(
let flight_record_local_path = flight_record_tempfile.as_ref().map(|t| t.path().to_owned());

let build_info = BuildInfo::default();
let flight_record_header = FlightRecordHeader {
version: sparrow_qfr::QFR_VERSION,
// TODO: Set the actual request ID.
// TODO: Add the query time.
request_id: "todo_set_request_id".to_owned(),
sparrow_build_info: Some(flight_record_header::BuildInfo {
let flight_record_header = FlightRecordHeader::with_registrations(
// Set the actual request ID.
"todo_set_request_id".to_owned(),
flight_record_header::BuildInfo {
sparrow_version: build_info.sparrow_version.to_owned(),
github_ref: build_info.github_ref.to_owned(),
github_sha: build_info.github_sha.to_owned(),
github_workflow: build_info.github_workflow.to_owned(),
}),
};
},
);

let progress_stream = sparrow_runtime::execute::execute(
request,
Expand Down
19 changes: 17 additions & 2 deletions crates/sparrow-qfr/examples/produce_flight_records.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ use std::time::Duration;
use futures::StreamExt;
use itertools::Itertools;

use sparrow_qfr::kaskada::sparrow::v1alpha::FlightRecord;
use sparrow_qfr::kaskada::sparrow::v1alpha::flight_record_header::BuildInfo;
use sparrow_qfr::kaskada::sparrow::v1alpha::{FlightRecord, FlightRecordHeader};
use sparrow_qfr::{
activity, gauge, Activity, FlightRecorderFactory, Gauge, Registration, Registrations,
activity, gauge, Activity, FlightRecorderFactory, Gauge, PushRegistration, Registration,
Registrations,
};

const NUM_OUTPUTS: Gauge<u64> = gauge!("num_outputs");
Expand All @@ -32,6 +34,19 @@ async fn main() {
.collect()
.await;

println!(
"Header:{:?}",
FlightRecordHeader::with_registrations(
"the_request_id".to_owned(),
BuildInfo {
sparrow_version: "sparrow_version0.1.0".to_owned(),
github_ref: "some ref".to_owned(),
github_sha: "some sha".to_owned(),
github_workflow: "the workflow".to_owned(),
}
)
);

println!(
"Records:\n{}",
records
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,44 +21,28 @@ message FlightRecordHeader {
// Build information about the Sparrow Binary.
BuildInfo sparrow_build_info = 3;

// Register activities that may be reported during the query.
//
// This includes the set of all activities which may be produced by any query.
// Not all activities will be reported by any specific query.
repeated RegisterActivity activities = 4;

// Register metrics that may be reported during the query.
//
// Metrics may be reported as part of an activity (see ReportActivity) or
// at the top level during execution (see ReportMetrics).
//
// The set of metrics are static within the query engine, and correspond
// to any value that may be reported during the execution of a query. Not
// all metrics need to be reported for any given query.
repeated RegisterMetric metrics = 5;

message BuildInfo {
string sparrow_version = 1;
string github_ref = 2;
string github_sha = 3;
string github_workflow = 4;
}
}

message FlightRecord {
oneof record {
// Register an activity.
//
// The set of activities are static within the query engine, and correspond
// to any task that may be instrumented during the execution of a query. Not
// all activities need to be reported by any given query.
RegisterActivity register_activity = 2;

// Register a metric.
//
// Metrics may be reported as part of an activity (see ReportActivity) or
// at the top level during execution (see ReportMetrics).
//
// The set of metrics are static within the query engine, and correspond
// to any value that may be reported during the execution of a query. Not
// all metrics need to be reported for any given query.
RegisterMetric register_metric = 3;

// Register an execution thread.
// This is sent for each computation thread spawned for a specific query.
RegisterThread register_thread = 1;
ReportActivity report_activity = 4;
ReportMetrics report_metrics = 5;
}

message RegisterThread {
uint32 thread_id = 1;
string label = 2;
}

message RegisterActivity {
uint32 activity_id = 1;
Expand Down Expand Up @@ -91,6 +75,25 @@ message FlightRecord {
METRIC_KIND_F64_COUNTER = 22;
}
}
}

message FlightRecord {
oneof record {
// Register an execution thread.
// This is sent for each computation thread spawned for a specific query.
RegisterThread register_thread = 1;

/// Report an activity execution during the query.
ReportActivity report_activity = 2;

/// Report a top-level metric.
ReportMetrics report_metrics = 3;
}

message RegisterThread {
uint32 thread_id = 1;
string label = 2;
}

message ReportActivity {
uint32 activity_id = 1;
Expand Down
16 changes: 1 addition & 15 deletions crates/sparrow-qfr/src/activity.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
use futures::Future;

use crate::activation::Activation;
use crate::kaskada::sparrow::v1alpha::flight_record::{Record, RegisterActivity};
use crate::kaskada::sparrow::v1alpha::FlightRecord;
use crate::{FlightRecorder, Metrics, TimedTask, ToRegistration};
use crate::{FlightRecorder, Metrics, TimedTask};

/// An activity represents something that one or more threads do.
///
Expand Down Expand Up @@ -42,18 +40,6 @@ impl Activity {
}
}

impl ToRegistration for Activity {
fn to_registration(&self) -> crate::kaskada::sparrow::v1alpha::FlightRecord {
FlightRecord {
record: Some(Record::RegisterActivity(RegisterActivity {
activity_id: self.activity_id,
label: self.label.to_owned(),
parent_activity_id: self.parent_activity_id,
})),
}
}
}

#[cfg(test)]
mod tests {
use std::time::Duration;
Expand Down
28 changes: 0 additions & 28 deletions crates/sparrow-qfr/src/factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use tokio::time::Instant;

use crate::kaskada::sparrow::v1alpha::flight_record::RegisterThread;
use crate::kaskada::sparrow::v1alpha::FlightRecord;
use crate::registration::Registration;
use crate::FlightRecorder;

#[derive(Clone)]
Expand All @@ -17,34 +16,7 @@ pub enum FlightRecorderFactory {

impl FlightRecorderFactory {
/// Create a new `FlightRecorderFactory` for the given sender.
///
/// Registers all of the metrics and activities that have been added to the `inventory`.
pub async fn new(tx: tokio::sync::mpsc::Sender<FlightRecord>) -> Self {
Self::new_with_registrations(tx, inventory::iter::<&'static Registration>).await
}

pub(crate) async fn new_with_registrations(
tx: tokio::sync::mpsc::Sender<FlightRecord>,
registrations: impl IntoIterator<Item = &&'static Registration>,
) -> Self {
let registrations: Vec<_> = registrations
.into_iter()
.flat_map(|r| r.records())
.cloned()
.collect();

for record in registrations {
match tx.send(record).await {
Ok(()) => (),
Err(e) => {
tracing::warn!(
"Failed to register thread with flight recorder: {e}; Disabling."
);
return Self::Disabled;
}
}
}

Self::Active {
next_thread_id: 0,
trace_start: Instant::now(),
Expand Down
54 changes: 30 additions & 24 deletions crates/sparrow-qfr/src/io/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ use fallible_iterator::FallibleIterator;
use tempfile::NamedTempFile;

use super::{FlightRecordReader, FlightRecordWriter};
use crate::kaskada::sparrow::v1alpha::flight_record::{RegisterActivity, ReportActivity};
use crate::kaskada::sparrow::v1alpha::flight_record_header::BuildInfo;
use crate::kaskada::sparrow::v1alpha::flight_record::ReportActivity;
use crate::kaskada::sparrow::v1alpha::flight_record_header::{BuildInfo, RegisterActivity};
use crate::kaskada::sparrow::v1alpha::metric_value::Value;
use crate::kaskada::sparrow::v1alpha::{FlightRecord, FlightRecordHeader, MetricValue};

Expand All @@ -20,41 +20,48 @@ fn round_trip_small() {
github_sha: "sha".to_owned(),
github_workflow: "workflow".to_owned(),
}),
activities: vec![
RegisterActivity {
activity_id: 57,
label: "hello".to_owned(),
parent_activity_id: None,
},
RegisterActivity {
activity_id: 58,
label: "world".to_owned(),
parent_activity_id: Some(57),
},
],
metrics: vec![],
};

let record1 = FlightRecord {
record: Some(
crate::kaskada::sparrow::v1alpha::flight_record::Record::RegisterActivity(
RegisterActivity {
activity_id: 57,
label: "hello".to_owned(),
parent_activity_id: None,
},
),
),
};

let record2 = FlightRecord {
record: Some(
crate::kaskada::sparrow::v1alpha::flight_record::Record::RegisterActivity(
RegisterActivity {
crate::kaskada::sparrow::v1alpha::flight_record::Record::ReportActivity(
ReportActivity {
activity_id: 58,
label: "world".to_owned(),
parent_activity_id: Some(57),
thread_id: 18,
wall_timestamp_us: 1000,
wall_duration_us: 8710,
cpu_duration_us: 879_791_878,
metrics: vec![MetricValue {
metric_id: 987,
value: Some(Value::I64Value(18)),
}],
},
),
),
};

let record3 = FlightRecord {
let record2 = FlightRecord {
record: Some(
crate::kaskada::sparrow::v1alpha::flight_record::Record::ReportActivity(
ReportActivity {
activity_id: 58,
activity_id: 59,
thread_id: 18,
wall_timestamp_us: 1000,
wall_duration_us: 8710,
cpu_duration_us: 879_791_878,
wall_duration_us: 87920,
cpu_duration_us: 879_797_878,
metrics: vec![MetricValue {
metric_id: 987,
value: Some(Value::I64Value(18)),
Expand All @@ -68,14 +75,13 @@ fn round_trip_small() {
let mut writer = FlightRecordWriter::try_new(file.reopen().unwrap(), header.clone()).unwrap();
writer.write(record1.clone()).unwrap();
writer.write(record2.clone()).unwrap();
writer.write(record3.clone()).unwrap();
writer.flush().unwrap();

let reader = FlightRecordReader::try_new(file.path()).unwrap();
assert_eq!(reader.header(), &header);

let records: Vec<_> = reader.records().unwrap().collect().unwrap();
assert_eq!(records, vec![record1, record2, record3]);
assert_eq!(records, vec![record1, record2]);

file.close().unwrap();
}
26 changes: 4 additions & 22 deletions crates/sparrow-qfr/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
use std::marker::PhantomData;

use crate::kaskada::sparrow::v1alpha::flight_record::register_metric::MetricKind;
use crate::kaskada::sparrow::v1alpha::flight_record::{Record, RegisterMetric};
use crate::kaskada::sparrow::v1alpha::{metric_value, FlightRecord, MetricValue};
use crate::ToRegistration;
use crate::kaskada::sparrow::v1alpha::flight_record_header::register_metric::MetricKind;
use crate::kaskada::sparrow::v1alpha::{metric_value, MetricValue};

/// Trait for values that can be encoded as metric values.
pub trait IntoMetricValue {
Expand Down Expand Up @@ -86,8 +84,8 @@ where
T: IntoMetricValue,
K: MetricKindTrait<T>,
{
label: &'static str,
metric_id: u32,
pub label: &'static str,
pub metric_id: u32,
_phantom: PhantomData<fn(T, K) -> K>,
}

Expand All @@ -114,19 +112,3 @@ where

pub type Gauge<T> = Metric<T, GaugeKind>;
pub type Counter<T> = Metric<T, CounterKind>;

impl<T, K> ToRegistration for Metric<T, K>
where
T: IntoMetricValue,
K: MetricKindTrait<T>,
{
fn to_registration(&self) -> crate::kaskada::sparrow::v1alpha::FlightRecord {
FlightRecord {
record: Some(Record::RegisterMetric(RegisterMetric {
metric_id: self.metric_id,
kind: K::KIND as i32,
label: self.label.to_owned(),
})),
}
}
}
Loading

0 comments on commit 026f473

Please sign in to comment.