From 9f105e4dc00988538947e618cc96acdf56401d06 Mon Sep 17 00:00:00 2001 From: reindexer-bot <@> Date: Thu, 31 Oct 2024 22:33:59 +0000 Subject: [PATCH] Merge branch 'feature/corotransaction_commit_return_query_result' into 'develop' MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Добавить возврат QueryResult из CommitTransaction метода CoroReindexer класса See merge request itv-backend/reindexer!1712 --- cjson/creflect.go | 6 +- clang-tidy/.clang-tidy | 31 -- clang-tidy/.clang-tidy-ignore | 10 - clang-tidy/run-clang-tidy-18.py | 429 ------------------ cpp_src/client/cororeindexer.cc | 2 +- cpp_src/client/cororeindexer.h | 6 +- cpp_src/client/cororpcclient.cc | 36 +- cpp_src/client/cororpcclient.h | 2 +- cpp_src/client/corotransaction.cc | 89 ++-- cpp_src/client/corotransaction.h | 16 +- cpp_src/client/rpcclient.cc | 17 +- cpp_src/client/rpcclientmock.cc | 13 +- cpp_src/client/synccororeindexer.cc | 10 +- cpp_src/client/synccororeindexer.h | 3 +- cpp_src/client/synccororeindexerimpl.cc | 10 +- cpp_src/client/synccororeindexerimpl.h | 2 +- .../test/test_storage_compatibility.sh | 197 ++++++++ cpp_src/cmd/reindexer_tool/readme.md | 17 +- cpp_src/cmd/reindexer_tool/reindexer_tool.cc | 16 +- cpp_src/core/query/dsl/dslparser.cc | 14 +- cpp_src/core/reindexer.h | 4 +- cpp_src/core/transaction.cc | 5 +- cpp_src/core/transaction.h | 2 +- cpp_src/gtests/tests/API/base_tests.cc | 2 +- .../gtests/tests/unit/clientsstats_test.cc | 10 +- .../unit/replication_master_master_test.cc | 8 +- cpp_src/gtests/tests/unit/rpcclient_test.cc | 180 +++++--- cpp_src/readme.md | 28 +- cpp_src/server/rpcserver.cc | 5 +- cpp_src/server/serverimpl.cc | 65 +-- query.go | 11 +- 31 files changed, 541 insertions(+), 705 deletions(-) delete mode 100644 clang-tidy/.clang-tidy delete mode 100644 clang-tidy/.clang-tidy-ignore delete mode 100755 clang-tidy/run-clang-tidy-18.py create mode 100755 cpp_src/cmd/reindexer_server/test/test_storage_compatibility.sh diff --git a/cjson/creflect.go b/cjson/creflect.go index 1f9943502..d257b6018 100644 --- a/cjson/creflect.go +++ b/cjson/creflect.go @@ -78,7 +78,7 @@ type payloadIface struct { func (pl *payloadIface) ptr(field, idx, typ int) unsafe.Pointer { if pl.p == 0 { - panic(fmt.Errorf("Null pointer derefernce")) + panic(fmt.Errorf("Null pointer dereference")) } f := &pl.t.Fields[field] @@ -91,7 +91,7 @@ func (pl *payloadIface) ptr(field, idx, typ int) unsafe.Pointer { if !f.IsArray { if idx != 0 { - panic(fmt.Errorf("Trying to acces by index '%d' to non array field '%s'", idx, f.Name)) + panic(fmt.Errorf("Trying to access by index '%d' to non array field '%s'", idx, f.Name)) } return p } @@ -197,7 +197,7 @@ func (pl *payloadIface) getArrayLen(field int) int { return int((*ArrayHeader)(p).len) } -// get c reflect value and set to go reflect valie +// get c reflect value and set to go reflect value func (pl *payloadIface) getValue(field int, idx int, v reflect.Value) { k := v.Type().Kind() diff --git a/clang-tidy/.clang-tidy b/clang-tidy/.clang-tidy deleted file mode 100644 index 063df74f1..000000000 --- a/clang-tidy/.clang-tidy +++ /dev/null @@ -1,31 +0,0 @@ -Checks: 'clang-diagnostic-*, - clang-analyzer-*, - performance-*, - bugprone-*, - -bugprone-exception-escape, - -bugprone-branch-clone, - -bugprone-easily-swappable-parameters, - -bugprone-macro-parentheses, - -bugprone-signed-char-misuse, - -bugprone-narrowing-conversions, - -bugprone-reserved-identifier, - -bugprone-implicit-widening-of-multiplication-result, - -bugprone-assignment-in-if-condition, - -bugprone-parent-virtual-call, - -bugprone-integer-division, - -bugprone-unhandled-self-assignment, - -bugprone-inc-dec-in-conditions, - -clang-analyzer-security.insecureAPI.DeprecatedOrUnsafeBufferHandling, - -performance-no-int-to-ptr, - -performance-enum-size, - -performance-avoid-endl' -# clang-analyzer-security.insecureAPI.DeprecatedOrUnsafeBufferHandling - too many unnecessary warning in vendored code -# performance-no-int-to-ptr - consider how to fix this -# bugprone-macro-parentheses - consider fixing -WarningsAsErrors: '*' -HeaderFilterRegex: '.*(?= 4.0.0 are given under - # the top level key 'Diagnostics' in the output yaml files - mergekey = "Diagnostics" - merged=[] - for replacefile in glob.iglob(os.path.join(tmpdir, '*.yaml')): - content = yaml.safe_load(open(replacefile, 'r')) - if not content: - continue # Skip empty files. - merged.extend(content.get(mergekey, [])) - - if merged: - # MainSourceFile: The key is required by the definition inside - # include/clang/Tooling/ReplacementsYaml.h, but the value - # is actually never used inside clang-apply-replacements, - # so we set it to '' here. - output = {'MainSourceFile': '', mergekey: merged} - with open(mergefile, 'w') as out: - yaml.safe_dump(output, out) - else: - # Empty the file: - open(mergefile, 'w').close() - - -def find_binary(arg, name, build_path): - """Get the path for a binary or exit""" - if arg: - if shutil.which(arg): - return arg - else: - raise SystemExit( - "error: passed binary '{}' was not found or is not executable" - .format(arg)) - - built_path = os.path.join(build_path, "bin", name) - binary = shutil.which(name) or shutil.which(built_path) - if binary: - return binary - else: - raise SystemExit( - "error: failed to find {} in $PATH or at {}" - .format(name, built_path)) - - -def apply_fixes(args, clang_apply_replacements_binary, tmpdir): - """Calls clang-apply-fixes on a given directory.""" - invocation = [clang_apply_replacements_binary] - invocation.append('-ignore-insert-conflict') - if args.format: - invocation.append('-format') - if args.style: - invocation.append('-style=' + args.style) - invocation.append(tmpdir) - subprocess.call(invocation) - - -def run_tidy(args, clang_tidy_binary, tmpdir, build_path, queue, lock, - failed_files): - """Takes filenames out of queue and runs clang-tidy on them.""" - while True: - name = queue.get() - invocation = get_tidy_invocation(name, clang_tidy_binary, args.checks, - tmpdir, build_path, args.header_filter, - args.allow_enabling_alpha_checkers, - args.extra_arg, args.extra_arg_before, - args.quiet, args.config_file, args.config, - args.line_filter, args.use_color, - args.plugins) - - proc = subprocess.Popen(invocation, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - output, err = proc.communicate() - if proc.returncode != 0: - if proc.returncode < 0: - msg = "%s: terminated by signal %d\n" % (name, -proc.returncode) - err += msg.encode('utf-8') - failed_files.append(name) - with lock: - sys.stdout.write(' '.join(invocation) + '\n' + output.decode('utf-8')) - if len(err) > 0: - sys.stdout.flush() - sys.stderr.write(err.decode('utf-8')) - queue.task_done() - - -def main(): - parser = argparse.ArgumentParser(description='Runs clang-tidy over all files ' - 'in a compilation database. Requires ' - 'clang-tidy and clang-apply-replacements in ' - '$PATH or in your build directory.') - parser.add_argument('-allow-enabling-alpha-checkers', - action='store_true', help='allow alpha checkers from ' - 'clang-analyzer.') - parser.add_argument('-clang-tidy-binary', metavar='PATH', - default='clang-tidy-18', - help='path to clang-tidy binary') - parser.add_argument('-clang-apply-replacements-binary', metavar='PATH', - default='clang-apply-replacements-18', - help='path to clang-apply-replacements binary') - parser.add_argument('-checks', default=None, - help='checks filter, when not specified, use clang-tidy ' - 'default') - config_group = parser.add_mutually_exclusive_group() - config_group.add_argument('-config', default=None, - help='Specifies a configuration in YAML/JSON format: ' - ' -config="{Checks: \'*\', ' - ' CheckOptions: {x: y}}" ' - 'When the value is empty, clang-tidy will ' - 'attempt to find a file named .clang-tidy for ' - 'each source file in its parent directories.') - config_group.add_argument('-config-file', default=None, - help='Specify the path of .clang-tidy or custom config ' - 'file: e.g. -config-file=/some/path/myTidyConfigFile. ' - 'This option internally works exactly the same way as ' - '-config option after reading specified config file. ' - 'Use either -config-file or -config, not both.') - parser.add_argument('-header-filter', default=None, - help='regular expression matching the names of the ' - 'headers to output diagnostics from. Diagnostics from ' - 'the main file of each translation unit are always ' - 'displayed.') - parser.add_argument('-line-filter', default=None, - help='List of files with line ranges to filter the' - 'warnings.') - if yaml: - parser.add_argument('-export-fixes', metavar='filename', dest='export_fixes', - help='Create a yaml file to store suggested fixes in, ' - 'which can be applied with clang-apply-replacements.') - parser.add_argument('-j', type=int, default=0, - help='number of tidy instances to be run in parallel.') - parser.add_argument('files', nargs='*', default=['.*'], - help='files to be processed (regex on path)') - parser.add_argument('-fix', action='store_true', help='apply fix-its') - parser.add_argument('-format', action='store_true', help='Reformat code ' - 'after applying fixes') - parser.add_argument('-style', default='file', help='The style of reformat ' - 'code after applying fixes') - parser.add_argument('-use-color', type=strtobool, nargs='?', const=True, - help='Use colors in diagnostics, overriding clang-tidy\'s' - ' default behavior. This option overrides the \'UseColor' - '\' option in .clang-tidy file, if any.') - parser.add_argument('-p', dest='build_path', - help='Path used to read a compile command database.') - parser.add_argument('-extra-arg', dest='extra_arg', - action='append', default=[], - help='Additional argument to append to the compiler ' - 'command line.') - parser.add_argument('-extra-arg-before', dest='extra_arg_before', - action='append', default=[], - help='Additional argument to prepend to the compiler ' - 'command line.') - parser.add_argument('-ignore', default=DEFAULT_CLANG_TIDY_IGNORE, - help='File path to clang-tidy-ignore') - parser.add_argument('-quiet', action='store_true', - help='Run clang-tidy in quiet mode') - parser.add_argument('-load', dest='plugins', - action='append', default=[], - help='Load the specified plugin in clang-tidy.') - args = parser.parse_args() - - db_path = 'compile_commands.json' - - if args.build_path is not None: - build_path = args.build_path - else: - # Find our database - build_path = find_compilation_database(db_path) - - clang_tidy_binary = find_binary(args.clang_tidy_binary, "clang-tidy", - build_path) - - tmpdir = None - if args.fix or (yaml and args.export_fixes): - clang_apply_replacements_binary = find_binary( - args.clang_apply_replacements_binary, "clang-apply-replacements", - build_path) - tmpdir = tempfile.mkdtemp() - - try: - invocation = get_tidy_invocation("", clang_tidy_binary, args.checks, - None, build_path, args.header_filter, - args.allow_enabling_alpha_checkers, - args.extra_arg, args.extra_arg_before, - args.quiet, args.config_file, args.config, - args.line_filter, args.use_color, - args.plugins) - invocation.append('-list-checks') - invocation.append('-') - if args.quiet: - # Even with -quiet we still want to check if we can call clang-tidy. - with open(os.devnull, 'w') as dev_null: - subprocess.check_call(invocation, stdout=dev_null) - else: - subprocess.check_call(invocation) - except: - print("Unable to run clang-tidy.", file=sys.stderr) - sys.exit(1) - - # Load the database and extract all files. - database = json.load(open(os.path.join(build_path, db_path))) - files = set([make_absolute(entry['file'], entry['directory']) - for entry in database]) - files, excluded = filter_files(args.ignore, files) - if excluded: - print("Excluding the following files:\n" + "\n".join(excluded) + "\n") - - max_task = args.j - if max_task == 0: - max_task = multiprocessing.cpu_count() - - # Build up a big regexy filter from all command line arguments. - file_name_re = re.compile('|'.join(args.files)) - - return_code = 0 - try: - # Spin up a bunch of tidy-launching threads. - task_queue = queue.Queue(max_task) - # List of files with a non-zero return code. - failed_files = [] - lock = threading.Lock() - for _ in range(max_task): - t = threading.Thread(target=run_tidy, - args=(args, clang_tidy_binary, tmpdir, build_path, - task_queue, lock, failed_files)) - t.daemon = True - t.start() - - # Fill the queue with files. - for name in files: - if file_name_re.search(name): - task_queue.put(name) - - # Wait for all threads to be done. - task_queue.join() - if len(failed_files): - return_code = 1 - - except KeyboardInterrupt: - # This is a sad hack. Unfortunately subprocess goes - # bonkers with ctrl-c and we start forking merrily. - print('\nCtrl-C detected, goodbye.') - if tmpdir: - shutil.rmtree(tmpdir) - os.kill(0, 9) - - if yaml and args.export_fixes: - print('Writing fixes to ' + args.export_fixes + ' ...') - try: - merge_replacement_files(tmpdir, args.export_fixes) - except: - print('Error exporting fixes.\n', file=sys.stderr) - traceback.print_exc() - return_code=1 - - if args.fix: - print('Applying fixes ...') - try: - apply_fixes(args, clang_apply_replacements_binary, tmpdir) - except: - print('Error applying fixes.\n', file=sys.stderr) - traceback.print_exc() - return_code = 1 - - if tmpdir: - shutil.rmtree(tmpdir) - sys.exit(return_code) - - -if __name__ == '__main__': - main() diff --git a/cpp_src/client/cororeindexer.cc b/cpp_src/client/cororeindexer.cc index cf36c8c3f..e34938cce 100644 --- a/cpp_src/client/cororeindexer.cc +++ b/cpp_src/client/cororeindexer.cc @@ -79,7 +79,7 @@ Error CoroReindexer::GetSqlSuggestions(const std::string_view sqlQuery, int pos, Error CoroReindexer::Status() { return impl_->Status(ctx_); } CoroTransaction CoroReindexer::NewTransaction(std::string_view nsName) { return impl_->NewTransaction(nsName, ctx_); } -Error CoroReindexer::CommitTransaction(CoroTransaction& tr) { return impl_->CommitTransaction(tr, ctx_); } +Error CoroReindexer::CommitTransaction(CoroTransaction& tr, CoroQueryResults& result) { return impl_->CommitTransaction(tr, result, ctx_); } Error CoroReindexer::RollBackTransaction(CoroTransaction& tr) { return impl_->RollBackTransaction(tr, ctx_); } } // namespace client diff --git a/cpp_src/client/cororeindexer.h b/cpp_src/client/cororeindexer.h index a7d849d25..2eb6857bc 100644 --- a/cpp_src/client/cororeindexer.h +++ b/cpp_src/client/cororeindexer.h @@ -32,7 +32,7 @@ class CoroReindexer { typedef std::function Completion; /// Create Reindexer database object - CoroReindexer(const ReindexerConfig& = ReindexerConfig()); + explicit CoroReindexer(const ReindexerConfig& = ReindexerConfig()); /// Destroy Reindexer database object ~CoroReindexer(); CoroReindexer(const CoroReindexer&) = delete; @@ -180,7 +180,8 @@ class CoroReindexer { CoroTransaction NewTransaction(std::string_view nsName); /// Commit transaction - transaction will be deleted after commit /// @param tr - transaction to commit - Error CommitTransaction(CoroTransaction& tr); + /// @param result - QueryResults with IDs of items changed by tx. + Error CommitTransaction(CoroTransaction& tr, CoroQueryResults& result); /// RollBack transaction - transaction will be deleted after rollback /// @param tr - transaction to rollback Error RollBackTransaction(CoroTransaction& tr); @@ -196,6 +197,7 @@ class CoroReindexer { typedef CoroQueryResults QueryResultsT; typedef Item ItemT; typedef ReindexerConfig ConfigT; + typedef CoroTransaction TransactionT; private: CoroReindexer(CoroRPCClient* impl, InternalRdxContext&& ctx) : impl_(impl), owner_(false), ctx_(std::move(ctx)) {} diff --git a/cpp_src/client/cororpcclient.cc b/cpp_src/client/cororpcclient.cc index 43b41257c..f36995083 100644 --- a/cpp_src/client/cororpcclient.cc +++ b/cpp_src/client/cororpcclient.cc @@ -1,12 +1,10 @@ #include "client/cororpcclient.h" -#include #include #include "client/itemimpl.h" #include "core/namespacedef.h" #include "gason/gason.h" #include "tools/errors.h" #include "tools/logger.h" -#include "vendor/gason/gason.h" namespace reindexer { namespace client { @@ -164,8 +162,8 @@ Error CoroRPCClient::modifyItem(std::string_view nsName, Item& item, int mode, s } CoroQueryResults qr; InternalRdxContext ctxCompl = ctx.WithCompletion(nullptr); - auto ret = selectImpl(Query(nsName).Limit(0), qr, netTimeout, ctxCompl); - if (ret.code() == errTimeout) { + Error err = selectImpl(Query(nsName).Limit(0), qr, netTimeout, ctxCompl); + if (err.code() == errTimeout) { return Error(errTimeout, "Request timeout"); } if (withNetTimeout) { @@ -173,7 +171,7 @@ Error CoroRPCClient::modifyItem(std::string_view nsName, Item& item, int mode, s } auto newItem = NewItem(nsName); char* endp = nullptr; - Error err = newItem.FromJSON(item.impl_->GetJSON(), &endp); + err = newItem.FromJSON(item.impl_->GetJSON(), &endp); if (!err.ok()) { return err; } @@ -181,6 +179,8 @@ Error CoroRPCClient::modifyItem(std::string_view nsName, Item& item, int mode, s item = std::move(newItem); } } + + return errOK; } Error CoroRPCClient::subscribeImpl(bool subscribe) { @@ -299,7 +299,6 @@ void vec2pack(const h_vector& vec, WrSerializer& ser) { for (auto v : vec) { ser.PutVarUint(v); } - return; } Error CoroRPCClient::selectImpl(std::string_view query, CoroQueryResults& result, seconds netTimeout, const InternalRdxContext& ctx) { @@ -595,14 +594,29 @@ CoroTransaction CoroRPCClient::NewTransaction(std::string_view nsName, const Int return CoroTransaction(std::move(err)); } -Error CoroRPCClient::CommitTransaction(CoroTransaction& tr, const InternalRdxContext& ctx) { +Error CoroRPCClient::CommitTransaction(CoroTransaction& tr, CoroQueryResults& result, const InternalRdxContext& ctx) { + Error returnErr; if (tr.conn_) { - auto ret = tr.conn_->Call(mkCommand(cproto::kCmdCommitTx, &ctx), tr.txId_).Status(); - tr.clear(); - return ret; + const int flags = result.fetchFlags_ ? result.fetchFlags_ : (kResultsWithItemID | kResultsWithPayloadTypes); + NsArray nsArray{getNamespace(tr.nsName_)}; + result = CoroQueryResults(tr.conn_, std::move(nsArray), flags, config_.FetchAmount, config_.RequestTimeout); + auto ret = tr.conn_->Call(mkCommand(cproto::kCmdCommitTx, &ctx), tr.txId_, flags); + returnErr = ret.Status(); + try { + if (ret.Status().ok()) { + auto args = ret.GetArgs(2); + result.Bind(p_string(args[0]), RPCQrId{int(args[1]), args.size() > 2 ? int64_t(args[2]) : -1}); + } + } catch (const Error& err) { + returnErr = err; + } + } else { + returnErr = Error(errLogic, "connection is nullptr"); } - return Error(errLogic, "connection is nullptr"); + tr.clear(); + return returnErr; } + Error CoroRPCClient::RollBackTransaction(CoroTransaction& tr, const InternalRdxContext& ctx) { if (tr.conn_) { auto ret = tr.conn_->Call(mkCommand(cproto::kCmdRollbackTx, &ctx), tr.txId_).Status(); diff --git a/cpp_src/client/cororpcclient.h b/cpp_src/client/cororpcclient.h index 696732960..0bc56da83 100644 --- a/cpp_src/client/cororpcclient.h +++ b/cpp_src/client/cororpcclient.h @@ -73,7 +73,7 @@ class CoroRPCClient { Error Status(const InternalRdxContext& ctx); CoroTransaction NewTransaction(std::string_view nsName, const InternalRdxContext& ctx); - Error CommitTransaction(CoroTransaction& tr, const InternalRdxContext& ctx); + Error CommitTransaction(CoroTransaction& tr, CoroQueryResults& result, const InternalRdxContext& ctx); Error RollBackTransaction(CoroTransaction& tr, const InternalRdxContext& ctx); protected: diff --git a/cpp_src/client/corotransaction.cc b/cpp_src/client/corotransaction.cc index 3ca1b9bd6..74b3b8d88 100644 --- a/cpp_src/client/corotransaction.cc +++ b/cpp_src/client/corotransaction.cc @@ -4,53 +4,64 @@ #include "core/keyvalue/p_string.h" #include "net/cproto/coroclientconnection.h" -namespace reindexer { -namespace client { +namespace reindexer::client { Error CoroTransaction::Modify(Query&& query) { - if (conn_) { - WrSerializer ser; - query.Serialize(ser); - return conn_->Call({cproto::kCmdUpdateQueryTx, RequestTimeout_, execTimeout_, nullptr}, ser.Slice(), txId_).Status(); + if (!conn_) { + return {errLogic, "Connection pointer in transaction is nullptr."}; } - return Error(errLogic, "Connection pointer in transaction is nullptr."); + + WrSerializer ser; + query.Serialize(ser); + return conn_->Call({cproto::kCmdUpdateQueryTx, RequestTimeout_, execTimeout_, nullptr}, ser.Slice(), txId_).Status(); } Error CoroTransaction::addTxItem(Item&& item, ItemModifyMode mode) { + if (!conn_) { + return {errLogic, "Connection pointer in transaction is nullptr."}; + } + + WrSerializer ser; + if (item.impl_->GetPrecepts().size()) { + ser.PutVarUint(item.impl_->GetPrecepts().size()); + for (auto& p : item.impl_->GetPrecepts()) { + ser.PutVString(p); + } + } + auto itData = item.GetJSON(); p_string itemData(&itData); - if (conn_) { - for (int tryCount = 0;; tryCount++) { - auto ret = - conn_->Call({net::cproto::kCmdAddTxItem, RequestTimeout_, execTimeout_, nullptr}, FormatJson, itemData, mode, "", 0, txId_); - - if (!ret.Status().ok()) { - if (ret.Status().code() != errStateInvalidated || tryCount > 2) { - return ret.Status(); - } - - CoroQueryResults qr; - InternalRdxContext ctx; - ctx = ctx.WithTimeout(execTimeout_); - auto err = rpcClient_->Select(Query(nsName_).Limit(0), qr, ctx); - if (!err.ok()) { - return Error(errLogic, "Can't update TagsMatcher"); - } - - auto newItem = NewItem(); - char* endp = nullptr; - err = newItem.FromJSON(item.impl_->GetJSON(), &endp); - if (!err.ok()) { - return err; - } - item = std::move(newItem); - } else { - break; - } + + for (int tryCount = 0;; ++tryCount) { + auto ret = + conn_->Call({net::cproto::kCmdAddTxItem, RequestTimeout_, execTimeout_, nullptr}, FormatJson, itemData, mode, ser.Slice(), 0, txId_); + if (ret.Status().ok()) { + break; + } + + if (ret.Status().code() != errStateInvalidated || tryCount > 2) { + return ret.Status(); + } + + CoroQueryResults qr; + InternalRdxContext ctx; + ctx = ctx.WithTimeout(execTimeout_); + auto err = rpcClient_->Select(Query(nsName_).Limit(0), qr, ctx); + if (!err.ok()) { + return {errLogic, "Can't update TagsMatcher"}; + } + + auto newItem = NewItem(); + + char* endp = nullptr; + err = newItem.FromJSON(item.impl_->GetJSON(), &endp); + if (!err.ok()) { + return err; } - return errOK; + + item = std::move(newItem); } - return Error(errLogic, "Connection pointer in transaction is nullptr."); + return errOK; } Item CoroTransaction::NewItem() { @@ -59,5 +70,5 @@ Item CoroTransaction::NewItem() { } return rpcClient_->NewItem(nsName_); } -} // namespace client -} // namespace reindexer + +} // namespace reindexer::client diff --git a/cpp_src/client/corotransaction.h b/cpp_src/client/corotransaction.h index 4b3bdf645..a813193b3 100644 --- a/cpp_src/client/corotransaction.h +++ b/cpp_src/client/corotransaction.h @@ -5,12 +5,9 @@ namespace reindexer { -namespace net { -namespace cproto { - +namespace net::cproto { class CoroClientConnection; -} // namespace cproto -} // namespace net +} // namespace net::cproto namespace client { @@ -35,7 +32,7 @@ class CoroTransaction { friend class CoroRPCClient; friend class SyncCoroReindexerImpl; friend class SyncCoroTransaction; - CoroTransaction(Error status) : status_(std::move(status)) {} + explicit CoroTransaction(Error status) : status_(std::move(status)) {} CoroTransaction(CoroRPCClient* rpcClient, net::cproto::CoroClientConnection* conn, int64_t txId, std::chrono::seconds RequestTimeout, std::chrono::milliseconds execTimeout, std::string nsName) : txId_(txId), @@ -53,9 +50,9 @@ class CoroTransaction { status_ = errOK; } - int64_t txId_ = -1; - CoroRPCClient* rpcClient_ = nullptr; - reindexer::net::cproto::CoroClientConnection* conn_ = nullptr; + int64_t txId_{-1}; + CoroRPCClient* rpcClient_{nullptr}; + reindexer::net::cproto::CoroClientConnection* conn_{nullptr}; std::chrono::seconds RequestTimeout_{0}; std::chrono::milliseconds execTimeout_{0}; std::string nsName_; @@ -63,4 +60,5 @@ class CoroTransaction { }; } // namespace client + } // namespace reindexer diff --git a/cpp_src/client/rpcclient.cc b/cpp_src/client/rpcclient.cc index d7747af15..35fcc5672 100644 --- a/cpp_src/client/rpcclient.cc +++ b/cpp_src/client/rpcclient.cc @@ -1,5 +1,4 @@ #include "client/rpcclient.h" -#include #include #include "client/itemimpl.h" #include "core/namespacedef.h" @@ -8,7 +7,6 @@ #include "tools/cpucheck.h" #include "tools/errors.h" #include "tools/logger.h" -#include "vendor/gason/gason.h" namespace reindexer { namespace client { @@ -245,8 +243,8 @@ Error RPCClient::modifyItem(std::string_view nsName, Item& item, int mode, secon } QueryResults qr; InternalRdxContext ctxCompl = ctx.WithCompletion(nullptr); - auto ret = selectImpl(Query(std::string(nsName)).Limit(0), qr, nullptr, netTimeout, ctxCompl); - if (ret.code() == errTimeout) { + Error err = selectImpl(Query(std::string(nsName)).Limit(0), qr, nullptr, netTimeout, ctxCompl); + if (err.code() == errTimeout) { return Error(errTimeout, "Request timeout"); } if (withNetTimeout) { @@ -254,11 +252,10 @@ Error RPCClient::modifyItem(std::string_view nsName, Item& item, int mode, secon } auto newItem = NewItem(nsName); char* endp = nullptr; - Error err = newItem.FromJSON(item.impl_->GetJSON(), &endp); + err = newItem.FromJSON(item.impl_->GetJSON(), &endp); if (!err.ok()) { return err; } - item = std::move(newItem); continue; } @@ -272,6 +269,8 @@ Error RPCClient::modifyItem(std::string_view nsName, Item& item, int mode, secon return err; } } + + return errOK; } Error RPCClient::modifyItemAsync(std::string_view nsName, Item* item, int mode, cproto::ClientConnection* conn, seconds netTimeout, @@ -768,7 +767,7 @@ void RPCClient::onUpdates(net::cproto::RPCAnswer& ans, cproto::ClientConnection* // then we need to ask server to send tagsMatcher. ++serialDelays_; - // Delay this update and all the further updates until we get responce from server. + // Postpone this update and all subsequent updates until we receive response from the server. ans.EnsureHold(); delayedUpdates_.emplace_back(std::move(ans)); @@ -828,8 +827,8 @@ void RPCClient::onUpdates(net::cproto::RPCAnswer& ans, cproto::ClientConnection* } else if (wrec.type == WalTagsMatcher) { TagsMatcher tm; Serializer ser(wrec.data.data(), wrec.data.size()); - const auto version = ser.GetVarint(); - const auto stateToken = ser.GetVarint(); + const auto version = int(ser.GetVarint()); + const auto stateToken = int(ser.GetVarint()); tm.deserialize(ser, version, stateToken); auto ns = getNamespace(nsName); std::lock_guard lck(ns->lck_); diff --git a/cpp_src/client/rpcclientmock.cc b/cpp_src/client/rpcclientmock.cc index 7b6bd9e58..155472c8e 100644 --- a/cpp_src/client/rpcclientmock.cc +++ b/cpp_src/client/rpcclientmock.cc @@ -1,7 +1,6 @@ #include "rpcclientmock.h" #include #include "client/itemimpl.h" -#include "core/namespacedef.h" #include "tools/errors.h" namespace reindexer { @@ -129,7 +128,7 @@ Error RPCClientMock::modifyItem(std::string_view nsName, Item& item, int mode, s data = item.GetMsgPack(); break; default: - return Error(errParams, "ModifyItem: Unknow data format [%d]", format); + return Error(errParams, "ModifyItem: Unknown data format [%d]", format); } auto ret = conn->Call(mkCommand(cproto::kCmdModifyItem, netTimeout, &ctx), nsName, format, data, mode, ser.Slice(), item.GetStateToken(), 0); @@ -142,8 +141,8 @@ Error RPCClientMock::modifyItem(std::string_view nsName, Item& item, int mode, s } QueryResults qr; InternalRdxContext ctxCompl = ctx.WithCompletion(nullptr); - auto ret = selectImpl(Query(std::string(nsName)).Limit(0), qr, nullptr, netTimeout, ctxCompl, format); - if (ret.code() == errTimeout) { + Error err = selectImpl(Query(std::string(nsName)).Limit(0), qr, nullptr, netTimeout, ctxCompl, format); + if (err.code() == errTimeout) { return Error(errTimeout, "Request timeout"); } if (withNetTimeout) { @@ -151,7 +150,7 @@ Error RPCClientMock::modifyItem(std::string_view nsName, Item& item, int mode, s } auto newItem = NewItem(nsName); char* endp = nullptr; - Error err = newItem.FromJSON(item.impl_->GetJSON(), &endp); + err = newItem.FromJSON(item.impl_->GetJSON(), &endp); if (!err.ok()) { return err; } @@ -169,6 +168,8 @@ Error RPCClientMock::modifyItem(std::string_view nsName, Item& item, int mode, s return err; } } + + return errOK; } Error RPCClientMock::modifyItemAsync(std::string_view nsName, Item* item, int mode, cproto::ClientConnection* conn, seconds netTimeout, @@ -196,7 +197,7 @@ Error RPCClientMock::modifyItemAsync(std::string_view nsName, Item* item, int mo data = item->GetMsgPack(); break; default: - return Error(errParams, "ModifyItem: Unknow data format [%d]", format); + return Error(errParams, "ModifyItem: Unknown data format [%d]", format); } std::string ns(nsName); diff --git a/cpp_src/client/synccororeindexer.cc b/cpp_src/client/synccororeindexer.cc index 630d49ade..bd3fd61d8 100644 --- a/cpp_src/client/synccororeindexer.cc +++ b/cpp_src/client/synccororeindexer.cc @@ -1,8 +1,7 @@ #include "synccororeindexer.h" #include "synccororeindexerimpl.h" -namespace reindexer { -namespace client { +namespace reindexer::client { SyncCoroReindexer::SyncCoroReindexer(const ReindexerConfig& config) : impl_(new SyncCoroReindexerImpl(config)), owner_(true), ctx_() {} SyncCoroReindexer::~SyncCoroReindexer() { @@ -67,8 +66,9 @@ Error SyncCoroReindexer::GetSqlSuggestions(const std::string_view sqlQuery, int Error SyncCoroReindexer::Status() { return impl_->Status(ctx_); } SyncCoroTransaction SyncCoroReindexer::NewTransaction(std::string_view nsName) { return impl_->NewTransaction(nsName, ctx_); } -Error SyncCoroReindexer::CommitTransaction(SyncCoroTransaction& tr) { return impl_->CommitTransaction(tr, ctx_); } +Error SyncCoroReindexer::CommitTransaction(SyncCoroTransaction& tr, SyncCoroQueryResults& result) { + return impl_->CommitTransaction(tr, result, ctx_); +} Error SyncCoroReindexer::RollBackTransaction(SyncCoroTransaction& tr) { return impl_->RollBackTransaction(tr, ctx_); } -} // namespace client -} // namespace reindexer +} // namespace reindexer::client diff --git a/cpp_src/client/synccororeindexer.h b/cpp_src/client/synccororeindexer.h index 20c989c83..3cfe10a3b 100644 --- a/cpp_src/client/synccororeindexer.h +++ b/cpp_src/client/synccororeindexer.h @@ -154,7 +154,8 @@ class SyncCoroReindexer { SyncCoroTransaction NewTransaction(std::string_view nsName); /// Commit transaction - transaction will be deleted after commit /// @param tr - transaction to commit - Error CommitTransaction(SyncCoroTransaction& tr); + /// @param result - QueryResults with IDs of items changed by tx. + Error CommitTransaction(SyncCoroTransaction& tr, SyncCoroQueryResults& result); /// RollBack transaction - transaction will be deleted after rollback /// @param tr - transaction to rollback Error RollBackTransaction(SyncCoroTransaction& tr); diff --git a/cpp_src/client/synccororeindexerimpl.cc b/cpp_src/client/synccororeindexerimpl.cc index 2d44e27cd..81ac7a5cc 100644 --- a/cpp_src/client/synccororeindexerimpl.cc +++ b/cpp_src/client/synccororeindexerimpl.cc @@ -130,8 +130,8 @@ SyncCoroTransaction SyncCoroReindexerImpl::NewTransaction(std::string_view nsNam } return SyncCoroTransaction(tx.Status(), this); } -Error SyncCoroReindexerImpl::CommitTransaction(SyncCoroTransaction& tr, const InternalRdxContext& ctx) { - Error err = sendCommand(DbCmdCommitTransaction, tr.tr_, ctx); +Error SyncCoroReindexerImpl::CommitTransaction(SyncCoroTransaction& tr, SyncCoroQueryResults& results, const InternalRdxContext& ctx) { + Error err = sendCommand(DbCmdCommitTransaction, tr.tr_, results.results_, ctx); tr = SyncCoroTransaction(errOK, this); return err; } @@ -386,8 +386,10 @@ void SyncCoroReindexerImpl::coroInterpreter(reindexer::client::CoroRPCClient& rx break; } case DbCmdCommitTransaction: { - std::function f = - std::bind(&client::CoroRPCClient::CommitTransaction, &rx, _1, _2); + std::function f = + std::bind(static_cast( + &client::CoroRPCClient::CommitTransaction), + &rx, _1, _2, _3); execCommand(v.first, f); break; } diff --git a/cpp_src/client/synccororeindexerimpl.h b/cpp_src/client/synccororeindexerimpl.h index f7ea0442b..eac5bf1a7 100644 --- a/cpp_src/client/synccororeindexerimpl.h +++ b/cpp_src/client/synccororeindexerimpl.h @@ -53,7 +53,7 @@ class SyncCoroReindexerImpl { Error GetSqlSuggestions(const std::string_view sqlQuery, int pos, std::vector& suggestions); Error Status(const InternalRdxContext& ctx); SyncCoroTransaction NewTransaction(std::string_view nsName, const InternalRdxContext& ctx); - Error CommitTransaction(SyncCoroTransaction& tr, const InternalRdxContext& ctx); + Error CommitTransaction(SyncCoroTransaction& tr, SyncCoroQueryResults& results, const InternalRdxContext& ctx); Error RollBackTransaction(SyncCoroTransaction& tr, const InternalRdxContext& ctx); private: diff --git a/cpp_src/cmd/reindexer_server/test/test_storage_compatibility.sh b/cpp_src/cmd/reindexer_server/test/test_storage_compatibility.sh new file mode 100755 index 000000000..873181e20 --- /dev/null +++ b/cpp_src/cmd/reindexer_server/test/test_storage_compatibility.sh @@ -0,0 +1,197 @@ +#!/bin/bash +# Task: https://github.com/restream/reindexer/-/issues/1188 +set -e + +function KillAndRemoveServer { + local pid=$1 + kill $pid + wait $pid + yum remove -y 'reindexer*' > /dev/null +} + +function WaitForDB { + # wait until DB is loaded + set +e # disable "exit on error" so the script won't stop when DB's not loaded yet + is_connected=$(reindexer_tool --dsn $ADDRESS --command '\databases list'); + while [[ $is_connected != "test" ]] + do + sleep 2 + is_connected=$(reindexer_tool --dsn $ADDRESS --command '\databases list'); + done + set -e +} + +function CompareNamespacesLists { + local ns_list_actual=$1 + local ns_list_expected=$2 + local pid=$3 + + diff=$(echo ${ns_list_actual[@]} ${ns_list_expected[@]} | tr ' ' '\n' | sort | uniq -u) # compare in any order + if [ "$diff" == "" ]; then + echo "## PASS: namespaces list not changed" + else + echo "##### FAIL: namespaces list was changed" + echo "expected: $ns_list_expected" + echo "actual: $ns_list_actual" + KillAndRemoveServer $pid; + exit 1 + fi +} + +function CompareMemstats { + local actual=$1 + local expected=$2 + local pid=$3 + diff=$(echo ${actual[@]} ${expected[@]} | tr ' ' '\n' | sed 's/\(.*\),$/\1/' | sort | uniq -u) # compare in any order + if [ "$diff" == "" ]; then + echo "## PASS: memstats not changed" + else + echo "##### FAIL: memstats was changed" + echo "expected: $expected" + echo "actual: $actual" + KillAndRemoveServer $pid; + exit 1 + fi +} + + +RX_SERVER_CURRENT_VERSION_RPM="$(basename build/reindexer-*server*.rpm)" +VERSION_FROM_RPM=$(echo "$RX_SERVER_CURRENT_VERSION_RPM" | grep -o '.*server-..') +VERSION=$(echo ${VERSION_FROM_RPM: -2:1}) # one-digit version + +echo "## choose latest release rpm file" +if [ $VERSION == 3 ]; then + LATEST_RELEASE=$(python3 cpp_src/cmd/reindexer_server/test/get_last_rx_version.py -v 3) + namespaces_list_expected=$'purchase_options_ext_dict\nchild_account_recommendations\n#config\n#activitystats\nradio_channels\ncollections\n#namespaces\nwp_imports_tasks\nepg_genres\nrecom_media_items_personal\nrecom_epg_archive_default\n#perfstats\nrecom_epg_live_default\nmedia_view_templates\nasset_video_servers\nwp_tasks_schedule\nadmin_roles\n#clientsstats\nrecom_epg_archive_personal\nrecom_media_items_similars\nmenu_items\naccount_recommendations\nkaraoke_items\nmedia_items\nbanners\n#queriesperfstats\nrecom_media_items_default\nrecom_epg_live_personal\nservices\n#memstats\nchannels\nmedia_item_recommendations\nwp_tasks_tasks\nepg' +elif [ $VERSION == 4 ]; then + LATEST_RELEASE=$(python3 cpp_src/cmd/reindexer_server/test/get_last_rx_version.py -v 4) + # replicationstats ns added for v4 + namespaces_list_expected=$'purchase_options_ext_dict\nchild_account_recommendations\n#config\n#activitystats\n#replicationstats\nradio_channels\ncollections\n#namespaces\nwp_imports_tasks\nepg_genres\nrecom_media_items_personal\nrecom_epg_archive_default\n#perfstats\nrecom_epg_live_default\nmedia_view_templates\nasset_video_servers\nwp_tasks_schedule\nadmin_roles\n#clientsstats\nrecom_epg_archive_personal\nrecom_media_items_similars\nmenu_items\naccount_recommendations\nkaraoke_items\nmedia_items\nbanners\n#queriesperfstats\nrecom_media_items_default\nrecom_epg_live_personal\nservices\n#memstats\nchannels\nmedia_item_recommendations\nwp_tasks_tasks\nepg' +else + echo "Unknown version" + exit 1 +fi + +echo "## downloading latest release rpm file: $LATEST_RELEASE" +curl "http://repo.itv.restr.im/itv-api-ng/7/x86_64/$LATEST_RELEASE" --output $LATEST_RELEASE; +echo "## downloading example DB" +curl "https://github.com/restream/reindexer_testdata/-/raw/main/dump_demo.zip" --output dump_demo.zip; +unzip -o dump_demo.zip # unzips into demo_test.rxdump; + +ADDRESS="cproto://127.0.0.1:6534/" +DB_NAME="test" + +memstats_expected=$'[ +{"name":"account_recommendations","replication":{"data_hash":6833710705,"data_count":1}}, +{"name":"admin_roles","replication":{"data_hash":1896088071,"data_count":2}}, +{"name":"asset_video_servers","replication":{"data_hash":7404222244,"data_count":97}}, +{"name":"banners","replication":{"data_hash":0,"data_count":0}}, +{"name":"channels","replication":{"data_hash":457292509431319,"data_count":3941}}, +{"name":"child_account_recommendations","replication":{"data_hash":6252344969,"data_count":1}}, +{"name":"collections","replication":{"data_hash":0,"data_count":0}}, +{"name":"epg","replication":{"data_hash":-7049751653258,"data_count":1623116}}, +{"name":"epg_genres","replication":{"data_hash":8373644068,"data_count":1315}}, +{"name":"karaoke_items","replication":{"data_hash":5858155773472,"data_count":4500}}, +{"name":"media_item_recommendations","replication":{"data_hash":-6520334670,"data_count":35886}}, +{"name":"media_items","replication":{"data_hash":-1824301168479972392,"data_count":65448}}, +{"name":"media_view_templates","replication":{"data_hash":0,"data_count":0}}, +{"name":"menu_items","replication":{"data_hash":0,"data_count":0}}, +{"name":"purchase_options_ext_dict","replication":{"data_hash":24651210926,"data_count":3}}, +{"name":"radio_channels","replication":{"data_hash":37734732881,"data_count":28}}, +{"name":"recom_epg_archive_default","replication":{"data_hash":0,"data_count":0}}, +{"name":"recom_epg_archive_personal","replication":{"data_hash":0,"data_count":0}}, +{"name":"recom_epg_live_default","replication":{"data_hash":0,"data_count":0}}, +{"name":"recom_epg_live_personal","replication":{"data_hash":0,"data_count":0}}, +{"name":"recom_media_items_default","replication":{"data_hash":8288213744,"data_count":3}}, +{"name":"recom_media_items_personal","replication":{"data_hash":0,"data_count":0}}, +{"name":"recom_media_items_similars","replication":{"data_hash":-672103903,"data_count":33538}}, +{"name":"services","replication":{"data_hash":0,"data_count":0}}, +{"name":"wp_imports_tasks","replication":{"data_hash":777859741066,"data_count":1145}}, +{"name":"wp_tasks_schedule","replication":{"data_hash":12595790956,"data_count":4}}, +{"name":"wp_tasks_tasks","replication":{"data_hash":28692716680,"data_count":281}} +] +Returned 27 rows' + +echo "##### Forward compatibility test #####" + +DB_PATH=$(pwd)"/rx_db" + +echo "Database: "$DB_PATH + +echo "## installing latest release: $LATEST_RELEASE" +yum install -y $LATEST_RELEASE > /dev/null; +# run RX server with disabled logging +reindexer_server -l warning --httplog=none --rpclog=none --db $DB_PATH & +server_pid=$! +sleep 2; + +reindexer_tool --dsn $ADDRESS$DB_NAME -f demo_test.rxdump --createdb; +sleep 1; + +namespaces_1=$(reindexer_tool --dsn $ADDRESS$DB_NAME --command '\namespaces list'); +echo $namespaces_1; +CompareNamespacesLists "${namespaces_1[@]}" "${namespaces_list_expected[@]}" $server_pid; + +memstats_1=$(reindexer_tool --dsn $ADDRESS$DB_NAME --command 'select name, replication.data_hash, replication.data_count from #memstats order by name'); +CompareMemstats "${memstats_1[@]}" "${memstats_expected[@]}" $server_pid; + +KillAndRemoveServer $server_pid; + +echo "## installing current version: $RX_SERVER_CURRENT_VERSION_RPM" +yum install -y build/*.rpm > /dev/null; +reindexer_server -l0 --corelog=none --httplog=none --rpclog=none --db $DB_PATH & +server_pid=$! +sleep 2; + +WaitForDB + +namespaces_2=$(reindexer_tool --dsn $ADDRESS$DB_NAME --command '\namespaces list'); +echo $namespaces_2; +CompareNamespacesLists "${namespaces_2[@]}" "${namespaces_1[@]}" $server_pid; + + +memstats_2=$(reindexer_tool --dsn $ADDRESS$DB_NAME --command 'select name, replication.data_hash, replication.data_count from #memstats order by name'); +CompareMemstats "${memstats_2[@]}" "${memstats_1[@]}" $server_pid; + +KillAndRemoveServer $server_pid; +rm -rf $DB_PATH; +sleep 1; + +echo "##### Backward compatibility test #####" + +echo "## installing current version: $RX_SERVER_CURRENT_VERSION_RPM" +yum install -y build/*.rpm > /dev/null; +reindexer_server -l warning --httplog=none --rpclog=none --db $DB_PATH & +server_pid=$! +sleep 2; + +reindexer_tool --dsn $ADDRESS$DB_NAME -f demo_test.rxdump --createdb; +sleep 1; + +namespaces_3=$(reindexer_tool --dsn $ADDRESS$DB_NAME --command '\namespaces list'); +echo $namespaces_3; +CompareNamespacesLists "${namespaces_3[@]}" "${namespaces_list_expected[@]}" $server_pid; + + +memstats_3=$(reindexer_tool --dsn $ADDRESS$DB_NAME --command 'select name, replication.data_hash, replication.data_count from #memstats order by name'); +CompareMemstats "${memstats_3[@]}" "${memstats_expected[@]}" $server_pid; + +KillAndRemoveServer $server_pid; + +echo "## installing latest release: $LATEST_RELEASE" +yum install -y $LATEST_RELEASE > /dev/null; +reindexer_server -l warning --httplog=none --rpclog=none --db $DB_PATH & +server_pid=$! +sleep 2; + +WaitForDB + +namespaces_4=$(reindexer_tool --dsn $ADDRESS$DB_NAME --command '\namespaces list'); +echo $namespaces_4; +CompareNamespacesLists "${namespaces_4[@]}" "${namespaces_3[@]}" $server_pid; + +memstats_4=$(reindexer_tool --dsn $ADDRESS$DB_NAME --command 'select name, replication.data_hash, replication.data_count from #memstats order by name'); +CompareMemstats "${memstats_4[@]}" "${memstats_3[@]}" $server_pid; + +KillAndRemoveServer $server_pid; +rm -rf $DB_PATH; diff --git a/cpp_src/cmd/reindexer_tool/readme.md b/cpp_src/cmd/reindexer_tool/readme.md index cfc94caaf..690553632 100644 --- a/cpp_src/cmd/reindexer_tool/readme.md +++ b/cpp_src/cmd/reindexer_tool/readme.md @@ -22,13 +22,15 @@ reindexer_tool {OPTIONS} Options -d[DSN], --dsn=[DSN] DSN to 'reindexer', like 'cproto://127.0.0.1:6534/dbname', 'builtin:///var/lib/reindexer/dbname' or `ucproto://user@password:/tmp/reindexer.sock:/dbname` - -f[FILENAME], --filename=[FILENAME] execute commands from file, then exit - -c[COMMAND], --command=[COMMAND] run only single command (SQL or internal) and exit - -o[FILENAME], --output=[FILENAME] send query results to file - -l[INT=1..5], --log=[INT=1..5] reindexer logging level + -f[FILENAME], --filename=[FILENAME] Execute commands from file, then exit + -c[COMMAND], --command=[COMMAND] Run single command (SQL or internal) and exit + -o[FILENAME], --output=[FILENAME] Send query results to file + -l[INT=1..5], --log=[INT=1..5] Reindexer logging level -C[INT], --connections=[INT] Number of simulateonous connections to db -t[INT], --threads=[INT] Number of threads used by db connector (used only for bench) - + --createdb Creates target database if it is missing + -a[Application name], + --appname=[Application name] Application name which will be used in login info ``` ## Commands @@ -135,5 +137,8 @@ reindexer_tool --dsn cproto://127.0.0.1:6534/mydb --command '\dump' --output myd Restore database from backup file: ```sh -reindexer_tool --dsn cproto://127.0.0.1:6534/mydb --filename mydb.rxdump +reindexer_tool --dsn cproto://127.0.0.1:6534/mydb --createdb --filename mydb.rxdump ``` + +Option `createdb` in example above allows to automatically create `mydb` if it does not exist. By default `reindexer_tool` requires an existing database to connect. + diff --git a/cpp_src/cmd/reindexer_tool/reindexer_tool.cc b/cpp_src/cmd/reindexer_tool/reindexer_tool.cc index 5ccc70ea9..0da07360f 100644 --- a/cpp_src/cmd/reindexer_tool/reindexer_tool.cc +++ b/cpp_src/cmd/reindexer_tool/reindexer_tool.cc @@ -65,26 +65,26 @@ int main(int argc, char* argv[]) { "DSN to 'reindexer'. Can be 'cproto://:/', 'builtin://' or 'ucproto://:/'", {'d', "dsn"}, "", Options::Single | Options::Global); #endif // _WIN32 - args::ValueFlag fileName(progOptions, "FILENAME", "execute commands from file, then exit", {'f', "filename"}, "", + args::ValueFlag fileName(progOptions, "FILENAME", "Execute commands from file, then exit", {'f', "filename"}, "", Options::Single | Options::Global); - args::ValueFlag command(progOptions, "COMMAND", "run only single command (SQL or internal) and exit'", {'c', "command"}, - "", Options::Single | Options::Global); - args::ValueFlag outFileName(progOptions, "FILENAME", "send query results to file", {'o', "output"}, "", + args::ValueFlag command(progOptions, "COMMAND", "Run single command (SQL or internal) and exit'", {'c', "command"}, "", + Options::Single | Options::Global); + args::ValueFlag outFileName(progOptions, "FILENAME", "Send query results to file", {'o', "output"}, "", Options::Single | Options::Global); args::ValueFlag connThreads(progOptions, "INT=1..65535", "Number of threads(connections) used by db connector", {'t', "threads"}, 1, Options::Single | Options::Global); - args::Flag createDBF(progOptions, "", "Enable created database if missed", {"createdb"}); + args::Flag createDBF(progOptions, "", "Creates target database if it is missing", {"createdb"}); args::Positional dbName(progOptions, "DB name", "Name of a database to get connected to", Options::Single); - args::ActionFlag logLevel(progOptions, "INT=1..5", "reindexer logging level", {'l', "log"}, 1, &InstallLogLevel, + args::ActionFlag logLevel(progOptions, "INT=1..5", "Reindexer logging level", {'l', "log"}, 1, &InstallLogLevel, Options::Single | Options::Global); - args::Flag repair(progOptions, "", "Repair database", {'r', "repair"}); + args::Flag repair(progOptions, "", "Try to repair storage", {'r', "repair"}); - args::ValueFlag appName(progOptions, "Application name", "Application name which will be used in login info", + args::ValueFlag appName(progOptions, "Application name", "Application name that will be used in login info", {'a', "appname"}, "reindexer_tool", Options::Single | Options::Global); args::GlobalOptions globals(parser, progOptions); diff --git a/cpp_src/core/query/dsl/dslparser.cc b/cpp_src/core/query/dsl/dslparser.cc index f20bbb2ee..64f774ca8 100644 --- a/cpp_src/core/query/dsl/dslparser.cc +++ b/cpp_src/core/query/dsl/dslparser.cc @@ -378,8 +378,18 @@ static void parseFilter(const JsonValue& filter, Query& q, std::vector 0); const auto& qjoin = q.GetJoinQueries().back(); - if (qjoin.joinType != JoinType::LeftJoin) { - q.AppendQueryEntry((qjoin.joinType == JoinType::InnerJoin) ? OpAnd : OpOr, q.GetJoinQueries().size() - 1); + if (qjoin.joinType == JoinType::LeftJoin) { + if (op != OpAnd) { + throw Error(errParseJson, "Operation %s is not allowed with LeftJoin", OpTypeToStr(op)); + } + } else { + if (qjoin.joinType == JoinType::OrInnerJoin) { + if (op == OpNot) { + throw Error(errParseJson, "Operation NOT is not allowed with OrInnerJoin"); + } + op = OpOr; + } + q.AppendQueryEntry(op, q.GetJoinQueries().size() - 1); } break; } diff --git a/cpp_src/core/reindexer.h b/cpp_src/core/reindexer.h index e3b8adc2b..b6dbbeb2b 100644 --- a/cpp_src/core/reindexer.h +++ b/cpp_src/core/reindexer.h @@ -15,7 +15,6 @@ using std::chrono::milliseconds; class ReindexerImpl; class IUpdatesObserver; class IClientsStats; -class ProtobufSchema; class UpdatesFilters; /// The main Reindexer interface. Holds database object
@@ -32,7 +31,7 @@ class Reindexer { /// Create Reindexer database object /// @param cfg - general database options - Reindexer(ReindexerConfig cfg = ReindexerConfig()); + explicit Reindexer(ReindexerConfig cfg = ReindexerConfig()); /// Destroy Reindexer database object ~Reindexer(); @@ -281,6 +280,7 @@ class Reindexer { typedef QueryResults QueryResultsT; typedef Item ItemT; + typedef Transaction TransactionT; Error DumpIndex(std::ostream& os, std::string_view nsName, std::string_view index); diff --git a/cpp_src/core/transaction.cc b/cpp_src/core/transaction.cc index 1e63cd09f..e48414308 100644 --- a/cpp_src/core/transaction.cc +++ b/cpp_src/core/transaction.cc @@ -2,6 +2,7 @@ #include "transactionimpl.h" namespace reindexer { +Transaction::Transaction() = default; Transaction::Transaction(const std::string& nsName, const PayloadType& pt, const TagsMatcher& tm, const FieldsSet& pf, std::shared_ptr schema) : impl_(new TransactionImpl(nsName, pt, tm, pf, std::move(schema))) {} @@ -9,8 +10,8 @@ Transaction::Transaction(const std::string& nsName, const PayloadType& pt, const Transaction::Transaction(const Error& err) : status_(err) {} Transaction::~Transaction() = default; -Transaction::Transaction(Transaction&&) noexcept = default; -Transaction& Transaction::operator=(Transaction&&) noexcept = default; +Transaction::Transaction(Transaction&& other) noexcept = default; +Transaction& Transaction::operator=(Transaction&& other) noexcept = default; const std::string& Transaction::GetName() const { static std::string empty; diff --git a/cpp_src/core/transaction.h b/cpp_src/core/transaction.h index 8d0f92a48..bbea26def 100644 --- a/cpp_src/core/transaction.h +++ b/cpp_src/core/transaction.h @@ -15,11 +15,11 @@ class Transaction { public: using time_point = system_clock_w::time_point; + Transaction(); Transaction(const std::string& nsName, const PayloadType& pt, const TagsMatcher& tm, const FieldsSet& pf, std::shared_ptr schema); Transaction(const Error& err); ~Transaction(); - Transaction() = default; Transaction(Transaction&&) noexcept; Transaction& operator=(Transaction&&) noexcept; diff --git a/cpp_src/gtests/tests/API/base_tests.cc b/cpp_src/gtests/tests/API/base_tests.cc index 6436a7192..7ea486928 100644 --- a/cpp_src/gtests/tests/API/base_tests.cc +++ b/cpp_src/gtests/tests/API/base_tests.cc @@ -1191,7 +1191,7 @@ TEST_F(ReindexerApi, DslFieldsTest) { } }, { - "op": "OR", + "op": "AND", "join_query": { "type": "left", "namespace": "test2", diff --git a/cpp_src/gtests/tests/unit/clientsstats_test.cc b/cpp_src/gtests/tests/unit/clientsstats_test.cc index 4f28c860d..a402c8841 100644 --- a/cpp_src/gtests/tests/unit/clientsstats_test.cc +++ b/cpp_src/gtests/tests/unit/clientsstats_test.cc @@ -190,9 +190,10 @@ TEST_F(ClientsStatsApi, ClientsStatsValues) { resultFilters.FromJSON(clientsStats["updates_filter"]); EXPECT_EQ(resultFilters, filters); - err = reindexer.CommitTransaction(tx1); + CoroQueryResults resultDummy1, resultDummy2; + err = reindexer.CommitTransaction(tx1, resultDummy1); ASSERT_TRUE(err.ok()) << err.what(); - err = reindexer.CommitTransaction(tx2); + err = reindexer.CommitTransaction(tx2, resultDummy2); ASSERT_TRUE(err.ok()) << err.what(); finished = true; @@ -343,9 +344,10 @@ TEST_F(ClientsStatsApi, TxCountLimitation) { ASSERT_EQ(txs.size(), kMaxTxCount); ASSERT_EQ(StatsTxCount(reindexer), kMaxTxCount); + CoroQueryResults qrDummy; for (size_t i = 0; i < kMaxTxCount / 2; ++i) { if (i % 2) { - err = reindexer.CommitTransaction(txs[i]); + err = reindexer.CommitTransaction(txs[i], qrDummy); } else { err = reindexer.RollBackTransaction(txs[i]); } @@ -362,7 +364,7 @@ TEST_F(ClientsStatsApi, TxCountLimitation) { for (size_t i = 0; i < txs.size(); ++i) { if (!txs[i].IsFree() && txs[i].Status().ok()) { if (i % 2) { - err = reindexer.CommitTransaction(txs[i]); + err = reindexer.CommitTransaction(txs[i], qrDummy); } else { err = reindexer.RollBackTransaction(txs[i]); } diff --git a/cpp_src/gtests/tests/unit/replication_master_master_test.cc b/cpp_src/gtests/tests/unit/replication_master_master_test.cc index 7e714012f..7b915832f 100644 --- a/cpp_src/gtests/tests/unit/replication_master_master_test.cc +++ b/cpp_src/gtests/tests/unit/replication_master_master_test.cc @@ -137,8 +137,10 @@ class TestNamespace1 { err = tr.Upsert(std::move(item)); ASSERT_TRUE(err.ok()) << err.what(); } - auto err = rx.CommitTransaction(tr); + reindexer::client::SyncCoroQueryResults qr(&rx); + auto err = rx.CommitTransaction(tr, qr); ASSERT_TRUE(err.ok()) << err.what(); + ASSERT_EQ(qr.Count(), count); } std::function AddRow1msSleep = [](ServerControl& masterControl, int from, @@ -1344,7 +1346,7 @@ TEST_P(ServerIdChange, UpdateServerId) { TestNamespace1 ns(master); const int startId = 0; - const int n2 = 20000; + const int n2 = 1000; const int dn = 10; AddFun(master, dataStore, startId, n2); @@ -1398,8 +1400,6 @@ TEST_P(ServerIdChange, UpdateServerId) { std::vector> results; - Query qr = Query("ns1").Sort("id", true); - for (size_t i = 0; i < nodes.size(); i++) { results.emplace_back(); ns.GetDataWithStrings(nodes[i], results.back()); diff --git a/cpp_src/gtests/tests/unit/rpcclient_test.cc b/cpp_src/gtests/tests/unit/rpcclient_test.cc index f0181fe67..7b9816714 100644 --- a/cpp_src/gtests/tests/unit/rpcclient_test.cc +++ b/cpp_src/gtests/tests/unit/rpcclient_test.cc @@ -75,7 +75,7 @@ TEST_F(RPCClientTestApi, RequestCancels) { ASSERT_TRUE(res.ok()) << res.what(); } -TEST_F(RPCClientTestApi, SuccessfullRequestWithTimeout) { +TEST_F(RPCClientTestApi, SuccessfulRequestWithTimeout) { AddFakeServer(); StartServer(); reindexer::client::ReindexerConfig config; @@ -158,7 +158,7 @@ TEST_F(RPCClientTestApi, SeveralDsnReconnect) { ASSERT_TRUE(res.ok()) << res.what(); } res = StopAllServers(); - (void)res; // ingore; Error is fine here + (void)res; // ignore; Error is fine here } TEST_F(RPCClientTestApi, SelectFromClosedNamespace) { @@ -168,8 +168,6 @@ TEST_F(RPCClientTestApi, SelectFromClosedNamespace) { bool finished = false; loop.spawn([&loop, &finished] { const std::string dsn = "cproto://" + kDefaultRPCServerAddr + "/db1"; - reindexer::client::ReindexerConfig config; - config.FetchAmount = 0; reindexer::client::ConnectOpts opts; opts.CreateDBIfMissing(); reindexer::client::CoroReindexer rx; @@ -228,8 +226,6 @@ TEST_F(RPCClientTestApi, RenameNamespace) { bool finished = false; loop.spawn([&loop, &finished] { const std::string dsn = "cproto://" + kDefaultRPCServerAddr + "/db1"; - reindexer::client::ReindexerConfig config; - config.FetchAmount = 0; reindexer::client::ConnectOpts opts; opts.CreateDBIfMissing(); reindexer::client::CoroReindexer rx; @@ -454,7 +450,7 @@ TEST_F(RPCClientTestApi, CoroRequestCancels) { ASSERT_TRUE(err.ok()) << err.what(); } -TEST_F(RPCClientTestApi, CoroSuccessfullRequestWithTimeout) { +TEST_F(RPCClientTestApi, CoroSuccessfulRequestWithTimeout) { // Should be able to execute some basic requests with timeout AddFakeServer(); StartServer(); @@ -587,7 +583,8 @@ TEST_F(RPCClientTestApi, CoroUpserts) { err = tx.Upsert(std::move(item)); ASSERT_TRUE(err.ok()) << err.what(); } - auto err = rx.CommitTransaction(tx); + reindexer::client::CoroQueryResults qr; + auto err = rx.CommitTransaction(tx, qr); ASSERT_TRUE(err.ok()) << err.what(); }; @@ -772,15 +769,15 @@ TEST_F(RPCClientTestApi, CoroUpdatesFilteringByNs) { CreateNamespace(rx, kNs4Name); CreateNamespace(rx, kNs5Name); - UpdatesReciever reciever1(loop); // Recieves updates for ns 'n5' and 'ns1' + UpdatesReciever receiver1(loop); // Receives updates for ns 'n5' and 'ns1' { UpdatesFilters filters; filters.AddFilter(kNs5Name, UpdatesFilters::Filter()); - err = rx.SubscribeUpdates(&reciever1, filters); + err = rx.SubscribeUpdates(&receiver1, filters); ASSERT_TRUE(err.ok()) << err.what(); UpdatesFilters filters1; filters1.AddFilter(kNs1Name, UpdatesFilters::Filter()); - err = rx.SubscribeUpdates(&reciever1, filters1, SubscriptionOpts().IncrementSubscription()); + err = rx.SubscribeUpdates(&receiver1, filters1, SubscriptionOpts().IncrementSubscription()); ASSERT_TRUE(err.ok()) << err.what(); } @@ -788,26 +785,26 @@ TEST_F(RPCClientTestApi, CoroUpdatesFilteringByNs) { const size_t count = 50; FillData(rx, kNs4Name, 0, count); FillData(rx, kNs5Name, 0, count); - ASSERT_TRUE(reciever1.AwaitNamespaces(1)); - ASSERT_TRUE(reciever1.AwaitItems(kNs5Name, count)); - reciever1.Reset(); + ASSERT_TRUE(receiver1.AwaitNamespaces(1)); + ASSERT_TRUE(receiver1.AwaitItems(kNs5Name, count)); + receiver1.Reset(); } - UpdatesReciever reciever2(loop); // Recieves all the updates + UpdatesReciever receiver2(loop); // Receives all the updates { UpdatesFilters filters; - err = rx.SubscribeUpdates(&reciever2, filters); + err = rx.SubscribeUpdates(&receiver2, filters); ASSERT_TRUE(err.ok()) << err.what(); UpdatesFilters filters1; - err = rx.SubscribeUpdates(&reciever2, filters1, SubscriptionOpts().IncrementSubscription()); + err = rx.SubscribeUpdates(&receiver2, filters1, SubscriptionOpts().IncrementSubscription()); ASSERT_TRUE(err.ok()) << err.what(); } - UpdatesReciever reciever3(loop); // Recieves updates for ns 'ns4' + UpdatesReciever receiver3(loop); // Receives updates for ns 'ns4' { UpdatesFilters filters; filters.AddFilter(kNs4Name, UpdatesFilters::Filter()); - err = rx.SubscribeUpdates(&reciever3, filters); + err = rx.SubscribeUpdates(&receiver3, filters); ASSERT_TRUE(err.ok()) << err.what(); } @@ -815,18 +812,18 @@ TEST_F(RPCClientTestApi, CoroUpdatesFilteringByNs) { const size_t count = 100; FillData(rx, kNs4Name, 0, count); FillData(rx, kNs5Name, 0, count); - ASSERT_TRUE(reciever1.AwaitNamespaces(1)); - ASSERT_TRUE(reciever1.AwaitItems(kNs5Name, count)); - reciever1.Reset(); - - ASSERT_TRUE(reciever2.AwaitNamespaces(2)); - ASSERT_TRUE(reciever2.AwaitItems(kNs5Name, count)); - ASSERT_TRUE(reciever2.AwaitItems(kNs4Name, count)); - reciever2.Reset(); - - ASSERT_TRUE(reciever3.AwaitNamespaces(1)); - ASSERT_TRUE(reciever3.AwaitItems(kNs4Name, count)); - reciever3.Reset(); + ASSERT_TRUE(receiver1.AwaitNamespaces(1)); + ASSERT_TRUE(receiver1.AwaitItems(kNs5Name, count)); + receiver1.Reset(); + + ASSERT_TRUE(receiver2.AwaitNamespaces(2)); + ASSERT_TRUE(receiver2.AwaitItems(kNs5Name, count)); + ASSERT_TRUE(receiver2.AwaitItems(kNs4Name, count)); + receiver2.Reset(); + + ASSERT_TRUE(receiver3.AwaitNamespaces(1)); + ASSERT_TRUE(receiver3.AwaitItems(kNs4Name, count)); + receiver3.Reset(); } err = rx.OpenNamespace(kNs1Name); @@ -838,27 +835,27 @@ TEST_F(RPCClientTestApi, CoroUpdatesFilteringByNs) { err = rx.DropNamespace(kNs2Name); ASSERT_TRUE(err.ok()) << err.what(); { - ASSERT_TRUE(reciever1.AwaitNamespaces(2)); - ASSERT_TRUE(reciever1.AwaitItems(kNs1Name, 2)); - ASSERT_TRUE(reciever1.AwaitItems(kNs2Name, 2)); - reciever1.Reset(); - - ASSERT_TRUE(reciever2.AwaitNamespaces(2)); - ASSERT_TRUE(reciever2.AwaitItems(kNs1Name, 2)); - ASSERT_TRUE(reciever2.AwaitItems(kNs2Name, 2)); - reciever2.Reset(); - - ASSERT_TRUE(reciever3.AwaitNamespaces(2)); - ASSERT_TRUE(reciever3.AwaitItems(kNs1Name, 2)); - ASSERT_TRUE(reciever3.AwaitItems(kNs2Name, 2)); - reciever3.Reset(); + ASSERT_TRUE(receiver1.AwaitNamespaces(2)); + ASSERT_TRUE(receiver1.AwaitItems(kNs1Name, 2)); + ASSERT_TRUE(receiver1.AwaitItems(kNs2Name, 2)); + receiver1.Reset(); + + ASSERT_TRUE(receiver2.AwaitNamespaces(2)); + ASSERT_TRUE(receiver2.AwaitItems(kNs1Name, 2)); + ASSERT_TRUE(receiver2.AwaitItems(kNs2Name, 2)); + receiver2.Reset(); + + ASSERT_TRUE(receiver3.AwaitNamespaces(2)); + ASSERT_TRUE(receiver3.AwaitItems(kNs1Name, 2)); + ASSERT_TRUE(receiver3.AwaitItems(kNs2Name, 2)); + receiver3.Reset(); } - err = rx.UnsubscribeUpdates(&reciever1); + err = rx.UnsubscribeUpdates(&receiver1); ASSERT_TRUE(err.ok()) << err.what(); - err = rx.UnsubscribeUpdates(&reciever2); + err = rx.UnsubscribeUpdates(&receiver2); ASSERT_TRUE(err.ok()) << err.what(); - err = rx.UnsubscribeUpdates(&reciever3); + err = rx.UnsubscribeUpdates(&receiver3); ASSERT_TRUE(err.ok()) << err.what(); { @@ -866,9 +863,9 @@ TEST_F(RPCClientTestApi, CoroUpdatesFilteringByNs) { FillData(rx, kNs4Name, 0, count); FillData(rx, kNs5Name, 0, count); loop.sleep(std::chrono::seconds(2)); - ASSERT_TRUE(reciever1.AwaitNamespaces(0)); - ASSERT_TRUE(reciever2.AwaitNamespaces(0)); - ASSERT_TRUE(reciever3.AwaitNamespaces(0)); + ASSERT_TRUE(receiver1.AwaitNamespaces(0)); + ASSERT_TRUE(receiver2.AwaitNamespaces(0)); + ASSERT_TRUE(receiver3.AwaitNamespaces(0)); } rx.Stop(); @@ -883,8 +880,8 @@ TEST_F(RPCClientTestApi, CoroUpdatesFilteringByNs) { ASSERT_TRUE(finished); } -TEST_F(RPCClientTestApi, UnknowResultsFlag) { - // Check if server will not resturn unknown result flag +TEST_F(RPCClientTestApi, UnknownResultsFlag) { + // Check if server will not return unknown result flag StartDefaultRealServer(); ev::dynamic_loop loop; bool finished = false; @@ -999,7 +996,7 @@ TEST_F(RPCClientTestApi, AggregationsWithStrictModeTest) { } TEST_F(RPCClientTestApi, AggregationsFetching) { // Validate, that distinct results will remain valid after query results fetching. - // Actual aggregation values will be sent for initial 'select' only, but must be available at any point of iterator's lifetime. + // Actual aggregation values will only be sent for initial 'select', but must be available at any point in iterator's lifetime. using namespace reindexer::client; using namespace reindexer::net::ev; @@ -1023,7 +1020,7 @@ TEST_F(RPCClientTestApi, AggregationsFetching) { FillData(rx, nsName, 0, kItemsCount); { - CoroQueryResults qr; + reindexer::client::CoroQueryResults qr; const auto q = Query(nsName).Distinct("id").ReqTotal().Explain(); err = rx.Select(q, qr); ASSERT_TRUE(err.ok()) << err.what(); @@ -1058,13 +1055,8 @@ TEST_F(RPCClientTestApi, AggregationsFetching) { } TEST_F(RPCClientTestApi, SubQuery) { - using namespace reindexer::client; - using namespace reindexer::net::ev; - using reindexer::coroutine::wait_group; - using reindexer::coroutine::wait_group_guard; - StartDefaultRealServer(); - dynamic_loop loop; + reindexer::net::ev::dynamic_loop loop; loop.spawn([&loop, this]() noexcept { const std::string kLeftNsName = "left_ns"; @@ -1076,7 +1068,7 @@ TEST_F(RPCClientTestApi, SubQuery) { constexpr auto kFetchCount = 50; constexpr auto kNsSize = kFetchCount * 3; cfg.FetchAmount = kFetchCount; - CoroReindexer rx(cfg); + reindexer::client::CoroReindexer rx(cfg); auto err = rx.Connect(dsn, loop, opts); ASSERT_TRUE(err.ok()) << err.what(); @@ -1125,3 +1117,67 @@ TEST_F(RPCClientTestApi, SubQuery) { loop.run(); } + +TEST_F(RPCClientTestApi, CoroTransactionInsertWithPrecepts) { + StartDefaultRealServer(); + reindexer::net::ev::dynamic_loop loop; + + loop.spawn([&loop, this]() noexcept { + const std::string dsn = "cproto://" + kDefaultRPCServerAddr + "/db1"; + reindexer::client::ConnectOpts opts; + opts.CreateDBIfMissing(); + reindexer::client::CoroReindexer rx; + auto err = rx.Connect(dsn, loop, opts); + ASSERT_TRUE(err.ok()) << err.what(); + const std::string kNsName = "TestCoroInsertWithPrecepts"; + CreateNamespace(rx, kNsName); + + constexpr int kNsSize = 5; + + auto insertFn = [&rx](const std::string& nsName, int count) { + std::vector precepts = {"id=SERIAL()"}; + + auto tx = rx.NewTransaction(nsName); + ASSERT_TRUE(tx.Status().ok()) << tx.Status().what(); + + for (int i = 0; i < count; ++i) { + auto item = tx.NewItem(); + ASSERT_TRUE(item.Status().ok()) << item.Status().what(); + + WrSerializer wrser; + JsonBuilder jsonBuilder(wrser, ObjType::TypeObject); + jsonBuilder.Put("id", 100); + jsonBuilder.End(); + + char* endp = nullptr; + auto err = item.Unsafe().FromJSON(wrser.Slice(), &endp); + ASSERT_TRUE(err.ok()) << err.what(); + + item.SetPrecepts(precepts); + + err = tx.Insert(std::move(item)); + ASSERT_TRUE(err.ok()) << err.what(); + } + reindexer::client::CoroQueryResults qr; + auto err = rx.CommitTransaction(tx, qr); + ASSERT_TRUE(err.ok()) << err.what(); + ASSERT_EQ(qr.Count(), count); + }; + + insertFn(kNsName, kNsSize); + + { + client::CoroQueryResults qr; + err = rx.Select(Query(kNsName), qr); + ASSERT_TRUE(err.ok()) << err.what(); + ASSERT_EQ(qr.Count(), kNsSize); + for (auto& it : qr) { + ASSERT_TRUE(it.Status().ok()) << it.Status().what(); + } + } + + rx.Stop(); + }); + + loop.run(); +} diff --git a/cpp_src/readme.md b/cpp_src/readme.md index e4626811c..3dd7bb331 100644 --- a/cpp_src/readme.md +++ b/cpp_src/readme.md @@ -4,7 +4,7 @@ Reindexer's goal is to provide fast search with complex queries. Reindexer is compact, fast and it does not have heavy dependencies. -reindexer is up to 5x times faster, than mongodb, and 10x times than elastic search. See [benchmaks section](https://github.com/Restream/reindexer-benchmarks) for details. +reindexer is up to 5x times faster, than mongodb, and 10x times than elastic search. See [benchmarks section](https://github.com/Restream/reindexer-benchmarks) for details. # Installation @@ -20,7 +20,7 @@ docker run -p9088:9088 -p6534:6534 -it reindexer/reindexer ### Container configuration -While using docker, you may pass reindexer server config options via envinronment variables: +While using docker, you may pass reindexer server config options via environment variables: - `RX_DATABASE` - path to reindexer's storage. Default value is `/db`. - `RX_CORELOG` - path to core log file (or `none` to disable core logging). Default value is `stdout`. @@ -149,7 +149,7 @@ The simplest way to use reindexer with any program language - is using REST API. ## GRPC API -[GPRC](https://grpc.io) is a modern open-source high-performance RPC framework developed at Google that can run in any environment. It can efficiently connect services in and across data centers with pluggable support for load balancing, tracing, health checking and authentication. It uses HTTP/2 for transport, Protocol Buffers as the interface description language and it is more efficient (and also easier) to use than HTTP API. Reindexer supports GRPC API since version 3.0. +[GRPC](https://grpc.io) is a modern open-source high-performance RPC framework developed at Google that can run in any environment. It can efficiently connect services in and across data centers with pluggable support for load balancing, tracing, health checking and authentication. It uses HTTP/2 for transport, Protocol Buffers as the interface description language, and it is more efficient (and also easier) to use than HTTP API. Reindexer supports GRPC API since version 3.0. Reindexer's GRPC API is defined in [reindexer.proto](server/proto/reindexer.proto) file. @@ -177,20 +177,20 @@ The concept of streaming is described [here](https://grpc.io/docs/what-is-grpc/c ### Prometheus (server-side) -Reindexer has a bunch of prometheus metrics available via http-URL `/metrics` (i.e. `http://localhost:9088/metrics`). This metrics may be enabled by passing `--prometheus` as reindexer_server command line argument or by setting `metrics:prometheus` flag in server yaml-config file. Some of the metrics also require `perfstats` to be enabled in `profiling`-config +Reindexer has a bunch of prometheus metrics available via http-URL `/metrics` (i.e. `http://localhost:9088/metrics`). This metrics may be enabled by passing `--prometheus` as reindexer_server command line argument or by setting `metrics:prometheus` flag in server yaml-config file. Some metrics also require `perfstats` to be enabled in `profiling`-config `reindexer_qps_total` - total queries per second for each database, namespace and query type -`reindexer_avg_latency` - average queryies latency for each database, namespace and query type +`reindexer_avg_latency` - average queries latency for each database, namespace and query type `reindexer_caches_size_bytes`, `reindexer_indexes_size_bytes`, `reindexer_data_size_bytes` - caches, indexes and data size for each namespace `reindexer_items_count` - items count in each namespace -`reindexer_memory_allocated_bytes` - current amount of dynamicly allocated memory according to tcmalloc/jemalloc +`reindexer_memory_allocated_bytes` - current amount of dynamically allocated memory according to tcmalloc/jemalloc `reindexer_rpc_clients_count` - current number of RPC clients for each database `reindexer_input_traffic_total_bytes`, `reindexer_output_traffic_total_bytes` - total input/output RPC/http traffic for each database `reindexer_info` - generic reindexer server info (currently it's just a version number) ### Prometheus (client-side, Go) -Go binding for reindexer is using [prometheus/client_golang](https://github.com/prometheus/client_golang) to collect some metrics (RPS and request's latency) from client's side. Pass `WithPrometheusMetrics()`-option to enable metric's collecting: +Go binding for reindexer is using [prometheus/client_golang](https://github.com/prometheus/client_golang) to collect some metrics (RPS and request's latency) from client's side. Pass `WithPrometheusMetrics()`-option to enable metrics collecting: ``` // Create DB connection for cproto-mode with metrics enabled db := reindexer.NewReindex("cproto://127.0.0.1:6534/testdb", reindex.WithPrometheusMetrics()) @@ -198,10 +198,10 @@ db := reindexer.NewReindex("cproto://127.0.0.1:6534/testdb", reindex.WithPrometh http.Handle("/metrics", promhttp.Handler()) ``` -All of the metricts will be exported into `DefaultRegistry`. Check [this](https://github.com/prometheus/client_golang/blob/main/prometheus/promauto/auto.go#L57-L85) for basic prometheus usage example. +All the metrics will be exported into `DefaultRegistry`. Check [this](https://github.com/prometheus/client_golang/blob/main/prometheus/promauto/auto.go#L57-L85) for basic prometheus usage example. -Both server-side and client-side metrics contain 'latency', however, client-side latency will also count all the time consumed by the binding's queue, network communication (for cproto/ucproto) and deseriallization. -So client-side latency may be more rellevant for user's applications the server-side latency. +Both server-side and client-side metrics contain 'latency', however, client-side latency will also count all the time consumed by the binding's queue, network communication (for cproto/ucproto) and deserialization. +So client-side latency may be more relevant for user's applications the server-side latency. ## Maintenance @@ -270,7 +270,7 @@ Mode may be set via command line options on server startup: reindexer_server --db /tmp/rx --rpc-threading dedicated --http-threading shared ``` -In shared mode server creates fixed number of threads to handle connections (one thread per physical CPU core) and all of the connection will be distributed between those threads. In this mode requests from different connections may be forced to be executed sequentially. +In shared mode server creates fixed number of threads to handle connections (one thread per physical CPU core) and all the connection will be distributed between those threads. In this mode requests from different connections may be forced to be executed sequentially. In dedicated mode server creates one thread per connection. This approach may be inefficient in case of frequent reconnects or large amount of database clients (due to thread creation overhead), however it allows to reach maximum level of concurrency for requests. @@ -297,12 +297,12 @@ storage: engine: leveldb ``` -To configure storage type for Go bindings either `bindings.ConnectOptions` (for builtin) or `confg.ServerConfig` (for builtinserver) structs may be used. +To configure storage type for Go bindings either `bindings.ConnectOptions` (for builtin) or `confg.ServerConfig` (for builtin-server) structs may be used. ### RocksDB Reindexer will try to autodetect RocksDB library and its dependencies at compile time if CMake flag `ENABLE_ROCKSDB` was passed (enabled by default). -If reindexer library was built with rocksdb, it requires Go build tag `rocksdb` in order to link with go-applications and go-bindinds. +If reindexer library was built with rocksdb, it requires Go build tag `rocksdb` in order to link with go-applications and go-bindings. ### Data transport formats @@ -310,7 +310,7 @@ Reindexer supports the following data formats to communicate with other applicat ## Log rotation -There are no builtin mechanis for automatic log rotation, however `reindexer server` is able to reopen logfiles on `SIGHUP`. +There are no builtin mechanism for automatic log rotation, however `reindexer server` is able to reopen logfiles on `SIGHUP`. So, your external log manager (it may be even a simple `cron` script) have to move existing log files somewhere and then send `SIGHUP`-signal to the `reindexer server` process to recreate log files. ## Protobuf diff --git a/cpp_src/server/rpcserver.cc b/cpp_src/server/rpcserver.cc index 837a56423..500768980 100644 --- a/cpp_src/server/rpcserver.cc +++ b/cpp_src/server/rpcserver.cc @@ -443,14 +443,15 @@ Error RPCServer::CommitTx(cproto::Context& ctx, int64_t txId, std::optional ResultFetchOpts opts; int flags; if (flagsOpts) { - flags = *flagsOpts; + // Transactions can not send items content + flags = flagsOpts.value() & ~kResultsFormatMask; } else { flags = kResultsWithItemID; if (tr.IsTagsUpdated()) { flags |= kResultsWithPayloadTypes; } } - if (tr.IsTagsUpdated()) { + if (flags & kResultsWithPayloadTypes) { opts = ResultFetchOpts{ .flags = flags, .ptVersions = {&ptVers, 1}, .fetchOffset = 0, .fetchLimit = INT_MAX, .withAggregations = true}; } else { diff --git a/cpp_src/server/serverimpl.cc b/cpp_src/server/serverimpl.cc index 98ee4c095..948e20d7b 100644 --- a/cpp_src/server/serverimpl.cc +++ b/cpp_src/server/serverimpl.cc @@ -212,7 +212,21 @@ void ServerImpl::ReopenLogFiles() { std::string ServerImpl::GetCoreLogPath() const { return GetDirPath(config_.CoreLog); } +#if defined(WITH_GRPC) && defined(REINDEX_WITH_LIBDL) +static void* tryToOpenGRPCLib(bool enabled) noexcept { +#ifdef __APPLE__ + return enabled ? dlopen("libreindexer_grpc_library.dylib", RTLD_NOW) : nullptr; +#else // __APPLE__ + return enabled ? dlopen("libreindexer_grpc_library.so", RTLD_NOW) : nullptr; +#endif // __APPLE__ +} +#endif // defined(WITH_GRPC) && defined(REINDEX_WITH_LIBDL) + int ServerImpl::run() { +#if defined(WITH_GRPC) && defined(REINDEX_WITH_LIBDL) + void* hGRPCServiceLib = tryToOpenGRPCLib(config_.EnableGRPC); +#endif // defined(WITH_GRPC) && defined(REINDEX_WITH_LIBDL) + auto err = loggerConfigure(); (void)err; // ingore; In case of the multiple builtin servers, we will get errors here @@ -351,29 +365,23 @@ int ServerImpl::run() { } #if defined(WITH_GRPC) void* hGRPCService = nullptr; -#if REINDEX_WITH_LIBDL -#ifdef __APPLE__ - auto hGRPCServiceLib = dlopen("libreindexer_grpc_library.dylib", RTLD_NOW); -#else - auto hGRPCServiceLib = dlopen("libreindexer_grpc_library.so", RTLD_NOW); -#endif - if (hGRPCServiceLib && config_.EnableGRPC) { - auto start_grpc = reinterpret_cast(dlsym(hGRPCServiceLib, "start_reindexer_grpc")); - - hGRPCService = start_grpc(*dbMgr_, config_.TxIdleTimeout, loop_, config_.GRPCAddr); - logger_.info("Listening gRPC service on {0}", config_.GRPCAddr); - } else if (config_.EnableGRPC) { - logger_.error("Can't load libreindexer_grpc_library. gRPC will not work: {}", dlerror()); - return EXIT_FAILURE; - } -#else if (config_.EnableGRPC) { +#if REINDEX_WITH_LIBDL + if (hGRPCServiceLib) { + auto start_grpc = reinterpret_cast(dlsym(hGRPCServiceLib, "start_reindexer_grpc")); + hGRPCService = start_grpc(*dbMgr_, config_.TxIdleTimeout, loop_, config_.GRPCAddr); + logger_.info("Listening gRPC service on {0}", config_.GRPCAddr); + } else { + logger_.error("Can't load libreindexer_grpc_library. gRPC will not work: {}", dlerror()); + return EXIT_FAILURE; + } +#else // REINDEX_WITH_LIBDL hGRPCService = start_reindexer_grpc(*dbMgr_, config_.TxIdleTimeout, loop_, config_.GRPCAddr); logger_.info("Listening gRPC service on {0}", config_.GRPCAddr); +#endif // REINDEX_WITH_LIBDL } +#endif // WITH_GRPC -#endif -#endif auto sigCallback = [&](ev::sig& sig) { logger_.info("Signal received. Terminating..."); #ifndef REINDEX_WITH_ASAN @@ -430,21 +438,20 @@ int ServerImpl::run() { logger_.info("RPC Server(TCP) shutdown completed."); httpServer.Stop(); logger_.info("HTTP Server shutdown completed."); -#if defined(WITH_GRPC) -#if REINDEX_WITH_LIBDL - if (hGRPCServiceLib && config_.EnableGRPC) { - auto stop_grpc = reinterpret_cast(dlsym(hGRPCServiceLib, "stop_reindexer_grpc")); - stop_grpc(hGRPCService); - logger_.info("gRPC Server shutdown completed."); - } -#else +#ifdef WITH_GRPC if (config_.EnableGRPC) { +#if REINDEX_WITH_LIBDL + if (hGRPCServiceLib) { + auto stop_grpc = reinterpret_cast(dlsym(hGRPCServiceLib, "stop_reindexer_grpc")); + stop_grpc(hGRPCService); + logger_.info("gRPC Server shutdown completed."); + } +#else // REINDEX_WITH_LIBDL stop_reindexer_grpc(hGRPCService); logger_.info("gRPC Server shutdown completed."); +#endif // REINDEX_WITH_LIBDL } - -#endif -#endif +#endif // WITH_GRPC } catch (const Error& err) { logger_.error("Unhandled exception occurred: {0}", err.what()); } diff --git a/query.go b/query.go index 313843655..994667eeb 100644 --- a/query.go +++ b/query.go @@ -325,7 +325,6 @@ func (q *Query) WhereQuery(subQuery *Query, condition int, keys interface{}) *Qu } // Where - Add comparing two fields where condition to DB query -// For composite indexes keys must be []interface{}, with value of each subindex func (q *Query) WhereBetweenFields(firstField string, condition int, secondField string) *Query { q.ser.PutVarCUInt(queryBetweenFieldsCondition) q.ser.PutVarCUInt(q.nextOp) @@ -502,13 +501,13 @@ func (q *Query) WhereComposite(index string, condition int, keys ...interface{}) return q.Where(index, condition, keys) } -// WhereString - Add where condition to DB query with string args +// Match - Add where string EQ-condition to DB query with string args func (q *Query) Match(index string, keys ...string) *Query { return q.WhereString(index, EQ, keys...) } -// WhereString - Add where condition to DB query with bool args +// WhereBool - Add where condition to DB query with bool args func (q *Query) WhereBool(index string, condition int, keys ...bool) *Query { q.ser.PutVarCUInt(queryCondition).PutVString(index).PutVarCUInt(q.nextOp).PutVarCUInt(condition) @@ -658,14 +657,14 @@ func (q *Query) SortStFieldDistance(field1 string, field2 string, desc bool) *Qu return q.Sort(sb.String(), desc) } -// AND - next condition will added with AND +// AND - next condition will be added with AND // This is the default operation for WHERE statement. Do not have to be called explicitly in user's code. Used in DSL conversion func (q *Query) And() *Query { q.nextOp = opAND return q } -// OR - next condition will added with OR +// OR - next condition will be added with OR // Implements short-circuiting: // if the previous condition is successful the next will not be evaluated, but except Join conditions func (q *Query) Or() *Query { @@ -673,7 +672,7 @@ func (q *Query) Or() *Query { return q } -// Not - next condition will added with NOT AND +// Not - next condition will be added with NOT AND // Implements short-circuiting: // if the previous condition is failed the next will not be evaluated func (q *Query) Not() *Query {