Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delete objects from ES before loading from object database #2623

Merged
merged 7 commits into from
Aug 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 42 additions & 14 deletions libraries/plugins/es_objects/es_objects.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,12 @@ class es_objects_plugin_impl
{ index_database( ids, action_type::deletion ); }

void index_database(const vector<object_id_type>& ids, action_type action);
void sync_db();
void remove_from_database( const object_id_type& id, const plugin_options::object_options& opt );
/// Load all data from the object database into ES
void sync_db( bool delete_before_load = false );
/// Delete one object from ES
void delete_from_database( const object_id_type& id, const plugin_options::object_options& opt );
/// Delete all objects of the specified type from ES
void delete_all_from_database( const plugin_options::object_options& opt ) const;

es_objects_plugin& _self;
plugin_options _options;
Expand Down Expand Up @@ -145,19 +149,28 @@ struct data_loader
}

template<typename ObjType>
void load( const es_objects_plugin_impl::plugin_options::object_options& opt )
void load( const es_objects_plugin_impl::plugin_options::object_options& opt,
bool force_delete = false )
{
if( !opt.enabled )
return;

// If no_delete or store_updates is true, do not delete
if( force_delete || !( opt.no_delete || opt.store_updates ) )
{
ilog( "Deleting all data in index " + my->_options.index_prefix + opt.index_name );
my->delete_all_from_database( opt );
}

ilog( "Loading data into index " + my->_options.index_prefix + opt.index_name );
db.get_index( ObjType::space_id, ObjType::type_id ).inspect_all_objects(
[this, &opt](const graphene::db::object &o) {
my->prepareTemplate( static_cast<const ObjType&>(o), opt );
});
}
};

void es_objects_plugin_impl::sync_db()
void es_objects_plugin_impl::sync_db( bool delete_before_load )
{
ilog("elasticsearch OBJECTS: loading data from the object database (chain state)");

Expand All @@ -168,13 +181,15 @@ void es_objects_plugin_impl::sync_db()

data_loader loader( this );

loader.load<account_object >( _options.accounts );
loader.load<asset_object >( _options.assets );
loader.load<asset_bitasset_data_object >( _options.asset_bitasset );
loader.load<account_balance_object >( _options.balances );
loader.load<proposal_object >( _options.proposals );
loader.load<limit_order_object >( _options.limit_orders );
loader.load<budget_record_object >( _options.budget );
loader.load<account_object >( _options.accounts, delete_before_load );
loader.load<asset_object >( _options.assets, delete_before_load );
loader.load<asset_bitasset_data_object >( _options.asset_bitasset, delete_before_load );
loader.load<account_balance_object >( _options.balances, delete_before_load );
loader.load<proposal_object >( _options.proposals, delete_before_load );
loader.load<limit_order_object >( _options.limit_orders, delete_before_load );
loader.load<budget_record_object >( _options.budget, delete_before_load );

ilog("elasticsearch OBJECTS: done loading data from the object database (chain state)");
}

void es_objects_plugin_impl::index_database(const vector<object_id_type>& ids, action_type action)
Expand Down Expand Up @@ -213,7 +228,7 @@ void es_objects_plugin_impl::index_database(const vector<object_id_type>& ids, a
continue;
const auto& opt = itr->second;
if( action_type::deletion == action )
remove_from_database( value, opt );
delete_from_database( value, opt );
else
{
switch( itr->first )
Expand Down Expand Up @@ -247,7 +262,7 @@ void es_objects_plugin_impl::index_database(const vector<object_id_type>& ids, a

}

void es_objects_plugin_impl::remove_from_database(
void es_objects_plugin_impl::delete_from_database(
const object_id_type& id, const es_objects_plugin_impl::plugin_options::object_options& opt )
{
if( opt.no_delete )
Expand All @@ -266,6 +281,17 @@ void es_objects_plugin_impl::remove_from_database(
send_bulk_if_ready();
}

void es_objects_plugin_impl::delete_all_from_database( const plugin_options::object_options& opt ) const
{
// Note:
// 1. The _delete_by_query API deletes the data but keeps the index mapping, so the function is OK.
// Simply deleting the index is probably faster, but it requires the "delete_index" permission, and
// may probably mess up the index mapping and other existing settings.
// Don't know if there is a good way to only delete objects that do not exist in the object database.
// 2. We don't check the return value here, it's probably OK
es->query( _options.index_prefix + opt.index_name + "/_delete_by_query", R"({"query":{"match_all":{}}})" );
}

template<typename T>
void es_objects_plugin_impl::prepareTemplate(
const T& blockchain_object, const es_objects_plugin_impl::plugin_options::object_options& opt )
Expand Down Expand Up @@ -463,7 +489,9 @@ void es_objects_plugin::plugin_initialize(const boost::program_options::variable

void es_objects_plugin::plugin_startup()
{
if( my->_options.sync_db_on_startup || 0 == database().head_block_num() )
if( 0 == database().head_block_num() )
my->sync_db( true );
else if( my->_options.sync_db_on_startup )
my->sync_db();
}

Expand Down
4 changes: 2 additions & 2 deletions tests/common/database_fixture.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -413,8 +413,8 @@ std::shared_ptr<boost::program_options::variables_map> database_fixture_base::in
fixture.app.register_plugin<graphene::es_objects::es_objects_plugin>(true);

fc::set_option( options, "es-objects-elasticsearch-url", GRAPHENE_TESTING_ES_URL );
fc::set_option( options, "es-objects-bulk-replay", uint32_t(2) );
fc::set_option( options, "es-objects-bulk-sync", uint32_t(2) );
fc::set_option( options, "es-objects-bulk-replay", uint32_t(1) );
fc::set_option( options, "es-objects-bulk-sync", uint32_t(1) );
fc::set_option( options, "es-objects-proposals", true );
fc::set_option( options, "es-objects-accounts", true );
fc::set_option( options, "es-objects-assets", true );
Expand Down
36 changes: 35 additions & 1 deletion tests/elasticsearch/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ BOOST_AUTO_TEST_CASE(elasticsearch_objects) {
if(delete_objects) { // all records deleted

// asset and bitasset
create_bitasset("USD", account_id_type());
asset_id_type usd_id = create_bitasset("USD", account_id_type()).id;
generate_block();

string query = "{ \"query\" : { \"bool\" : { \"must\" : [{\"match_all\": {}}] } } }";
Expand Down Expand Up @@ -268,8 +268,28 @@ BOOST_AUTO_TEST_CASE(elasticsearch_objects) {
auto bitasset_object_id = j["hits"]["hits"][size_t(0)]["_source"]["object_id"].as_string();
BOOST_CHECK_EQUAL(bitasset_object_id, bitasset_data_id);

// create a limit order that expires at the next maintenance time
create_sell_order( account_id_type(), asset(1), asset(1, usd_id),
db.get_dynamic_global_properties().next_maintenance_time );
generate_block();

es.endpoint = es.index_prefix + "limitorder/_doc/_count";
es.query = "";
fc::wait_for( ES_WAIT_TIME, [&]() {
res = graphene::utilities::getEndPoint(es);
j = fc::json::from_string(res);
if( !j.is_object() )
return false;
const auto& obj = j.get_object();
if( obj.find("count") == obj.end() )
return false;
total = obj["count"].as_string();
return (total == "1");
});

// maintenance, for budget records
generate_blocks( db.get_dynamic_global_properties().next_maintenance_time );
generate_block();

es.endpoint = es.index_prefix + "budget/_doc/_count";
es.query = "";
Expand All @@ -285,6 +305,20 @@ BOOST_AUTO_TEST_CASE(elasticsearch_objects) {
return (total == "1"); // new record inserted at the first maintenance block
});

es.endpoint = es.index_prefix + "limitorder/_doc/_count";
es.query = "";
fc::wait_for( ES_WAIT_TIME, [&]() {
res = graphene::utilities::getEndPoint(es);
j = fc::json::from_string(res);
if( !j.is_object() )
return false;
const auto& obj = j.get_object();
if( obj.find("count") == obj.end() )
return false;
total = obj["count"].as_string();
return (total == "0"); // the limit order expired, so the object is removed
});

}
}
catch (fc::exception &e) {
Expand Down