persist: Add columnar format to HollowBatchPart #27800

Merged
1 change: 1 addition & 0 deletions misc/python/materialize/mzcompose/__init__.py
@@ -104,6 +104,7 @@
"storage_source_decode_fuel": "100000",
"timestamp_oracle": "postgres",
"wait_catalog_consolidation_on_startup": "true",
"persist_batch_record_part_format": "true",
}

DEFAULT_CRDB_ENVIRONMENT = [
1 change: 1 addition & 0 deletions misc/python/materialize/parallel_workload/action.py
@@ -933,6 +933,7 @@ def __init__(
)
self.flags_with_values["enable_eager_delta_joins"] = BOOLEAN_FLAG_VALUES
self.flags_with_values["persist_batch_columnar_format"] = ["row", "both"]
self.flags_with_values["persist_batch_record_part_format"] = BOOLEAN_FLAG_VALUES

def run(self, exe: Executor) -> bool:
flag_name = self.rng.choice(list(self.flags_with_values.keys()))
16 changes: 15 additions & 1 deletion src/persist-client/src/batch.rs
@@ -334,6 +334,7 @@ pub struct BatchBuilderConfig {
pub(crate) batch_delete_enabled: bool,
pub(crate) batch_builder_max_outstanding_parts: usize,
pub(crate) batch_columnar_format: BatchColumnarFormat,
pub(crate) batch_record_part_format: bool,
pub(crate) inline_writes_single_max_bytes: usize,
pub(crate) stats_collection_enabled: bool,
pub(crate) stats_budget: usize,
@@ -351,7 +352,13 @@ pub(crate) const BATCH_DELETE_ENABLED: Config<bool> = Config::new(
pub(crate) const BATCH_COLUMNAR_FORMAT: Config<&'static str> = Config::new(
"persist_batch_columnar_format",
BatchColumnarFormat::default().as_str(),
"Columnar format for a batch written to Persist, either 'row' or 'both' (Materialize).",
"Columnar format for a batch written to Persist, either 'row', 'both', or 'both_v1' (Materialize).",
);

pub(crate) const BATCH_RECORD_PART_FORMAT: Config<bool> = Config::new(
"persist_batch_record_part_format",
false,
"Wether we record the format of the Part in state (Materialize).",
);

/// A target maximum size of blob payloads in bytes. If a logical "batch" is
@@ -392,6 +399,7 @@ impl BatchBuilderConfig {
.dynamic
.batch_builder_max_outstanding_parts(),
batch_columnar_format: BatchColumnarFormat::from_str(&BATCH_COLUMNAR_FORMAT.get(value)),
batch_record_part_format: BATCH_RECORD_PART_FORMAT.get(value),
inline_writes_single_max_bytes: INLINE_WRITES_SINGLE_MAX_BYTES.get(value),
stats_collection_enabled: STATS_COLLECTION_ENABLED.get(value),
stats_budget: STATS_BUDGET_BYTES.get(value),
@@ -1123,6 +1131,11 @@ impl<T: Timestamp + Codec64> BatchParts<T> {
}
stats
});
let format = if cfg.batch_record_part_format {
Some(cfg.batch_columnar_format)
} else {
None
};
Contributor:
I think this may fail to round-trip during an upgrade if a batch with a columnar format is written by a new version but retracted by an old version without the proto change.

This is probably fine iff no environments currently have the new columnar format enabled, but otherwise you may want an additional flag?

Member Author:
Makes sense! I think adding a second flag to control the writing is easier to reason about, so I updated the PR to do that.
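
To make the concern above concrete, here is a minimal, self-contained sketch of the failure mode, using hypothetical stand-in structs rather than the crate's real generated proto types:

// Old binaries decode parts into a schema that has no `format` field.
#[derive(Clone, PartialEq, Debug)]
struct OldProtoPart {
    key: String,
}

// New binaries carry the extra optional field.
#[derive(Clone, PartialEq, Debug)]
struct NewProtoPart {
    key: String,
    format: Option<u64>,
}

fn main() {
    // A new version writes a part and records its columnar format.
    let written = NewProtoPart { key: "p0".into(), format: Some(1) };

    // An old version (without the proto change) only understands `key`, so when
    // it later re-emits the part, e.g. to retract it, the format is dropped.
    let as_seen_by_old = OldProtoPart { key: written.key.clone() };
    let retracted = NewProtoPart { key: as_seen_by_old.key, format: None };

    // The retraction no longer matches the original addition, which is the
    // round-trip failure described above; gating the new field behind
    // persist_batch_record_part_format avoids writing it until all readers
    // understand it.
    assert_ne!(written, retracted);
}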


BatchPart::Hollow(HollowBatchPart {
key: partial_key,
@@ -1131,6 +1144,7 @@ impl<T: Timestamp + Codec64> BatchParts<T> {
stats,
ts_rewrite,
diffs_sum: cfg.write_diffs_sum.then_some(diffs_sum),
format,
})
}

1 change: 1 addition & 0 deletions src/persist-client/src/cfg.rs
@@ -324,6 +324,7 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
mz_persist::cfg::all_dyn_configs(configs)
.add(&crate::batch::BATCH_DELETE_ENABLED)
.add(&crate::batch::BATCH_COLUMNAR_FORMAT)
.add(&crate::batch::BATCH_RECORD_PART_FORMAT)
.add(&crate::batch::BLOB_TARGET_SIZE)
.add(&crate::batch::INLINE_WRITES_TOTAL_MAX_BYTES)
.add(&crate::batch::INLINE_WRITES_SINGLE_MAX_BYTES)
1 change: 1 addition & 0 deletions src/persist-client/src/internal/datadriven.rs
@@ -111,6 +111,7 @@ impl<'a> DirectiveArgs<'a> {
stats: None,
ts_rewrite: None,
diffs_sum: None,
format: None,
})
})
.collect(),
30 changes: 29 additions & 1 deletion src/persist-client/src/internal/encoding.rs
@@ -17,9 +17,10 @@ use std::sync::Arc;
use bytes::{Buf, Bytes};
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::Description;
use mz_ore::cast::CastInto;
use mz_ore::halt;
use mz_persist::indexed::columnar::ColumnarRecords;
use mz_persist::indexed::encoding::{BlobTraceBatchPart, BlobTraceUpdates};
use mz_persist::indexed::encoding::{BatchColumnarFormat, BlobTraceBatchPart, BlobTraceUpdates};
use mz_persist::location::{SeqNo, VersionedData};
use mz_persist::metrics::ColumnarMetrics;
use mz_persist_types::stats::{PartStats, ProtoStructStats};
@@ -1258,6 +1259,7 @@ impl<T: Timestamp + Codec64> RustType<ProtoHollowBatch> for HollowBatch<T> {
stats: None,
ts_rewrite: None,
diffs_sum: None,
format: None,
})
}));
Ok(HollowBatch {
@@ -1279,6 +1281,7 @@ impl<T: Timestamp + Codec64> RustType<ProtoHollowBatchPart> for BatchPart<T> {
key_stats: x.stats.into_proto(),
ts_rewrite: x.ts_rewrite.as_ref().map(|x| x.into_proto()),
diffs_sum: x.diffs_sum.as_ref().map(|x| i64::from_le_bytes(*x)),
format: x.format.map(|f| f.into_proto()),
},
BatchPart::Inline {
updates,
@@ -1290,6 +1293,7 @@ impl<T: Timestamp + Codec64> RustType<ProtoHollowBatchPart> for BatchPart<T> {
key_stats: None,
ts_rewrite: ts_rewrite.as_ref().map(|x| x.into_proto()),
diffs_sum: None,
format: None,
},
}
}
@@ -1308,6 +1312,7 @@ impl<T: Timestamp + Codec64> RustType<ProtoHollowBatchPart> for BatchPart<T> {
stats: proto.key_stats.into_rust()?,
ts_rewrite,
diffs_sum: proto.diffs_sum.map(i64::to_le_bytes),
format: proto.format.map(|f| f.into_rust()).transpose()?,
}))
}
Some(proto_hollow_batch_part::Kind::Inline(x)) => {
@@ -1328,6 +1333,27 @@ impl<T: Timestamp + Codec64> RustType<ProtoHollowBatchPart> for BatchPart<T> {
}
}

impl RustType<proto_hollow_batch_part::Format> for BatchColumnarFormat {
fn into_proto(&self) -> proto_hollow_batch_part::Format {
match self {
BatchColumnarFormat::Row => proto_hollow_batch_part::Format::Row(()),
BatchColumnarFormat::Both(version) => {
proto_hollow_batch_part::Format::RowAndColumnar((*version).cast_into())
}
}
}

fn from_proto(proto: proto_hollow_batch_part::Format) -> Result<Self, TryFromProtoError> {
let format = match proto {
proto_hollow_batch_part::Format::Row(_) => BatchColumnarFormat::Row,
proto_hollow_batch_part::Format::RowAndColumnar(version) => {
BatchColumnarFormat::Both(version.cast_into())
}
};
Ok(format)
}
}
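
For reference, a round-trip check of this mapping could look like the sketch below (not part of the diff; it assumes BatchColumnarFormat implements PartialEq and Debug, and that the integer literal converts into Both's version field):

#[test]
fn batch_columnar_format_proto_roundtrip() {
    // Both variants should survive an into_proto/from_proto round trip.
    for format in [BatchColumnarFormat::Row, BatchColumnarFormat::Both(1)] {
        let proto = format.into_proto();
        assert_eq!(BatchColumnarFormat::from_proto(proto).unwrap(), format);
    }
}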

/// Aggregate statistics about data contained in a part.
///
/// These are "lazy" in the sense that we don't decode them (or even validate
@@ -1620,6 +1646,7 @@ mod tests {
stats: None,
ts_rewrite: None,
diffs_sum: None,
format: None,
})],
4,
vec![],
@@ -1642,6 +1669,7 @@ mod tests {
stats: None,
ts_rewrite: None,
diffs_sum: None,
format: None,
}));
assert_eq!(<HollowBatch<u64>>::from_proto(old).unwrap(), expected);
}
6 changes: 6 additions & 0 deletions src/persist-client/src/internal/state.proto
@@ -16,6 +16,7 @@ syntax = "proto3";

package mz_persist_client.internal.state;

import "google/protobuf/empty.proto";
import "persist/src/persist.proto";

message ProtoU64Antichain {
@@ -39,6 +40,11 @@ message ProtoHollowBatchPart {
uint64 encoded_size_bytes = 2;
bytes key_lower = 3;
optional int64 diffs_sum = 6;
oneof format {
google.protobuf.Empty row = 7;
uint64 row_and_columnar = 8;
}

optional bytes key_stats = 536870906;

reserved 536870907 to 536870911;
31 changes: 24 additions & 7 deletions src/persist-client/src/internal/state.rs
@@ -21,6 +21,7 @@ use mz_dyncfg::Config;
use mz_ore::cast::CastFrom;
use mz_ore::now::EpochMillis;
use mz_ore::vec::PartialOrdVecExt;
use mz_persist::indexed::encoding::BatchColumnarFormat;
use mz_persist::location::SeqNo;
use mz_persist_types::{Codec, Codec64, Opaque};
use proptest_derive::Arbitrary;
@@ -318,6 +319,13 @@ pub struct HollowBatchPart<T> {
/// we later decide that's of some benefit.
#[serde(serialize_with = "serialize_diffs_sum")]
pub diffs_sum: Option<[u8; 8]>,
/// Columnar format that this batch was written in.
///
/// This is `None` if this part was written before we started writing structured
/// columnar data, or if the [`BATCH_RECORD_PART_FORMAT`] dyncfg is off.
///
/// [`BATCH_RECORD_PART_FORMAT`]: crate::batch::BATCH_RECORD_PART_FORMAT
pub format: Option<BatchColumnarFormat>,
}
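
As a note on consuming the new field: a hypothetical reader-side helper (not part of this PR, and assuming BatchColumnarFormat is Clone and exposes the default() constructor used by BATCH_COLUMNAR_FORMAT) could treat a missing value as the crate default, since parts written before this change carry no format metadata:

fn recorded_format_or_default<T>(part: &HollowBatchPart<T>) -> BatchColumnarFormat {
    // Parts written before this change, or with the dyncfg off, record no
    // format; fall back to the default columnar format.
    part.format.clone().unwrap_or(BatchColumnarFormat::default())
}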

/// A [Batch] but with the updates themselves stored externally.
@@ -584,6 +592,7 @@ impl<T: Ord> Ord for HollowBatchPart<T> {
stats: self_stats,
ts_rewrite: self_ts_rewrite,
diffs_sum: self_diffs_sum,
format: self_format,
} = self;
let HollowBatchPart {
key: other_key,
@@ -592,6 +601,7 @@ impl<T: Ord> Ord for HollowBatchPart<T> {
stats: other_stats,
ts_rewrite: other_ts_rewrite,
diffs_sum: other_diffs_sum,
format: other_format,
} = other;
(
self_key,
@@ -600,6 +610,7 @@ impl<T: Ord> Ord for HollowBatchPart<T> {
self_stats,
self_ts_rewrite.as_ref().map(|x| x.elements()),
self_diffs_sum,
self_format,
)
.cmp(&(
other_key,
@@ -608,6 +619,7 @@ impl<T: Ord> Ord for HollowBatchPart<T> {
other_stats,
other_ts_rewrite.as_ref().map(|x| x.elements()),
other_diffs_sum,
other_format,
))
}
}
@@ -1926,14 +1938,18 @@ pub(crate) mod tests {
any_some_lazy_part_stats(),
any::<Option<T>>(),
any::<[u8; 8]>(),
any::<Option<BatchColumnarFormat>>(),
),
|(key, encoded_size_bytes, key_lower, stats, ts_rewrite, diffs_sum)| HollowBatchPart {
key,
encoded_size_bytes,
key_lower,
stats,
ts_rewrite: ts_rewrite.map(Antichain::from_elem),
diffs_sum: Some(diffs_sum),
|(key, encoded_size_bytes, key_lower, stats, ts_rewrite, diffs_sum, format)| {
HollowBatchPart {
key,
encoded_size_bytes,
key_lower,
stats,
ts_rewrite: ts_rewrite.map(Antichain::from_elem),
diffs_sum: Some(diffs_sum),
format,
}
},
)
}
@@ -2083,6 +2099,7 @@ pub(crate) mod tests {
stats: None,
ts_rewrite: None,
diffs_sum: None,
format: None,
})
})
.collect(),