From 6945e4dc35e0248f48d950a621aabf6d18658400 Mon Sep 17 00:00:00 2001 From: Andrew Wong Date: Tue, 18 Jun 2024 16:54:48 -0700 Subject: [PATCH] archival: pull out adjacent segment run There's some business logic in archival/types to generate path names. In the effort to change how we assign paths to objects, this moves that musiness logical out of archival/types, so that the business logic doesn't need to muddy such a fundamental file. --- src/v/archival/adjacent_segment_run.cc | 103 +++++++++++++++++++++++++ src/v/archival/adjacent_segment_run.h | 46 +++++++++++ src/v/archival/types.cc | 82 -------------------- src/v/archival/types.h | 29 +------ src/v/cluster/CMakeLists.txt | 1 + 5 files changed, 151 insertions(+), 110 deletions(-) create mode 100644 src/v/archival/adjacent_segment_run.cc create mode 100644 src/v/archival/adjacent_segment_run.h diff --git a/src/v/archival/adjacent_segment_run.cc b/src/v/archival/adjacent_segment_run.cc new file mode 100644 index 0000000000000..e67a7d62c41cb --- /dev/null +++ b/src/v/archival/adjacent_segment_run.cc @@ -0,0 +1,103 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 +#include "archival/adjacent_segment_run.h" + +#include "archival/logger.h" +#include "base/vlog.h" +#include "cloud_storage/partition_manifest.h" +#include "cloud_storage/types.h" +#include "model/metadata.h" + +#include + +namespace archival { + +bool adjacent_segment_run::maybe_add_segment( + const cloud_storage::segment_meta& s, size_t max_size) { + vlog( + archival_log.debug, + "{} Segments collected, looking at segment meta: {}, current run meta: " + "{}", + num_segments, + s, + meta); + if (num_segments == 1 && meta.size_bytes + s.size_bytes > max_size) { + // Corner case, we hit a segment which is smaller than the max_size + // but it's larger than max_size when combined with its neighbor. In + // this case we need to skip previous the segment. + num_segments = 0; + segments.clear(); + meta = {}; + } + if (num_segments == 0) { + // Find the begining of the small segment + // run. + if (s.size_bytes < max_size) { + meta = s; + num_segments = 1; + segments.push_back( + cloud_storage::partition_manifest::generate_remote_segment_path( + ntp, s)); + } + } else { + if ( + meta.segment_term == s.segment_term + && meta.size_bytes + s.size_bytes <= max_size) { + // Cross term merging is disallowed. Because of that we need to stop + // if the term doesn't match the previous term. + if (model::next_offset(meta.committed_offset) != s.base_offset) { + // In case if we're dealing with one of the old manifests + // with inconsistencies (overlapping offsets, etc). + num_segments = 0; + meta = {}; + segments.clear(); + vlog( + archival_log.debug, + "Reseting the upload, current committed offset: {}, next " + "base offset: {}, meta: {}", + meta.committed_offset, + s.base_offset, + meta); + return false; + } + // Move the end of the small segment run forward + meta.committed_offset = s.committed_offset; + meta.max_timestamp = s.max_timestamp; + num_segments++; + meta.size_bytes += s.size_bytes; + segments.push_back( + cloud_storage::partition_manifest::generate_remote_segment_path( + ntp, s)); + } else { + return num_segments > 1; + } + } + return false; +} + +std::ostream& operator<<(std::ostream& os, const adjacent_segment_run& run) { + std::vector names; + names.reserve(run.segments.size()); + std::transform( + run.segments.begin(), + run.segments.end(), + std::back_inserter(names), + [](const cloud_storage::remote_segment_path& rsp) { + return rsp().native(); + }); + fmt::print( + os, + "{{meta: {}, num_segments: {}, segments: {}}}", + run.meta, + run.num_segments, + names); + return os; +} + +} // namespace archival diff --git a/src/v/archival/adjacent_segment_run.h b/src/v/archival/adjacent_segment_run.h new file mode 100644 index 0000000000000..7af73c67059ca --- /dev/null +++ b/src/v/archival/adjacent_segment_run.h @@ -0,0 +1,46 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 +#pragma once + +#include "cloud_storage/types.h" +#include "model/metadata.h" + +#include + +namespace archival { + +/// Represents a series of adjacent segments +/// The object is used to compute a possible reupload +/// candidate. The series of segment is supposed to be +/// merged and reuploaded. The object produces metadata +/// for the reuploaded segment. +struct adjacent_segment_run { + explicit adjacent_segment_run(model::ntp ntp) + : ntp(std::move(ntp)) {} + + model::ntp ntp; + cloud_storage::segment_meta meta{}; + size_t num_segments{0}; + std::vector segments; + + /// Try to add segment to the run + /// + /// The subsequent calls are successful until the total size + /// of the run is below the threshold. The object keeps track + /// of all segment names. + /// + /// \return true if the run is assembled, false if more segments can be + /// added to the run + bool + maybe_add_segment(const cloud_storage::segment_meta& s, size_t max_size); +}; + +std::ostream& operator<<(std::ostream& o, const adjacent_segment_run& run); + +} // namespace archival diff --git a/src/v/archival/types.cc b/src/v/archival/types.cc index 60b9c9181a146..4bc40dbe19845 100644 --- a/src/v/archival/types.cc +++ b/src/v/archival/types.cc @@ -121,86 +121,4 @@ get_archival_service_config(ss::scheduling_group sg, ss::io_priority_class p) { return cfg; } -bool adjacent_segment_run::maybe_add_segment( - const cloud_storage::segment_meta& s, size_t max_size) { - vlog( - archival_log.debug, - "{} Segments collected, looking at segment meta: {}, current run meta: " - "{}", - num_segments, - s, - meta); - if (num_segments == 1 && meta.size_bytes + s.size_bytes > max_size) { - // Corner case, we hit a segment which is smaller than the max_size - // but it's larger than max_size when combined with its neighbor. In - // this case we need to skip previous the segment. - num_segments = 0; - segments.clear(); - meta = {}; - } - if (num_segments == 0) { - // Find the begining of the small segment - // run. - if (s.size_bytes < max_size) { - meta = s; - num_segments = 1; - segments.push_back( - cloud_storage::partition_manifest::generate_remote_segment_path( - ntp, s)); - } - } else { - if ( - meta.segment_term == s.segment_term - && meta.size_bytes + s.size_bytes <= max_size) { - // Cross term merging is disallowed. Because of that we need to stop - // if the term doesn't match the previous term. - if (model::next_offset(meta.committed_offset) != s.base_offset) { - // In case if we're dealing with one of the old manifests - // with inconsistencies (overlapping offsets, etc). - num_segments = 0; - meta = {}; - segments.clear(); - vlog( - archival_log.debug, - "Reseting the upload, current committed offset: {}, next " - "base offset: {}, meta: {}", - meta.committed_offset, - s.base_offset, - meta); - return false; - } - // Move the end of the small segment run forward - meta.committed_offset = s.committed_offset; - meta.max_timestamp = s.max_timestamp; - num_segments++; - meta.size_bytes += s.size_bytes; - segments.push_back( - cloud_storage::partition_manifest::generate_remote_segment_path( - ntp, s)); - } else { - return num_segments > 1; - } - } - return false; -} - -std::ostream& operator<<(std::ostream& os, const adjacent_segment_run& run) { - std::vector names; - names.reserve(run.segments.size()); - std::transform( - run.segments.begin(), - run.segments.end(), - std::back_inserter(names), - [](const cloud_storage::remote_segment_path& rsp) { - return rsp().native(); - }); - fmt::print( - os, - "{{meta: {}, num_segments: {}, segments: {}}}", - run.meta, - run.num_segments, - names); - return os; -} - } // namespace archival diff --git a/src/v/archival/types.h b/src/v/archival/types.h index 5652815775e45..d6c92ed690238 100644 --- a/src/v/archival/types.h +++ b/src/v/archival/types.h @@ -10,6 +10,7 @@ #pragma once +#include "archival/adjacent_segment_run.h" #include "base/seastarx.h" #include "cloud_storage/types.h" #include "seastar/core/lowres_clock.hh" @@ -171,34 +172,6 @@ class housekeeping_job { /// Number of segment reuploads the job can do per housekeeping run static constexpr int max_reuploads_per_run = 4; -/// Represents a series of adjacent segments -/// The object is used to compute a possible reupload -/// candidate. The series of segment is supposed to be -/// merged and reuploaded. The object produces metadata -/// for the reuploaded segment. -struct adjacent_segment_run { - explicit adjacent_segment_run(model::ntp ntp) - : ntp(std::move(ntp)) {} - - model::ntp ntp; - cloud_storage::segment_meta meta{}; - size_t num_segments{0}; - std::vector segments; - - /// Try to add segment to the run - /// - /// The subsequent calls are successful until the total size - /// of the run is below the threshold. The object keeps track - /// of all segment names. - /// - /// \return true if the run is assembled, false if more segments can be - /// added to the run - bool - maybe_add_segment(const cloud_storage::segment_meta& s, size_t max_size); -}; - -std::ostream& operator<<(std::ostream& o, const adjacent_segment_run& run); - enum class error_outcome { unexpected_failure = 1, timed_out, diff --git a/src/v/cluster/CMakeLists.txt b/src/v/cluster/CMakeLists.txt index 99c9aaa915c54..96ecd018ba599 100644 --- a/src/v/cluster/CMakeLists.txt +++ b/src/v/cluster/CMakeLists.txt @@ -212,6 +212,7 @@ v_cc_library( ../archival/retention_calculator.cc ../archival/upload_housekeeping_service.cc ../archival/adjacent_segment_merger.cc + ../archival/adjacent_segment_run.cc ../archival/purger.cc ../archival/scrubber.cc ../archival/archiver_manager.cc