Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(sinks): Drop the custom SinkContext::default implementation #17804

Merged
merged 1 commit into from
Jun 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 1 addition & 11 deletions src/config/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ pub trait SinkConfig: DynClone + NamedComponent + core::fmt::Debug + Send + Sync

dyn_clone::clone_trait_object!(SinkConfig);

#[derive(Debug, Clone)]
#[derive(Clone, Debug, Default)]
pub struct SinkContext {
pub healthcheck: SinkHealthcheckOptions,
pub globals: GlobalOptions,
Expand All @@ -244,16 +244,6 @@ pub struct SinkContext {
}

impl SinkContext {
#[cfg(test)]
pub fn new_test() -> Self {
Self {
healthcheck: SinkHealthcheckOptions::default(),
globals: GlobalOptions::default(),
proxy: ProxyConfig::default(),
schema: schema::Options::default(),
}
}

pub const fn globals(&self) -> &GlobalOptions {
&self.globals
}
Expand Down
4 changes: 2 additions & 2 deletions src/sinks/amqp/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ async fn amqp_happy_path() {
.await
.unwrap();

let cx = SinkContext::new_test();
let cx = SinkContext::default();
let (sink, healthcheck) = config.build(cx).await.unwrap();
healthcheck.await.expect("Health check failed");

Expand Down Expand Up @@ -153,7 +153,7 @@ async fn amqp_round_trip() {
.await
.unwrap();

let cx = SinkContext::new_test();
let cx = SinkContext::default();
let (amqp_sink, healthcheck) = config.build(cx).await.unwrap();
healthcheck.await.expect("Health check failed");

Expand Down
2 changes: 1 addition & 1 deletion src/sinks/appsignal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ mod test {
.expect("config should be valid");
config.endpoint = mock_endpoint.to_string();

let context = SinkContext::new_test();
let context = SinkContext::default();
let (sink, _healthcheck) = config.build(context).await.unwrap();

let event = Event::Log(LogEvent::from("simple message"));
Expand Down
12 changes: 6 additions & 6 deletions src/sinks/aws_cloudwatch_logs/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ async fn cloudwatch_insert_log_event() {
acknowledgements: Default::default(),
};

let (sink, _) = config.build(SinkContext::new_test()).await.unwrap();
let (sink, _) = config.build(SinkContext::default()).await.unwrap();

let timestamp = chrono::Utc::now();

Expand Down Expand Up @@ -101,7 +101,7 @@ async fn cloudwatch_insert_log_events_sorted() {
acknowledgements: Default::default(),
};

let (sink, _) = config.build(SinkContext::new_test()).await.unwrap();
let (sink, _) = config.build(SinkContext::default()).await.unwrap();

let timestamp = chrono::Utc::now() - Duration::days(1);

Expand Down Expand Up @@ -176,7 +176,7 @@ async fn cloudwatch_insert_out_of_range_timestamp() {
acknowledgements: Default::default(),
};

let (sink, _) = config.build(SinkContext::new_test()).await.unwrap();
let (sink, _) = config.build(SinkContext::default()).await.unwrap();

let now = chrono::Utc::now();

Expand Down Expand Up @@ -255,7 +255,7 @@ async fn cloudwatch_dynamic_group_and_stream_creation() {
acknowledgements: Default::default(),
};

let (sink, _) = config.build(SinkContext::new_test()).await.unwrap();
let (sink, _) = config.build(SinkContext::default()).await.unwrap();

let timestamp = chrono::Utc::now();

Expand Down Expand Up @@ -310,7 +310,7 @@ async fn cloudwatch_insert_log_event_batched() {
acknowledgements: Default::default(),
};

let (sink, _) = config.build(SinkContext::new_test()).await.unwrap();
let (sink, _) = config.build(SinkContext::default()).await.unwrap();

let timestamp = chrono::Utc::now();

Expand Down Expand Up @@ -360,7 +360,7 @@ async fn cloudwatch_insert_log_event_partitioned() {
acknowledgements: Default::default(),
};

let (sink, _) = config.build(SinkContext::new_test()).await.unwrap();
let (sink, _) = config.build(SinkContext::default()).await.unwrap();

let timestamp = chrono::Utc::now();

Expand Down
4 changes: 2 additions & 2 deletions src/sinks/aws_cloudwatch_metrics/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ async fn cloudwatch_metrics_healthcheck() {

#[tokio::test]
async fn cloudwatch_metrics_put_data() {
let cx = SinkContext::new_test();
let cx = SinkContext::default();
let config = config();
let client = config.create_client(&cx.globals.proxy).await.unwrap();
let sink = CloudWatchMetricsSvc::new(config, client).unwrap();
Expand Down Expand Up @@ -94,7 +94,7 @@ async fn cloudwatch_metrics_put_data() {

#[tokio::test]
async fn cloudwatch_metrics_namespace_partitioning() {
let cx = SinkContext::new_test();
let cx = SinkContext::default();
let config = config();
let client = config.create_client(&cx.globals.proxy).await.unwrap();
let sink = CloudWatchMetricsSvc::new(config, client).unwrap();
Expand Down
2 changes: 1 addition & 1 deletion src/sinks/aws_kinesis/firehose/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ async fn firehose_put_records() {

let config = KinesisFirehoseSinkConfig { batch, base };

let cx = SinkContext::new_test();
let cx = SinkContext::default();

let (sink, _) = config.build(cx).await.unwrap();

Expand Down
4 changes: 2 additions & 2 deletions src/sinks/aws_kinesis/firehose/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ async fn check_batch_size() {

let config = KinesisFirehoseSinkConfig { batch, base };

let cx = SinkContext::new_test();
let cx = SinkContext::default();
let res = config.build(cx).await;

assert_eq!(
Expand Down Expand Up @@ -69,7 +69,7 @@ async fn check_batch_events() {

let config = KinesisFirehoseSinkConfig { batch, base };

let cx = SinkContext::new_test();
let cx = SinkContext::default();
let res = config.build(cx).await;

assert_eq!(
Expand Down
4 changes: 2 additions & 2 deletions src/sinks/aws_kinesis/streams/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ fn kinesis_address() -> String {
// base,
// };
//
// let cx = SinkContext::new_test();
// let cx = SinkContext::default();
//
// let sink = config.build(cx).await.unwrap().0;
//
Expand Down Expand Up @@ -107,7 +107,7 @@ async fn kinesis_put_records_without_partition_key() {
base,
};

let cx = SinkContext::new_test();
let cx = SinkContext::default();

let sink = config.build(cx).await.unwrap().0;

Expand Down
18 changes: 9 additions & 9 deletions src/sinks/aws_s3/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ fn s3_address() -> String {

#[tokio::test]
async fn s3_insert_message_into_with_flat_key_prefix() {
let cx = SinkContext::new_test();
let cx = SinkContext::default();

let bucket = uuid::Uuid::new_v4().to_string();

Expand Down Expand Up @@ -85,7 +85,7 @@ async fn s3_insert_message_into_with_flat_key_prefix() {

#[tokio::test]
async fn s3_insert_message_into_with_folder_key_prefix() {
let cx = SinkContext::new_test();
let cx = SinkContext::default();

let bucket = uuid::Uuid::new_v4().to_string();

Expand Down Expand Up @@ -119,7 +119,7 @@ async fn s3_insert_message_into_with_folder_key_prefix() {

#[tokio::test]
async fn s3_insert_message_into_with_ssekms_key_id() {
let cx = SinkContext::new_test();
let cx = SinkContext::default();

let bucket = uuid::Uuid::new_v4().to_string();

Expand Down Expand Up @@ -156,7 +156,7 @@ async fn s3_insert_message_into_with_ssekms_key_id() {

#[tokio::test]
async fn s3_rotate_files_after_the_buffer_size_is_reached() {
let cx = SinkContext::new_test();
let cx = SinkContext::default();

let bucket = uuid::Uuid::new_v4().to_string();

Expand Down Expand Up @@ -213,7 +213,7 @@ async fn s3_gzip() {
// to 1000, and using gzip compression. We test to ensure that all of the keys we end up
// writing represent the sum total of the lines: we expect 3 batches, each of which should
// have 1000 lines.
let cx = SinkContext::new_test();
let cx = SinkContext::default();

let bucket = uuid::Uuid::new_v4().to_string();

Expand Down Expand Up @@ -258,7 +258,7 @@ async fn s3_zstd() {
// to 1000, and using zstd compression. We test to ensure that all of the keys we end up
// writing represent the sum total of the lines: we expect 3 batches, each of which should
// have 1000 lines.
let cx = SinkContext::new_test();
let cx = SinkContext::default();

let bucket = uuid::Uuid::new_v4().to_string();

Expand Down Expand Up @@ -303,7 +303,7 @@ async fn s3_zstd() {
// https://github.com/localstack/localstack/issues/4166
#[tokio::test]
async fn s3_insert_message_into_object_lock() {
let cx = SinkContext::new_test();
let cx = SinkContext::default();

let bucket = uuid::Uuid::new_v4().to_string();

Expand Down Expand Up @@ -357,7 +357,7 @@ async fn s3_insert_message_into_object_lock() {

#[tokio::test]
async fn acknowledges_failures() {
let cx = SinkContext::new_test();
let cx = SinkContext::default();

let bucket = uuid::Uuid::new_v4().to_string();

Expand Down Expand Up @@ -408,7 +408,7 @@ async fn s3_healthchecks_invalid_bucket() {

#[tokio::test]
async fn s3_flush_on_exhaustion() {
let cx = SinkContext::new_test();
let cx = SinkContext::default();

let bucket = uuid::Uuid::new_v4().to_string();
create_bucket(&bucket, false).await;
Expand Down
2 changes: 1 addition & 1 deletion src/sinks/axiom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ mod integration_tests {
assert!(!token.is_empty(), "$AXIOM_TOKEN required");
let dataset = env::var("AXIOM_DATASET").unwrap();

let cx = SinkContext::new_test();
let cx = SinkContext::default();

let config = AxiomConfig {
url: Some(url.clone()),
Expand Down
6 changes: 3 additions & 3 deletions src/sinks/azure_monitor_logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ mod tests {
default_headers: HeaderMap::new(),
};

let context = SinkContext::new_test();
let context = SinkContext::default();
let client =
HttpClient::new(None, &context.proxy).expect("should not fail to create HTTP client");

Expand Down Expand Up @@ -617,7 +617,7 @@ mod tests {
"#,
)
.unwrap();
if config.build(SinkContext::new_test()).await.is_ok() {
if config.build(SinkContext::default()).await.is_ok() {
panic!("config.build failed to error");
}
}
Expand Down Expand Up @@ -657,7 +657,7 @@ mod tests {
"#,
)
.unwrap();
if config.build(SinkContext::new_test()).await.is_ok() {
if config.build(SinkContext::default()).await.is_ok() {
panic!("config.build failed to error");
}
}
Expand Down
12 changes: 6 additions & 6 deletions src/sinks/clickhouse/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ async fn insert_events() {
)
.await;

let (sink, _hc) = config.build(SinkContext::new_test()).await.unwrap();
let (sink, _hc) = config.build(SinkContext::default()).await.unwrap();

let (mut input_event, mut receiver) = make_event();
input_event
Expand Down Expand Up @@ -114,7 +114,7 @@ async fn skip_unknown_fields() {
.create_table(&table, "host String, timestamp String, message String")
.await;

let (sink, _hc) = config.build(SinkContext::new_test()).await.unwrap();
let (sink, _hc) = config.build(SinkContext::default()).await.unwrap();

let (mut input_event, mut receiver) = make_event();
input_event.as_mut_log().insert("unknown", "mysteries");
Expand Down Expand Up @@ -167,7 +167,7 @@ async fn insert_events_unix_timestamps() {
)
.await;

let (sink, _hc) = config.build(SinkContext::new_test()).await.unwrap();
let (sink, _hc) = config.build(SinkContext::default()).await.unwrap();

let (mut input_event, _receiver) = make_event();

Expand Down Expand Up @@ -235,7 +235,7 @@ timestamp_format = "unix""#,
)
.await;

let (sink, _hc) = config.build(SinkContext::new_test()).await.unwrap();
let (sink, _hc) = config.build(SinkContext::default()).await.unwrap();

let (mut input_event, _receiver) = make_event();

Expand Down Expand Up @@ -298,7 +298,7 @@ async fn no_retry_on_incorrect_data() {
.create_table(&table, "host String, timestamp String")
.await;

let (sink, _hc) = config.build(SinkContext::new_test()).await.unwrap();
let (sink, _hc) = config.build(SinkContext::default()).await.unwrap();

let (input_event, mut receiver) = make_event();

Expand Down Expand Up @@ -340,7 +340,7 @@ async fn no_retry_on_incorrect_data_warp() {
batch,
..Default::default()
};
let (sink, _hc) = config.build(SinkContext::new_test()).await.unwrap();
let (sink, _hc) = config.build(SinkContext::default()).await.unwrap();

let (input_event, mut receiver) = make_event();

Expand Down
2 changes: 1 addition & 1 deletion src/sinks/databend/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ async fn insert_event_with_cfg(cfg: String, table: String, client: DatabendAPICl
.unwrap();

let (config, _) = load_sink::<DatabendConfig>(&cfg).unwrap();
let (sink, _hc) = config.build(SinkContext::new_test()).await.unwrap();
let (sink, _hc) = config.build(SinkContext::default()).await.unwrap();

let (input_event, mut receiver) = make_event();
run_and_assert_sink_compliance(
Expand Down
2 changes: 1 addition & 1 deletion src/sinks/elasticsearch/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ impl ElasticsearchCommon {
#[cfg(test)]
pub async fn parse_single(config: &ElasticsearchConfig) -> crate::Result<Self> {
let mut commons =
Self::parse_many(config, crate::config::SinkContext::new_test().proxy()).await?;
Self::parse_many(config, crate::config::SinkContext::default().proxy()).await?;
assert_eq!(commons.len(), 1);
Ok(commons.remove(0))
}
Expand Down
6 changes: 3 additions & 3 deletions src/sinks/elasticsearch/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ async fn structures_events_correctly() {
.expect("Config error");
let base_url = common.base_url.clone();

let cx = SinkContext::new_test();
let cx = SinkContext::default();
let (sink, _hc) = config.build(cx.clone()).await.unwrap();

let (batch, mut receiver) = BatchNotifier::new_with_receiver();
Expand Down Expand Up @@ -555,7 +555,7 @@ async fn run_insert_tests_with_config(
};
let base_url = common.base_url.clone();

let cx = SinkContext::new_test();
let cx = SinkContext::default();
let (sink, healthcheck) = config
.build(cx.clone())
.await
Expand Down Expand Up @@ -639,7 +639,7 @@ async fn run_insert_tests_with_config(
}

async fn run_insert_tests_with_multiple_endpoints(config: &ElasticsearchConfig) {
let cx = SinkContext::new_test();
let cx = SinkContext::default();
let commons = ElasticsearchCommon::parse_many(config, cx.proxy())
.await
.expect("Config error");
Expand Down
2 changes: 1 addition & 1 deletion src/sinks/gcp/chronicle_unstructured.rs
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,7 @@ mod integration_tests {
log_type: &str,
auth_path: &str,
) -> crate::Result<(VectorSink, crate::sinks::Healthcheck)> {
let cx = SinkContext::new_test();
let cx = SinkContext::default();
config(log_type, auth_path).build(cx).await
}

Expand Down
Loading