Skip to content

Commit

Permalink
Merge pull request redpanda-data#14743 from rockwotj/make-batch
Browse files Browse the repository at this point in the history
wasm: cleanup batching
  • Loading branch information
rockwotj authored Nov 7, 2023
2 parents b45438b + 8499ce6 commit da8387e
Show file tree
Hide file tree
Showing 9 changed files with 180 additions and 53 deletions.
73 changes: 60 additions & 13 deletions src/v/model/tests/transform_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,22 @@
*/

#include "model/fundamental.h"
#include "model/record_batch_types.h"
#include "model/record_utils.h"
#include "model/tests/random_batch.h"
#include "model/tests/randoms.h"
#include "model/timestamp.h"
#include "model/transform.h"
#include "random/generators.h"
#include "test_utils/randoms.h"
#include "units.h"

#include <seastar/core/chunked_fifo.hh>

#include <gtest/gtest.h>

#include <initializer_list>
#include <math.h>
#include <utility>

namespace model {
Expand Down Expand Up @@ -140,25 +145,27 @@ void append_vint_to_iobuf(iobuf& b, int64_t v) {
auto vb = vint::to_bytes(v);
b.append(vb.data(), vb.size());
}
} // namespace

TEST(TransformedDataTest, Serialize) {
auto src = model::test::make_random_record(
0, random_generators::make_iobuf());
std::optional<transformed_data> noop_transformed_data(const model::record& r) {
iobuf payload;
append_vint_to_iobuf(payload, src.key_size());
payload.append(src.key().copy());
append_vint_to_iobuf(payload, src.value_size());
payload.append(src.value().copy());
append_vint_to_iobuf(payload, int64_t(src.headers().size()));
for (const auto& header : src.headers()) {
append_vint_to_iobuf(payload, r.key_size());
payload.append(r.key().copy());
append_vint_to_iobuf(payload, r.value_size());
payload.append(r.value().copy());
append_vint_to_iobuf(payload, int64_t(r.headers().size()));
for (const auto& header : r.headers()) {
append_vint_to_iobuf(payload, header.key_size());
payload.append(header.key().copy());
append_vint_to_iobuf(payload, header.value_size());
payload.append(header.value().copy());
}
auto validated = model::transformed_data::create_validated(
std::move(payload));
return model::transformed_data::create_validated(std::move(payload));
}
} // namespace

TEST(TransformedDataTest, Serialize) {
auto src = model::test::make_random_record(
0, random_generators::make_iobuf());
auto validated = noop_transformed_data(src);
ASSERT_TRUE(validated.has_value());
auto got = std::move(validated.value())
.to_serialized_record(
Expand All @@ -170,4 +177,44 @@ TEST(TransformedDataTest, Serialize) {
<< want.hexdump(1_KiB);
}

// Verifies transformed_data::make_batch produces a well-formed batch:
// broker-time timestamps on both bounds, idempotence disabled, raft_data
// type, correct record count, recomputed CRCs, and records that round-trip
// the original key/value/headers.
TEST(TransformedDataTest, MakeBatch) {
    auto batch = test::make_random_batch({
      .allow_compression = false,
      .count = 4,
    });
    ss::chunked_fifo<transformed_data> transformed;
    for (const auto& r : batch.copy_records()) {
        transformed.push_back(noop_transformed_data(r).value());
    }
    auto now = model::timestamp::now();
    auto transformed_batch = transformed_data::make_batch(
      now, std::move(transformed));
    // Both timestamp bounds are stamped with the supplied (broker) time.
    EXPECT_EQ(transformed_batch.header().first_timestamp, now);
    EXPECT_EQ(transformed_batch.header().max_timestamp, now);
    // Idempotent producing is disabled for transform output batches.
    EXPECT_EQ(transformed_batch.header().producer_id, -1);
    EXPECT_EQ(
      transformed_batch.header().type, model::record_batch_type::raft_data);
    EXPECT_EQ(transformed_batch.header().record_count, 4);
    // The CRCs must match a recomputation over the finished batch.
    EXPECT_EQ(
      transformed_batch.header().crc,
      model::crc_record_batch(transformed_batch));
    EXPECT_EQ(
      transformed_batch.header().header_crc,
      model::internal_header_only_crc(transformed_batch.header()));
    EXPECT_EQ(
      transformed_batch.header().size_bytes, transformed_batch.size_bytes());
    auto expected_records = batch.copy_records();
    auto actual_records = transformed_batch.copy_records();
    // Guard the element-wise loop below against size drift; without this an
    // empty `actual_records` would pass the loop vacuously or index OOB.
    ASSERT_EQ(actual_records.size(), expected_records.size());
    // NOTE: size_t index avoids the signed/unsigned comparison in the
    // original (`auto i = 0` vs the unsigned size()).
    for (size_t i = 0; i < expected_records.size(); ++i) {
        EXPECT_EQ(actual_records[i].key(), expected_records[i].key());
        EXPECT_EQ(actual_records[i].value(), expected_records[i].value());
        EXPECT_EQ(actual_records[i].headers(), expected_records[i].headers());
        EXPECT_EQ(
          actual_records[i].offset_delta(), expected_records[i].offset_delta());
        // Timestamps are different than what the test helper makes and that's
        // OK.
        EXPECT_EQ(actual_records[i].timestamp_delta(), 0);
    }
}

} // namespace model
39 changes: 39 additions & 0 deletions src/v/model/transform.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

#include "bytes/iobuf_parser.h"
#include "model/fundamental.h"
#include "model/record.h"
#include "model/record_batch_types.h"
#include "utils/vint.h"

#include <seastar/core/print.hh>
Expand Down Expand Up @@ -159,6 +161,43 @@ std::optional<transformed_data> transformed_data::create_validated(iobuf buf) {
return transformed_data(std::move(buf));
}

// Assemble a sequence of transformed records into one raft_data batch stamped
// with broker (append) time. Records keep their relative order, offset deltas
// are assigned sequentially from zero, and both CRCs are recomputed over the
// finished batch.
model::record_batch transformed_data::make_batch(
  model::timestamp timestamp, ss::chunked_fifo<transformed_data> records) {
    model::record_batch::compressed_records payload;
    int32_t record_count = 0;
    for (auto& record : records) {
        payload.append_fragments(std::move(record).to_serialized_record(
          model::record_attributes(),
          /*timestamp_delta=*/0,
          /*offset_delta=*/record_count));
        ++record_count;
    }

    model::record_batch_header header;
    header.type = record_batch_type::raft_data;
    // mark the batch as created with broker time
    header.attrs.set_timestamp_type(model::timestamp_type::append_time);
    header.first_timestamp = timestamp;
    header.max_timestamp = timestamp;
    // disable idempotent producing, we don't currently use that within
    // transforms.
    header.producer_id = -1;
    header.record_count = record_count;
    header.size_bytes = int32_t(
      model::packed_record_batch_header_size + payload.size_bytes());

    auto batch = model::record_batch(
      header, std::move(payload), model::record_batch::tag_ctor_ng{});

    // The CRCs cover the assembled batch, so they are filled in last.
    batch.header().crc = model::crc_record_batch(batch);
    batch.header().header_crc = model::internal_header_only_crc(batch.header());

    return batch;
}

iobuf transformed_data::to_serialized_record(
record_attributes attrs, int64_t timestamp_delta, int32_t offset_delta) && {
iobuf out;
Expand Down
7 changes: 7 additions & 0 deletions src/v/model/transform.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "serde/envelope.h"
#include "utils/named_type.h"

#include <seastar/core/chunked_fifo.hh>
#include <seastar/core/sstring.hh>

#include <absl/container/btree_map.h>
Expand Down Expand Up @@ -226,6 +227,12 @@ class transformed_data {
*/
static std::optional<transformed_data> create_validated(iobuf);

/**
* Create a batch from transformed_data.
*/
static model::record_batch
make_batch(model::timestamp, ss::chunked_fifo<transformed_data>);

/**
* Generate a serialized record from the following metadata.
*/
Expand Down
3 changes: 3 additions & 0 deletions src/v/transform/rpc/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,9 @@ client::client(

ss::future<cluster::errc> client::produce(
model::topic_partition tp, ss::chunked_fifo<model::record_batch> batches) {
if (batches.empty()) {
co_return cluster::errc::success;
}
produce_request req;
req.topic_data.emplace_back(std::move(tp), std::move(batches));
req.timeout = timeout;
Expand Down
19 changes: 19 additions & 0 deletions src/v/transform/tests/test_fixture.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,16 @@
#include "transform/tests/test_fixture.h"

#include "model/fundamental.h"
#include "model/record.h"
#include "model/transform.h"
#include "transform/logger.h"

#include <seastar/core/abort_source.hh>
#include <seastar/core/lowres_clock.hh>

#include <gtest/gtest.h>

#include <cstdint>
#include <exception>
#include <iostream>

Expand Down Expand Up @@ -64,13 +67,29 @@ ss::future<> fake_source::push_batch(model::record_batch batch) {
_cond_var.broadcast();
co_return;
}

// No-op: the fake engine needs no startup work.
ss::future<> fake_wasm_engine::start() { return ss::now(); }

// No-op: the fake engine holds no resources to release.
ss::future<> fake_wasm_engine::stop() { return ss::now(); }

// No-op: the fake tracker has nothing to flush or tear down.
ss::future<> fake_offset_tracker::stop() { co_return; }

// No-op: the fake tracker needs no initialization.
ss::future<> fake_offset_tracker::start() { co_return; }

// Select how subsequent transform() calls behave (noop vs. filter).
void fake_wasm_engine::set_mode(mode m) { _mode = m; }

// Apply the fake transform according to the configured mode:
//   - mode::noop   — return the input batch unchanged.
//   - mode::filter — drop every record, returning an empty batch stamped
//                    with the input batch's max timestamp.
ss::future<model::record_batch>
fake_wasm_engine::transform(model::record_batch batch, wasm::transform_probe*) {
    switch (_mode) {
    case mode::noop:
        co_return batch;
    case mode::filter: {
        co_return model::transformed_data::make_batch(
          batch.header().max_timestamp, {});
    }
    }
    // All enumerators are handled above. Without this guard, an out-of-range
    // `_mode` value would flow off the end of a value-returning coroutine,
    // which is undefined behavior (and triggers -Wreturn-type warnings).
    __builtin_unreachable();
}

ss::future<> fake_offset_tracker::commit_offset(kafka::offset o) {
_committed = o;
_cond_var.broadcast();
Expand Down
14 changes: 11 additions & 3 deletions src/v/transform/tests/test_fixture.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,21 @@ static const model::transform_metadata my_metadata{

class fake_wasm_engine : public wasm::engine {
public:
enum class mode {
noop,
filter,
};

ss::future<model::record_batch>
transform(model::record_batch batch, wasm::transform_probe*) override {
co_return batch;
}
transform(model::record_batch batch, wasm::transform_probe*) override;

void set_mode(mode m);

ss::future<> start() override;
ss::future<> stop() override;

private:
mode _mode = mode::noop;
};

class fake_source : public source {
Expand Down
39 changes: 36 additions & 3 deletions src/v/transform/tests/transform_processor_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ namespace {
class ProcessorTestFixture : public ::testing::Test {
public:
void SetUp() override {
ss::shared_ptr<wasm::engine> engine
= ss::make_shared<testing::fake_wasm_engine>();
auto engine = ss::make_shared<testing::fake_wasm_engine>();
_engine = engine.get();
auto src = std::make_unique<testing::fake_source>();
_src = src.get();
auto sink = std::make_unique<testing::fake_sink>();
Expand All @@ -56,10 +56,22 @@ class ProcessorTestFixture : public ::testing::Test {
// processor is actually ready, otherwise it could be possible that
// the processor picks up after the initial records are added to the
// partition.
_offset_tracker->wait_for_committed_offset({}).get();
wait_for_committed_offset(kafka::offset{});
}
void TearDown() override { _p->stop().get(); }

    // Convenience overload: translate a model offset into the kafka offset
    // domain before waiting on the tracker.
    void wait_for_committed_offset(model::offset o) {
        wait_for_committed_offset(model::offset_cast(o));
    }
    // Block (synchronously, via .get()) until the fake offset tracker
    // reports `o` as committed.
    void wait_for_committed_offset(kafka::offset o) {
        _offset_tracker->wait_for_committed_offset(o).get();
    }

    // Shorthand so tests can write transform_mode::filter / ::noop.
    using transform_mode = testing::fake_wasm_engine::mode;
    // Switch the fake engine's behavior for all subsequently pushed batches.
    void set_transform_mode(testing::fake_wasm_engine::mode m) {
        _engine->set_mode(m);
    }

model::record_batch make_tiny_batch() {
return model::test::make_random_batch(model::test::record_batch_spec{
.offset = kafka::offset_cast(++_offset),
Expand All @@ -82,6 +94,7 @@ class ProcessorTestFixture : public ::testing::Test {

kafka::offset _offset = start_offset;
std::unique_ptr<transform::processor> _p;
testing::fake_wasm_engine* _engine;
testing::fake_source* _src;
testing::fake_offset_tracker* _offset_tracker;
std::vector<testing::fake_sink*> _sinks;
Expand Down Expand Up @@ -143,4 +156,24 @@ TEST_F(ProcessorTestFixture, TracksOffsets) {
EXPECT_EQ(error_count(), 0);
}

// A transform that filters out an entire batch must still advance the
// committed offset: batch two produces no output, but its offset is expected
// to be committed so the processor neither stalls nor reprocesses it, and
// subsequent batches (batch three) flow through normally.
TEST_F(ProcessorTestFixture, HandlesEmptyBatches) {
    // Baseline: a noop-transformed batch is read back and committed.
    auto batch_one = make_tiny_batch();
    push_batch(batch_one.copy());
    wait_for_committed_offset(batch_one.last_offset());
    EXPECT_EQ(read_batch(), batch_one);

    auto batch_two = make_tiny_batch();
    set_transform_mode(transform_mode::filter);
    push_batch(batch_two.copy());
    // We never will read batch two, it was filtered out
    // but we should still get a commit for batch two
    wait_for_committed_offset(batch_two.last_offset());

    // Back to noop: the pipeline keeps working after an empty batch.
    auto batch_three = make_tiny_batch();
    set_transform_mode(transform_mode::noop);
    push_batch(batch_three.copy());
    wait_for_committed_offset(batch_three.last_offset());
    EXPECT_EQ(read_batch(), batch_three);
}

} // namespace transform
4 changes: 3 additions & 1 deletion src/v/transform/transform_processor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@ drain_queue(ss::queue<transformed_batch>* queue, probe* p) {
p->increment_write_bytes(batch_size);
auto transformed = queue->pop();
result.latest_offset = transformed.input_offset;
result.batches.push_back(std::move(transformed.batch));
if (transformed.batch.record_count() > 0) {
result.batches.push_back(std::move(transformed.batch));
}
}
co_return result;
}
Expand Down
35 changes: 2 additions & 33 deletions src/v/wasm/wasmtime.cc
Original file line number Diff line number Diff line change
Expand Up @@ -620,46 +620,15 @@ class wasmtime_engine : public engine {
ss::future<model::record_batch>
invoke_transform(model::record_batch batch, transform_probe* p) {
std::unique_ptr<transform_probe::hist_t::measurement> m;
model::record_batch_header header = batch.header().copy();
auto transformed = co_await _transform_module.for_each_record_async(
std::move(batch),
[this, &m, p, ctx = wasmtime_store_context(_store.get())]() {
reset_fuel(ctx);
m = p->latency_measurement();
});
m = nullptr;

model::record_batch::compressed_records serialized_records;
int32_t i = 0;
for (model::transformed_data& r : transformed) {
serialized_records.append_fragments(
std::move(r).to_serialized_record(
model::record_attributes(),
/*timestamp_delta=*/0,
/*offset_delta=*/i++));
}

header.record_count = i;
header.size_bytes = int32_t(
model::packed_record_batch_header_size
+ serialized_records.size_bytes());

// mark the batch as created with broker time
header.attrs.set_timestamp_type(model::timestamp_type::append_time);
header.first_timestamp = model::timestamp::now();
header.max_timestamp = header.first_timestamp;

batch = model::record_batch(
header,
std::move(serialized_records),
model::record_batch::tag_ctor_ng{});

// Recompute the crc
batch.header().crc = model::crc_record_batch(batch);
batch.header().header_crc = model::internal_header_only_crc(
batch.header());

co_return batch;
co_return model::transformed_data::make_batch(
model::timestamp::now(), std::move(transformed));
}

wasmtime_runtime* _runtime;
Expand Down

0 comments on commit da8387e

Please sign in to comment.