archival: pull out adjacent segment run
There's some business logic in archival/types to generate path names. In
the effort to change how we assign paths to objects, this moves that
business logic out of archival/types, so that it doesn't need to muddy
such a fundamental file.
andrwng committed Jun 25, 2024
1 parent 62224d6 commit 6945e4d
Showing 5 changed files with 151 additions and 110 deletions.
103 changes: 103 additions & 0 deletions src/v/archival/adjacent_segment_run.cc
@@ -0,0 +1,103 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0
#include "archival/adjacent_segment_run.h"

#include "archival/logger.h"
#include "base/vlog.h"
#include "cloud_storage/partition_manifest.h"
#include "cloud_storage/types.h"
#include "model/metadata.h"

#include <vector>

namespace archival {

bool adjacent_segment_run::maybe_add_segment(
const cloud_storage::segment_meta& s, size_t max_size) {
vlog(
archival_log.debug,
"{} Segments collected, looking at segment meta: {}, current run meta: "
"{}",
num_segments,
s,
meta);
if (num_segments == 1 && meta.size_bytes + s.size_bytes > max_size) {
// Corner case: we hit a segment which is smaller than max_size
// on its own, but larger than max_size when combined with its
// neighbor. In this case we need to skip the previous segment.
num_segments = 0;
segments.clear();
meta = {};
}
if (num_segments == 0) {
// Find the beginning of the small segment run.
if (s.size_bytes < max_size) {
meta = s;
num_segments = 1;
segments.push_back(
cloud_storage::partition_manifest::generate_remote_segment_path(
ntp, s));
}
} else {
if (
meta.segment_term == s.segment_term
&& meta.size_bytes + s.size_bytes <= max_size) {
// Cross-term merging is disallowed, so we need to stop
// if the term doesn't match the previous term.
if (model::next_offset(meta.committed_offset) != s.base_offset) {
// We may be dealing with one of the old manifests that
// contain inconsistencies (overlapping offsets, etc.).
// Log the state before clearing it so the message shows the
// offsets that actually conflicted.
vlog(
archival_log.debug,
"Resetting the upload, current committed offset: {}, next "
"base offset: {}, meta: {}",
meta.committed_offset,
s.base_offset,
meta);
num_segments = 0;
meta = {};
segments.clear();
return false;
}
// Move the end of the small segment run forward
meta.committed_offset = s.committed_offset;
meta.max_timestamp = s.max_timestamp;
num_segments++;
meta.size_bytes += s.size_bytes;
segments.push_back(
cloud_storage::partition_manifest::generate_remote_segment_path(
ntp, s));
} else {
return num_segments > 1;
}
}
return false;
}

std::ostream& operator<<(std::ostream& os, const adjacent_segment_run& run) {
std::vector<ss::sstring> names;
names.reserve(run.segments.size());
std::transform(
run.segments.begin(),
run.segments.end(),
std::back_inserter(names),
[](const cloud_storage::remote_segment_path& rsp) {
return rsp().native();
});
fmt::print(
os,
"{{meta: {}, num_segments: {}, segments: {}}}",
run.meta,
run.num_segments,
names);
return os;
}

} // namespace archival
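
Illustrative usage (not part of this commit): a caller feeds segment metadata in offset order and stops once maybe_add_segment reports that the run is assembled. The collect_candidate helper, the metas container, and the max_size value below are assumptions made for this sketch, not code from the repository.

#include "archival/adjacent_segment_run.h"
#include "cloud_storage/types.h"
#include "model/metadata.h"

#include <optional>
#include <vector>

// Hypothetical driver: walk the segment metadata of one partition and
// return the first run of small adjacent segments worth merging.
std::optional<archival::adjacent_segment_run> collect_candidate(
  const model::ntp& ntp,
  const std::vector<cloud_storage::segment_meta>& metas,
  size_t max_size) {
    archival::adjacent_segment_run run(ntp);
    for (const auto& meta : metas) {
        // true means the run is assembled: adding this segment would cross
        // a term boundary or push the run past max_size, so stop here.
        if (run.maybe_add_segment(meta, max_size)) {
            return run;
        }
    }
    // The input ran out before the run was closed; a run of more than one
    // segment is still a plausible merge candidate.
    if (run.num_segments > 1) {
        return run;
    }
    return std::nullopt;
}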
46 changes: 46 additions & 0 deletions src/v/archival/adjacent_segment_run.h
@@ -0,0 +1,46 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0
#pragma once

#include "cloud_storage/types.h"
#include "model/metadata.h"

#include <vector>

namespace archival {

/// Represents a series of adjacent segments.
/// The object is used to compute a possible reupload
/// candidate. The series of segments is supposed to be
/// merged and reuploaded. The object produces metadata
/// for the reuploaded segment.
struct adjacent_segment_run {
explicit adjacent_segment_run(model::ntp ntp)
: ntp(std::move(ntp)) {}

model::ntp ntp;
cloud_storage::segment_meta meta{};
size_t num_segments{0};
std::vector<cloud_storage::remote_segment_path> segments;

/// Try to add a segment to the run
///
/// Subsequent calls succeed as long as the total size of the
/// run stays below the threshold. The object keeps track
/// of all segment names.
///
/// \return true if the run is assembled, false if more segments can be
/// added to the run
bool
maybe_add_segment(const cloud_storage::segment_meta& s, size_t max_size);
};

std::ostream& operator<<(std::ostream& o, const adjacent_segment_run& run);

} // namespace archival
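
A minimal sketch of the return-value contract, assuming a hypothetical make_meta helper that fills only the fields maybe_add_segment inspects (base_offset, committed_offset, size_bytes, segment_term); the includes, offsets, and sizes below are illustrative, not part of this commit.

#include "archival/adjacent_segment_run.h"
#include "cloud_storage/types.h"
#include "model/fundamental.h"
#include "model/metadata.h"

#include <cassert>

// Hypothetical helper: only sets the fields the run-building logic reads.
static cloud_storage::segment_meta
make_meta(int64_t base, int64_t committed, size_t size_bytes, int64_t term) {
    cloud_storage::segment_meta m{};
    m.base_offset = model::offset(base);
    m.committed_offset = model::offset(committed);
    m.size_bytes = size_bytes;
    m.segment_term = model::term_id(term);
    return m;
}

void run_contract_example(const model::ntp& ntp) {
    archival::adjacent_segment_run run(ntp);
    const size_t max_size = 100;

    bool closed = run.maybe_add_segment(make_meta(0, 9, 40, 1), max_size);
    // closed == false: the run can still grow.
    closed = run.maybe_add_segment(make_meta(10, 19, 40, 1), max_size);
    // closed == false: still below max_size and in the same term.
    closed = run.maybe_add_segment(make_meta(20, 29, 40, 2), max_size);
    // closed == true: the term changed, so the run (two segments covering
    // offsets 0..19) is assembled and the third segment is not included.
    assert(closed && run.num_segments == 2);
}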
82 changes: 0 additions & 82 deletions src/v/archival/types.cc
@@ -121,86 +121,4 @@ get_archival_service_config(ss::scheduling_group sg, ss::io_priority_class p) {
return cfg;
}

bool adjacent_segment_run::maybe_add_segment(
const cloud_storage::segment_meta& s, size_t max_size) {
vlog(
archival_log.debug,
"{} Segments collected, looking at segment meta: {}, current run meta: "
"{}",
num_segments,
s,
meta);
if (num_segments == 1 && meta.size_bytes + s.size_bytes > max_size) {
// Corner case, we hit a segment which is smaller than the max_size
// but it's larger than max_size when combined with its neighbor. In
// this case we need to skip previous the segment.
num_segments = 0;
segments.clear();
meta = {};
}
if (num_segments == 0) {
// Find the begining of the small segment
// run.
if (s.size_bytes < max_size) {
meta = s;
num_segments = 1;
segments.push_back(
cloud_storage::partition_manifest::generate_remote_segment_path(
ntp, s));
}
} else {
if (
meta.segment_term == s.segment_term
&& meta.size_bytes + s.size_bytes <= max_size) {
// Cross term merging is disallowed. Because of that we need to stop
// if the term doesn't match the previous term.
if (model::next_offset(meta.committed_offset) != s.base_offset) {
// In case if we're dealing with one of the old manifests
// with inconsistencies (overlapping offsets, etc).
num_segments = 0;
meta = {};
segments.clear();
vlog(
archival_log.debug,
"Reseting the upload, current committed offset: {}, next "
"base offset: {}, meta: {}",
meta.committed_offset,
s.base_offset,
meta);
return false;
}
// Move the end of the small segment run forward
meta.committed_offset = s.committed_offset;
meta.max_timestamp = s.max_timestamp;
num_segments++;
meta.size_bytes += s.size_bytes;
segments.push_back(
cloud_storage::partition_manifest::generate_remote_segment_path(
ntp, s));
} else {
return num_segments > 1;
}
}
return false;
}

std::ostream& operator<<(std::ostream& os, const adjacent_segment_run& run) {
std::vector<ss::sstring> names;
names.reserve(run.segments.size());
std::transform(
run.segments.begin(),
run.segments.end(),
std::back_inserter(names),
[](const cloud_storage::remote_segment_path& rsp) {
return rsp().native();
});
fmt::print(
os,
"{{meta: {}, num_segments: {}, segments: {}}}",
run.meta,
run.num_segments,
names);
return os;
}

} // namespace archival
29 changes: 1 addition & 28 deletions src/v/archival/types.h
@@ -10,6 +10,7 @@

#pragma once

#include "archival/adjacent_segment_run.h"
#include "base/seastarx.h"
#include "cloud_storage/types.h"
#include "seastar/core/lowres_clock.hh"
@@ -171,34 +172,6 @@ class housekeeping_job {
/// Number of segment reuploads the job can do per housekeeping run
static constexpr int max_reuploads_per_run = 4;

/// Represents a series of adjacent segments
/// The object is used to compute a possible reupload
/// candidate. The series of segment is supposed to be
/// merged and reuploaded. The object produces metadata
/// for the reuploaded segment.
struct adjacent_segment_run {
explicit adjacent_segment_run(model::ntp ntp)
: ntp(std::move(ntp)) {}

model::ntp ntp;
cloud_storage::segment_meta meta{};
size_t num_segments{0};
std::vector<cloud_storage::remote_segment_path> segments;

/// Try to add segment to the run
///
/// The subsequent calls are successful until the total size
/// of the run is below the threshold. The object keeps track
/// of all segment names.
///
/// \return true if the run is assembled, false if more segments can be
/// added to the run
bool
maybe_add_segment(const cloud_storage::segment_meta& s, size_t max_size);
};

std::ostream& operator<<(std::ostream& o, const adjacent_segment_run& run);

enum class error_outcome {
unexpected_failure = 1,
timed_out,
1 change: 1 addition & 0 deletions src/v/cluster/CMakeLists.txt
@@ -212,6 +212,7 @@ v_cc_library(
../archival/retention_calculator.cc
../archival/upload_housekeeping_service.cc
../archival/adjacent_segment_merger.cc
../archival/adjacent_segment_run.cc
../archival/purger.cc
../archival/scrubber.cc
../archival/archiver_manager.cc
