Skip to content

Commit

Permalink
feat: impl web3 subscription (#124)
Browse files Browse the repository at this point in the history
  • Loading branch information
driftluo committed Feb 28, 2022
1 parent 71adcd2 commit cbc5b52
Show file tree
Hide file tree
Showing 7 changed files with 369 additions and 20 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions core/api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ edition = "2021"

[dependencies]
# async-graphql = { version = "3.0", features = ["tracing"] }
beef = "0.5"
cita_trie = "3.0"
jsonrpsee = { version = "0.9", features = ["http-server", "macros", "ws-server"] }
log = "0.4"
Expand Down
18 changes: 7 additions & 11 deletions core/api/src/jsonrpc/impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,11 +353,7 @@ impl<Adapter: APIAdapter + 'static> AxonJsonRpcServer for JsonRpcImpl<Adapter> {

#[metrics_rpc("eth_getLogs")]
async fn get_logs(&self, filter: Web3Filter) -> RpcResult<Vec<Web3Log>> {
if filter.topics.is_none() {
return Ok(Vec::new());
}

let topics = filter.topics.unwrap();
let topics = filter.topics.unwrap_or_default();

#[allow(clippy::large_enum_variant)]
enum BlockPosition {
Expand All @@ -379,9 +375,9 @@ impl<Adapter: APIAdapter + 'static> AxonJsonRpcServer for JsonRpcImpl<Adapter> {
let log_len = receipt.logs.len();
match address {
Some(s) if s == &receipt.sender => {
from_receipt_to_web3_log(index, topics, receipt, logs)
from_receipt_to_web3_log(index, topics, &receipt, logs)
}
None => from_receipt_to_web3_log(index, topics, receipt, logs),
None => from_receipt_to_web3_log(index, topics, &receipt, logs),
_ => (),
}
index += log_len;
Expand Down Expand Up @@ -747,15 +743,15 @@ fn mock_header_by_call_req(latest_header: Header, call_req: &Web3CallRequest) ->
}
}

fn from_receipt_to_web3_log(
pub fn from_receipt_to_web3_log(
index: usize,
topics: &[H256],
receipt: Receipt,
receipt: &Receipt,
logs: &mut Vec<Web3Log>,
) {
for log in receipt.logs {
for log in &receipt.logs {
for (idx, topic) in log.topics.iter().enumerate() {
if topics.contains(topic) {
if topics.is_empty() || topics.contains(topic) {
let web3_log = Web3Log {
address: receipt.sender,
topics: log.topics.clone(),
Expand Down
24 changes: 17 additions & 7 deletions core/api/src/jsonrpc/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mod r#impl;
mod web3_types;
mod ws_subscription;

use std::sync::Arc;

Expand All @@ -12,9 +13,12 @@ use protocol::traits::APIAdapter;
use protocol::types::{Hash, Hex, H160, H256, U256};
use protocol::ProtocolResult;

use crate::jsonrpc::web3_types::{
BlockId, Web3Block, Web3CallRequest, Web3FeeHistory, Web3Filter, Web3Log, Web3Receipt,
Web3SyncStatus, Web3Transaction,
use crate::jsonrpc::{
web3_types::{
BlockId, Web3Block, Web3CallRequest, Web3FeeHistory, Web3Filter, Web3Log, Web3Receipt,
Web3SyncStatus, Web3Transaction,
},
ws_subscription::{ws_subscription_module, HexIdProvider},
};

use crate::APIError;
Expand Down Expand Up @@ -182,16 +186,22 @@ pub async fn run_jsonrpc_server<Adapter: APIAdapter + 'static>(
let server = WsServerBuilder::new()
.max_request_body_size(config.rpc.max_payload_size as u32)
.max_connections(config.rpc.maxconn as u64)
.set_id_provider(HexIdProvider::default())
.build(addr)
.await
.map_err(|e| APIError::WebSocketServer(e.to_string()))?;

let mut rpc = r#impl::JsonRpcImpl::new(
Arc::clone(&adapter),
&config.rpc.client_version,
config.data_path,
)
.into_rpc();
rpc.merge(ws_subscription_module(adapter).await).unwrap();

ret.1 = Some(
server
.start(
r#impl::JsonRpcImpl::new(adapter, &config.rpc.client_version, config.data_path)
.into_rpc(),
)
.start(rpc)
.map_err(|e| APIError::WebSocketServer(e.to_string()))?,
)
}
Expand Down
45 changes: 43 additions & 2 deletions core/api/src/jsonrpc/web3_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use serde::{Deserialize, Deserializer, Serialize, Serializer};
use core_consensus::SyncStatus as InnerSyncStatus;
use protocol::codec::ProtocolCodec;
use protocol::types::{
AccessList, Block, Bloom, Bytes, Hash, Hex, Public, Receipt, SignedTransaction, H160, H256,
U256, U64,
AccessList, Block, Bloom, Bytes, Hash, Header, Hex, Public, Receipt, SignedTransaction, H160,
H256, U256, U64,
};

#[allow(clippy::large_enum_variant)]
Expand Down Expand Up @@ -517,6 +517,47 @@ pub struct Web3FeeHistory {
pub gas_used_ratio: Vec<U256>,
}

#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct Web3Header {
pub difficulty: U256,
pub extra_data: Hex,
pub gas_limit: U256,
pub gas_used: U256,
pub logs_bloom: Option<Bloom>,
pub miner: H160,
pub nonce: U256,
pub number: U256,
pub parent_hash: H256,
pub receipts_root: H256,
#[serde(rename = "sha3Uncles")]
pub sha3_uncles: H256,
pub state_root: H256,
pub timestamp: U256,
pub transactions_root: H256,
}

impl From<Header> for Web3Header {
fn from(h: Header) -> Self {
Web3Header {
number: h.number.into(),
parent_hash: h.prev_hash,
sha3_uncles: Default::default(),
logs_bloom: Some(h.log_bloom),
transactions_root: h.transactions_root,
state_root: h.state_root,
receipts_root: h.receipts_root,
miner: h.proposer,
difficulty: h.difficulty,
extra_data: Hex::encode(&h.extra_data),
gas_limit: h.gas_limit,
gas_used: h.gas_used,
timestamp: h.timestamp.into(),
nonce: U256::default(),
}
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
Loading

0 comments on commit cbc5b52

Please sign in to comment.