Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ref: Move static info to flight record header #58

Merged
merged 2 commits into from
Mar 1, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions crates/sparrow-qfr/examples/produce_flight_records.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ use std::time::Duration;
use futures::StreamExt;
use itertools::Itertools;

use sparrow_qfr::kaskada::sparrow::v1alpha::FlightRecord;
use sparrow_qfr::kaskada::sparrow::v1alpha::flight_record_header::BuildInfo;
use sparrow_qfr::kaskada::sparrow::v1alpha::{FlightRecord, FlightRecordHeader};
use sparrow_qfr::{
activity, gauge, Activity, FlightRecorderFactory, Gauge, Registration, Registrations,
activity, gauge, Activity, FlightRecorderFactory, Gauge, PushRegistration, Registration,
Registrations,
};

const NUM_OUTPUTS: Gauge<u64> = gauge!("num_outputs");
Expand All @@ -32,6 +34,19 @@ async fn main() {
.collect()
.await;

println!(
"Header:{:?}",
FlightRecordHeader::with_registrations(
"the_request_id".to_owned(),
BuildInfo {
sparrow_version: "sparrow_version0.1.0".to_owned(),
bjchambers marked this conversation as resolved.
Show resolved Hide resolved
github_ref: "some ref".to_owned(),
github_sha: "some sha".to_owned(),
github_workflow: "the workflow".to_owned(),
}
)
);

println!(
"Records:\n{}",
records
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,44 +21,28 @@ message FlightRecordHeader {
// Build information about the Sparrow Binary.
BuildInfo sparrow_build_info = 3;

// Register activities that may be reported during the query.
//
// This includes the set of all activities which may be produced by any query.
// Not all activities will be reported by any specific query.
repeated RegisterActivity activities = 4;

// Register metrics that may be reported during the query.
//
// Metrics may be reported as part of an activity (see ReportActivity) or
// at the top level during execution (see ReportMetrics).
//
// The set of metrics are static within the query engine, and correspond
// to any value that may be reported during the execution of a query. Not
// all metrics need to be reported for any given query.
repeated RegisterMetric metrics = 5;

message BuildInfo {
string sparrow_version = 1;
string github_ref = 2;
string github_sha = 3;
string github_workflow = 4;
}
}

message FlightRecord {
oneof record {
// Register an activity.
//
// The set of activities are static within the query engine, and correspond
// to any task that may be instrumented during the execution of a query. Not
// all activities need to be reported by any given query.
RegisterActivity register_activity = 2;

// Register a metric.
//
// Metrics may be reported as part of an activity (see ReportActivity) or
// at the top level during execution (see ReportMetrics).
//
// The set of metrics are static within the query engine, and correspond
// to any value that may be reported during the execution of a query. Not
// all metrics need to be reported for any given query.
RegisterMetric register_metric = 3;

// Register an execution thread.
// This is sent for each computation thread spawned for a specific query.
RegisterThread register_thread = 1;
ReportActivity report_activity = 4;
ReportMetrics report_metrics = 5;
}

message RegisterThread {
uint32 thread_id = 1;
string label = 2;
}

message RegisterActivity {
uint32 activity_id = 1;
Expand Down Expand Up @@ -91,6 +75,25 @@ message FlightRecord {
METRIC_KIND_F64_COUNTER = 22;
}
}
}

message FlightRecord {
oneof record {
// Register an execution thread.
// This is sent for each computation thread spawned for a specific query.
RegisterThread register_thread = 1;

/// Report an activity execution during the query.
ReportActivity report_activity = 2;

/// Report a top-level metric.
ReportMetrics report_metrics = 3;
}

message RegisterThread {
uint32 thread_id = 1;
string label = 2;
}

message ReportActivity {
uint32 activity_id = 1;
Expand Down
16 changes: 1 addition & 15 deletions crates/sparrow-qfr/src/activity.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
use futures::Future;

use crate::activation::Activation;
use crate::kaskada::sparrow::v1alpha::flight_record::{Record, RegisterActivity};
use crate::kaskada::sparrow::v1alpha::FlightRecord;
use crate::{FlightRecorder, Metrics, TimedTask, ToRegistration};
use crate::{FlightRecorder, Metrics, TimedTask};

/// An activity represents something that one or more threads do.
///
Expand Down Expand Up @@ -42,18 +40,6 @@ impl Activity {
}
}

impl ToRegistration for Activity {
fn to_registration(&self) -> crate::kaskada::sparrow::v1alpha::FlightRecord {
FlightRecord {
record: Some(Record::RegisterActivity(RegisterActivity {
activity_id: self.activity_id,
label: self.label.to_owned(),
parent_activity_id: self.parent_activity_id,
})),
}
}
}

#[cfg(test)]
mod tests {
use std::time::Duration;
Expand Down
28 changes: 0 additions & 28 deletions crates/sparrow-qfr/src/factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use tokio::time::Instant;

use crate::kaskada::sparrow::v1alpha::flight_record::RegisterThread;
use crate::kaskada::sparrow::v1alpha::FlightRecord;
use crate::registration::Registration;
use crate::FlightRecorder;

#[derive(Clone)]
Expand All @@ -17,34 +16,7 @@ pub enum FlightRecorderFactory {

impl FlightRecorderFactory {
/// Create a new `FlightRecorderFactory` for the given sender.
///
/// Registers all of the metrics and activities that have been added to the `inventory`.
pub async fn new(tx: tokio::sync::mpsc::Sender<FlightRecord>) -> Self {
Self::new_with_registrations(tx, inventory::iter::<&'static Registration>).await
}

pub(crate) async fn new_with_registrations(
tx: tokio::sync::mpsc::Sender<FlightRecord>,
registrations: impl IntoIterator<Item = &&'static Registration>,
) -> Self {
let registrations: Vec<_> = registrations
.into_iter()
.flat_map(|r| r.records())
.cloned()
.collect();

for record in registrations {
match tx.send(record).await {
Ok(()) => (),
Err(e) => {
tracing::warn!(
"Failed to register thread with flight recorder: {e}; Disabling."
);
return Self::Disabled;
}
}
}

Self::Active {
next_thread_id: 0,
trace_start: Instant::now(),
Expand Down
54 changes: 30 additions & 24 deletions crates/sparrow-qfr/src/io/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ use fallible_iterator::FallibleIterator;
use tempfile::NamedTempFile;

use super::{FlightRecordReader, FlightRecordWriter};
use crate::kaskada::sparrow::v1alpha::flight_record::{RegisterActivity, ReportActivity};
use crate::kaskada::sparrow::v1alpha::flight_record_header::BuildInfo;
use crate::kaskada::sparrow::v1alpha::flight_record::ReportActivity;
use crate::kaskada::sparrow::v1alpha::flight_record_header::{BuildInfo, RegisterActivity};
use crate::kaskada::sparrow::v1alpha::metric_value::Value;
use crate::kaskada::sparrow::v1alpha::{FlightRecord, FlightRecordHeader, MetricValue};

Expand All @@ -20,41 +20,48 @@ fn round_trip_small() {
github_sha: "sha".to_owned(),
github_workflow: "workflow".to_owned(),
}),
activities: vec![
RegisterActivity {
activity_id: 57,
label: "hello".to_owned(),
parent_activity_id: None,
},
RegisterActivity {
activity_id: 58,
label: "world".to_owned(),
parent_activity_id: Some(57),
},
],
metrics: vec![],
};

let record1 = FlightRecord {
record: Some(
crate::kaskada::sparrow::v1alpha::flight_record::Record::RegisterActivity(
RegisterActivity {
activity_id: 57,
label: "hello".to_owned(),
parent_activity_id: None,
},
),
),
};

let record2 = FlightRecord {
record: Some(
crate::kaskada::sparrow::v1alpha::flight_record::Record::RegisterActivity(
RegisterActivity {
crate::kaskada::sparrow::v1alpha::flight_record::Record::ReportActivity(
ReportActivity {
activity_id: 58,
label: "world".to_owned(),
parent_activity_id: Some(57),
thread_id: 18,
wall_timestamp_us: 1000,
wall_duration_us: 8710,
cpu_duration_us: 879_791_878,
metrics: vec![MetricValue {
metric_id: 987,
value: Some(Value::I64Value(18)),
}],
},
),
),
};

let record3 = FlightRecord {
let record2 = FlightRecord {
record: Some(
crate::kaskada::sparrow::v1alpha::flight_record::Record::ReportActivity(
ReportActivity {
activity_id: 58,
activity_id: 59,
thread_id: 18,
wall_timestamp_us: 1000,
wall_duration_us: 8710,
cpu_duration_us: 879_791_878,
wall_duration_us: 87920,
cpu_duration_us: 879_797_878,
metrics: vec![MetricValue {
metric_id: 987,
value: Some(Value::I64Value(18)),
Expand All @@ -68,14 +75,13 @@ fn round_trip_small() {
let mut writer = FlightRecordWriter::try_new(file.reopen().unwrap(), header.clone()).unwrap();
writer.write(record1.clone()).unwrap();
writer.write(record2.clone()).unwrap();
writer.write(record3.clone()).unwrap();
writer.flush().unwrap();

let reader = FlightRecordReader::try_new(file.path()).unwrap();
assert_eq!(reader.header(), &header);

let records: Vec<_> = reader.records().unwrap().collect().unwrap();
assert_eq!(records, vec![record1, record2, record3]);
assert_eq!(records, vec![record1, record2]);

file.close().unwrap();
}
26 changes: 4 additions & 22 deletions crates/sparrow-qfr/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
use std::marker::PhantomData;

use crate::kaskada::sparrow::v1alpha::flight_record::register_metric::MetricKind;
use crate::kaskada::sparrow::v1alpha::flight_record::{Record, RegisterMetric};
use crate::kaskada::sparrow::v1alpha::{metric_value, FlightRecord, MetricValue};
use crate::ToRegistration;
use crate::kaskada::sparrow::v1alpha::flight_record_header::register_metric::MetricKind;
use crate::kaskada::sparrow::v1alpha::{metric_value, MetricValue};

/// Trait for values that can be encoded as metric values.
pub trait IntoMetricValue {
Expand Down Expand Up @@ -86,8 +84,8 @@ where
T: IntoMetricValue,
K: MetricKindTrait<T>,
{
label: &'static str,
metric_id: u32,
pub label: &'static str,
pub metric_id: u32,
_phantom: PhantomData<fn(T, K) -> K>,
}

Expand All @@ -114,19 +112,3 @@ where

pub type Gauge<T> = Metric<T, GaugeKind>;
pub type Counter<T> = Metric<T, CounterKind>;

impl<T, K> ToRegistration for Metric<T, K>
where
T: IntoMetricValue,
K: MetricKindTrait<T>,
{
fn to_registration(&self) -> crate::kaskada::sparrow::v1alpha::FlightRecord {
FlightRecord {
record: Some(Record::RegisterMetric(RegisterMetric {
metric_id: self.metric_id,
kind: K::KIND as i32,
label: self.label.to_owned(),
})),
}
}
}
Loading