This repository has been archived by the owner on Oct 19, 2024. It is now read-only.

fix(core): serialize filters properly and always rewake #61

Merged
merged 3 commits on Aug 31, 2020
127 changes: 112 additions & 15 deletions ethers-core/src/types/chainstate/log.rs
@@ -3,7 +3,7 @@ use crate::{
types::{Address, BlockNumber, Bytes, H256, U256, U64},
utils::keccak256,
};
use serde::{Deserialize, Serialize, Serializer};
use serde::{ser::SerializeStruct, Deserialize, Serialize, Serializer};
use std::str::FromStr;

/// A log produced by a transaction.
@@ -64,18 +64,15 @@ pub struct Log {
}

/// Filter for
#[derive(Default, Debug, PartialEq, Clone, Serialize)]
#[derive(Default, Debug, PartialEq, Clone)]
pub struct Filter {
/// From Block
#[serde(rename = "fromBlock", skip_serializing_if = "Option::is_none")]
pub from_block: Option<BlockNumber>,

/// To Block
#[serde(rename = "toBlock", skip_serializing_if = "Option::is_none")]
pub to_block: Option<BlockNumber>,

/// Address
#[serde(skip_serializing_if = "Option::is_none")]
// TODO: The spec says that this can also be an array, do we really want to
// monitor for the same event for multiple contracts?
address: Option<Address>,
@@ -86,10 +83,48 @@ pub struct Filter {
pub topics: [Option<ValueOrArray<H256>>; 4],

/// Limit
#[serde(skip_serializing_if = "Option::is_none")]
limit: Option<usize>,
}

impl Serialize for Filter {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut s = serializer.serialize_struct("Filter", 5)?;
if let Some(ref from_block) = self.from_block {
s.serialize_field("fromBlock", from_block)?;
}

if let Some(ref to_block) = self.to_block {
s.serialize_field("toBlock", to_block)?;
}

if let Some(ref address) = self.address {
s.serialize_field("address", address)?;
}

let mut filtered_topics = Vec::new();
for i in 0..4 {
if self.topics[i].is_some() {
filtered_topics.push(&self.topics[i]);
} else {
// TODO: This can be optimized
if self.topics[i + 1..].iter().any(|x| x.is_some()) {
filtered_topics.push(&None);
}
}
}
s.serialize_field("topics", &filtered_topics)?;

if let Some(ref limit) = self.limit {
s.serialize_field("limit", limit)?;
}

s.end()
}
}

impl Filter {
pub fn new() -> Self {
Self::default()
@@ -205,21 +240,83 @@ where
mod tests {
use super::*;
use crate::utils::serialize;
use serde_json::json;

#[test]
fn filter_serialization_test() {
let t1 = "9729a6fbefefc8f6005933898b13dc45c3a2c8b7"
.parse::<Address>()
.unwrap();
let t2 = H256::from([0; 32]);
let t3 = U256::from(123);
let filter = Filter::new()
.address_str("f817796F60D268A36a57b8D2dF1B97B14C0D0E1d")
.unwrap()
.event("ValueChanged(address,string,string)") // event name
.topic1(t1)
.topic2(t3);

let ser = serialize(&filter).to_string();
assert_eq!(ser, "{\"address\":\"0xf817796f60d268a36a57b8d2df1b97b14c0d0e1d\",\"topics\":[\"0xe826f71647b8486f2bae59832124c70792fba044036720a54ec8dacdd5df4fcb\",\"0x0000000000000000000000009729a6fbefefc8f6005933898b13dc45c3a2c8b7\",\"0x000000000000000000000000000000000000000000000000000000000000007b\",null]}");

let t1_padded = H256::from(t1);
let t3_padded = H256::from({
let mut x = [0; 32];
x[31] = 123;
x
});

let event = "ValueChanged(address,string,string)";
let t0 = H256::from(keccak256(event.as_bytes()));
let addr = Address::from_str("f817796F60D268A36a57b8D2dF1B97B14C0D0E1d").unwrap();
let filter = Filter::new();

let ser = serialize(&filter.clone());
assert_eq!(ser, json!({ "topics": [] }));

let filter = filter.address(addr);

let ser = serialize(&filter.clone());
assert_eq!(ser, json!({"address" : addr, "topics": []}));

let filter = filter.event(event);

// 0
let ser = serialize(&filter.clone());
assert_eq!(ser, json!({ "address" : addr, "topics": [t0]}));

// 1
let ser = serialize(&filter.clone().topic1(t1));
assert_eq!(ser, json!({ "address" : addr, "topics": [t0, t1_padded]}));

// 2
let ser = serialize(&filter.clone().topic2(t2));
assert_eq!(ser, json!({ "address" : addr, "topics": [t0, null, t2]}));

// 3
let ser = serialize(&filter.clone().topic3(t3));
assert_eq!(
ser,
json!({ "address" : addr, "topics": [t0, null, null, t3_padded]})
);

// 1 & 2
let ser = serialize(&filter.clone().topic1(t1).topic2(t2));
assert_eq!(
ser,
json!({ "address" : addr, "topics": [t0, t1_padded, t2]})
);

// 1 & 3
let ser = serialize(&filter.clone().topic1(t1).topic3(t3));
assert_eq!(
ser,
json!({ "address" : addr, "topics": [t0, t1_padded, null, t3_padded]})
);

// 2 & 3
let ser = serialize(&filter.clone().topic2(t2).topic3(t3));
assert_eq!(
ser,
json!({ "address" : addr, "topics": [t0, null, t2, t3_padded]})
);

// 1 & 2 & 3
let ser = serialize(&filter.clone().topic1(t1).topic2(t2).topic3(t3));
assert_eq!(
ser,
json!({ "address" : addr, "topics": [t0, t1_padded, t2, t3_padded]})
);
}
}
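
For context on what the new Serialize impl produces: topics set after a gap keep the gap as null, while trailing unset topics are dropped entirely. Below is a minimal standalone sketch of that trimming rule, assuming only serde_json; the trim_trailing_none helper and the string topics are illustrative, not part of the crate.

// Illustrative only: mirrors the trailing-None trimming done by the custom
// `Serialize for Filter` impl above, using plain strings instead of H256 values.
fn trim_trailing_none<'a>(topics: &[Option<&'a str>; 4]) -> Vec<Option<&'a str>> {
    match topics.iter().rposition(|t| t.is_some()) {
        // Keep everything up to and including the last set topic,
        // so interior gaps stay as None (serialized as null).
        Some(last) => topics[..=last].to_vec(),
        None => Vec::new(),
    }
}

fn main() {
    let topics = [Some("t0"), None, Some("t2"), None];
    // Prints ["t0",null,"t2"] -- the trailing null is omitted.
    println!("{}", serde_json::to_string(&trim_trailing_none(&topics)).unwrap());
}

This matches the assertions in the test above: an empty filter serializes its topics as [], and setting only topic0 and topic2 yields [t0, null, t2].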
13 changes: 7 additions & 6 deletions ethers-providers/src/stream.rs
@@ -125,19 +125,20 @@ where
// in a streamed loop we wouldn't want the loop to terminate if an error
// is encountered (since it might be a temporary error).
let items: Vec<R> = futures_util::ready!(fut.poll_unpin(cx)).unwrap_or_default();
cx.waker().wake_by_ref();
FilterWatcherState::NextItem(items.into_iter())
}
// Consume 1 element from the vector. If more elements are in the vector,
// the next call will immediately go to this branch instead of trying to get
// filter changes again. Once the whole vector is consumed, it will poll again
// for new logs
FilterWatcherState::NextItem(iter) => match iter.next() {
Some(item) => return Poll::Ready(Some(item)),
None => {
cx.waker().wake_by_ref();
FilterWatcherState::WaitForInterval
FilterWatcherState::NextItem(iter) => {
cx.waker().wake_by_ref();
match iter.next() {
Some(item) => return Poll::Ready(Some(item)),
None => FilterWatcherState::WaitForInterval,
}
},
}
};

Poll::Pending
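
The stream change above comes down to this: whenever poll_next transitions the state machine but returns Poll::Pending, the task must be rewoken, otherwise nothing schedules the next poll and the stream stalls once a batch is drained. A minimal sketch of the same shape, assuming the futures_core crate for the Stream trait; the Watcher type and the hard-coded batch are illustrative, not the real FilterWatcher.

use std::pin::Pin;
use std::task::{Context, Poll};

use futures_core::Stream;

// Toy state machine with the same shape as FilterWatcher:
// fetch a batch, then hand out one item per poll until it is drained.
enum State {
    WaitForInterval,
    NextItem(std::vec::IntoIter<u64>),
}

struct Watcher {
    state: State,
}

impl Stream for Watcher {
    type Item = u64;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<u64>> {
        // Watcher has no self-referential fields, so it is Unpin and we can
        // work with a plain mutable reference.
        let this = self.get_mut();
        this.state = match &mut this.state {
            State::WaitForInterval => {
                // Pretend the interval elapsed and the RPC call returned a batch.
                let items = vec![1, 2, 3];
                // Rewake unconditionally: we return Pending below, and without
                // this the executor would never poll us again.
                cx.waker().wake_by_ref();
                State::NextItem(items.into_iter())
            }
            State::NextItem(iter) => {
                // Wake before deciding: harmless when we return Ready, and
                // required when the batch is empty and we fall back to Pending.
                cx.waker().wake_by_ref();
                match iter.next() {
                    Some(item) => return Poll::Ready(Some(item)),
                    None => State::WaitForInterval,
                }
            }
        };

        Poll::Pending
    }
}

In the real FilterWatcher the interval and the get_filter_changes call are genuinely asynchronous, but the principle is the same: every Pending return is paired with a scheduled wakeup, which is what "always rewake" in the PR title refers to.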