Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

Commit

Permalink
fix(bulk_load): fix remove_local_bulk_load_dir() (#823)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangyifan27 authored and neverchanje committed Apr 25, 2021
1 parent 2c6f215 commit 386a431
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 19 deletions.
1 change: 1 addition & 0 deletions src/block_service/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ set(MY_PROJ_LIBS
set(MY_BINPLACES
config-test.ini
run.sh
clear.sh
)

dsn_add_test()
2 changes: 1 addition & 1 deletion src/block_service/test/clear.sh
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
#!/bin/bash

rm -rf log.* *.log data dsn_block_service_test.xml randomfile*
rm -rf log.* *.log data dsn_block_service_test.xml randomfile* rename_dir* test_dir
41 changes: 41 additions & 0 deletions src/block_service/test/hdfs_service_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,22 @@ class HDFSClientTest : public testing::Test
virtual void SetUp() override;
virtual void TearDown() override;
void generate_test_file(const char *filename);
void write_test_files_async();
std::string name_node;
std::string backup_path;
std::string local_test_dir;
std::string test_data_str;
};

void HDFSClientTest::SetUp()
{
name_node = FLAGS_test_name_node;
backup_path = FLAGS_test_backup_path;
local_test_dir = "test_dir";
test_data_str = "";
for (int i = 0; i < FLAGS_num_test_file_lines; ++i) {
test_data_str += "test";
}
}

void HDFSClientTest::TearDown() {}
Expand All @@ -75,6 +83,22 @@ void HDFSClientTest::generate_test_file(const char *filename)
fclose(fp);
}

void HDFSClientTest::write_test_files_async()
{
dsn::utils::filesystem::create_directory(local_test_dir);
for (int i = 0; i < 100; ++i) {
tasking::enqueue(LPC_TEST_HDFS, nullptr, [this, i]() {
// mock the writing process in hdfs_file_object::download().
std::string test_file_name = local_test_dir + "/test_file_" + std::to_string(i);
std::ofstream out(test_file_name, std::ios::binary | std::ios::out | std::ios::trunc);
if (out.is_open()) {
out.write(test_data_str.c_str(), test_data_str.length());
}
out.close();
});
}
}

TEST_F(HDFSClientTest, test_basic_operation)
{
if (name_node == example_name_node || backup_path == example_backup_path) {
Expand Down Expand Up @@ -341,3 +365,20 @@ TEST_F(HDFSClientTest, test_concurrent_upload_download)
utils::filesystem::remove_path(downloaded_file_names[i]);
}
}

TEST_F(HDFSClientTest, test_rename_path_while_writing)
{
write_test_files_async();
usleep(100);
std::string rename_dir = "rename_dir." + std::to_string(dsn_now_ms());
// rename succeed but writing failed.
ASSERT_TRUE(dsn::utils::filesystem::rename_path(local_test_dir, rename_dir));
}

TEST_F(HDFSClientTest, test_remove_path_while_writing)
{
write_test_files_async();
usleep(100);
// couldn't remove the directory while writing files in it.
ASSERT_FALSE(dsn::utils::filesystem::remove_path(local_test_dir));
}
41 changes: 24 additions & 17 deletions src/replica/bulk_load/replica_bulk_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.

#include "replica_bulk_loader.h"

#include <dsn/dist/block_service.h>
#include <dsn/dist/fmt_logging.h>
#include <dsn/dist/replication/replication_app_base.h>
#include <dsn/utility/fail_point.h>
#include <dsn/utility/filesystem.h>

#include "replica_bulk_loader.h"
#include "replica/disk_cleaner.h"

namespace dsn {
namespace replication {

Expand Down Expand Up @@ -579,27 +580,33 @@ void replica_bulk_loader::handle_bulk_load_finish(bulk_load_status::type new_sta
// remove local bulk load dir
std::string bulk_load_dir = utils::filesystem::path_combine(
_replica->_dir, bulk_load_constant::BULK_LOAD_LOCAL_ROOT_DIR);
error_code err = remove_local_bulk_load_dir(bulk_load_dir);
if (err != ERR_OK) {
tasking::enqueue(
LPC_REPLICATION_COMMON,
&_replica->_tracker,
std::bind(&replica_bulk_loader::remove_local_bulk_load_dir, this, bulk_load_dir),
get_gpid().thread_hash());
}

remove_local_bulk_load_dir(bulk_load_dir);
clear_bulk_load_states();
}

// ThreadPool: THREAD_POOL_REPLICATION
error_code replica_bulk_loader::remove_local_bulk_load_dir(const std::string &bulk_load_dir)
void replica_bulk_loader::remove_local_bulk_load_dir(const std::string &bulk_load_dir)
{
if (!utils::filesystem::directory_exists(bulk_load_dir) ||
!utils::filesystem::remove_path(bulk_load_dir)) {
derror_replica("remove bulk_load dir({}) failed", bulk_load_dir);
return ERR_FILE_OPERATION_FAILED;
if (!utils::filesystem::directory_exists(bulk_load_dir)) {
return;
}
// Rename bulk_load_dir to ${replica_dir}.bulk_load.timestamp.gar before remove it.
// Because we download sst files asynchronously and couldn't remove a directory while writing
// files in it.
std::string garbage_dir = fmt::format("{}.{}.{}.{}",
_replica->_dir,
bulk_load_constant::BULK_LOAD_LOCAL_ROOT_DIR,
std::to_string(dsn_now_ms()),
kFolderSuffixGar);
if (!utils::filesystem::rename_path(bulk_load_dir, garbage_dir)) {
derror_replica("rename bulk_load dir({}) failed.", bulk_load_dir);
return;
}
if (!utils::filesystem::remove_path(garbage_dir)) {
derror_replica(
"remove bulk_load gar dir({}) failed, disk cleaner would retry to remove it.",
garbage_dir);
}
return ERR_OK;
}

// ThreadPool: THREAD_POOL_REPLICATION
Expand Down
2 changes: 1 addition & 1 deletion src/replica/bulk_load/replica_bulk_loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class replica_bulk_loader : replica_base
void handle_bulk_load_finish(bulk_load_status::type new_status);
void pause_bulk_load();

error_code remove_local_bulk_load_dir(const std::string &bulk_load_dir);
void remove_local_bulk_load_dir(const std::string &bulk_load_dir);
void cleanup_download_task();
void clear_bulk_load_states();
bool is_cleaned_up();
Expand Down

0 comments on commit 386a431

Please sign in to comment.