Skip to content

Commit

Permalink
Resolve "port ES changes from Bitshares"
Browse files Browse the repository at this point in the history
  • Loading branch information
Vlad Dobromyslov authored and nathanielhourt committed Feb 8, 2022
1 parent 62a553a commit 45e501b
Show file tree
Hide file tree
Showing 6 changed files with 375 additions and 302 deletions.
271 changes: 232 additions & 39 deletions libraries/plugins/elasticsearch/elasticsearch_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
* THE SOFTWARE.
*/

#include <boost/algorithm/string.hpp>
#include <graphene/elasticsearch/elasticsearch_plugin.hpp>
#include <graphene/chain/impacted.hpp>
#include <graphene/chain/account_evaluator.hpp>
Expand All @@ -33,6 +34,15 @@ namespace graphene { namespace elasticsearch {
namespace detail
{

const std::string generateIndexName(const fc::time_point_sec& block_date, const std::string& _elasticsearch_index_prefix)
{
auto block_date_string = block_date.to_iso_string();
std::vector<std::string> parts;
boost::split(parts, block_date_string, boost::is_any_of("-"));
std::string index_name = _elasticsearch_index_prefix + parts[0] + "-" + parts[1];
return index_name;
}

class elasticsearch_plugin_impl
{
public:
Expand All @@ -48,6 +58,9 @@ class elasticsearch_plugin_impl
return _self.database();
}

friend class graphene::elasticsearch::elasticsearch_plugin;

private:
elasticsearch_plugin& _self;
primary_index< operation_history_index >* _oho_index;

Expand Down Expand Up @@ -75,6 +88,8 @@ class elasticsearch_plugin_impl
std::string bulk_line;
std::string index_name;
bool is_sync = false;
bool is_es_version_7_or_above = true;

private:
bool add_elasticsearch( const account_id_type account_id, const optional<operation_history_object>& oho, const uint32_t block_number );
const account_transaction_history_object& addNewEntry(const account_statistics_object& stats_obj,
Expand All @@ -91,6 +106,7 @@ class elasticsearch_plugin_impl
void createBulkLine(const account_transaction_history_object& ath);
void prepareBulk(const account_transaction_history_id_type& ath_id);
void populateESstruct();
void init_program_options(const boost::program_options::variables_map& options);
};

elasticsearch_plugin_impl::~elasticsearch_plugin_impl()
Expand All @@ -105,7 +121,7 @@ elasticsearch_plugin_impl::~elasticsearch_plugin_impl()
bool elasticsearch_plugin_impl::update_account_histories( const signed_block& b )
{
checkState(b.timestamp);
index_name = graphene::utilities::generateIndexName(b.timestamp, _elasticsearch_index_prefix);
index_name = generateIndexName(b.timestamp, _elasticsearch_index_prefix);

graphene::chain::database& db = database();
const vector<optional< operation_history_object > >& hist = db.get_applied_operations();
Expand Down Expand Up @@ -229,6 +245,113 @@ void elasticsearch_plugin_impl::getOperationType(const optional <operation_histo
op_type = oho->op.which();
}

struct adaptor_struct
{
variant adapt(const variant_object& op)
{
fc::mutable_variant_object o(op);
vector<string> keys_to_rename;
for (auto& i : o)
{
auto& element = i.value();
if (element.is_object())
{
const string& name = i.key();
const auto& vo = element.get_object();
if (vo.contains(name.c_str()))
keys_to_rename.emplace_back(name);
element = adapt(vo);
}
else if (element.is_array())
adapt(element.get_array());
}
for (const auto& i : keys_to_rename)
{
string new_name = i + "_";
o[new_name] = variant(o[i]);
o.erase(i);
}

if (o.find("memo") != o.end())
{
auto& memo = o["memo"];
if (memo.is_string())
{
o["memo_"] = o["memo"];
o.erase("memo");
}
else if (memo.is_object())
{
fc::mutable_variant_object tmp(memo.get_object());
if (tmp.find("nonce") != tmp.end())
{
tmp["nonce"] = tmp["nonce"].as_string();
o["memo"] = tmp;
}
}
}
if (o.find("new_parameters") != o.end())
{
auto& tmp = o["new_parameters"];
if (tmp.is_object())
{
fc::mutable_variant_object tmp2(tmp.get_object());
if (tmp2.find("current_fees") != tmp2.end())
{
tmp2.erase("current_fees");
o["new_parameters"] = tmp2;
}
}
}
if (o.find("owner") != o.end() && o["owner"].is_string())
{
o["owner_"] = o["owner"].as_string();
o.erase("owner");
}
if (o.find("proposed_ops") != o.end())
{
o["proposed_ops"] = fc::json::to_string(o["proposed_ops"]);
}
if (o.find("initializer") != o.end())
{
o["initializer"] = fc::json::to_string(o["initializer"]);
}
if (o.find("policy") != o.end())
{
o["policy"] = fc::json::to_string(o["policy"]);
}
if (o.find("predicates") != o.end())
{
o["predicates"] = fc::json::to_string(o["predicates"]);
}
if (o.find("active_special_authority") != o.end())
{
o["active_special_authority"] = fc::json::to_string(o["active_special_authority"]);
}
if (o.find("owner_special_authority") != o.end())
{
o["owner_special_authority"] = fc::json::to_string(o["owner_special_authority"]);
}

variant v;
fc::to_variant(o, v, FC_PACK_MAX_DEPTH);
return v;
}

void adapt(fc::variants& v)
{
for (auto& array_element : v)
{
if (array_element.is_object())
array_element = adapt(array_element.get_object());
else if (array_element.is_array())
adapt(array_element.get_array());
else
array_element = array_element.as_string();
}
}
};

void elasticsearch_plugin_impl::doOperationHistory(const optional <operation_history_object>& oho)
{
os.trx_in_block = oho->trx_in_block;
Expand All @@ -255,6 +378,61 @@ void elasticsearch_plugin_impl::doBlock(uint32_t trx_in_block, const signed_bloc
bs.trx_id = trx_id;
}

struct operation_visitor
{
using result_type = void;

share_type fee_amount;
asset_id_type fee_asset;

asset_id_type transfer_asset_id;
share_type transfer_amount;
account_id_type transfer_from;
account_id_type transfer_to;

void operator()( const graphene::chain::transfer_operation& o )
{
fee_asset = o.fee.asset_id;
fee_amount = o.fee.amount;

transfer_asset_id = o.amount.asset_id;
transfer_amount = o.amount.amount;
transfer_from = o.from;
transfer_to = o.to;
}

object_id_type fill_order_id;
account_id_type fill_account_id;
asset_id_type fill_pays_asset_id;
share_type fill_pays_amount;
asset_id_type fill_receives_asset_id;
share_type fill_receives_amount;
//double fill_fill_price;
//bool fill_is_maker;

void operator()( const graphene::chain::fill_order_operation& o )
{
fee_asset = o.fee.asset_id;
fee_amount = o.fee.amount;

fill_order_id = o.order_id;
fill_account_id = o.account_id;
fill_pays_asset_id = o.pays.asset_id;
fill_pays_amount = o.pays.amount;
fill_receives_asset_id = o.receives.asset_id;
fill_receives_amount = o.receives.amount;
//fill_fill_price = o.fill_price.to_real();
//fill_is_maker = o.is_maker;
}

template<typename T>
void operator()( const T& o )
{
fee_asset = o.fee.asset_id;
fee_amount = o.fee.amount;
}
};

void elasticsearch_plugin_impl::doVisitor(const optional <operation_history_object>& oho)
{
graphene::chain::database& db = database();
Expand Down Expand Up @@ -380,7 +558,8 @@ void elasticsearch_plugin_impl::prepareBulk(const account_transaction_history_id
const std::string _id = fc::json::to_string(ath_id);
fc::mutable_variant_object bulk_header;
bulk_header["_index"] = index_name;
bulk_header["_type"] = "data";
if( !is_es_version_7_or_above )
bulk_header["_type"] = "_doc";
bulk_header["_id"] = fc::to_string(ath_id.space_id) + "." + fc::to_string(ath_id.type_id) + "."
+ fc::to_string(ath_id.instance.value);
prepare = graphene::utilities::createBulk(bulk_header, std::move(bulk_line));
Expand Down Expand Up @@ -428,6 +607,43 @@ void elasticsearch_plugin_impl::populateESstruct()
es.query = "";
}

void elasticsearch_plugin_impl::init_program_options(const boost::program_options::variables_map& options)
{
if (options.count("elasticsearch-node-url")) {
_elasticsearch_node_url = options["elasticsearch-node-url"].as<std::string>();
}
if (options.count("elasticsearch-bulk-replay")) {
_elasticsearch_bulk_replay = options["elasticsearch-bulk-replay"].as<uint32_t>();
}
if (options.count("elasticsearch-bulk-sync")) {
_elasticsearch_bulk_sync = options["elasticsearch-bulk-sync"].as<uint32_t>();
}
if (options.count("elasticsearch-visitor")) {
_elasticsearch_visitor = options["elasticsearch-visitor"].as<bool>();
}
if (options.count("elasticsearch-basic-auth")) {
_elasticsearch_basic_auth = options["elasticsearch-basic-auth"].as<std::string>();
}
if (options.count("elasticsearch-index-prefix")) {
_elasticsearch_index_prefix = options["elasticsearch-index-prefix"].as<std::string>();
}
if (options.count("elasticsearch-operation-object")) {
_elasticsearch_operation_object = options["elasticsearch-operation-object"].as<bool>();
}
if (options.count("elasticsearch-start-es-after-block")) {
_elasticsearch_start_es_after_block = options["elasticsearch-start-es-after-block"].as<uint32_t>();
}
if (options.count("elasticsearch-operation-string")) {
_elasticsearch_operation_string = options["elasticsearch-operation-string"].as<bool>();
}
if (options.count("elasticsearch-mode")) {
const auto option_number = options["elasticsearch-mode"].as<uint16_t>();
if(option_number > mode::all)
FC_THROW_EXCEPTION(graphene::chain::plugin_exception, "Elasticsearch mode not valid");
_elasticsearch_mode = static_cast<mode>(options["elasticsearch-mode"].as<uint16_t>());
}
}

} // end namespace detail

elasticsearch_plugin::elasticsearch_plugin() :
Expand Down Expand Up @@ -480,42 +696,12 @@ void elasticsearch_plugin::plugin_set_program_options(

void elasticsearch_plugin::plugin_initialize(const boost::program_options::variables_map& options)
{
ilog("elasticsearch ACCOUNT HISTORY: plugin_initialize() begin");

my->_oho_index = database().add_index< primary_index< operation_history_index > >();
database().add_index< primary_index< account_transaction_history_index > >();

if (options.count("elasticsearch-node-url")) {
my->_elasticsearch_node_url = options["elasticsearch-node-url"].as<std::string>();
}
if (options.count("elasticsearch-bulk-replay")) {
my->_elasticsearch_bulk_replay = options["elasticsearch-bulk-replay"].as<uint32_t>();
}
if (options.count("elasticsearch-bulk-sync")) {
my->_elasticsearch_bulk_sync = options["elasticsearch-bulk-sync"].as<uint32_t>();
}
if (options.count("elasticsearch-visitor")) {
my->_elasticsearch_visitor = options["elasticsearch-visitor"].as<bool>();
}
if (options.count("elasticsearch-basic-auth")) {
my->_elasticsearch_basic_auth = options["elasticsearch-basic-auth"].as<std::string>();
}
if (options.count("elasticsearch-index-prefix")) {
my->_elasticsearch_index_prefix = options["elasticsearch-index-prefix"].as<std::string>();
}
if (options.count("elasticsearch-operation-object")) {
my->_elasticsearch_operation_object = options["elasticsearch-operation-object"].as<bool>();
}
if (options.count("elasticsearch-start-es-after-block")) {
my->_elasticsearch_start_es_after_block = options["elasticsearch-start-es-after-block"].as<uint32_t>();
}
if (options.count("elasticsearch-operation-string")) {
my->_elasticsearch_operation_string = options["elasticsearch-operation-string"].as<bool>();
}
if (options.count("elasticsearch-mode")) {
const auto option_number = options["elasticsearch-mode"].as<uint16_t>();
if(option_number > mode::all)
FC_THROW_EXCEPTION(graphene::chain::plugin_exception, "Elasticsearch mode not valid");
my->_elasticsearch_mode = static_cast<mode>(options["elasticsearch-mode"].as<uint16_t>());
}
my->init_program_options( options );

if(my->_elasticsearch_mode != mode::only_query) {
if (my->_elasticsearch_mode == mode::all && !my->_elasticsearch_operation_string)
Expand All @@ -528,18 +714,25 @@ void elasticsearch_plugin::plugin_initialize(const boost::program_options::varia
"Error populating ES database, we are going to keep trying.");
});
}
}

void elasticsearch_plugin::plugin_startup()
{
graphene::utilities::ES es;
es.curl = my->curl;
es.elasticsearch_url = my->_elasticsearch_node_url;
es.auth = my->_elasticsearch_basic_auth;

if(!graphene::utilities::checkES(es))
FC_THROW_EXCEPTION(fc::exception, "ES database is not up in url ${url}", ("url", my->_elasticsearch_node_url));

graphene::utilities::checkESVersion7OrAbove(es, my->is_es_version_7_or_above);

ilog("elasticsearch ACCOUNT HISTORY: plugin_initialize() end");
}

void elasticsearch_plugin::plugin_startup()
{
ilog("elasticsearch ACCOUNT HISTORY: plugin_startup() begin");
// Nothing to do
ilog("elasticsearch ACCOUNT HISTORY: plugin_startup() end");
}

operation_history_object elasticsearch_plugin::get_operation_by_id(operation_history_id_type id)
Expand Down Expand Up @@ -655,7 +848,7 @@ graphene::utilities::ES elasticsearch_plugin::prepareHistoryQuery(string query)
es.curl = curl;
es.elasticsearch_url = my->_elasticsearch_node_url;
es.index_prefix = my->_elasticsearch_index_prefix;
es.endpoint = es.index_prefix + "*/data/_search";
es.endpoint = es.index_prefix + "*/_doc/_search";
es.query = query;

return es;
Expand Down
Loading

0 comments on commit 45e501b

Please sign in to comment.