Skip to content

Commit

Permalink
[feature](selectdb-cloud) Improve the efficiency of deleting object d…
Browse files Browse the repository at this point in the history
…ata (apache#1461)

1. Improve the efficiency of deleting object data
2. Add `type` in `RecycleXxxPB` to clarify the recycling reason
  • Loading branch information
platoneko authored Apr 4, 2023
1 parent 10c7669 commit 1d7bfbf
Show file tree
Hide file tree
Showing 14 changed files with 1,497 additions and 904 deletions.
1 change: 1 addition & 0 deletions cloud/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ CONF_Int32(recycle_job_lease_expired_ms, "60000");
CONF_String(recycle_whitelist, ""); // Comma separated list
// These instances will not be recycled, only effective when whitelist is empty.
CONF_String(recycle_blacklist, ""); // Comma separated list
// NOTE(review): presumably the number of worker threads in each instance recycler's pool -- confirm against its use.
CONF_mInt32(instance_recycler_worker_pool_size, "10");
CONF_Bool(enable_checker, "false");
CONF_mInt32(check_object_interval_seconds, "259200"); // 72h

Expand Down
107 changes: 107 additions & 0 deletions cloud/src/common/simple_sync_queue.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
#pragma once

#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <deque>
#include <functional>
#include <limits>
#include <memory>
#include <mutex>

namespace selectdb {

/**
 * A bounded, lock based blocking FIFO queue.
 *
 * Every operation takes the internal mutex. Producers wait on `_not_full`
 * while the queue holds `max_size` elements, consumers wait on `_not_empty`
 * while it holds none.
 *
 * NOTE: a queue constructed with `max_size == 0` can never accept an element,
 * so the untimed put()/emplace() would block forever on it.
 *
 * @tparam T    element type
 * @tparam CONT underlying sequence container, must provide push_back,
 *              emplace_back, front, pop_front, clear, size and empty
 */
template <typename T,
          template <typename ELEM, typename ALLOC = std::allocator<ELEM>> class CONT = std::deque>
class SimpleSyncQueue {
public:
    /// Creates a queue whose capacity is the maximum value of std::uint32_t.
    SimpleSyncQueue() = default;

    /// Creates a queue that holds at most `max_size` elements.
    explicit SimpleSyncQueue(std::uint32_t max_size) : _max_size(max_size) {}

    // The container releases its elements itself, no explicit clear is needed.
    virtual ~SimpleSyncQueue() = default;

    /**
     * Appends a copy of `t`, blocks while the queue is full
     */
    void put(const T& t) {
        std::unique_lock<std::mutex> locker(_mutex);
        _not_full.wait(locker, [this]() { return _queue.size() < _max_size; });
        // `t` is const, so this is a copy (std::move on it would copy as well)
        _queue.push_back(t);
        _not_empty.notify_one();
    }

    /**
     * Appends a copy of `t`, blocks for at most `timeout` while the queue is full
     *
     * @param timeout max blocking time in milliseconds
     * @return true if the element was appended, false on timeout
     */
    bool put_with_timeout(const T& t, int timeout /*in milliseconds*/) {
        std::unique_lock<std::mutex> locker(_mutex);
        if (_not_full.wait_for(locker, std::chrono::milliseconds(timeout),
                               [this]() { return _queue.size() < _max_size; })) {
            _queue.push_back(t);
            _not_empty.notify_one();
            return true;
        }
        //timeout
        return false;
    }

    /**
     * Constructs an element in place at the back, blocks while the queue is full
     */
    template <typename... Args>
    void emplace(Args&&... args) {
        std::unique_lock<std::mutex> locker(_mutex);
        _not_full.wait(locker, [this]() { return _queue.size() < _max_size; });
        _queue.emplace_back(std::forward<Args>(args)...);
        _not_empty.notify_one();
    }

    /**
     * Constructs an element in place at the back, blocks for at most `timeout`
     * while the queue is full
     *
     * @param timeout max blocking time in milliseconds
     * @return true if the element was appended, false on timeout
     */
    template <typename... Args>
    bool emplace_with_timeout(int timeout /*in milliseconds*/, Args&&... args) {
        std::unique_lock<std::mutex> locker(_mutex);
        if (_not_full.wait_for(locker, std::chrono::milliseconds(timeout),
                               [this]() { return _queue.size() < _max_size; })) {
            _queue.emplace_back(std::forward<Args>(args)...);
            _not_empty.notify_one();
            return true;
        }
        //timeout
        return false;
    }

    /**
     * Moves the front element into `*t` and removes it, blocks while the queue
     * is empty
     *
     * @param t destination, must not be null
     */
    void get(T* t) {
        std::unique_lock<std::mutex> locker(_mutex);
        _not_empty.wait(locker, [this]() { return !_queue.empty(); });
        (*t) = std::move(_queue.front());
        _queue.pop_front();
        _not_full.notify_one();
    }

    /**
     * Moves the front element into `*t` and removes it, blocks for at most
     * `timeout` while the queue is empty
     *
     * @param t destination, must not be null; left untouched on timeout
     * @param timeout max blocking time in milliseconds
     * @return true if an element was fetched, false on timeout
     */
    bool get_with_timeout(T* t, int timeout /*in milliseconds*/) {
        std::unique_lock<std::mutex> locker(_mutex);
        if (_not_empty.wait_for(locker, std::chrono::milliseconds(timeout),
                                [this]() { return !_queue.empty(); })) {
            (*t) = std::move(_queue.front());
            _queue.pop_front();
            _not_full.notify_one();
            return true;
        }
        //timeout
        return false;
    }

    /**
     * Drops all queued elements
     */
    void clear() {
        std::lock_guard<std::mutex> locker(_mutex);
        _queue.clear();
        // Clearing frees capacity: wake every producer blocked on a full queue,
        // otherwise they would sleep until some consumer happens to call get().
        _not_full.notify_all();
    }

    /// @return the number of queued elements at the time of the call
    size_t size() const {
        std::lock_guard<std::mutex> locker(_mutex);
        return _queue.size();
    }

    /// @return true if no element is queued at the time of the call
    bool empty() const {
        std::lock_guard<std::mutex> locker(_mutex);
        return _queue.empty();
    }

private:
    CONT<T> _queue;
    mutable std::mutex _mutex; // mutable so that the const observers can lock it
    std::condition_variable _not_empty;
    std::condition_variable _not_full;
    std::uint32_t _max_size = std::numeric_limits<std::uint32_t>::max();
};

} // namespace selectdb
// vim: et ts=2 sw=2 tw=80:
147 changes: 147 additions & 0 deletions cloud/src/common/simple_thread_pool.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
#pragma once

#include <atomic>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

#include "simple_sync_queue.h"

namespace selectdb {

class SimpleThreadPool {
private:
using JobType = std::function<void()>;
// a lock based sync queue
std::shared_ptr<SimpleSyncQueue<JobType>> _job_queue;

std::vector<std::thread> _worker_thread_group; // multi thread pool
std::atomic<bool> _is_running;
size_t _pool_size;

public:
SimpleThreadPool(size_t size) : _is_running(false), _pool_size(size) {
_job_queue = std::make_shared<SimpleSyncQueue<JobType>>(_pool_size * 2);
}

SimpleThreadPool() = delete;

/**
* Submits jobs, if job queue is full it will blocked
*
* @return 0 if succeeded
*/
int submit(JobType f) {
if (!_is_running) {
// not started
return -1;
}
_job_queue->emplace(std::move(f));
return 0;
}

/**
* Submits jobs with a limit blocking time
*
* @param timeout max blocking time in milliseconds
* @return 0 if success
* -1 if thread pool not start yet
* -2 if time out
*/
int submit_with_timeout(JobType f, int timeout) {
if (!_is_running) {
// not started
return -1;
}
if (!_job_queue->emplace_with_timeout(timeout, std::move(f))) {
return -2;
}
return 0;
}

/**
*
* @return always 0
*/
int start() {
_is_running = true;
_worker_thread_group.clear();
for (size_t i = 0; i < _pool_size; ++i) {
_worker_thread_group.emplace_back(&SimpleThreadPool::work, this);
}
return 0;
}

/**
* Stops to get jobs from job queue, the job being done will finish normally
*
* @return 0 if succeed, otherwise non-zero value returned
*/
int stop() {
if (!_is_running) {
// already stopped
return -1;
}
_is_running = false;
for (auto& i : _worker_thread_group) {
if (i.joinable()) {
i.join();
}
}
return 0;
}

/**
* Gets size of the pool
*
* @return the thread number(pool size) in the thread pool
*/
size_t size() { return _pool_size; }

/**
* Terminates the thread pool immediately, jobs in the queue will not be done
* and the running threads will be detached
*
* @return 0 if succeed, otherwise non-zero value returned
*/
int terminate() {
if (!_is_running) {
// not running
return -1;
}
_is_running = false;
_job_queue->clear(); // abandon all jobs
for (auto& i : _worker_thread_group) {
i.detach();
}
return 0;
}

~SimpleThreadPool() {
if (_is_running) {
stop();
}
}

private:
/**
* Working thread
*/
void work() {
while (_is_running || !_job_queue->empty()) {
JobType job;
if (!_job_queue->get_with_timeout(&job, 10)) {
continue;
}
try {
job();
} catch (...) {
// do nothing
}
}
}
};

} // namespace selectdb
46 changes: 41 additions & 5 deletions cloud/src/meta-service/keys.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,12 @@ void instance_key(const InstanceKeyInfo& in, std::string* out) {
// Transaction keys
//==============================================================================

std::string txn_key_prefix(std::string_view instance_id) {
std::string out;
encode_prefix(TxnIndexKeyInfo {instance_id, 0}, &out);
return out;
}

void txn_label_key(const TxnLabelKeyInfo& in, std::string* out) {
encode_prefix(in, out); // 0x01 "txn" ${instance_id}
encode_bytes(TXN_KEY_INFIX_LABEL, out); // "txn_index"
Expand Down Expand Up @@ -194,6 +200,12 @@ void version_key(const VersionKeyInfo& in, std::string* out) {
// Meta keys
//==============================================================================

std::string meta_key_prefix(std::string_view instance_id) {
std::string out;
encode_prefix(MetaTabletIdxKeyInfo {instance_id, 0}, &out);
return out;
}

void meta_rowset_key(const MetaRowsetKeyInfo& in, std::string* out) {
encode_prefix(in, out); // 0x01 "meta" ${instance_id}
encode_bytes(META_KEY_INFIX_ROWSET, out); // "rowset"
Expand Down Expand Up @@ -234,6 +246,12 @@ void meta_schema_key(const MetaSchemaKeyInfo& in, std::string* out) {
// Recycle keys
//==============================================================================

std::string recycle_key_prefix(std::string_view instance_id) {
std::string out;
encode_prefix(RecycleIndexKeyInfo {instance_id, 0}, &out);
return out;
}

void recycle_index_key(const RecycleIndexKeyInfo& in, std::string* out) {
encode_prefix(in, out); // 0x01 "recycle" ${instance_id}
encode_bytes(RECYCLE_KEY_INFIX_INDEX, out); // "index"
Expand Down Expand Up @@ -266,6 +284,10 @@ void recycle_stage_key(const RecycleStageKeyInfo& in, std::string* out) {
encode_bytes(std::get<1>(in), out); // stage_id
}

//==============================================================================
// Stats keys
//==============================================================================

void stats_tablet_key(const StatsTabletKeyInfo& in, std::string* out) {
encode_prefix(in, out); // 0x01 "stats" ${instance_id}
encode_bytes(STATS_KEY_INFIX_TABLET, out); // "tablet"
Expand All @@ -275,6 +297,10 @@ void stats_tablet_key(const StatsTabletKeyInfo& in, std::string* out) {
encode_int64(std::get<4>(in), out); // tablet_id
}

//==============================================================================
// Job keys
//==============================================================================

void job_tablet_key(const JobTabletKeyInfo& in, std::string* out) {
encode_prefix(in, out); // 0x01 "job" ${instance_id}
encode_bytes(JOB_KEY_INFIX_TABLET, out); // "tablet"
Expand All @@ -284,6 +310,21 @@ void job_tablet_key(const JobTabletKeyInfo& in, std::string* out) {
encode_int64(std::get<4>(in), out); // tablet_id
}

// Appends the recycle job key for `in` to `*out`: the prefix written by
// encode_prefix() followed by the encoded literal "recycle".
void job_recycle_key(const JobRecycleKeyInfo& in, std::string* out) {
encode_prefix(in, out); // 0x01 "job" ${instance_id}
encode_bytes("recycle", out); // "recycle"
}

//==============================================================================
// Copy keys
//==============================================================================

std::string copy_key_prefix(std::string_view instance_id) {
std::string out;
encode_prefix(CopyJobKeyInfo {instance_id, "", 0, "", 0}, &out);
return out;
}

void copy_job_key(const CopyJobKeyInfo& in, std::string* out) {
encode_prefix(in, out); // 0x01 "copy" ${instance_id}
encode_bytes(COPY_JOB_KEY_INFIX, out); // "job"
Expand Down Expand Up @@ -337,11 +378,6 @@ std::string system_meta_service_encryption_key_info_key() {
// Other keys
//==============================================================================

// Appends the recycle job key for `in` to `*out`: the prefix written by
// encode_prefix() followed by the encoded literal "recycle".
void job_recycle_key(const JobRecycleKeyInfo& in, std::string* out) {
encode_prefix(in, out); // 0x01 "job" ${instance_id}
encode_bytes("recycle", out); // "recycle"
}

//==============================================================================
// Decode keys
//==============================================================================
Expand Down
Loading

0 comments on commit 1d7bfbf

Please sign in to comment.