Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow clients to define custom callbacks to handle telemetry #1080

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion mountpoint-s3-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ pub mod error_metadata;

pub use object_client::{ObjectClient, PutObjectRequest};

pub use s3_crt_client::{get_object::S3GetObjectRequest, put_object::S3PutObjectRequest, S3CrtClient, S3RequestError};
pub use s3_crt_client::{
get_object::S3GetObjectRequest, put_object::S3PutObjectRequest, OnTelemetry, S3CrtClient, S3RequestError,
};

/// Configuration for the S3 client
pub mod config {
Expand Down
21 changes: 21 additions & 0 deletions mountpoint-s3-client/src/s3_crt_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ pub struct S3ClientConfig {
read_backpressure: bool,
initial_read_window: usize,
network_interface_names: Vec<String>,
telemetry_callback: Option<Arc<dyn OnTelemetry>>,
aws-hans-pistor marked this conversation as resolved.
Show resolved Hide resolved
}

impl Default for S3ClientConfig {
Expand All @@ -118,6 +119,7 @@ impl Default for S3ClientConfig {
read_backpressure: false,
initial_read_window: DEFAULT_PART_SIZE,
network_interface_names: vec![],
telemetry_callback: None,
}
}
}
Expand Down Expand Up @@ -219,6 +221,13 @@ impl S3ClientConfig {
self.network_interface_names = network_interface_names;
self
}

/// Set a custom callback to handle telemetry events
#[must_use = "S3ClientConfig follows a builder pattern"]
pub fn telemetry_callback(mut self, telemetry_callback: Arc<dyn OnTelemetry>) -> Self {
self.telemetry_callback = Some(telemetry_callback);
self
}
}

/// Authentication configuration for the CRT-based S3 client
Expand Down Expand Up @@ -286,6 +295,7 @@ struct S3CrtClientInner {
bucket_owner: Option<String>,
credentials_provider: Option<CredentialsProvider>,
host_resolver: HostResolver,
telemetry_callback: Option<Arc<dyn OnTelemetry>>,
}

impl S3CrtClientInner {
Expand Down Expand Up @@ -420,6 +430,7 @@ impl S3CrtClientInner {
bucket_owner: config.bucket_owner,
credentials_provider: Some(credentials_provider),
host_resolver,
telemetry_callback: config.telemetry_callback,
})
}

Expand Down Expand Up @@ -556,6 +567,8 @@ impl S3CrtClientInner {
let total_bytes = Arc::new(AtomicU64::new(0));
let total_bytes_clone = Arc::clone(&total_bytes);

let telemetry_callback = self.telemetry_callback.clone();

options
.on_telemetry(move |metrics| {
let _guard = span_telemetry.enter();
Expand Down Expand Up @@ -593,6 +606,10 @@ impl S3CrtClientInner {
} else if request_canceled {
metrics::counter!("s3.requests.canceled", "op" => op, "type" => request_type).increment(1);
}

if let Some(telemetry_callback) = &telemetry_callback {
telemetry_callback.on_telemetry(metrics);
}
})
.on_headers(move |headers, response_status| {
(on_headers)(headers, response_status);
Expand Down Expand Up @@ -1354,6 +1371,10 @@ impl ObjectClient for S3CrtClient {
}
}

pub trait OnTelemetry: std::fmt::Debug + Send + Sync {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've defined a trait since I find it a little more readable than raw Fn types, let me know if you prefer differently

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't have a strong opinion, but elsewhere in this crate we do it as a type alias, so maybe we should match that idiom:

type ReviewCallback = dyn FnOnce(UploadReview) -> bool + Send;

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's fair, I can swap to a type alias

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually this is not possible

S3ClientConfig requires that everything implements std::fmt::Debug, and type aliases can only implement auto-traits.

fn on_telemetry(&self, request_metrics: &RequestMetrics);
}

#[cfg(test)]
mod tests {
use mountpoint_s3_crt::common::error::Error;
Expand Down
10 changes: 9 additions & 1 deletion mountpoint-s3-client/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@ use bytes::Bytes;
use futures::{pin_mut, Stream, StreamExt};
use mountpoint_s3_client::config::{EndpointConfig, S3ClientConfig};
use mountpoint_s3_client::types::GetObjectRequest;
use mountpoint_s3_client::S3CrtClient;
use mountpoint_s3_client::{OnTelemetry, S3CrtClient};
use mountpoint_s3_crt::common::allocator::Allocator;
use mountpoint_s3_crt::common::rust_log_adapter::RustLogAdapter;
use mountpoint_s3_crt::common::uri::Uri;
use rand::rngs::OsRng;
use rand::RngCore;
use std::ops::Range;
use std::sync::Arc;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt as _;
use tracing_subscriber::{EnvFilter, Layer};
Expand Down Expand Up @@ -87,6 +88,13 @@ pub fn get_test_backpressure_client(initial_read_window: usize, part_size: Optio
S3CrtClient::new(config).expect("could not create test client")
}

pub fn get_test_client_with_custom_telemetry(telemetry_callback: Arc<dyn OnTelemetry>) -> S3CrtClient {
let config = S3ClientConfig::new()
.endpoint_config(EndpointConfig::new(&get_test_region()))
.telemetry_callback(telemetry_callback);
S3CrtClient::new(config).expect("could not create test client")
}

pub fn get_test_bucket_and_prefix(test_name: &str) -> (String, String) {
let bucket = get_test_bucket();
let prefix = get_unique_test_prefix(test_name);
Expand Down
73 changes: 72 additions & 1 deletion mountpoint-s3-client/tests/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
};
use mountpoint_s3_client::error::ObjectClientError;
use mountpoint_s3_client::types::{GetObjectParams, HeadObjectParams};
use mountpoint_s3_client::{ObjectClient, S3CrtClient, S3RequestError};
use mountpoint_s3_client::{ObjectClient, OnTelemetry, S3CrtClient, S3RequestError};
use mountpoint_s3_crt::s3::client::RequestMetrics;
use regex::Regex;
use rusty_fork::rusty_fork_test;
use tracing::Level;
Expand Down Expand Up @@ -280,3 +281,73 @@
runtime.block_on(test_head_object_403());
}
}

async fn test_custom_telemetry_callback() {
let sdk_client = get_test_sdk_client().await;
let (bucket, prefix) = get_test_bucket_and_prefix("test_custom_telemetry_callback");

let key = format!("{prefix}/test");
let body = vec![0x42; 100];
sdk_client
.put_object()
.bucket(&bucket)
.key(&key)
.body(ByteStream::from(body.clone()))
.send()
.await
.unwrap();

let recorder = TestRecorder::default();
metrics::set_global_recorder(recorder.clone()).unwrap();

#[derive(Debug)]
struct CustomOnTelemetry {
metric_name: String,
}

impl OnTelemetry for CustomOnTelemetry {
fn on_telemetry(&self, request_metrics: &RequestMetrics) {
metrics::counter!(self.metric_name.clone()).absolute(request_metrics.total_duration().as_micros() as u64);
}
}

let request_duration_metric_name = "request_duration_us";

let custom_telemetry_callback = CustomOnTelemetry {
metric_name: String::from(request_duration_metric_name),
};

let client = get_test_client_with_custom_telemetry(Arc::new(custom_telemetry_callback));
let result = client
.get_object(&bucket, &key, None, None)

Check failure on line 322 in mountpoint-s3-client/tests/metrics.rs

View workflow job for this annotation

GitHub Actions / Integration / S3 Express One Zone tests (Amazon Linux arm, FUSE 2)

this method takes 3 arguments but 4 arguments were supplied

Check failure on line 322 in mountpoint-s3-client/tests/metrics.rs

View workflow job for this annotation

GitHub Actions / Integration / S3 Express One Zone tests (Ubuntu x86, FUSE 2)

this method takes 3 arguments but 4 arguments were supplied

Check failure on line 322 in mountpoint-s3-client/tests/metrics.rs

View workflow job for this annotation

GitHub Actions / Integration / S3 Express One Zone tests (Ubuntu x86, FUSE 3)

this method takes 3 arguments but 4 arguments were supplied

Check failure on line 322 in mountpoint-s3-client/tests/metrics.rs

View workflow job for this annotation

GitHub Actions / Check all targets

this method takes 3 arguments but 4 arguments were supplied

Check failure on line 322 in mountpoint-s3-client/tests/metrics.rs

View workflow job for this annotation

GitHub Actions / Integration / Tests (Ubuntu x86, FUSE 3)

this method takes 3 arguments but 4 arguments were supplied

Check failure on line 322 in mountpoint-s3-client/tests/metrics.rs

View workflow job for this annotation

GitHub Actions / Clippy

this method takes 3 arguments but 4 arguments were supplied

Check failure on line 322 in mountpoint-s3-client/tests/metrics.rs

View workflow job for this annotation

GitHub Actions / Integration / Tests (Ubuntu x86, FUSE 2)

this method takes 3 arguments but 4 arguments were supplied

Check failure on line 322 in mountpoint-s3-client/tests/metrics.rs

View workflow job for this annotation

GitHub Actions / Integration / Address sanitizer

this method takes 3 arguments but 4 arguments were supplied

Check failure on line 322 in mountpoint-s3-client/tests/metrics.rs

View workflow job for this annotation

GitHub Actions / Integration / Tests (Amazon Linux arm, FUSE 2)

this method takes 3 arguments but 4 arguments were supplied
.await
.expect("get_object should succeed");
let result = result
.map_ok(|(_offset, bytes)| bytes.len())
.try_fold(0, |a, b| async move { Ok(a + b) })
.await
.expect("get_object should succeed");
assert_eq!(result, body.len());

let metrics = recorder.metrics.lock().unwrap().clone();

let (_, request_duration_us) = metrics
.get(request_duration_metric_name, None, None)
.expect("The custom metric should be emitted");

let Metric::Counter(request_duration_us) = request_duration_us else {
panic!("Expected a counter metric")
};
assert!(
*request_duration_us.lock().unwrap() > 0,
"The request duration should be more than 0 microseconds"
);
}

rusty_fork_test! {
#[test]
fn custom_telemetry_callbacks_are_called() {
let runtime = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
runtime.block_on(test_custom_telemetry_callback());
}
}
Loading