Skip to content

Commit

Permalink
[feature](selectdb-cloud) Implement recycle copy jobs http api (apach…
Browse files Browse the repository at this point in the history
…e#1487)

Usage:
```
GET /RecyclerService/http/recycle_copy_jobs?token={token}&instance_id={instance_id} HTTP/1.1
Content-Length: <ContentLength>
Content-Type: text/plain
```
e.g.

```
curl 127.0.0.1:5100/RecyclerService/http/recycle_copy_jobs?token={token}&instance_id={instance_id}
```
  • Loading branch information
platoneko authored Mar 17, 2023
1 parent 5ef5647 commit 98c6f38
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 7 deletions.
2 changes: 1 addition & 1 deletion cloud/src/meta-service/meta_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2829,7 +2829,7 @@ void MetaServiceImpl::get_tablet_stats(::google::protobuf::RpcController* contro
msg = ss.str();
}

std::string static convert_ms_code_to_http_code(const MetaServiceCode& ret, int& status_code) {
std::string convert_ms_code_to_http_code(const MetaServiceCode& ret, int& status_code) {
switch (ret) {
case OK:
return "OK";
Expand Down
76 changes: 70 additions & 6 deletions cloud/src/recycler/recycler_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

namespace selectdb {

extern std::string convert_ms_code_to_http_code(const MetaServiceCode& ret, int& status_code);

RecyclerServiceImpl::RecyclerServiceImpl(std::shared_ptr<TxnKv> txn_kv, Recycler* recycler)
: txn_kv_(std::move(txn_kv)), recycler_(recycler) {}

Expand Down Expand Up @@ -74,24 +76,73 @@ void RecyclerServiceImpl::recycle_instance(::google::protobuf::RpcController* co
recycler_->add_pending_instances(std::move(instances));
}

void recycle_copy_jobs(const std::shared_ptr<TxnKv>& txn_kv, const std::string& instance_id,
MetaServiceCode& code, std::string& msg) {
std::unique_ptr<Transaction> txn;
int ret = txn_kv->create_txn(&txn);
if (ret != 0) {
code = MetaServiceCode::KV_TXN_CREATE_ERR;
msg = "failed to create txn";
return;
}
std::string key;
instance_key({instance_id}, &key);
std::string val;
ret = txn->get(key, &val);
if (ret != 0) {
code = MetaServiceCode::KV_TXN_GET_ERR;
msg = fmt::format("failed to get instance, instance_id={}", instance_id);
return;
}
InstanceInfoPB instance;
if (!instance.ParseFromString(val)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
msg = fmt::format("malformed instance info, key={}", hex(key));
return;
}
static std::mutex s_worker_mtx;
static std::set<std::string> s_worker;
{
std::lock_guard lock(s_worker_mtx);
if (s_worker.size() >= config::recycle_concurrency) { // use another config entry?
msg = "exceeded the concurrency limit";
return;
}
auto [_, success] = s_worker.insert(instance_id);
if (!success) {
msg = "recycle_copy_jobs not yet finished on this instance";
return;
}
}
auto recycler = std::make_unique<InstanceRecycler>(txn_kv, instance);
std::thread worker([recycler = std::move(recycler), instance_id] {
LOG(INFO) << "manually trigger recycle_copy_jobs on instance " << instance_id;
recycler->recycle_copy_jobs();
std::lock_guard lock(s_worker_mtx);
s_worker.erase(instance_id);
});
worker.detach();
}

void RecyclerServiceImpl::http(::google::protobuf::RpcController* controller,
const ::selectdb::MetaServiceHttpRequest* request,
::selectdb::MetaServiceHttpResponse* response,
::google::protobuf::Closure* done) {
auto cntl = static_cast<brpc::Controller*>(controller);
LOG(INFO) << "rpc from " << cntl->remote_side() << " request: " << request->DebugString();
brpc::ClosureGuard closure_guard(done);
int ret = 0;
MetaServiceCode code = MetaServiceCode::OK;
int status_code = 200;
std::string msg = "OK";
std::string req;
std::string response_body;
std::string request_body;
std::unique_ptr<int, std::function<void(int*)>> defer_status(
(int*)0x01, [&ret, &msg, &status_code, &response_body, &cntl, &req](int*) {
LOG(INFO) << (ret == 0 ? "succ to " : "failed to ") << __PRETTY_FUNCTION__ << " "
<< cntl->remote_side() << " request=\n"
<< req << "\n ret=" << ret << " msg=" << msg;
(int*)0x01, [&code, &msg, &status_code, &response_body, &cntl, &req](int*) {
convert_ms_code_to_http_code(code, status_code);
LOG(INFO) << (code == MetaServiceCode::OK ? "succ to " : "failed to ")
<< __PRETTY_FUNCTION__ << " " << cntl->remote_side() << " request=\n"
<< req << "\n ret=" << code << " msg=" << msg;
cntl->http_response().set_status_code(status_code);
cntl->response_attachment().append(response_body);
cntl->response_attachment().append("\n");
Expand Down Expand Up @@ -137,12 +188,25 @@ void RecyclerServiceImpl::http(::google::protobuf::RpcController* controller,
}
RecycleInstanceResponse res;
recycle_instance(cntl, &req, &res, nullptr);
ret = res.status().code();
code = res.status().code();
msg = res.status().msg();
response_body = msg;
return;
}

if (unresolved_path == "recycle_copy_jobs") {
auto instance_id = uri.GetQuery("instance_id");
if (instance_id == nullptr || instance_id->empty()) {
msg = "no instance id";
response_body = msg;
status_code = 400;
return;
}
recycle_copy_jobs(txn_kv_, *instance_id, code, msg);
response_body = msg;
return;
}

status_code = 404;
msg = "not found";
response_body = msg;
Expand Down

0 comments on commit 98c6f38

Please sign in to comment.