diff --git a/libraries/plugins/es_objects/es_objects.cpp b/libraries/plugins/es_objects/es_objects.cpp index fa4d90aecb..d0c35010ce 100644 --- a/libraries/plugins/es_objects/es_objects.cpp +++ b/libraries/plugins/es_objects/es_objects.cpp @@ -47,13 +47,14 @@ class es_objects_plugin_impl { curl = curl_easy_init(); } virtual ~es_objects_plugin_impl(); - bool updateDatabase( const vector& ids , bool isNew); + bool index_database( const vector& ids, std::string action); + void remove_from_database( object_id_type id, std::string index); es_objects_plugin& _self; std::string _es_objects_elasticsearch_url = "http://localhost:9200/"; std::string _es_objects_auth = ""; - uint32_t _es_objects_bulk_replay = 5000; - uint32_t _es_objects_bulk_sync = 10; + uint32_t _es_objects_bulk_replay = 10000; + uint32_t _es_objects_bulk_sync = 100; bool _es_objects_proposals = true; bool _es_objects_accounts = true; bool _es_objects_assets = true; @@ -64,27 +65,27 @@ class es_objects_plugin_impl CURL *curl; // curl handler vector bulk; vector prepare; - map bitassets; - map accounts; - map proposals; - map assets; - map balances; - map limit_orders; + + bool _es_objects_keep_only_current = true; + + uint32_t block_number; + fc::time_point_sec block_time; + private: - void PrepareProposal(const proposal_object& proposal_object, const fc::time_point_sec& block_time, const uint32_t& block_number); - void PrepareAccount(const account_object& account_object, const fc::time_point_sec& block_time, const uint32_t& block_number); - void PrepareAsset(const asset_object& asset_object, const fc::time_point_sec& block_time, const uint32_t& block_number); - void PrepareBalance(const balance_object& balance_object, const fc::time_point_sec& block_time, const uint32_t& block_number); - void PrepareLimit(const limit_order_object& limit_object, const fc::time_point_sec& block_time, const uint32_t& block_number); - void PrepareBitAsset(const asset_bitasset_data_object& bitasset_object, const fc::time_point_sec& block_time, const uint32_t& block_number); + void prepare_proposal(const proposal_object& proposal_object); + void prepare_account(const account_object& account_object); + void prepare_asset(const asset_object& asset_object); + void prepare_balance(const account_balance_object& account_balance_object); + void prepare_limit(const limit_order_object& limit_object); + void prepare_bitasset(const asset_bitasset_data_object& bitasset_object); }; -bool es_objects_plugin_impl::updateDatabase( const vector& ids , bool isNew) +bool es_objects_plugin_impl::index_database( const vector& ids, std::string action) { graphene::chain::database &db = _self.database(); - const fc::time_point_sec block_time = db.head_block_time(); - const uint32_t block_number = db.head_block_num(); + block_time = db.head_block_time(); + block_number = db.head_block_num(); // check if we are in replay or in sync and change number of bulk documents accordingly uint32_t limit_documents = 0; @@ -97,38 +98,62 @@ bool es_objects_plugin_impl::updateDatabase( const vector& ids , if(value.is() && _es_objects_proposals) { auto obj = db.find_object(value); auto p = static_cast(obj); - if(p != nullptr) - PrepareProposal(*p, block_time, block_number); + if(p != nullptr) { + if(action == "delete") + remove_from_database(p->id, "proposal"); + else + prepare_proposal(*p); + } } else if(value.is() && _es_objects_accounts) { auto obj = db.find_object(value); auto a = static_cast(obj); - if(a != nullptr) - PrepareAccount(*a, block_time, block_number); + if(a != nullptr) { + if(action == "delete") + remove_from_database(a->id, "account"); + else + prepare_account(*a); + } } else if(value.is() && _es_objects_assets) { auto obj = db.find_object(value); auto a = static_cast(obj); - if(a != nullptr) - PrepareAsset(*a, block_time, block_number); + if(a != nullptr) { + if(action == "delete") + remove_from_database(a->id, "asset"); + else + prepare_asset(*a); + } } - else if(value.is() && _es_objects_balances) { + else if(value.is() && _es_objects_balances) { auto obj = db.find_object(value); - auto b = static_cast(obj); - if(b != nullptr) - PrepareBalance(*b, block_time, block_number); + auto b = static_cast(obj); + if(b != nullptr) { + if(action == "delete") + remove_from_database(b->id, "balance"); + else + prepare_balance(*b); + } } else if(value.is() && _es_objects_limit_orders) { auto obj = db.find_object(value); auto l = static_cast(obj); - if(l != nullptr) - PrepareLimit(*l, block_time, block_number); + if(l != nullptr) { + if(action == "delete") + remove_from_database(l->id, "limitorder"); + else + prepare_limit(*l); + } } else if(value.is() && _es_objects_asset_bitasset) { auto obj = db.find_object(value); auto ba = static_cast(obj); - if(ba != nullptr) - PrepareBitAsset(*ba, block_time, block_number); + if(ba != nullptr) { + if(action == "delete") + remove_from_database(ba->id, "bitasset"); + else + prepare_bitasset(*ba); + } } } @@ -149,8 +174,23 @@ bool es_objects_plugin_impl::updateDatabase( const vector& ids , return true; } -void es_objects_plugin_impl::PrepareProposal(const proposal_object& proposal_object, - const fc::time_point_sec& block_time, const uint32_t& block_number) +void es_objects_plugin_impl::remove_from_database( object_id_type id, std::string index) +{ + if(_es_objects_keep_only_current) + { + fc::mutable_variant_object delete_line; + delete_line["_id"] = string(id); + delete_line["_index"] = _es_objects_index_prefix + index; + delete_line["_type"] = "data"; + fc::mutable_variant_object final_delete_line; + final_delete_line["delete"] = delete_line; + prepare.push_back(fc::json::to_string(final_delete_line)); + std::move(prepare.begin(), prepare.end(), std::back_inserter(bulk)); + prepare.clear(); + } +} + +void es_objects_plugin_impl::prepare_proposal(const proposal_object& proposal_object) { proposal_struct prop; prop.object_id = proposal_object.id; @@ -165,27 +205,22 @@ void es_objects_plugin_impl::PrepareProposal(const proposal_object& proposal_obj prop.available_key_approvals = fc::json::to_string(proposal_object.available_key_approvals); prop.proposer = proposal_object.proposer; - auto it = proposals.find(proposal_object.id); - if(it == proposals.end()) - proposals[proposal_object.id] = prop; - else { - if(it->second == prop) return; - else proposals[proposal_object.id] = prop; - } - std::string data = fc::json::to_string(prop); fc::mutable_variant_object bulk_header; bulk_header["_index"] = _es_objects_index_prefix + "proposal"; bulk_header["_type"] = "data"; + if(_es_objects_keep_only_current) + { + bulk_header["_id"] = string(prop.object_id); + } prepare = graphene::utilities::createBulk(bulk_header, std::move(data)); - bulk.insert(bulk.end(), prepare.begin(), prepare.end()); + std::move(prepare.begin(), prepare.end(), std::back_inserter(bulk)); prepare.clear(); } -void es_objects_plugin_impl::PrepareAccount(const account_object& account_object, - const fc::time_point_sec& block_time, const uint32_t& block_number) +void es_objects_plugin_impl::prepare_account(const account_object& account_object) { account_struct acct; acct.object_id = account_object.id; @@ -206,28 +241,24 @@ void es_objects_plugin_impl::PrepareAccount(const account_object& account_object acct.active_key_auths = fc::json::to_string(account_object.active.key_auths); acct.active_address_auths = fc::json::to_string(account_object.active.address_auths); acct.voting_account = account_object.options.voting_account; - - auto it = accounts.find(account_object.id); - if(it == accounts.end()) - accounts[account_object.id] = acct; - else { - if(it->second == acct) return; - else accounts[account_object.id] = acct; - } + acct.votes = fc::json::to_string(account_object.options.votes); std::string data = fc::json::to_string(acct); fc::mutable_variant_object bulk_header; - bulk_header["_index"] = _es_objects_index_prefix + "acount"; + bulk_header["_index"] = _es_objects_index_prefix + "account"; bulk_header["_type"] = "data"; + if(_es_objects_keep_only_current) + { + bulk_header["_id"] = string(acct.object_id); + } prepare = graphene::utilities::createBulk(bulk_header, std::move(data)); - bulk.insert(bulk.end(), prepare.begin(), prepare.end()); + std::move(prepare.begin(), prepare.end(), std::back_inserter(bulk)); prepare.clear(); } -void es_objects_plugin_impl::PrepareAsset(const asset_object& asset_object, - const fc::time_point_sec& block_time, const uint32_t& block_number) +void es_objects_plugin_impl::prepare_asset(const asset_object& asset_object) { asset_struct asset; asset.object_id = asset_object.id; @@ -239,56 +270,48 @@ void es_objects_plugin_impl::PrepareAsset(const asset_object& asset_object, asset.dynamic_asset_data_id = asset_object.dynamic_asset_data_id; asset.bitasset_data_id = asset_object.bitasset_data_id; - auto it = assets.find(asset_object.id); - if(it == assets.end()) - assets[asset_object.id] = asset; - else { - if(it->second == asset) return; - else assets[asset_object.id] = asset; - } - std::string data = fc::json::to_string(asset); fc::mutable_variant_object bulk_header; bulk_header["_index"] = _es_objects_index_prefix + "asset"; bulk_header["_type"] = "data"; + if(_es_objects_keep_only_current) + { + bulk_header["_id"] = string(asset.object_id); + } prepare = graphene::utilities::createBulk(bulk_header, std::move(data)); - bulk.insert(bulk.end(), prepare.begin(), prepare.end()); + std::move(prepare.begin(), prepare.end(), std::back_inserter(bulk)); prepare.clear(); } -void es_objects_plugin_impl::PrepareBalance(const balance_object& balance_object, - const fc::time_point_sec& block_time, const uint32_t& block_number) +void es_objects_plugin_impl::prepare_balance(const account_balance_object& account_balance_object) { balance_struct balance; - balance.object_id = balance_object.id; + balance.object_id = account_balance_object.id; balance.block_time = block_time; - balance.block_number = block_number;balance.owner = balance_object.owner; - balance.asset_id = balance_object.balance.asset_id; - balance.amount = balance_object.balance.amount; - - auto it = balances.find(balance_object.id); - if(it == balances.end()) - balances[balance_object.id] = balance; - else { - if(it->second == balance) return; - else balances[balance_object.id] = balance; - } + balance.block_number = block_number; + balance.owner = account_balance_object.owner; + balance.asset_type = account_balance_object.asset_type; + balance.balance = account_balance_object.balance; + balance.maintenance_flag = account_balance_object.maintenance_flag; std::string data = fc::json::to_string(balance); fc::mutable_variant_object bulk_header; bulk_header["_index"] = _es_objects_index_prefix + "balance"; bulk_header["_type"] = "data"; + if(_es_objects_keep_only_current) + { + bulk_header["_id"] = string(balance.object_id); + } prepare = graphene::utilities::createBulk(bulk_header, std::move(data)); - bulk.insert(bulk.end(), prepare.begin(), prepare.end()); + std::move(prepare.begin(), prepare.end(), std::back_inserter(bulk)); prepare.clear(); } -void es_objects_plugin_impl::PrepareLimit(const limit_order_object& limit_object, - const fc::time_point_sec& block_time, const uint32_t& block_number) +void es_objects_plugin_impl::prepare_limit(const limit_order_object& limit_object) { limit_order_struct limit; limit.object_id = limit_object.id; @@ -300,27 +323,22 @@ void es_objects_plugin_impl::PrepareLimit(const limit_order_object& limit_object limit.sell_price = limit_object.sell_price; limit.deferred_fee = limit_object.deferred_fee; - auto it = limit_orders.find(limit_object.id); - if(it == limit_orders.end()) - limit_orders[limit_object.id] = limit; - else { - if(it->second == limit) return; - else limit_orders[limit_object.id] = limit; - } - std::string data = fc::json::to_string(limit); fc::mutable_variant_object bulk_header; bulk_header["_index"] = _es_objects_index_prefix + "limitorder"; bulk_header["_type"] = "data"; + if(_es_objects_keep_only_current) + { + bulk_header["_id"] = string(limit.object_id); + } prepare = graphene::utilities::createBulk(bulk_header, std::move(data)); - bulk.insert(bulk.end(), prepare.begin(), prepare.end()); + std::move(prepare.begin(), prepare.end(), std::back_inserter(bulk)); prepare.clear(); } -void es_objects_plugin_impl::PrepareBitAsset(const asset_bitasset_data_object& bitasset_object, - const fc::time_point_sec& block_time, const uint32_t& block_number) +void es_objects_plugin_impl::prepare_bitasset(const asset_bitasset_data_object& bitasset_object) { if(!bitasset_object.is_prediction_market) { @@ -331,22 +349,18 @@ void es_objects_plugin_impl::PrepareBitAsset(const asset_bitasset_data_object& b bitasset.current_feed = fc::json::to_string(bitasset_object.current_feed); bitasset.current_feed_publication_time = bitasset_object.current_feed_publication_time; - auto it = bitassets.find(bitasset_object.id); - if(it == bitassets.end()) - bitassets[bitasset_object.id] = bitasset; - else { - if(it->second == bitasset) return; - else bitassets[bitasset_object.id] = bitasset; - } - std::string data = fc::json::to_string(bitasset); fc::mutable_variant_object bulk_header; bulk_header["_index"] = _es_objects_index_prefix + "bitasset"; bulk_header["_type"] = "data"; + if(_es_objects_keep_only_current) + { + bulk_header["_id"] = string(bitasset.object_id); + } prepare = graphene::utilities::createBulk(bulk_header, std::move(data)); - bulk.insert(bulk.end(), prepare.begin(), prepare.end()); + std::move(prepare.begin(), prepare.end(), std::back_inserter(bulk)); prepare.clear(); } } @@ -382,17 +396,18 @@ void es_objects_plugin::plugin_set_program_options( ) { cli.add_options() - ("es-objects-elasticsearch-url", boost::program_options::value(), "Elasticsearch node url") - ("es-objects-auth", boost::program_options::value(), "Basic auth username:password") - ("es-objects-bulk-replay", boost::program_options::value(), "Number of bulk documents to index on replay(5000)") - ("es-objects-bulk-sync", boost::program_options::value(), "Number of bulk documents to index on a syncronied chain(10)") - ("es-objects-proposals", boost::program_options::value(), "Store proposal objects") - ("es-objects-accounts", boost::program_options::value(), "Store account objects") - ("es-objects-assets", boost::program_options::value(), "Store asset objects") - ("es-objects-balances", boost::program_options::value(), "Store balances objects") - ("es-objects-limit-orders", boost::program_options::value(), "Store limit order objects") - ("es-objects-asset-bitasset", boost::program_options::value(), "Store feed data") + ("es-objects-elasticsearch-url", boost::program_options::value(), "Elasticsearch node url(http://localhost:9200/)") + ("es-objects-auth", boost::program_options::value(), "Basic auth username:password('')") + ("es-objects-bulk-replay", boost::program_options::value(), "Number of bulk documents to index on replay(10000)") + ("es-objects-bulk-sync", boost::program_options::value(), "Number of bulk documents to index on a synchronized chain(100)") + ("es-objects-proposals", boost::program_options::value(), "Store proposal objects(true)") + ("es-objects-accounts", boost::program_options::value(), "Store account objects(true)") + ("es-objects-assets", boost::program_options::value(), "Store asset objects(true)") + ("es-objects-balances", boost::program_options::value(), "Store balances objects(true)") + ("es-objects-limit-orders", boost::program_options::value(), "Store limit order objects(true)") + ("es-objects-asset-bitasset", boost::program_options::value(), "Store feed data(true)") ("es-objects-index-prefix", boost::program_options::value(), "Add a prefix to the index(objects-)") + ("es-objects-keep-only-current", boost::program_options::value(), "Keep only current state of the objects(true)") ; cfg.add(cli); } @@ -400,17 +415,25 @@ void es_objects_plugin::plugin_set_program_options( void es_objects_plugin::plugin_initialize(const boost::program_options::variables_map& options) { database().new_objects.connect([&]( const vector& ids, const flat_set& impacted_accounts ) { - if(!my->updateDatabase(ids, 1)) + if(!my->index_database(ids, "create")) { - FC_THROW_EXCEPTION(graphene::chain::plugin_exception, "Error populating ES database, we are going to keep trying."); + FC_THROW_EXCEPTION(graphene::chain::plugin_exception, "Error creating object from ES database, we are going to keep trying."); } }); database().changed_objects.connect([&]( const vector& ids, const flat_set& impacted_accounts ) { - if(!my->updateDatabase(ids, 0)) + if(!my->index_database(ids, "update")) { - FC_THROW_EXCEPTION(graphene::chain::plugin_exception, "Error populating ES database, we are going to keep trying."); + FC_THROW_EXCEPTION(graphene::chain::plugin_exception, "Error updating object from ES database, we are going to keep trying."); } }); + database().removed_objects.connect([this](const vector& ids, const vector& objs, const flat_set& impacted_accounts) { + if(!my->index_database(ids, "delete")) + { + FC_THROW_EXCEPTION(graphene::chain::plugin_exception, "Error deleting object from ES database, we are going to keep trying."); + } + }); + + if (options.count("es-objects-elasticsearch-url")) { my->_es_objects_elasticsearch_url = options["es-objects-elasticsearch-url"].as(); } @@ -444,6 +467,9 @@ void es_objects_plugin::plugin_initialize(const boost::program_options::variable if (options.count("es-objects-index-prefix")) { my->_es_objects_index_prefix = options["es-objects-index-prefix"].as(); } + if (options.count("es-objects-keep-only-current")) { + my->_es_objects_keep_only_current = options["es-objects-keep-only-current"].as(); + } } void es_objects_plugin::plugin_startup() diff --git a/libraries/plugins/es_objects/include/graphene/es_objects/es_objects.hpp b/libraries/plugins/es_objects/include/graphene/es_objects/es_objects.hpp index b8d7953704..54ecbdc90d 100644 --- a/libraries/plugins/es_objects/include/graphene/es_objects/es_objects.hpp +++ b/libraries/plugins/es_objects/include/graphene/es_objects/es_objects.hpp @@ -67,19 +67,6 @@ struct proposal_struct { string available_owner_approvals; string available_key_approvals; account_id_type proposer; - - bool operator==(const proposal_struct& comp) const { - return object_id == comp.object_id && - expiration_time == comp.expiration_time && - review_period_time == comp.review_period_time && - proposed_transaction == comp.proposed_transaction && - required_active_approvals == comp.required_active_approvals && - required_owner_approvals == comp.required_owner_approvals && - available_owner_approvals == comp.available_owner_approvals && - available_key_approvals == comp.available_key_approvals && - proposer == comp.proposer; - } - }; struct account_struct { object_id_type object_id; @@ -100,23 +87,7 @@ struct account_struct { string active_key_auths; string active_address_auths; account_id_type voting_account; - - bool operator==(const account_struct& comp) const { - return object_id == comp.object_id && - membership_expiration_date == comp.membership_expiration_date && - registrar == comp.registrar && - referrer == comp.referrer && - lifetime_referrer == comp.lifetime_referrer && - network_fee_percentage == comp.network_fee_percentage && - name == comp.name && - owner_account_auths == comp.owner_account_auths && - owner_key_auths == comp.owner_key_auths && - owner_address_auths == comp.owner_address_auths && - active_account_auths == comp.active_account_auths && - active_key_auths == comp.active_key_auths && - active_address_auths == comp.active_address_auths && - voting_account == comp.voting_account; - } + string votes; }; struct asset_struct { object_id_type object_id; @@ -127,30 +98,15 @@ struct asset_struct { bool is_market_issued; asset_dynamic_data_id_type dynamic_asset_data_id; optional bitasset_data_id; - - bool operator==(const asset_struct& comp) const { - return object_id == comp.object_id && - symbol == comp.symbol && - issuer == comp.issuer && - is_market_issued == comp.is_market_issued && - dynamic_asset_data_id == comp.dynamic_asset_data_id && - bitasset_data_id == comp.bitasset_data_id; - } }; struct balance_struct { object_id_type object_id; fc::time_point_sec block_time; uint32_t block_number; - address owner; - asset_id_type asset_id; - share_type amount; - - bool operator==(const balance_struct& comp) const { - return object_id == comp.object_id && - owner == comp.owner && - asset_id == comp.asset_id && - amount == comp.amount; - } + account_id_type owner; + asset_id_type asset_type; + share_type balance; + bool maintenance_flag; }; struct limit_order_struct { object_id_type object_id; @@ -161,15 +117,6 @@ struct limit_order_struct { share_type for_sale; price sell_price; share_type deferred_fee; - - bool operator==(const limit_order_struct& comp) const { - return object_id == comp.object_id && - expiration == comp.expiration && - seller == comp.seller && - for_sale == comp.for_sale && - sell_price == comp.sell_price && - deferred_fee == comp.deferred_fee; - } }; struct bitasset_struct { object_id_type object_id; @@ -178,19 +125,34 @@ struct bitasset_struct { string current_feed; time_point_sec current_feed_publication_time; time_point_sec feed_expiration_time; - - bool operator==(const bitasset_struct& comp) const { - return object_id == comp.object_id && - current_feed == comp.current_feed && - feed_expiration_time == comp.feed_expiration_time; - } }; } } //graphene::es_objects -FC_REFLECT( graphene::es_objects::proposal_struct, (object_id)(block_time)(block_number)(expiration_time)(review_period_time)(proposed_transaction)(required_active_approvals)(available_active_approvals)(required_owner_approvals)(available_owner_approvals)(available_key_approvals)(proposer) ) -FC_REFLECT( graphene::es_objects::account_struct, (object_id)(block_time)(block_number)(membership_expiration_date)(registrar)(referrer)(lifetime_referrer)(network_fee_percentage)(lifetime_referrer_fee_percentage)(referrer_rewards_percentage)(name)(owner_account_auths)(owner_key_auths)(owner_address_auths)(active_account_auths)(active_key_auths)(active_address_auths)(voting_account) ) -FC_REFLECT( graphene::es_objects::asset_struct, (object_id)(block_time)(block_number)(symbol)(issuer)(is_market_issued)(dynamic_asset_data_id)(bitasset_data_id) ) -FC_REFLECT( graphene::es_objects::balance_struct, (object_id)(block_time)(block_number)(block_time)(owner)(asset_id)(amount) ) -FC_REFLECT( graphene::es_objects::limit_order_struct, (object_id)(block_time)(block_number)(expiration)(seller)(for_sale)(sell_price)(deferred_fee) ) -FC_REFLECT( graphene::es_objects::bitasset_struct, (object_id)(block_time)(block_number)(current_feed)(current_feed_publication_time) ) \ No newline at end of file +FC_REFLECT( + graphene::es_objects::proposal_struct, + (object_id)(block_time)(block_number)(expiration_time)(review_period_time)(proposed_transaction)(required_active_approvals) + (available_active_approvals)(required_owner_approvals)(available_owner_approvals)(available_key_approvals)(proposer) +) +FC_REFLECT( + graphene::es_objects::account_struct, + (object_id)(block_time)(block_number)(membership_expiration_date)(registrar)(referrer)(lifetime_referrer) + (network_fee_percentage)(lifetime_referrer_fee_percentage)(referrer_rewards_percentage)(name)(owner_account_auths) + (owner_key_auths)(owner_address_auths)(active_account_auths)(active_key_auths)(active_address_auths)(voting_account)(votes) +) +FC_REFLECT( + graphene::es_objects::asset_struct, + (object_id)(block_time)(block_number)(symbol)(issuer)(is_market_issued)(dynamic_asset_data_id)(bitasset_data_id) +) +FC_REFLECT( + graphene::es_objects::balance_struct, + (object_id)(block_time)(block_number)(owner)(asset_type)(balance)(maintenance_flag) +) +FC_REFLECT( + graphene::es_objects::limit_order_struct, + (object_id)(block_time)(block_number)(expiration)(seller)(for_sale)(sell_price)(deferred_fee) +) +FC_REFLECT( + graphene::es_objects::bitasset_struct, + (object_id)(block_time)(block_number)(current_feed)(current_feed_publication_time) +) \ No newline at end of file