Skip to content

Commit

Permalink
refine docs
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <i@bugenzhao.com>
  • Loading branch information
BugenZhao committed Oct 8, 2023
1 parent e531925 commit 9ba70d5
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 19 deletions.
35 changes: 19 additions & 16 deletions src/connector/src/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,13 @@ impl SourceStreamChunkBuilder {

/// `SourceStreamChunkRowWriter` is responsible to write one or more records to the [`StreamChunk`],
/// where each contains either one row (Insert/Delete) or two rows (Update) that can be written atomically.
///
/// Callers are supposed to call one of the `insert`, `delete` or `update` methods to write a record,
/// providing a closure that produces one or two [`Datum`]s by corresponding [`SourceColumnDesc`].
/// Specifically,
/// - only columns with [`SourceColumnType::Normal`] need to be handled;
/// - errors for non-primary key columns will be ignored and filled with default value instead;
/// - other errors will be propagated.
pub struct SourceStreamChunkRowWriter<'a> {
descs: &'a [SourceColumnDesc],
builders: &'a mut [ArrayBuilderImpl],
Expand Down Expand Up @@ -289,8 +296,10 @@ impl SourceStreamChunkRowWriter<'_> {
if let Some(meta_value) =
(self.row_meta.as_ref()).and_then(|row_meta| row_meta.value_for_column(desc))
{
// For meta columns, fill in the meta data.
Ok(A::output_for(meta_value))
} else {
// For normal columns, call the user provided closure.
match f(desc) {
Ok(output) => Ok(output),

Expand All @@ -301,7 +310,7 @@ impl SourceStreamChunkRowWriter<'_> {
// TODO: figure out a way to fill in not-null default value if user specifies one
// TODO: decide whether the error should not be ignored (e.g., even not a valid Debezium message)
tracing::warn!(
?error,
%error,
column = desc.name,
"failed to parse non-pk column, padding with `NULL`"
);
Expand Down Expand Up @@ -337,38 +346,32 @@ impl SourceStreamChunkRowWriter<'_> {
}
}

/// Write an `Insert` record to the [`StreamChunk`].
///
/// # Arguments
/// Write an `Insert` record to the [`StreamChunk`], with the given fallible closure that
/// produces one [`Datum`] by corresponding [`SourceColumnDesc`].
///
/// * `f`: A failable closure that produced one [`Datum`] by corresponding [`SourceColumnDesc`].
/// Callers only need to handle columns with the type [`SourceColumnType::Normal`].
/// See the [struct-level documentation](SourceStreamChunkRowWriter) for more details.
pub fn insert(
&mut self,
f: impl FnMut(&SourceColumnDesc) -> AccessResult<Datum>,
) -> AccessResult<()> {
self.do_action::<OpActionInsert>(f)
}

/// Write a `Delete` record to the [`StreamChunk`].
/// Write a `Delete` record to the [`StreamChunk`], with the given fallible closure that
/// produces one [`Datum`] by corresponding [`SourceColumnDesc`].
///
/// # Arguments
///
/// * `f`: A failable closure that produced one [`Datum`] by corresponding [`SourceColumnDesc`].
/// Callers only need to handle columns with the type [`SourceColumnType::Normal`].
/// See the [struct-level documentation](SourceStreamChunkRowWriter) for more details.
pub fn delete(
&mut self,
f: impl FnMut(&SourceColumnDesc) -> AccessResult<Datum>,
) -> AccessResult<()> {
self.do_action::<OpActionDelete>(f)
}

/// Write a `Update` record to the [`StreamChunk`].
///
/// # Arguments
/// Write a `Update` record to the [`StreamChunk`], with the given fallible closure that
/// produces two [`Datum`]s as old and new value by corresponding [`SourceColumnDesc`].
///
/// * `f`: A failable closure that produced two [`Datum`]s as old and new value by corresponding
/// [`SourceColumnDesc`]. Callers only need to handle columns with the type [`SourceColumnType::Normal`].
/// See the [struct-level documentation](SourceStreamChunkRowWriter) for more details.
pub fn update(
&mut self,
f: impl FnMut(&SourceColumnDesc) -> AccessResult<(Datum, Datum)>,
Expand Down
4 changes: 1 addition & 3 deletions src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -688,9 +688,7 @@ impl GlobalBarrierManager {
};

// Tracing related stuff
prev_epoch.span().in_scope(|| {
tracing::info!(target: "rw_tracing", epoch = curr_epoch.value().0, "new barrier enqueued");
});
tracing::info!(target: "rw_tracing", parent: prev_epoch.span(), epoch = curr_epoch.value().0, "new barrier enqueued");
span.record("epoch", curr_epoch.value().0);

let command_ctx = Arc::new(CommandContext::new(
Expand Down

0 comments on commit 9ba70d5

Please sign in to comment.