diff --git a/Cargo.lock b/Cargo.lock index 31e92085ee21b..fe9d6e8541f18 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3286,6 +3286,7 @@ dependencies = [ "tokio-stream", "tokio-util 0.7.2", "tracing", + "tracing-subscriber", "url", "uuid", ] @@ -3978,6 +3979,7 @@ dependencies = [ "tokio", "tokio-postgres", "tower-http", + "tracing", "uuid", "walkdir", ] diff --git a/clippy.toml b/clippy.toml index 6956b005b8d5c..541e4d90597e4 100644 --- a/clippy.toml +++ b/clippy.toml @@ -8,4 +8,6 @@ disallowed-methods = [ { path = "tokio::runtime::Handle::spawn_blocking", reason = "use the spawn wrappers in `mz_ore::task` instead" }, { path = "tokio::runtime::Runtime::spawn", reason = "use the spawn wrappers in `mz_ore::task` instead" }, { path = "tokio::runtime::Runtime::spawn_blocking", reason = "use the spawn wrappers in `mz_ore::task` instead" }, + # Use the wrapper that sets the log level correctly + { path = "rdkafka::config::ClientConfig::new", reason = "use the `client::create_new_client_config` wrapper in `kafka_util` instead" }, ] diff --git a/src/compute/src/bin/computed.rs b/src/compute/src/bin/computed.rs index 0a8f78a02ac4c..ad4b08fd897b8 100644 --- a/src/compute/src/bin/computed.rs +++ b/src/compute/src/bin/computed.rs @@ -17,7 +17,6 @@ use futures::sink::SinkExt; use futures::stream::TryStreamExt; use http::HeaderMap; use mz_build_info::{build_info, BuildInfo}; -use mz_dataflow_types::sources::AwsExternalId; use serde::de::DeserializeOwned; use serde::ser::Serialize; use tokio::net::TcpListener; @@ -27,6 +26,7 @@ use tracing_subscriber::filter::Targets; use mz_dataflow_types::client::{ComputeClient, GenericClient}; use mz_dataflow_types::reconciliation::command::ComputeCommandReconcile; +use mz_dataflow_types::ConnectorContext; use mz_ore::metrics::MetricsRegistry; use mz_ore::now::SYSTEM_TIME; @@ -254,10 +254,7 @@ async fn run(args: Args) -> Result<(), anyhow::Error> { experimental_mode: false, metrics_registry: MetricsRegistry::new(), now: SYSTEM_TIME.clone(), - aws_external_id: args - .aws_external_id - .map(AwsExternalId::ISwearThisCameFromACliArgOrEnvVariable) - .unwrap_or(AwsExternalId::NotProvided), + connector_context: ConnectorContext::from_cli_args(&args.log_filter, args.aws_external_id), }; let serve_config = ServeConfig { diff --git a/src/compute/src/compute_state.rs b/src/compute/src/compute_state.rs index d06c8e086f358..ee62914784721 100644 --- a/src/compute/src/compute_state.rs +++ b/src/compute/src/compute_state.rs @@ -26,6 +26,7 @@ use tokio::sync::mpsc; use mz_dataflow_types::client::{ComputeCommand, ComputeResponse}; use mz_dataflow_types::logging::LoggingConfig; +use mz_dataflow_types::ConnectorContext; use mz_dataflow_types::{DataflowError, PeekResponse, TailResponse}; use mz_repr::{Diff, GlobalId, Row, Timestamp}; use mz_storage::boundary::ComputeReplay; @@ -63,6 +64,9 @@ pub struct ComputeState { pub sink_metrics: SinkBaseMetrics, /// The logger, from Timely's logging framework, if logs are enabled. pub materialized_logger: Option, + /// Configuration for sink connectors. + // TODO: remove when sinks move to storage. + pub connector_context: ConnectorContext, } /// A wrapper around [ComputeState] with a live timely worker and response channel. 
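Not part of the patch: a minimal sketch, in the spirit of the `computed` change above, of how a binary might derive a `ConnectorContext` from a textual log filter before handing it to the dataflow layer. The helper name and the example filter string are hypothetical; only `ConnectorContext::from_cli_args` (added by this patch) and `Targets` (from `tracing-subscriber`) are real.

use mz_dataflow_types::ConnectorContext;
use tracing_subscriber::filter::Targets;

// Hypothetical helper, shown only to illustrate the data flow.
fn connector_context_from_filter(
    filter_str: &str,
    aws_external_id: Option<String>,
) -> ConnectorContext {
    // A filter such as "info,librdkafka=debug" yields `librdkafka_log_level == DEBUG`,
    // which later causes the `kafka_util` wrapper to enable librdkafka debug logs.
    let filter: Targets = filter_str.parse().expect("valid log filter");
    ConnectorContext::from_cli_args(&filter, aws_external_id)
}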
diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs index fdee2dd252022..b34fac761454e 100644 --- a/src/compute/src/server.rs +++ b/src/compute/src/server.rs @@ -20,7 +20,7 @@ use timely::worker::Worker as TimelyWorker; use tokio::sync::mpsc; use mz_dataflow_types::client::{ComputeCommand, ComputeResponse, LocalClient, LocalComputeClient}; -use mz_dataflow_types::sources::AwsExternalId; +use mz_dataflow_types::ConnectorContext; use mz_ore::metrics::MetricsRegistry; use mz_ore::now::NowFn; use mz_storage::boundary::ComputeReplay; @@ -42,8 +42,9 @@ pub struct Config { pub now: NowFn, /// Metrics registry through which dataflow metrics will be reported. pub metrics_registry: MetricsRegistry, - /// An external ID to use for all AWS AssumeRole operations. - pub aws_external_id: AwsExternalId, + /// Configuration for sink connectors. + // TODO: remove when sinks move to storage. + pub connector_context: ConnectorContext, } /// A handle to a running dataflow server. @@ -106,6 +107,7 @@ pub fn serve_boundary CR + Send + Sync + 'sta compute_boundary, compute_response_tx, metrics_bundle: metrics_bundle.clone(), + connector_context: config.connector_context.clone(), } .run() }) @@ -143,6 +145,9 @@ where compute_response_tx: mpsc::UnboundedSender, /// Metrics bundle. metrics_bundle: (SinkBaseMetrics, TraceMetrics), + /// Configuration for sink connectors. + // TODO: remove when sinks move to storage. + pub connector_context: ConnectorContext, } impl<'w, A, CR> Worker<'w, A, CR> @@ -201,6 +206,7 @@ where reported_frontiers: HashMap::new(), sink_metrics: self.metrics_bundle.0.clone(), materialized_logger: None, + connector_context: self.connector_context.clone(), }); } ComputeCommand::DropInstance => { diff --git a/src/compute/src/sink/kafka.rs b/src/compute/src/sink/kafka.rs index 83dfa87a83c99..d1282de46452f 100644 --- a/src/compute/src/sink/kafka.rs +++ b/src/compute/src/sink/kafka.rs @@ -51,7 +51,7 @@ use mz_interchange::avro::{ self, get_debezium_transaction_schema, AvroEncoder, AvroSchemaGenerator, }; use mz_interchange::encode::Encode; -use mz_kafka_util::client::MzClientContext; +use mz_kafka_util::client::{create_new_client_config, MzClientContext}; use mz_ore::cast::CastFrom; use mz_ore::collections::CollectionExt; use mz_ore::metrics::{CounterVecExt, DeleteOnDropCounter, DeleteOnDropGauge, GaugeVecExt}; @@ -137,6 +137,7 @@ where sink.as_of.clone(), Rc::clone(&shared_frontier), &compute_state.sink_metrics.kafka, + compute_state.connector_context.librdkafka_log_level, ); compute_state @@ -434,9 +435,11 @@ impl KafkaSinkState { activator: Activator, write_frontier: Rc>>, metrics: &KafkaBaseMetrics, + librdkafka_log_level: tracing::Level, ) -> Self { - let config = Self::create_producer_config(&connector); - let consistency_client_config = Self::create_consistency_client_config(&connector); + let config = Self::create_producer_config(&connector, librdkafka_log_level); + let consistency_client_config = + Self::create_consistency_client_config(&connector, librdkafka_log_level); let metrics = Arc::new(SinkMetrics::new( metrics, @@ -486,8 +489,11 @@ impl KafkaSinkState { } } - fn create_producer_config(connector: &KafkaSinkConnector) -> ClientConfig { - let mut config = ClientConfig::new(); + fn create_producer_config( + connector: &KafkaSinkConnector, + librdkafka_log_level: tracing::Level, + ) -> ClientConfig { + let mut config = create_new_client_config(librdkafka_log_level); config.set("bootstrap.servers", &connector.addrs.to_string()); // Ensure that messages are sinked in order 
and without duplicates. Note that @@ -535,8 +541,11 @@ impl KafkaSinkState { config } - fn create_consistency_client_config(connector: &KafkaSinkConnector) -> ClientConfig { - let mut config = ClientConfig::new(); + fn create_consistency_client_config( + connector: &KafkaSinkConnector, + librdkafka_log_level: tracing::Level, + ) -> ClientConfig { + let mut config = create_new_client_config(librdkafka_log_level); config.set("bootstrap.servers", &connector.addrs.to_string()); for (k, v) in connector.config_options.iter() { // We explicitly reject `statistics.interval.ms` here so that we don't @@ -1023,6 +1032,7 @@ fn kafka( as_of: SinkAsOf, write_frontier: Rc>>, metrics: &KafkaBaseMetrics, + librdkafka_log_level: tracing::Level, ) -> Rc where G: Scope, @@ -1089,6 +1099,7 @@ where shared_gate_ts, write_frontier, metrics, + librdkafka_log_level, ) } @@ -1112,6 +1123,7 @@ pub fn produce_to_kafka( shared_gate_ts: Rc>>, write_frontier: Rc>>, metrics: &KafkaBaseMetrics, + librdkafka_log_level: tracing::Level, ) -> Rc where G: Scope, @@ -1131,6 +1143,7 @@ where activator, write_frontier, metrics, + librdkafka_log_level, ); let mut vector = Vec::new(); diff --git a/src/coord/src/catalog.rs b/src/coord/src/catalog.rs index c5759c9f9afa6..d599b80bf4ed1 100644 --- a/src/coord/src/catalog.rs +++ b/src/coord/src/catalog.rs @@ -31,7 +31,7 @@ use mz_dataflow_types::client::{ use mz_dataflow_types::logging::LoggingConfig as DataflowLoggingConfig; use mz_dataflow_types::sinks::{SinkConnector, SinkConnectorBuilder, SinkEnvelope}; use mz_dataflow_types::sources::{ - AwsExternalId, ConnectorInner, ExternalSourceConnector, SourceConnector, Timeline, + ConnectorInner, ExternalSourceConnector, SourceConnector, Timeline, }; use mz_expr::{ExprHumanizer, MirScalarExpr, OptimizedMirRelationExpr}; use mz_ore::collections::CollectionExt; @@ -1377,7 +1377,6 @@ impl Catalog { cluster_id: config.storage.cluster_id(), session_id: Uuid::new_v4(), build_info: config.build_info, - aws_external_id: config.aws_external_id.clone(), timestamp_frequency: config.timestamp_frequency, now: config.now.clone(), }, @@ -1917,7 +1916,6 @@ impl Catalog { storage, experimental_mode, build_info: &DUMMY_BUILD_INFO, - aws_external_id: AwsExternalId::NotProvided, timestamp_frequency: Duration::from_secs(1), now, skip_migrations: true, diff --git a/src/coord/src/catalog/config.rs b/src/coord/src/catalog/config.rs index 4b78f21de8a81..1327766f6f50c 100644 --- a/src/coord/src/catalog/config.rs +++ b/src/coord/src/catalog/config.rs @@ -10,7 +10,6 @@ use std::time::Duration; use mz_build_info::BuildInfo; -use mz_dataflow_types::sources::AwsExternalId; use mz_ore::metrics::MetricsRegistry; use crate::catalog::storage; @@ -24,10 +23,6 @@ pub struct Config<'a, S> { pub experimental_mode: Option, /// Information about this build of Materialize. pub build_info: &'static BuildInfo, - /// An [External ID][] to use for all AWS AssumeRole operations. - /// - /// [External ID]: https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-user_externalid.html - pub aws_external_id: AwsExternalId, /// Timestamp frequency to use for CREATE SOURCE pub timestamp_frequency: Duration, /// Function to generate wall clock now; can be mocked. 
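Also not part of the patch: a minimal sketch of the call pattern the new clippy rule steers code toward, threading a `tracing::Level` (normally `ConnectorContext::librdkafka_log_level`) into the `kafka_util` wrapper instead of calling `ClientConfig::new` directly, as the sink code above now does. The helper function and broker address are hypothetical.

use mz_kafka_util::client::create_new_client_config;
use rdkafka::config::ClientConfig;

// Hypothetical helper; in the patch this level comes from the compute state's
// `connector_context.librdkafka_log_level`.
fn example_producer_config(level: tracing::Level) -> ClientConfig {
    // The wrapper maps `level` onto an `RDKafkaLogLevel` and, when `level` is
    // DEBUG or TRACE, also sets librdkafka's `debug=all`.
    let mut config = create_new_client_config(level);
    config.set("bootstrap.servers", "localhost:9092"); // hypothetical broker
    config
}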
diff --git a/src/coord/src/coord.rs b/src/coord/src/coord.rs index 38a6dcb080aeb..bcc791699f472 100644 --- a/src/coord/src/coord.rs +++ b/src/coord/src/coord.rs @@ -99,10 +99,10 @@ use mz_dataflow_types::client::{ }; use mz_dataflow_types::sinks::{SinkAsOf, SinkConnector, SinkDesc, TailSinkConnector}; use mz_dataflow_types::sources::{ - AwsExternalId, ExternalSourceConnector, PostgresSourceConnector, SourceConnector, Timeline, + ExternalSourceConnector, PostgresSourceConnector, SourceConnector, Timeline, }; use mz_dataflow_types::{ - BuildDesc, DataflowDesc, DataflowDescription, IndexDesc, PeekResponse, Update, + BuildDesc, ConnectorContext, DataflowDesc, DataflowDescription, IndexDesc, PeekResponse, Update, }; use mz_expr::{ permutation_for_arrangement, CollectionPlan, ExprHumanizer, MirRelationExpr, MirScalarExpr, @@ -233,12 +233,12 @@ pub struct Config { pub logical_compaction_window: Option, pub experimental_mode: bool, pub build_info: &'static BuildInfo, - pub aws_external_id: AwsExternalId, pub metrics_registry: MetricsRegistry, pub now: NowFn, pub secrets_controller: Box, pub availability_zones: Vec, pub replica_sizes: ClusterReplicaSizeMap, + pub connector_context: ConnectorContext, } struct PendingPeek { @@ -363,6 +363,9 @@ pub struct Coordinator { replica_sizes: ClusterReplicaSizeMap, /// Valid availability zones for replicas. availability_zones: Vec, + + /// Extra context to pass through to connector creation. + connector_context: ConnectorContext, } /// Metadata about an active connection. @@ -627,9 +630,13 @@ impl Coordinator { panic!("sink already initialized during catalog boot") } }; - let connector = sink_connector::build(builder.clone(), entry.id()) - .await - .with_context(|| format!("recreating sink {}", entry.name()))?; + let connector = sink_connector::build( + builder.clone(), + entry.id(), + self.connector_context.clone(), + ) + .await + .with_context(|| format!("recreating sink {}", entry.name()))?; self.handle_sink_connector_ready( entry.id(), entry.oid(), @@ -1410,8 +1417,8 @@ impl Coordinator { match mz_sql::connectors::populate_connectors(stmt, &catalog, &mut vec![]) { Ok(stmt) => mz_sql::pure::purify_create_source( self.now(), - self.catalog.config().aws_external_id.clone(), stmt, + self.connector_context.clone(), ), Err(e) => return tx.send(Err(e.into()), session), }; @@ -2355,6 +2362,7 @@ impl Coordinator { // main coordinator thread when the future completes. 
let connector_builder = sink.connector_builder; let internal_cmd_tx = self.internal_cmd_tx.clone(); + let connector_context = self.connector_context.clone(); task::spawn( || format!("sink_connector_ready:{}", sink.from), async move { @@ -2364,7 +2372,8 @@ impl Coordinator { tx, id, oid, - result: sink_connector::build(connector_builder, id).await, + result: sink_connector::build(connector_builder, id, connector_context) + .await, compute_instance, })) .expect("sending to internal_cmd_tx cannot fail"); @@ -4711,12 +4720,12 @@ pub async fn serve( logical_compaction_window, experimental_mode, build_info, - aws_external_id, metrics_registry, now, secrets_controller, replica_sizes, availability_zones, + connector_context, }: Config, ) -> Result<(Handle, Client), CoordError> { let (cmd_tx, cmd_rx) = mpsc::unbounded_channel(); @@ -4726,7 +4735,6 @@ pub async fn serve( storage, experimental_mode: Some(experimental_mode), build_info, - aws_external_id, timestamp_frequency, now: now.clone(), skip_migrations: false, @@ -4766,6 +4774,7 @@ pub async fn serve( secrets_controller, replica_sizes, availability_zones, + connector_context, }; let bootstrap = handle.block_on(coord.bootstrap(builtin_table_updates)); let ok = bootstrap.is_ok(); diff --git a/src/coord/src/sink_connector.rs b/src/coord/src/sink_connector.rs index c57dfd0db10de..cb60e37810622 100644 --- a/src/coord/src/sink_connector.rs +++ b/src/coord/src/sink_connector.rs @@ -11,14 +11,14 @@ use std::time::Duration; use anyhow::{anyhow, Context}; use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, ResourceSpecifier, TopicReplication}; -use rdkafka::config::ClientConfig; use mz_dataflow_types::sinks::{ KafkaSinkConnector, KafkaSinkConnectorBuilder, KafkaSinkConnectorRetention, KafkaSinkConsistencyConnector, PersistSinkConnector, PersistSinkConnectorBuilder, PublishedSchemaInfo, SinkConnector, SinkConnectorBuilder, }; -use mz_kafka_util::client::MzClientContext; +use mz_dataflow_types::ConnectorContext; +use mz_kafka_util::client::{create_new_client_config, MzClientContext}; use mz_ore::collections::CollectionExt; use mz_repr::GlobalId; @@ -27,9 +27,10 @@ use crate::error::CoordError; pub async fn build( builder: SinkConnectorBuilder, id: GlobalId, + connector_context: ConnectorContext, ) -> Result { match builder { - SinkConnectorBuilder::Kafka(k) => build_kafka(k, id).await, + SinkConnectorBuilder::Kafka(k) => build_kafka(k, id, connector_context).await, SinkConnectorBuilder::Persist(p) => build_persist_sink(p, id), } } @@ -204,6 +205,7 @@ async fn publish_kafka_schemas( async fn build_kafka( builder: KafkaSinkConnectorBuilder, id: GlobalId, + connector_context: ConnectorContext, ) -> Result { let maybe_append_nonce = { let reuse_topic = builder.reuse_topic; @@ -219,7 +221,7 @@ async fn build_kafka( let topic = maybe_append_nonce(&builder.topic_prefix); // Create Kafka topic - let mut config = ClientConfig::new(); + let mut config = create_new_client_config(connector_context.librdkafka_log_level); config.set("bootstrap.servers", &builder.broker_addrs.to_string()); for (k, v) in builder.config_options.iter() { // Explicitly reject the statistics interval option here because its not diff --git a/src/dataflow-types/Cargo.toml b/src/dataflow-types/Cargo.toml index f1e81fa5834e3..5528921dde08e 100644 --- a/src/dataflow-types/Cargo.toml +++ b/src/dataflow-types/Cargo.toml @@ -28,7 +28,7 @@ mz-ccsr = { path = "../ccsr" } mz-expr = { path = "../expr" } mz-interchange = { path = "../interchange" } mz-kafka-util = { path = "../kafka-util" } -mz-ore 
= { path = "../ore" } +mz-ore = { path = "../ore", features = ["tracing_"] } mz-orchestrator = { path = "../orchestrator" } mz-persist = { path = "../persist" } mz-persist-client = { path = "../persist-client" } @@ -47,6 +47,7 @@ tokio-serde = { version = "0.8.0", features = ["bincode"] } tokio-stream = " 0.1.8" tokio-util = { version = "0.7.2", features = ["codec"] } tracing = "0.1.34" +tracing-subscriber = "0.3.11" url = { version = "2.2.2", features = ["serde"] } uuid = { version = "0.8.2", features = ["serde", "v4"] } proptest = { git = "https://github.com/MaterializeInc/proptest.git", default-features = false, features = ["std"]} diff --git a/src/dataflow-types/src/types.rs b/src/dataflow-types/src/types.rs index 90732a69461de..2978eed7c8b58 100644 --- a/src/dataflow-types/src/types.rs +++ b/src/dataflow-types/src/types.rs @@ -28,6 +28,7 @@ use mz_repr::proto::{any_uuid, FromProtoIfSome, ProtoRepr, TryFromProtoError, Tr use mz_repr::{Diff, GlobalId, RelationType, Row}; use crate::client::controller::storage::CollectionMetadata; +use crate::types::aws::AwsExternalId; use crate::types::sinks::SinkDesc; use crate::types::sources::SourceDesc; use crate::Plan; @@ -486,6 +487,183 @@ impl Arbitrary for DataflowDescription, + /// The AWS role to assume. + pub role: Option, + /// The custom AWS endpoint to use, if any. + pub endpoint: Option, + } + + /// AWS credentials for a source or sink. + #[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] + pub enum AwsCredentials { + /// Look for credentials using the [default credentials chain][credchain] + /// + /// [credchain]: aws_config::default_provider::credentials::DefaultCredentialsChain + Default, + /// Load credentials using the given named profile + Profile { profile_name: String }, + /// Use the enclosed static credentials + Static { + access_key_id: String, + secret_access_key: String, + session_token: Option, + }, + } + + /// A role for Materialize to assume when performing AWS API calls. + #[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] + pub struct AwsAssumeRole { + /// The Amazon Resource Name of the role to assume. + pub arn: String, + } + + impl AwsConfig { + /// Loads the AWS SDK configuration object from the environment, then + /// applies the overrides from this object. 
+ pub async fn load(&self, external_id: Option<&AwsExternalId>) -> aws_types::SdkConfig { + use aws_config::default_provider::credentials::DefaultCredentialsChain; + use aws_config::default_provider::region::DefaultRegionChain; + use aws_config::sts::AssumeRoleProvider; + use aws_smithy_http::endpoint::Endpoint; + use aws_types::credentials::SharedCredentialsProvider; + use aws_types::region::Region; + + let region = match &self.region { + Some(region) => Some(Region::new(region.clone())), + _ => { + let mut rc = DefaultRegionChain::builder(); + if let AwsCredentials::Profile { profile_name } = &self.credentials { + rc = rc.profile_name(profile_name); + } + rc.build().region().await + } + }; + + let mut cred_provider = match &self.credentials { + AwsCredentials::Default => SharedCredentialsProvider::new( + DefaultCredentialsChain::builder() + .region(region.clone()) + .build() + .await, + ), + AwsCredentials::Profile { profile_name } => SharedCredentialsProvider::new( + DefaultCredentialsChain::builder() + .profile_name(profile_name) + .region(region.clone()) + .build() + .await, + ), + AwsCredentials::Static { + access_key_id, + secret_access_key, + session_token, + } => SharedCredentialsProvider::new(aws_types::Credentials::from_keys( + access_key_id, + secret_access_key, + session_token.clone(), + )), + }; + + if let Some(AwsAssumeRole { arn }) = &self.role { + let mut role = AssumeRoleProvider::builder(arn).session_name("materialized"); + // This affects which region to perform STS on, not where + // anything else happens. + if let Some(region) = ®ion { + role = role.region(region.clone()); + } + if let Some(external_id) = external_id { + role = role.external_id(&external_id.0); + } + cred_provider = SharedCredentialsProvider::new(role.build(cred_provider)); + } + + let mut loader = aws_config::from_env() + .region(region) + .credentials_provider(cred_provider); + if let Some(endpoint) = &self.endpoint { + loader = loader.endpoint_resolver(Endpoint::immutable(endpoint.0.clone())); + } + loader.load().await + } + } +} + +/// Extra context to pass through when instantiating a connector for a source or +/// sink. +/// +/// Should be kept cheaply cloneable. +#[derive(Debug, Clone)] +pub struct ConnectorContext { + /// The level for librdkafka's logs. + pub librdkafka_log_level: tracing::Level, + /// An external ID to use for all AWS AssumeRole operations. + pub aws_external_id: Option, +} + +impl ConnectorContext { + /// Constructs a new connector context from command line arguments. + /// + /// **WARNING:** it is critical for security that the `aws_external_id` be + /// provided by the operator of the Materialize service (i.e., via a CLI + /// argument or environment variable) and not the end user of Materialize + /// (e.g., via a configuration option in a SQL statement). See + /// [`AwsExternalId`] for details. + pub fn from_cli_args( + filter: &tracing_subscriber::filter::Targets, + aws_external_id: Option, + ) -> ConnectorContext { + ConnectorContext { + librdkafka_log_level: mz_ore::tracing::target_level(filter, "librdkafka"), + aws_external_id: aws_external_id.map(AwsExternalId), + } + } +} + +impl Default for ConnectorContext { + fn default() -> ConnectorContext { + ConnectorContext { + librdkafka_log_level: tracing::Level::INFO, + aws_external_id: None, + } + } +} + impl DataflowDescription { /// Creates a new dataflow description with a human-readable name. 
pub fn new(name: String) -> Self { @@ -848,7 +1026,6 @@ pub mod sources { use chrono::NaiveDateTime; use differential_dataflow::lattice::Lattice; use globset::Glob; - use http::Uri; use mz_persist_client::read::ReadHandle; use mz_persist_client::{PersistLocation, ShardId}; use mz_persist_types::Codec64; @@ -862,6 +1039,7 @@ pub mod sources { use mz_repr::proto::TryFromProtoError; use mz_repr::{ColumnType, GlobalId, RelationDesc, RelationType, Row, ScalarType}; + use crate::aws::AwsConfig; use crate::postgres_source::PostgresSourceDetails; use crate::DataflowError; @@ -2029,150 +2207,6 @@ pub mod sources { SqsNotifications { queue: String }, } - /// A wrapper for [`Uri`] that implements [`Serialize`] and `Deserialize`. - #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] - pub struct SerdeUri(#[serde(with = "http_serde::uri")] pub Uri); - - /// AWS configuration overrides for a source or sink. - /// - /// This is a distinct type from any of the configuration types built into the - /// AWS SDK so that we can implement `Serialize` and `Deserialize`. - #[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] - pub struct AwsConfig { - /// AWS Credentials, or where to find them - pub credentials: AwsCredentials, - /// The AWS region to use. - /// - /// Uses the default region (looking at env vars, config files, etc) if not provided. - pub region: Option, - /// The AWS role to assume. - pub role: Option, - /// The custom AWS endpoint to use, if any. - pub endpoint: Option, - } - - /// AWS credentials for a source or sink. - #[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] - pub enum AwsCredentials { - /// Look for credentials using the [default credentials chain][credchain] - /// - /// [credchain]: aws_config::default_provider::credentials::DefaultCredentialsChain - Default, - /// Load credentials using the given named profile - Profile { profile_name: String }, - /// Use the enclosed static credentials - Static { - access_key_id: String, - secret_access_key: String, - session_token: Option, - }, - } - - /// A role for Materialize to assume when performing AWS API calls. - #[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] - pub struct AwsAssumeRole { - /// The Amazon Resource Name of the role to assume. - pub arn: String, - } - - /// An external ID to use for all AWS AssumeRole operations. - /// - /// Note that it is critical for security that this ID can **not** be provided by users running - /// in Materialize Cloud, it must be provided by Materialize. Currently this guarantee is - /// satisfied by only making this accessible from the CLI, which users do not have access to. - /// - /// - #[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] - pub enum AwsExternalId { - NotProvided, - ISwearThisCameFromACliArgOrEnvVariable(String), - } - - impl Default for AwsExternalId { - fn default() -> Self { - AwsExternalId::NotProvided - } - } - - impl AwsExternalId { - fn get(&self) -> Option<&str> { - match self { - AwsExternalId::NotProvided => None, - AwsExternalId::ISwearThisCameFromACliArgOrEnvVariable(v) => Some(v), - } - } - } - - impl AwsConfig { - /// Loads the AWS SDK configuration object from the environment, then - /// applies the overrides from this object. 
- pub async fn load(&self, external_id: AwsExternalId) -> aws_types::SdkConfig { - use aws_config::default_provider::credentials::DefaultCredentialsChain; - use aws_config::default_provider::region::DefaultRegionChain; - use aws_config::sts::AssumeRoleProvider; - use aws_smithy_http::endpoint::Endpoint; - use aws_types::credentials::SharedCredentialsProvider; - use aws_types::region::Region; - - let region = match &self.region { - Some(region) => Some(Region::new(region.clone())), - _ => { - let mut rc = DefaultRegionChain::builder(); - if let AwsCredentials::Profile { profile_name } = &self.credentials { - rc = rc.profile_name(profile_name); - } - rc.build().region().await - } - }; - - let mut cred_provider = match &self.credentials { - AwsCredentials::Default => SharedCredentialsProvider::new( - DefaultCredentialsChain::builder() - .region(region.clone()) - .build() - .await, - ), - AwsCredentials::Profile { profile_name } => SharedCredentialsProvider::new( - DefaultCredentialsChain::builder() - .profile_name(profile_name) - .region(region.clone()) - .build() - .await, - ), - AwsCredentials::Static { - access_key_id, - secret_access_key, - session_token, - } => SharedCredentialsProvider::new(aws_types::Credentials::from_keys( - access_key_id, - secret_access_key, - session_token.clone(), - )), - }; - - if let Some(AwsAssumeRole { arn }) = &self.role { - let mut role = AssumeRoleProvider::builder(arn).session_name("materialized"); - // This affects which region to perform STS on, not where - // anything else happens. - if let Some(region) = ®ion { - role = role.region(region.clone()); - } - if let Some(external_id) = external_id.get() { - role = role.external_id(external_id); - } - cred_provider = SharedCredentialsProvider::new(role.build(cred_provider)); - } - - let mut loader = aws_config::from_env() - .region(region) - .credentials_provider(cred_provider); - if let Some(endpoint) = &self.endpoint { - loader = loader.endpoint_resolver(Endpoint::immutable(endpoint.0.clone())); - } - loader.load().await - } - } - #[derive(Debug, Clone)] pub struct SourceData(pub Result); diff --git a/src/kafka-util/src/bin/kgen.rs b/src/kafka-util/src/bin/kgen.rs index e2d785dfb7fad..d81c6eb2f2533 100644 --- a/src/kafka-util/src/bin/kgen.rs +++ b/src/kafka-util/src/bin/kgen.rs @@ -26,7 +26,6 @@ use rdkafka::error::KafkaError; use rdkafka::producer::{BaseRecord, Producer, ThreadedProducer}; use rdkafka::types::RDKafkaErrorCode; use rdkafka::util::Timeout; -use rdkafka::ClientConfig; use serde_json::Map; use url::Url; @@ -695,7 +694,7 @@ async fn main() -> anyhow::Result<()> { let mut key_gen = key_gen.clone(); let mut value_gen = value_gen.clone(); let producer: ThreadedProducer = - ClientConfig::new() + mz_kafka_util::client::create_new_client_config_simple() .set("bootstrap.servers", args.bootstrap_server.to_string()) .create_with_context(mz_kafka_util::client::MzClientContext) .unwrap(); diff --git a/src/kafka-util/src/client.rs b/src/kafka-util/src/client.rs index 96726df203e2d..40e8c55b7d6d7 100644 --- a/src/kafka-util/src/client.rs +++ b/src/kafka-util/src/client.rs @@ -14,10 +14,11 @@ use std::time::Duration; use anyhow::bail; use mz_ore::collections::CollectionExt; use rdkafka::client::Client; +use rdkafka::config::{ClientConfig, RDKafkaLogLevel}; use rdkafka::consumer::ConsumerContext; use rdkafka::producer::{DefaultProducerContext, DeliveryResult, ProducerContext}; use rdkafka::ClientContext; -use tracing::{debug, error, info, warn}; +use tracing::{debug, error, info, warn, Level}; /// A 
`ClientContext` implementation that uses `tracing` instead of `log` macros. /// @@ -90,3 +91,56 @@ pub fn get_partitions( Ok(meta_topic.partitions().iter().map(|x| x.id()).collect()) } + +/// A simpler version of [`create_new_client_config`] that defaults +/// the `log_level` to `INFO` and should only be used in tests. +pub fn create_new_client_config_simple() -> ClientConfig { + create_new_client_config(tracing::Level::INFO) +} + +/// Build a new [`rdkafka`] [`ClientConfig`] with its `log_level` set correctly +/// based on the passed through [`tracing::Level`]. This level should be +/// determined for `target: "librdkafka"`. +pub fn create_new_client_config(tracing_level: Level) -> ClientConfig { + #[allow(clippy::disallowed_methods)] + let mut config = ClientConfig::new(); + + let level = if tracing_level >= Level::DEBUG { + RDKafkaLogLevel::Debug + } else if tracing_level >= Level::INFO { + RDKafkaLogLevel::Info + } else if tracing_level >= Level::WARN { + RDKafkaLogLevel::Warning + } else { + RDKafkaLogLevel::Error + }; + // WARNING WARNING WARNING + // + // For whatever reason, if you change this `target` to something else, this + // log line might break. I (guswynn) did some extensive investigation with + // the tracing folks, and we learned that this edge case only happens with + // 1. a different target + // 2. only this file (so far as I can tell) + // 3. only in certain subscriber combinations + // 4. only if the `tracing-log` feature is on. + // + // Our conclusion was that one of our dependencies is doing something + // problematic with `log`. + // + // For now, this works, and prints a nice log line exactly when we want it. + // + // TODO(guswynn): when we can remove `tracing-log`, remove this warning + tracing::debug!(target: "librdkafka", level = ?level, "Determined log level for librdkafka"); + config.set_log_level(level); + + // Patch the librdkafka debug log system into the Rust `log` ecosystem. This + // is a very simple integration at the moment; enabling `debug`-level logs + // for the `librdkafka` target enables the full firehouse of librdkafka + // debug logs. We may want to investigate finer-grained control. + if tracing_level >= Level::DEBUG { + tracing::debug!(target: "librdkafka", "Enabling debug logs for rdkafka"); + config.set("debug", "all"); + } + + config +} diff --git a/src/materialized/src/bin/materialized/main.rs b/src/materialized/src/bin/materialized/main.rs index 5c8dc13bbba02..cedea51a06ec8 100644 --- a/src/materialized/src/bin/materialized/main.rs +++ b/src/materialized/src/bin/materialized/main.rs @@ -49,7 +49,7 @@ use uuid::Uuid; use materialized::{ OrchestratorBackend, OrchestratorConfig, SecretsControllerConfig, TlsConfig, TlsMode, }; -use mz_dataflow_types::sources::AwsExternalId; +use mz_dataflow_types::ConnectorContext; use mz_frontegg_auth::{FronteggAuthentication, FronteggConfig}; use mz_orchestrator_kubernetes::{KubernetesImagePullPolicy, KubernetesOrchestratorConfig}; use mz_orchestrator_process::ProcessOrchestratorConfig; @@ -472,7 +472,7 @@ fn run(args: Args) -> Result<(), anyhow::Error> { // handled by the custom panic handler. 
let metrics_registry = MetricsRegistry::new(); runtime.block_on(mz_ore::tracing::configure(mz_ore::tracing::TracingConfig { - log_filter: args.log_filter, + log_filter: args.log_filter.clone(), opentelemetry_endpoint: args.opentelemetry_endpoint, opentelemetry_headers: args .opentelemetry_header @@ -747,14 +747,11 @@ max log level: {max_log_level}", orchestrator, secrets_controller: Some(secrets_controller), experimental_mode: args.experimental, - aws_external_id: args - .aws_external_id - .map(AwsExternalId::ISwearThisCameFromACliArgOrEnvVariable) - .unwrap_or(AwsExternalId::NotProvided), metrics_registry, now: SYSTEM_TIME.clone(), replica_sizes, availability_zones: args.availability_zone, + connector_context: ConnectorContext::from_cli_args(&args.log_filter, args.aws_external_id), }))?; eprintln!( diff --git a/src/materialized/src/lib.rs b/src/materialized/src/lib.rs index 0eb1e8a880e60..e4dd1d0820403 100644 --- a/src/materialized/src/lib.rs +++ b/src/materialized/src/lib.rs @@ -28,7 +28,7 @@ use futures::StreamExt; use mz_build_info::{build_info, BuildInfo}; use mz_dataflow_types::client::controller::ClusterReplicaSizeMap; use mz_dataflow_types::client::RemoteClient; -use mz_dataflow_types::sources::AwsExternalId; +use mz_dataflow_types::ConnectorContext; use mz_frontegg_auth::FronteggAuthentication; use mz_orchestrator::{Orchestrator, ServiceConfig, ServicePort}; use mz_orchestrator_kubernetes::{KubernetesOrchestrator, KubernetesOrchestratorConfig}; @@ -98,6 +98,12 @@ pub struct Config { /// stash instead of sqlite from the `data_directory`. pub catalog_postgres_stash: Option, + // === Connector options. === + /// Configuration for source and sink connectors created by the storage + /// layer. This can include configuration for external + /// sources. + pub connector_context: ConnectorContext, + // === Platform options. === /// Configuration of service orchestration. pub orchestrator: OrchestratorConfig, @@ -106,12 +112,6 @@ pub struct Config { /// Optional configuration for a secrets controller. pub secrets_controller: Option, - // === AWS options. === - /// An [external ID] to be supplied to all AWS AssumeRole operations. - /// - /// [external id]: https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-user_externalid.html - pub aws_external_id: AwsExternalId, - // === Mode switches. === /// Whether to permit usage of experimental features. pub experimental_mode: bool, @@ -396,12 +396,12 @@ async fn serve_stash( logical_compaction_window: config.logical_compaction_window, experimental_mode: config.experimental_mode, build_info: &BUILD_INFO, - aws_external_id: config.aws_external_id.clone(), metrics_registry: config.metrics_registry.clone(), now: config.now, secrets_controller, replica_sizes: config.replica_sizes.clone(), availability_zones: config.availability_zones.clone(), + connector_context: config.connector_context, }) .await?; diff --git a/src/materialized/tests/util.rs b/src/materialized/tests/util.rs index 3eb9f941b424e..23437dc932f3e 100644 --- a/src/materialized/tests/util.rs +++ b/src/materialized/tests/util.rs @@ -25,7 +25,6 @@ use tokio::runtime::Runtime; use tower_http::cors::AllowOrigin; use materialized::{OrchestratorBackend, OrchestratorConfig, TlsMode}; -use mz_dataflow_types::sources::AwsExternalId; use mz_frontegg_auth::FronteggAuthentication; use mz_orchestrator_process::ProcessOrchestratorConfig; use mz_ore::id_gen::PortAllocator; @@ -44,7 +43,6 @@ lazy_static! 
{ #[derive(Clone)] pub struct Config { data_directory: Option, - aws_external_id: AwsExternalId, logging_granularity: Option, tls: Option, frontegg: Option, @@ -58,7 +56,6 @@ impl Default for Config { fn default() -> Config { Config { data_directory: None, - aws_external_id: AwsExternalId::NotProvided, logging_granularity: Some(Duration::from_secs(1)), tls: None, frontegg: None, @@ -164,7 +161,6 @@ pub fn start_server(config: Config) -> Result { linger: false, }, secrets_controller: None, - aws_external_id: config.aws_external_id, listen_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0), tls: config.tls, frontegg: config.frontegg, @@ -175,6 +171,7 @@ pub fn start_server(config: Config) -> Result { cors_allowed_origin: AllowOrigin::list([]), replica_sizes: Default::default(), availability_zones: Default::default(), + connector_context: Default::default(), }))?; let server = Server { inner, diff --git a/src/ore/src/lib.rs b/src/ore/src/lib.rs index ae0ac4816e1f2..6cbe6e1d3efdf 100644 --- a/src/ore/src/lib.rs +++ b/src/ore/src/lib.rs @@ -54,6 +54,7 @@ pub mod now; pub mod option; pub mod panic; pub mod path; +#[cfg_attr(nightly_doc_features, doc(cfg(feature = "process")))] pub mod result; #[cfg_attr(nightly_doc_features, doc(cfg(feature = "network")))] #[cfg(feature = "network")] diff --git a/src/ore/src/tracing.rs b/src/ore/src/tracing.rs index c174aa6d6ad7e..ff174516d3d4d 100644 --- a/src/ore/src/tracing.rs +++ b/src/ore/src/tracing.rs @@ -19,7 +19,7 @@ use opentelemetry::sdk::{trace, Resource}; use opentelemetry::KeyValue; use tonic::metadata::MetadataMap; use tonic::transport::Endpoint; -use tracing::{Event, Subscriber}; +use tracing::{Event, Level, Subscriber}; use tracing_subscriber::filter::{LevelFilter, Targets}; use tracing_subscriber::fmt::format::{format, Format, Writer}; use tracing_subscriber::fmt::{self, FmtContext, FormatEvent, FormatFields}; @@ -147,6 +147,21 @@ pub async fn configure(config: TracingConfig) -> Result<(), anyhow::Error> { Ok(()) } +/// Returns the level of a specific target from a [`Targets`]. +pub fn target_level(targets: &Targets, target: &str) -> Level { + if targets.would_enable(target, &Level::TRACE) { + Level::TRACE + } else if targets.would_enable(target, &Level::DEBUG) { + Level::DEBUG + } else if targets.would_enable(target, &Level::INFO) { + Level::INFO + } else if targets.would_enable(target, &Level::WARN) { + Level::WARN + } else { + Level::ERROR + } +} + /// A wrapper around a `tracing_subscriber` `Format` that /// prepends a subprocess name to the event logs #[derive(Debug)] diff --git a/src/sql/src/catalog.rs b/src/sql/src/catalog.rs index 5a9c1eeef68e9..6c22a7a525631 100644 --- a/src/sql/src/catalog.rs +++ b/src/sql/src/catalog.rs @@ -19,10 +19,10 @@ use std::time::{Duration, Instant}; use chrono::{DateTime, Utc, MIN_DATETIME}; use lazy_static::lazy_static; -use mz_dataflow_types::sources::{AwsExternalId, SourceConnector}; use mz_build_info::{BuildInfo, DUMMY_BUILD_INFO}; use mz_dataflow_types::client::ComputeInstanceId; +use mz_dataflow_types::sources::SourceConnector; use mz_expr::{DummyHumanizer, ExprHumanizer, MirScalarExpr}; use mz_ore::now::{EpochMillis, NowFn, NOW_ZERO}; use mz_repr::{ColumnName, GlobalId, RelationDesc, ScalarType}; @@ -207,8 +207,6 @@ pub struct CatalogConfig { pub experimental_mode: bool, /// Information about this build of Materialize. pub build_info: &'static BuildInfo, - /// An external ID to be supplied to all AWS AssumeRole operations. 
- pub aws_external_id: AwsExternalId, /// Default timestamp frequency for CREATE SOURCE pub timestamp_frequency: Duration, /// Function that returns a wall clock now time; can safely be mocked to return @@ -602,7 +600,6 @@ lazy_static! { session_id: Uuid::from_u128(0), experimental_mode: true, build_info: &DUMMY_BUILD_INFO, - aws_external_id: AwsExternalId::NotProvided, timestamp_frequency: Duration::from_secs(1), now: NOW_ZERO.clone(), }; diff --git a/src/sql/src/kafka_util.rs b/src/sql/src/kafka_util.rs index 6adea2f343caf..0a951b99cc99a 100644 --- a/src/sql/src/kafka_util.rs +++ b/src/sql/src/kafka_util.rs @@ -17,7 +17,7 @@ use std::sync::{Arc, Mutex}; use anyhow::{anyhow, bail}; -use mz_kafka_util::client::MzClientContext; +use mz_kafka_util::client::{create_new_client_config, MzClientContext}; use mz_ore::task; use rdkafka::client::ClientContext; use rdkafka::consumer::{BaseConsumer, Consumer, ConsumerContext}; @@ -204,8 +204,9 @@ pub async fn create_consumer( broker: &str, topic: &str, options: &BTreeMap, + librdkafka_log_level: tracing::Level, ) -> Result>, anyhow::Error> { - let mut config = rdkafka::ClientConfig::new(); + let mut config = create_new_client_config(librdkafka_log_level); config.set("bootstrap.servers", broker); for (k, v) in options { config.set(k, v); diff --git a/src/sql/src/normalize.rs b/src/sql/src/normalize.rs index b0fa0bb0eb43f..fda836cc8ebd5 100644 --- a/src/sql/src/normalize.rs +++ b/src/sql/src/normalize.rs @@ -19,7 +19,7 @@ use std::collections::BTreeMap; use anyhow::{bail, Context}; use itertools::Itertools; -use mz_dataflow_types::sources::{AwsAssumeRole, AwsConfig, AwsCredentials, SerdeUri}; +use mz_dataflow_types::aws::{AwsAssumeRole, AwsConfig, AwsCredentials, SerdeUri}; use mz_repr::ColumnName; use mz_sql_parser::ast::display::AstDisplay; use mz_sql_parser::ast::visit_mut::{self, VisitMut}; diff --git a/src/sql/src/pure.rs b/src/sql/src/pure.rs index 440ae99cc9258..cdb5c635bfa12 100644 --- a/src/sql/src/pure.rs +++ b/src/sql/src/pure.rs @@ -27,8 +27,9 @@ use tracing::info; use uuid::Uuid; use mz_ccsr::{Client, GetBySubjectError}; +use mz_dataflow_types::aws::{AwsConfig, AwsExternalId}; use mz_dataflow_types::postgres_source::PostgresSourceDetails; -use mz_dataflow_types::sources::{AwsConfig, AwsExternalId}; +use mz_dataflow_types::ConnectorContext; use mz_repr::strconv; @@ -52,8 +53,8 @@ use crate::normalize; /// locking access to the catalog for an unbounded amount of time. pub async fn purify_create_source( now: u64, - aws_external_id: AwsExternalId, mut stmt: CreateSourceStatement, + connector_context: ConnectorContext, ) -> Result, anyhow::Error> { let CreateSourceStatement { connector, @@ -102,9 +103,14 @@ pub async fn purify_create_source( // Verify that the provided security options are valid and then test them. kafka_util::extract_config(&mut with_options_map)? }; - let consumer = kafka_util::create_consumer(&broker, &topic, &config_options) - .await - .map_err(|e| anyhow!("Failed to create and connect Kafka consumer: {}", e))?; + let consumer = kafka_util::create_consumer( + &broker, + &topic, + &config_options, + connector_context.librdkafka_log_level, + ) + .await + .map_err(|e| anyhow!("Failed to create and connect Kafka consumer: {}", e))?; // Translate `kafka_time_offset` to `start_offset`. match kafka_util::lookup_start_offsets( @@ -137,7 +143,8 @@ pub async fn purify_create_source( } CreateSourceConnector::S3 { .. 
} => { let aws_config = normalize::aws_config(&mut with_options_map, None)?; - validate_aws_credentials(&aws_config, aws_external_id).await?; + validate_aws_credentials(&aws_config, connector_context.aws_external_id.as_ref()) + .await?; } CreateSourceConnector::Kinesis { arn } => { let region = arn @@ -147,7 +154,8 @@ pub async fn purify_create_source( .ok_or_else(|| anyhow!("Provided ARN does not include an AWS region"))?; let aws_config = normalize::aws_config(&mut with_options_map, Some(region.into()))?; - validate_aws_credentials(&aws_config, aws_external_id).await?; + validate_aws_credentials(&aws_config, connector_context.aws_external_id.as_ref()) + .await?; } CreateSourceConnector::Postgres { conn, @@ -504,7 +512,7 @@ async fn compile_proto( /// whether the specified AWS configuration is valid. async fn validate_aws_credentials( config: &AwsConfig, - external_id: AwsExternalId, + external_id: Option<&AwsExternalId>, ) -> Result<(), anyhow::Error> { let config = config.load(external_id).await; let sts_client = aws_sdk_sts::Client::new(&config); diff --git a/src/sql/src/query_model/test/catalog.rs b/src/sql/src/query_model/test/catalog.rs index b11b75339e001..5daa15a1ecfe0 100644 --- a/src/sql/src/query_model/test/catalog.rs +++ b/src/sql/src/query_model/test/catalog.rs @@ -23,7 +23,7 @@ use crate::DEFAULT_SCHEMA; use chrono::MIN_DATETIME; use lazy_static::lazy_static; use mz_build_info::DUMMY_BUILD_INFO; -use mz_dataflow_types::sources::{AwsExternalId, SourceConnector}; +use mz_dataflow_types::sources::SourceConnector; use mz_expr::{DummyHumanizer, ExprHumanizer, MirScalarExpr}; use mz_lowertest::*; use mz_ore::now::{EpochMillis, NOW_ZERO}; @@ -42,7 +42,6 @@ lazy_static! { session_id: Uuid::from_u128(0), experimental_mode: false, build_info: &DUMMY_BUILD_INFO, - aws_external_id: AwsExternalId::NotProvided, timestamp_frequency: Duration::from_secs(1), now: NOW_ZERO.clone(), }; diff --git a/src/sqllogictest/Cargo.toml b/src/sqllogictest/Cargo.toml index 6f78c7a8e0531..354da8691ffd0 100644 --- a/src/sqllogictest/Cargo.toml +++ b/src/sqllogictest/Cargo.toml @@ -30,6 +30,7 @@ serde_json = "1.0.81" tempfile = "3.2.0" time = "0.3.9" tokio = "1.18.2" +tracing = "0.1.34" tokio-postgres = { git = "https://github.com/MaterializeInc/rust-postgres", branch = "mz-0.7.2", features = ["with-chrono-0_4", "with-uuid-0_8", "with-serde_json-1"] } tower-http = { version = "0.3.3", features = ["cors"] } uuid = "0.8.2" diff --git a/src/sqllogictest/src/runner.rs b/src/sqllogictest/src/runner.rs index 69dcc92401d3e..d567eecf60105 100644 --- a/src/sqllogictest/src/runner.rs +++ b/src/sqllogictest/src/runner.rs @@ -57,7 +57,6 @@ use tower_http::cors::AllowOrigin; use uuid::Uuid; use materialized::{OrchestratorBackend, OrchestratorConfig}; -use mz_dataflow_types::sources::AwsExternalId; use mz_orchestrator_process::ProcessOrchestratorConfig; use mz_ore::id_gen::PortAllocator; use mz_ore::metrics::MetricsRegistry; @@ -579,7 +578,6 @@ impl Runner { linger: false, }, secrets_controller: None, - aws_external_id: AwsExternalId::NotProvided, listen_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0), tls: None, frontegg: None, @@ -590,6 +588,7 @@ impl Runner { now: SYSTEM_TIME.clone(), replica_sizes: Default::default(), availability_zones: Default::default(), + connector_context: Default::default(), }; let server = materialized::serve(mz_config).await?; let client = connect(&server).await; diff --git a/src/storage/src/render/mod.rs b/src/storage/src/render/mod.rs index 3a95934865958..e4944ceb33a1a 100644 --- 
a/src/storage/src/render/mod.rs +++ b/src/storage/src/render/mod.rs @@ -138,7 +138,6 @@ pub fn build_storage_dataflow( ) { let worker_logging = timely_worker.log_register().get("timely"); let name = format!("Source dataflow: {debug_name}"); - timely_worker.dataflow_core(&name, worker_logging, Box::new(()), |_, scope| { // The scope.clone() occurs to allow import in the region. // We build a region here to establish a pattern of a scope inside the dataflow, diff --git a/src/storage/src/render/sources.rs b/src/storage/src/render/sources.rs index 8e9fd3e790a92..9776b1c29a34f 100644 --- a/src/storage/src/render/sources.rs +++ b/src/storage/src/render/sources.rs @@ -271,7 +271,6 @@ where base_metrics: &storage_state.source_metrics, storage_metadata, as_of: as_of_frontier.clone(), - aws_external_id: storage_state.aws_external_id.clone(), }; // Pubnub and Postgres are `SimpleSource`s, so they produce _raw_ sources @@ -307,7 +306,7 @@ where let ((ok, err), cap) = source::create_raw_source::<_, KafkaSourceReader>( base_source_config, &connector, - storage_state.aws_external_id.clone(), + storage_state.connector_context.clone(), ); ((SourceType::Delimited(ok), err), cap) } @@ -318,7 +317,7 @@ where >( base_source_config, &connector, - storage_state.aws_external_id.clone(), + storage_state.connector_context.clone(), ); ((SourceType::Delimited(ok), err), cap) } @@ -326,7 +325,7 @@ where let ((ok, err), cap) = source::create_raw_source::<_, S3SourceReader>( base_source_config, &connector, - storage_state.aws_external_id.clone(), + storage_state.connector_context.clone(), ); ((SourceType::ByteStream(ok), err), cap) } @@ -335,7 +334,7 @@ where let ((ok, err), cap) = source::create_raw_source::<_, PubNubSourceReader>( base_source_config, &connector, - storage_state.aws_external_id.clone(), + storage_state.connector_context.clone(), ); ((SourceType::Row(ok), err), cap) } diff --git a/src/storage/src/server.rs b/src/storage/src/server.rs index e7031ff8bbf4f..120f0a4f84220 100644 --- a/src/storage/src/server.rs +++ b/src/storage/src/server.rs @@ -21,7 +21,7 @@ use timely::worker::Worker as TimelyWorker; use tokio::sync::mpsc; use mz_dataflow_types::client::{LocalClient, LocalStorageClient, StorageCommand, StorageResponse}; -use mz_dataflow_types::sources::AwsExternalId; +use mz_dataflow_types::ConnectorContext; use mz_ore::metrics::MetricsRegistry; use mz_ore::now::NowFn; @@ -44,8 +44,8 @@ pub struct Config { pub now: NowFn, /// Metrics registry through which dataflow metrics will be reported. pub metrics_registry: MetricsRegistry, - /// An external ID to use for all AWS AssumeRole operations. - pub aws_external_id: AwsExternalId, + /// Configuration for source and sink connectors. + pub connector_context: ConnectorContext, } /// A handle to a running dataflow server. 
@@ -104,7 +104,6 @@ pub fn serve_boundary SC + Send + Sync + 'st let tokio_executor = tokio::runtime::Handle::current(); let now = config.now; - let aws_external_id = config.aws_external_id.clone(); let worker_guards = timely::execute::execute(config.timely_config, move |timely_worker| { let timely_worker_index = timely_worker.index(); @@ -137,9 +136,9 @@ pub fn serve_boundary SC + Send + Sync + 'st last_bindings_feedback: Instant::now(), now: now.clone(), source_metrics, - aws_external_id: aws_external_id.clone(), timely_worker_index, timely_worker_peers, + connector_context: config.connector_context.clone(), }, storage_boundary, storage_response_tx, diff --git a/src/storage/src/source/kafka.rs b/src/storage/src/source/kafka.rs index f58bf73d51a84..1e0eb7259407f 100644 --- a/src/storage/src/source/kafka.rs +++ b/src/storage/src/source/kafka.rs @@ -20,22 +20,22 @@ use rdkafka::statistics::Statistics; use rdkafka::topic_partition_list::Offset; use rdkafka::{ClientConfig, ClientContext, Message, TopicPartitionList}; use timely::scheduling::activate::SyncActivator; -use tracing::{debug, error, info, warn}; +use tracing::{error, info, warn}; use uuid::Uuid; use mz_dataflow_types::sources::{ - encoding::SourceDataEncoding, AwsExternalId, ExternalSourceConnector, KafkaOffset, - KafkaSourceConnector, MzOffset, + encoding::SourceDataEncoding, ExternalSourceConnector, KafkaOffset, KafkaSourceConnector, + MzOffset, }; +use mz_dataflow_types::ConnectorContext; use mz_expr::PartitionId; -use mz_kafka_util::{client::MzClientContext, KafkaAddrs}; +use mz_kafka_util::{client::create_new_client_config, client::MzClientContext, KafkaAddrs}; use mz_ore::thread::{JoinHandleExt, UnparkOnDropHandle}; use mz_repr::{adt::jsonb::Jsonb, GlobalId}; use crate::source::{NextMessage, SourceMessage, SourceReader, SourceReaderError}; use self::metrics::KafkaPartitionMetrics; -use super::metrics::SourceBaseMetrics; mod metrics; @@ -85,16 +85,15 @@ impl SourceReader for KafkaSourceReader { worker_count: usize, consumer_activator: SyncActivator, connector: ExternalSourceConnector, - _: AwsExternalId, restored_offsets: Vec<(PartitionId, Option)>, _: SourceDataEncoding, - base_metrics: SourceBaseMetrics, + metrics: crate::source::metrics::SourceBaseMetrics, + connector_context: ConnectorContext, ) -> Result { let kc = match connector { ExternalSourceConnector::Kafka(kc) => kc, _ => unreachable!(), }; - let KafkaSourceConnector { addrs, config_options, @@ -109,6 +108,7 @@ impl SourceReader for KafkaSourceReader { group_id_prefix, cluster_id, &config_options, + connector_context.librdkafka_log_level, ); let (stats_tx, stats_rx) = crossbeam_channel::unbounded(); let consumer: BaseConsumer = kafka_config @@ -184,12 +184,7 @@ impl SourceReader for KafkaSourceReader { partition_info, include_headers: kc.include_headers.is_some(), _metadata_thread_handle: metadata_thread_handle, - partition_metrics: KafkaPartitionMetrics::new( - base_metrics, - partition_ids, - topic, - source_id, - ), + partition_metrics: KafkaPartitionMetrics::new(metrics, partition_ids, topic, source_id), }) } @@ -507,8 +502,9 @@ fn create_kafka_config( group_id_prefix: Option, cluster_id: Uuid, config_options: &BTreeMap, + librdkafka_log_level: tracing::Level, ) -> ClientConfig { - let mut kafka_config = ClientConfig::new(); + let mut kafka_config = create_new_client_config(librdkafka_log_level); // Broker configuration. 
kafka_config.set("bootstrap.servers", &addrs.to_string()); @@ -555,16 +551,6 @@ fn create_kafka_config( ), ); - // Patch the librdkafka debug log system into the Rust `log` ecosystem. - // This is a very simple integration at the moment; enabling `debug`-level - // logs for the `librdkafka` target enables the full firehouse of librdkafka - // debug logs. We may want to investigate finer-grained control. - // TODO(guswynn): replace this when https://github.com/tokio-rs/tracing/pull/1821 is merged - if log::log_enabled!(target: "librdkafka", log::Level::Debug) { - debug!("Enabling 'debug' for rdkafka"); - kafka_config.set("debug", "all"); - } - // Set additional configuration operations from the user. While these look // arbitrary, other layers of the system tightly control which configuration // options are allowable. @@ -708,9 +694,11 @@ mod tests { use std::time::Duration; use rdkafka::consumer::{BaseConsumer, Consumer}; - use rdkafka::{ClientConfig, Message, Offset, TopicPartitionList}; + use rdkafka::{Message, Offset, TopicPartitionList}; use uuid::Uuid; + use mz_kafka_util::client::create_new_client_config_simple; + // Splitting off a partition queue with an `Offset` that is not `Offset::Beginning` seems to // lead to a race condition where sometimes we receive messages from polling the main consumer // instead of on the partition queue. This can be surfaced by running the test in a loop (in @@ -728,7 +716,7 @@ mod tests { let topic_name = "queue-test"; let pid = 0; - let mut kafka_config = ClientConfig::new(); + let mut kafka_config = create_new_client_config_simple(); kafka_config.set("bootstrap.servers", "localhost:9092".to_string()); kafka_config.set("enable.auto.commit", "false"); kafka_config.set("group.id", Uuid::new_v4().to_string()); diff --git a/src/storage/src/source/kinesis.rs b/src/storage/src/source/kinesis.rs index 19e7779c5faad..cf35089f0b556 100644 --- a/src/storage/src/source/kinesis.rs +++ b/src/storage/src/source/kinesis.rs @@ -21,16 +21,15 @@ use prometheus::core::AtomicI64; use timely::scheduling::SyncActivator; use tracing::error; -use mz_dataflow_types::sources::{ - encoding::SourceDataEncoding, AwsExternalId, ExternalSourceConnector, KinesisSourceConnector, - MzOffset, -}; -use mz_dataflow_types::SourceErrorDetails; +use mz_dataflow_types::aws::AwsExternalId; +use mz_dataflow_types::sources::encoding::SourceDataEncoding; +use mz_dataflow_types::sources::{ExternalSourceConnector, KinesisSourceConnector, MzOffset}; +use mz_dataflow_types::{ConnectorContext, SourceErrorDetails}; use mz_expr::PartitionId; use mz_ore::metrics::{DeleteOnDropGauge, GaugeVecExt}; use mz_repr::GlobalId; -use crate::source::metrics::{KinesisMetrics, SourceBaseMetrics}; +use crate::source::metrics::KinesisMetrics; use crate::source::{NextMessage, SourceMessage, SourceReader, SourceReaderError}; /// To read all data from a Kinesis stream, we need to continually update @@ -128,17 +127,21 @@ impl SourceReader for KinesisSourceReader { _worker_count: usize, _consumer_activator: SyncActivator, connector: ExternalSourceConnector, - aws_external_id: AwsExternalId, _restored_offsets: Vec<(PartitionId, Option)>, _encoding: SourceDataEncoding, - base_metrics: SourceBaseMetrics, + metrics: crate::source::metrics::SourceBaseMetrics, + connector_context: ConnectorContext, ) -> Result { let kc = match connector { ExternalSourceConnector::Kinesis(kc) => kc, _ => unreachable!(), }; - let state = block_on(create_state(&base_metrics.kinesis, kc, aws_external_id)); + let state = block_on(create_state( + 
&metrics.kinesis, + kc, + connector_context.aws_external_id.as_ref(), + )); match state { Ok((kinesis_client, stream_name, shard_set, shard_queue)) => Ok(KinesisSourceReader { kinesis_client, @@ -148,7 +151,7 @@ impl SourceReader for KinesisSourceReader { shard_set, stream_name, processed_message_count: 0, - base_metrics: base_metrics.kinesis, + base_metrics: metrics.kinesis, }), Err(e) => Err(anyhow!("{}", e)), } @@ -263,7 +266,7 @@ impl SourceReader for KinesisSourceReader { async fn create_state( base_metrics: &KinesisMetrics, c: KinesisSourceConnector, - aws_external_id: AwsExternalId, + aws_external_id: Option<&AwsExternalId>, ) -> Result< ( KinesisClient, diff --git a/src/storage/src/source/mod.rs b/src/storage/src/source/mod.rs index eae7731869785..6ee3a7347fc0b 100644 --- a/src/storage/src/source/mod.rs +++ b/src/storage/src/source/mod.rs @@ -55,8 +55,8 @@ use tracing::error; use mz_avro::types::Value; use mz_dataflow_types::sources::encoding::SourceDataEncoding; -use mz_dataflow_types::sources::{AwsExternalId, ExternalSourceConnector, MzOffset}; -use mz_dataflow_types::{DecodeError, SourceError, SourceErrorDetails}; +use mz_dataflow_types::sources::{ExternalSourceConnector, MzOffset}; +use mz_dataflow_types::{ConnectorContext, DecodeError, SourceError, SourceErrorDetails}; use mz_expr::PartitionId; use mz_ore::cast::CastFrom; use mz_ore::metrics::{CounterVecExt, DeleteOnDropCounter, DeleteOnDropGauge, GaugeVecExt}; @@ -118,8 +118,6 @@ pub struct RawSourceCreationConfig<'a, G> { pub now: NowFn, /// The metrics & registry that each source instantiates. pub base_metrics: &'a SourceBaseMetrics, - /// An external ID to use for all AWS AssumeRole operations. - pub aws_external_id: AwsExternalId, /// Storage Metadata pub storage_metadata: CollectionMetadata, /// As Of @@ -168,10 +166,10 @@ where worker_count: usize, consumer_activator: SyncActivator, connector: ExternalSourceConnector, - aws_external_id: AwsExternalId, restored_offsets: Vec<(PartitionId, Option)>, encoding: SourceDataEncoding, metrics: crate::source::metrics::SourceBaseMetrics, + connector_context: ConnectorContext, ) -> Result { S::new( source_name, @@ -180,10 +178,10 @@ where worker_count, consumer_activator, connector, - aws_external_id, restored_offsets, encoding, metrics, + connector_context, ) .map(Self) } @@ -388,10 +386,10 @@ pub trait SourceReader { worker_count: usize, consumer_activator: SyncActivator, connector: ExternalSourceConnector, - aws_external_id: AwsExternalId, restored_offsets: Vec<(PartitionId, Option)>, encoding: SourceDataEncoding, metrics: crate::source::metrics::SourceBaseMetrics, + connector_context: ConnectorContext, ) -> Result where Self: Sized; @@ -910,7 +908,7 @@ where pub fn create_raw_source( config: RawSourceCreationConfig, source_connector: &ExternalSourceConnector, - aws_external_id: AwsExternalId, + connector_context: ConnectorContext, ) -> ( ( timely::dataflow::Stream>, @@ -971,10 +969,10 @@ where worker_count, sync_activator, source_connector.clone(), - aws_external_id.clone(), start_offsets, encoding, base_metrics, + connector_context.clone(), ); let source_stream = match source_reader { Ok(s) => s.into_stream(timestamp_frequency).fuse(), diff --git a/src/storage/src/source/pubnub.rs b/src/storage/src/source/pubnub.rs index 3bb2441b57b77..2fc115743dde3 100644 --- a/src/storage/src/source/pubnub.rs +++ b/src/storage/src/source/pubnub.rs @@ -17,13 +17,11 @@ use pubnub_hyper::{Builder, DefaultRuntime, DefaultTransport, PubNub}; use timely::scheduling::SyncActivator; use 
tracing::info; -use mz_dataflow_types::sources::{ - encoding::SourceDataEncoding, AwsExternalId, ExternalSourceConnector, MzOffset, -}; +use mz_dataflow_types::sources::{encoding::SourceDataEncoding, ExternalSourceConnector, MzOffset}; +use mz_dataflow_types::ConnectorContext; use mz_expr::PartitionId; use mz_repr::{Datum, GlobalId, Row}; -use super::metrics::SourceBaseMetrics; use crate::source::{SourceMessage, SourceReader, SourceReaderError}; /// Information required to sync data from PubNub @@ -45,10 +43,10 @@ impl SourceReader for PubNubSourceReader { _worker_count: usize, _consumer_activator: SyncActivator, connector: ExternalSourceConnector, - _aws_external_id: AwsExternalId, _restored_offsets: Vec<(PartitionId, Option)>, _encoding: SourceDataEncoding, - _metrics: SourceBaseMetrics, + _: crate::source::metrics::SourceBaseMetrics, + _: ConnectorContext, ) -> Result { let pubnub_conn = match connector { ExternalSourceConnector::PubNub(pubnub_conn) => pubnub_conn, diff --git a/src/storage/src/source/s3.rs b/src/storage/src/source/s3.rs index a896d7faede05..2200416959ec8 100644 --- a/src/storage/src/source/s3.rs +++ b/src/storage/src/source/s3.rs @@ -47,10 +47,10 @@ use tokio::time::{self, Duration}; use tokio_util::io::{ReaderStream, StreamReader}; use tracing::{debug, error, trace, warn}; -use mz_dataflow_types::sources::{ - encoding::SourceDataEncoding, AwsConfig, AwsExternalId, Compression, ExternalSourceConnector, - MzOffset, S3KeySource, -}; +use mz_dataflow_types::aws::{AwsConfig, AwsExternalId}; +use mz_dataflow_types::sources::encoding::SourceDataEncoding; +use mz_dataflow_types::sources::{Compression, ExternalSourceConnector, MzOffset, S3KeySource}; +use mz_dataflow_types::ConnectorContext; use mz_expr::PartitionId; use mz_ore::retry::{Retry, RetryReader}; use mz_ore::task; @@ -126,12 +126,12 @@ async fn download_objects_task( tx: Sender>, mut shutdown_rx: tokio::sync::watch::Receiver, aws_config: AwsConfig, - aws_external_id: AwsExternalId, + aws_external_id: Option, activator: SyncActivator, compression: Compression, metrics: SourceBaseMetrics, ) { - let config = aws_config.load(aws_external_id).await; + let config = aws_config.load(aws_external_id.as_ref()).await; let client = aws_sdk_s3::Client::new(&config); struct BucketInfo { @@ -239,11 +239,11 @@ async fn scan_bucket_task( source_id: String, glob: Option, aws_config: AwsConfig, - aws_external_id: AwsExternalId, + aws_external_id: Option, tx: Sender>, base_metrics: SourceBaseMetrics, ) { - let config = aws_config.load(aws_external_id).await; + let config = aws_config.load(aws_external_id.as_ref()).await; let client = aws_sdk_s3::Client::new(&config); let glob = glob.as_ref(); @@ -352,7 +352,7 @@ async fn read_sqs_task( glob: Option, queue: String, aws_config: AwsConfig, - aws_external_id: AwsExternalId, + aws_external_id: Option, tx: Sender>, mut shutdown_rx: tokio::sync::watch::Receiver, base_metrics: SourceBaseMetrics, @@ -362,7 +362,7 @@ async fn read_sqs_task( source_id, queue, ); - let config = aws_config.load(aws_external_id).await; + let config = aws_config.load(aws_external_id.as_ref()).await; let client = aws_sdk_sqs::Client::new(&config); let glob = glob.as_ref(); @@ -778,10 +778,10 @@ impl SourceReader for S3SourceReader { _worker_count: usize, consumer_activator: SyncActivator, connector: ExternalSourceConnector, - aws_external_id: AwsExternalId, _restored_offsets: Vec<(PartitionId, Option)>, _encoding: SourceDataEncoding, - metrics: SourceBaseMetrics, + metrics: crate::source::metrics::SourceBaseMetrics, + 
connector_context: ConnectorContext, ) -> Result { let s3_conn = match connector { ExternalSourceConnector::S3(s3_conn) => s3_conn, @@ -805,7 +805,7 @@ impl SourceReader for S3SourceReader { dataflow_tx, shutdown_rx.clone(), s3_conn.aws.clone(), - aws_external_id.clone(), + connector_context.aws_external_id.clone(), consumer_activator, s3_conn.compression, metrics.clone(), @@ -827,7 +827,7 @@ impl SourceReader for S3SourceReader { source_id.to_string(), glob.clone(), s3_conn.aws.clone(), - aws_external_id.clone(), + connector_context.aws_external_id.clone(), keys_tx.clone(), metrics.clone(), ), @@ -845,7 +845,7 @@ impl SourceReader for S3SourceReader { glob.clone(), queue, s3_conn.aws.clone(), - aws_external_id.clone(), + connector_context.aws_external_id.clone(), keys_tx.clone(), shutdown_rx.clone(), metrics.clone(), diff --git a/src/storage/src/storage_state.rs b/src/storage/src/storage_state.rs index 00d0aa80fa707..3b7bc284790cb 100644 --- a/src/storage/src/storage_state.rs +++ b/src/storage/src/storage_state.rs @@ -25,8 +25,8 @@ use timely::worker::Worker as TimelyWorker; use tokio::sync::mpsc; use mz_dataflow_types::client::{RenderSourcesCommand, StorageCommand, StorageResponse}; -use mz_dataflow_types::sources::AwsExternalId; use mz_dataflow_types::sources::{ExternalSourceConnector, SourceConnector}; +use mz_dataflow_types::ConnectorContext; use mz_ore::now::NowFn; use mz_repr::{Diff, GlobalId, Row, Timestamp}; @@ -75,12 +75,12 @@ pub struct StorageState { pub now: NowFn, /// Metrics for the source-specific side of dataflows. pub source_metrics: SourceBaseMetrics, - /// An external ID to use for all AWS AssumeRole operations. - pub aws_external_id: AwsExternalId, /// Index of the associated timely dataflow worker. pub timely_worker_index: usize, /// Peers in the associated timely dataflow worker. pub timely_worker_peers: usize, + /// Configuration for source and sink connectors. + pub connector_context: ConnectorContext, } /// State about a single table. diff --git a/src/storaged/src/main.rs b/src/storaged/src/main.rs index 5805bea4f3708..468d03227fb06 100644 --- a/src/storaged/src/main.rs +++ b/src/storaged/src/main.rs @@ -7,6 +7,7 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. 
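
For readers following the mechanical renames in this patch, here is a rough, self-contained sketch of the two building blocks the changed call sites rely on: the `ConnectorContext` built from CLI arguments and the `create_new_client_config` / `create_new_client_config_simple` wrappers that replace direct `ClientConfig::new()` calls in the sink and test code. This is a sketch under stated assumptions, not the actual definitions in `mz_dataflow_types` or `mz_kafka_util`: the field names, the `&str` log-filter parameter, the `AwsExternalId` stand-in, and the DEBUG-to-`debug=all` mapping are all inferred from the call sites and from the deleted `log_enabled!` block in the kafka sink above.

```rust
// Sketch only: the real definitions live in `mz_dataflow_types` (ConnectorContext,
// AwsExternalId) and `mz_kafka_util::client`; names and logic here are assumptions
// inferred from how this patch uses them, and may not match the actual code.
use rdkafka::config::ClientConfig;
use tracing::Level;

/// Stand-in for `mz_dataflow_types::aws::AwsExternalId`, included only so the
/// sketch is self-contained.
#[derive(Debug, Clone)]
pub struct AwsExternalId(pub String);

/// Shared configuration threaded from the CLI down to source and sink connectors.
#[derive(Debug, Clone)]
pub struct ConnectorContext {
    /// Log level to apply to librdkafka clients created by sources and sinks.
    pub librdkafka_log_level: Level,
    /// External ID for AWS AssumeRole operations, if one was provided.
    pub aws_external_id: Option<AwsExternalId>,
}

impl ConnectorContext {
    /// Mirrors the `ConnectorContext::from_cli_args(&args.log_filter, args.aws_external_id)`
    /// call sites in `computed.rs` and `storaged/main.rs`. The real `log_filter` is a
    /// tracing filter rather than a plain string; `&str` keeps the sketch dependency-free.
    pub fn from_cli_args(log_filter: &str, aws_external_id: Option<String>) -> Self {
        ConnectorContext {
            // Assumption: request librdkafka debug output only when the filter
            // explicitly asks for it, echoing the removed `log_enabled!` check.
            librdkafka_log_level: if log_filter.contains("librdkafka=debug")
                || log_filter.contains("librdkafka=trace")
            {
                Level::DEBUG
            } else {
                Level::INFO
            },
            aws_external_id: aws_external_id.map(AwsExternalId),
        }
    }
}

/// Replacement for direct `ClientConfig::new()` calls, so every Kafka client is
/// configured with a consistent librdkafka log level.
pub fn create_new_client_config(librdkafka_log_level: Level) -> ClientConfig {
    #[allow(clippy::disallowed_methods)] // the one sanctioned call site
    let mut config = ClientConfig::new();
    // Assumption: this replays the deleted kafka sink logic, enabling librdkafka's
    // full `debug=all` output when debug-or-finer logging was requested.
    if librdkafka_log_level == Level::DEBUG || librdkafka_log_level == Level::TRACE {
        config.set("debug", "all");
    }
    config
}

/// Convenience wrapper for tests and tools that have no tracing configuration.
pub fn create_new_client_config_simple() -> ClientConfig {
    create_new_client_config(Level::INFO)
}
```

With this shape, `storaged` and `computed` can build one `ConnectorContext` at startup and clone it into every worker, which is presumably why the type is cloned when constructing each `Worker`/`ComputeState` and why it carries both the librdkafka log level and the optional AWS external ID that the S3 and Kinesis sources read via `connector_context.aws_external_id`.
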
+use std::env; use std::fmt; use std::path::PathBuf; use std::process; @@ -25,7 +26,7 @@ use tracing_subscriber::filter::Targets; use mz_build_info::{build_info, BuildInfo}; use mz_dataflow_types::client::{GenericClient, StorageClient}; -use mz_dataflow_types::sources::AwsExternalId; +use mz_dataflow_types::ConnectorContext; use mz_ore::metrics::MetricsRegistry; use mz_ore::now::SYSTEM_TIME; use mz_pid_file::PidFile; @@ -178,10 +179,7 @@ async fn run(args: Args) -> Result<(), anyhow::Error> { experimental_mode: false, metrics_registry: MetricsRegistry::new(), now: SYSTEM_TIME.clone(), - aws_external_id: args - .aws_external_id - .map(AwsExternalId::ISwearThisCameFromACliArgOrEnvVariable) - .unwrap_or(AwsExternalId::NotProvided), + connector_context: ConnectorContext::from_cli_args(&args.log_filter, args.aws_external_id), }; let serve_config = ServeConfig { diff --git a/src/testdrive/src/action.rs b/src/testdrive/src/action.rs index 59f505b567484..d5a751569be99 100644 --- a/src/testdrive/src/action.rs +++ b/src/testdrive/src/action.rs @@ -28,7 +28,7 @@ use itertools::Itertools; use lazy_static::lazy_static; use mz_coord::catalog::{Catalog, ConnCatalog}; use mz_coord::session::Session; -use mz_kafka_util::client::MzClientContext; +use mz_kafka_util::client::{create_new_client_config_simple, MzClientContext}; use mz_ore::now::NOW_ZERO; use rand::Rng; use rdkafka::ClientConfig; @@ -795,7 +795,7 @@ pub async fn create_state( use rdkafka::admin::{AdminClient, AdminOptions}; use rdkafka::producer::FutureProducer; - let mut kafka_config = ClientConfig::new(); + let mut kafka_config = create_new_client_config_simple(); kafka_config.set("bootstrap.servers", &config.kafka_addr); kafka_config.set("group.id", "materialize-testdrive"); kafka_config.set("auto.offset.reset", "earliest"); diff --git a/test/test-util/src/kafka/kafka_client.rs b/test/test-util/src/kafka/kafka_client.rs index 4f0756211bcc5..34e41716e846f 100644 --- a/test/test-util/src/kafka/kafka_client.rs +++ b/test/test-util/src/kafka/kafka_client.rs @@ -13,11 +13,10 @@ use std::time::Duration; use anyhow::Context; use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication}; -use rdkafka::config::ClientConfig; use rdkafka::error::KafkaError; use rdkafka::producer::{DeliveryFuture, FutureProducer, FutureRecord}; -use mz_kafka_util::client::MzClientContext; +use mz_kafka_util::client::{create_new_client_config_simple, MzClientContext}; pub struct KafkaClient { producer: FutureProducer, @@ -30,7 +29,7 @@ impl KafkaClient { group_id: &str, configs: &[(&str, &str)], ) -> Result { - let mut config = ClientConfig::new(); + let mut config = create_new_client_config_simple(); config.set("bootstrap.servers", kafka_url); config.set("group.id", group_id); for (key, val) in configs { @@ -53,7 +52,7 @@ impl KafkaClient { configs: &[(&str, &str)], timeout: Option, ) -> Result<(), anyhow::Error> { - let mut config = ClientConfig::new(); + let mut config = create_new_client_config_simple(); config.set("bootstrap.servers", &self.kafka_url); let client = config