Skip to content

Commit

Permalink
Merge pull request #27833 from petrosagg/sources-with-dataflow-error
Browse files Browse the repository at this point in the history
storage: switch sources to columnated `DataflowError`
  • Loading branch information
petrosagg authored Jun 24, 2024
2 parents f9818bb + 8c40152 commit 6b86f93
Show file tree
Hide file tree
Showing 14 changed files with 73 additions and 103 deletions.
3 changes: 1 addition & 2 deletions src/storage-types/src/errors.proto
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
syntax = "proto3";

import "expr/src/scalar.proto";
import "repr/src/global_id.proto";
import "repr/src/row.proto";
import "storage-types/src/shim.proto";

Expand Down Expand Up @@ -38,7 +37,7 @@ message ProtoSourceErrorDetails {
}

message ProtoSourceError {
mz_repr.global_id.ProtoGlobalId source_id = 1;
reserved 1;
ProtoSourceErrorDetails error = 2;
}

Expand Down
15 changes: 1 addition & 14 deletions src/storage-types/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use bytes::BufMut;
use mz_expr::EvalError;
use mz_kafka_util::client::TunnelingClientContext;
use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
use mz_repr::{GlobalId, Row};
use mz_repr::Row;
use mz_ssh_util::tunnel::SshTunnelStatus;
use proptest_derive::Arbitrary;
use prost::Message;
Expand Down Expand Up @@ -336,37 +336,25 @@ impl Display for UpsertError {
/// This should _not_ include transient source errors, like connection issues or misconfigurations.
#[derive(Arbitrary, Ord, PartialOrd, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash)]
pub struct SourceError {
pub source_id: GlobalId,
pub error: SourceErrorDetails,
}

impl SourceError {
pub fn new(source_id: GlobalId, error: SourceErrorDetails) -> SourceError {
SourceError { source_id, error }
}
}

impl RustType<ProtoSourceError> for SourceError {
fn into_proto(&self) -> ProtoSourceError {
ProtoSourceError {
source_id: Some(self.source_id.into_proto()),
error: Some(self.error.into_proto()),
}
}

fn from_proto(proto: ProtoSourceError) -> Result<Self, TryFromProtoError> {
Ok(SourceError {
source_id: proto
.source_id
.into_rust_if_some("ProtoSourceError::source_id")?,
error: proto.error.into_rust_if_some("ProtoSourceError::error")?,
})
}
}

impl Display for SourceError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}: ", self.source_id)?;
self.error.fmt(f)
}
}
Expand Down Expand Up @@ -765,7 +753,6 @@ mod columnation {
DataflowError::SourceError(err) => {
let err: &SourceError = &*err;
let err = SourceError {
source_id: err.source_id,
error: match &err.error {
SourceErrorDetails::Initialization(string) => {
SourceErrorDetails::Initialization(self.string_region.copy(string))
Expand Down
2 changes: 1 addition & 1 deletion src/storage/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
// https://github.com/tokio-rs/prost/issues/237
#![allow(missing_docs)]

use crate::source::types::{SourceMessage, SourceReaderError};
use crate::source::types::SourceMessage;

pub mod generator;
mod kafka;
Expand Down
9 changes: 5 additions & 4 deletions src/storage/src/source/generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use std::time::Duration;
use differential_dataflow::{AsCollection, Collection};
use futures::StreamExt;
use mz_repr::{Diff, Row};
use mz_storage_types::errors::DataflowError;
use mz_storage_types::sources::load_generator::{
Event, Generator, KeyValueLoadGenerator, LoadGenerator, LoadGeneratorSourceConnection,
};
Expand All @@ -24,7 +25,7 @@ use timely::progress::Antichain;

use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
use crate::source::types::{ProgressStatisticsUpdate, SourceRender};
use crate::source::{RawSourceCreationConfig, SourceMessage, SourceReaderError};
use crate::source::{RawSourceCreationConfig, SourceMessage};

mod auction;
mod counter;
Expand Down Expand Up @@ -131,7 +132,7 @@ impl GeneratorKind {
committed_uppers: impl futures::Stream<Item = Antichain<MzOffset>> + 'static,
start_signal: impl std::future::Future<Output = ()> + 'static,
) -> (
Collection<G, (usize, Result<SourceMessage, SourceReaderError>), Diff>,
Collection<G, (usize, Result<SourceMessage, DataflowError>), Diff>,
Option<Stream<G, Infallible>>,
Stream<G, HealthStatusMessage>,
Stream<G, ProgressStatisticsUpdate>,
Expand Down Expand Up @@ -173,7 +174,7 @@ impl SourceRender for LoadGeneratorSourceConnection {
committed_uppers: impl futures::Stream<Item = Antichain<MzOffset>> + 'static,
start_signal: impl std::future::Future<Output = ()> + 'static,
) -> (
Collection<G, (usize, Result<SourceMessage, SourceReaderError>), Diff>,
Collection<G, (usize, Result<SourceMessage, DataflowError>), Diff>,
Option<Stream<G, Infallible>>,
Stream<G, HealthStatusMessage>,
Stream<G, ProgressStatisticsUpdate>,
Expand All @@ -199,7 +200,7 @@ fn render_simple_generator<G: Scope<Timestamp = MzOffset>>(
committed_uppers: impl futures::Stream<Item = Antichain<MzOffset>> + 'static,
required_exports: usize,
) -> (
Collection<G, (usize, Result<SourceMessage, SourceReaderError>), Diff>,
Collection<G, (usize, Result<SourceMessage, DataflowError>), Diff>,
Option<Stream<G, Infallible>>,
Stream<G, HealthStatusMessage>,
Stream<G, ProgressStatisticsUpdate>,
Expand Down
9 changes: 5 additions & 4 deletions src/storage/src/source/generator/key_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use differential_dataflow::{AsCollection, Collection};
use futures::stream::StreamExt;
use mz_ore::cast::CastFrom;
use mz_repr::{Datum, Diff, Row};
use mz_storage_types::errors::DataflowError;
use mz_storage_types::sources::load_generator::KeyValueLoadGenerator;
use mz_storage_types::sources::{MzOffset, SourceTimestamp};
use mz_timely_util::builder_async::{OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton};
Expand All @@ -25,7 +26,7 @@ use timely::progress::Antichain;

use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
use crate::source::types::ProgressStatisticsUpdate;
use crate::source::{RawSourceCreationConfig, SourceMessage, SourceReaderError};
use crate::source::{RawSourceCreationConfig, SourceMessage};

pub fn render<G: Scope<Timestamp = MzOffset>>(
key_value: KeyValueLoadGenerator,
Expand All @@ -34,7 +35,7 @@ pub fn render<G: Scope<Timestamp = MzOffset>>(
committed_uppers: impl futures::Stream<Item = Antichain<MzOffset>> + 'static,
start_signal: impl std::future::Future<Output = ()> + 'static,
) -> (
Collection<G, (usize, Result<SourceMessage, SourceReaderError>), Diff>,
Collection<G, (usize, Result<SourceMessage, DataflowError>), Diff>,
Option<Stream<G, Infallible>>,
Stream<G, HealthStatusMessage>,
Stream<G, ProgressStatisticsUpdate>,
Expand Down Expand Up @@ -323,7 +324,7 @@ impl TransactionalSnapshotProducer {
value_buffer: &'a mut Vec<u8>,
) -> impl Iterator<
Item = (
(usize, Result<SourceMessage, SourceReaderError>),
(usize, Result<SourceMessage, DataflowError>),
MzOffset,
Diff,
),
Expand Down Expand Up @@ -447,7 +448,7 @@ impl UpdateProducer {
u64,
impl Iterator<
Item = (
(usize, Result<SourceMessage, SourceReaderError>),
(usize, Result<SourceMessage, DataflowError>),
MzOffset,
Diff,
),
Expand Down
42 changes: 26 additions & 16 deletions src/storage/src/source/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

use anyhow::anyhow;
use chrono::{DateTime, NaiveDateTime};
use differential_dataflow::{AsCollection, Collection};
use futures::StreamExt;
Expand All @@ -27,7 +26,9 @@ use mz_ore::thread::{JoinHandleExt, UnparkOnDropHandle};
use mz_repr::adt::timestamp::CheckedTimestamp;
use mz_repr::{adt::jsonb::Jsonb, Datum, Diff, GlobalId, Row};
use mz_ssh_util::tunnel::SshTunnelStatus;
use mz_storage_types::errors::ContextCreationError;
use mz_storage_types::errors::{
ContextCreationError, DataflowError, SourceError, SourceErrorDetails,
};
use mz_storage_types::sources::kafka::{
KafkaMetadataKind, KafkaSourceConnection, KafkaTimestamp, RangeBound,
};
Expand Down Expand Up @@ -55,7 +56,7 @@ use tracing::{error, info, trace, warn};
use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
use crate::metrics::source::kafka::KafkaSourceMetrics;
use crate::source::types::{ProgressStatisticsUpdate, SourceRender};
use crate::source::{RawSourceCreationConfig, SourceMessage, SourceReaderError};
use crate::source::{RawSourceCreationConfig, SourceMessage};

#[derive(Default)]
struct HealthStatus {
Expand Down Expand Up @@ -160,7 +161,7 @@ impl SourceRender for KafkaSourceConnection {
resume_uppers: impl futures::Stream<Item = Antichain<KafkaTimestamp>> + 'static,
start_signal: impl std::future::Future<Output = ()> + 'static,
) -> (
Collection<G, (usize, Result<SourceMessage, SourceReaderError>), Diff>,
Collection<G, (usize, Result<SourceMessage, DataflowError>), Diff>,
Option<Stream<G, Infallible>>,
Stream<G, HealthStatusMessage>,
Stream<G, ProgressStatisticsUpdate>,
Expand Down Expand Up @@ -541,10 +542,12 @@ impl SourceRender for KafkaSourceConnection {
if !PartialOrder::less_equal(data_cap.time(), &future_ts) {
let prev_pid_count = prev_pid_info.map(|info| info.len()).unwrap_or(0);
let pid_count = partitions.len();
let err = SourceReaderError::other_definite(anyhow!(
"topic was recreated: partition \
let err = DataflowError::SourceError(Box::new(SourceError {
error: SourceErrorDetails::Other(format!(
"topic was recreated: partition \
count regressed from {prev_pid_count} to {pid_count}"
));
)),
}));
let time = data_cap.time().clone();
data_output.give(&data_cap, ((0, Err(err)), time, 1)).await;
return;
Expand All @@ -555,12 +558,13 @@ impl SourceRender for KafkaSourceConnection {
for (pid, prev_watermarks) in prev_pid_info {
let watermarks = &partitions[&pid];
if !(prev_watermarks.high <= watermarks.high) {
let err = SourceReaderError::other_definite(anyhow!(
"topic was recreated: high watermark of \
let err = DataflowError::SourceError(Box::new(SourceError {
error: SourceErrorDetails::Other(format!(
"topic was recreated: high watermark of \
partition {pid} regressed from {} to {}",
prev_watermarks.high,
watermarks.high
));
prev_watermarks.high, watermarks.high
)),
}));
let time = data_cap.time().clone();
data_output.give(&data_cap, ((0, Err(err)), time, 1)).await;
return;
Expand Down Expand Up @@ -656,8 +660,11 @@ impl SourceRender for KafkaSourceConnection {
if let Some((msg, time, diff)) = reader.handle_message(message, ts) {
let pid = time.interval().singleton().unwrap().unwrap_exact();
let part_cap = &reader.partition_capabilities[pid].data;
let msg =
msg.map_err(|e| SourceReaderError::other_definite(e.into()));
let msg = msg.map_err(|e| {
DataflowError::SourceError(Box::new(SourceError {
error: SourceErrorDetails::Other(format!("{}", e)),
}))
});
data_output.give(part_cap, ((0, msg), time, diff)).await;
}
}
Expand All @@ -678,8 +685,11 @@ impl SourceRender for KafkaSourceConnection {
Ok(Some((msg, time, diff))) => {
let pid = time.interval().singleton().unwrap().unwrap_exact();
let part_cap = &reader.partition_capabilities[pid].data;
let msg =
msg.map_err(|e| SourceReaderError::other_definite(e.into()));
let msg = msg.map_err(|e| {
DataflowError::SourceError(Box::new(SourceError {
error: SourceErrorDetails::Other(format!("{}", e)),
}))
});
data_output.give(part_cap, ((0, msg), time, diff)).await;
}
Ok(None) => continue,
Expand Down
13 changes: 7 additions & 6 deletions src/storage/src/source/mysql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ use std::io;
use std::rc::Rc;

use differential_dataflow::Collection;
use mz_storage_types::errors::{DataflowError, SourceError};
use mz_storage_types::sources::SourceExport;
use serde::{Deserialize, Serialize};
use timely::container::CapacityContainerBuilder;
Expand All @@ -79,7 +80,7 @@ use mz_timely_util::order::Extrema;

use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
use crate::source::types::{ProgressStatisticsUpdate, SourceRender};
use crate::source::{RawSourceCreationConfig, SourceMessage, SourceReaderError};
use crate::source::{RawSourceCreationConfig, SourceMessage};

mod replication;
mod schemas;
Expand All @@ -100,7 +101,7 @@ impl SourceRender for MySqlSourceConnection {
resume_uppers: impl futures::Stream<Item = Antichain<GtidPartition>> + 'static,
_start_signal: impl std::future::Future<Output = ()> + 'static,
) -> (
Collection<G, (usize, Result<SourceMessage, SourceReaderError>), Diff>,
Collection<G, (usize, Result<SourceMessage, DataflowError>), Diff>,
Option<Stream<G, Infallible>>,
Stream<G, HealthStatusMessage>,
Stream<G, ProgressStatisticsUpdate>,
Expand Down Expand Up @@ -275,11 +276,11 @@ pub enum DefiniteError {
ServerConfigurationError(String),
}

impl From<DefiniteError> for SourceReaderError {
impl From<DefiniteError> for DataflowError {
fn from(err: DefiniteError) -> Self {
SourceReaderError {
inner: SourceErrorDetails::Other(err.to_string()),
}
DataflowError::SourceError(Box::new(SourceError {
error: SourceErrorDetails::Other(err.to_string()),
}))
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/storage/src/source/mysql/replication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,14 @@ use mz_mysql_util::{
use mz_ore::cast::CastFrom;
use mz_ore::result::ResultExt;
use mz_repr::{Diff, GlobalId, Row};
use mz_storage_types::errors::DataflowError;
use mz_storage_types::sources::mysql::{gtid_set_frontier, GtidPartition, GtidState};
use mz_storage_types::sources::MySqlSourceConnection;
use mz_timely_util::builder_async::{
Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
};

use crate::metrics::source::mysql::MySqlSourceMetrics;
use crate::source::types::SourceReaderError;
use crate::source::RawSourceCreationConfig;

use super::{
Expand Down Expand Up @@ -105,7 +105,7 @@ pub(crate) fn render<G: Scope<Timestamp = GtidPartition>>(
rewind_stream: &Stream<G, RewindRequest>,
metrics: MySqlSourceMetrics,
) -> (
Collection<G, (usize, Result<Row, SourceReaderError>), Diff>,
Collection<G, (usize, Result<Row, DataflowError>), Diff>,
Stream<G, Infallible>,
Stream<G, ReplicationError>,
PressOnDropButton,
Expand Down
5 changes: 3 additions & 2 deletions src/storage/src/source/mysql/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,14 @@ use mz_ore::future::InTask;
use mz_ore::metrics::MetricsFutureExt;
use mz_ore::result::ResultExt;
use mz_repr::{Diff, Row};
use mz_storage_types::errors::DataflowError;
use mz_storage_types::sources::mysql::{gtid_set_frontier, GtidPartition};
use mz_storage_types::sources::MySqlSourceConnection;
use mz_timely_util::builder_async::{OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton};

use crate::metrics::source::mysql::MySqlSnapshotMetrics;
use crate::source::types::ProgressStatisticsUpdate;
use crate::source::{RawSourceCreationConfig, SourceReaderError};
use crate::source::RawSourceCreationConfig;

use super::schemas::verify_schemas;
use super::{
Expand All @@ -124,7 +125,7 @@ pub(crate) fn render<G: Scope<Timestamp = GtidPartition>>(
subsources: Vec<SubsourceInfo>,
metrics: MySqlSnapshotMetrics,
) -> (
Collection<G, (usize, Result<Row, SourceReaderError>), Diff>,
Collection<G, (usize, Result<Row, DataflowError>), Diff>,
Stream<G, RewindRequest>,
Stream<G, ProgressStatisticsUpdate>,
Stream<G, ReplicationError>,
Expand Down
Loading

0 comments on commit 6b86f93

Please sign in to comment.