diff --git a/libraries/plugins/elasticsearch/elasticsearch_plugin.cpp b/libraries/plugins/elasticsearch/elasticsearch_plugin.cpp index 8777c06d0..491c011d4 100644 --- a/libraries/plugins/elasticsearch/elasticsearch_plugin.cpp +++ b/libraries/plugins/elasticsearch/elasticsearch_plugin.cpp @@ -22,6 +22,7 @@ * THE SOFTWARE. */ +#include #include #include #include @@ -33,6 +34,15 @@ namespace graphene { namespace elasticsearch { namespace detail { +const std::string generateIndexName(const fc::time_point_sec& block_date, const std::string& _elasticsearch_index_prefix) +{ + auto block_date_string = block_date.to_iso_string(); + std::vector parts; + boost::split(parts, block_date_string, boost::is_any_of("-")); + std::string index_name = _elasticsearch_index_prefix + parts[0] + "-" + parts[1]; + return index_name; +} + class elasticsearch_plugin_impl { public: @@ -48,6 +58,9 @@ class elasticsearch_plugin_impl return _self.database(); } + friend class graphene::elasticsearch::elasticsearch_plugin; + + private: elasticsearch_plugin& _self; primary_index< operation_history_index >* _oho_index; @@ -75,6 +88,8 @@ class elasticsearch_plugin_impl std::string bulk_line; std::string index_name; bool is_sync = false; + bool is_es_version_7_or_above = true; + private: bool add_elasticsearch( const account_id_type account_id, const optional& oho, const uint32_t block_number ); const account_transaction_history_object& addNewEntry(const account_statistics_object& stats_obj, @@ -91,6 +106,7 @@ class elasticsearch_plugin_impl void createBulkLine(const account_transaction_history_object& ath); void prepareBulk(const account_transaction_history_id_type& ath_id); void populateESstruct(); + void init_program_options(const boost::program_options::variables_map& options); }; elasticsearch_plugin_impl::~elasticsearch_plugin_impl() @@ -105,7 +121,7 @@ elasticsearch_plugin_impl::~elasticsearch_plugin_impl() bool elasticsearch_plugin_impl::update_account_histories( const signed_block& b ) { checkState(b.timestamp); - index_name = graphene::utilities::generateIndexName(b.timestamp, _elasticsearch_index_prefix); + index_name = generateIndexName(b.timestamp, _elasticsearch_index_prefix); graphene::chain::database& db = database(); const vector >& hist = db.get_applied_operations(); @@ -229,6 +245,113 @@ void elasticsearch_plugin_impl::getOperationType(const optional op.which(); } +struct adaptor_struct +{ + variant adapt(const variant_object& op) + { + fc::mutable_variant_object o(op); + vector keys_to_rename; + for (auto& i : o) + { + auto& element = i.value(); + if (element.is_object()) + { + const string& name = i.key(); + const auto& vo = element.get_object(); + if (vo.contains(name.c_str())) + keys_to_rename.emplace_back(name); + element = adapt(vo); + } + else if (element.is_array()) + adapt(element.get_array()); + } + for (const auto& i : keys_to_rename) + { + string new_name = i + "_"; + o[new_name] = variant(o[i]); + o.erase(i); + } + + if (o.find("memo") != o.end()) + { + auto& memo = o["memo"]; + if (memo.is_string()) + { + o["memo_"] = o["memo"]; + o.erase("memo"); + } + else if (memo.is_object()) + { + fc::mutable_variant_object tmp(memo.get_object()); + if (tmp.find("nonce") != tmp.end()) + { + tmp["nonce"] = tmp["nonce"].as_string(); + o["memo"] = tmp; + } + } + } + if (o.find("new_parameters") != o.end()) + { + auto& tmp = o["new_parameters"]; + if (tmp.is_object()) + { + fc::mutable_variant_object tmp2(tmp.get_object()); + if (tmp2.find("current_fees") != tmp2.end()) + { + tmp2.erase("current_fees"); + o["new_parameters"] = tmp2; + } + } + } + if (o.find("owner") != o.end() && o["owner"].is_string()) + { + o["owner_"] = o["owner"].as_string(); + o.erase("owner"); + } + if (o.find("proposed_ops") != o.end()) + { + o["proposed_ops"] = fc::json::to_string(o["proposed_ops"]); + } + if (o.find("initializer") != o.end()) + { + o["initializer"] = fc::json::to_string(o["initializer"]); + } + if (o.find("policy") != o.end()) + { + o["policy"] = fc::json::to_string(o["policy"]); + } + if (o.find("predicates") != o.end()) + { + o["predicates"] = fc::json::to_string(o["predicates"]); + } + if (o.find("active_special_authority") != o.end()) + { + o["active_special_authority"] = fc::json::to_string(o["active_special_authority"]); + } + if (o.find("owner_special_authority") != o.end()) + { + o["owner_special_authority"] = fc::json::to_string(o["owner_special_authority"]); + } + + variant v; + fc::to_variant(o, v, FC_PACK_MAX_DEPTH); + return v; + } + + void adapt(fc::variants& v) + { + for (auto& array_element : v) + { + if (array_element.is_object()) + array_element = adapt(array_element.get_object()); + else if (array_element.is_array()) + adapt(array_element.get_array()); + else + array_element = array_element.as_string(); + } + } +}; + void elasticsearch_plugin_impl::doOperationHistory(const optional & oho) { os.trx_in_block = oho->trx_in_block; @@ -255,6 +378,61 @@ void elasticsearch_plugin_impl::doBlock(uint32_t trx_in_block, const signed_bloc bs.trx_id = trx_id; } +struct operation_visitor +{ + using result_type = void; + + share_type fee_amount; + asset_id_type fee_asset; + + asset_id_type transfer_asset_id; + share_type transfer_amount; + account_id_type transfer_from; + account_id_type transfer_to; + + void operator()( const graphene::chain::transfer_operation& o ) + { + fee_asset = o.fee.asset_id; + fee_amount = o.fee.amount; + + transfer_asset_id = o.amount.asset_id; + transfer_amount = o.amount.amount; + transfer_from = o.from; + transfer_to = o.to; + } + + object_id_type fill_order_id; + account_id_type fill_account_id; + asset_id_type fill_pays_asset_id; + share_type fill_pays_amount; + asset_id_type fill_receives_asset_id; + share_type fill_receives_amount; + //double fill_fill_price; + //bool fill_is_maker; + + void operator()( const graphene::chain::fill_order_operation& o ) + { + fee_asset = o.fee.asset_id; + fee_amount = o.fee.amount; + + fill_order_id = o.order_id; + fill_account_id = o.account_id; + fill_pays_asset_id = o.pays.asset_id; + fill_pays_amount = o.pays.amount; + fill_receives_asset_id = o.receives.asset_id; + fill_receives_amount = o.receives.amount; + //fill_fill_price = o.fill_price.to_real(); + //fill_is_maker = o.is_maker; + } + + template + void operator()( const T& o ) + { + fee_asset = o.fee.asset_id; + fee_amount = o.fee.amount; + } +}; + void elasticsearch_plugin_impl::doVisitor(const optional & oho) { graphene::chain::database& db = database(); @@ -380,7 +558,8 @@ void elasticsearch_plugin_impl::prepareBulk(const account_transaction_history_id const std::string _id = fc::json::to_string(ath_id); fc::mutable_variant_object bulk_header; bulk_header["_index"] = index_name; - bulk_header["_type"] = "data"; + if( !is_es_version_7_or_above ) + bulk_header["_type"] = "_doc"; bulk_header["_id"] = fc::to_string(ath_id.space_id) + "." + fc::to_string(ath_id.type_id) + "." + fc::to_string(ath_id.instance.value); prepare = graphene::utilities::createBulk(bulk_header, std::move(bulk_line)); @@ -428,6 +607,43 @@ void elasticsearch_plugin_impl::populateESstruct() es.query = ""; } +void elasticsearch_plugin_impl::init_program_options(const boost::program_options::variables_map& options) +{ + if (options.count("elasticsearch-node-url")) { + _elasticsearch_node_url = options["elasticsearch-node-url"].as(); + } + if (options.count("elasticsearch-bulk-replay")) { + _elasticsearch_bulk_replay = options["elasticsearch-bulk-replay"].as(); + } + if (options.count("elasticsearch-bulk-sync")) { + _elasticsearch_bulk_sync = options["elasticsearch-bulk-sync"].as(); + } + if (options.count("elasticsearch-visitor")) { + _elasticsearch_visitor = options["elasticsearch-visitor"].as(); + } + if (options.count("elasticsearch-basic-auth")) { + _elasticsearch_basic_auth = options["elasticsearch-basic-auth"].as(); + } + if (options.count("elasticsearch-index-prefix")) { + _elasticsearch_index_prefix = options["elasticsearch-index-prefix"].as(); + } + if (options.count("elasticsearch-operation-object")) { + _elasticsearch_operation_object = options["elasticsearch-operation-object"].as(); + } + if (options.count("elasticsearch-start-es-after-block")) { + _elasticsearch_start_es_after_block = options["elasticsearch-start-es-after-block"].as(); + } + if (options.count("elasticsearch-operation-string")) { + _elasticsearch_operation_string = options["elasticsearch-operation-string"].as(); + } + if (options.count("elasticsearch-mode")) { + const auto option_number = options["elasticsearch-mode"].as(); + if(option_number > mode::all) + FC_THROW_EXCEPTION(graphene::chain::plugin_exception, "Elasticsearch mode not valid"); + _elasticsearch_mode = static_cast(options["elasticsearch-mode"].as()); + } +} + } // end namespace detail elasticsearch_plugin::elasticsearch_plugin() : @@ -480,42 +696,12 @@ void elasticsearch_plugin::plugin_set_program_options( void elasticsearch_plugin::plugin_initialize(const boost::program_options::variables_map& options) { + ilog("elasticsearch ACCOUNT HISTORY: plugin_initialize() begin"); + my->_oho_index = database().add_index< primary_index< operation_history_index > >(); database().add_index< primary_index< account_transaction_history_index > >(); - if (options.count("elasticsearch-node-url")) { - my->_elasticsearch_node_url = options["elasticsearch-node-url"].as(); - } - if (options.count("elasticsearch-bulk-replay")) { - my->_elasticsearch_bulk_replay = options["elasticsearch-bulk-replay"].as(); - } - if (options.count("elasticsearch-bulk-sync")) { - my->_elasticsearch_bulk_sync = options["elasticsearch-bulk-sync"].as(); - } - if (options.count("elasticsearch-visitor")) { - my->_elasticsearch_visitor = options["elasticsearch-visitor"].as(); - } - if (options.count("elasticsearch-basic-auth")) { - my->_elasticsearch_basic_auth = options["elasticsearch-basic-auth"].as(); - } - if (options.count("elasticsearch-index-prefix")) { - my->_elasticsearch_index_prefix = options["elasticsearch-index-prefix"].as(); - } - if (options.count("elasticsearch-operation-object")) { - my->_elasticsearch_operation_object = options["elasticsearch-operation-object"].as(); - } - if (options.count("elasticsearch-start-es-after-block")) { - my->_elasticsearch_start_es_after_block = options["elasticsearch-start-es-after-block"].as(); - } - if (options.count("elasticsearch-operation-string")) { - my->_elasticsearch_operation_string = options["elasticsearch-operation-string"].as(); - } - if (options.count("elasticsearch-mode")) { - const auto option_number = options["elasticsearch-mode"].as(); - if(option_number > mode::all) - FC_THROW_EXCEPTION(graphene::chain::plugin_exception, "Elasticsearch mode not valid"); - my->_elasticsearch_mode = static_cast(options["elasticsearch-mode"].as()); - } + my->init_program_options( options ); if(my->_elasticsearch_mode != mode::only_query) { if (my->_elasticsearch_mode == mode::all && !my->_elasticsearch_operation_string) @@ -528,10 +714,7 @@ void elasticsearch_plugin::plugin_initialize(const boost::program_options::varia "Error populating ES database, we are going to keep trying."); }); } -} -void elasticsearch_plugin::plugin_startup() -{ graphene::utilities::ES es; es.curl = my->curl; es.elasticsearch_url = my->_elasticsearch_node_url; @@ -539,7 +722,17 @@ void elasticsearch_plugin::plugin_startup() if(!graphene::utilities::checkES(es)) FC_THROW_EXCEPTION(fc::exception, "ES database is not up in url ${url}", ("url", my->_elasticsearch_node_url)); + + graphene::utilities::checkESVersion7OrAbove(es, my->is_es_version_7_or_above); + + ilog("elasticsearch ACCOUNT HISTORY: plugin_initialize() end"); +} + +void elasticsearch_plugin::plugin_startup() +{ ilog("elasticsearch ACCOUNT HISTORY: plugin_startup() begin"); + // Nothing to do + ilog("elasticsearch ACCOUNT HISTORY: plugin_startup() end"); } operation_history_object elasticsearch_plugin::get_operation_by_id(operation_history_id_type id) @@ -655,7 +848,7 @@ graphene::utilities::ES elasticsearch_plugin::prepareHistoryQuery(string query) es.curl = curl; es.elasticsearch_url = my->_elasticsearch_node_url; es.index_prefix = my->_elasticsearch_index_prefix; - es.endpoint = es.index_prefix + "*/data/_search"; + es.endpoint = es.index_prefix + "*/_doc/_search"; es.query = query; return es; diff --git a/libraries/plugins/elasticsearch/include/graphene/elasticsearch/elasticsearch_plugin.hpp b/libraries/plugins/elasticsearch/include/graphene/elasticsearch/elasticsearch_plugin.hpp index e1441c58e..18e9b2970 100644 --- a/libraries/plugins/elasticsearch/include/graphene/elasticsearch/elasticsearch_plugin.hpp +++ b/libraries/plugins/elasticsearch/include/graphene/elasticsearch/elasticsearch_plugin.hpp @@ -79,62 +79,6 @@ class elasticsearch_plugin : public graphene::app::plugin graphene::utilities::ES prepareHistoryQuery(string query); }; - -struct operation_visitor -{ - typedef void result_type; - - share_type fee_amount; - asset_id_type fee_asset; - - asset_id_type transfer_asset_id; - share_type transfer_amount; - account_id_type transfer_from; - account_id_type transfer_to; - - void operator()( const graphene::chain::transfer_operation& o ) - { - fee_asset = o.fee.asset_id; - fee_amount = o.fee.amount; - - transfer_asset_id = o.amount.asset_id; - transfer_amount = o.amount.amount; - transfer_from = o.from; - transfer_to = o.to; - } - - object_id_type fill_order_id; - account_id_type fill_account_id; - asset_id_type fill_pays_asset_id; - share_type fill_pays_amount; - asset_id_type fill_receives_asset_id; - share_type fill_receives_amount; - //double fill_fill_price; - //bool fill_is_maker; - - void operator()( const graphene::chain::fill_order_operation& o ) - { - fee_asset = o.fee.asset_id; - fee_amount = o.fee.amount; - - fill_order_id = o.order_id; - fill_account_id = o.account_id; - fill_pays_asset_id = o.pays.asset_id; - fill_pays_amount = o.pays.amount; - fill_receives_asset_id = o.receives.asset_id; - fill_receives_amount = o.receives.amount; - //fill_fill_price = o.fill_price.to_real(); - //fill_is_maker = o.is_maker; - } - - template - void operator()( const T& o ) - { - fee_asset = o.fee.asset_id; - fee_amount = o.fee.amount; - } -}; - struct operation_history_struct { int trx_in_block; int op_in_trx; @@ -197,113 +141,6 @@ struct bulk_struct { optional additional_data; }; -struct adaptor_struct { - variant adapt(const variant_object& op) - { - fc::mutable_variant_object o(op); - vector keys_to_rename; - for (auto i = o.begin(); i != o.end(); ++i) - { - auto& element = (*i).value(); - if (element.is_object()) - { - const string& name = (*i).key(); - auto& vo = element.get_object(); - if (vo.contains(name.c_str())) - keys_to_rename.emplace_back(name); - element = adapt(vo); - } - else if (element.is_array()) - adapt(element.get_array()); - } - for (const auto& i : keys_to_rename) - { - string new_name = i + "_"; - o[new_name] = variant(o[i]); - o.erase(i); - } - - if (o.find("memo") != o.end()) - { - auto& memo = o["memo"]; - if (memo.is_string()) - { - o["memo_"] = o["memo"]; - o.erase("memo"); - } - else if (memo.is_object()) - { - fc::mutable_variant_object tmp(memo.get_object()); - if (tmp.find("nonce") != tmp.end()) - { - tmp["nonce"] = tmp["nonce"].as_string(); - o["memo"] = tmp; - } - } - } - if (o.find("new_parameters") != o.end()) - { - auto& tmp = o["new_parameters"]; - if (tmp.is_object()) - { - fc::mutable_variant_object tmp2(tmp.get_object()); - if (tmp2.find("current_fees") != tmp2.end()) - { - tmp2.erase("current_fees"); - o["new_parameters"] = tmp2; - } - } - } - if (o.find("owner") != o.end() && o["owner"].is_string()) - { - o["owner_"] = o["owner"].as_string(); - o.erase("owner"); - } - if (o.find("proposed_ops") != o.end()) - { - o["proposed_ops"] = fc::json::to_string(o["proposed_ops"]); - } - if (o.find("initializer") != o.end()) - { - o["initializer"] = fc::json::to_string(o["initializer"]); - } - if (o.find("policy") != o.end()) - { - o["policy"] = fc::json::to_string(o["policy"]); - } - if (o.find("predicates") != o.end()) - { - o["predicates"] = fc::json::to_string(o["predicates"]); - } - if (o.find("active_special_authority") != o.end()) - { - o["active_special_authority"] = fc::json::to_string(o["active_special_authority"]); - } - if (o.find("owner_special_authority") != o.end()) - { - o["owner_special_authority"] = fc::json::to_string(o["owner_special_authority"]); - } - - - variant v; - fc::to_variant(o, v, FC_PACK_MAX_DEPTH); - return v; - } - - void adapt(fc::variants& v) - { - for (auto& array_element : v) - { - if (array_element.is_object()) - array_element = adapt(array_element.get_object()); - else if (array_element.is_array()) - adapt(array_element.get_array()); - else - array_element = array_element.as_string(); - } - } -}; - } } //graphene::elasticsearch FC_REFLECT_ENUM( graphene::elasticsearch::mode, (only_save)(only_query)(all) ) diff --git a/libraries/plugins/es_objects/es_objects.cpp b/libraries/plugins/es_objects/es_objects.cpp index 4e18d1a5d..8f92c3919 100644 --- a/libraries/plugins/es_objects/es_objects.cpp +++ b/libraries/plugins/es_objects/es_objects.cpp @@ -66,6 +66,9 @@ class es_objects_plugin_impl bool genesis(); void remove_from_database(object_id_type id, std::string index); + friend class graphene::es_objects::es_objects_plugin; + + private: es_objects_plugin& _self; std::string _es_objects_elasticsearch_url = "http://localhost:9200/"; std::string _es_objects_auth = ""; @@ -97,10 +100,12 @@ class es_objects_plugin_impl uint32_t block_number; fc::time_point_sec block_time; + bool is_es_version_7_or_above = true; private: template void prepareTemplate(T blockchain_object, string index_name); + void init_program_options(const boost::program_options::variables_map& options); }; bool es_objects_plugin_impl::genesis() @@ -523,7 +528,8 @@ void es_objects_plugin_impl::remove_from_database( object_id_type id, std::strin fc::mutable_variant_object delete_line; delete_line["_id"] = string(id); delete_line["_index"] = _es_objects_index_prefix + index; - delete_line["_type"] = "data"; + if( !is_es_version_7_or_above ) + delete_line["_type"] = "_doc"; fc::mutable_variant_object final_delete_line; final_delete_line["delete"] = delete_line; prepare.push_back(fc::json::to_string(final_delete_line)); @@ -537,7 +543,8 @@ void es_objects_plugin_impl::prepareTemplate(T blockchain_object, string index_n { fc::mutable_variant_object bulk_header; bulk_header["_index"] = _es_objects_index_prefix + index_name; - bulk_header["_type"] = "data"; + if( !is_es_version_7_or_above ) + bulk_header["_type"] = "_doc"; if(_es_objects_keep_only_current) { bulk_header["_id"] = string(blockchain_object.id); @@ -567,6 +574,72 @@ es_objects_plugin_impl::~es_objects_plugin_impl() } return; } +void es_objects_plugin_impl::init_program_options(const boost::program_options::variables_map& options) +{ + if (options.count("es-objects-elasticsearch-url")) { + _es_objects_elasticsearch_url = options["es-objects-elasticsearch-url"].as(); + } + if (options.count("es-objects-auth")) { + _es_objects_auth = options["es-objects-auth"].as(); + } + if (options.count("es-objects-bulk-replay")) { + _es_objects_bulk_replay = options["es-objects-bulk-replay"].as(); + } + if (options.count("es-objects-bulk-sync")) { + _es_objects_bulk_sync = options["es-objects-bulk-sync"].as(); + } + if (options.count("es-objects-proposals")) { + _es_objects_proposals = options["es-objects-proposals"].as(); + } + if (options.count("es-objects-accounts")) { + _es_objects_accounts = options["es-objects-accounts"].as(); + } + if (options.count("es-objects-assets")) { + _es_objects_assets = options["es-objects-assets"].as(); + } + if (options.count("es-objects-balances")) { + _es_objects_balances = options["es-objects-balances"].as(); + } + if (options.count("es-objects-limit-orders")) { + _es_objects_limit_orders = options["es-objects-limit-orders"].as(); + } + if (options.count("es-objects-asset-bitasset")) { + _es_objects_asset_bitasset = options["es-objects-asset-bitasset"].as(); + } + if (options.count("es-objects-account-role")) { + _es_objects_balances = options["es-objects-account-role"].as(); + } + if (options.count("es-objects-committee-member")) { + _es_objects_balances = options["es-objects-committee-member"].as(); + } + if (options.count("es-objects-nft")) { + _es_objects_balances = options["es-objects-nft"].as(); + } + if (options.count("es-objects-son")) { + _es_objects_balances = options["es-objects-son"].as(); + } + if (options.count("es-objects-transaction")) { + _es_objects_balances = options["es-objects-transaction"].as(); + } + if (options.count("es-objects-vesting-balance")) { + _es_objects_balances = options["es-objects-vesting-balance"].as(); + } + if (options.count("es-objects-witness")) { + _es_objects_balances = options["es-objects-witness"].as(); + } + if (options.count("es-objects-worker")) { + _es_objects_balances = options["es-objects-worker"].as(); + } + if (options.count("es-objects-index-prefix")) { + _es_objects_index_prefix = options["es-objects-index-prefix"].as(); + } + if (options.count("es-objects-keep-only-current")) { + _es_objects_keep_only_current = options["es-objects-keep-only-current"].as(); + } + if (options.count("es-objects-start-es-after-block")) { + _es_objects_start_es_after_block = options["es-objects-start-es-after-block"].as(); + } +} } // end namespace detail @@ -627,69 +700,9 @@ void es_objects_plugin::plugin_set_program_options( void es_objects_plugin::plugin_initialize(const boost::program_options::variables_map& options) { - if (options.count("es-objects-elasticsearch-url")) { - my->_es_objects_elasticsearch_url = options["es-objects-elasticsearch-url"].as(); - } - if (options.count("es-objects-auth")) { - my->_es_objects_auth = options["es-objects-auth"].as(); - } - if (options.count("es-objects-bulk-replay")) { - my->_es_objects_bulk_replay = options["es-objects-bulk-replay"].as(); - } - if (options.count("es-objects-bulk-sync")) { - my->_es_objects_bulk_sync = options["es-objects-bulk-sync"].as(); - } - if (options.count("es-objects-proposals")) { - my->_es_objects_proposals = options["es-objects-proposals"].as(); - } - if (options.count("es-objects-accounts")) { - my->_es_objects_accounts = options["es-objects-accounts"].as(); - } - if (options.count("es-objects-assets")) { - my->_es_objects_assets = options["es-objects-assets"].as(); - } - if (options.count("es-objects-balances")) { - my->_es_objects_balances = options["es-objects-balances"].as(); - } - if (options.count("es-objects-limit-orders")) { - my->_es_objects_limit_orders = options["es-objects-limit-orders"].as(); - } - if (options.count("es-objects-asset-bitasset")) { - my->_es_objects_asset_bitasset = options["es-objects-asset-bitasset"].as(); - } - if (options.count("es-objects-account-role")) { - my->_es_objects_balances = options["es-objects-account-role"].as(); - } - if (options.count("es-objects-committee-member")) { - my->_es_objects_balances = options["es-objects-committee-member"].as(); - } - if (options.count("es-objects-nft")) { - my->_es_objects_balances = options["es-objects-nft"].as(); - } - if (options.count("es-objects-son")) { - my->_es_objects_balances = options["es-objects-son"].as(); - } - if (options.count("es-objects-transaction")) { - my->_es_objects_balances = options["es-objects-transaction"].as(); - } - if (options.count("es-objects-vesting-balance")) { - my->_es_objects_balances = options["es-objects-vesting-balance"].as(); - } - if (options.count("es-objects-witness")) { - my->_es_objects_balances = options["es-objects-witness"].as(); - } - if (options.count("es-objects-worker")) { - my->_es_objects_balances = options["es-objects-worker"].as(); - } - if (options.count("es-objects-index-prefix")) { - my->_es_objects_index_prefix = options["es-objects-index-prefix"].as(); - } - if (options.count("es-objects-keep-only-current")) { - my->_es_objects_keep_only_current = options["es-objects-keep-only-current"].as(); - } - if (options.count("es-objects-start-es-after-block")) { - my->_es_objects_start_es_after_block = options["es-objects-start-es-after-block"].as(); - } + ilog("elasticsearch OBJECTS: plugin_initialize() begin"); + + my->init_program_options( options ); database().applied_block.connect([this](const signed_block &b) { if(b.block_num() == 1 && my->_es_objects_start_es_after_block == 0) { @@ -721,10 +734,7 @@ void es_objects_plugin::plugin_initialize(const boost::program_options::variable "Error deleting object from ES database, we are going to keep trying."); } }); -} -void es_objects_plugin::plugin_startup() -{ graphene::utilities::ES es; es.curl = my->curl; es.elasticsearch_url = my->_es_objects_elasticsearch_url; @@ -733,7 +743,17 @@ void es_objects_plugin::plugin_startup() if(!graphene::utilities::checkES(es)) FC_THROW_EXCEPTION(fc::exception, "ES database is not up in url ${url}", ("url", my->_es_objects_elasticsearch_url)); - ilog("elasticsearch OBJECTS: plugin_startup() begin"); + + graphene::utilities::checkESVersion7OrAbove(es, my->is_es_version_7_or_above); + + ilog("elasticsearch OBJECTS: plugin_initialize() end"); +} + +void es_objects_plugin::plugin_startup() +{ + ilog("elasticsearch OBJECTS: plugin_startup() begin"); + // Nothing to do + ilog("elasticsearch OBJECTS: plugin_startup() end"); } } } diff --git a/libraries/utilities/elasticsearch.cpp b/libraries/utilities/elasticsearch.cpp index 11a9561bf..df82695fe 100644 --- a/libraries/utilities/elasticsearch.cpp +++ b/libraries/utilities/elasticsearch.cpp @@ -24,7 +24,6 @@ #include #include -#include #include #include @@ -47,8 +46,36 @@ bool checkES(ES& es) if(doCurl(curl_request).empty()) return false; return true; +} + +const std::string getESVersion(ES& es) +{ + graphene::utilities::CurlRequest curl_request; + curl_request.handler = es.curl; + curl_request.url = es.elasticsearch_url; + curl_request.auth = es.auth; + curl_request.type = "GET"; + + fc::variant response = fc::json::from_string(doCurl(curl_request)); + + return response["version"]["number"].as_string(); +} +void checkESVersion7OrAbove(ES& es, bool& result) noexcept +{ + static const int64_t version_7 = 7; + try { + const auto es_version = graphene::utilities::getESVersion(es); + auto dot_pos = es_version.find('.'); + result = ( std::stoi(es_version.substr(0,dot_pos)) >= version_7 ); + } + catch( ... ) + { + wlog( "Unable to get ES version, assuming it is 7 or above" ); + result = true; + } } + const std::string simpleQuery(ES& es) { graphene::utilities::CurlRequest curl_request; @@ -118,13 +145,13 @@ bool handleBulkResponse(long http_code, const std::string& CurlReadBuffer) return true; } -const std::vector createBulk(const fc::mutable_variant_object& bulk_header, const std::string& data) +const std::vector createBulk(const fc::mutable_variant_object& bulk_header, const std::string&& data) { std::vector bulk; fc::mutable_variant_object final_bulk_header; final_bulk_header["index"] = bulk_header; bulk.push_back(fc::json::to_string(final_bulk_header)); - bulk.push_back(data); + bulk.emplace_back(std::move(data)); return bulk; } @@ -154,15 +181,6 @@ const std::string getEndPoint(ES& es) return doCurl(curl_request); } -const std::string generateIndexName(const fc::time_point_sec& block_date, const std::string& _elasticsearch_index_prefix) -{ - auto block_date_string = block_date.to_iso_string(); - std::vector parts; - boost::split(parts, block_date_string, boost::is_any_of("-")); - std::string index_name = _elasticsearch_index_prefix + parts[0] + "-" + parts[1]; - return index_name; -} - const std::string doCurl(CurlRequest& curl) { std::string CurlReadBuffer; diff --git a/libraries/utilities/include/graphene/utilities/elasticsearch.hpp b/libraries/utilities/include/graphene/utilities/elasticsearch.hpp index 898464b16..45749e27b 100644 --- a/libraries/utilities/include/graphene/utilities/elasticsearch.hpp +++ b/libraries/utilities/include/graphene/utilities/elasticsearch.hpp @@ -54,13 +54,14 @@ namespace graphene { namespace utilities { }; bool SendBulk(ES& es); - const std::vector createBulk(const fc::mutable_variant_object& bulk_header, const std::string& data); + const std::vector createBulk(const fc::mutable_variant_object& bulk_header, const std::string&& data); bool checkES(ES& es); + const std::string getESVersion(ES& es); + void checkESVersion7OrAbove(ES& es, bool& result) noexcept; const std::string simpleQuery(ES& es); bool deleteAll(ES& es); bool handleBulkResponse(long http_code, const std::string& CurlReadBuffer); const std::string getEndPoint(ES& es); - const std::string generateIndexName(const fc::time_point_sec& block_date, const std::string& _elasticsearch_index_prefix); const std::string doCurl(CurlRequest& curl); const std::string joinBulkLines(const std::vector& bulk); long getResponseCode(CURL *handler); diff --git a/tests/elasticsearch/main.cpp b/tests/elasticsearch/main.cpp index c948616f2..7067e1e9b 100644 --- a/tests/elasticsearch/main.cpp +++ b/tests/elasticsearch/main.cpp @@ -38,6 +38,10 @@ using namespace graphene::chain; using namespace graphene::chain::test; using namespace graphene::app; +const std::string g_es_url = "http://localhost:9200/"; +const std::string g_es_index_prefix = "peerplays-"; +const std::string g_es_ppobjects_prefix = "ppobjects-"; + BOOST_FIXTURE_TEST_SUITE( elasticsearch_tests, database_fixture ) BOOST_AUTO_TEST_CASE(elasticsearch_account_history) { @@ -48,8 +52,8 @@ BOOST_AUTO_TEST_CASE(elasticsearch_account_history) { graphene::utilities::ES es; es.curl = curl; - es.elasticsearch_url = "http://localhost:9200/"; - es.index_prefix = "peerplays-"; + es.elasticsearch_url = g_es_url; + es.index_prefix = g_es_index_prefix; //es.auth = "elastic:changeme"; // delete all first @@ -71,7 +75,7 @@ BOOST_AUTO_TEST_CASE(elasticsearch_account_history) { //int account_create_op_id = operation::tag::value; string query = "{ \"query\" : { \"bool\" : { \"must\" : [{\"match_all\": {}}] } } }"; - es.endpoint = es.index_prefix + "*/data/_count"; + es.endpoint = es.index_prefix + "*/_doc/_count"; es.query = query; auto res = graphene::utilities::simpleQuery(es); @@ -79,7 +83,7 @@ BOOST_AUTO_TEST_CASE(elasticsearch_account_history) { auto total = j["count"].as_string(); BOOST_CHECK_EQUAL(total, "5"); - es.endpoint = es.index_prefix + "*/data/_search"; + es.endpoint = es.index_prefix + "*/_doc/_search"; res = graphene::utilities::simpleQuery(es); j = fc::json::from_string(res); auto first_id = j["hits"]["hits"][size_t(0)]["_id"].as_string(); @@ -91,7 +95,7 @@ BOOST_AUTO_TEST_CASE(elasticsearch_account_history) { fc::usleep(fc::milliseconds(1000)); // index.refresh_interval - es.endpoint = es.index_prefix + "*/data/_count"; + es.endpoint = es.index_prefix + "*/_doc/_count"; res = graphene::utilities::simpleQuery(es); j = fc::json::from_string(res); @@ -114,9 +118,9 @@ BOOST_AUTO_TEST_CASE(elasticsearch_account_history) { // check the visitor data auto block_date = db.head_block_time(); - std::string index_name = graphene::utilities::generateIndexName(block_date, "peerplays-"); + std::string index_name = g_es_index_prefix + block_date.to_iso_string().substr( 0, 7 ); // yyyy-mm - es.endpoint = index_name + "/data/2.9.12"; // we know last op is a transfer of amount 300 + es.endpoint = index_name + "/_doc/2.9.12"; // we know last op is a transfer of amount 300 res = graphene::utilities::getEndPoint(es); j = fc::json::from_string(res); auto last_transfer_amount = j["_source"]["operation_history"]["op_object"]["amount_"]["amount"].as_string(); @@ -137,8 +141,8 @@ BOOST_AUTO_TEST_CASE(elasticsearch_objects) { graphene::utilities::ES es; es.curl = curl; - es.elasticsearch_url = "http://localhost:9200/"; - es.index_prefix = "ppobjects-"; + es.elasticsearch_url = g_es_url; + es.index_prefix = g_es_ppobjects_prefix; //es.auth = "elastic:changeme"; // delete all first @@ -155,7 +159,7 @@ BOOST_AUTO_TEST_CASE(elasticsearch_objects) { fc::usleep(fc::milliseconds(1000)); string query = "{ \"query\" : { \"bool\" : { \"must\" : [{\"match_all\": {}}] } } }"; - es.endpoint = es.index_prefix + "*/data/_count"; + es.endpoint = es.index_prefix + "*/_doc/_count"; es.query = query; auto res = graphene::utilities::simpleQuery(es); @@ -163,14 +167,14 @@ BOOST_AUTO_TEST_CASE(elasticsearch_objects) { auto total = j["count"].as_string(); BOOST_CHECK_EQUAL(total, "2"); - es.endpoint = es.index_prefix + "asset/data/_search"; + es.endpoint = es.index_prefix + "asset/_doc/_search"; res = graphene::utilities::simpleQuery(es); j = fc::json::from_string(res); auto first_id = j["hits"]["hits"][size_t(0)]["_source"]["symbol"].as_string(); BOOST_CHECK_EQUAL(first_id, "USD"); auto bitasset_data_id = j["hits"]["hits"][size_t(0)]["_source"]["bitasset_data_id"].as_string(); - es.endpoint = es.index_prefix + "bitasset/data/_search"; + es.endpoint = es.index_prefix + "bitasset/_doc/_search"; es.query = "{ \"query\" : { \"bool\": { \"must\" : [{ \"term\": { \"object_id\": \""+bitasset_data_id+"\"}}] } } }"; res = graphene::utilities::simpleQuery(es); j = fc::json::from_string(res); @@ -192,11 +196,11 @@ BOOST_AUTO_TEST_CASE(elasticsearch_suite) { graphene::utilities::ES es; es.curl = curl; - es.elasticsearch_url = "http://localhost:9200/"; - es.index_prefix = "peerplays-"; + es.elasticsearch_url = g_es_url; + es.index_prefix = g_es_index_prefix; auto delete_account_history = graphene::utilities::deleteAll(es); fc::usleep(fc::milliseconds(1000)); - es.index_prefix = "ppobjects-"; + es.index_prefix = g_es_ppobjects_prefix; auto delete_objects = graphene::utilities::deleteAll(es); fc::usleep(fc::milliseconds(1000)); @@ -218,8 +222,8 @@ BOOST_AUTO_TEST_CASE(elasticsearch_history_api) { graphene::utilities::ES es; es.curl = curl; - es.elasticsearch_url = "http://localhost:9200/"; - es.index_prefix = "peerplays-"; + es.elasticsearch_url = g_es_url; + es.index_prefix = g_es_index_prefix; auto delete_account_history = graphene::utilities::deleteAll(es);