diff --git a/src/v/archival/adjacent_segment_run.cc b/src/v/archival/adjacent_segment_run.cc new file mode 100644 index 0000000000000..e67a7d62c41cb --- /dev/null +++ b/src/v/archival/adjacent_segment_run.cc @@ -0,0 +1,103 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 +#include "archival/adjacent_segment_run.h" + +#include "archival/logger.h" +#include "base/vlog.h" +#include "cloud_storage/partition_manifest.h" +#include "cloud_storage/types.h" +#include "model/metadata.h" + +#include + +namespace archival { + +bool adjacent_segment_run::maybe_add_segment( + const cloud_storage::segment_meta& s, size_t max_size) { + vlog( + archival_log.debug, + "{} Segments collected, looking at segment meta: {}, current run meta: " + "{}", + num_segments, + s, + meta); + if (num_segments == 1 && meta.size_bytes + s.size_bytes > max_size) { + // Corner case, we hit a segment which is smaller than the max_size + // but it's larger than max_size when combined with its neighbor. In + // this case we need to skip previous the segment. + num_segments = 0; + segments.clear(); + meta = {}; + } + if (num_segments == 0) { + // Find the begining of the small segment + // run. + if (s.size_bytes < max_size) { + meta = s; + num_segments = 1; + segments.push_back( + cloud_storage::partition_manifest::generate_remote_segment_path( + ntp, s)); + } + } else { + if ( + meta.segment_term == s.segment_term + && meta.size_bytes + s.size_bytes <= max_size) { + // Cross term merging is disallowed. Because of that we need to stop + // if the term doesn't match the previous term. + if (model::next_offset(meta.committed_offset) != s.base_offset) { + // In case if we're dealing with one of the old manifests + // with inconsistencies (overlapping offsets, etc). + num_segments = 0; + meta = {}; + segments.clear(); + vlog( + archival_log.debug, + "Reseting the upload, current committed offset: {}, next " + "base offset: {}, meta: {}", + meta.committed_offset, + s.base_offset, + meta); + return false; + } + // Move the end of the small segment run forward + meta.committed_offset = s.committed_offset; + meta.max_timestamp = s.max_timestamp; + num_segments++; + meta.size_bytes += s.size_bytes; + segments.push_back( + cloud_storage::partition_manifest::generate_remote_segment_path( + ntp, s)); + } else { + return num_segments > 1; + } + } + return false; +} + +std::ostream& operator<<(std::ostream& os, const adjacent_segment_run& run) { + std::vector names; + names.reserve(run.segments.size()); + std::transform( + run.segments.begin(), + run.segments.end(), + std::back_inserter(names), + [](const cloud_storage::remote_segment_path& rsp) { + return rsp().native(); + }); + fmt::print( + os, + "{{meta: {}, num_segments: {}, segments: {}}}", + run.meta, + run.num_segments, + names); + return os; +} + +} // namespace archival diff --git a/src/v/archival/adjacent_segment_run.h b/src/v/archival/adjacent_segment_run.h new file mode 100644 index 0000000000000..7af73c67059ca --- /dev/null +++ b/src/v/archival/adjacent_segment_run.h @@ -0,0 +1,46 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 +#pragma once + +#include "cloud_storage/types.h" +#include "model/metadata.h" + +#include + +namespace archival { + +/// Represents a series of adjacent segments +/// The object is used to compute a possible reupload +/// candidate. The series of segment is supposed to be +/// merged and reuploaded. The object produces metadata +/// for the reuploaded segment. +struct adjacent_segment_run { + explicit adjacent_segment_run(model::ntp ntp) + : ntp(std::move(ntp)) {} + + model::ntp ntp; + cloud_storage::segment_meta meta{}; + size_t num_segments{0}; + std::vector segments; + + /// Try to add segment to the run + /// + /// The subsequent calls are successful until the total size + /// of the run is below the threshold. The object keeps track + /// of all segment names. + /// + /// \return true if the run is assembled, false if more segments can be + /// added to the run + bool + maybe_add_segment(const cloud_storage::segment_meta& s, size_t max_size); +}; + +std::ostream& operator<<(std::ostream& o, const adjacent_segment_run& run); + +} // namespace archival diff --git a/src/v/archival/types.cc b/src/v/archival/types.cc index 60b9c9181a146..4bc40dbe19845 100644 --- a/src/v/archival/types.cc +++ b/src/v/archival/types.cc @@ -121,86 +121,4 @@ get_archival_service_config(ss::scheduling_group sg, ss::io_priority_class p) { return cfg; } -bool adjacent_segment_run::maybe_add_segment( - const cloud_storage::segment_meta& s, size_t max_size) { - vlog( - archival_log.debug, - "{} Segments collected, looking at segment meta: {}, current run meta: " - "{}", - num_segments, - s, - meta); - if (num_segments == 1 && meta.size_bytes + s.size_bytes > max_size) { - // Corner case, we hit a segment which is smaller than the max_size - // but it's larger than max_size when combined with its neighbor. In - // this case we need to skip previous the segment. - num_segments = 0; - segments.clear(); - meta = {}; - } - if (num_segments == 0) { - // Find the begining of the small segment - // run. - if (s.size_bytes < max_size) { - meta = s; - num_segments = 1; - segments.push_back( - cloud_storage::partition_manifest::generate_remote_segment_path( - ntp, s)); - } - } else { - if ( - meta.segment_term == s.segment_term - && meta.size_bytes + s.size_bytes <= max_size) { - // Cross term merging is disallowed. Because of that we need to stop - // if the term doesn't match the previous term. - if (model::next_offset(meta.committed_offset) != s.base_offset) { - // In case if we're dealing with one of the old manifests - // with inconsistencies (overlapping offsets, etc). - num_segments = 0; - meta = {}; - segments.clear(); - vlog( - archival_log.debug, - "Reseting the upload, current committed offset: {}, next " - "base offset: {}, meta: {}", - meta.committed_offset, - s.base_offset, - meta); - return false; - } - // Move the end of the small segment run forward - meta.committed_offset = s.committed_offset; - meta.max_timestamp = s.max_timestamp; - num_segments++; - meta.size_bytes += s.size_bytes; - segments.push_back( - cloud_storage::partition_manifest::generate_remote_segment_path( - ntp, s)); - } else { - return num_segments > 1; - } - } - return false; -} - -std::ostream& operator<<(std::ostream& os, const adjacent_segment_run& run) { - std::vector names; - names.reserve(run.segments.size()); - std::transform( - run.segments.begin(), - run.segments.end(), - std::back_inserter(names), - [](const cloud_storage::remote_segment_path& rsp) { - return rsp().native(); - }); - fmt::print( - os, - "{{meta: {}, num_segments: {}, segments: {}}}", - run.meta, - run.num_segments, - names); - return os; -} - } // namespace archival diff --git a/src/v/archival/types.h b/src/v/archival/types.h index 5652815775e45..d6c92ed690238 100644 --- a/src/v/archival/types.h +++ b/src/v/archival/types.h @@ -10,6 +10,7 @@ #pragma once +#include "archival/adjacent_segment_run.h" #include "base/seastarx.h" #include "cloud_storage/types.h" #include "seastar/core/lowres_clock.hh" @@ -171,34 +172,6 @@ class housekeeping_job { /// Number of segment reuploads the job can do per housekeeping run static constexpr int max_reuploads_per_run = 4; -/// Represents a series of adjacent segments -/// The object is used to compute a possible reupload -/// candidate. The series of segment is supposed to be -/// merged and reuploaded. The object produces metadata -/// for the reuploaded segment. -struct adjacent_segment_run { - explicit adjacent_segment_run(model::ntp ntp) - : ntp(std::move(ntp)) {} - - model::ntp ntp; - cloud_storage::segment_meta meta{}; - size_t num_segments{0}; - std::vector segments; - - /// Try to add segment to the run - /// - /// The subsequent calls are successful until the total size - /// of the run is below the threshold. The object keeps track - /// of all segment names. - /// - /// \return true if the run is assembled, false if more segments can be - /// added to the run - bool - maybe_add_segment(const cloud_storage::segment_meta& s, size_t max_size); -}; - -std::ostream& operator<<(std::ostream& o, const adjacent_segment_run& run); - enum class error_outcome { unexpected_failure = 1, timed_out, diff --git a/src/v/cloud_storage/remote.cc b/src/v/cloud_storage/remote.cc index 0cae14e4398f0..b58d599d8804b 100644 --- a/src/v/cloud_storage/remote.cc +++ b/src/v/cloud_storage/remote.cc @@ -365,11 +365,19 @@ ss::future remote::do_download_manifest( ss::future remote::upload_manifest( const cloud_storage_clients::bucket_name& bucket, const base_manifest& manifest, + retry_chain_node& parent) { + auto key = manifest.get_manifest_path(); + co_return co_await upload_manifest(bucket, manifest, key, parent); +} + +ss::future remote::upload_manifest( + const cloud_storage_clients::bucket_name& bucket, + const base_manifest& manifest, + const remote_manifest_path& key, retry_chain_node& parent) { auto guard = _gate.hold(); retry_chain_node fib(&parent); retry_chain_logger ctxlog(cst_log, fib); - auto key = manifest.get_manifest_path(); auto path = cloud_storage_clients::object_key(key()); auto lease = co_await _pool.local().acquire(fib.root_abort_source()); auto permit = fib.retry(); diff --git a/src/v/cloud_storage/remote.h b/src/v/cloud_storage/remote.h index 4bf2eb73e9f6c..0064a03a8ce9c 100644 --- a/src/v/cloud_storage/remote.h +++ b/src/v/cloud_storage/remote.h @@ -291,6 +291,12 @@ class remote const base_manifest& manifest, retry_chain_node& parent); + ss::future upload_manifest( + const cloud_storage_clients::bucket_name& bucket, + const base_manifest& manifest, + const remote_manifest_path& key, + retry_chain_node& parent); + /// \brief Upload segment to S3 /// /// The method uploads the segment while tolerating some errors. It can diff --git a/src/v/cluster/CMakeLists.txt b/src/v/cluster/CMakeLists.txt index 99c9aaa915c54..96ecd018ba599 100644 --- a/src/v/cluster/CMakeLists.txt +++ b/src/v/cluster/CMakeLists.txt @@ -212,6 +212,7 @@ v_cc_library( ../archival/retention_calculator.cc ../archival/upload_housekeeping_service.cc ../archival/adjacent_segment_merger.cc + ../archival/adjacent_segment_run.cc ../archival/purger.cc ../archival/scrubber.cc ../archival/archiver_manager.cc