Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pubsub/event decode #2717

Merged
merged 5 commits into from
Jul 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions abi/decoder/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ pub struct DecodedPackage {
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
#[serde(transparent)]
pub struct DecodedMoveValue(serde_json::Value);
impl From<DecodedMoveValue> for serde_json::Value {
fn from(v: DecodedMoveValue) -> Self {
v.0
}
}

/// Transform AnnotatedMoveValue into DecodedMoveValue.
fn struct_to_json(origin: AnnotatedMoveStruct) -> serde_json::Value {
Expand Down
2 changes: 1 addition & 1 deletion account/service/src/account_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ impl EventHandler<Self, ContractEventNotification> for AccountEventService {
return;
}

for i in item.0.as_ref() {
for i in item.0 .1.as_ref() {
if watched_keys.contains(i.contract_event.key()) {
if let Err(e) = self.handle_contract_event(&i.contract_event) {
error!(
Expand Down
3 changes: 2 additions & 1 deletion chain/chain-notify/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ impl ChainNotifyHandlerService {
.map(|evt| Event::new(block_id, block_number, txn_hash, Some(i as u32), evt)),
);
}
let events_notification: ContractEventNotification = Notification(all_events.into());
let events_notification: ContractEventNotification =
Notification((block.header.state_root(), all_events.into()));
ctx.broadcast(events_notification);
Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion chain/chain-notify/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::sync::Arc;
#[derive(Debug, Clone)]
pub struct Notification<T>(pub T);

pub type ContractEventNotification = Notification<Arc<[Event]>>;
pub type ContractEventNotification = Notification<(HashValue, Arc<[Event]>)>;
pub type NewHeadEventNotification = Notification<ThinBlock>;

#[derive(Clone, Debug, Eq, PartialEq)]
Expand Down
5 changes: 3 additions & 2 deletions cmd/starcoin/src/chain/get_events_cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use crate::StarcoinOpt;
use anyhow::Result;
use scmd::{CommandAction, ExecContext};
use starcoin_crypto::HashValue;
use starcoin_rpc_api::chain::{GetEventOption, GetEventResponse};
use starcoin_rpc_api::chain::GetEventOption;
use starcoin_rpc_api::types::TransactionEventResponse;
use structopt::StructOpt;

/// Get chain's events by txn hash
Expand All @@ -24,7 +25,7 @@ impl CommandAction for GetEventsCommand {
type State = CliState;
type GlobalOpt = StarcoinOpt;
type Opt = GetEventsOpt;
type ReturnItem = Vec<GetEventResponse>;
type ReturnItem = Vec<TransactionEventResponse>;

fn run(
&self,
Expand Down
7 changes: 3 additions & 4 deletions cmd/starcoin/src/dev/call_contract_cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@ use crate::cli_state::CliState;
use crate::StarcoinOpt;
use anyhow::Result;
use scmd::{CommandAction, ExecContext};
use starcoin_rpc_api::types::{
AnnotatedMoveValueView, ContractCall, FunctionIdView, TransactionArgumentView, TypeTagView,
};
use starcoin_abi_decoder::DecodedMoveValue;
use starcoin_rpc_api::types::{ContractCall, FunctionIdView, TransactionArgumentView, TypeTagView};
use structopt::StructOpt;

/// Call Contract command
Expand Down Expand Up @@ -46,7 +45,7 @@ impl CommandAction for CallContractCommand {
type State = CliState;
type GlobalOpt = StarcoinOpt;
type Opt = CallContractOpt;
type ReturnItem = Vec<AnnotatedMoveValueView>;
type ReturnItem = Vec<DecodedMoveValue>;

fn run(
&self,
Expand Down
28 changes: 19 additions & 9 deletions cmd/starcoin/src/dev/subscribe_cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ use anyhow::Result;
use futures::{StreamExt, TryStream, TryStreamExt};
use scmd::{CommandAction, ExecContext};
use starcoin_rpc_api::types::pubsub::EventFilter;
use starcoin_rpc_api::types::TypeTagView;
use starcoin_types::account_address::AccountAddress;
use starcoin_types::event::EventKey;
use structopt::StructOpt;
use tokio::io::AsyncBufReadExt;
Expand All @@ -29,13 +31,18 @@ pub struct SubscribeEventOpt {
multiple = true
)]
event_key: Option<Vec<EventKey>>,
#[structopt(
short = "l",
long = "limit",
name = "limit",
help = "limit return size"
)]
#[structopt(long = "address", name = "address", multiple = true)]
/// events of which addresses to subscribe
addresses: Option<Vec<AccountAddress>>,
#[structopt(long = "type_tag", name = "type-tag", multiple = true)]
/// type tags of the events to subscribe
type_tags: Option<Vec<TypeTagView>>,
#[structopt(short = "l", long = "limit", name = "limit")]
/// limit return size
limit: Option<usize>,
#[structopt(long = "decode")]
/// whether decode event
decode: bool,
}

pub struct SubscribeEventCommand;
Expand All @@ -52,12 +59,15 @@ impl CommandAction for SubscribeEventCommand {
from_block: ctx.opt().from_block,
to_block: ctx.opt().to_block,
event_keys: ctx.opt().event_key.clone(),
addrs: None,
type_tags: None,
addrs: ctx.opt().addresses.clone(),
type_tags: ctx.opt().type_tags.clone(),
limit: ctx.opt().limit,
};

let event_stream = ctx.state().client().subscribe_events(filter)?;
let event_stream = ctx
.state()
.client()
.subscribe_events(filter, ctx.opt().decode)?;
println!("Subscribe successful, Press `q` and Enter to quit");
blocking_display_notification(event_stream, |evt| {
serde_json::to_string(&evt).expect("should never fail")
Expand Down
6 changes: 2 additions & 4 deletions cmd/starcoin/src/dev/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@ use anyhow::{format_err, Result};
use starcoin_config::NodeConfig;
use starcoin_logger::prelude::*;
use starcoin_node::NodeHandle;
use starcoin_rpc_api::types::{
AnnotatedMoveValueView, ContractCall, FunctionIdView, TransactionStatusView,
};
use starcoin_rpc_api::types::{ContractCall, FunctionIdView, TransactionStatusView};
use starcoin_rpc_client::{RemoteStateReader, RpcClient};
use starcoin_state_api::AccountStateReader;
use starcoin_transaction_builder::{
Expand Down Expand Up @@ -421,7 +419,7 @@ fn test_upgrade_module() {
let result = cli_state.client().contract_call(call).unwrap();
assert!(!result.is_empty());
info!("result: {:?}", result);
if let AnnotatedMoveValueView::Bool(flag) = result.get(0).unwrap() {
if let serde_json::Value::Bool(flag) = result.get(0).unwrap().clone().into() {
assert!(flag);
} else {
unreachable!("result err.");
Expand Down
8 changes: 4 additions & 4 deletions cmd/starcoin/src/view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ use anyhow::format_err;
use serde::{Deserialize, Serialize};
use starcoin_account_api::AccountInfo;
use starcoin_crypto::HashValue;
use starcoin_rpc_api::chain::GetEventResponse;
pub use starcoin_rpc_api::types::TransactionOutputView;
use starcoin_rpc_api::types::{
DryRunOutputView, RawUserTransactionView, StrView, TransactionEventView, TransactionInfoView,
DryRunOutputView, RawUserTransactionView, StrView, TransactionEventResponse,
TransactionEventView, TransactionInfoView,
};
use starcoin_types::account_address::AccountAddress;
use starcoin_types::account_config::{DepositEvent, MintEvent, WithdrawEvent};
Expand Down Expand Up @@ -312,7 +312,7 @@ impl ExecuteResultView {
pub struct ExecutionOutputView {
pub txn_hash: HashValue,
pub txn_info: Option<TransactionInfoView>,
pub events: Option<Vec<GetEventResponse>>,
pub events: Option<Vec<TransactionEventResponse>>,
}

impl ExecutionOutputView {
Expand All @@ -327,7 +327,7 @@ impl ExecutionOutputView {
pub fn new_with_info(
txn_hash: HashValue,
txn_info: TransactionInfoView,
events: Vec<GetEventResponse>,
events: Vec<TransactionEventResponse>,
) -> Self {
Self {
txn_hash,
Expand Down
14 changes: 3 additions & 11 deletions rpc/api/src/chain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,12 @@ pub use self::gen_client::Client as ChainClient;
use crate::types::pubsub::EventFilter;
use crate::types::{
BlockHeaderView, BlockSummaryView, BlockView, ChainId, ChainInfoView, EpochUncleSummaryView,
TransactionEventView, TransactionInfoView, TransactionView,
TransactionEventResponse, TransactionInfoView, TransactionView,
};
use crate::FutureResult;
use jsonrpc_core::Result;
use jsonrpc_derive::rpc;
use serde::{Deserialize, Serialize};
use starcoin_abi_decoder::DecodedMoveValue;
use starcoin_crypto::HashValue;
use starcoin_types::block::{BlockInfo, BlockNumber};
use starcoin_vm_types::on_chain_resource::{EpochInfo, GlobalTimeOnChain};
Expand Down Expand Up @@ -78,14 +77,14 @@ pub trait ChainApi {
&self,
txn_hash: HashValue,
option: Option<GetEventOption>,
) -> FutureResult<Vec<GetEventResponse>>;
) -> FutureResult<Vec<TransactionEventResponse>>;

#[rpc(name = "chain.get_events")]
fn get_events(
&self,
filter: EventFilter,
option: Option<GetEventOption>,
) -> FutureResult<Vec<GetEventResponse>>;
) -> FutureResult<Vec<TransactionEventResponse>>;

/// Get current epoch info.
#[rpc(name = "chain.epoch")]
Expand Down Expand Up @@ -135,10 +134,3 @@ pub struct GetEventOption {
#[serde(default)]
pub decode: bool,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct GetEventResponse {
pub decode_event_data: Option<DecodedMoveValue>,
#[serde(flatten)]
pub event: TransactionEventView,
}
5 changes: 5 additions & 0 deletions rpc/api/src/contract_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::types::{
};
use crate::FutureResult;
use jsonrpc_derive::rpc;
use starcoin_abi_decoder::DecodedMoveValue;
use starcoin_abi_types::{ModuleABI, ScriptFunctionABI, StructABI};
use starcoin_vm_types::account_address::AccountAddress;
use starcoin_vm_types::language_storage::{ModuleId, StructTag};
Expand All @@ -28,6 +29,10 @@ pub trait ContractApi {
#[rpc(name = "contract.call")]
fn call(&self, call: ContractCall) -> FutureResult<Vec<AnnotatedMoveValueView>>;

/// Call a move contract, return move values.
#[rpc(name = "contract.call_v2")]
fn call_v2(&self, call: ContractCall) -> FutureResult<Vec<DecodedMoveValue>>;

#[rpc(name = "contract.dry_run")]
fn dry_run(&self, txn: DryRunTransactionRequest) -> FutureResult<DryRunOutputView>;

Expand Down
18 changes: 12 additions & 6 deletions rpc/api/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,16 @@
mod node_api_types;
pub mod pubsub;

pub use node_api_types::*;

use bcs_ext::BCSCodec;
use hex::FromHex;
use jsonrpc_core_client::RpcChannel;
pub use node_api_types::*;
use serde::de::{DeserializeOwned, Error};
use serde::{Deserialize, Serializer};
use serde::{Deserializer, Serialize};
pub use starcoin_abi_decoder::DecodedMoveValue;
use starcoin_abi_decoder::{
DecodedMoveValue, DecodedPackage, DecodedScript, DecodedScriptFunction,
DecodedTransactionPayload,
DecodedPackage, DecodedScript, DecodedScriptFunction, DecodedTransactionPayload,
};
use starcoin_abi_types::ModuleABI;
use starcoin_crypto::{CryptoMaterialError, HashValue, ValidCryptoMaterialStringExt};
Expand Down Expand Up @@ -874,14 +873,21 @@ impl From<DiscardedVMStatus> for TransactionStatusView {
}
}

#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Hash, Clone)]
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct TransactionEventResponse {
#[serde(skip_serializing_if = "Option::is_none")]
pub decode_event_data: Option<DecodedMoveValue>,
#[serde(flatten)]
pub event: TransactionEventView,
}

#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct TransactionEventView {
pub block_hash: Option<HashValue>,
pub block_number: Option<StrView<BlockNumber>>,
pub transaction_hash: Option<HashValue>,
// txn index in block
pub transaction_index: Option<u32>,

pub data: StrView<Vec<u8>>,
pub type_tag: TypeTag,
pub event_key: EventKey,
Expand Down
14 changes: 11 additions & 3 deletions rpc/api/src/types/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use crate::errors;
use crate::types::{BlockView, TransactionEventView, TypeTagView};
use crate::types::{BlockView, TransactionEventResponse, TypeTagView};
use jsonrpc_core::error::Error as JsonRpcError;
use serde::de::Error;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
Expand Down Expand Up @@ -39,7 +39,7 @@ pub enum Result {
Block(Box<BlockView>),
/// Transaction hash
TransactionHash(Vec<HashValue>),
Event(Box<TransactionEventView>),
Event(Box<TransactionEventResponse>),
MintBlock(Box<MintBlockEvent>),
}

Expand All @@ -63,7 +63,7 @@ pub enum Params {
/// No parameters passed.
None,
/// Log parameters.
Events(EventFilter),
Events(EventParams),
}

impl Default for Params {
Expand All @@ -89,6 +89,14 @@ impl<'a> Deserialize<'a> for Params {
}
}

#[derive(Debug, PartialEq, Clone, Serialize, Deserialize, Eq, Hash)]
pub struct EventParams {
#[serde(flatten)]
pub filter: EventFilter,
#[serde(default)]
pub decode: bool,
}

/// Filter
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize, Eq, Hash)]
#[serde(deny_unknown_fields)]
Expand Down
20 changes: 10 additions & 10 deletions rpc/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,20 @@ use serde_json::Value;
use starcoin_account_api::AccountInfo;
use starcoin_crypto::HashValue;
use starcoin_logger::{prelude::*, LogPattern};
use starcoin_rpc_api::chain::{
GetBlockOption, GetEventOption, GetEventResponse, GetTransactionOption,
};
use starcoin_rpc_api::chain::{GetBlockOption, GetEventOption, GetTransactionOption};
use starcoin_rpc_api::node::NodeInfo;
use starcoin_rpc_api::service::RpcAsyncService;
use starcoin_rpc_api::state::{
GetCodeOption, GetResourceOption, ListCodeOption, ListResourceOption,
};
use starcoin_rpc_api::types::pubsub::EventFilter;
use starcoin_rpc_api::types::{
AccountStateSetView, AnnotatedMoveStructView, AnnotatedMoveValueView, BlockHeaderView,
BlockSummaryView, BlockView, ChainId, ChainInfoView, CodeView, ContractCall, DryRunOutputView,
AccountStateSetView, AnnotatedMoveStructView, BlockHeaderView, BlockSummaryView, BlockView,
ChainId, ChainInfoView, CodeView, ContractCall, DecodedMoveValue, DryRunOutputView,
DryRunTransactionRequest, EpochUncleSummaryView, FactoryAction, ListCodeView, ListResourceView,
MintedBlockView, PeerInfoView, ResourceView, SignedMessageView, SignedUserTransactionView,
StateWithProofView, StrView, TransactionInfoView, TransactionRequest, TransactionView,
StateWithProofView, StrView, TransactionEventResponse, TransactionInfoView, TransactionRequest,
TransactionView,
};
use starcoin_rpc_api::{
account::AccountClient, chain::ChainClient, contract_api::ContractClient, debug::DebugClient,
Expand Down Expand Up @@ -560,8 +559,8 @@ impl RpcClient {
.map_err(map_err)
}

pub fn contract_call(&self, call: ContractCall) -> anyhow::Result<Vec<AnnotatedMoveValueView>> {
self.call_rpc_blocking(|inner| inner.contract_client.call(call))
pub fn contract_call(&self, call: ContractCall) -> anyhow::Result<Vec<DecodedMoveValue>> {
self.call_rpc_blocking(|inner| inner.contract_client.call_v2(call))
.map_err(map_err)
}

Expand Down Expand Up @@ -706,7 +705,7 @@ impl RpcClient {
&self,
txn_hash: HashValue,
option: Option<GetEventOption>,
) -> anyhow::Result<Vec<GetEventResponse>> {
) -> anyhow::Result<Vec<TransactionEventResponse>> {
self.call_rpc_blocking(|inner| inner.chain_client.get_events_by_txn_hash(txn_hash, option))
.map_err(map_err)
}
Expand Down Expand Up @@ -777,9 +776,10 @@ impl RpcClient {
pub fn subscribe_events(
&self,
filter: EventFilter,
decode: bool,
) -> anyhow::Result<impl TryStream<Ok = TransactionEventView, Error = anyhow::Error>> {
self.call_rpc_blocking(|inner| async move {
let res = inner.pubsub_client.subscribe_events(filter).await;
let res = inner.pubsub_client.subscribe_events(filter, decode).await;
res.map(|s| s.map_err(map_err))
})
.map_err(map_err)
Expand Down
Loading