Skip to content

Commit

Permalink
Implement the evalsha and script command (apache#369)
Browse files Browse the repository at this point in the history
Redis regards the Lua scripts as memory cache which would
lost after restarting, but we prefer persisting into the disk 
which also can save some memory usage. The only side effect was 
all history scripts would be stored, but we think
it's ok since scripts shouldn't be too many.

Co-authored-by: Wang Yuan <wangyuancode@163.com>
  • Loading branch information
git-hulk and ShooterIT committed Nov 2, 2021
1 parent 4c1d4a8 commit fff2d5f
Show file tree
Hide file tree
Showing 25 changed files with 728 additions and 72 deletions.
9 changes: 7 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,8 @@ target_sources(kvrocks PRIVATE
src/scripting.h
src/sha1.cc
src/sha1.h
)
src/rand.cc
src/rand.h)

# kvrocks2redis sync tool
add_executable(kvrocks2redis)
Expand Down Expand Up @@ -259,6 +260,8 @@ target_sources(kvrocks2redis PRIVATE
src/compaction_checker.h
src/scripting.cc
src/scripting.h
src/rand.cc
src/rand.h
tools/kvrocks2redis/config.cc
tools/kvrocks2redis/config.h
tools/kvrocks2redis/main.cc
Expand Down Expand Up @@ -352,7 +355,9 @@ add_executable(unittest
tests/log_collector_test.cc
src/config_type.h
src/sha1.cc
src/sha1.h)
src/sha1.h
src/rand.cc
src/rand.h)

add_dependencies(unittest glog rocksdb snappy jemalloc lua)
target_compile_features(unittest PRIVATE cxx_std_11)
Expand Down
7 changes: 7 additions & 0 deletions docs/support-commands.md
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,13 @@
**NOTE : String and Bitmap is different type in kvrocks, so you can't do bit with string, vice versa.**


## Script Commands

| Command | Supported OR Not | Desc |
| --------- | ---------------- | ---- |
| eval || |
| evalsha || |
| script || script kill and debug subcommand are not supported |

## Pub/Sub Commands

Expand Down
2 changes: 1 addition & 1 deletion src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ SHARED_OBJS= cluster.o compact_filter.o config.o cron.o encoding.o event_listene
redis_hash.o redis_list.o redis_metadata.o redis_slot.o redis_pubsub.o redis_reply.o \
redis_request.o redis_set.o redis_string.o redis_zset.o redis_geo.o replication.o \
server.o stats.o storage.o task_runner.o util.o geohash.o worker.o redis_sortedint.o \
compaction_checker.o table_properties_collector.o scripting.o sha1.o
compaction_checker.o table_properties_collector.o scripting.o sha1.o rand.o
KVROCKS_OBJS= $(SHARED_OBJS) main.o

UNITTEST_OBJS= $(SHARED_OBJS) ../tests/main.o ../tests/t_metadata_test.o ../tests/compact_test.o \
Expand Down
21 changes: 21 additions & 0 deletions src/compact_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,27 @@ class SubKeyFilterFactory : public rocksdb::CompactionFilterFactory {
Engine::Storage *stor_ = nullptr;
};

class PropagateFilter : public rocksdb::CompactionFilter {
public:
const char *Name() const override { return "PropagateFilter"; }
bool Filter(int level, const Slice &key, const Slice &value,
std::string *new_value, bool *modified) const override {
// We propagate Lua commands which don't store data,
// just in order to implement updating Lua state.
return key == Engine::kPropagateScriptCommand;
}
};

class PropagateFilterFactory : public rocksdb::CompactionFilterFactory {
public:
PropagateFilterFactory() = default;
const char *Name() const override { return "PropagateFilterFactory"; }
std::unique_ptr<rocksdb::CompactionFilter> CreateCompactionFilter(
const rocksdb::CompactionFilter::Context &context) override {
return std::unique_ptr<rocksdb::CompactionFilter>(new PropagateFilter());
}
};

class PubSubFilter : public rocksdb::CompactionFilter {
public:
const char *Name() const override { return "PubSubFilter"; }
Expand Down
4 changes: 2 additions & 2 deletions src/compaction_checker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
#include <glog/logging.h>
#include "storage.h"

void CompactionChecker::CompactPubsubFiles() {
void CompactionChecker::CompactPropagateAndPubSubFiles() {
rocksdb::CompactRangeOptions compact_opts;
compact_opts.change_level = true;
std::vector<std::string> cf_names = {Engine::kPubSubColumnFamilyName};
std::vector<std::string> cf_names = {Engine::kPubSubColumnFamilyName, Engine::kPropagateColumnFamilyName};
for (const auto &cf_name : cf_names) {
LOG(INFO) << "[compaction checker] Start the compact the column family: " << cf_name;
auto cf_handle = storage_->GetCFHandle(cf_name);
Expand Down
2 changes: 1 addition & 1 deletion src/compaction_checker.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ class CompactionChecker {
explicit CompactionChecker(Engine::Storage *storage):storage_(storage) {}
~CompactionChecker() {}
void PickCompactionFiles(const std::string &cf_name);
void CompactPubsubFiles();
void CompactPropagateAndPubSubFiles();
private:
Engine::Storage *storage_ = nullptr;
};
95 changes: 95 additions & 0 deletions src/rand.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/* Pseudo random number generation functions derived from the drand48()
* function obtained from pysam source code.
*
* This functions are used in order to replace the default math.random()
* Lua implementation with something having exactly the same behavior
* across different systems (by default Lua uses libc's rand() that is not
* required to implement a specific PRNG generating the same sequence
* in different systems if seeded with the same integer).
*
* The original code appears to be under the public domain.
* I modified it removing the non needed functions and all the
* 1960-style C coding stuff...
*
* ----------------------------------------------------------------------------
*
* Copyright (c) 2010-2012, Salvatore Sanfilippo <antirez at gmail dot com>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/

#include "rand.h"

#include <stdint.h>

#define N 16
#define MASK ((1 << (N - 1)) + (1 << (N - 1)) - 1)
#define LOW(x) ((unsigned)(x) & MASK)
#define HIGH(x) LOW((x) >> N)
#define MUL(x, y, z) { int32_t l = (int64_t)(x) * (int64_t)(y); \
(z)[0] = LOW(l); (z)[1] = HIGH(l); }
#define CARRY(x, y) ((int32_t)(x) + (int64_t)(y) > MASK)
#define ADDEQU(x, y, z) (z = CARRY(x, (y)), x = LOW(x + (y)))
#define X0 0x330E
#define X1 0xABCD
#define X2 0x1234
#define A0 0xE66D
#define A1 0xDEEC
#define A2 0x5
#define C 0xB
#define SET3(x, x0, x1, x2) ((x)[0] = (x0), (x)[1] = (x1), (x)[2] = (x2))
#define SETLOW(x, y, n) SET3(x, LOW((y)[n]), LOW((y)[(n)+1]), LOW((y)[(n)+2]))
#define SEED(x0, x1, x2) (SET3(x, x0, x1, x2), SET3(a, A0, A1, A2), c = C)
#define REST(v) for (i = 0; i < 3; i++) { xsubi[i] = x[i]; x[i] = temp[i]; } \
return (v);
#define HI_BIT (1L << (2 * N - 1))

static uint32_t x[3] = {X0, X1, X2}, a[3] = {A0, A1, A2}, c = C;
static void next(void);

int32_t redisLrand48() {
next();
return (((int32_t) x[2] << (N - 1)) + (x[1] >> 1));
}

void redisSrand48(int32_t seedval) {
SEED(X0, LOW(seedval), HIGH(seedval));
}

static void next(void) {
uint32_t p[2], q[2], r[2], carry0, carry1;

MUL(a[0], x[0], p);
ADDEQU(p[0], c, carry0);
ADDEQU(p[1], carry0, carry1);
MUL(a[0], x[1], q);
ADDEQU(p[1], q[0], carry0);
MUL(a[1], x[0], r);
x[2] = LOW(carry0 + carry1 + CARRY(p[1], r[0]) + q[1] + r[1] +
a[0] * x[2] + a[1] * x[1] + a[2] * x[0]);
x[1] = LOW(p[1] + r[0]);
x[0] = LOW(p[0]);
}
36 changes: 36 additions & 0 deletions src/rand.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#pragma once

#include <stdint.h>

int32_t redisLrand48();
void redisSrand48(int32_t seedval);

#define REDIS_LRAND48_MAX INT32_MAX
65 changes: 59 additions & 6 deletions src/redis_cmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4292,15 +4292,66 @@ class CommandEval : public Commander {
}

Status Execute(Server *svr, Connection *conn, std::string *output) override {
char funcname[43];
return Lua::evalGenericCommand(conn, args_, false, output);
}
};

/* We obtain the script SHA1, then check if this function is already
* defined into the Lua state */
funcname[0] = 'f';
funcname[1] = '_';
class CommandEvalSHA : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
if (args[1].size() != 40) {
return Status(Status::NotOK, "NOSCRIPT No matching script. Please use EVAL");
}
return Status::OK();
}

return Lua::evalGenericCommand(conn, args_, false, output);
Status Execute(Server *svr, Connection *conn, std::string *output) override {
return Lua::evalGenericCommand(conn, args_, true, output);
}
};

class CommandScript : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
subcommand_ = Util::ToLower(args[1]);
return Status::OK();
}

Status Execute(Server *svr, Connection *conn, std::string *output) override {
// There's a little tricky here since the script command was the write type
// command but some subcommands like `exists` were readonly, so we want to allow
// executing on slave here. Maybe we should find other way to do this.
if (svr->IsSlave() && subcommand_ != "exists") {
return Status(Status::NotOK, "READONLY You can't write against a read only slave");
}
if (args_.size() == 2 && subcommand_ == "flush") {
svr->ScriptFlush();
svr->Propagate(Engine::kPropagateScriptCommand, args_);
*output = Redis::SimpleString("OK");
} else if (args_.size() >= 2 && subcommand_ == "exists") {
*output = Redis::MultiLen(args_.size()-2);
for (size_t j = 2; j < args_.size(); j++) {
if (svr->ScriptExists(args_[j]).IsOK()) {
*output += Redis::Integer(1);
} else {
*output += Redis::Integer(0);
}
}
} else if (args_.size() == 3 && subcommand_ == "load") {
std::string sha;
auto s = Lua::createFunction(svr, args_[2], &sha);
if (!s.IsOK()) {
return s;
}
*output = Redis::SimpleString(sha);
} else {
return Status(Status::NotOK, "Unknown SCRIPT subcommand or wrong # of args");
}
return Status::OK();
}

private:
std::string subcommand_;
};

#define ADD_CMD(name, arity, description , first_key, last_key, key_step, fn) \
Expand Down Expand Up @@ -4473,6 +4524,8 @@ CommandAttributes redisCommandTable[] = {
ADD_CMD("clusterx", -2, "cluster no-script", 0, 0, 0, CommandClusterX),

ADD_CMD("eval", -3, "exclusive write no-script", 0, 0, 0, CommandEval),
ADD_CMD("evalsha", -3, "exclusive write no-script", 0, 0, 0, CommandEvalSHA),
ADD_CMD("script", -2, "exclusive no-script", 0, 0, 0, CommandScript),

ADD_CMD("compact", 1, "read-only no-script", 0, 0, 0, CommandCompact),
ADD_CMD("bgsave", 1, "read-only no-script", 0, 0, 0, CommandBGSave),
Expand Down
2 changes: 1 addition & 1 deletion src/redis_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ void Connection::ExecuteCommands(const std::vector<Redis::CommandTokens> &to_pro
s = current_cmd_->Parse(cmd_tokens);
if (!s.IsOK()) {
if (IsFlagEnabled(Connection::kMultiExec)) multi_error_ = true;
Reply(Redis::Error(s.Msg()));
Reply(Redis::Error("ERR "+s.Msg()));
continue;
}

Expand Down
28 changes: 20 additions & 8 deletions src/replication.cc
Original file line number Diff line number Diff line change
Expand Up @@ -917,11 +917,20 @@ rocksdb::Status ReplicationThread::ParseWriteBatch(const std::string &batch_stri

status = write_batch.Iterate(&write_batch_handler);
if (!status.ok()) return status;
if (write_batch_handler.IsPublish()) {
srv_->PublishMessage(write_batch_handler.GetPublishChannel().ToString(),
write_batch_handler.GetPublishValue().ToString());
switch (write_batch_handler.Type()) {
case kBatchTypePublish:
srv_->PublishMessage(write_batch_handler.Key(), write_batch_handler.Value());
break;
case kBatchTypePropagate:
if (write_batch_handler.Key() == Engine::kPropagateScriptCommand) {
std::vector<std::string> tokens;
Util::TokenizeRedisProtocol(write_batch_handler.Value(), &tokens);
if (!tokens.empty()) {
srv_->ExecPropagatedCommand(tokens);
}
}
break;
}

return rocksdb::Status::OK();
}

Expand All @@ -931,11 +940,14 @@ bool ReplicationThread::isRestoringError(const char *err) {

rocksdb::Status WriteBatchHandler::PutCF(uint32_t column_family_id, const rocksdb::Slice &key,
const rocksdb::Slice &value) {
if (column_family_id != kColumnFamilyIDPubSub) {
if (column_family_id == kColumnFamilyIDPubSub) {
type_ = kBatchTypePublish;
kv_ = std::make_pair(key.ToString(), value.ToString());
return rocksdb::Status::OK();
} else if (column_family_id == kColumnFamilyIDPropagate) {
type_ = kBatchTypePropagate;
kv_ = std::make_pair(key.ToString(), value.ToString());
return rocksdb::Status::OK();
}

publish_message_ = std::make_pair(key.ToString(), value.ToString());
is_publish_ = true;
return rocksdb::Status::OK();
}
15 changes: 10 additions & 5 deletions src/replication.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ enum ReplState {
kReplError,
};

enum WriteBatchType {
kBatchTypePublish = 1,
kBatchTypePropagate,
};

typedef std::function<void(const std::string, const uint32_t)> fetch_file_callback;

class FeedSlaveThread {
Expand Down Expand Up @@ -187,11 +192,11 @@ class WriteBatchHandler : public rocksdb::WriteBatch::Handler {
public:
rocksdb::Status PutCF(uint32_t column_family_id, const rocksdb::Slice &key,
const rocksdb::Slice &value) override;
WriteBatchType Type() { return type_; }
std::string Key() const { return kv_.first; }
std::string Value() const { return kv_.second; }

rocksdb::Slice GetPublishChannel() { return publish_message_.first; }
rocksdb::Slice GetPublishValue() { return publish_message_.second; }
bool IsPublish() { return is_publish_; }
private:
std::pair<std::string, std::string> publish_message_;
bool is_publish_ = false;
std::pair<std::string, std::string> kv_;
WriteBatchType type_;
};
Loading

0 comments on commit fff2d5f

Please sign in to comment.