Skip to content

Commit

Permalink
Merge pull request #24064 from nvartolomei/nv/manual-backport-24000-v…
Browse files Browse the repository at this point in the history
…24.1.x-504

[v24.1.x] cleanup tiered storage temporary cache file if exceptions are thrown during download
  • Loading branch information
nvartolomei authored Nov 8, 2024
2 parents 69ea20f + e7cfdc3 commit 738692f
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 45 deletions.
65 changes: 42 additions & 23 deletions src/v/cloud_storage/cache_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <seastar/core/seastar.hh>
#include <seastar/core/smp.hh>
#include <seastar/core/sstring.hh>
#include <seastar/coroutine/as_future.hh>
#include <seastar/util/defer.hh>

#include <cloud_storage/cache_service.h>
Expand Down Expand Up @@ -1232,6 +1233,7 @@ ss::future<> cache::put(
ss::this_shard_id(),
(++_cnt),
cache_tmp_file_extension));
auto tmp_filepath = dir_path / tmp_filename;

ss::file tmp_cache_file;
while (true) {
Expand All @@ -1246,14 +1248,14 @@ ss::future<> cache::put(
| ss::open_flags::exclusive;

tmp_cache_file = co_await ss::open_file_dma(
(dir_path / tmp_filename).native(), flags);
tmp_filepath.native(), flags);
break;
} catch (std::filesystem::filesystem_error& e) {
if (e.code() == std::errc::no_such_file_or_directory) {
vlog(
cst_log.debug,
"Couldn't open {}, gonna retry",
(dir_path / tmp_filename).native());
tmp_filepath.native());
} else {
throw;
}
Expand All @@ -1266,42 +1268,59 @@ ss::future<> cache::put(
options.io_priority_class = io_priority;
auto out = co_await ss::make_file_output_stream(tmp_cache_file, options);

std::exception_ptr disk_full_error;
std::exception_ptr eptr;
bool no_space_on_device = false;
try {
co_await ss::copy(data, out)
.then([&out]() { return out.flush(); })
.finally([&out]() { return out.close(); });
} catch (std::filesystem::filesystem_error& e) {
// For ENOSPC errors, delay handling so that we can do a trim
if (e.code() == std::errc::no_space_on_device) {
disk_full_error = std::current_exception();
} else {
throw;
no_space_on_device = e.code() == std::errc::no_space_on_device;
eptr = std::current_exception();
} catch (...) {
// For other errors, delay handling so that we can clean up the tmp file
eptr = std::current_exception();
}

// If we failed to write to the tmp file, we should delete it, maybe do an
// eager trim, and rethrow the exception.
if (eptr) {
if (!_gate.is_closed()) {
auto delete_tmp_fut = co_await ss::coroutine::as_future(
delete_file_and_empty_parents(tmp_filepath.native()));
if (
delete_tmp_fut.failed()
&& !ssx::is_shutdown_exception(delete_tmp_fut.get_exception())) {
vlog(
cst_log.error,
"Failed to delete tmp file {}: {}",
tmp_filepath.native(),
delete_tmp_fut.get_exception());
}
}
}

if (disk_full_error) {
vlog(cst_log.error, "Out of space while writing to cache");
if (no_space_on_device) {
vlog(cst_log.error, "Out of space while writing to cache");

// Block further puts from being attempted until notify_disk_status
// reports that there is space available.
set_block_puts(true);
// Block further puts from being attempted until notify_disk_status
// reports that there is space available.
set_block_puts(true);

// Trim proactively: if many fibers hit this concurrently,
// they'll contend for cleanup_sm and the losers will skip
// trim due to throttling.
co_await trim_throttled();
// Trim proactively: if many fibers hit this concurrently,
// they'll contend for cleanup_sm and the losers will skip
// trim due to throttling.
co_await trim_throttled();
}

throw disk_full_error;
std::rethrow_exception(eptr);
}

// commit write transaction
auto src = (dir_path / tmp_filename).native();
auto dest = (dir_path / filename).native();
auto put_size = co_await ss::file_size(tmp_filepath.native());

auto put_size = co_await ss::file_size(src);

co_await ss::rename_file(src, dest);
auto dest = (dir_path / filename).native();
co_await ss::rename_file(tmp_filepath.native(), dest);

// We will now update
reservation.wrote_data(put_size, 1);
Expand Down
37 changes: 37 additions & 0 deletions src/v/cloud_storage/tests/cache_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
#include "random/generators.h"
#include "ssx/sformat.h"
#include "test_utils/fixture.h"
#include "test_utils/iostream.h"
#include "test_utils/scoped_config.h"
#include "utils/directory_walker.h"
#include "utils/file_io.h"
#include "utils/human.h"

Expand Down Expand Up @@ -451,6 +453,41 @@ SEASTAR_THREAD_TEST_CASE(test_access_time_tracker_read_skipped_on_old_version) {
BOOST_REQUIRE_EQUAL(out.size(), 0);
}

ss::future<size_t> count_files(ss::sstring dirname) {
directory_walker walker;
size_t count = 0;
co_await walker.walk(
dirname,
[dirname, &count](const ss::directory_entry& entry) -> ss::future<> {
if (entry.type == ss::directory_entry_type::directory) {
return count_files(fmt::format("{}/{}", dirname, entry.name))
.then([&count](size_t sub_count) { count += sub_count; });
} else {
++count;
return ss::now();
}
});

co_return count;
}

FIXTURE_TEST(test_clean_up_on_stream_exception, cache_test_fixture) {
auto s = tests::make_throwing_stream(ss::abort_requested_exception());
auto reservation = sharded_cache.local().reserve_space(1, 1).get();
BOOST_CHECK_THROW(
sharded_cache.local().put(KEY, s, reservation).get(),
ss::abort_requested_exception);
vlog(test_log.info, "Put failed as expected");

BOOST_CHECK_EQUAL(sharded_cache.local().get_usage_bytes(), 0);
BOOST_CHECK_EQUAL(sharded_cache.local().get_usage_objects(), 0);

vlog(test_log.info, "Counting files in cache directory");
BOOST_CHECK_EQUAL(count_files(CACHE_DIR.native()).get(), 0);

vlog(test_log.info, "Test passed");
}

/**
* Validate that .part files and empty directories are deleted if found during
* the startup walk of the cache.
Expand Down
42 changes: 42 additions & 0 deletions src/v/test_utils/iostream.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

#pragma once

#include "base/seastarx.h"

#include <seastar/core/future.hh>
#include <seastar/core/iostream.hh>
#include <seastar/core/temporary_buffer.hh>

namespace tests {

/// Create an input stream that throws an exception on first interaction.
template<class Err>
ss::input_stream<char> make_throwing_stream(Err err) {
struct throwing_stream final : ss::data_source_impl {
explicit throwing_stream(Err e)
: _err(std::move(e)) {}

ss::future<ss::temporary_buffer<char>> skip(uint64_t) final {
return get();
}

ss::future<ss::temporary_buffer<char>> get() final {
return ss::make_exception_future<ss::temporary_buffer<char>>(
std::move(_err));
}

Err _err;
};
auto ds = ss::data_source(std::make_unique<throwing_stream>(err));
return ss::input_stream<char>(std::move(ds));
}

} // namespace tests
24 changes: 2 additions & 22 deletions src/v/utils/tests/input_stream_fanout_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "bytes/iostream.h"
#include "bytes/random.h"
#include "random/generators.h"
#include "test_utils/iostream.h"
#include "utils/stream_utils.h"

#include <seastar/core/abort_source.hh>
Expand Down Expand Up @@ -437,29 +438,8 @@ SEASTAR_THREAD_TEST_CASE(input_stream_fanout_detach_10_size_limit) {
test_detached_consumer<10>(4, 1000);
}

template<class Err>
ss::input_stream<char> make_throwing_stream(Err err) {
struct throwing_stream final : ss::data_source_impl {
explicit throwing_stream(Err e)
: _err(std::move(e)) {}

ss::future<ss::temporary_buffer<char>> skip(uint64_t) final {
return get();
}

ss::future<ss::temporary_buffer<char>> get() final {
return ss::make_exception_future<ss::temporary_buffer<char>>(
std::move(_err));
}

Err _err;
};
auto ds = ss::data_source(std::make_unique<throwing_stream>(err));
return ss::input_stream<char>(std::move(ds));
}

SEASTAR_THREAD_TEST_CASE(input_stream_fanout_producer_throw) {
auto is = make_throwing_stream(ss::abort_requested_exception());
auto is = tests::make_throwing_stream(ss::abort_requested_exception());
auto [s1, s2] = input_stream_fanout<2>(std::move(is), 4, 8);

BOOST_REQUIRE_THROW(s1.read().get(), ss::abort_requested_exception);
Expand Down

0 comments on commit 738692f

Please sign in to comment.