Skip to content

Commit

Permalink
Add support for listening to unix socket (#531)
Browse files Browse the repository at this point in the history
Two processes cannot bind (and listen) to the same unix socket.
That's why only one worker binds the socket.
Introduce a separate ConfigField and function to parse
unix socket permissions from config file. This number is defined
in octal form (e.g. 770 or 0777).
  • Loading branch information
torwig authored May 7, 2022
1 parent 5c2e68f commit f4c517b
Show file tree
Hide file tree
Showing 11 changed files with 146 additions and 21 deletions.
9 changes: 9 additions & 0 deletions kvrocks.conf
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,15 @@
# bind 127.0.0.1
bind 0.0.0.0

# Unix socket.
#
# Specify the path for the unix socket that will be used to listen for
# incoming connections. There is no default, so kvrocks will not listen
# on a unix socket when not specified.
#
# unixsocket /tmp/kvrocks.sock
# unixsocketperm 777

# Accept connections on the specified port, default is 6666.
port 6666

Expand Down
6 changes: 4 additions & 2 deletions src/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ Config::Config() {
{"migrate-speed", false, new IntField(&migrate_speed, 4096, 0, INT_MAX)},
{"migrate-pipeline-size", false, new IntField(&pipeline_size, 16, 1, INT_MAX)},
{"migrate-sequence-gap", false, new IntField(&sequence_gap, 10000, 1, INT_MAX)},
{"unixsocket", true, new StringField(&unixsocket, "")},
{"unixsocketperm", true, new OctalField(&unixsocketperm, 0777, 1, INT_MAX)},

/* rocksdb options */
{"rocksdb.compression", false, new EnumField(&RocksDB.compression, compression_type_enum, 0)},
Expand Down Expand Up @@ -227,9 +229,9 @@ void Config::initFieldValidator() {
return Status(Status::NotOK, "invalid range format, the range should be between 0 and 24");
}
int64_t start, stop;
Status s = Util::StringToNum(args[0], &start, 0, 24);
Status s = Util::DecimalStringToNum(args[0], &start, 0, 24);
if (!s.IsOK()) return s;
s = Util::StringToNum(args[1], &stop, 0, 24);
s = Util::DecimalStringToNum(args[1], &stop, 0, 24);
if (!s.IsOK()) return s;
if (start > stop) return Status(Status::NotOK, "invalid range format, start should be smaller than stop");
compaction_checker_range.Start = start;
Expand Down
2 changes: 2 additions & 0 deletions src/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ struct Config{
std::string masterauth;
std::string requirepass;
std::string master_host;
std::string unixsocket;
int unixsocketperm = 0777;
int master_port = 0;
Cron compact_cron;
Cron bgsave_cron;
Expand Down
32 changes: 30 additions & 2 deletions src/config_type.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,35 @@ class IntField : public ConfigField {
}
Status Set(const std::string &v) override {
int64_t n;
auto s = Util::StringToNum(v, &n, min_, max_);
auto s = Util::DecimalStringToNum(v, &n, min_, max_);
if (!s.IsOK()) return s;
*receiver_ = static_cast<int>(n);
return Status::OK();
}

private:
int *receiver_;
int min_ = INT_MIN;
int max_ = INT_MAX;
};

class OctalField : public ConfigField {
public:
OctalField(int *receiver, int n, int min, int max)
: receiver_(receiver), min_(min), max_(max) {
*receiver_ = n;
}
~OctalField() override = default;
std::string ToString() override {
return std::to_string(*receiver_);
}
Status ToNumber(int64_t *n) override {
*n = *receiver_;
return Status::OK();
}
Status Set(const std::string &v) override {
int64_t n;
auto s = Util::OctalStringToNum(v, &n, min_, max_);
if (!s.IsOK()) return s;
*receiver_ = static_cast<int>(n);
return Status::OK();
Expand Down Expand Up @@ -117,7 +145,7 @@ class Int64Field : public ConfigField {
}
Status Set(const std::string &v) override {
int64_t n;
auto s = Util::StringToNum(v, &n, min_, max_);
auto s = Util::DecimalStringToNum(v, &n, min_, max_);
if (!s.IsOK()) return s;
*receiver_ = n;
return Status::OK();
Expand Down
4 changes: 2 additions & 2 deletions src/redis_cmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3658,7 +3658,7 @@ class CommandPerfLog : public Commander {
if (args[2] == "*") {
cnt_ = 0;
} else {
Status s = Util::StringToNum(args[2], &cnt_);
Status s = Util::DecimalStringToNum(args[2], &cnt_);
return s;
}
}
Expand Down Expand Up @@ -3694,7 +3694,7 @@ class CommandSlowlog : public Commander {
if (args[2] == "*") {
cnt_ = 0;
} else {
Status s = Util::StringToNum(args[2], &cnt_);
Status s = Util::DecimalStringToNum(args[2], &cnt_);
return s;
}
}
Expand Down
8 changes: 4 additions & 4 deletions src/scripting.cc
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ namespace Lua {
Server *srv = conn->GetServer();
lua_State *lua = srv->Lua();

auto s = Util::StringToNum(args[2], &numkeys);
auto s = Util::DecimalStringToNum(args[2], &numkeys);
if (!s.IsOK()) {
return s;
}
Expand Down Expand Up @@ -604,7 +604,7 @@ const char *redisProtocolToLuaType_Int(lua_State *lua, const char *reply) {
const char *p = strchr(reply+1, '\r');
int64_t value;

Util::StringToNum(std::string(reply+1, p-reply-1), &value);
Util::DecimalStringToNum(std::string(reply+1, p-reply-1), &value);
lua_pushnumber(lua, static_cast<lua_Number>(value));
return p+2;
}
Expand All @@ -613,7 +613,7 @@ const char *redisProtocolToLuaType_Bulk(lua_State *lua, const char *reply) {
const char *p = strchr(reply+1, '\r');
int64_t bulklen;

Util::StringToNum(std::string(reply+1, p-reply-1), &bulklen);
Util::DecimalStringToNum(std::string(reply+1, p-reply-1), &bulklen);
if (bulklen == -1) {
lua_pushboolean(lua, 0);
return p+2;
Expand Down Expand Up @@ -648,7 +648,7 @@ const char *redisProtocolToLuaType_Aggregate(lua_State *lua, const char *reply,
int64_t mbulklen;
int j = 0;

Util::StringToNum(std::string(reply+1, p-reply-1), &mbulklen);
Util::DecimalStringToNum(std::string(reply+1, p-reply-1), &mbulklen);
p += 2;
if (mbulklen == -1) {
lua_pushboolean(lua, 0);
Expand Down
11 changes: 11 additions & 0 deletions src/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,17 @@ Server::Server(Engine::Storage *storage, Config *config) :

for (int i = 0; i < config->workers; i++) {
auto worker = new Worker(this, config);
// multiple workers can't listen to the same unix socket, so
// listen unix socket only from a single worker - the first one
if (!config->unixsocket.empty() && i == 0) {
Status s = worker->ListenUnixSocket(config->unixsocket, config->unixsocketperm, config->backlog);
if (!s.IsOK()) {
LOG(ERROR) << "[server] Failed to listen on unix socket: "<< config->unixsocket
<< ", encounter error: " << s.Msg();
delete worker;
exit(1);
}
}
worker_threads_.emplace_back(new WorkerThread(worker));
}
AdjustOpenFilesLimit();
Expand Down
14 changes: 13 additions & 1 deletion src/util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ int GetPeerAddr(int fd, std::string *addr, uint32_t *port) {
return -2; // only support AF_INET currently
}

Status StringToNum(const std::string &str, int64_t *n, int64_t min, int64_t max) {
Status DecimalStringToNum(const std::string &str, int64_t *n, int64_t min, int64_t max) {
try {
*n = static_cast<int64_t>(std::stoll(str));
if (max > min && (*n < min || *n > max)) {
Expand All @@ -335,6 +335,18 @@ Status StringToNum(const std::string &str, int64_t *n, int64_t min, int64_t max)
return Status::OK();
}

Status OctalStringToNum(const std::string &str, int64_t *n, int64_t min, int64_t max) {
try {
*n = static_cast<int64_t>(std::stoll(str, nullptr, 8));
if (max > min && (*n < min || *n > max)) {
return Status(Status::NotOK, "value shoud between "+std::to_string(min)+" and "+std::to_string(max));
}
} catch (std::exception &e) {
return Status(Status::NotOK, "value is not an integer or out of range");
}
return Status::OK();
}

std::string ToLower(std::string in) {
std::transform(in.begin(), in.end(), in.begin(),
[](char c) -> char { return static_cast<char>(std::tolower(c)); });
Expand Down
3 changes: 2 additions & 1 deletion src/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ int GetPeerAddr(int fd, std::string *addr, uint32_t *port);
bool IsPortInUse(int port);

// string util
Status StringToNum(const std::string &str, int64_t *n, int64_t min = INT64_MIN, int64_t max = INT64_MAX);
Status DecimalStringToNum(const std::string &str, int64_t *n, int64_t min = INT64_MIN, int64_t max = INT64_MAX);
Status OctalStringToNum(const std::string &str, int64_t *n, int64_t min = INT64_MIN, int64_t max = INT64_MAX);
const std::string Float2String(double d);
std::string ToLower(std::string in);
void BytesToHuman(char *buf, size_t size, uint64_t n);
Expand Down
68 changes: 62 additions & 6 deletions src/worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@

#include <sys/types.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/un.h>
#include <list>
#include <cctype>
#include <cstring>
#include <utility>
#include <algorithm>
#include <glog/logging.h>
Expand All @@ -44,7 +47,7 @@ Worker::Worker(Server *svr, Config *config, bool repl) : svr_(svr) {
int port = config->port;
auto binds = config->binds;
for (const auto &bind : binds) {
Status s = listen(bind, port, config->backlog);
Status s = listenTCP(bind, port, config->backlog);
if (!s.IsOK()) {
LOG(ERROR) << "[worker] Failed to listen on: "<< bind << ":" << port
<< ", encounter error: " << s.Msg();
Expand Down Expand Up @@ -78,8 +81,8 @@ void Worker::TimerCB(int, int16_t events, void *ctx) {
worker->KickoutIdleClients(config->timeout);
}

void Worker::newConnection(evconnlistener *listener, evutil_socket_t fd,
sockaddr *address, int socklen, void *ctx) {
void Worker::newTCPConnection(evconnlistener *listener, evutil_socket_t fd,
sockaddr *address, int socklen, void *ctx) {
auto worker = static_cast<Worker *>(ctx);
DLOG(INFO) << "[worker] New connection: fd=" << fd
<< " from port: " << worker->svr_->GetConfig()->port << " thread #"
Expand All @@ -106,23 +109,51 @@ void Worker::newConnection(evconnlistener *listener, evutil_socket_t fd,
Redis::Connection::OnEvent, conn);
bufferevent_enable(bev, EV_READ);
Status status = worker->AddConnection(conn);
if (!status.IsOK()) {
std::string err_msg = Redis::Error("ERR " + status.Msg());
write(fd, err_msg.data(), err_msg.size());
conn->Close();
return;
}
std::string ip;
uint32_t port;
if (Util::GetPeerAddr(fd, &ip, &port) == 0) {
conn->SetAddr(ip, port);
}
if (worker->rate_limit_group_ != nullptr) {
bufferevent_add_to_rate_limit_group(bev, worker->rate_limit_group_);
}
}

void Worker::newUnixSocketConnection(evconnlistener *listener, evutil_socket_t fd,
sockaddr *address, int socklen, void *ctx) {
auto worker = static_cast<Worker *>(ctx);
DLOG(INFO) << "[worker] New connection: fd=" << fd
<< " from unixsocket: " << worker->svr_->GetConfig()->unixsocket << " thread #"
<< worker->tid_;
event_base *base = evconnlistener_get_base(listener);
auto evThreadSafeFlags = BEV_OPT_THREADSAFE | BEV_OPT_DEFER_CALLBACKS | BEV_OPT_UNLOCK_CALLBACKS;
bufferevent *bev = bufferevent_socket_new(base,
fd,
evThreadSafeFlags);
auto conn = new Redis::Connection(bev, worker);
bufferevent_setcb(bev, Redis::Connection::OnRead, Redis::Connection::OnWrite,
Redis::Connection::OnEvent, conn);
bufferevent_enable(bev, EV_READ);
Status status = worker->AddConnection(conn);
if (!status.IsOK()) {
std::string err_msg = Redis::Error("ERR " + status.Msg());
write(fd, err_msg.data(), err_msg.size());
conn->Close();
return;
}

conn->SetAddr(worker->svr_->GetConfig()->unixsocket, 0);
if (worker->rate_limit_group_ != nullptr) {
bufferevent_add_to_rate_limit_group(bev, worker->rate_limit_group_);
}
}

Status Worker::listen(const std::string &host, int port, int backlog) {
Status Worker::listenTCP(const std::string &host, int port, int backlog) {
sockaddr_in sin{};
sin.sin_family = AF_INET;
evutil_inet_pton(AF_INET, host.data(), &(sin.sin_addr));
Expand All @@ -140,9 +171,34 @@ Status Worker::listen(const std::string &host, int port, int backlog) {
return Status(Status::NotOK, evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR()));
}
evutil_make_socket_nonblocking(fd);
auto lev = evconnlistener_new(base_, newConnection, this,
auto lev = evconnlistener_new(base_, newTCPConnection, this,
LEV_OPT_CLOSE_ON_FREE, backlog, fd);
listen_events_.emplace_back(lev);
return Status::OK();
}

Status Worker::ListenUnixSocket(const std::string &path, int perm, int backlog) {
unlink(path.c_str());
sockaddr_un sa{};
if (path.size() > sizeof(sa.sun_path) - 1) {
return Status(Status::NotOK, "unix socket path too long");
}
sa.sun_family = AF_LOCAL;
strncpy(sa.sun_path, path.c_str(), sizeof(sa.sun_path) - 1);
int fd = socket(AF_LOCAL, SOCK_STREAM, 0);
if (fd == -1) {
return Status(Status::NotOK, evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR()));
}
if (bind(fd, (struct sockaddr *)&sa, sizeof(sa)) < 0) {
return Status(Status::NotOK, evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR()));
}
evutil_make_socket_nonblocking(fd);
auto lev = evconnlistener_new(base_, newUnixSocketConnection, this,
LEV_OPT_CLOSE_ON_FREE, backlog, fd);
listen_events_.emplace_back(lev);
if (perm != 0) {
chmod(sa.sun_path, (mode_t)perm);
}
return Status::OK();
}

Expand Down
10 changes: 7 additions & 3 deletions src/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,16 @@ class Worker {
uint64_t type, bool skipme, int64_t *killed);
void KickoutIdleClients(int timeout);

Status ListenUnixSocket(const std::string &path, int perm, int backlog);

Server *svr_;

private:
Status listen(const std::string &host, int port, int backlog);
static void newConnection(evconnlistener *listener, evutil_socket_t fd,
sockaddr *address, int socklen, void *ctx);
Status listenTCP(const std::string &host, int port, int backlog);
static void newTCPConnection(evconnlistener *listener, evutil_socket_t fd,
sockaddr *address, int socklen, void *ctx);
static void newUnixSocketConnection(evconnlistener *listener, evutil_socket_t fd,
sockaddr *address, int socklen, void *ctx);
static void TimerCB(int, int16_t events, void *ctx);
Redis::Connection *removeConnection(int fd);

Expand Down

0 comments on commit f4c517b

Please sign in to comment.