diff --git a/CMakeLists.txt b/CMakeLists.txt index ce81aead516..bd4b65c2c37 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -158,7 +158,8 @@ target_sources(kvrocks PRIVATE src/scripting.h src/sha1.cc src/sha1.h - ) + src/rand.cc + src/rand.h) # kvrocks2redis sync tool add_executable(kvrocks2redis) @@ -259,6 +260,8 @@ target_sources(kvrocks2redis PRIVATE src/compaction_checker.h src/scripting.cc src/scripting.h + src/rand.cc + src/rand.h tools/kvrocks2redis/config.cc tools/kvrocks2redis/config.h tools/kvrocks2redis/main.cc @@ -352,7 +355,9 @@ add_executable(unittest tests/log_collector_test.cc src/config_type.h src/sha1.cc - src/sha1.h) + src/sha1.h + src/rand.cc + src/rand.h) add_dependencies(unittest glog rocksdb snappy jemalloc lua) target_compile_features(unittest PRIVATE cxx_std_11) diff --git a/docs/support-commands.md b/docs/support-commands.md index 8b9442c8c8f..3732d8cc151 100644 --- a/docs/support-commands.md +++ b/docs/support-commands.md @@ -148,6 +148,13 @@ **NOTE : String and Bitmap is different type in kvrocks, so you can't do bit with string, vice versa.** +## Script Commands + +| Command | Supported OR Not | Desc | +| --------- | ---------------- | ---- | +| eval | √ | | +| evalsha | √ | | +| script | √ | script kill and debug subcommand are not supported | ## Pub/Sub Commands diff --git a/src/Makefile b/src/Makefile index 842f06c2d84..d62fa47aa15 100644 --- a/src/Makefile +++ b/src/Makefile @@ -40,7 +40,7 @@ SHARED_OBJS= cluster.o compact_filter.o config.o cron.o encoding.o event_listene redis_hash.o redis_list.o redis_metadata.o redis_slot.o redis_pubsub.o redis_reply.o \ redis_request.o redis_set.o redis_string.o redis_zset.o redis_geo.o replication.o \ server.o stats.o storage.o task_runner.o util.o geohash.o worker.o redis_sortedint.o \ - compaction_checker.o table_properties_collector.o scripting.o sha1.o + compaction_checker.o table_properties_collector.o scripting.o sha1.o rand.o KVROCKS_OBJS= $(SHARED_OBJS) main.o UNITTEST_OBJS= $(SHARED_OBJS) ../tests/main.o ../tests/t_metadata_test.o ../tests/compact_test.o \ diff --git a/src/compact_filter.h b/src/compact_filter.h index 1134fa6ee9c..a81d71ed53b 100644 --- a/src/compact_filter.h +++ b/src/compact_filter.h @@ -71,6 +71,27 @@ class SubKeyFilterFactory : public rocksdb::CompactionFilterFactory { Engine::Storage *stor_ = nullptr; }; +class PropagateFilter : public rocksdb::CompactionFilter { + public: + const char *Name() const override { return "PropagateFilter"; } + bool Filter(int level, const Slice &key, const Slice &value, + std::string *new_value, bool *modified) const override { + // We propagate Lua commands which don't store data, + // just in order to implement updating Lua state. + return key == Engine::kPropagateScriptCommand; + } +}; + +class PropagateFilterFactory : public rocksdb::CompactionFilterFactory { + public: + PropagateFilterFactory() = default; + const char *Name() const override { return "PropagateFilterFactory"; } + std::unique_ptr CreateCompactionFilter( + const rocksdb::CompactionFilter::Context &context) override { + return std::unique_ptr(new PropagateFilter()); + } +}; + class PubSubFilter : public rocksdb::CompactionFilter { public: const char *Name() const override { return "PubSubFilter"; } diff --git a/src/compaction_checker.cc b/src/compaction_checker.cc index 1112cbf6d8d..3f0e6ca39c7 100644 --- a/src/compaction_checker.cc +++ b/src/compaction_checker.cc @@ -2,10 +2,10 @@ #include #include "storage.h" -void CompactionChecker::CompactPubsubFiles() { +void CompactionChecker::CompactPropagateAndPubSubFiles() { rocksdb::CompactRangeOptions compact_opts; compact_opts.change_level = true; - std::vector cf_names = {Engine::kPubSubColumnFamilyName}; + std::vector cf_names = {Engine::kPubSubColumnFamilyName, Engine::kPropagateColumnFamilyName}; for (const auto &cf_name : cf_names) { LOG(INFO) << "[compaction checker] Start the compact the column family: " << cf_name; auto cf_handle = storage_->GetCFHandle(cf_name); diff --git a/src/compaction_checker.h b/src/compaction_checker.h index 6ccb86d1ce1..0f81c8047b3 100644 --- a/src/compaction_checker.h +++ b/src/compaction_checker.h @@ -10,7 +10,7 @@ class CompactionChecker { explicit CompactionChecker(Engine::Storage *storage):storage_(storage) {} ~CompactionChecker() {} void PickCompactionFiles(const std::string &cf_name); - void CompactPubsubFiles(); + void CompactPropagateAndPubSubFiles(); private: Engine::Storage *storage_ = nullptr; }; diff --git a/src/rand.cc b/src/rand.cc new file mode 100644 index 00000000000..3ec6603e78f --- /dev/null +++ b/src/rand.cc @@ -0,0 +1,95 @@ +/* Pseudo random number generation functions derived from the drand48() + * function obtained from pysam source code. + * + * This functions are used in order to replace the default math.random() + * Lua implementation with something having exactly the same behavior + * across different systems (by default Lua uses libc's rand() that is not + * required to implement a specific PRNG generating the same sequence + * in different systems if seeded with the same integer). + * + * The original code appears to be under the public domain. + * I modified it removing the non needed functions and all the + * 1960-style C coding stuff... + * + * ---------------------------------------------------------------------------- + * + * Copyright (c) 2010-2012, Salvatore Sanfilippo + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Redis nor the names of its contributors may be used + * to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "rand.h" + +#include + +#define N 16 +#define MASK ((1 << (N - 1)) + (1 << (N - 1)) - 1) +#define LOW(x) ((unsigned)(x) & MASK) +#define HIGH(x) LOW((x) >> N) +#define MUL(x, y, z) { int32_t l = (int64_t)(x) * (int64_t)(y); \ + (z)[0] = LOW(l); (z)[1] = HIGH(l); } +#define CARRY(x, y) ((int32_t)(x) + (int64_t)(y) > MASK) +#define ADDEQU(x, y, z) (z = CARRY(x, (y)), x = LOW(x + (y))) +#define X0 0x330E +#define X1 0xABCD +#define X2 0x1234 +#define A0 0xE66D +#define A1 0xDEEC +#define A2 0x5 +#define C 0xB +#define SET3(x, x0, x1, x2) ((x)[0] = (x0), (x)[1] = (x1), (x)[2] = (x2)) +#define SETLOW(x, y, n) SET3(x, LOW((y)[n]), LOW((y)[(n)+1]), LOW((y)[(n)+2])) +#define SEED(x0, x1, x2) (SET3(x, x0, x1, x2), SET3(a, A0, A1, A2), c = C) +#define REST(v) for (i = 0; i < 3; i++) { xsubi[i] = x[i]; x[i] = temp[i]; } \ + return (v); +#define HI_BIT (1L << (2 * N - 1)) + +static uint32_t x[3] = {X0, X1, X2}, a[3] = {A0, A1, A2}, c = C; +static void next(void); + +int32_t redisLrand48() { + next(); + return (((int32_t) x[2] << (N - 1)) + (x[1] >> 1)); +} + +void redisSrand48(int32_t seedval) { + SEED(X0, LOW(seedval), HIGH(seedval)); +} + +static void next(void) { + uint32_t p[2], q[2], r[2], carry0, carry1; + + MUL(a[0], x[0], p); + ADDEQU(p[0], c, carry0); + ADDEQU(p[1], carry0, carry1); + MUL(a[0], x[1], q); + ADDEQU(p[1], q[0], carry0); + MUL(a[1], x[0], r); + x[2] = LOW(carry0 + carry1 + CARRY(p[1], r[0]) + q[1] + r[1] + + a[0] * x[2] + a[1] * x[1] + a[2] * x[0]); + x[1] = LOW(p[1] + r[0]); + x[0] = LOW(p[0]); +} diff --git a/src/rand.h b/src/rand.h new file mode 100644 index 00000000000..d3b97847803 --- /dev/null +++ b/src/rand.h @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2009-2012, Salvatore Sanfilippo + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Redis nor the names of its contributors may be used + * to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ +#pragma once + +#include + +int32_t redisLrand48(); +void redisSrand48(int32_t seedval); + +#define REDIS_LRAND48_MAX INT32_MAX diff --git a/src/redis_cmd.cc b/src/redis_cmd.cc index b0a0b1065e0..b5e936f5c75 100644 --- a/src/redis_cmd.cc +++ b/src/redis_cmd.cc @@ -4292,15 +4292,66 @@ class CommandEval : public Commander { } Status Execute(Server *svr, Connection *conn, std::string *output) override { - char funcname[43]; + return Lua::evalGenericCommand(conn, args_, false, output); + } +}; - /* We obtain the script SHA1, then check if this function is already - * defined into the Lua state */ - funcname[0] = 'f'; - funcname[1] = '_'; +class CommandEvalSHA : public Commander { + public: + Status Parse(const std::vector &args) override { + if (args[1].size() != 40) { + return Status(Status::NotOK, "NOSCRIPT No matching script. Please use EVAL"); + } + return Status::OK(); + } - return Lua::evalGenericCommand(conn, args_, false, output); + Status Execute(Server *svr, Connection *conn, std::string *output) override { + return Lua::evalGenericCommand(conn, args_, true, output); + } +}; + +class CommandScript : public Commander { + public: + Status Parse(const std::vector &args) override { + subcommand_ = Util::ToLower(args[1]); + return Status::OK(); } + + Status Execute(Server *svr, Connection *conn, std::string *output) override { + // There's a little tricky here since the script command was the write type + // command but some subcommands like `exists` were readonly, so we want to allow + // executing on slave here. Maybe we should find other way to do this. + if (svr->IsSlave() && subcommand_ != "exists") { + return Status(Status::NotOK, "READONLY You can't write against a read only slave"); + } + if (args_.size() == 2 && subcommand_ == "flush") { + svr->ScriptFlush(); + svr->Propagate(Engine::kPropagateScriptCommand, args_); + *output = Redis::SimpleString("OK"); + } else if (args_.size() >= 2 && subcommand_ == "exists") { + *output = Redis::MultiLen(args_.size()-2); + for (size_t j = 2; j < args_.size(); j++) { + if (svr->ScriptExists(args_[j]).IsOK()) { + *output += Redis::Integer(1); + } else { + *output += Redis::Integer(0); + } + } + } else if (args_.size() == 3 && subcommand_ == "load") { + std::string sha; + auto s = Lua::createFunction(svr, args_[2], &sha); + if (!s.IsOK()) { + return s; + } + *output = Redis::SimpleString(sha); + } else { + return Status(Status::NotOK, "Unknown SCRIPT subcommand or wrong # of args"); + } + return Status::OK(); + } + + private: + std::string subcommand_; }; #define ADD_CMD(name, arity, description , first_key, last_key, key_step, fn) \ @@ -4473,6 +4524,8 @@ CommandAttributes redisCommandTable[] = { ADD_CMD("clusterx", -2, "cluster no-script", 0, 0, 0, CommandClusterX), ADD_CMD("eval", -3, "exclusive write no-script", 0, 0, 0, CommandEval), + ADD_CMD("evalsha", -3, "exclusive write no-script", 0, 0, 0, CommandEvalSHA), + ADD_CMD("script", -2, "exclusive no-script", 0, 0, 0, CommandScript), ADD_CMD("compact", 1, "read-only no-script", 0, 0, 0, CommandCompact), ADD_CMD("bgsave", 1, "read-only no-script", 0, 0, 0, CommandBGSave), diff --git a/src/redis_connection.cc b/src/redis_connection.cc index 98f2a79e187..3b99f00d64a 100644 --- a/src/redis_connection.cc +++ b/src/redis_connection.cc @@ -357,7 +357,7 @@ void Connection::ExecuteCommands(const std::vector &to_pro s = current_cmd_->Parse(cmd_tokens); if (!s.IsOK()) { if (IsFlagEnabled(Connection::kMultiExec)) multi_error_ = true; - Reply(Redis::Error(s.Msg())); + Reply(Redis::Error("ERR "+s.Msg())); continue; } diff --git a/src/replication.cc b/src/replication.cc index 073ea4836fe..125e2110bab 100644 --- a/src/replication.cc +++ b/src/replication.cc @@ -917,11 +917,20 @@ rocksdb::Status ReplicationThread::ParseWriteBatch(const std::string &batch_stri status = write_batch.Iterate(&write_batch_handler); if (!status.ok()) return status; - if (write_batch_handler.IsPublish()) { - srv_->PublishMessage(write_batch_handler.GetPublishChannel().ToString(), - write_batch_handler.GetPublishValue().ToString()); + switch (write_batch_handler.Type()) { + case kBatchTypePublish: + srv_->PublishMessage(write_batch_handler.Key(), write_batch_handler.Value()); + break; + case kBatchTypePropagate: + if (write_batch_handler.Key() == Engine::kPropagateScriptCommand) { + std::vector tokens; + Util::TokenizeRedisProtocol(write_batch_handler.Value(), &tokens); + if (!tokens.empty()) { + srv_->ExecPropagatedCommand(tokens); + } + } + break; } - return rocksdb::Status::OK(); } @@ -931,11 +940,14 @@ bool ReplicationThread::isRestoringError(const char *err) { rocksdb::Status WriteBatchHandler::PutCF(uint32_t column_family_id, const rocksdb::Slice &key, const rocksdb::Slice &value) { - if (column_family_id != kColumnFamilyIDPubSub) { + if (column_family_id == kColumnFamilyIDPubSub) { + type_ = kBatchTypePublish; + kv_ = std::make_pair(key.ToString(), value.ToString()); + return rocksdb::Status::OK(); + } else if (column_family_id == kColumnFamilyIDPropagate) { + type_ = kBatchTypePropagate; + kv_ = std::make_pair(key.ToString(), value.ToString()); return rocksdb::Status::OK(); } - - publish_message_ = std::make_pair(key.ToString(), value.ToString()); - is_publish_ = true; return rocksdb::Status::OK(); } diff --git a/src/replication.h b/src/replication.h index 6fd95c750d0..51ed9be9078 100644 --- a/src/replication.h +++ b/src/replication.h @@ -27,6 +27,11 @@ enum ReplState { kReplError, }; +enum WriteBatchType { + kBatchTypePublish = 1, + kBatchTypePropagate, +}; + typedef std::function fetch_file_callback; class FeedSlaveThread { @@ -187,11 +192,11 @@ class WriteBatchHandler : public rocksdb::WriteBatch::Handler { public: rocksdb::Status PutCF(uint32_t column_family_id, const rocksdb::Slice &key, const rocksdb::Slice &value) override; + WriteBatchType Type() { return type_; } + std::string Key() const { return kv_.first; } + std::string Value() const { return kv_.second; } - rocksdb::Slice GetPublishChannel() { return publish_message_.first; } - rocksdb::Slice GetPublishValue() { return publish_message_.second; } - bool IsPublish() { return is_publish_; } private: - std::pair publish_message_; - bool is_publish_ = false; + std::pair kv_; + WriteBatchType type_; }; diff --git a/src/scripting.cc b/src/scripting.cc index 901b2c9c2c4..3e94a6cb84c 100644 --- a/src/scripting.cc +++ b/src/scripting.cc @@ -4,6 +4,7 @@ // #include "scripting.h" +#include #include #include "util.h" @@ -11,6 +12,7 @@ #include "server.h" #include "redis_connection.h" #include "redis_cmd.h" +#include "rand.h" /* The maximum number of characters needed to represent a long double * as a string (long double has a huge range). @@ -30,9 +32,14 @@ namespace Lua { loadLibraries(lua); removeUnsupportedFunctions(lua); loadFuncs(lua); + enableGlobalsProtection(lua); return lua; } + void DestroyState(lua_State *lua) { + lua_close(lua); + } + void loadFuncs(lua_State *lua) { lua_newtable(lua); @@ -63,12 +70,21 @@ namespace Lua { /* Replace math.random and math.randomseed with our implementations. */ lua_getglobal(lua, "math"); + + lua_pushstring(lua, "random"); + lua_pushcfunction(lua, redisMathRandom); + lua_settable(lua, -3); + + lua_pushstring(lua, "randomseed"); + lua_pushcfunction(lua, redisMathRandomSeed); + lua_settable(lua, -3); + lua_setglobal(lua, "math"); - /* Add a helper function we use for pcall error reporting. - * Note that when the error is in the C function we want to report the - * information about the caller, that's what makes sense from the point - * of view of the user debugging a script. */ + /* Add a helper function we use for pcall error reporting. + * Note that when the error is in the C function we want to report the + * information about the caller, that's what makes sense from the point + * of view of the user debugging a script. */ { const char *err_func = "local dbg = debug\n" "function __redis__err__handler(err)\n" @@ -85,6 +101,15 @@ namespace Lua { luaL_loadbuffer(lua, err_func, strlen(err_func), "@err_handler_def"); lua_pcall(lua, 0, 0, 0); } + { + const char *compare_func = "function __redis__compare_helper(a,b)\n" + " if a == false then a = '' end\n" + " if b == false then b = '' end\n" + " return aGetServer()->Lua(); + Server *srv = conn->GetServer(); + lua_State *lua = srv->Lua(); auto s = Util::StringToNum(args[2], &numkeys); if (!s.IsOK()) { @@ -111,6 +137,12 @@ namespace Lua { funcname[1] = '_'; if (!evalsha) { SHA1Hex(funcname+2, args[1].c_str(), args[1].size()); + } else { + for (int j = 0; j < 40; j++) { + std::string sha = args[1]; + funcname[j+2] = (sha[j] >= 'A' && sha[j] <= 'Z') ? sha[j]+('a'-'A') : sha[j]; + } + funcname[42] = '\0'; } /* Push the pcall error handler function on the stack. */ @@ -120,31 +152,33 @@ namespace Lua { lua_getglobal(lua, funcname); if (lua_isnil(lua, -1)) { lua_pop(lua, 1); /* remove the nil from the stack */ - /* Function not defined... let's define it if we have the - * body of the function. If this is an EVALSHA call we can just - * return an error. */ + std::string body; if (evalsha) { - lua_pop(lua, 1); /* remove the error handler from the stack. */ - return Status(Status::NotOK, "NOSCRIPT No matching script. Please use EVAL"); + auto s = srv->ScriptGet(funcname+2, &body); + if (!s.IsOK()) { + lua_pop(lua, 1); /* remove the error handler from the stack. */ + return Status(Status::NotOK, "NOSCRIPT No matching script. Please use EVAL"); + } + } else { + body = args[1]; } std::string sha; - s = createFunction(lua, args[1], &sha); + s = createFunction(srv, body, &sha); if (!s.IsOK()) { lua_pop(lua, 1); /* remove the error handler from the stack. */ - /* The error is sent to the client by luaCreateFunction() - * itself when it returns NULL. */ return s; } /* Now the following is guaranteed to return non nil */ lua_getglobal(lua, funcname); } + /* Populate the argv and keys table accordingly to the arguments that * EVAL received. */ setGlobalArray(lua, "KEYS", std::vector(args.begin()+3, args.begin()+3+numkeys)); setGlobalArray(lua, "ARGV", std::vector(args.begin()+3+numkeys, args.end())); int err = lua_pcall(lua, 0, 1, -2); if (err) { - std::string msg = std::string("Error running script (call to ") + funcname + "): "+ lua_tostring(lua, -1); + std::string msg = std::string("ERR running script (call to ") + funcname + "): "+ lua_tostring(lua, -1); *output = Redis::Error(msg); lua_pop(lua, 2); } else { @@ -193,9 +227,11 @@ namespace Lua { snprintf(dbuf, sizeof(dbuf), "%.17g", static_cast(num)); args.emplace_back(dbuf); } else { - const char *str = lua_tostring(lua, j+1); - if (str == nullptr) break; /* no a string */ - args.emplace_back(str); + const char *obj_s; + size_t obj_len; + obj_s = lua_tolstring(lua, j+1, &obj_len); + if (obj_s == nullptr) break; /* no a string */ + args.emplace_back(std::string(obj_s, obj_len)); } } if (j != argc) { @@ -248,7 +284,7 @@ namespace Lua { } auto s = cmd->Parse(args); if (!s.IsOK()) { - pushError(lua, s.Msg().c_str()); + pushError(lua, s.Msg().data()); return raise_error ? raiseError(lua) : 1; } srv->stats_.IncrCalls(cmd_name); @@ -276,6 +312,37 @@ namespace Lua { lua_setglobal(lua, "dofile"); } + void enableGlobalsProtection(lua_State *lua) { + const char *s[32]; + int j = 0; + + s[j++] = "local dbg=debug\n"; + s[j++] = "local mt = {}\n"; + s[j++] = "setmetatable(_G, mt)\n"; + s[j++] = "mt.__newindex = function (t, n, v)\n"; + s[j++] = " if dbg.getinfo(2) then\n"; + s[j++] = " local w = dbg.getinfo(2, \"S\").what\n"; + s[j++] = " if w ~= \"main\" and w ~= \"C\" then\n"; + s[j++] = " error(\"Script attempted to create global variable '\"..tostring(n)..\"'\", 2)\n"; + s[j++] = " end\n"; + s[j++] = " end\n"; + s[j++] = " rawset(t, n, v)\n"; + s[j++] = "end\n"; + s[j++] = "mt.__index = function (t, n)\n"; + s[j++] = " if dbg.getinfo(2) and dbg.getinfo(2, \"S\").what ~= \"C\" then\n"; + s[j++] = " error(\"Script attempted to access nonexistent global variable '\"..tostring(n)..\"'\", 2)\n"; + s[j++] = " end\n"; + s[j++] = " return rawget(t, n)\n"; + s[j++] = "end\n"; + s[j++] = "debug = nil\n"; + s[j++] = nullptr; + + std::string code; + for (j = 0; s[j] != nullptr; j++) code += s[j]; + luaL_loadbuffer(lua, code.c_str(), code.size(), "@enable_strict_lua"); + lua_pcall(lua, 0, 0, 0); + } + void loadLibraries(lua_State *lua) { auto loadLib = [] (lua_State *lua, const char *libname, lua_CFunction func) { lua_pushcfunction(lua, func); @@ -519,10 +586,14 @@ void pushError(lua_State *lua, const char *err) { std::string replyToRedisReply(lua_State *lua) { std::string output; + const char *obj_s = nullptr; + size_t obj_len; + int t = lua_type(lua, -1); switch (t) { case LUA_TSTRING: - output = Redis::BulkString(std::string(lua_tostring(lua, -1), lua_strlen(lua, -1))); + obj_s = lua_tolstring(lua, -1, &obj_len); + output = Redis::BulkString(std::string(obj_s, obj_len)); break; case LUA_TBOOLEAN: output = lua_toboolean(lua, -1) ? Redis::Integer(1) : Redis::NilString(); @@ -551,7 +622,8 @@ std::string replyToRedisReply(lua_State *lua) { lua_gettable(lua, -2); t = lua_type(lua, -1); if (t == LUA_TSTRING) { - output = Redis::SimpleString(lua_tostring(lua, -1)); + obj_s = lua_tolstring(lua, -1, &obj_len); + output = Redis::BulkString(std::string(obj_s, obj_len)); lua_pop(lua, 1); return output; } else { @@ -628,7 +700,48 @@ void setGlobalArray(lua_State *lua, const std::string &var, const std::vectorLua(); if (luaL_loadbuffer(lua, funcdef.c_str(), funcdef.size(), "@user_script")) { std::string errMsg = lua_tostring(lua, -1); lua_pop(lua, 1); @@ -676,7 +791,8 @@ Status createFunction(lua_State *lua, const std::string &body, std::string *sha) return Status(Status::NotOK, "Error running script (new function): " + errMsg + "\n"); } - *sha = funcdef.substr(2); + // would store lua function into propagate column family and propagate those scripts to slaves + srv->ScriptSet(*sha, body); return Status::OK(); } diff --git a/src/scripting.h b/src/scripting.h index 2e0557703a0..76e29a1fa33 100644 --- a/src/scripting.h +++ b/src/scripting.h @@ -16,17 +16,19 @@ extern "C" { namespace Lua { lua_State* CreateState(); +void DestroyState(lua_State *lua); void loadFuncs(lua_State *lua); void loadLibraries(lua_State *lua); void removeUnsupportedFunctions(lua_State *lua); +void enableGlobalsProtection(lua_State *lua); int redisCallCommand(lua_State *lua); int redisPCallCommand(lua_State *lua); int redisGenericCommand(lua_State *lua, int raise_error); int redisSha1hexCommand(lua_State *lua); int redisStatusReplyCommand(lua_State *lua); int redisErrorReplyCommand(lua_State *lua); -Status createFunction(lua_State *lua, const std::string &body, std::string *sha); +Status createFunction(Server *srv, const std::string &body, std::string *sha); Status evalGenericCommand(Redis::Connection *conn, const std::vector &args, bool evalsha, @@ -48,4 +50,7 @@ void sortArray(lua_State *lua); void setGlobalArray(lua_State *lua, const std::string &var, const std::vector &elems); void SHA1Hex(char *digest, const char *script, size_t len); + +int redisMathRandom(lua_State *L); +int redisMathRandomSeed(lua_State *L); } // namespace Lua diff --git a/src/server.cc b/src/server.cc index 5123c3d570e..2a5ca3aa390 100644 --- a/src/server.cc +++ b/src/server.cc @@ -112,7 +112,7 @@ Status Server::Start() { // compact once per day if (now != 0 && last_compact_date != now/86400) { last_compact_date = now/86400; - compaction_checker.CompactPubsubFiles(); + compaction_checker.CompactPropagateAndPubSubFiles(); } } } @@ -1222,3 +1222,76 @@ Status Server::LookupAndCreateCommand(const std::string &cmd_name, return Status::OK(); } +Status Server::ScriptExists(const std::string &sha) { + std::string body; + return ScriptGet(sha, &body); +} + +Status Server::ScriptGet(const std::string &sha, std::string *body) { + std::string funcname = Engine::kLuaFunctionPrefix + sha; + auto cf = storage_->GetCFHandle(Engine::kPropagateColumnFamilyName); + auto s = storage_->GetDB()->Get(rocksdb::ReadOptions(), cf, funcname, body); + if (!s.ok()) { + if (s.IsNotFound()) return Status(Status::NotFound); + return Status(Status::NotOK, s.ToString()); + } + return Status::OK(); +} + +void Server::ScriptSet(const std::string &sha, const std::string &body) { + std::string funcname = Engine::kLuaFunctionPrefix + sha; + WriteToPropagateCF(funcname, body); +} + +void Server::ScriptReset() { + Lua::DestroyState(lua_); + lua_ = Lua::CreateState(); +} + +void Server::ScriptFlush() { + auto cf = storage_->GetCFHandle(Engine::kPropagateColumnFamilyName); + storage_->FlushScripts(rocksdb::WriteOptions(), cf); + ScriptReset(); +} + +Status Server::WriteToPropagateCF(const std::string &key, const std::string &value) const { + rocksdb::WriteBatch batch; + auto propagateCf = storage_->GetCFHandle(Engine::kPropagateColumnFamilyName); + batch.Put(propagateCf, key, value); + auto s = storage_->Write(rocksdb::WriteOptions(), &batch); + if (!s.ok()) { + return Status(Status::NotOK, s.ToString()); + } + return Status::OK(); +} + +// Generally, we store data into rocksdb and just replicate WAL instead of propagating +// commands. But sometimes, we need to update inner states or do special operations +// for specific commands, such as `script flush`. +// channel: we put the same function commands into one channel to handle uniformly +// tokens: the serialized commands +Status Server::Propagate(const std::string &channel, const std::vector &tokens) { + std::string value = Redis::MultiLen(tokens.size()); + for (const auto &iter : tokens) { + value += Redis::BulkString(iter); + } + return WriteToPropagateCF(channel, value); +} + +Status Server::ExecPropagateScriptCommand(const std::vector &tokens) { + auto subcommand = Util::ToLower(tokens[1]); + if (subcommand == "flush") { + ScriptReset(); + } + return Status::OK(); +} + +Status Server::ExecPropagatedCommand(const std::vector &tokens) { + if (tokens.empty()) return Status::OK(); + + auto command = Util::ToLower(tokens[0]); + if (command == "script" && tokens.size() >= 2) { + return ExecPropagateScriptCommand(tokens); + } + return Status::OK(); +} diff --git a/src/server.h b/src/server.h index dc321b1e78b..9cbc2cb75b0 100644 --- a/src/server.h +++ b/src/server.h @@ -8,6 +8,7 @@ #include #include #include +#include extern "C" { #include @@ -126,6 +127,17 @@ class Server { void SetReplicationRateLimit(uint64_t max_replication_mb); lua_State *Lua() { return lua_; } + Status ScriptExists(const std::string &sha); + Status ScriptGet(const std::string &sha, std::string *body); + void ScriptSet(const std::string &sha, const std::string &body); + void ScriptReset(); + void ScriptFlush(); + + Status WriteToPropagateCF(const std::string &key, const std::string &value) const; + Status Propagate(const std::string &channel, const std::vector &tokens); + Status ExecPropagatedCommand(const std::vector &tokens); + Status ExecPropagateScriptCommand(const std::vector &tokens); + void SetCurrentConnection(Redis::Connection *conn) { curr_connection_ = conn; } Redis::Connection *GetCurrentConnection() { return curr_connection_; } @@ -158,6 +170,7 @@ class Server { std::mutex last_random_key_cursor_mu_; lua_State *lua_; + Redis::Connection *curr_connection_ = nullptr; // client counters diff --git a/src/storage.cc b/src/storage.cc index f5b1157bd6a..011dedac7c1 100644 --- a/src/storage.cc +++ b/src/storage.cc @@ -30,6 +30,11 @@ const char *kPubSubColumnFamilyName = "pubsub"; const char *kZSetScoreColumnFamilyName = "zset_score"; const char *kMetadataColumnFamilyName = "metadata"; const char *kSubkeyColumnFamilyName = "default"; +const char *kPropagateColumnFamilyName = "propagate"; + +const char *kPropagateScriptCommand = "script"; + +const char *kLuaFunctionPrefix = "lua_f_"; const uint64_t kIORateLimitMaxMb = 1024000; @@ -129,7 +134,8 @@ Status Storage::CreateColumnFamilies(const rocksdb::Options &options) { if (s.ok()) { std::vector cf_names = {kMetadataColumnFamilyName, kZSetScoreColumnFamilyName, - kPubSubColumnFamilyName}; + kPubSubColumnFamilyName, + kPropagateColumnFamilyName}; std::vector cf_handles; s = tmp_db->CreateColumnFamilies(cf_options, cf_names, &cf_handles); if (!s.ok()) { @@ -201,12 +207,21 @@ Status Storage::Open(bool read_only) { pubsub_opts.compaction_filter_factory = std::make_shared(); pubsub_opts.disable_auto_compactions = config_->RocksDB.disable_auto_compactions; + rocksdb::BlockBasedTableOptions propagate_table_opts; + propagate_table_opts.filter_policy.reset(rocksdb::NewBloomFilterPolicy(10, true)); + propagate_table_opts.block_size = block_size; + rocksdb::ColumnFamilyOptions propagate_opts(options); + propagate_opts.table_factory.reset(rocksdb::NewBlockBasedTableFactory(propagate_table_opts)); + propagate_opts.compaction_filter_factory = std::make_shared(); + propagate_opts.disable_auto_compactions = config_->RocksDB.disable_auto_compactions; + std::vector column_families; // Caution: don't change the order of column family, or the handle will be mismatched column_families.emplace_back(rocksdb::ColumnFamilyDescriptor(rocksdb::kDefaultColumnFamilyName, subkey_opts)); column_families.emplace_back(rocksdb::ColumnFamilyDescriptor(kMetadataColumnFamilyName, metadata_opts)); column_families.emplace_back(rocksdb::ColumnFamilyDescriptor(kZSetScoreColumnFamilyName, subkey_opts)); column_families.emplace_back(rocksdb::ColumnFamilyDescriptor(kPubSubColumnFamilyName, pubsub_opts)); + column_families.emplace_back(rocksdb::ColumnFamilyDescriptor(kPropagateColumnFamilyName, propagate_opts)); std::vector old_column_families; auto s = rocksdb::DB::ListColumnFamilies(options, config_->db_dir, &old_column_families); if (!s.ok()) return Status(Status::NotOK, s.ToString()); @@ -440,6 +455,14 @@ rocksdb::Status Storage::DeleteRange(const std::string &first_key, const std::st return rocksdb::Status::OK(); } +rocksdb::Status Storage::FlushScripts(const rocksdb::WriteOptions &options, rocksdb::ColumnFamilyHandle *cf_handle) { + std::string begin_key = kLuaFunctionPrefix, end_key = begin_key; + // we need to increase one here since the DeleteRange api + // didn't contain the end key. + end_key[end_key.size()-1] += 1; + return db_->DeleteRange(options, cf_handle, begin_key, end_key); +} + Status Storage::WriteBatch(std::string &&raw_batch) { if (reach_db_size_limit_) { return Status(Status::NotOK, "reach space limit"); @@ -459,6 +482,8 @@ rocksdb::ColumnFamilyHandle *Storage::GetCFHandle(const std::string &name) { return cf_handles_[2]; } else if (name == kPubSubColumnFamilyName) { return cf_handles_[3]; + } else if (name == kPropagateColumnFamilyName) { + return cf_handles_[4]; } return cf_handles_[0]; } @@ -485,7 +510,10 @@ uint64_t Storage::GetTotalSize(const std::string &ns) { uint8_t include_both = rocksdb::DB::SizeApproximationFlags::INCLUDE_FILES | rocksdb::DB::SizeApproximationFlags::INCLUDE_MEMTABLES; for (auto cf_handle : cf_handles_) { - if (cf_handle == GetCFHandle(kPubSubColumnFamilyName)) continue; + if (cf_handle == GetCFHandle(kPubSubColumnFamilyName) || + cf_handle == GetCFHandle(kPropagateColumnFamilyName)) { + continue; + } auto s = db.FindKeyRangeWithPrefix(prefix, &begin_key, &end_key, cf_handle); if (!s.ok()) continue; diff --git a/src/storage.h b/src/storage.h index 2b8839a2750..83f549fee6a 100644 --- a/src/storage.h +++ b/src/storage.h @@ -21,6 +21,7 @@ enum ColumnFamilyID{ kColumnFamilyIDMetadata, kColumnFamilyIDZSetScore, kColumnFamilyIDPubSub, + kColumnFamilyIDPropagate, }; namespace Engine { @@ -28,6 +29,11 @@ extern const char *kPubSubColumnFamilyName; extern const char *kZSetScoreColumnFamilyName; extern const char *kMetadataColumnFamilyName; extern const char *kSubkeyColumnFamilyName; +extern const char *kPropagateColumnFamilyName; + +extern const char *kPropagateScriptCommand; + +extern const char *kLuaFunctionPrefix; class Storage { public: @@ -57,6 +63,7 @@ class Storage { rocksdb::ColumnFamilyHandle *cf_handle, const rocksdb::Slice &key); rocksdb::Status DeleteRange(const std::string &first_key, const std::string &last_key); + rocksdb::Status FlushScripts(const rocksdb::WriteOptions &options, rocksdb::ColumnFamilyHandle *cf_handle); bool WALHasNewData(rocksdb::SequenceNumber seq) { return seq <= LatestSeq(); } rocksdb::Status Compact(const rocksdb::Slice *begin, const rocksdb::Slice *end); diff --git a/src/util.cc b/src/util.cc index 387f3222927..5182b046f0e 100644 --- a/src/util.cc +++ b/src/util.cc @@ -525,6 +525,63 @@ void BytesToHuman(char *buf, size_t size, uint64_t n) { } } +void TokenizeRedisProtocol(const std::string &value, std::vector *tokens) { + tokens->clear(); + + if (value.empty()) { + return; + } + + enum ParserState { stateArrayLen, stateBulkLen, stateBulkData }; + uint64_t array_len = 0, bulk_len = 0; + int state = stateArrayLen; + const char *start = value.data(), *end = start + value.size(), *p; + while (start != end) { + switch (state) { + case stateArrayLen: + if (start[0] != '*') { + return; + } + p = strchr(start, '\r'); + if (!p || (p == end) || p[1] != '\n') { + tokens->clear(); + return; + } + array_len = std::stoull(std::string(start+1, p)); + start = p + 2; + state = stateBulkLen; + break; + + case stateBulkLen: + if (start[0] != '$') { + return; + } + p = strchr(start, '\r'); + if (!p || (p == end) || p[1] != '\n') { + tokens->clear(); + return; + } + bulk_len = std::stoull(std::string(start+1, p)); + start = p + 2; + state = stateBulkData; + break; + + case stateBulkData: + if (bulk_len+2 > static_cast(end-start)) { + tokens->clear(); + return; + } + tokens->emplace_back(std::string(start, start+bulk_len)); + start += bulk_len + 2; + state = stateBulkLen; + break; + } + } + if (array_len != tokens->size()) { + tokens->clear(); + } +} + bool IsPortInUse(int port) { int fd; Status s = SockConnect("0.0.0.0", static_cast(port), &fd); diff --git a/src/util.h b/src/util.h index 0875a905766..cefc80be60c 100644 --- a/src/util.h +++ b/src/util.h @@ -61,6 +61,7 @@ bool HasPrefix(const std::string &str, const std::string &prefix); int StringMatch(const std::string &pattern, const std::string &in, int nocase); int StringMatchLen(const char *p, int plen, const char *s, int slen, int nocase); std::string StringToHex(const std::string &input); +void TokenizeRedisProtocol(const std::string &value, std::vector *tokens); void ThreadSetName(const char *name); int aeWait(int fd, int mask, uint64_t milliseconds); diff --git a/tests/string_util_test.cc b/tests/string_util_test.cc index e6a62fe5e78..8d4b63a136b 100644 --- a/tests/string_util_test.cc +++ b/tests/string_util_test.cc @@ -44,4 +44,11 @@ TEST(StringUtil, Split) { ASSERT_EQ(expected, array); Util::Split("a\tb\nc\t\nd ", " \t\n", &array); ASSERT_EQ(expected, array); +} + +TEST(StringUtil, TokenizeRedisProtocol) { + std::vector array; + std::vector expected = {"this", "is", "a", "test"}; + Util::TokenizeRedisProtocol("*4\r\n$4\r\nthis\r\n$2\r\nis\r\n$1\r\na\r\n$4\r\ntest\r\n", &array); + ASSERT_EQ(expected, array); } \ No newline at end of file diff --git a/tests/tcl/tests/support/util.tcl b/tests/tcl/tests/support/util.tcl index 0b421aec8a2..821219a6479 100644 --- a/tests/tcl/tests/support/util.tcl +++ b/tests/tcl/tests/support/util.tcl @@ -95,7 +95,7 @@ proc wait_for_sync r { proc wait_for_ofs_sync {r1 r2} { wait_for_condition 50 100 { - [status $r1 master_repl_offset] eq [status $r2 master_repl_offset] + [status $r1 sequence] eq [status $r2 sequence] } else { fail "replica didn't sync in time" } diff --git a/tests/tcl/tests/tmp/.gitignore b/tests/tcl/tests/tmp/.gitignore index 72e8ffc0db8..f59ec20aabf 100644 --- a/tests/tcl/tests/tmp/.gitignore +++ b/tests/tcl/tests/tmp/.gitignore @@ -1 +1 @@ -* +* \ No newline at end of file diff --git a/tests/tcl/tests/unit/command.tcl b/tests/tcl/tests/unit/command.tcl index c94e7d42b7b..292c6c9b07e 100644 --- a/tests/tcl/tests/unit/command.tcl +++ b/tests/tcl/tests/unit/command.tcl @@ -1,7 +1,7 @@ start_server {tags {"command"}} { - test {kvrocks has 161 commands currently} { + test {kvrocks has 163 commands currently} { r command count - } {161} + } {163} test {acquire GET command info by COMMAND INFO} { set e [lindex [r command info get] 0] diff --git a/tests/tcl/tests/unit/scripting.tcl b/tests/tcl/tests/unit/scripting.tcl index 6662db06d49..a2506c75125 100644 --- a/tests/tcl/tests/unit/scripting.tcl +++ b/tests/tcl/tests/unit/scripting.tcl @@ -51,21 +51,40 @@ start_server {tags {"scripting"}} { r eval {return redis.call('get',KEYS[1])} 1 mykey } {myval} - test {EVAL - Redis integer -> Lua type conversion} { - r set x 0 - r eval { - local foo = redis.pcall('incr',KEYS[1]) - return {type(foo),foo} - } 1 x - } {number 1} - - test {EVAL - Redis bulk -> Lua type conversion} { - r set mykey myval - r eval { - local foo = redis.pcall('get',KEYS[1]) - return {type(foo),foo} - } 1 mykey - } {string myval} + test {EVALSHA - Can we call a SHA1 if already defined?} { + r evalsha fd758d1589d044dd850a6f05d52f2eefd27f033f 1 mykey + } {myval} + + test {EVALSHA - Can we call a SHA1 in uppercase?} { + r evalsha FD758D1589D044DD850A6F05D52F2EEFD27F033F 1 mykey + } {myval} + + test {EVALSHA - Do we get an error on invalid SHA1?} { + catch {r evalsha NotValidShaSUM 0} e + set _ $e + } {ERR NOSCRIPT*} + + test {EVALSHA - Do we get an error on non defined SHA1?} { + catch {r evalsha ffd632c7d33e571e9f24556ebed26c3479a87130 0} e + set _ $e + } {ERR NOSCRIPT*} + + test {EVAL - Redis integer -> Lua type conversion} { + r set x 0 + r eval { + local foo = redis.pcall('incr',KEYS[1]) + return {type(foo),foo} + } 1 x + } {number 1} + + test {EVAL - Redis bulk -> Lua type conversion} { + r set mykey myval + r eval { + local foo = redis.pcall('get',KEYS[1]) + return {type(foo),foo} + } 1 mykey + } {string myval} + test {EVAL - Redis integer -> Lua type conversion} { r set x 0 r eval { @@ -261,4 +280,100 @@ start_server {tags {"scripting"}} { $rd close r get x } {10000} -} \ No newline at end of file + + test {SCRIPTING FLUSH - is able to clear the scripts cache?} { + r set mykey myval + set v [r evalsha fd758d1589d044dd850a6f05d52f2eefd27f033f 1 mykey] + assert_equal $v myval + set e "" + r script flush + catch {r evalsha fd758d1589d044dd850a6f05d52f2eefd27f033f 1 mykey} e + set e + } {ERR NOSCRIPT*} + + test {SCRIPT EXISTS - can detect already defined scripts?} { + r eval "return 1+1" 0 + r script exists a27e7e8a43702b7046d4f6a7ccf5b60cef6b9bd9 a27e7e8a43702b7046d4f6a7ccf5b60cef6b9bda + } {1 0} + + test {SCRIPT LOAD - is able to register scripts in the scripting cache} { + list \ + [r script load "return 'loaded'"] \ + [r evalsha b534286061d4b9e4026607613b95c06c06015ae8 0] + } {b534286061d4b9e4026607613b95c06c06015ae8 loaded} + + test {Globals protection reading an undeclared global variable} { + catch {r eval {return a} 0} e + set e + } {*ERR*attempted to access * global*} + + test {Globals protection setting an undeclared global*} { + catch {r eval {a=10} 0} e + set e + } {*ERR*attempted to create global*} + + test {Test an example script DECR_IF_GT} { + set decr_if_gt { + local current + + current = redis.call('get',KEYS[1]) + if not current then return nil end + if current > ARGV[1] then + return redis.call('decr',KEYS[1]) + else + return redis.call('get',KEYS[1]) + end + } + r set foo 5 + set res {} + lappend res [r eval $decr_if_gt 1 foo 2] + lappend res [r eval $decr_if_gt 1 foo 2] + lappend res [r eval $decr_if_gt 1 foo 2] + lappend res [r eval $decr_if_gt 1 foo 2] + lappend res [r eval $decr_if_gt 1 foo 2] + set res + } {4 3 2 2 2} + + test {Scripting engine PRNG can be seeded correctly} { + set rand1 [r eval { + math.randomseed(ARGV[1]); return tostring(math.random()) + } 0 10] + set rand2 [r eval { + math.randomseed(ARGV[1]); return tostring(math.random()) + } 0 10] + set rand3 [r eval { + math.randomseed(ARGV[1]); return tostring(math.random()) + } 0 20] + assert_equal $rand1 $rand2 + assert {$rand2 ne $rand3} + } + test "In the context of Lua the output of random commands gets ordered" { + r del myset + r sadd myset a b c d e f g h i l m n o p q r s t u v z aa aaa azz + r eval {return redis.call('smembers',KEYS[1])} 1 myset + } {a aa aaa azz b c d e f g h i l m n o p q r s t u v z} +} + +start_server {tags {"repl"}} { + start_server {} { + set master [srv -1 client] + set master_host [srv -1 host] + set master_port [srv -1 port] + set slave [srv 0 client] + + $slave slaveof $master_host $master_port + wait_for_sync $slave + + test {SCRIPTING: script load on master, read on slave} { + set sha [$master script load "return 'script loaded'"] + assert_equal 4167ea82ed9c381c7659f7cf93f394219147e8c4 $sha + wait_for_ofs_sync $master $slave + assert_equal 1 [$master script exists $sha] + assert_equal 1 [$slave script exists $sha] + + $master script flush + wait_for_ofs_sync $master $slave + assert_equal 0 [$slave script exists $sha] + } + } +}