Skip to content

Commit

Permalink
hermes support
Browse files Browse the repository at this point in the history
  • Loading branch information
ani authored and ani committed Feb 20, 2024
1 parent 177a31b commit ffdd934
Show file tree
Hide file tree
Showing 4 changed files with 239 additions and 1 deletion.
35 changes: 35 additions & 0 deletions examples/read_hermes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#!/usr/bin/env python3

import asyncio

from pythclient.hermes import HermesClient, PriceFeed

async def get_hermes_prices():
hermes_client = HermesClient([])
feed_ids = await hermes_client.get_price_feed_ids()
feed_ids_rel = feed_ids[:2]

hermes_client.add_feed_ids(feed_ids_rel)

prices_latest = await hermes_client.get_all_prices()

sd = list(prices_latest.keys())[0]
import pdb; pdb.set_trace()

for feed_id, price_feed in prices_latest.items():
print("Initial prices")
price_latest = price_feed["price"].price
conf_latest = price_feed["price"].conf
print(f"Feed ID: {feed_id}, Price: {price_latest}, Confidence: {conf_latest}, Time: {price_feed['price'].publish_time}")

print("Starting web socket...")
ws_call = hermes_client.ws_pyth_prices()
asyncio.create_task(ws_call)

while True:
await asyncio.sleep(5)
print("Latest prices:")
for feed_id, price_feed in hermes_client.prices_dict.items():
print(f"Feed ID: {feed_id}, Price: {price_latest}, Confidence: {conf_latest}, Time: {price_feed['price'].publish_time}")

asyncio.run(get_hermes_prices())
173 changes: 173 additions & 0 deletions pythclient/hermes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
import asyncio
from typing import TypedDict

import httpx
import os

from .price_feeds import Price

HERMES_ENDPOINT_HTTPS = "https://hermes.pyth.network/api/"
HERMES_ENDPOINT_WSS = "wss://hermes.pyth.network/ws"


class PriceFeed(TypedDict):
feed_id: str
price: Price
ema_price: Price
vaa: str



class HermesClient:
def __init__(self, feed_ids: list[str], endpoint=HERMES_ENDPOINT_HTTPS, ws_endpoint=HERMES_ENDPOINT_WSS):
self.feed_ids = feed_ids
self.pending_feed_ids = feed_ids
self.prices_dict: dict[str, PriceFeed] = {}
self.client = httpx.AsyncClient()
self.endpoint = endpoint
self.ws_endpoint = ws_endpoint

async def get_price_feed_ids(self) -> list[str]:
"""
Queries the Hermes https endpoint for a list of the IDs of all Pyth price feeds.
"""

url = os.path.join(self.endpoint, "price_feed_ids")

client = httpx.AsyncClient()

data = (await client.get(url)).json()

return data

def add_feed_ids(self, feed_ids: list[str]):
self.feed_ids += feed_ids
self.feed_ids = list(set(self.feed_ids))
self.pending_feed_ids += feed_ids

@staticmethod
def extract_price_feed(data: dict) -> PriceFeed:
"""
Extracts a PriceFeed object from the JSON response from Hermes.
"""
price = Price.from_dict(data["price"])
ema_price = Price.from_dict(data["ema_price"])
vaa = data["vaa"]
price_feed = {
"feed_id": data["id"],
"price": price,
"ema_price": ema_price,
"vaa": vaa,
}
return price_feed

async def get_pyth_prices_latest(self, feedIds: list[str]) -> list[PriceFeed]:
"""
Queries the Hermes https endpoint for the latest price feeds for a list of Pyth feed IDs.
"""
url = os.path.join(self.endpoint, "latest_price_feeds?")
params = {"ids[]": feedIds, "binary": "true"}

data = (await self.client.get(url, params=params)).json()

results = []
for res in data:
price_feed = self.extract_price_feed(res)
results.append(price_feed)

return results

async def get_pyth_price_at_time(self, feed_id: str, timestamp: int) -> PriceFeed:
"""
Queries the Hermes https endpoint for the price feed for a Pyth feed ID at a given timestamp.
"""
url = os.path.join(self.endpoint, "get_price_feed")
params = {"id": feed_id, "publish_time": timestamp, "binary": "true"}

data = (await self.client.get(url, params=params)).json()

price_feed = self.extract_price_feed(data)

return price_feed

async def get_all_prices(self) -> dict[str, PriceFeed]:
"""
Queries the Hermes http endpoint for the latest price feeds for all feed IDs in the class object.
There are limitations on the number of feed IDs that can be queried at once, so this function queries the feed IDs in batches.
"""
pyth_prices_latest = []
i = 0
batch_size = 100
while len(self.feed_ids[i : i + batch_size]) > 0:
pyth_prices_latest += await self.get_pyth_prices_latest(
self.feed_ids[i : i + batch_size]
)
i += batch_size

return dict([(feed['feed_id'], feed) for feed in pyth_prices_latest])

async def ws_pyth_prices(self):
"""
Opens a websocket connection to Hermes for latest prices for all feed IDs in the class object.
"""
import json

import websockets

async with websockets.connect(self.ws_endpoint) as ws:
while True:
# add new price feed ids to the ws subscription
if len(self.pending_feed_ids) > 0:
json_subscribe = {
"ids": self.pending_feed_ids,
"type": "subscribe",
"verbose": True,
"binary": True,
}
await ws.send(json.dumps(json_subscribe))
self.pending_feed_ids = []

msg = json.loads(await ws.recv())
if msg.get("type") == "response":
if msg.get("status") != "success":
raise Exception("Error in subscribing to websocket")
try:
if msg["type"] != "price_update":
continue

feed_id = msg["price_feed"]["id"]
new_feed = msg["price_feed"]

self.prices_dict[feed_id] = self.extract_price_feed(new_feed)

except:
raise Exception("Error in price_update message", msg)


async def main():
hermes_client = HermesClient([])
feed_ids = await hermes_client.get_price_feed_ids()
feed_ids_rel = feed_ids[:50]

hermes_client.add_feed_ids(feed_ids_rel)

prices_latest = await hermes_client.get_pyth_prices_latest(feed_ids_rel)

try:
price_at_time = await hermes_client.get_pyth_price_at_time(feed_ids[0], 1_700_000_000)
except Exception as e:
print(f"Error in get_pyth_price_at_time, {e}")

all_prices = await hermes_client.get_all_prices()

print("Starting web socket...")
ws_call = hermes_client.ws_pyth_prices()
asyncio.create_task(ws_call)

while True:
await asyncio.sleep(1)


if __name__ == "__main__":
asyncio.run(main())
17 changes: 16 additions & 1 deletion pythclient/price_feeds.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import base64
import binascii
from struct import unpack
from typing import List, Literal, Optional, Union, cast
from typing import List, Literal, Optional, Union, cast, TypedDict

from Crypto.Hash import keccak
from loguru import logger
Expand All @@ -17,6 +17,11 @@

MAX_MESSAGE_IN_SINGLE_UPDATE_DATA = 255

class PriceDict(TypedDict):
conf: str
expo: int
price: str
publish_time: int

class Price:
def __init__(self, conf, expo, price, publish_time) -> None:
Expand All @@ -35,6 +40,16 @@ def to_dict(self):
"price": self.price,
"publish_time": self.publish_time,
}

@staticmethod
def from_dict(price_dict: PriceDict):
return Price(
conf=int(price_dict["conf"]),
expo=price_dict["expo"],
price=int(price_dict["price"]),
publish_time=price_dict["publish_time"],
)



class PriceUpdate:
Expand Down
15 changes: 15 additions & 0 deletions tests/test_hermes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from pythclient.hermes import HermesClient, PriceFeed

BTC_ID = "e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415b43" # BTC/USD
ETH_ID = "ff61491a931112ddf1bd8147cd1b641375f79f5825126d665480874634fd0ace" # ETH/USD

async def test_hermes_return_price_feed_object():
# Test that the hermes get request returns a dict with same keys as PriceFeed
hermes_client = HermesClient([])
hermes_client.add_feed_ids([BTC_ID, ETH_ID])

all_prices = await hermes_client.get_all_prices()

assert isinstance(all_prices, dict)
assert set(all_prices[BTC_ID].keys()) == set(PriceFeed.__annotations__.keys())
assert set(all_prices[ETH_ID].keys()) == set(PriceFeed.__annotations__.keys())

0 comments on commit ffdd934

Please sign in to comment.