Skip to content

Commit

Permalink
Plumb librdkafka log level through storage layer
Browse files Browse the repository at this point in the history
Supersedes MaterializeInc#10875.
Fixes MaterializeInc#10441.

Co-authored-by: Nikhil Benesch <nikhil.benesch@gmail.com>
  • Loading branch information
guswynn and benesch committed May 18, 2022
1 parent d11bd77 commit fcedae4
Show file tree
Hide file tree
Showing 38 changed files with 429 additions and 317 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions clippy.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,6 @@ disallowed-methods = [
{ path = "tokio::runtime::Handle::spawn_blocking", reason = "use the spawn wrappers in `mz_ore::task` instead" },
{ path = "tokio::runtime::Runtime::spawn", reason = "use the spawn wrappers in `mz_ore::task` instead" },
{ path = "tokio::runtime::Runtime::spawn_blocking", reason = "use the spawn wrappers in `mz_ore::task` instead" },
# Use the wrapper that sets the log level correctly
{ path = "rdkafka::config::ClientConfig::new", reason = "use the `client::create_new_client_config` wrapper in `kafka_util` instead" },
]
7 changes: 2 additions & 5 deletions src/compute/src/bin/computed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use futures::sink::SinkExt;
use futures::stream::TryStreamExt;
use http::HeaderMap;
use mz_build_info::{build_info, BuildInfo};
use mz_dataflow_types::sources::AwsExternalId;
use serde::de::DeserializeOwned;
use serde::ser::Serialize;
use tokio::net::TcpListener;
Expand All @@ -27,6 +26,7 @@ use tracing_subscriber::filter::Targets;

use mz_dataflow_types::client::{ComputeClient, GenericClient};
use mz_dataflow_types::reconciliation::command::ComputeCommandReconcile;
use mz_dataflow_types::ConnectorContext;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::SYSTEM_TIME;

Expand Down Expand Up @@ -254,10 +254,7 @@ async fn run(args: Args) -> Result<(), anyhow::Error> {
experimental_mode: false,
metrics_registry: MetricsRegistry::new(),
now: SYSTEM_TIME.clone(),
aws_external_id: args
.aws_external_id
.map(AwsExternalId::ISwearThisCameFromACliArgOrEnvVariable)
.unwrap_or(AwsExternalId::NotProvided),
connector_context: ConnectorContext::from_cli_args(&args.log_filter, args.aws_external_id),
};

let serve_config = ServeConfig {
Expand Down
4 changes: 4 additions & 0 deletions src/compute/src/compute_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use tokio::sync::mpsc;

use mz_dataflow_types::client::{ComputeCommand, ComputeResponse};
use mz_dataflow_types::logging::LoggingConfig;
use mz_dataflow_types::ConnectorContext;
use mz_dataflow_types::{DataflowError, PeekResponse, TailResponse};
use mz_repr::{Diff, GlobalId, Row, Timestamp};
use mz_storage::boundary::ComputeReplay;
Expand Down Expand Up @@ -63,6 +64,9 @@ pub struct ComputeState {
pub sink_metrics: SinkBaseMetrics,
/// The logger, from Timely's logging framework, if logs are enabled.
pub materialized_logger: Option<logging::materialized::Logger>,
/// Configuration for sink connectors.
// TODO: remove when sinks move to storage.
pub connector_context: ConnectorContext,
}

/// A wrapper around [ComputeState] with a live timely worker and response channel.
Expand Down
12 changes: 9 additions & 3 deletions src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use timely::worker::Worker as TimelyWorker;
use tokio::sync::mpsc;

use mz_dataflow_types::client::{ComputeCommand, ComputeResponse, LocalClient, LocalComputeClient};
use mz_dataflow_types::sources::AwsExternalId;
use mz_dataflow_types::ConnectorContext;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::NowFn;
use mz_storage::boundary::ComputeReplay;
Expand All @@ -42,8 +42,9 @@ pub struct Config {
pub now: NowFn,
/// Metrics registry through which dataflow metrics will be reported.
pub metrics_registry: MetricsRegistry,
/// An external ID to use for all AWS AssumeRole operations.
pub aws_external_id: AwsExternalId,
/// Configuration for sink connectors.
// TODO: remove when sinks move to storage.
pub connector_context: ConnectorContext,
}

/// A handle to a running dataflow server.
Expand Down Expand Up @@ -106,6 +107,7 @@ pub fn serve_boundary<CR: ComputeReplay, B: Fn(usize) -> CR + Send + Sync + 'sta
compute_boundary,
compute_response_tx,
metrics_bundle: metrics_bundle.clone(),
connector_context: config.connector_context.clone(),
}
.run()
})
Expand Down Expand Up @@ -143,6 +145,9 @@ where
compute_response_tx: mpsc::UnboundedSender<ComputeResponse>,
/// Metrics bundle.
metrics_bundle: (SinkBaseMetrics, TraceMetrics),
/// Configuration for sink connectors.
// TODO: remove when sinks move to storage.
pub connector_context: ConnectorContext,
}

impl<'w, A, CR> Worker<'w, A, CR>
Expand Down Expand Up @@ -201,6 +206,7 @@ where
reported_frontiers: HashMap::new(),
sink_metrics: self.metrics_bundle.0.clone(),
materialized_logger: None,
connector_context: self.connector_context.clone(),
});
}
ComputeCommand::DropInstance => {
Expand Down
27 changes: 20 additions & 7 deletions src/compute/src/sink/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ use mz_interchange::avro::{
self, get_debezium_transaction_schema, AvroEncoder, AvroSchemaGenerator,
};
use mz_interchange::encode::Encode;
use mz_kafka_util::client::MzClientContext;
use mz_kafka_util::client::{create_new_client_config, MzClientContext};
use mz_ore::cast::CastFrom;
use mz_ore::collections::CollectionExt;
use mz_ore::metrics::{CounterVecExt, DeleteOnDropCounter, DeleteOnDropGauge, GaugeVecExt};
Expand Down Expand Up @@ -137,6 +137,7 @@ where
sink.as_of.clone(),
Rc::clone(&shared_frontier),
&compute_state.sink_metrics.kafka,
compute_state.connector_context.librdkafka_log_level,
);

compute_state
Expand Down Expand Up @@ -434,9 +435,11 @@ impl KafkaSinkState {
activator: Activator,
write_frontier: Rc<RefCell<Antichain<Timestamp>>>,
metrics: &KafkaBaseMetrics,
librdkafka_log_level: tracing::Level,
) -> Self {
let config = Self::create_producer_config(&connector);
let consistency_client_config = Self::create_consistency_client_config(&connector);
let config = Self::create_producer_config(&connector, librdkafka_log_level);
let consistency_client_config =
Self::create_consistency_client_config(&connector, librdkafka_log_level);

let metrics = Arc::new(SinkMetrics::new(
metrics,
Expand Down Expand Up @@ -486,8 +489,11 @@ impl KafkaSinkState {
}
}

fn create_producer_config(connector: &KafkaSinkConnector) -> ClientConfig {
let mut config = ClientConfig::new();
fn create_producer_config(
connector: &KafkaSinkConnector,
librdkafka_log_level: tracing::Level,
) -> ClientConfig {
let mut config = create_new_client_config(librdkafka_log_level);
config.set("bootstrap.servers", &connector.addrs.to_string());

// Ensure that messages are sinked in order and without duplicates. Note that
Expand Down Expand Up @@ -535,8 +541,11 @@ impl KafkaSinkState {
config
}

fn create_consistency_client_config(connector: &KafkaSinkConnector) -> ClientConfig {
let mut config = ClientConfig::new();
fn create_consistency_client_config(
connector: &KafkaSinkConnector,
librdkafka_log_level: tracing::Level,
) -> ClientConfig {
let mut config = create_new_client_config(librdkafka_log_level);
config.set("bootstrap.servers", &connector.addrs.to_string());
for (k, v) in connector.config_options.iter() {
// We explicitly reject `statistics.interval.ms` here so that we don't
Expand Down Expand Up @@ -1023,6 +1032,7 @@ fn kafka<G>(
as_of: SinkAsOf,
write_frontier: Rc<RefCell<Antichain<Timestamp>>>,
metrics: &KafkaBaseMetrics,
librdkafka_log_level: tracing::Level,
) -> Rc<dyn Any>
where
G: Scope<Timestamp = Timestamp>,
Expand Down Expand Up @@ -1089,6 +1099,7 @@ where
shared_gate_ts,
write_frontier,
metrics,
librdkafka_log_level,
)
}

Expand All @@ -1112,6 +1123,7 @@ pub fn produce_to_kafka<G>(
shared_gate_ts: Rc<Cell<Option<Timestamp>>>,
write_frontier: Rc<RefCell<Antichain<Timestamp>>>,
metrics: &KafkaBaseMetrics,
librdkafka_log_level: tracing::Level,
) -> Rc<dyn Any>
where
G: Scope<Timestamp = Timestamp>,
Expand All @@ -1131,6 +1143,7 @@ where
activator,
write_frontier,
metrics,
librdkafka_log_level,
);

let mut vector = Vec::new();
Expand Down
4 changes: 1 addition & 3 deletions src/coord/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use mz_dataflow_types::client::{
use mz_dataflow_types::logging::LoggingConfig as DataflowLoggingConfig;
use mz_dataflow_types::sinks::{SinkConnector, SinkConnectorBuilder, SinkEnvelope};
use mz_dataflow_types::sources::{
AwsExternalId, ConnectorInner, ExternalSourceConnector, SourceConnector, Timeline,
ConnectorInner, ExternalSourceConnector, SourceConnector, Timeline,
};
use mz_expr::{ExprHumanizer, MirScalarExpr, OptimizedMirRelationExpr};
use mz_ore::collections::CollectionExt;
Expand Down Expand Up @@ -1377,7 +1377,6 @@ impl<S: Append> Catalog<S> {
cluster_id: config.storage.cluster_id(),
session_id: Uuid::new_v4(),
build_info: config.build_info,
aws_external_id: config.aws_external_id.clone(),
timestamp_frequency: config.timestamp_frequency,
now: config.now.clone(),
},
Expand Down Expand Up @@ -1917,7 +1916,6 @@ impl<S: Append> Catalog<S> {
storage,
experimental_mode,
build_info: &DUMMY_BUILD_INFO,
aws_external_id: AwsExternalId::NotProvided,
timestamp_frequency: Duration::from_secs(1),
now,
skip_migrations: true,
Expand Down
5 changes: 0 additions & 5 deletions src/coord/src/catalog/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
use std::time::Duration;

use mz_build_info::BuildInfo;
use mz_dataflow_types::sources::AwsExternalId;
use mz_ore::metrics::MetricsRegistry;

use crate::catalog::storage;
Expand All @@ -24,10 +23,6 @@ pub struct Config<'a, S> {
pub experimental_mode: Option<bool>,
/// Information about this build of Materialize.
pub build_info: &'static BuildInfo,
/// An [External ID][] to use for all AWS AssumeRole operations.
///
/// [External ID]: https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-user_externalid.html
pub aws_external_id: AwsExternalId,
/// Timestamp frequency to use for CREATE SOURCE
pub timestamp_frequency: Duration,
/// Function to generate wall clock now; can be mocked.
Expand Down
29 changes: 19 additions & 10 deletions src/coord/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,10 @@ use mz_dataflow_types::client::{
};
use mz_dataflow_types::sinks::{SinkAsOf, SinkConnector, SinkDesc, TailSinkConnector};
use mz_dataflow_types::sources::{
AwsExternalId, ExternalSourceConnector, PostgresSourceConnector, SourceConnector, Timeline,
ExternalSourceConnector, PostgresSourceConnector, SourceConnector, Timeline,
};
use mz_dataflow_types::{
BuildDesc, DataflowDesc, DataflowDescription, IndexDesc, PeekResponse, Update,
BuildDesc, ConnectorContext, DataflowDesc, DataflowDescription, IndexDesc, PeekResponse, Update,
};
use mz_expr::{
permutation_for_arrangement, CollectionPlan, ExprHumanizer, MirRelationExpr, MirScalarExpr,
Expand Down Expand Up @@ -233,12 +233,12 @@ pub struct Config<S> {
pub logical_compaction_window: Option<Duration>,
pub experimental_mode: bool,
pub build_info: &'static BuildInfo,
pub aws_external_id: AwsExternalId,
pub metrics_registry: MetricsRegistry,
pub now: NowFn,
pub secrets_controller: Box<dyn SecretsController>,
pub availability_zones: Vec<String>,
pub replica_sizes: ClusterReplicaSizeMap,
pub connector_context: ConnectorContext,
}

struct PendingPeek {
Expand Down Expand Up @@ -363,6 +363,9 @@ pub struct Coordinator<S> {
replica_sizes: ClusterReplicaSizeMap,
/// Valid availability zones for replicas.
availability_zones: Vec<String>,

/// Extra context to pass through to connector creation.
connector_context: ConnectorContext,
}

/// Metadata about an active connection.
Expand Down Expand Up @@ -627,9 +630,13 @@ impl<S: Append + 'static> Coordinator<S> {
panic!("sink already initialized during catalog boot")
}
};
let connector = sink_connector::build(builder.clone(), entry.id())
.await
.with_context(|| format!("recreating sink {}", entry.name()))?;
let connector = sink_connector::build(
builder.clone(),
entry.id(),
self.connector_context.clone(),
)
.await
.with_context(|| format!("recreating sink {}", entry.name()))?;
self.handle_sink_connector_ready(
entry.id(),
entry.oid(),
Expand Down Expand Up @@ -1410,8 +1417,8 @@ impl<S: Append + 'static> Coordinator<S> {
match mz_sql::connectors::populate_connectors(stmt, &catalog, &mut vec![]) {
Ok(stmt) => mz_sql::pure::purify_create_source(
self.now(),
self.catalog.config().aws_external_id.clone(),
stmt,
self.connector_context.clone(),
),
Err(e) => return tx.send(Err(e.into()), session),
};
Expand Down Expand Up @@ -2355,6 +2362,7 @@ impl<S: Append + 'static> Coordinator<S> {
// main coordinator thread when the future completes.
let connector_builder = sink.connector_builder;
let internal_cmd_tx = self.internal_cmd_tx.clone();
let connector_context = self.connector_context.clone();
task::spawn(
|| format!("sink_connector_ready:{}", sink.from),
async move {
Expand All @@ -2364,7 +2372,8 @@ impl<S: Append + 'static> Coordinator<S> {
tx,
id,
oid,
result: sink_connector::build(connector_builder, id).await,
result: sink_connector::build(connector_builder, id, connector_context)
.await,
compute_instance,
}))
.expect("sending to internal_cmd_tx cannot fail");
Expand Down Expand Up @@ -4711,12 +4720,12 @@ pub async fn serve<S: Append + 'static>(
logical_compaction_window,
experimental_mode,
build_info,
aws_external_id,
metrics_registry,
now,
secrets_controller,
replica_sizes,
availability_zones,
connector_context,
}: Config<S>,
) -> Result<(Handle, Client), CoordError> {
let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
Expand All @@ -4726,7 +4735,6 @@ pub async fn serve<S: Append + 'static>(
storage,
experimental_mode: Some(experimental_mode),
build_info,
aws_external_id,
timestamp_frequency,
now: now.clone(),
skip_migrations: false,
Expand Down Expand Up @@ -4766,6 +4774,7 @@ pub async fn serve<S: Append + 'static>(
secrets_controller,
replica_sizes,
availability_zones,
connector_context,
};
let bootstrap = handle.block_on(coord.bootstrap(builtin_table_updates));
let ok = bootstrap.is_ok();
Expand Down
Loading

0 comments on commit fcedae4

Please sign in to comment.