Commit cdafa32

closer to making it work?
guswynn committed Feb 4, 2022
1 parent 9756cd2 commit cdafa32
Showing 28 changed files with 201 additions and 104 deletions.
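
At a high level, this commit threads a `tracing::Level` (called `fmt_level` throughout) from the coordinator's `Config` down through dataflow construction, sink connector building, and source/sink rendering. The following self-contained toy illustrates the pattern; the struct and method names mirror the diff, but the bodies are illustrative sketches, not Materialize's actual implementation:

```rust
use tracing::Level;

// Toy stand-ins for the real Config, Coordinator, and DataflowBuilder.
struct Config {
    fmt_level: Level,
}

struct Coordinator {
    fmt_level: Level,
}

struct DataflowBuilder {
    fmt_level: Level,
}

impl Coordinator {
    fn new(config: Config) -> Self {
        // The level is stored once at startup...
        Coordinator {
            fmt_level: config.fmt_level,
        }
    }

    // ...and passed by value (`Level` is `Copy`) to each layer that needs it.
    fn dataflow_builder(&mut self, fmt_level: Level) -> DataflowBuilder {
        DataflowBuilder { fmt_level }
    }
}

fn main() {
    let mut coord = Coordinator::new(Config {
        fmt_level: Level::DEBUG,
    });
    let builder = coord.dataflow_builder(coord.fmt_level);
    println!("building dataflows at level {}", builder.fmt_level);
}
```
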
2 changes: 2 additions & 0 deletions Cargo.lock

(Generated file; diff not rendered.)

45 changes: 29 additions & 16 deletions src/coord/src/coord.rs
@@ -261,6 +261,7 @@ pub struct Config {
     pub metrics_registry: MetricsRegistry,
     pub persister: PersisterWithConfig,
     pub now: NowFn,
+    pub fmt_level: tracing::Level,
 }

 /// Glues the external world to the Timely workers.
@@ -335,6 +336,8 @@ pub struct Coordinator {
     write_lock: Arc<tokio::sync::Mutex<()>>,
     /// Holds plans deferred due to write lock.
     write_lock_wait_group: VecDeque<DeferredPlan>,
+    /// The tracing level threaded through to dataflow rendering and sink building.
+    fmt_level: tracing::Level,
 }

 /// Metadata about an active connection.
@@ -606,7 +609,7 @@ impl Coordinator {
             if let Some((name, description)) =
                 Self::prepare_index_build(self.catalog.state(), &index_id)
             {
-                let df = self.dataflow_builder().build_index_dataflow(
+                let df = self.dataflow_builder(self.fmt_level).build_index_dataflow(
                     name,
                     index_id,
                     description,
@@ -629,9 +632,10 @@
                         panic!("sink already initialized during catalog boot")
                     }
                 };
-                let connector = sink_connector::build(builder.clone(), entry.id())
-                    .await
-                    .with_context(|| format!("recreating sink {}", entry.name()))?;
+                let connector =
+                    sink_connector::build(builder.clone(), entry.id(), self.fmt_level)
+                        .await
+                        .with_context(|| format!("recreating sink {}", entry.name()))?;
                 self.handle_sink_connector_ready(entry.id(), entry.oid(), connector)
                     .await?;
             }
@@ -1254,7 +1258,7 @@ impl Coordinator {

         let internal_cmd_tx = self.internal_cmd_tx.clone();
         let catalog = self.catalog.for_session(&session);
-        let purify_fut = sql::pure::purify(&catalog, stmt);
+        let purify_fut = sql::pure::purify(&catalog, self.fmt_level, stmt);
         let conn_id = session.conn_id();
         task::spawn(|| format!("purify:{conn_id}"), async move {
             let result = purify_fut.await.map_err(|e| e.into());
@@ -2523,6 +2527,7 @@ impl Coordinator {
         // main coordinator thread when the future completes.
         let connector_builder = sink.connector_builder;
         let internal_cmd_tx = self.internal_cmd_tx.clone();
+        let fmt_level = self.fmt_level;
         task::spawn(
             || format!("sink_connector_ready:{}", sink.from),
             async move {
@@ -2532,7 +2537,7 @@
                         tx,
                         id,
                         oid,
-                        result: sink_connector::build(connector_builder, id).await,
+                        result: sink_connector::build(connector_builder, id, fmt_level).await,
                     }))
                     .expect("sending to internal_cmd_tx cannot fail");
             },
@@ -3214,7 +3219,7 @@ impl Coordinator {
         // The assembled dataflow contains a view and an index of that view.
         let mut dataflow = DataflowDesc::new(format!("temp-view-{}", view_id));
         dataflow.set_as_of(Antichain::from_elem(timestamp));
-        self.dataflow_builder()
+        self.dataflow_builder(self.fmt_level)
             .import_view_into_dataflow(&view_id, &source, &mut dataflow)?;
         dataflow.export_index(
             index_id,
@@ -3310,9 +3315,11 @@
                 strict: !with_snapshot,
             },
         };
-        let df =
-            self.dataflow_builder()
-                .build_sink_dataflow(sink_name, sink_id, sink_description)?;
+        let df = self.dataflow_builder(self.fmt_level).build_sink_dataflow(
+            sink_name,
+            sink_id,
+            sink_description,
+        )?;
         self.ship_dataflow(df).await;

         let resp = ExecuteResponse::Tailing { rx };
@@ -3568,6 +3575,7 @@ impl Coordinator {
             decorrelated_plan
         };

+        let fmt_level = self.fmt_level;
         let optimize =
             |timings: &mut Timings,
              coord: &mut Self,
@@ -3577,12 +3585,14 @@
                 let optimized_plan =
                     coord.prep_relation_expr(decorrelated_plan, ExprPrepStyle::Explain)?;
                 let mut dataflow = DataflowDesc::new(format!("explanation"));
-                coord.dataflow_builder().import_view_into_dataflow(
-                    // TODO: If explaining a view, pipe the actual id of the view.
-                    &GlobalId::Explain,
-                    &optimized_plan,
-                    &mut dataflow,
-                )?;
+                coord
+                    .dataflow_builder(fmt_level)
+                    .import_view_into_dataflow(
+                        // TODO: If explaining a view, pipe the actual id of the view.
+                        &GlobalId::Explain,
+                        &optimized_plan,
+                        &mut dataflow,
+                    )?;
                 transform::optimize_dataflow(&mut dataflow, coord.catalog.enabled_indexes())?;
                 timings.optimization = Some(start.elapsed());
                 Ok(dataflow)
@@ -4109,6 +4119,7 @@ impl Coordinator {
                 indexes,
                 persister,
                 storage,
+                fmt_level: self.fmt_level,
             };
             f(builder)
         })?;
@@ -4586,6 +4597,7 @@ pub async fn serve(
         metrics_registry,
         persister,
         now,
+        fmt_level,
     }: Config,
 ) -> Result<(Handle, Client), CoordError> {
     let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
@@ -4645,6 +4657,7 @@
         pending_tails: HashMap::new(),
         write_lock: Arc::new(tokio::sync::Mutex::new(())),
         write_lock_wait_group: VecDeque::new(),
+        fmt_level,
     };
     if let Some(config) = &logging {
         handle.block_on(
4 changes: 3 additions & 1 deletion src/coord/src/coord/dataflow_builder.rs
@@ -25,16 +25,18 @@ pub struct DataflowBuilder<'a> {
     pub persister: &'a PersisterWithConfig,
     /// A handle to the storage abstraction, which describes sources by their identifiers.
     pub storage: &'a dataflow_types::client::Controller<Box<dyn dataflow_types::client::Client>>,
+    pub fmt_level: tracing::Level,
 }

 impl Coordinator {
     /// Creates a new dataflow builder from the catalog and indexes in `self`.
-    pub fn dataflow_builder<'a>(&'a mut self) -> DataflowBuilder {
+    pub fn dataflow_builder<'a>(&'a mut self, fmt_level: tracing::Level) -> DataflowBuilder {
         DataflowBuilder {
             catalog: self.catalog.state(),
             indexes: &self.indexes,
             persister: &self.persister,
             storage: &self.dataflow_client,
+            fmt_level,
         }
     }

8 changes: 5 additions & 3 deletions src/coord/src/sink_connector.rs
@@ -19,17 +19,18 @@ use dataflow_types::sinks::{
     PublishedSchemaInfo, SinkConnector, SinkConnectorBuilder,
 };
 use expr::GlobalId;
-use kafka_util::client::{create_new_client_config, MzClientContext};
+use kafka_util::client::{create_new_client_config_good, MzClientContext};
 use ore::collections::CollectionExt;

 use crate::error::CoordError;

 pub async fn build(
     builder: SinkConnectorBuilder,
     id: GlobalId,
+    fmt_level: tracing::Level,
 ) -> Result<SinkConnector, CoordError> {
     match builder {
-        SinkConnectorBuilder::Kafka(k) => build_kafka(k, id).await,
+        SinkConnectorBuilder::Kafka(k) => build_kafka(k, id, fmt_level).await,
         SinkConnectorBuilder::AvroOcf(a) => build_avro_ocf(a, id),
     }
 }
@@ -200,6 +201,7 @@ async fn publish_kafka_schemas(
 async fn build_kafka(
     builder: KafkaSinkConnectorBuilder,
     id: GlobalId,
+    fmt_level: tracing::Level,
 ) -> Result<SinkConnector, CoordError> {
     let maybe_append_nonce = {
         let reuse_topic = builder.reuse_topic;
@@ -215,7 +217,7 @@ async fn build_kafka(
     let topic = maybe_append_nonce(&builder.topic_prefix);

     // Create Kafka topic
-    let mut config = create_new_client_config();
+    let mut config = create_new_client_config_good(fmt_level);
     config.set("bootstrap.servers", &builder.broker_addrs.to_string());
     for (k, v) in builder.config_options.iter() {
         // Explicitly reject the statistics interval option here because it's not
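
The renamed helper `create_new_client_config_good` now takes the tracing level; its body lives in `kafka_util` and is not part of this diff. A hypothetical sketch of what such a helper might do, mapping a `tracing::Level` onto librdkafka's log level (the function name and the mapping are assumptions, not the actual `kafka_util` implementation):

```rust
use rdkafka::config::{ClientConfig, RDKafkaLogLevel};
use tracing::Level;

// Hypothetical: build an rdkafka client config whose log level follows the
// tracing level. In `tracing`, more-verbose levels compare greater, so
// `fmt_level >= Level::DEBUG` matches both DEBUG and TRACE.
fn create_client_config(fmt_level: Level) -> ClientConfig {
    let mut config = ClientConfig::new();
    let log_level = if fmt_level >= Level::DEBUG {
        RDKafkaLogLevel::Debug
    } else if fmt_level >= Level::INFO {
        RDKafkaLogLevel::Info
    } else if fmt_level >= Level::WARN {
        RDKafkaLogLevel::Warning
    } else {
        RDKafkaLogLevel::Error
    };
    config.set_log_level(log_level);
    config
}
```
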
1 change: 1 addition & 0 deletions src/coordtest/Cargo.toml
@@ -23,4 +23,5 @@ serde_json = "1.0.78"
 sql-parser = { path = "../sql-parser" }
 tempfile = "3.2.0"
 timely = { git = "https://github.com/TimelyDataflow/timely-dataflow", default-features = false, features = ["bincode"] }
+tracing = "0.1.29"
 tokio = "1.16.1"
24 changes: 14 additions & 10 deletions src/coordtest/src/lib.rs
@@ -115,17 +115,20 @@ impl CoordTest {
             NowFn::from(move || *timestamp.lock().unwrap())
         };
         let metrics_registry = MetricsRegistry::new();
-        let (dataflow_server, dataflow_client) = dataflow::serve(dataflow::Config {
-            workers: 1,
-            timely_config: timely::Config {
-                communication: timely::CommunicationConfig::Process(1),
-                worker: timely::WorkerConfig::default(),
-            },
-            experimental_mode,
-            now: now.clone(),
-            metrics_registry: metrics_registry.clone(),
-            persister: None,
-        })?;
+        let (dataflow_server, dataflow_client) = dataflow::serve(
+            dataflow::Config {
+                workers: 1,
+                timely_config: timely::Config {
+                    communication: timely::CommunicationConfig::Process(1),
+                    worker: timely::WorkerConfig::default(),
+                },
+                experimental_mode,
+                now: now.clone(),
+                metrics_registry: metrics_registry.clone(),
+                persister: None,
+            },
+            tracing::Level::ERROR,
+        )?;
         let dataflow_client = InterceptingDataflowClient::new(dataflow_client);

         let data_directory = tempfile::tempdir()?;
@@ -149,6 +152,7 @@ impl CoordTest {
             metrics_registry,
             persister,
             now,
+            fmt_level: tracing::Level::DEBUG,
         })
         .await?;
         let coordtest = CoordTest {
3 changes: 3 additions & 0 deletions src/dataflow/src/render/mod.rs
@@ -214,6 +214,7 @@ pub fn build_dataflow<A: Allocate>(
     now: NowFn,
     source_metrics: &SourceBaseMetrics,
     sink_metrics: &SinkBaseMetrics,
+    fmt_level: tracing::Level,
 ) {
     let worker_logging = timely_worker.log_register().get("timely");
     let name = format!("Dataflow: {}", &dataflow.debug_name);
@@ -253,6 +254,7 @@
                     src_id.clone(),
                     now.clone(),
                     source_metrics,
+                    fmt_level,
                 );

                 // Associate collection bundle with the source identifier.
@@ -313,6 +315,7 @@
                     sink_id,
                     &sink,
                     sink_metrics,
+                    fmt_level,
                 );
             }
         });
7 changes: 7 additions & 0 deletions src/dataflow/src/render/sinks.rs
@@ -41,6 +41,7 @@
         sink_id: GlobalId,
         sink: &SinkDesc,
         metrics: &SinkBaseMetrics,
+        fmt_level: tracing::Level,
     ) {
         let sink_render = get_sink_render_for(&sink.connector);

@@ -93,6 +94,7 @@
             sink_id,
             collection,
             metrics,
+            SinkContext { fmt_level },
         );

         if let Some(sink_token) = sink_token {
@@ -221,6 +223,10 @@
     collection
 }

+pub struct SinkContext {
+    pub fmt_level: tracing::Level,
+}
+
 pub trait SinkRender<G>
 where
     G: Scope<Timestamp = Timestamp>,
@@ -239,6 +245,7 @@
         sink_id: GlobalId,
         sinked_collection: Collection<G, (Option<Row>, Option<Row>), Diff>,
         metrics: &SinkBaseMetrics,
+        ctx: SinkContext,
     ) -> Option<Box<dyn Any>>
     where
         G: Scope<Timestamp = Timestamp>;
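
The new `SinkContext` struct bundles render-time configuration for `SinkRender` implementors; for now it carries only `fmt_level`, but passing a context struct leaves room to grow without touching every implementation again. A simplified, self-contained sketch of the shape of this change (the real trait has generic scope and collection parameters, elided here, and `KafkaSinkStub` is a made-up implementor):

```rust
use tracing::Level;

// Simplified stand-in for the render-time context added in this commit.
pub struct SinkContext {
    pub fmt_level: Level,
}

// Simplified stand-in for the SinkRender trait: implementors now receive
// the context as a final argument instead of a growing list of scalars.
trait SinkRender {
    fn render_continuous_sink(&self, ctx: SinkContext);
}

struct KafkaSinkStub;

impl SinkRender for KafkaSinkStub {
    fn render_continuous_sink(&self, ctx: SinkContext) {
        // A Kafka sink can forward the level into its client config,
        // as build_kafka does elsewhere in this commit.
        println!("rendering kafka sink at level {}", ctx.fmt_level);
    }
}

fn main() {
    KafkaSinkStub.render_continuous_sink(SinkContext {
        fmt_level: Level::INFO,
    });
}
```
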
5 changes: 5 additions & 0 deletions src/dataflow/src/render/sources.rs
@@ -149,6 +149,7 @@ pub(crate) fn import_source<G>(
     src_id: GlobalId,
     now: NowFn,
     base_metrics: &SourceBaseMetrics,
+    fmt_level: tracing::Level,
 ) -> (
     crate::render::CollectionBundle<G, Row, Timestamp>,
     (
@@ -297,6 +298,7 @@
                         source_persist_config
                             .as_ref()
                             .map(|config| config.bindings_config.clone()),
+                        fmt_level,
                     );
                     ((SourceType::Delimited(ok), ts, err), cap)
                 }
@@ -307,6 +309,7 @@
                         source_persist_config
                             .as_ref()
                             .map(|config| config.bindings_config.clone()),
+                        fmt_level,
                     );
                     ((SourceType::Delimited(ok), ts, err), cap)
                 }
@@ -317,6 +320,7 @@
                         source_persist_config
                             .as_ref()
                             .map(|config| config.bindings_config.clone()),
+                        fmt_level,
                     );
                     ((SourceType::ByteStream(ok), ts, err), cap)
                 }
@@ -327,6 +331,7 @@
                         source_persist_config
                             .as_ref()
                             .map(|config| config.bindings_config.clone()),
+                        fmt_level,
                     );
                     ((SourceType::ByteStream(ok), ts, err), cap)
                 }
8 changes: 7 additions & 1 deletion src/dataflow/src/server.rs
@@ -90,7 +90,10 @@ pub struct Server {
 }

 /// Initiates a timely dataflow computation, processing materialized commands.
-pub fn serve(config: Config) -> Result<(Server, LocalClient), anyhow::Error> {
+pub fn serve(
+    config: Config,
+    fmt_level: tracing::Level,
+) -> Result<(Server, LocalClient), anyhow::Error> {
     assert!(config.workers > 0);

     let server_metrics = ServerMetrics::register_with(&config.metrics_registry);
@@ -161,6 +164,7 @@ pub fn serve(config: Config) -> Result<(Server, LocalClient), anyhow::Error> {
                 now: now.clone(),
                 dataflow_source_metrics,
                 dataflow_sink_metrics,
+                fmt_level,
             }
             .run()
         })
@@ -214,6 +218,7 @@ where
     /// Metrics for the source-specific side of dataflows.
     dataflow_source_metrics: SourceBaseMetrics,
     dataflow_sink_metrics: SinkBaseMetrics,
+    fmt_level: tracing::Level,
 }

 impl<'w, A> Worker<'w, A>
@@ -731,6 +736,7 @@
             self.now.clone(),
             &self.dataflow_source_metrics,
             &self.dataflow_sink_metrics,
+            self.fmt_level,
         );
     }
 }
(The remaining changed files are not shown.)
