Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature](cloud) Support cache warmup for any remote filesystem #34191

Merged
merged 1 commit into from
Apr 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 11 additions & 15 deletions be/src/cloud/cloud_backend_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,26 +72,26 @@ void CloudBackendService::sync_load_for_tablets(TSyncLoadForTabletsResponse&,
}
});
};
static_cast<void>(_exec_env->sync_load_for_tablets_thread_pool()->submit_func(std::move(f)));
static_cast<void>(_engine.sync_load_for_tablets_thread_pool().submit_func(std::move(f)));
}

void CloudBackendService::get_top_n_hot_partitions(TGetTopNHotPartitionsResponse& response,
const TGetTopNHotPartitionsRequest& request) {
TabletHotspot::instance()->get_top_n_hot_partition(&response.hot_tables);
_engine.tablet_hotspot().get_top_n_hot_partition(&response.hot_tables);
response.file_cache_size = io::FileCacheFactory::instance()->get_capacity();
response.__isset.hot_tables = !response.hot_tables.empty();
}

void CloudBackendService::warm_up_tablets(TWarmUpTabletsResponse& response,
const TWarmUpTabletsRequest& request) {
Status st;
auto* manager = CloudWarmUpManager::instance();
auto& manager = _engine.cloud_warm_up_manager();
switch (request.type) {
case TWarmUpTabletsRequestType::SET_JOB: {
LOG_INFO("receive the warm up request.")
.tag("request_type", "SET_JOB")
.tag("job_id", request.job_id);
st = manager->check_and_set_job_id(request.job_id);
st = manager.check_and_set_job_id(request.job_id);
if (!st) {
LOG_WARNING("SET_JOB failed.").error(st);
break;
Expand All @@ -105,9 +105,9 @@ void CloudBackendService::warm_up_tablets(TWarmUpTabletsResponse& response,
.tag("batch_id", request.batch_id)
.tag("jobs size", request.job_metas.size());
bool retry = false;
st = manager->check_and_set_batch_id(request.job_id, request.batch_id, &retry);
st = manager.check_and_set_batch_id(request.job_id, request.batch_id, &retry);
if (!retry && st) {
manager->add_job(request.job_metas);
manager.add_job(request.job_metas);
} else {
if (retry) {
LOG_WARNING("retry the job.")
Expand All @@ -121,7 +121,7 @@ void CloudBackendService::warm_up_tablets(TWarmUpTabletsResponse& response,
}
case TWarmUpTabletsRequestType::GET_CURRENT_JOB_STATE_AND_LEASE: {
auto [job_id, batch_id, pending_job_size, finish_job_size] =
manager->get_current_job_state();
manager.get_current_job_state();
LOG_INFO("receive the warm up request.")
.tag("request_type", "GET_CURRENT_JOB_STATE_AND_LEASE")
.tag("job_id", job_id)
Expand All @@ -138,7 +138,7 @@ void CloudBackendService::warm_up_tablets(TWarmUpTabletsResponse& response,
LOG_INFO("receive the warm up request.")
.tag("request_type", "CLEAR_JOB")
.tag("job_id", request.job_id);
st = manager->clear_job(request.job_id);
st = manager.clear_job(request.job_id);
break;
}
default:
Expand All @@ -165,12 +165,8 @@ void CloudBackendService::warm_up_cache_async(TWarmUpCacheAsyncResponse& respons
PGetFileCacheMetaResponse brpc_response;
brpc_stub->get_file_cache_meta_by_tablet_id(&cntl, &brpc_request, &brpc_response, nullptr);
if (!cntl.Failed()) {
std::vector<FileCacheBlockMeta> metas;
std::transform(brpc_response.file_cache_block_metas().cbegin(),
brpc_response.file_cache_block_metas().cend(), std::back_inserter(metas),
[](const FileCacheBlockMeta& meta) { return meta; });
io::DownloadTask download_task(std::move(metas));
io::FileCacheBlockDownloader::instance()->submit_download_task(download_task);
_engine.file_cache_block_downloader().submit_download_task(
std::move(*brpc_response.mutable_file_cache_block_metas()));
} else {
st = Status::RpcError("{} isn't connected", brpc_addr);
}
Expand All @@ -181,7 +177,7 @@ void CloudBackendService::warm_up_cache_async(TWarmUpCacheAsyncResponse& respons
void CloudBackendService::check_warm_up_cache_async(TCheckWarmUpCacheAsyncResponse& response,
const TCheckWarmUpCacheAsyncRequest& request) {
std::map<int64_t, bool> task_done;
io::FileCacheBlockDownloader::instance()->check_download_task(request.tablets, &task_done);
_engine.file_cache_block_downloader().check_download_task(request.tablets, &task_done);
response.__set_task_done(task_done);

Status st = Status::OK();
Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_backend_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class CloudBackendService final : public BaseBackendService {
const TCheckWarmUpCacheAsyncRequest& request) override;

private:
[[maybe_unused]] CloudStorageEngine& _engine;
CloudStorageEngine& _engine;
};

} // namespace doris
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ Status CloudEngineCalcDeleteBitmapTask::execute() {
OlapStopWatch watch;
VLOG_NOTICE << "begin to calculate delete bitmap. transaction_id=" << transaction_id;
std::unique_ptr<ThreadPoolToken> token =
_engine.calc_tablet_delete_bitmap_task_thread_pool()->new_token(
_engine.calc_tablet_delete_bitmap_task_thread_pool().new_token(
ThreadPool::ExecutionMode::CONCURRENT);

for (const auto& partition : _cal_delete_bitmap_req.partitions) {
Expand Down
14 changes: 13 additions & 1 deletion be/src/cloud/cloud_storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,12 @@
#include "cloud/cloud_cumulative_compaction_policy.h"
#include "cloud/cloud_full_compaction.h"
#include "cloud/cloud_meta_mgr.h"
#include "cloud/cloud_tablet_hotspot.h"
#include "cloud/cloud_tablet_mgr.h"
#include "cloud/cloud_txn_delete_bitmap_cache.h"
#include "cloud/cloud_warm_up_manager.h"
#include "cloud/config.h"
#include "io/cache/block_file_cache_downloader.h"
#include "io/fs/file_system.h"
#include "io/fs/hdfs_file_system.h"
#include "io/fs/s3_file_system.h"
Expand Down Expand Up @@ -181,7 +184,16 @@ Status CloudStorageEngine::open() {
std::make_unique<CloudTxnDeleteBitmapCache>(config::delete_bitmap_agg_cache_capacity);
RETURN_IF_ERROR(_txn_delete_bitmap_cache->init());

return Status::OK();
_file_cache_block_downloader = std::make_unique<io::FileCacheBlockDownloader>(*this);

_cloud_warm_up_manager = std::make_unique<CloudWarmUpManager>(*this);

_tablet_hotspot = std::make_unique<TabletHotspot>();

return ThreadPoolBuilder("SyncLoadForTabletsThreadPool")
.set_max_threads(config::sync_load_for_tablets_thread)
.set_min_threads(config::sync_load_for_tablets_thread)
.build(&_sync_load_for_tablets_thread_pool);
}

void CloudStorageEngine::stop() {
Expand Down
40 changes: 32 additions & 8 deletions be/src/cloud/cloud_storage_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,16 @@ namespace doris {
namespace cloud {
class CloudMetaMgr;
}
namespace io {
class FileCacheBlockDownloader;
}

class CloudTabletMgr;
class CloudCumulativeCompaction;
class CloudBaseCompaction;
class CloudFullCompaction;
class TabletHotspot;
class CloudWarmUpManager;

class CloudStorageEngine final : public BaseStorageEngine {
public:
Expand All @@ -58,13 +63,13 @@ class CloudStorageEngine final : public BaseStorageEngine {
return Status::OK();
}

cloud::CloudMetaMgr& meta_mgr() { return *_meta_mgr; }
cloud::CloudMetaMgr& meta_mgr() const { return *_meta_mgr; }

CloudTabletMgr& tablet_mgr() { return *_tablet_mgr; }
CloudTabletMgr& tablet_mgr() const { return *_tablet_mgr; }

CloudTxnDeleteBitmapCache& txn_delete_bitmap_cache() { return *_txn_delete_bitmap_cache; }
std::unique_ptr<ThreadPool>& calc_tablet_delete_bitmap_task_thread_pool() {
return _calc_tablet_delete_bitmap_task_thread_pool;
CloudTxnDeleteBitmapCache& txn_delete_bitmap_cache() const { return *_txn_delete_bitmap_cache; }
ThreadPool& calc_tablet_delete_bitmap_task_thread_pool() const {
return *_calc_tablet_delete_bitmap_task_thread_pool;
}

io::FileSystemSPtr get_fs_by_vault_id(const std::string& vault_id) const {
Expand Down Expand Up @@ -93,24 +98,36 @@ class CloudStorageEngine final : public BaseStorageEngine {

bool has_base_compaction(int64_t tablet_id) const {
std::lock_guard lock(_compaction_mtx);
return _submitted_base_compactions.count(tablet_id);
return _submitted_base_compactions.contains(tablet_id);
}

bool has_cumu_compaction(int64_t tablet_id) const {
std::lock_guard lock(_compaction_mtx);
return _submitted_cumu_compactions.count(tablet_id);
return _submitted_cumu_compactions.contains(tablet_id);
}

bool has_full_compaction(int64_t tablet_id) const {
std::lock_guard lock(_compaction_mtx);
return _submitted_full_compactions.count(tablet_id);
return _submitted_full_compactions.contains(tablet_id);
}

std::shared_ptr<CloudCumulativeCompactionPolicy> cumu_compaction_policy(
std::string_view compaction_policy);

void sync_storage_vault();

io::FileCacheBlockDownloader& file_cache_block_downloader() const {
return *_file_cache_block_downloader;
}

CloudWarmUpManager& cloud_warm_up_manager() const { return *_cloud_warm_up_manager; }

TabletHotspot& tablet_hotspot() const { return *_tablet_hotspot; }

ThreadPool& sync_load_for_tablets_thread_pool() const {
return *_sync_load_for_tablets_thread_pool;
}

private:
void _refresh_storage_vault_info_thread_callback();
void _vacuum_stale_rowsets_thread_callback();
Expand All @@ -131,6 +148,13 @@ class CloudStorageEngine final : public BaseStorageEngine {
std::unique_ptr<CloudTxnDeleteBitmapCache> _txn_delete_bitmap_cache;
std::unique_ptr<ThreadPool> _calc_tablet_delete_bitmap_task_thread_pool;

// Components for cache warmup
std::unique_ptr<io::FileCacheBlockDownloader> _file_cache_block_downloader;
// Depended by `FileCacheBlockDownloader`
std::unique_ptr<CloudWarmUpManager> _cloud_warm_up_manager;
std::unique_ptr<TabletHotspot> _tablet_hotspot;
std::unique_ptr<ThreadPool> _sync_load_for_tablets_thread_pool;

// FileSystem with latest shared storage info, new data will be written to this fs.
mutable std::mutex _latest_fs_mtx;
io::FileSystemSPtr _latest_fs;
Expand Down
29 changes: 21 additions & 8 deletions be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,26 +197,39 @@ void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> to_add, bool version_
#ifndef BE_TEST
// Warmup rowset data in background
for (int seg_id = 0; seg_id < rs->num_segments(); ++seg_id) {
io::S3FileMeta download_file_meta;
auto rowset_meta = rs->rowset_meta();
const auto& rowset_meta = rs->rowset_meta();
constexpr int64_t interval = 600; // 10 mins
// When BE restart and receive the `load_sync` rpc, it will sync all historical rowsets first time.
// So we need to filter out the old rowsets avoid to download the whole table.
if (warmup_delta_data &&
::time(nullptr) - rowset_meta->newest_write_timestamp() >= interval) {
continue;
}
download_file_meta.file_size = 0;
download_file_meta.file_system = rowset_meta->fs();
download_file_meta.path = io::Path(rs->segment_file_path(seg_id));
download_file_meta.expiration_time =

auto fs = rowset_meta->fs();
if (!fs) {
LOG(WARNING) << "failed to get fs. tablet_id=" << tablet_id()
<< " rowset_id=" << rowset_meta->rowset_id()
<< " resource_id=" << rowset_meta->resource_id();
continue;
}

int64_t expiration_time =
_tablet_meta->ttl_seconds() == 0 ||
rowset_meta->newest_write_timestamp() <= 0
? 0
: rowset_meta->newest_write_timestamp() +
_tablet_meta->ttl_seconds();
io::FileCacheBlockDownloader::instance()->submit_download_task(
std::move(download_file_meta));
_engine.file_cache_block_downloader().submit_download_task(
io::DownloadFileMeta {
.path = rs->segment_file_path(seg_id),
.file_size = rs->rowset_meta()->segment_file_size(seg_id),
.file_system = std::move(fs),
.ctx =
{
.expiration_time = expiration_time,
},
});
}
#endif
}
Expand Down
17 changes: 6 additions & 11 deletions be/src/cloud/cloud_tablet_hotspot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,15 @@

namespace doris {

void TabletHotspot::count(const BaseTabletSPtr& tablet) {
if (!config::is_cloud_mode()) return;
size_t slot_idx = tablet->tablet_id() % s_slot_size;
void TabletHotspot::count(const BaseTablet& tablet) {
size_t slot_idx = tablet.tablet_id() % s_slot_size;
auto& slot = _tablets_hotspot[slot_idx];
std::lock_guard lock(slot.mtx);
HotspotCounterPtr counter;
if (auto iter = slot.map.find(tablet->tablet_id()); iter == slot.map.end()) {
counter = std::make_shared<HotspotCounter>(tablet->table_id(), tablet->index_id(),
tablet->partition_id());
slot.map.insert(std::make_pair(tablet->tablet_id(), counter));
if (auto iter = slot.map.find(tablet.tablet_id()); iter == slot.map.end()) {
counter = std::make_shared<HotspotCounter>(tablet.table_id(), tablet.index_id(),
tablet.partition_id());
slot.map.insert(std::make_pair(tablet.tablet_id(), counter));
} else {
counter = iter->second;
}
Expand Down Expand Up @@ -71,10 +70,6 @@ struct TabletHotspotMapValue {

using TabletHotspotMapKey = std::pair<int64_t, int64_t>;

TabletHotspot* TabletHotspot::instance() {
return ExecEnv::GetInstance()->tablet_hotspot();
}

void TabletHotspot::get_top_n_hot_partition(std::vector<THotTableMessage>* hot_tables) {
// map<pair<table_id, index_id>, map<partition_id, value>> for day
std::unordered_map<TabletHotspotMapKey, std::unordered_map<int64_t, TabletHotspotMapValue>,
Expand Down
3 changes: 1 addition & 2 deletions be/src/cloud/cloud_tablet_hotspot.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,10 @@ using HotspotCounterPtr = std::shared_ptr<HotspotCounter>;

class TabletHotspot {
public:
static TabletHotspot* instance();
TabletHotspot();
~TabletHotspot();
// When query the tablet, count it
void count(const BaseTabletSPtr& tablet);
void count(const BaseTablet& tablet);
void get_top_n_hot_partition(std::vector<THotTableMessage>* hot_tables);

private:
Expand Down
50 changes: 30 additions & 20 deletions be/src/cloud/cloud_warm_up_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,6 @@

namespace doris {

CloudWarmUpManager* CloudWarmUpManager::instance() {
return ExecEnv::GetInstance()->cloud_warm_up_manager();
}

CloudWarmUpManager::CloudWarmUpManager(CloudStorageEngine& engine) : _engine(engine) {
_download_thread = std::thread(&CloudWarmUpManager::handle_jobs, this);
}
Expand Down Expand Up @@ -81,27 +77,41 @@ void CloudWarmUpManager::handle_jobs() {
auto rs_metas = tablet_meta->snapshot_rs_metas();
for (auto& [_, rs] : rs_metas) {
for (int64_t seg_id = 0; seg_id < rs->num_segments(); seg_id++) {
io::S3FileMeta download_file_meta;
download_file_meta.file_system = rs->fs();
std::string seg_path = BetaRowset::remote_segment_path(rs->tablet_id(),
rs->rowset_id(), seg_id);
download_file_meta.path = seg_path;
download_file_meta.expiration_time =
auto fs = rs->fs();
if (!fs) {
LOG(WARNING) << "failed to get fs. tablet_id=" << tablet_id
<< " rowset_id=" << rs->rowset_id()
<< " resource_id=" << rs->resource_id();
continue;
}

int64_t expiration_time =
tablet_meta->ttl_seconds() == 0 || rs->newest_write_timestamp() <= 0
? 0
: rs->newest_write_timestamp() + tablet_meta->ttl_seconds();
if (download_file_meta.expiration_time <= UnixSeconds()) {
download_file_meta.expiration_time = 0;
if (expiration_time <= UnixSeconds()) {
expiration_time = 0;
}
download_file_meta.download_callback = [=](Status st) {
if (!st) {
LOG_WARNING("Warm up error ").error(st);
}
wait->signal();
};

wait->add_count();
io::FileCacheBlockDownloader::instance()->submit_download_task(
std::move(download_file_meta));
_engine.file_cache_block_downloader().submit_download_task(
io::DownloadFileMeta {
.path = BetaRowset::remote_segment_path(
rs->tablet_id(), rs->rowset_id(), seg_id),
.file_size = rs->segment_file_size(seg_id),
.file_system = std::move(fs),
.ctx =
{
.expiration_time = expiration_time,
},
.download_done =
[wait](Status st) {
if (!st) {
LOG_WARNING("Warm up error ").error(st);
}
wait->signal();
},
});
}
}
if (!wait->wait()) {
Expand Down
4 changes: 1 addition & 3 deletions be/src/cloud/cloud_warm_up_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,7 @@ struct JobMeta {

class CloudWarmUpManager {
public:
static CloudWarmUpManager* instance();

CloudWarmUpManager(CloudStorageEngine& engine);
explicit CloudWarmUpManager(CloudStorageEngine& engine);
~CloudWarmUpManager();
// Set the job id if the id is zero
Status check_and_set_job_id(int64_t job_id);
Expand Down
Loading
Loading