Skip to content

Commit

Permalink
Merge pull request #33 from darkforestry/fix/v3-sync-from-log
Browse files Browse the repository at this point in the history
Fix/v3 sync from log
  • Loading branch information
0xKitsune authored May 6, 2023
2 parents b2022a2 + 5c88844 commit f85831c
Show file tree
Hide file tree
Showing 9 changed files with 249 additions and 92 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "damms"
version = "0.4.6"
version = "0.4.7"
edition = "2021"


Expand Down
86 changes: 62 additions & 24 deletions src/amm/factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use ethers::{
types::{BlockNumber, Filter, Log, ValueOrArray, H160, H256, U64},
};
use serde::{Deserialize, Serialize};
use tokio::task::JoinHandle;

use crate::errors::DAMMError;

Expand All @@ -15,11 +16,13 @@ use super::{
AMM,
};

pub const TASK_LIMIT: usize = 10;

#[async_trait]
pub trait AutomatedMarketMakerFactory {
fn address(&self) -> H160;

async fn get_all_amms<M: Middleware>(
async fn get_all_amms<M: 'static + Middleware>(
&self,
to_block: Option<u64>,
middleware: Arc<M>,
Expand All @@ -36,7 +39,7 @@ pub trait AutomatedMarketMakerFactory {

fn creation_block(&self) -> u64;

async fn new_amm_from_log<M: Middleware>(
async fn new_amm_from_log<M: 'static + Middleware>(
&self,
log: Log,
middleware: Arc<M>,
Expand Down Expand Up @@ -67,7 +70,7 @@ impl AutomatedMarketMakerFactory for Factory {
}
}

async fn new_amm_from_log<M: Middleware>(
async fn new_amm_from_log<M: 'static + Middleware>(
&self,
log: Log,
middleware: Arc<M>,
Expand All @@ -85,7 +88,7 @@ impl AutomatedMarketMakerFactory for Factory {
}
}

async fn get_all_amms<M: Middleware>(
async fn get_all_amms<M: 'static + Middleware>(
&self,
to_block: Option<u64>,
middleware: Arc<M>,
Expand Down Expand Up @@ -125,40 +128,75 @@ impl AutomatedMarketMakerFactory for Factory {
impl Factory {
pub async fn get_all_pools_from_logs<M: 'static + Middleware>(
&self,
from_block: u64,
mut from_block: u64,
to_block: u64,
step: usize,
step: u64,
middleware: Arc<M>,
) -> Result<Vec<AMM>, DAMMError<M>> {
let factory_address = self.address();
let amm_created_event_signature = self.amm_created_event_signature();
let mut log_group = vec![];
let mut handles = vec![];
let mut tasks = 0;
let mut aggregated_amms: Vec<AMM> = vec![];

//For each block within the range, get all pairs asynchronously
for from_block in (from_block..=to_block).step_by(step) {
let provider = middleware.clone();
while from_block < to_block {
let middleware = middleware.clone();
let mut target_block = from_block + step - 1;
if target_block > to_block {
target_block = to_block;
}

//Get pair created event logs within the block range
let to_block = from_block + step as u64;
handles.push(tokio::spawn(async move {
let logs = middleware
.get_logs(
&Filter::new()
.topic0(ValueOrArray::Value(amm_created_event_signature))
.address(factory_address)
.from_block(BlockNumber::Number(U64([from_block])))
.to_block(BlockNumber::Number(U64([target_block]))),
)
.await
.map_err(DAMMError::MiddlewareError)?;

let logs = provider
.get_logs(
&Filter::new()
.topic0(ValueOrArray::Value(self.amm_created_event_signature()))
.address(self.address())
.from_block(BlockNumber::Number(U64([from_block])))
.to_block(BlockNumber::Number(U64([to_block]))),
)
.await
.map_err(DAMMError::MiddlewareError)?;
Ok::<Vec<Log>, DAMMError<M>>(logs)
}));

for log in logs {
let amm = self.new_empty_amm_from_log(log)?;
aggregated_amms.push(amm);
from_block += step;
tasks += 1;
if tasks == TASK_LIMIT {
self.process_logs_from_handles(handles, &mut log_group)
.await?;

handles = vec![];
tasks = 0;
}
}

self.process_logs_from_handles(handles, &mut log_group)
.await?;

for log in log_group {
aggregated_amms.push(self.new_empty_amm_from_log(log)?);
}

Ok(aggregated_amms)
}

async fn process_logs_from_handles<M: Middleware>(
&self,
handles: Vec<JoinHandle<Result<Vec<Log>, DAMMError<M>>>>,
log_group: &mut Vec<Log>,
) -> Result<(), DAMMError<M>> {
for handle in handles {
let logs = handle.await??;
for log in logs {
log_group.push(log);
}
}
Ok(())
}

pub fn new_empty_factory_from_event_signature(event_signature: H256) -> Self {
if event_signature == PAIR_CREATED_EVENT_SIGNATURE {
Factory::UniswapV2Factory(UniswapV2Factory::default())
Expand Down
2 changes: 1 addition & 1 deletion src/amm/uniswap_v2/factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ impl AutomatedMarketMakerFactory for UniswapV2Factory {
PAIR_CREATED_EVENT_SIGNATURE
}

async fn new_amm_from_log<M: Middleware>(
async fn new_amm_from_log<M: 'static + Middleware>(
&self,
log: Log,
middleware: Arc<M>,
Expand Down
120 changes: 86 additions & 34 deletions src/amm/uniswap_v3/factory.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::{collections::HashMap, sync::Arc};
use std::{
collections::{BTreeMap, HashMap},
sync::Arc,
};

use async_trait::async_trait;
use ethers::{
Expand All @@ -8,10 +11,14 @@ use ethers::{
types::{BlockNumber, Filter, Log, H160, H256, U256, U64},
};
use serde::{Deserialize, Serialize};
use tokio::task::JoinHandle;

use crate::{
amm::{factory::AutomatedMarketMakerFactory, AutomatedMarketMaker, AMM},
errors::DAMMError,
amm::{
factory::{AutomatedMarketMakerFactory, TASK_LIMIT},
AutomatedMarketMaker, AMM,
},
errors::{DAMMError, EventLogError},
};

use super::{batch_request, UniswapV3Pool, BURN_EVENT_SIGNATURE, MINT_EVENT_SIGNATURE};
Expand Down Expand Up @@ -51,7 +58,7 @@ impl AutomatedMarketMakerFactory for UniswapV3Factory {
POOL_CREATED_EVENT_SIGNATURE
}

async fn new_amm_from_log<M: Middleware>(
async fn new_amm_from_log<M: 'static + Middleware>(
&self,
log: Log,
middleware: Arc<M>,
Expand All @@ -71,7 +78,7 @@ impl AutomatedMarketMakerFactory for UniswapV3Factory {
}
}

async fn get_all_amms<M: Middleware>(
async fn get_all_amms<M: 'static + Middleware>(
&self,
to_block: Option<u64>,
middleware: Arc<M>,
Expand Down Expand Up @@ -99,9 +106,6 @@ impl AutomatedMarketMakerFactory for UniswapV3Factory {
middleware.clone(),
)
.await?;

//TODO: add back progress bars
// progress_bar.inc(step as u64);
}
} else {
return Err(DAMMError::BlockNumberNotFound);
Expand Down Expand Up @@ -139,40 +143,63 @@ impl UniswapV3Factory {
}

//Function to get all pair created events for a given Dex factory address and sync pool data
pub async fn get_all_pools_from_logs<M: Middleware>(
pub async fn get_all_pools_from_logs<M: 'static + Middleware>(
self,
to_block: u64,
step: usize,
step: u64,
middleware: Arc<M>,
) -> Result<Vec<AMM>, DAMMError<M>> {
//Unwrap can be used here because the creation block was verified within `Dex::new()`
let from_block = self.creation_block;

let mut from_block = self.creation_block;
let mut aggregated_amms: HashMap<H160, AMM> = HashMap::new();
let mut ordered_logs: BTreeMap<U64, Vec<Log>> = BTreeMap::new();

//For each block within the range, get all pairs asynchronously
for from_block in (from_block..=to_block).step_by(step) {
let provider = middleware.clone();

//Get pair created event logs within the block range
let to_block = from_block + step as u64 - 1;

let logs = provider
.get_logs(
&Filter::new()
.topic0(vec![
POOL_CREATED_EVENT_SIGNATURE,
BURN_EVENT_SIGNATURE,
MINT_EVENT_SIGNATURE,
])
.from_block(BlockNumber::Number(U64([from_block])))
.to_block(BlockNumber::Number(U64([to_block]))),
)
.await
.map_err(DAMMError::MiddlewareError)?;
let mut handles = vec![];

//For each pair created log, create a new Pair type and add it to the pairs vec
for log in logs {
let mut tasks = 0;
while from_block < to_block {
let middleware = middleware.clone();

let mut target_block = from_block + step - 1;
if target_block > to_block {
target_block = to_block;
}

handles.push(tokio::spawn(async move {
let logs = middleware
.get_logs(
&Filter::new()
.topic0(vec![
POOL_CREATED_EVENT_SIGNATURE,
BURN_EVENT_SIGNATURE,
MINT_EVENT_SIGNATURE,
])
.from_block(BlockNumber::Number(U64([from_block])))
.to_block(BlockNumber::Number(U64([target_block]))),
)
.await
.map_err(DAMMError::MiddlewareError)?;

Ok::<Vec<Log>, DAMMError<M>>(logs)
}));

from_block += step;

tasks += 1;
//Here we are limiting the number of green threads that can be spun up to not have the node time out
if tasks == TASK_LIMIT {
self.process_logs_from_handles(handles, &mut ordered_logs)
.await?;
handles = vec![];
tasks = 0;
}
}

self.process_logs_from_handles(handles, &mut ordered_logs)
.await?;

for (_, log_group) in ordered_logs {
for log in log_group {
let event_signature = log.topics[0];

//If the event sig is the pool created event sig, then the log is coming from the factory
Expand All @@ -198,6 +225,31 @@ impl UniswapV3Factory {
}
}
}

Ok(aggregated_amms.into_values().collect::<Vec<AMM>>())
}

async fn process_logs_from_handles<M: Middleware>(
&self,
handles: Vec<JoinHandle<Result<Vec<Log>, DAMMError<M>>>>,
ordered_logs: &mut BTreeMap<U64, Vec<Log>>,
) -> Result<(), DAMMError<M>> {
// group the logs from each thread by block number and then sync the logs in chronological order
for handle in handles {
let logs = handle.await??;

for log in logs {
if let Some(log_block_number) = log.block_number {
if let Some(log_group) = ordered_logs.get_mut(&log_block_number) {
log_group.push(log);
} else {
ordered_logs.insert(log_block_number, vec![log]);
}
} else {
return Err(EventLogError::LogBlockNumberNotFound)?;
}
}
}
Ok(())
}
}
Loading

0 comments on commit f85831c

Please sign in to comment.