Skip to content

Commit

Permalink
Add ingest_binlog/http_get_snapshot limit download speed && Add async…
Browse files Browse the repository at this point in the history
… ingest_binlog

Signed-off-by: Jack Drogon <jack.xsuperman@gmail.com>
  • Loading branch information
JackDrogon committed Nov 2, 2023
1 parent c46fa33 commit a47cfb4
Show file tree
Hide file tree
Showing 20 changed files with 640 additions and 331 deletions.
6 changes: 6 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1121,6 +1121,12 @@ DEFINE_String(default_tzfiles_path, "${DORIS_HOME}/zoneinfo");
// Max size(bytes) of group commit queues, used for mem back pressure.
DEFINE_Int32(group_commit_max_queue_size, "65536");

// Ingest binlog work pool size, -1 is disable, 0 is hardware concurrency
DEFINE_Int32(ingest_binlog_work_pool_size, "-1");

// Download binlog rate limit, unit is KB/s, 0 means no limit
DEFINE_Int32(download_binlog_rate_limit_kbs, "0");

// clang-format off
#ifdef BE_TEST
// test s3
Expand Down
6 changes: 6 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1191,6 +1191,12 @@ DECLARE_String(default_tzfiles_path);
// Max size(bytes) of group commit queues, used for mem back pressure.
DECLARE_Int32(group_commit_max_queue_size);

// Ingest binlog work pool size
DECLARE_Int32(ingest_binlog_work_pool_size);

// Download binlog rate limit, unit is KB/s
DECLARE_Int32(download_binlog_rate_limit_kbs);

#ifdef BE_TEST
// test s3
DECLARE_String(test_s3_resource);
Expand Down
29 changes: 21 additions & 8 deletions be/src/http/action/download_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,20 @@
#include "runtime/exec_env.h"

namespace doris {

const std::string FILE_PARAMETER = "file";
const std::string TOKEN_PARAMETER = "token";

DownloadAction::DownloadAction(ExecEnv* exec_env, const std::vector<std::string>& allow_dirs,
int32_t num_workers)
: _exec_env(exec_env), _download_type(NORMAL), _num_workers(num_workers) {
namespace {
static const std::string FILE_PARAMETER = "file";
static const std::string TOKEN_PARAMETER = "token";
static const std::string CHANNEL_PARAMETER = "channel";
static const std::string CHANNEL_INGEST_BINLOG_TYPE = "ingest_binlog";
} // namespace

DownloadAction::DownloadAction(ExecEnv* exec_env,
std::shared_ptr<bufferevent_rate_limit_group> rate_limit_group,
const std::vector<std::string>& allow_dirs, int32_t num_workers)
: _exec_env(exec_env),
_download_type(NORMAL),
_num_workers(num_workers),
_rate_limit_group(std::move(rate_limit_group)) {
for (auto& dir : allow_dirs) {
std::string p;
Status st = io::global_local_filesystem()->canonicalize(dir, &p);
Expand Down Expand Up @@ -107,7 +114,13 @@ void DownloadAction::handle_normal(HttpRequest* req, const std::string& file_par
if (is_dir) {
do_dir_response(file_param, req);
} else {
do_file_response(file_param, req);
const auto& channel = req->param(CHANNEL_PARAMETER);
bool ingest_binlog = (channel == CHANNEL_INGEST_BINLOG_TYPE);
if (ingest_binlog) {
do_file_response(file_param, req, _rate_limit_group.get());
} else {
do_file_response(file_param, req);
}
}
}

Expand Down
9 changes: 7 additions & 2 deletions be/src/http/action/download_action.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
#include "http/http_handler.h"
#include "util/threadpool.h"

struct bufferevent_rate_limit_group;

namespace doris {

class ExecEnv;
Expand All @@ -36,8 +38,9 @@ class HttpRequest;
// We use parameter named 'file' to specify the static resource path, it is an absolute path.
class DownloadAction : public HttpHandler {
public:
DownloadAction(ExecEnv* exec_env, const std::vector<std::string>& allow_dirs,
int32_t num_workers = 0);
DownloadAction(ExecEnv* exec_env,
std::shared_ptr<bufferevent_rate_limit_group> rate_limit_group,
const std::vector<std::string>& allow_dirs, int32_t num_workers = 0);

// for load error
DownloadAction(ExecEnv* exec_env, const std::string& error_log_root_dir);
Expand Down Expand Up @@ -67,6 +70,8 @@ class DownloadAction : public HttpHandler {
std::string _error_log_root_dir;
int32_t _num_workers;
std::unique_ptr<ThreadPool> _download_workers;

std::shared_ptr<bufferevent_rate_limit_group> _rate_limit_group {nullptr};
}; // end class DownloadAction

} // end namespace doris
12 changes: 8 additions & 4 deletions be/src/http/action/download_binlog_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
#include <fmt/ranges.h>

#include <cstdint>
#include <limits>
#include <stdexcept>
#include <string_view>
#include <utility>
#include <vector>

#include "common/config.h"
Expand Down Expand Up @@ -96,7 +98,7 @@ void handle_get_binlog_info(HttpRequest* req) {
}

/// handle get segment file, need tablet_id, rowset_id && index
void handle_get_segment_file(HttpRequest* req) {
void handle_get_segment_file(HttpRequest* req, bufferevent_rate_limit_group* rate_limit_group) {
// Step 1: get download file path
std::string segment_file_path;
try {
Expand Down Expand Up @@ -125,7 +127,7 @@ void handle_get_segment_file(HttpRequest* req) {
LOG(WARNING) << "file not exist, file path: " << segment_file_path;
return;
}
do_file_response(segment_file_path, req);
do_file_response(segment_file_path, req, rate_limit_group);
}

void handle_get_rowset_meta(HttpRequest* req) {
Expand All @@ -149,7 +151,9 @@ void handle_get_rowset_meta(HttpRequest* req) {

} // namespace

DownloadBinlogAction::DownloadBinlogAction(ExecEnv* exec_env) : _exec_env(exec_env) {}
DownloadBinlogAction::DownloadBinlogAction(
ExecEnv* exec_env, std::shared_ptr<bufferevent_rate_limit_group> rate_limit_group)
: _exec_env(exec_env), _rate_limit_group(std::move(rate_limit_group)) {}

void DownloadBinlogAction::handle(HttpRequest* req) {
VLOG_CRITICAL << "accept one download binlog request " << req->debug_string();
Expand Down Expand Up @@ -178,7 +182,7 @@ void DownloadBinlogAction::handle(HttpRequest* req) {
if (method == "get_binlog_info") {
handle_get_binlog_info(req);
} else if (method == "get_segment_file") {
handle_get_segment_file(req);
handle_get_segment_file(req, _rate_limit_group.get());
} else if (method == "get_rowset_meta") {
handle_get_rowset_meta(req);
} else {
Expand Down
7 changes: 6 additions & 1 deletion be/src/http/action/download_binlog_action.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,24 @@

#pragma once

#include <memory>
#include <string>
#include <vector>

#include "common/status.h"
#include "http/http_handler.h"

struct bufferevent_rate_limit_group;

namespace doris {

class ExecEnv;
class HttpRequest;

class DownloadBinlogAction : public HttpHandler {
public:
DownloadBinlogAction(ExecEnv* exec_env);
DownloadBinlogAction(ExecEnv* exec_env,
std::shared_ptr<bufferevent_rate_limit_group> rate_limit_group);
virtual ~DownloadBinlogAction() = default;

void handle(HttpRequest* req) override;
Expand All @@ -40,6 +44,7 @@ class DownloadBinlogAction : public HttpHandler {

private:
ExecEnv* _exec_env;
std::shared_ptr<bufferevent_rate_limit_group> _rate_limit_group {nullptr};
};

} // namespace doris
58 changes: 31 additions & 27 deletions be/src/http/ev_http_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,17 @@ static int on_connection(struct evhttp_request* req, void* param) {
EvHttpServer::EvHttpServer(int port, int num_workers)
: _port(port), _num_workers(num_workers), _real_port(0) {
_host = BackendOptions::get_service_bind_address();

evthread_use_pthreads();
DCHECK_GT(_num_workers, 0);
_event_bases.resize(_num_workers);
for (int i = 0; i < _num_workers; ++i) {
std::shared_ptr<event_base> base(event_base_new(),
[](event_base* base) { event_base_free(base); });
CHECK(base != nullptr) << "Couldn't create an event_base.";
std::lock_guard lock(_event_bases_lock);
_event_bases[i] = base;
}
}

EvHttpServer::EvHttpServer(const std::string& host, int port, int num_workers)
Expand All @@ -107,34 +117,28 @@ void EvHttpServer::start() {
.set_min_threads(_num_workers)
.set_max_threads(_num_workers)
.build(&_workers));

evthread_use_pthreads();
_event_bases.resize(_num_workers);
for (int i = 0; i < _num_workers; ++i) {
CHECK(_workers->submit_func([this, i]() {
std::shared_ptr<event_base> base(event_base_new(), [](event_base* base) {
event_base_free(base);
});
CHECK(base != nullptr) << "Couldn't create an event_base.";
{
std::lock_guard<std::mutex> lock(_event_bases_lock);
_event_bases[i] = base;
}

/* Create a new evhttp object to handle requests. */
std::shared_ptr<evhttp> http(evhttp_new(base.get()),
[](evhttp* http) { evhttp_free(http); });
CHECK(http != nullptr) << "Couldn't create an evhttp.";

auto res = evhttp_accept_socket(http.get(), _server_fd);
CHECK(res >= 0) << "evhttp accept socket failed, res=" << res;

evhttp_set_newreqcb(http.get(), on_connection, this);
evhttp_set_gencb(http.get(), on_request, this);

event_base_dispatch(base.get());
})
.ok());
auto status = _workers->submit_func([this, i]() {
std::shared_ptr<event_base> base;
{
std::lock_guard lock(_event_bases_lock);
base = _event_bases[i];
}

/* Create a new evhttp object to handle requests. */
std::shared_ptr<evhttp> http(evhttp_new(base.get()),
[](evhttp* http) { evhttp_free(http); });
CHECK(http != nullptr) << "Couldn't create an evhttp.";

auto res = evhttp_accept_socket(http.get(), _server_fd);
CHECK(res >= 0) << "evhttp accept socket failed, res=" << res;

evhttp_set_newreqcb(http.get(), on_connection, this);
evhttp_set_gencb(http.get(), on_request, this);

event_base_dispatch(base.get());
});
CHECK(status.ok());
}
}

Expand Down
5 changes: 5 additions & 0 deletions be/src/http/ev_http_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ class EvHttpServer {
// get real port
int get_real_port() const { return _real_port; }

std::vector<std::shared_ptr<event_base>> get_event_bases() {
std::lock_guard lock(_event_bases_lock);
return _event_bases;
}

private:
Status _bind();
HttpHandler* _find_handler(HttpRequest* req);
Expand Down
13 changes: 10 additions & 3 deletions be/src/http/http_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "http/http_channel.h"

#include <event2/buffer.h>
#include <event2/bufferevent.h>
#include <event2/http.h>

#include <algorithm>
Expand Down Expand Up @@ -69,11 +70,17 @@ void HttpChannel::send_reply(HttpRequest* request, HttpStatus status, const std:
evbuffer_free(evb);
}

void HttpChannel::send_file(HttpRequest* request, int fd, size_t off, size_t size) {
void HttpChannel::send_file(HttpRequest* request, int fd, size_t off, size_t size,
bufferevent_rate_limit_group* rate_limit_group) {
auto evb = evbuffer_new();
evbuffer_add_file(evb, fd, off, size);
evhttp_send_reply(request->get_evhttp_request(), HttpStatus::OK,
default_reason(HttpStatus::OK).c_str(), evb);
auto* evhttp_request = request->get_evhttp_request();
if (rate_limit_group) {
auto* evhttp_connection = evhttp_request_get_connection(evhttp_request);
auto* buffer_event = evhttp_connection_get_bufferevent(evhttp_connection);
bufferevent_add_to_rate_limit_group(buffer_event, rate_limit_group);
}
evhttp_send_reply(evhttp_request, HttpStatus::OK, default_reason(HttpStatus::OK).c_str(), evb);
evbuffer_free(evb);
}

Expand Down
4 changes: 3 additions & 1 deletion be/src/http/http_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

#include "http/http_status.h"

struct bufferevent_rate_limit_group;
namespace doris {

class HttpRequest;
Expand All @@ -43,7 +44,8 @@ class HttpChannel {

static void send_reply(HttpRequest* request, HttpStatus status, const std::string& content);

static void send_file(HttpRequest* request, int fd, size_t off, size_t size);
static void send_file(HttpRequest* request, int fd, size_t off, size_t size,
bufferevent_rate_limit_group* rate_limit_group = nullptr);

static bool compress_content(const std::string& accept_encoding, const std::string& input,
std::string* output);
Expand Down
5 changes: 3 additions & 2 deletions be/src/http/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ std::string get_content_type(const std::string& file_name) {
return "";
}

void do_file_response(const std::string& file_path, HttpRequest* req) {
void do_file_response(const std::string& file_path, HttpRequest* req,
bufferevent_rate_limit_group* rate_limit_group) {
if (file_path.find("..") != std::string::npos) {
LOG(WARNING) << "Not allowed to read relative path: " << file_path;
HttpChannel::send_error(req, HttpStatus::FORBIDDEN);
Expand Down Expand Up @@ -165,7 +166,7 @@ void do_file_response(const std::string& file_path, HttpRequest* req) {
return;
}

HttpChannel::send_file(req, fd, 0, file_size);
HttpChannel::send_file(req, fd, 0, file_size, rate_limit_group);
}

void do_dir_response(const std::string& dir_path, HttpRequest* req) {
Expand Down
5 changes: 4 additions & 1 deletion be/src/http/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
#include "common/utils.h"
#include "http/http_request.h"

struct bufferevent_rate_limit_group;

namespace doris {

struct AuthInfo;
Expand All @@ -34,7 +36,8 @@ bool parse_basic_auth(const HttpRequest& req, std::string* user, std::string* pa

bool parse_basic_auth(const HttpRequest& req, AuthInfo* auth);

void do_file_response(const std::string& dir_path, HttpRequest* req);
void do_file_response(const std::string& dir_path, HttpRequest* req,
bufferevent_rate_limit_group* rate_limit_group = nullptr);

void do_dir_response(const std::string& dir_path, HttpRequest* req);

Expand Down
Loading

0 comments on commit a47cfb4

Please sign in to comment.