diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index bdacac4c59c5f..04a6c3b0b2c05 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -129,6 +129,13 @@ impl SourceStreamChunkBuilder { /// `SourceStreamChunkRowWriter` is responsible to write one or more records to the [`StreamChunk`], /// where each contains either one row (Insert/Delete) or two rows (Update) that can be written atomically. +/// +/// Callers are supposed to call one of the `insert`, `delete` or `update` methods to write a record, +/// providing a closure that produces one or two [`Datum`]s by corresponding [`SourceColumnDesc`]. +/// Specifically, +/// - only columns with [`SourceColumnType::Normal`] need to be handled; +/// - errors for non-primary key columns will be ignored and filled with default value instead; +/// - other errors will be propagated. pub struct SourceStreamChunkRowWriter<'a> { descs: &'a [SourceColumnDesc], builders: &'a mut [ArrayBuilderImpl], @@ -289,8 +296,10 @@ impl SourceStreamChunkRowWriter<'_> { if let Some(meta_value) = (self.row_meta.as_ref()).and_then(|row_meta| row_meta.value_for_column(desc)) { + // For meta columns, fill in the meta data. Ok(A::output_for(meta_value)) } else { + // For normal columns, call the user provided closure. match f(desc) { Ok(output) => Ok(output), @@ -301,7 +310,7 @@ impl SourceStreamChunkRowWriter<'_> { // TODO: figure out a way to fill in not-null default value if user specifies one // TODO: decide whether the error should not be ignored (e.g., even not a valid Debezium message) tracing::warn!( - ?error, + %error, column = desc.name, "failed to parse non-pk column, padding with `NULL`" ); @@ -337,12 +346,10 @@ impl SourceStreamChunkRowWriter<'_> { } } - /// Write an `Insert` record to the [`StreamChunk`]. - /// - /// # Arguments + /// Write an `Insert` record to the [`StreamChunk`], with the given fallible closure that + /// produces one [`Datum`] by corresponding [`SourceColumnDesc`]. /// - /// * `f`: A failable closure that produced one [`Datum`] by corresponding [`SourceColumnDesc`]. - /// Callers only need to handle columns with the type [`SourceColumnType::Normal`]. + /// See the [struct-level documentation](SourceStreamChunkRowWriter) for more details. pub fn insert( &mut self, f: impl FnMut(&SourceColumnDesc) -> AccessResult, @@ -350,12 +357,10 @@ impl SourceStreamChunkRowWriter<'_> { self.do_action::(f) } - /// Write a `Delete` record to the [`StreamChunk`]. + /// Write a `Delete` record to the [`StreamChunk`], with the given fallible closure that + /// produces one [`Datum`] by corresponding [`SourceColumnDesc`]. /// - /// # Arguments - /// - /// * `f`: A failable closure that produced one [`Datum`] by corresponding [`SourceColumnDesc`]. - /// Callers only need to handle columns with the type [`SourceColumnType::Normal`]. + /// See the [struct-level documentation](SourceStreamChunkRowWriter) for more details. pub fn delete( &mut self, f: impl FnMut(&SourceColumnDesc) -> AccessResult, @@ -363,12 +368,10 @@ impl SourceStreamChunkRowWriter<'_> { self.do_action::(f) } - /// Write a `Update` record to the [`StreamChunk`]. - /// - /// # Arguments + /// Write a `Update` record to the [`StreamChunk`], with the given fallible closure that + /// produces two [`Datum`]s as old and new value by corresponding [`SourceColumnDesc`]. /// - /// * `f`: A failable closure that produced two [`Datum`]s as old and new value by corresponding - /// [`SourceColumnDesc`]. Callers only need to handle columns with the type [`SourceColumnType::Normal`]. + /// See the [struct-level documentation](SourceStreamChunkRowWriter) for more details. pub fn update( &mut self, f: impl FnMut(&SourceColumnDesc) -> AccessResult<(Datum, Datum)>, diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index cd3ee0360009f..e59225c8de510 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -688,9 +688,7 @@ impl GlobalBarrierManager { }; // Tracing related stuff - prev_epoch.span().in_scope(|| { - tracing::info!(target: "rw_tracing", epoch = curr_epoch.value().0, "new barrier enqueued"); - }); + tracing::info!(target: "rw_tracing", parent: prev_epoch.span(), epoch = curr_epoch.value().0, "new barrier enqueued"); span.record("epoch", curr_epoch.value().0); let command_ctx = Arc::new(CommandContext::new(