Skip to content

Commit

Permalink
support repair indexer rpc (#2396)
Browse files Browse the repository at this point in the history
  • Loading branch information
baichuan3 authored Aug 9, 2024
1 parent df759c4 commit 6d542fe
Show file tree
Hide file tree
Showing 13 changed files with 401 additions and 6 deletions.
33 changes: 32 additions & 1 deletion crates/rooch-indexer/src/actor/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
// SPDX-License-Identifier: Apache-2.0

use crate::actor::messages::{
IndexerEventsMessage, IndexerStatesMessage, IndexerTransactionMessage, UpdateIndexerMessage,
IndexerDeleteObjectStatesMessage, IndexerEventsMessage,
IndexerPersistOrUpdateObjectStatesMessage, IndexerStatesMessage, IndexerTransactionMessage,
UpdateIndexerMessage,
};
use crate::store::traits::IndexerStoreTrait;
use crate::IndexerStore;
Expand Down Expand Up @@ -161,3 +163,32 @@ impl Handler<IndexerEventsMessage> for IndexerActor {
Ok(())
}
}

#[async_trait]
impl Handler<IndexerPersistOrUpdateObjectStatesMessage> for IndexerActor {
async fn handle(
&mut self,
msg: IndexerPersistOrUpdateObjectStatesMessage,
_ctx: &mut ActorContext,
) -> Result<()> {
let IndexerPersistOrUpdateObjectStatesMessage { states } = msg;

self.indexer_store.persist_or_update_object_states(states)?;
Ok(())
}
}

#[async_trait]
impl Handler<IndexerDeleteObjectStatesMessage> for IndexerActor {
async fn handle(
&mut self,
msg: IndexerDeleteObjectStatesMessage,
_ctx: &mut ActorContext,
) -> Result<()> {
let IndexerDeleteObjectStatesMessage { object_ids } = msg;

let state_pks = object_ids.into_iter().map(|v| v.to_string()).collect();
self.indexer_store.delete_object_states(state_pks)?;
Ok(())
}
}
26 changes: 26 additions & 0 deletions crates/rooch-indexer/src/actor/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,3 +120,29 @@ pub struct QueryIndexerObjectIdsMessage {
impl Message for QueryIndexerObjectIdsMessage {
type Result = Result<Vec<(ObjectID, IndexerStateID)>>;
}

#[derive(Debug, Serialize, Deserialize)]
pub struct IndexerPersistOrUpdateObjectStatesMessage {
pub states: Vec<IndexerObjectState>,
}

impl Message for IndexerPersistOrUpdateObjectStatesMessage {
type Result = Result<()>;
}

pub struct IndexerDeleteObjectStatesMessage {
pub object_ids: Vec<ObjectID>,
}

impl Message for IndexerDeleteObjectStatesMessage {
type Result = Result<()>;
}

#[derive(Debug, Serialize, Deserialize)]
pub struct QueryLastStateIndexByTxOrderMessage {
pub tx_order: u64,
}

impl Message for QueryLastStateIndexByTxOrderMessage {
type Result = Result<u64>;
}
21 changes: 21 additions & 0 deletions crates/rooch-indexer/src/actor/reader_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

use crate::actor::messages::{
QueryIndexerEventsMessage, QueryIndexerObjectStatesMessage, QueryIndexerTransactionsMessage,
QueryLastStateIndexByTxOrderMessage,
};
use crate::indexer_reader::IndexerReader;
use anyhow::{anyhow, Result};
Expand Down Expand Up @@ -102,3 +103,23 @@ impl Handler<QueryIndexerObjectIdsMessage> for IndexerReaderActor {
.map_err(|e| anyhow!(format!("Failed to query indexer object states: {:?}", e)))
}
}

#[async_trait]
impl Handler<QueryLastStateIndexByTxOrderMessage> for IndexerReaderActor {
async fn handle(
&mut self,
msg: QueryLastStateIndexByTxOrderMessage,
_ctx: &mut ActorContext,
) -> Result<u64> {
let QueryLastStateIndexByTxOrderMessage { tx_order } = msg;

self.indexer_reader
.query_last_state_index_by_tx_order(tx_order)
.map_err(|e| {
anyhow!(format!(
"Failed to query indexer last state index by tx order: {:?}",
e
))
})
}
}
26 changes: 24 additions & 2 deletions crates/rooch-indexer/src/proxy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@

use crate::actor::indexer::IndexerActor;
use crate::actor::messages::{
IndexerEventsMessage, IndexerStatesMessage, IndexerTransactionMessage,
IndexerDeleteObjectStatesMessage, IndexerEventsMessage,
IndexerPersistOrUpdateObjectStatesMessage, IndexerStatesMessage, IndexerTransactionMessage,
QueryIndexerEventsMessage, QueryIndexerObjectIdsMessage, QueryIndexerObjectStatesMessage,
QueryIndexerTransactionsMessage, UpdateIndexerMessage,
QueryIndexerTransactionsMessage, QueryLastStateIndexByTxOrderMessage, UpdateIndexerMessage,
};
use crate::actor::reader_indexer::IndexerReaderActor;
use anyhow::{Ok, Result};
Expand Down Expand Up @@ -174,4 +175,25 @@ impl IndexerProxy {
})
.await?
}

pub async fn persist_or_update_object_states(
&self,
states: Vec<IndexerObjectState>,
) -> Result<()> {
self.actor
.send(IndexerPersistOrUpdateObjectStatesMessage { states })
.await?
}

pub async fn delete_object_states(&self, object_ids: Vec<ObjectID>) -> Result<()> {
self.actor
.send(IndexerDeleteObjectStatesMessage { object_ids })
.await?
}

pub async fn query_last_state_index_by_tx_order(&self, tx_order: u64) -> Result<u64> {
self.reader_actor
.send(QueryLastStateIndexByTxOrderMessage { tx_order })
.await?
}
}
60 changes: 60 additions & 0 deletions crates/rooch-open-rpc-spec/schemas/openrpc.json
Original file line number Diff line number Diff line change
Expand Up @@ -700,6 +700,33 @@
}
}
},
{
"name": "rooch_repairIndexer",
"description": "Repair indexer by sync from states",
"params": [
{
"name": "repair_type",
"required": true,
"schema": {
"$ref": "#/components/schemas/rooch_types::repair::RepairIndexerType"
}
},
{
"name": "repair_params",
"required": true,
"schema": {
"$ref": "#/components/schemas/RepairIndexerParamsView"
}
}
],
"result": {
"name": "()",
"required": true,
"schema": {
"type": "null"
}
}
},
{
"name": "rooch_sendRawTransaction",
"description": "Send the signed transaction in bcs hex format This method does not block waiting for the transaction to be executed.",
Expand Down Expand Up @@ -2550,6 +2577,36 @@
}
}
},
"RepairIndexerParamsView": {
"oneOf": [
{
"description": "Repair by owner.",
"type": "object",
"required": [
"owner"
],
"properties": {
"owner": {
"$ref": "#/components/schemas/rooch_rpc_api::jsonrpc_types::address::UnitedAddress"
}
},
"additionalProperties": false
},
{
"description": "Repair by object ids.",
"type": "object",
"required": [
"object_id"
],
"properties": {
"object_id": {
"$ref": "#/components/schemas/alloc::vec::Vec<moveos_types::moveos_std::object::ObjectID>"
}
},
"additionalProperties": false
}
]
},
"ScriptCallView": {
"type": "object",
"required": [
Expand Down Expand Up @@ -3211,6 +3268,9 @@
"rooch_types::address::RoochAddress": {
"type": "string"
},
"rooch_types::repair::RepairIndexerType": {
"type": "string"
},
"u128": {
"type": "string"
},
Expand Down
9 changes: 9 additions & 0 deletions crates/rooch-rpc-api/src/api/rooch_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use crate::jsonrpc_types::account_view::BalanceInfoView;
use crate::jsonrpc_types::address::UnitedAddressView;
use crate::jsonrpc_types::event_view::{EventFilterView, IndexerEventIDView};
use crate::jsonrpc_types::repair_view::{RepairIndexerParamsView, RepairIndexerTypeView};
use crate::jsonrpc_types::transaction_view::{TransactionFilterView, TransactionWithInfoView};
use crate::jsonrpc_types::{
AccessPathView, AnnotatedFunctionResultView, BalanceInfoPageView, BytesView,
Expand Down Expand Up @@ -189,4 +190,12 @@ pub trait RoochAPI {
limit: Option<StrView<u64>>,
query_option: Option<QueryOptions>,
) -> RpcResult<IndexerObjectStatePageView>;

/// Repair indexer by sync from states
#[method(name = "repairIndexer")]
async fn repair_indexer(
&self,
repair_type: RepairIndexerTypeView,
repair_params: RepairIndexerParamsView,
) -> RpcResult<()>;
}
1 change: 1 addition & 0 deletions crates/rooch-rpc-api/src/jsonrpc_types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub mod transaction_view;

pub mod address;
pub mod btc;
pub mod repair_view;

pub use self::rooch_types::*;
pub use address::*;
Expand Down
43 changes: 43 additions & 0 deletions crates/rooch-rpc-api/src/jsonrpc_types/repair_view.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright (c) RoochNetwork
// SPDX-License-Identifier: Apache-2.0

use crate::jsonrpc_types::{ObjectIDVecView, StrView, UnitedAddressView};
use rooch_types::repair::{RepairIndexerParams, RepairIndexerType};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use std::str::FromStr;

pub type RepairIndexerTypeView = StrView<RepairIndexerType>;

impl std::fmt::Display for RepairIndexerTypeView {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}

impl FromStr for RepairIndexerTypeView {
type Err = anyhow::Error;
fn from_str(s: &str) -> anyhow::Result<Self, Self::Err> {
Ok(StrView(RepairIndexerType::from_str(s)?))
}
}

#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
#[serde(rename_all = "snake_case")]
pub enum RepairIndexerParamsView {
/// Repair by owner.
Owner(UnitedAddressView),
/// Repair by object ids.
ObjectId(ObjectIDVecView),
}

impl From<RepairIndexerParamsView> for RepairIndexerParams {
fn from(repair_params: RepairIndexerParamsView) -> Self {
match repair_params {
RepairIndexerParamsView::Owner(owner) => RepairIndexerParams::Owner(owner.into()),
RepairIndexerParamsView::ObjectId(object_id_vec_view) => {
RepairIndexerParams::ObjectId(object_id_vec_view.into())
}
}
}
}
12 changes: 12 additions & 0 deletions crates/rooch-rpc-server/src/server/rooch_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use moveos_types::{
moveos_std::{move_module::MoveModule, object::ObjectID},
state::{AnnotatedState, FieldKey},
};
use rooch_rpc_api::jsonrpc_types::repair_view::{RepairIndexerParamsView, RepairIndexerTypeView};
use rooch_rpc_api::jsonrpc_types::{
account_view::BalanceInfoView,
event_view::{EventFilterView, EventView, IndexerEventIDView, IndexerEventView},
Expand Down Expand Up @@ -759,6 +760,17 @@ impl RoochAPIServer for RoochServer {
has_next_page,
})
}

async fn repair_indexer(
&self,
repair_type: RepairIndexerTypeView,
repair_params: RepairIndexerParamsView,
) -> RpcResult<()> {
self.rpc_service
.repair_indexer(repair_type.0, repair_params.into())
.await?;
Ok(())
}
}

impl RoochRpcModule for RoochServer {
Expand Down
Loading

0 comments on commit 6d542fe

Please sign in to comment.