Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

iceberg: utils for operating on avro #21493

Merged
merged 3 commits into from
Jul 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 140 additions & 0 deletions src/v/iceberg/avro_utils.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0
#pragma once

#include "bytes/iobuf.h"

#include <avro/DataFile.hh>
#include <avro/Stream.hh>

// An avro::OutputStream whose bytes land in a caller-provided iobuf instead of
// stream-owned memory, so the caller keeps the written data in the iobuf.
// Closely modeled on avro::MemoryOutputStream.
//
// The stream only stores a raw pointer to the iobuf and never deletes it.
// NOTE(review): the iobuf presumably must outlive the stream -- confirm at the
// call sites.
//
// Storage is added to the iobuf in whole chunks of `chunk_size` bytes and
// backup() does not shrink the iobuf, so the iobuf can hold more bytes than
// byteCount() reports.
// NOTE(review): callers presumably trim the unused tail once writing is done
// -- verify.
class avro_iobuf_ostream : public avro::OutputStream {
public:
    // chunk_size: number of bytes appended to `buf` whenever more room is
    //             needed.
    // buf: destination buffer; dereferenced on every call to next().
    explicit avro_iobuf_ostream(size_t chunk_size, iobuf* buf)
      : chunk_size_(chunk_size)
      , buf_(buf) {}
    ~avro_iobuf_ostream() override = default;

    // Hands the writer the unused tail of the last chunk, first appending a
    // fresh `chunk_size_`-byte chunk when nothing is left over.
    //
    // On return `*data` points at the first unused byte and `*len` is the
    // number of bytes that may be written there. The whole region counts as
    // written until some of it is returned through backup(). Always returns
    // true.
    bool next(uint8_t** data, size_t* len) final {
        if (available_ == 0) {
            // NOTE(review): assumes append() leaves a distinct fragment of
            // exactly `chunk_size_` bytes at the back of the iobuf -- confirm
            // against iobuf::append semantics.
            buf_->append(ss::temporary_buffer<char>{chunk_size_});
            available_ = chunk_size_;
        }
        const size_t write_offset = chunk_size_ - available_;
        const size_t handed_out = available_;
        auto last_frag = buf_->rbegin();
        auto* write_ptr
          = last_frag->share(write_offset, handed_out).get_write();
        *data = reinterpret_cast<uint8_t*>(write_ptr);
        *len = handed_out;
        byte_count_ += handed_out;
        available_ = 0;
        return true;
    }

    // Gives back the last `len` bytes obtained from next(): they become
    // available again and no longer count towards byteCount().
    void backup(size_t len) final {
        byte_count_ -= len;
        available_ += len;
    }

    // Bytes handed out by next() minus bytes returned through backup().
    uint64_t byteCount() const final { return byte_count_; }

    // Nothing to do: next() exposes the iobuf's memory directly.
    void flush() final {}

private:
    // Size in bytes of every chunk appended to the iobuf.
    const size_t chunk_size_;

    // Destination buffer; not deleted by this class.
    iobuf* buf_;

    // Unused bytes at the end of the most recently appended chunk.
    size_t available_{0};

    // Running total reported by byteCount().
    size_t byte_count_{0};
};

// avro::InputStream implementation that reads from an iobuf it owns, exposing
// the iobuf's fragments one contiguous region at a time.
//
// Invariant maintained by every method: `cur_pos_` equals the combined size of
// all fragments before `cur_frag_` plus `cur_frag_pos_`, i.e. the number of
// bytes consumed so far. When `cur_frag_` is `buf_.end()` the stream is
// exhausted.
class avro_iobuf_istream : public avro::InputStream {
public:
    // Takes ownership of `buf`; reading starts at its first fragment.
    explicit avro_iobuf_istream(iobuf buf)
      : buf_(std::move(buf))
      , cur_frag_(buf_.begin())
      , cur_frag_pos_(0)
      , cur_pos_(0) {}

    // Returns the next contiguous chunk of unread memory and its length.
    //
    // Empty and fully consumed fragments are skipped. Returns false, leaving
    // `*data` and `*len` untouched, when no unread bytes remain.
    bool next(const uint8_t** data, size_t* len) final {
        while (cur_frag_ != buf_.end()
               && cur_frag_pos_ >= cur_frag_->size()) {
            ++cur_frag_;
            cur_frag_pos_ = 0;
        }
        if (cur_frag_ == buf_.end()) {
            return false;
        }
        const size_t left_in_frag = cur_frag_->size() - cur_frag_pos_;
        *data = reinterpret_cast<const uint8_t*>(
          cur_frag_->share(cur_frag_pos_, left_in_frag).get());
        *len = left_in_frag;
        cur_frag_pos_ = cur_frag_->size();
        // Only the bytes actually handed out count as consumed; after a
        // partial backup() that is less than the whole fragment.
        cur_pos_ += left_in_frag;
        return true;
    }

    // Un-reads the last `len` bytes so that they are returned again by the
    // next call to next(). Requests that reach past the beginning of the
    // buffer are clamped to the beginning.
    void backup(size_t len) final {
        if (buf_.begin() == buf_.end()) {
            // Empty buffer: nothing was ever read, and there is no last
            // fragment to step back onto.
            return;
        }
        if (cur_frag_ == buf_.end()) {
            // The stream was exhausted; resume from the end of the last
            // fragment.
            cur_frag_ = std::prev(buf_.end());
            cur_frag_pos_ = cur_frag_->size();
        }
        while (len > cur_frag_pos_) {
            // Give back everything consumed from this fragment and continue
            // in the previous one.
            len -= cur_frag_pos_;
            cur_pos_ -= cur_frag_pos_;
            if (cur_frag_ == buf_.begin()) {
                cur_frag_pos_ = 0;
                return;
            }
            --cur_frag_;
            cur_frag_pos_ = cur_frag_->size();
        }
        cur_frag_pos_ -= len;
        cur_pos_ -= len;
    }

    // Advances the read position by `len` bytes, crossing fragment boundaries
    // as needed. Stops at the end of the buffer if fewer than `len` bytes
    // remain; only bytes actually skipped are added to byteCount().
    void skip(size_t len) final {
        while (len > 0 && cur_frag_ != buf_.end()) {
            const size_t left_in_frag = cur_frag_->size() - cur_frag_pos_;
            if (len <= left_in_frag) {
                cur_frag_pos_ += len;
                cur_pos_ += len;
                return;
            }
            // Consume the rest of this fragment and move on to the next one.
            len -= left_in_frag;
            cur_pos_ += left_in_frag;
            ++cur_frag_;
            cur_frag_pos_ = 0;
        }
    }

    // Number of bytes consumed so far: handed out by next() or skipped, minus
    // those returned through backup().
    size_t byteCount() const final { return cur_pos_; }

private:
    // The data being read.
    iobuf buf_;

    // Fragment currently being read, or `buf_.end()` once exhausted.
    iobuf::iterator cur_frag_;

    // Offset of the next unread byte within `*cur_frag_`.
    size_t cur_frag_pos_;

    // Total number of bytes consumed.
    size_t cur_pos_;
};
1 change: 1 addition & 0 deletions src/v/iceberg/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ rp_test(
datatypes_json_test.cc
LIBRARIES
Avro::avro
Boost::iostreams
v::bytes
v::gtest_main
v::iceberg
Expand Down
Loading