datalake: add additional system fields
Adds two more system fields:
- partition: the Kafka partition ID
- headers: the record's headers, a list of key-value pairs, each represented as an optional iobuf
andrwng committed Nov 20, 2024
1 parent 5ded2df commit d57236a
Showing 8 changed files with 104 additions and 32 deletions.
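
Before the per-file diffs, a quick orientation: the multiplexer now collects each record's headers into (key, value) pairs of optional iobufs and hands them to the record translator alongside the partition id. A minimal sketch of that collection step, with std::string and std::vector standing in for iobuf and chunked_vector (illustrative names, not code from the tree):

#include <optional>
#include <string>
#include <utility>
#include <vector>

// Stand-ins for iobuf and chunked_vector in this sketch.
using buf = std::string;
using header_kv = std::pair<std::optional<buf>, std::optional<buf>>;

struct record_header {
    std::optional<buf> key;
    std::optional<buf> value;
};

// Mirrors the loop added to record_multiplexer::operator(): each header becomes
// a (key, value) pair, and a missing key or value is carried through as null.
std::vector<header_kv> collect_header_kvs(std::vector<record_header>& headers) {
    std::vector<header_kv> kvs;
    kvs.reserve(headers.size());
    for (auto& h : headers) {
        kvs.emplace_back(std::move(h.key), std::move(h.value));
    }
    return kvs;
}

int main() {
    std::vector<record_header> hs{
      {buf{"trace-id"}, buf{"abc123"}}, {buf{"flag"}, std::nullopt}};
    return collect_header_kvs(hs).size() == 2 ? 0 : 1;
}

The same pairs are passed to both translate_data and handle_invalid_record, so records that fail translation keep their headers too.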
35 changes: 29 additions & 6 deletions src/v/datalake/record_multiplexer.cc
@@ -50,6 +50,11 @@ record_multiplexer::operator()(model::record_batch batch) {
first_timestamp + record.timestamp_delta()};
kafka::offset offset{batch.base_offset()() + record.offset_delta()};
int64_t estimated_size = key.size_bytes() + val.size_bytes();
chunked_vector<std::pair<std::optional<iobuf>, std::optional<iobuf>>>
header_kvs;
for (auto& hdr : record.headers()) {
header_kvs.emplace_back(hdr.share_key_opt(), hdr.share_value_opt());
}

auto val_type_res = co_await _type_resolver.resolve_buf_type(
std::move(val));
@@ -61,7 +66,11 @@ record_multiplexer::operator()(model::record_batch batch) {
case type_resolver::errc::bad_input:
case type_resolver::errc::translation_error:
auto invalid_res = co_await handle_invalid_record(
offset, record.share_key(), record.share_value(), timestamp);
offset,
record.share_key(),
record.share_value(),
timestamp,
std::move(header_kvs));
if (invalid_res.has_error()) {
_error = invalid_res.error();
co_return ss::stop_iteration::yes;
@@ -71,11 +80,13 @@ record_multiplexer::operator()(model::record_batch batch) {
}

auto record_data_res = co_await record_translator::translate_data(
_ntp.tp.partition,
offset,
std::move(key),
val_type_res.value().type,
std::move(val_type_res.value().parsable_buf),
timestamp);
timestamp,
header_kvs);
if (record_data_res.has_error()) {
switch (record_data_res.error()) {
case record_translator::errc::translation_error:
@@ -85,7 +96,11 @@ record_multiplexer::operator()(model::record_batch batch) {
offset,
record_data_res.error());
auto invalid_res = co_await handle_invalid_record(
offset, record.share_key(), record.share_value(), timestamp);
offset,
record.share_key(),
record.share_value(),
timestamp,
std::move(header_kvs));
if (invalid_res.has_error()) {
_error = invalid_res.error();
co_return ss::stop_iteration::yes;
@@ -107,7 +122,8 @@ record_multiplexer::operator()(model::record_batch batch) {
offset,
record.share_key(),
record.share_value(),
timestamp);
timestamp,
std::move(header_kvs));
if (invalid_res.has_error()) {
_error = invalid_res.error();
co_return ss::stop_iteration::yes;
@@ -189,15 +205,22 @@ record_multiplexer::end_of_stream() {

ss::future<result<std::nullopt_t, writer_error>>
record_multiplexer::handle_invalid_record(
kafka::offset offset, iobuf key, iobuf val, model::timestamp ts) {
kafka::offset offset,
iobuf key,
iobuf val,
model::timestamp ts,
chunked_vector<std::pair<std::optional<iobuf>, std::optional<iobuf>>>
headers) {
vlog(_log.debug, "Handling invalid record {}", offset);
int64_t estimated_size = key.size_bytes() + val.size_bytes();
auto record_data_res = co_await record_translator::translate_data(
_ntp.tp.partition,
offset,
std::move(key),
/*val_type*/ std::nullopt,
std::move(val),
ts);
ts,
headers);
if (record_data_res.has_error()) {
vlog(
_log.error,
8 changes: 6 additions & 2 deletions src/v/datalake/record_multiplexer.h
@@ -52,8 +52,12 @@ class record_multiplexer {
// target table.
// TODO: this just writes to the existing table, populating internal
// columns. Consider a separate table entirely.
ss::future<result<std::nullopt_t, writer_error>>
handle_invalid_record(kafka::offset, iobuf, iobuf, model::timestamp);
ss::future<result<std::nullopt_t, writer_error>> handle_invalid_record(
kafka::offset,
iobuf,
iobuf,
model::timestamp,
chunked_vector<std::pair<std::optional<iobuf>, std::optional<iobuf>>>);

prefix_logger _log;
const model::ntp& _ntp;
29 changes: 27 additions & 2 deletions src/v/datalake/record_translator.cc
@@ -19,6 +19,7 @@
#include "iceberg/avro_utils.h"
#include "iceberg/datatypes.h"
#include "iceberg/values.h"
#include "model/fundamental.h"

#include <avro/Generic.hh>
#include <avro/GenericDatum.hh>
@@ -78,7 +79,7 @@ record_translator::build_type(std::optional<resolved_type> val_type) {
ret_type.fields[0]->type);
// Use the next id of the system defaults.
system_fields.fields.emplace_back(iceberg::nested_field::create(
6, "data", field->required, std::move(field->type)));
11, "data", field->required, std::move(field->type)));
continue;
}
// Add the extra user-defined fields.
@@ -96,17 +97,41 @@ record_translator::build_type(std::optional<resolved_type> val_type) {

ss::future<checked<iceberg::struct_value, record_translator::errc>>
record_translator::translate_data(
model::partition_id pid,
kafka::offset o,
iobuf key,
const std::optional<resolved_type>& val_type,
iobuf parsable_val,
model::timestamp ts) {
model::timestamp ts,
const chunked_vector<std::pair<std::optional<iobuf>, std::optional<iobuf>>>&
headers) {
auto ret_data = iceberg::struct_value{};
auto system_data = std::make_unique<iceberg::struct_value>();
system_data->fields.emplace_back(iceberg::int_value(pid));
system_data->fields.emplace_back(iceberg::long_value(o));
// NOTE: Kafka uses milliseconds, Iceberg uses microseconds.
system_data->fields.emplace_back(
iceberg::timestamp_value(ts.value() * 1000));

if (headers.empty()) {
system_data->fields.emplace_back(std::nullopt);
} else {
auto headers_list = std::make_unique<iceberg::list_value>();
for (const auto& [k, v] : headers) {
auto header_kv_struct = std::make_unique<iceberg::struct_value>();
header_kv_struct->fields.emplace_back(
k ? std::make_optional<iceberg::value>(
iceberg::binary_value(k->copy()))
: std::nullopt);
header_kv_struct->fields.emplace_back(
v ? std::make_optional<iceberg::value>(
iceberg::binary_value(v->copy()))
: std::nullopt);
headers_list->elements.emplace_back(std::move(header_kv_struct));
}
system_data->fields.emplace_back(std::move(headers_list));
}

system_data->fields.emplace_back(iceberg::binary_value{std::move(key)});
if (val_type.has_value()) {
// Fill in the internal value field.
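
As the record_translator.cc hunk above shows, the system struct is built positionally: partition id first, then offset, the timestamp converted from Kafka milliseconds to Iceberg microseconds, the headers (written as null when the record has none), and the record key; the value field's handling depends on whether a value schema was resolved and is cut off in the hunk. A rough, self-contained sketch of that ordering and of the empty-headers-become-null decision, using std::variant and std::optional as stand-ins for the iceberg value types (assumed stand-ins, not the real API):

#include <cstdint>
#include <optional>
#include <string>
#include <utility>
#include <variant>
#include <vector>

// Illustrative stand-ins for iobuf and the iceberg:: value types.
using buf = std::string;
using header_kv = std::pair<std::optional<buf>, std::optional<buf>>;
using headers_list = std::vector<header_kv>;
using field = std::variant<int32_t, int64_t, buf, headers_list>;

// The field order here must match the declaration order of the system struct
// in table_definition.cc: partition, offset, timestamp, headers, key, ...
std::vector<std::optional<field>> make_system_fields(
  int32_t partition,
  int64_t offset,
  int64_t timestamp_ms,
  const headers_list& headers,
  buf key) {
    std::vector<std::optional<field>> fields;
    fields.emplace_back(partition);
    fields.emplace_back(offset);
    fields.emplace_back(timestamp_ms * 1000); // Kafka ms -> Iceberg us
    if (headers.empty()) {
        fields.emplace_back(std::nullopt); // no headers: null, not an empty list
    } else {
        fields.emplace_back(headers); // list<struct<key, value>>, both sides nullable
    }
    fields.emplace_back(std::move(key));
    return fields;
}

int main() {
    auto fields = make_system_fields(0, 42, 1732060800000, {}, buf{"k"});
    // With no headers, the headers slot is null rather than an empty list.
    return fields[3].has_value() ? 1 : 0;
}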
5 changes: 4 additions & 1 deletion src/v/datalake/record_translator.h
@@ -33,11 +33,14 @@ class record_translator {
friend std::ostream& operator<<(std::ostream&, const errc&);
static record_type build_type(std::optional<resolved_type> val_type);
static ss::future<checked<iceberg::struct_value, errc>> translate_data(
model::partition_id pid,
kafka::offset o,
iobuf key,
const std::optional<resolved_type>& val_type,
iobuf parsable_val,
model::timestamp ts);
model::timestamp ts,
const chunked_vector<
std::pair<std::optional<iobuf>, std::optional<iobuf>>>& headers);
};

} // namespace datalake
24 changes: 19 additions & 5 deletions src/v/datalake/table_definition.cc
@@ -17,13 +17,27 @@ struct_type schemaless_struct_type() {
using namespace iceberg;
struct_type system_fields;
system_fields.fields.emplace_back(
nested_field::create(2, "offset", field_required::yes, long_type{}));
nested_field::create(2, "partition", field_required::yes, int_type{}));
system_fields.fields.emplace_back(
nested_field::create(3, "offset", field_required::yes, long_type{}));
system_fields.fields.emplace_back(nested_field::create(
4, "timestamp", field_required::yes, timestamp_type{}));

struct_type headers_kv;
headers_kv.fields.emplace_back(
nested_field::create(7, "key", field_required::no, binary_type{}));
headers_kv.fields.emplace_back(
nested_field::create(8, "value", field_required::no, binary_type{}));
system_fields.fields.emplace_back(nested_field::create(
3, "timestamp", field_required::yes, timestamp_type{}));
5,
"headers",
field_required::no,
list_type::create(6, field_required::yes, std::move(headers_kv))));

system_fields.fields.emplace_back(
nested_field::create(4, "key", field_required::no, binary_type{}));
nested_field::create(9, "key", field_required::no, binary_type{}));
system_fields.fields.emplace_back(
nested_field::create(5, "value", field_required::no, binary_type{}));
nested_field::create(10, "value", field_required::no, binary_type{}));
struct_type res;
res.fields.emplace_back(nested_field::create(
1,
@@ -45,7 +59,7 @@
partition_spec hour_partition_spec() {
chunked_vector<partition_field> fields;
fields.emplace_back(partition_field{
.source_id = nested_field::id_t{3},
.source_id = nested_field::id_t{4},
.field_id = partition_field::id_t{1000},
.name = "redpanda_timestamp_hour",
.transform = hour_transform{},
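
For reference when reading the test updates below, the field ids implied by the two hunks above are collected here (a summary sketch, not a declaration from the tree); this renumbering is why every highest_field_id expectation moves up by five:

// Field ids of the "redpanda" system column after this change, per
// schemaless_struct_type() above; build_type() in record_translator.cc appends
// a schema-derived "data" field at the next free id.
enum class redpanda_field_id {
    redpanda_struct = 1,
    partition = 2,
    offset = 3,
    timestamp = 4, // hour_partition_spec() now partitions on this id
    headers = 5,
    headers_element = 6,
    header_key = 7,
    header_value = 8,
    key = 9,
    value = 10,
    data = 11,
};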
2 changes: 1 addition & 1 deletion src/v/datalake/tests/gtest_record_multiplexer_test.cc
@@ -281,7 +281,7 @@ TEST_F(RecordMultiplexerParquetTest, TestSimple) {
// Default columns + a nested struct.
EXPECT_EQ(table->num_columns(), 8);
auto expected_type
= R"(redpanda: struct<offset: int64 not null, timestamp: timestamp[us] not null, key: binary, value: binary> not null
= R"(redpanda: struct<partition: int32 not null, offset: int64 not null, timestamp: timestamp[us] not null, headers: list<element: struct<key: binary, value: binary> not null>, key: binary, value: binary> not null
mylong: int64 not null
nestedrecord: struct<inval1: double not null, inval2: string not null, inval3: int32 not null> not null
myarray: list<element: double not null> not null
29 changes: 16 additions & 13 deletions src/v/datalake/tests/record_multiplexer_test.cc
@@ -226,7 +226,7 @@ TEST_P(RecordMultiplexerParamTest, TestSimpleAvroRecords) {

// 4 default columns + RootRecord + mylong
auto schema = get_current_schema();
EXPECT_EQ(schema->highest_field_id(), 6);
EXPECT_EQ(schema->highest_field_id(), 11);
}

TEST_P(RecordMultiplexerParamTest, TestAvroRecordsMultipleSchemas) {
@@ -262,7 +262,7 @@ TEST_P(RecordMultiplexerParamTest, TestAvroRecordsMultipleSchemas) {
}
EXPECT_EQ(hrs.size(), GetParam().hrs);
auto schema = get_current_schema();
EXPECT_EQ(schema->highest_field_id(), 16);
EXPECT_EQ(schema->highest_field_id(), 21);
}

INSTANTIATE_TEST_SUITE_P(
@@ -297,10 +297,12 @@ TEST_F(RecordMultiplexerTest, TestAvroRecordsWithRedpandaField) {

// Add Avro records.
auto start_offset = model::offset{0};
auto res = mux(default_param, start_offset, [&gen](storage::record_batch_builder& b) {
auto res = gen.add_random_avro_record(b, "avro_rp", std::nullopt).get();
ASSERT_FALSE(res.has_error());
});
auto res = mux(
default_param, start_offset, [&gen](storage::record_batch_builder& b) {
auto res
= gen.add_random_avro_record(b, "avro_rp", std::nullopt).get();
ASSERT_FALSE(res.has_error());
});
ASSERT_TRUE(res.has_value());
const auto& write_res = res.value();
EXPECT_EQ(write_res.data_files.size(), default_param.hrs);
@@ -315,11 +317,12 @@
// 1 nested redpanda column + 4 default columns + mylong + 1 user redpanda
// column + 1 nested
auto schema = get_current_schema();
EXPECT_EQ(schema->highest_field_id(), 8);
EXPECT_EQ(schema->highest_field_id(), 13);

// The redpanda system fields should include the 'data' column.
const auto& rp_struct = std::get<iceberg::struct_type>(schema->schema_struct.fields[0]->type);
EXPECT_EQ(5, rp_struct.fields.size());
const auto& rp_struct = std::get<iceberg::struct_type>(
schema->schema_struct.fields[0]->type);
EXPECT_EQ(7, rp_struct.fields.size());
EXPECT_EQ("data", rp_struct.fields.back()->name);
}

@@ -338,7 +341,7 @@ TEST_F(RecordMultiplexerTest, TestMissingSchema) {
EXPECT_EQ(write_res.data_files.size(), default_param.hrs);

auto schema = get_current_schema();
EXPECT_EQ(schema->highest_field_id(), 5);
EXPECT_EQ(schema->highest_field_id(), 10);
}

TEST_F(RecordMultiplexerTest, TestBadData) {
@@ -364,7 +367,7 @@
// shouldn't register the Avro schema -- instead we should see the default
// schema.
auto schema = get_current_schema();
EXPECT_EQ(schema->highest_field_id(), 5);
EXPECT_EQ(schema->highest_field_id(), 10);
}

TEST_F(RecordMultiplexerTest, TestBadSchemaChange) {
@@ -397,7 +400,7 @@

// This should have registered the valid schema.
auto schema = get_current_schema();
EXPECT_EQ(schema->highest_field_id(), 6);
EXPECT_EQ(schema->highest_field_id(), 11);

// Now try writing with an incompatible schema.
res = mux(
@@ -413,5 +416,5 @@
const auto& write_res = res.value();
EXPECT_EQ(write_res.data_files.size(), default_param.hrs);
schema = get_current_schema();
EXPECT_EQ(schema->highest_field_id(), 6);
EXPECT_EQ(schema->highest_field_id(), 11);
}
4 changes: 2 additions & 2 deletions tests/rptest/tests/datalake/datalake_e2e_test.py
@@ -106,7 +106,7 @@ def test_avro_schema(self, storage_type, query_engine, use_serde_parquet):
trino = dl.trino()
trino_expected_out = [(
'redpanda',
'row(offset bigint, timestamp timestamp(6), key varbinary, value varbinary)',
'row(partition integer, offset bigint, timestamp timestamp(6), headers array(row(key varbinary, value varbinary)), key varbinary, value varbinary)',
'', ''), ('val', 'bigint', '', '')]
trino_describe_out = trino.run_query_fetch_all(
f"describe {table_name}")
@@ -116,7 +116,7 @@ def test_avro_schema(self, storage_type, query_engine, use_serde_parquet):
spark = dl.spark()
spark_expected_out = [(
'redpanda',
'struct<offset:bigint,timestamp:timestamp_ntz,key:binary,value:binary>',
'struct<partition:int,offset:bigint,timestamp:timestamp_ntz,headers:array<struct<key:binary,value:binary>>,key:binary,value:binary>',
None), ('val', 'bigint', None), ('', '', ''),
('# Partitioning', '', ''),
('Part 0', 'hours(redpanda.timestamp)',
