Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v24.1.x] cleanup tiered storage temporary cache file if exceptions are thrown during download #24064

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 42 additions & 23 deletions src/v/cloud_storage/cache_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <seastar/core/seastar.hh>
#include <seastar/core/smp.hh>
#include <seastar/core/sstring.hh>
#include <seastar/coroutine/as_future.hh>
#include <seastar/util/defer.hh>

#include <cloud_storage/cache_service.h>
Expand Down Expand Up @@ -1232,6 +1233,7 @@ ss::future<> cache::put(
ss::this_shard_id(),
(++_cnt),
cache_tmp_file_extension));
auto tmp_filepath = dir_path / tmp_filename;

ss::file tmp_cache_file;
while (true) {
Expand All @@ -1246,14 +1248,14 @@ ss::future<> cache::put(
| ss::open_flags::exclusive;

tmp_cache_file = co_await ss::open_file_dma(
(dir_path / tmp_filename).native(), flags);
tmp_filepath.native(), flags);
break;
} catch (std::filesystem::filesystem_error& e) {
if (e.code() == std::errc::no_such_file_or_directory) {
vlog(
cst_log.debug,
"Couldn't open {}, gonna retry",
(dir_path / tmp_filename).native());
tmp_filepath.native());
} else {
throw;
}
Expand All @@ -1266,42 +1268,59 @@ ss::future<> cache::put(
options.io_priority_class = io_priority;
auto out = co_await ss::make_file_output_stream(tmp_cache_file, options);

std::exception_ptr disk_full_error;
std::exception_ptr eptr;
bool no_space_on_device = false;
try {
co_await ss::copy(data, out)
.then([&out]() { return out.flush(); })
.finally([&out]() { return out.close(); });
} catch (std::filesystem::filesystem_error& e) {
// For ENOSPC errors, delay handling so that we can do a trim
if (e.code() == std::errc::no_space_on_device) {
disk_full_error = std::current_exception();
} else {
throw;
no_space_on_device = e.code() == std::errc::no_space_on_device;
eptr = std::current_exception();
} catch (...) {
// For other errors, delay handling so that we can clean up the tmp file
eptr = std::current_exception();
}

// If we failed to write to the tmp file, we should delete it, maybe do an
// eager trim, and rethrow the exception.
if (eptr) {
if (!_gate.is_closed()) {
auto delete_tmp_fut = co_await ss::coroutine::as_future(
delete_file_and_empty_parents(tmp_filepath.native()));
if (
delete_tmp_fut.failed()
&& !ssx::is_shutdown_exception(delete_tmp_fut.get_exception())) {
vlog(
cst_log.error,
"Failed to delete tmp file {}: {}",
tmp_filepath.native(),
delete_tmp_fut.get_exception());
}
}
}

if (disk_full_error) {
vlog(cst_log.error, "Out of space while writing to cache");
if (no_space_on_device) {
vlog(cst_log.error, "Out of space while writing to cache");

// Block further puts from being attempted until notify_disk_status
// reports that there is space available.
set_block_puts(true);
// Block further puts from being attempted until notify_disk_status
// reports that there is space available.
set_block_puts(true);

// Trim proactively: if many fibers hit this concurrently,
// they'll contend for cleanup_sm and the losers will skip
// trim due to throttling.
co_await trim_throttled();
// Trim proactively: if many fibers hit this concurrently,
// they'll contend for cleanup_sm and the losers will skip
// trim due to throttling.
co_await trim_throttled();
}

throw disk_full_error;
std::rethrow_exception(eptr);
}

// commit write transaction
auto src = (dir_path / tmp_filename).native();
auto dest = (dir_path / filename).native();
auto put_size = co_await ss::file_size(tmp_filepath.native());

auto put_size = co_await ss::file_size(src);

co_await ss::rename_file(src, dest);
auto dest = (dir_path / filename).native();
co_await ss::rename_file(tmp_filepath.native(), dest);

// We will now update
reservation.wrote_data(put_size, 1);
Expand Down
37 changes: 37 additions & 0 deletions src/v/cloud_storage/tests/cache_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
#include "random/generators.h"
#include "ssx/sformat.h"
#include "test_utils/fixture.h"
#include "test_utils/iostream.h"
#include "test_utils/scoped_config.h"
#include "utils/directory_walker.h"
#include "utils/file_io.h"
#include "utils/human.h"

Expand Down Expand Up @@ -451,6 +453,41 @@ SEASTAR_THREAD_TEST_CASE(test_access_time_tracker_read_skipped_on_old_version) {
BOOST_REQUIRE_EQUAL(out.size(), 0);
}

ss::future<size_t> count_files(ss::sstring dirname) {
directory_walker walker;
size_t count = 0;
co_await walker.walk(
dirname,
[dirname, &count](const ss::directory_entry& entry) -> ss::future<> {
if (entry.type == ss::directory_entry_type::directory) {
return count_files(fmt::format("{}/{}", dirname, entry.name))
.then([&count](size_t sub_count) { count += sub_count; });
} else {
++count;
return ss::now();
}
});

co_return count;
}

FIXTURE_TEST(test_clean_up_on_stream_exception, cache_test_fixture) {
auto s = tests::make_throwing_stream(ss::abort_requested_exception());
auto reservation = sharded_cache.local().reserve_space(1, 1).get();
BOOST_CHECK_THROW(
sharded_cache.local().put(KEY, s, reservation).get(),
ss::abort_requested_exception);
vlog(test_log.info, "Put failed as expected");

BOOST_CHECK_EQUAL(sharded_cache.local().get_usage_bytes(), 0);
BOOST_CHECK_EQUAL(sharded_cache.local().get_usage_objects(), 0);

vlog(test_log.info, "Counting files in cache directory");
BOOST_CHECK_EQUAL(count_files(CACHE_DIR.native()).get(), 0);

vlog(test_log.info, "Test passed");
}

/**
* Validate that .part files and empty directories are deleted if found during
* the startup walk of the cache.
Expand Down
42 changes: 42 additions & 0 deletions src/v/test_utils/iostream.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

#pragma once

#include "base/seastarx.h"

#include <seastar/core/future.hh>
#include <seastar/core/iostream.hh>
#include <seastar/core/temporary_buffer.hh>

namespace tests {

/// Create an input stream that throws an exception on first interaction.
template<class Err>
ss::input_stream<char> make_throwing_stream(Err err) {
struct throwing_stream final : ss::data_source_impl {
explicit throwing_stream(Err e)
: _err(std::move(e)) {}

ss::future<ss::temporary_buffer<char>> skip(uint64_t) final {
return get();
}

ss::future<ss::temporary_buffer<char>> get() final {
return ss::make_exception_future<ss::temporary_buffer<char>>(
std::move(_err));
}

Err _err;
};
auto ds = ss::data_source(std::make_unique<throwing_stream>(err));
return ss::input_stream<char>(std::move(ds));
}

} // namespace tests
24 changes: 2 additions & 22 deletions src/v/utils/tests/input_stream_fanout_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "bytes/iostream.h"
#include "bytes/random.h"
#include "random/generators.h"
#include "test_utils/iostream.h"
#include "utils/stream_utils.h"

#include <seastar/core/abort_source.hh>
Expand Down Expand Up @@ -437,29 +438,8 @@ SEASTAR_THREAD_TEST_CASE(input_stream_fanout_detach_10_size_limit) {
test_detached_consumer<10>(4, 1000);
}

template<class Err>
ss::input_stream<char> make_throwing_stream(Err err) {
struct throwing_stream final : ss::data_source_impl {
explicit throwing_stream(Err e)
: _err(std::move(e)) {}

ss::future<ss::temporary_buffer<char>> skip(uint64_t) final {
return get();
}

ss::future<ss::temporary_buffer<char>> get() final {
return ss::make_exception_future<ss::temporary_buffer<char>>(
std::move(_err));
}

Err _err;
};
auto ds = ss::data_source(std::make_unique<throwing_stream>(err));
return ss::input_stream<char>(std::move(ds));
}

SEASTAR_THREAD_TEST_CASE(input_stream_fanout_producer_throw) {
auto is = make_throwing_stream(ss::abort_requested_exception());
auto is = tests::make_throwing_stream(ss::abort_requested_exception());
auto [s1, s2] = input_stream_fanout<2>(std::move(is), 4, 8);

BOOST_REQUIRE_THROW(s1.read().get(), ss::abort_requested_exception);
Expand Down
Loading