Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: minor updates for build system improvements #19784

Merged
merged 8 commits into from
Jun 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/v/bytes/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ v_cc_library(
bytes
SRCS
iobuf.cc
bytes.cc
iostream.cc
scattered_message.cc
DEPS
absl::hash
Seastar::seastar
Expand Down
51 changes: 51 additions & 0 deletions src/v/bytes/bytes.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright 2024 Redpanda Data, Inc.
*
* Use of this software is governed by the Business Source License
* included in the file licenses/BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/
#include "bytes/bytes.h"

ss::sstring to_hex(bytes_view b) {
static constexpr std::string_view digits{"0123456789abcdef"};
ss::sstring out = ss::uninitialized_string(b.size() * 2);
const auto end = b.size();
for (size_t i = 0; i != end; ++i) {
uint8_t x = b[i];
out[2 * i] = digits[x >> uint8_t(4)];
out[2 * i + 1] = digits[x & uint8_t(0xf)];
}
return out;
}

ss::sstring to_hex(const bytes& b) { return to_hex(bytes_view(b)); }

std::ostream& operator<<(std::ostream& os, const bytes& b) {
return os << bytes_view(b);
}

std::ostream& operator<<(std::ostream& os, const bytes_opt& b) {
if (b) {
return os << *b;
}
return os << "empty";
}

namespace std {
std::ostream& operator<<(std::ostream& os, const bytes_view& b) {
fmt::print(os, "{{bytes:{}}}", b.size());
return os;
}
} // namespace std

bool bytes_type_cmp::operator()(const bytes& lhs, const bytes_view& rhs) const {
return lhs < rhs;
}

bool bytes_type_cmp::operator()(const bytes& lhs, const bytes& rhs) const {
return lhs < rhs;
}
143 changes: 0 additions & 143 deletions src/v/bytes/iobuf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,7 @@
#include "bytes/iobuf.h"

#include "base/vassert.h"
#include "bytes/bytes.h"
#include "bytes/details/io_allocation_size.h"
#include "bytes/iostream.h"
#include "bytes/scattered_message.h"

#include <seastar/core/bitops.hh>
#include <seastar/core/do_with.hh>
Expand All @@ -28,106 +25,6 @@ std::ostream& operator<<(std::ostream& o, const iobuf& io) {
return o << "{bytes=" << io.size_bytes()
<< ", fragments=" << std::distance(io.cbegin(), io.cend()) << "}";
}
ss::scattered_message<char> iobuf_as_scattered(iobuf b) {
ss::scattered_message<char> msg;
auto in = iobuf::iterator_consumer(b.cbegin(), b.cend());
int32_t chunk_no = 0;
in.consume(
b.size_bytes(), [&msg, &chunk_no, &b](const char* src, size_t sz) {
++chunk_no;
vassert(
chunk_no <= std::numeric_limits<int16_t>::max(),
"Invalid construction of scattered_message. fragment coutn exceeds "
"max count:{}. Usually a bug with small append() to iobuf. {}",
chunk_no,
b);
msg.append_static(src, sz);
return ss::stop_iteration::no;
});
msg.on_delete([b = std::move(b)] {});
return msg;
}

ss::future<>
write_iobuf_to_output_stream(iobuf buf, ss::output_stream<char>& output) {
return ss::do_with(std::move(buf), [&output](iobuf& buf) {
return ss::do_for_each(buf, [&output](iobuf::fragment& f) {
return output.write(f.get(), f.size());
});
});
}

ss::future<iobuf> read_iobuf_exactly(ss::input_stream<char>& in, size_t n) {
return ss::do_with(iobuf{}, n, [&in](iobuf& b, size_t& n) {
return ss::do_until(
[&n] { return n == 0; },
[&n, &in, &b] {
return in.read_up_to(n).then(
[&n, &b](ss::temporary_buffer<char> buf) {
if (buf.empty()) {
n = 0;
return;
}
n -= buf.size();
b.append(std::move(buf));
});
})
.then([&b] { return std::move(b); });
});
}

ss::output_stream<char> make_iobuf_ref_output_stream(iobuf& io) {
struct iobuf_output_stream final : ss::data_sink_impl {
explicit iobuf_output_stream(iobuf& i)
: io(i) {}
ss::future<> put(ss::net::packet data) final {
auto all = data.release();
for (auto& b : all) {
io.append(std::move(b));
}
return ss::make_ready_future<>();
}
ss::future<> put(std::vector<ss::temporary_buffer<char>> all) final {
for (auto& b : all) {
io.append(std::move(b));
}
return ss::make_ready_future<>();
}
ss::future<> put(ss::temporary_buffer<char> buf) final {
io.append(std::move(buf));
return ss::make_ready_future<>();
}
ss::future<> flush() final { return ss::make_ready_future<>(); }
ss::future<> close() final { return ss::make_ready_future<>(); }
iobuf& io;
};
const size_t sz = io.size_bytes();
return ss::output_stream<char>(
ss::data_sink(std::make_unique<iobuf_output_stream>(io)), sz);
}
ss::input_stream<char> make_iobuf_input_stream(iobuf io) {
struct iobuf_input_stream final : ss::data_source_impl {
explicit iobuf_input_stream(iobuf i)
: io(std::move(i)) {}
ss::future<ss::temporary_buffer<char>> skip(uint64_t n) final {
io.trim_front(n);
return get();
}
ss::future<ss::temporary_buffer<char>> get() final {
if (io.begin() == io.end()) {
return ss::make_ready_future<ss::temporary_buffer<char>>();
}
auto buf = io.begin()->share();
io.pop_front();
return ss::make_ready_future<ss::temporary_buffer<char>>(
std::move(buf));
}
iobuf io;
};
auto ds = ss::data_source(
std::make_unique<iobuf_input_stream>(std::move(io)));
return ss::input_stream<char>(std::move(ds));
}

iobuf iobuf::copy() const {
auto in = iobuf::iterator_consumer(cbegin(), cend());
Expand Down Expand Up @@ -315,43 +212,3 @@ iobuf::placeholder iobuf::reserve(size_t sz) {
it->reserve(sz);
return p;
}

ss::sstring to_hex(bytes_view b) {
static constexpr std::string_view digits{"0123456789abcdef"};
ss::sstring out = ss::uninitialized_string(b.size() * 2);
const auto end = b.size();
for (size_t i = 0; i != end; ++i) {
uint8_t x = b[i];
out[2 * i] = digits[x >> uint8_t(4)];
out[2 * i + 1] = digits[x & uint8_t(0xf)];
}
return out;
}

ss::sstring to_hex(const bytes& b) { return to_hex(bytes_view(b)); }

std::ostream& operator<<(std::ostream& os, const bytes& b) {
return os << bytes_view(b);
}

std::ostream& operator<<(std::ostream& os, const bytes_opt& b) {
if (b) {
return os << *b;
}
return os << "empty";
}

namespace std {
std::ostream& operator<<(std::ostream& os, const bytes_view& b) {
fmt::print(os, "{{bytes:{}}}", b.size());
return os;
}
} // namespace std

bool bytes_type_cmp::operator()(const bytes& lhs, const bytes_view& rhs) const {
return lhs < rhs;
}

bool bytes_type_cmp::operator()(const bytes& lhs, const bytes& rhs) const {
return lhs < rhs;
}
93 changes: 93 additions & 0 deletions src/v/bytes/iostream.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright 2024 Redpanda Data, Inc.
*
* Use of this software is governed by the Business Source License
* included in the file licenses/BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/
#include "bytes/iostream.h"

ss::input_stream<char> make_iobuf_input_stream(iobuf io) {
struct iobuf_input_stream final : ss::data_source_impl {
explicit iobuf_input_stream(iobuf i)
: io(std::move(i)) {}
ss::future<ss::temporary_buffer<char>> skip(uint64_t n) final {
io.trim_front(n);
return get();
}
ss::future<ss::temporary_buffer<char>> get() final {
if (io.begin() == io.end()) {
return ss::make_ready_future<ss::temporary_buffer<char>>();
}
auto buf = io.begin()->share();
io.pop_front();
return ss::make_ready_future<ss::temporary_buffer<char>>(
std::move(buf));
}
iobuf io;
};
auto ds = ss::data_source(
std::make_unique<iobuf_input_stream>(std::move(io)));
return ss::input_stream<char>(std::move(ds));
}

ss::output_stream<char> make_iobuf_ref_output_stream(iobuf& io) {
struct iobuf_output_stream final : ss::data_sink_impl {
explicit iobuf_output_stream(iobuf& i)
: io(i) {}
ss::future<> put(ss::net::packet data) final {
auto all = data.release();
for (auto& b : all) {
io.append(std::move(b));
}
return ss::make_ready_future<>();
}
ss::future<> put(std::vector<ss::temporary_buffer<char>> all) final {
for (auto& b : all) {
io.append(std::move(b));
}
return ss::make_ready_future<>();
}
ss::future<> put(ss::temporary_buffer<char> buf) final {
io.append(std::move(buf));
return ss::make_ready_future<>();
}
ss::future<> flush() final { return ss::make_ready_future<>(); }
ss::future<> close() final { return ss::make_ready_future<>(); }
iobuf& io;
};
const size_t sz = io.size_bytes();
return ss::output_stream<char>(
ss::data_sink(std::make_unique<iobuf_output_stream>(io)), sz);
}

ss::future<iobuf> read_iobuf_exactly(ss::input_stream<char>& in, size_t n) {
return ss::do_with(iobuf{}, n, [&in](iobuf& b, size_t& n) {
return ss::do_until(
[&n] { return n == 0; },
[&n, &in, &b] {
return in.read_up_to(n).then(
[&n, &b](ss::temporary_buffer<char> buf) {
if (buf.empty()) {
n = 0;
return;
}
n -= buf.size();
b.append(std::move(buf));
});
})
.then([&b] { return std::move(b); });
});
}

ss::future<>
write_iobuf_to_output_stream(iobuf buf, ss::output_stream<char>& output) {
return ss::do_with(std::move(buf), [&output](iobuf& buf) {
return ss::do_for_each(buf, [&output](iobuf::fragment& f) {
return output.write(f.get(), f.size());
});
});
}
31 changes: 31 additions & 0 deletions src/v/bytes/scattered_message.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2022 Redpanda Data, Inc.
*
* Use of this software is governed by the Business Source License
* included in the file licenses/BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/
#include "bytes/scattered_message.h"

ss::scattered_message<char> iobuf_as_scattered(iobuf b) {
ss::scattered_message<char> msg;
auto in = iobuf::iterator_consumer(b.cbegin(), b.cend());
int32_t chunk_no = 0;
in.consume(
b.size_bytes(), [&msg, &chunk_no, &b](const char* src, size_t sz) {
++chunk_no;
vassert(
chunk_no <= std::numeric_limits<int16_t>::max(),
"Invalid construction of scattered_message. fragment coutn exceeds "
"max count:{}. Usually a bug with small append() to iobuf. {}",
chunk_no,
b);
msg.append_static(src, sz);
return ss::stop_iteration::no;
});
msg.on_delete([b = std::move(b)] {});
return msg;
}
2 changes: 2 additions & 0 deletions src/v/cluster/tests/randoms.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@
#include "cluster/health_monitor_types.h"
#include "cluster/partition_balancer_state.h"
#include "cluster/partition_balancer_types.h"
#include "cluster/producer_state.h"
#include "cluster/rm_stm_types.h"
#include "model/tests/randoms.h"
#include "random/generators.h"
#include "storage/tests/randoms.h"
#include "test_utils/randoms.h"
#include "utils/prefix_logger.h"

#include <iterator>

Expand Down
3 changes: 2 additions & 1 deletion src/v/container/include/container/chunked_hash_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@
*/
#pragma once

#include "container/fragmented_vector.h"

#include <absl/hash/hash.h>
#include <ankerl/unordered_dense.h>
#include <container/fragmented_vector.h>

#include <type_traits>

Expand Down
Loading