Skip to content

Commit

Permalink
impl basic db connection (#120)
Browse files Browse the repository at this point in the history
* impl basic db connection

* created depth type

* use mem::take to save copy from Depth creation

* add type alias for return values, added own type for trades

* fmt
  • Loading branch information
calumrussell authored Nov 11, 2024
1 parent a029b80 commit 8155aa6
Show file tree
Hide file tree
Showing 4 changed files with 204 additions and 10 deletions.
24 changes: 14 additions & 10 deletions data/hl.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,33 +6,38 @@
import lz4framed
import datetime


def path_builder(date, hour, coin):
return f"market_data/{date}/{hour}/l2Book/{coin}.lz4"


def parse_date(string):
return (string[0:4], string[4:6])


def zero_padding(number):
if number < 10:
return "0" + str(number)
return str(number)


if __name__ == "__main__":
parser = argparse.ArgumentParser(
prog='HL Data Fetcher',
description='Downloads data from HL, unzips and places into directory')
prog="HL Data Fetcher",
description="Downloads data from HL, unzips and places into directory",
)

parser.add_argument('-o', '--outdir')
parser.add_argument('-c', '--coin')
parser.add_argument('-s', '--start')
parser.add_argument("-o", "--outdir")
parser.add_argument("-c", "--coin")
parser.add_argument("-s", "--start")

args = parser.parse_args()

max_year = 2024
hours = list(range(0, 24))
days = list(range(1,32))
days = list(range(1, 32))
months = list(range(1, 13))
client = boto3.client('s3', config=Config(signature_version=UNSIGNED))
client = boto3.client("s3", config=Config(signature_version=UNSIGNED))
bucket_name = "hyperliquid-archive"
now = datetime.datetime.now()

Expand All @@ -52,7 +57,7 @@ def zero_padding(number):
try:
then = datetime.datetime(year, month, day)
except ValueError:
#Occurs if date isn't valid
# Occurs if date isn't valid
continue

if then > now:
Expand All @@ -72,7 +77,7 @@ def zero_padding(number):
Bucket=bucket_name,
Key=key,
)
contents = response['Body'].read()
contents = response["Body"].read()
except Exception:
print(f"Didn't find - {key}")
continue
Expand All @@ -85,4 +90,3 @@ def zero_padding(number):
with open(f"{file_path}", "w") as f:
for chunk in chunks:
f.write(chunk)

2 changes: 2 additions & 0 deletions rotala/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ serde = { version = "1.0.193", features = ["derive"] }
serde_json = "1.0.108"
tokio = { version = "1.35.1", features = ["full"] }
anyhow = "1.0.91"
tokio-postgres = "0.7.12"
tokio-pg-mapper = { version = "0.2.0", features = ["derive"] }

[dev-dependencies]
criterion = { version = "0.5.1", features = ["async_tokio"] }
Expand Down
187 changes: 187 additions & 0 deletions rotala/src/input/minerva.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
#![allow(dead_code)]
use std::collections::{BTreeMap, HashMap};

use anyhow::Result;
use serde::{Deserialize, Serialize};
use tokio_pg_mapper::FromTokioPostgresRow;
use tokio_postgres::{Client, NoTls};

pub struct Minerva {
db: Client,
}

#[derive(tokio_pg_mapper::PostgresMapper, Clone, Debug)]
#[pg_mapper(table = "l2Book")]
pub struct L2Book {
coin: String,
side: bool,
px: String,
sz: String,
time: i64,
}

#[derive(tokio_pg_mapper::PostgresMapper, Clone, Debug)]
#[pg_mapper(table = "trade")]
pub struct Trade {
coin: String,
side: String,
px: String,
sz: String,
hash: String,
time: i64,
tid: i64,
}

pub type TradeByDate = BTreeMap<i64, MinervaTrade>;

#[derive(Clone, Debug, Deserialize, Serialize)]
pub enum Side {
Bid,
Ask,
}

#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct MinervaTrade {
coin: String,
side: Side,
px: f64,
sz: f64,
time: i64,
}

impl From<Trade> for MinervaTrade {
fn from(value: Trade) -> Self {
let side = if value.side == "B" {
Side::Bid
} else {
Side::Ask
};

Self {
coin: value.coin,
side,
px: str::parse::<f64>(&value.px).unwrap(),
sz: str::parse::<f64>(&value.sz).unwrap(),
time: value.time,
}
}
}

pub type DepthByDate = BTreeMap<i64, Depth>;

#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct Level {
pub price: f64,
pub size: f64,
}

#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct Depth {
pub bids: Vec<Level>,
pub asks: Vec<Level>,
pub date: i64,
pub symbol: String,
}

impl From<Vec<L2Book>> for Depth {
fn from(values: Vec<L2Book>) -> Self {
let mut bids = Vec::with_capacity(5);
let mut asks = Vec::with_capacity(5);

let date = values.first().unwrap().time;
let symbol = values.first().unwrap().coin.clone();

for row in values {
match row.side {
true => bids.push(Level {
price: str::parse::<f64>(&row.px).unwrap(),
size: str::parse::<f64>(&row.sz).unwrap(),
}),
false => asks.push(Level {
price: str::parse::<f64>(&row.px).unwrap(),
size: str::parse::<f64>(&row.sz).unwrap(),
}),
}
}

Depth {
bids,
asks,
date,
symbol,
}
}
}

impl Minerva {
pub async fn new(connection_string: &str) -> Minerva {
if let Ok(client) = Minerva::get_connection(connection_string).await {
return Minerva { db: client };
}
panic!("Could not connect to database")
}

async fn get_connection(connection_string: &str) -> Result<Client> {
let (client, connection) = tokio_postgres::connect(connection_string, NoTls).await?;

tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("connection error: {}", e);
}
});

Ok(client)
}

pub async fn get_trades(&self, start_date: &i64, end_date: &i64, coin: &str) -> TradeByDate {
let query_result = self
.db
.query(
"select * from trade where coin=$1::TEXT and time between $2 and $3",
&[&coin, &start_date, &end_date],
)
.await;

let mut res = BTreeMap::new();
if let Ok(rows) = query_result {
for row in rows {
if let Ok(trade) = Trade::from_row(row) {
let minerva_trade: MinervaTrade = trade.into();
res.insert(minerva_trade.time, minerva_trade);
}
}
}
res
}

pub async fn get_depth(&self, start_date: &i64, end_date: &i64, coin: &str) -> DepthByDate {
let query_result = self
.db
.query(
"select * from l2Book where coin=$1::TEXT and time between $2 and $3",
&[&coin, &start_date, &end_date],
)
.await;

let mut sort_into_dates = HashMap::new();
if let Ok(rows) = query_result {
for row in rows {
if let Ok(book) = L2Book::from_row(row) {
sort_into_dates.entry(book.time).or_insert_with(Vec::new);
sort_into_dates
.get_mut(&book.time)
.unwrap()
.push(book.clone());
}
}
}

let mut depth_result = BTreeMap::new();
for (date, rows) in sort_into_dates.iter_mut() {
let depth: Depth = std::mem::take(rows).into();
depth_result.insert(*date, depth);
}

depth_result
}
}
1 change: 1 addition & 0 deletions rotala/src/input/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@
//! Sources should be called through inputs so that clients do not have to marshall data into internal
//! types.
pub mod athena;
pub mod minerva;
pub mod penelope;

0 comments on commit 8155aa6

Please sign in to comment.