Skip to content

Commit

Permalink
iceberg: pull ownership of iobuf out of avro_iobuf_ostream
Browse files Browse the repository at this point in the history
When using a DataFile{Writer,Reader}, we'll need to be able to pass the
buffer used by the writer. This is difficult to do with the current
interface of avro_iobuf_ostream, which is std::moved into the writer.

So, this commit moves ownership of the buffer out of the ostream: the
stream now holds a pointer to a caller-owned iobuf, which makes it easier
for callers to manage the iobuf.

Note: the trimming I was doing in release() does not appear to have been
needed after all.
  • Loading branch information
andrwng committed Jul 17, 2024
1 parent d48ae03 commit e6d456c
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 15 deletions.
16 changes: 5 additions & 11 deletions src/v/iceberg/avro_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,24 @@
// iobuf that can be released.
class avro_iobuf_ostream : public avro::OutputStream {
public:
explicit avro_iobuf_ostream(size_t chunk_size)
explicit avro_iobuf_ostream(size_t chunk_size, iobuf* buf)
: chunk_size_(chunk_size)
, buf_(buf)
, available_(0)
, byte_count_(0) {}
~avro_iobuf_ostream() override = default;

iobuf release() {
buf_.trim_back(buf_.size_bytes() - byte_count_);
available_ = 0;
byte_count_ = 0;
return std::exchange(buf_, iobuf{});
}

// If there's no available space in the buffer, allocates `chunk_size_`
// bytes at the end of the buffer.
//
// Returns the current position in the buffer, and the available remaining
// space.
bool next(uint8_t** data, size_t* len) final {
if (available_ == 0) {
buf_.append(ss::temporary_buffer<char>{chunk_size_});
buf_->append(ss::temporary_buffer<char>{chunk_size_});
available_ = chunk_size_;
}
auto back_frag = buf_.rbegin();
auto back_frag = buf_->rbegin();
*data = reinterpret_cast<uint8_t*>(
back_frag->share(chunk_size_ - available_, available_).get_write());
*len = available_;
Expand All @@ -62,7 +56,7 @@ class avro_iobuf_ostream : public avro::OutputStream {
// Size in bytes with which to allocate new fragments.
const size_t chunk_size_;

iobuf buf_;
iobuf* buf_;

// Bytes remaining in the last fragment in the buffer.
size_t available_;
Expand Down
8 changes: 4 additions & 4 deletions src/v/iceberg/tests/manifest_serialization_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ TEST(ManifestSerializationTest, TestManifestEntry) {
entry.data_file.record_count = 3;
entry.data_file.file_size_in_bytes = 1024;

auto out = std::make_unique<avro_iobuf_ostream>(4096);
iobuf buf;
auto out = std::make_unique<avro_iobuf_ostream>(4096, &buf);

// Encode to the output stream.
avro::EncoderPtr encoder = avro::binaryEncoder();
Expand All @@ -38,7 +39,6 @@ TEST(ManifestSerializationTest, TestManifestEntry) {
encoder->flush();

// Decode the iobuf from the input stream.
auto buf = out->release();
auto in = std::make_unique<avro_iobuf_istream>(std::move(buf));
avro::DecoderPtr decoder = avro::binaryDecoder();
decoder->init(*in);
Expand Down Expand Up @@ -69,7 +69,8 @@ TEST(ManifestSerializationTest, TestManifestFile) {
manifest.existing_rows_count = 10;
manifest.deleted_rows_count = 11;

auto out = std::make_unique<avro_iobuf_ostream>(4096);
iobuf buf;
auto out = std::make_unique<avro_iobuf_ostream>(4096, &buf);

// Encode to the output stream.
avro::EncoderPtr encoder = avro::binaryEncoder();
Expand All @@ -78,7 +79,6 @@ TEST(ManifestSerializationTest, TestManifestFile) {
encoder->flush();

// Decode the iobuf from the input stream.
auto buf = out->release();
auto in = std::make_unique<avro_iobuf_istream>(std::move(buf));
avro::DecoderPtr decoder = avro::binaryDecoder();
decoder->init(*in);
Expand Down

0 comments on commit e6d456c

Please sign in to comment.