Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

hermes support #54

Merged
merged 10 commits into from
Feb 21, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions examples/read_hermes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#!/usr/bin/env python3

import asyncio

from pythclient.hermes import HermesClient

async def get_hermes_prices():
hermes_client = HermesClient([])
feed_ids = await hermes_client.get_price_feed_ids()
feed_ids_rel = feed_ids[:2]

hermes_client.add_feed_ids(feed_ids_rel)

prices_latest = await hermes_client.get_all_prices()

sd = list(prices_latest.keys())[0]
anihamde marked this conversation as resolved.
Show resolved Hide resolved
import pdb; pdb.set_trace()
anihamde marked this conversation as resolved.
Show resolved Hide resolved

for feed_id, price_feed in prices_latest.items():
print("Initial prices")
price_latest = price_feed["price"].price
conf_latest = price_feed["price"].conf
print(f"Feed ID: {feed_id}, Price: {price_latest}, Confidence: {conf_latest}, Time: {price_feed['price'].publish_time}")

print("Starting web socket...")
ws_call = hermes_client.ws_pyth_prices()
asyncio.create_task(ws_call)

while True:
await asyncio.sleep(5)
print("Latest prices:")
for feed_id, price_feed in hermes_client.prices_dict.items():
print(f"Feed ID: {feed_id}, Price: {price_latest}, Confidence: {conf_latest}, Time: {price_feed['price'].publish_time}")

asyncio.run(get_hermes_prices())
173 changes: 173 additions & 0 deletions pythclient/hermes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
import asyncio
from typing import TypedDict

import httpx
import os

from .price_feeds import Price

HERMES_ENDPOINT_HTTPS = "https://hermes.pyth.network/api/"
HERMES_ENDPOINT_WSS = "wss://hermes.pyth.network/ws"


class PriceFeed(TypedDict):
feed_id: str
price: Price
ema_price: Price
vaa: str
Copy link
Contributor

@cctdaniel cctdaniel Feb 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

did you follow the v1 api schema? maybe good to implement for v2 instead since that is live already e.g. https://hermes.pyth.network/docs/#/rest/latest_price_updates
but happy to defer to @ali-bahjati on what the right action here is




class HermesClient:
def __init__(self, feed_ids: list[str], endpoint=HERMES_ENDPOINT_HTTPS, ws_endpoint=HERMES_ENDPOINT_WSS):
self.feed_ids = feed_ids
self.pending_feed_ids = feed_ids
self.prices_dict: dict[str, PriceFeed] = {}
self.client = httpx.AsyncClient()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the httpx.AsyncClient is instantiated but not explicitly closed, which could potentially lead to resource leaks, maybe use the client as a context manager to ensure it's properly closed e.g.

    async with self.client as client:
        data = (await client.get(url)).json()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The issue is that with context manager usage, any one call will close the client which makes it impossible to open again. I think using asyncio.run manages the resource leakage by closing all tasks running within the event loop whenever it's shut down: https://github.com/python/cpython/blob/c0b0c2f2015fb27db4306109b2b3781eb2057c2b/Lib/asyncio/runners.py#L64-L79

self.endpoint = endpoint
self.ws_endpoint = ws_endpoint

async def get_price_feed_ids(self) -> list[str]:
"""
Queries the Hermes https endpoint for a list of the IDs of all Pyth price feeds.
"""

url = os.path.join(self.endpoint, "price_feed_ids")

client = httpx.AsyncClient()

data = (await client.get(url)).json()
anihamde marked this conversation as resolved.
Show resolved Hide resolved

return data

def add_feed_ids(self, feed_ids: list[str]):
self.feed_ids += feed_ids
self.feed_ids = list(set(self.feed_ids))
self.pending_feed_ids += feed_ids
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why does self.pending_feed_ids not require uniqueness?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we always call list(set(self.feed_ids)), so the feed_ids will always be unique

Copy link
Contributor

@cctdaniel cctdaniel Feb 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's possible for self.pending_feed_ids to contain duplicates if the same feed IDs are added more than once before they are cleared, for e.g.

  • add_feed_ids method is called with ['feed1', 'feed2']
  • before any operation that clears self.pending_feed_ids is performed, add_feed_ids is called again with ['feed2', 'feed3']
  • self.feed_ids will now contain ['feed1', 'feed2', 'feed3'] without duplicates because it's converted to a set and then back to a list
  • however, self.pending_feed_ids will contain ['feed1', 'feed2', 'feed2', 'feed3'], where 'feed2' is duplicated.

Copy link
Contributor

@cctdaniel cctdaniel Feb 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

imo you can do something like this to make it cleaner

    # convert feed_ids to a set to remove any duplicates from the input
    new_feed_ids_set = set(feed_ids)
    
    # update self.feed_ids; convert to set for union operation, then back to list
    self.feed_ids = list(set(self.feed_ids).union(new_feed_ids_set))
    
    # update self.pending_feed_ids with only those IDs that are truly new
    self.pending_feed_ids = list(set(self.pending_feed_ids).union(new_feed_ids_set))

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

another thing that we can possibly do here is to validate that the feed_ids passed in is in the format we expect
e.g.

def validate_feed_ids(self, feed_ids: list[str]):
    """
    Validates the format of feed IDs. Each ID should be a 64-character hexadecimal string,
    optionally prefixed with '0x'.
    """
    hex_pattern = re.compile(r'^0x[a-fA-F0-9]{64}$|^[a-fA-F0-9]{64}$')
    for feed_id in feed_ids:
        if not hex_pattern.match(feed_id):
            raise ValueError(f"Invalid feed ID format: {feed_id}")

and then call validate_feed_ids in the first line of add_feed_ids and write tests to ensure that invalid ids (e.g. 30 chars, invalid chars, etc) throw an error


@staticmethod
def extract_price_feed(data: dict) -> PriceFeed:
"""
Extracts a PriceFeed object from the JSON response from Hermes.
"""
price = Price.from_dict(data["price"])
ema_price = Price.from_dict(data["ema_price"])
vaa = data["vaa"]
price_feed = {
"feed_id": data["id"],
"price": price,
"ema_price": ema_price,
"vaa": vaa,
}
return price_feed

async def get_pyth_prices_latest(self, feedIds: list[str]) -> list[PriceFeed]:
"""
Queries the Hermes https endpoint for the latest price feeds for a list of Pyth feed IDs.
"""
url = os.path.join(self.endpoint, "latest_price_feeds?")
params = {"ids[]": feedIds, "binary": "true"}

data = (await self.client.get(url, params=params)).json()

results = []
for res in data:
price_feed = self.extract_price_feed(res)
results.append(price_feed)

return results

async def get_pyth_price_at_time(self, feed_id: str, timestamp: int) -> PriceFeed:
"""
Queries the Hermes https endpoint for the price feed for a Pyth feed ID at a given timestamp.
"""
url = os.path.join(self.endpoint, "get_price_feed")
params = {"id": feed_id, "publish_time": timestamp, "binary": "true"}

data = (await self.client.get(url, params=params)).json()

price_feed = self.extract_price_feed(data)

return price_feed

async def get_all_prices(self) -> dict[str, PriceFeed]:
"""
Queries the Hermes http endpoint for the latest price feeds for all feed IDs in the class object.

There are limitations on the number of feed IDs that can be queried at once, so this function queries the feed IDs in batches.
"""
pyth_prices_latest = []
i = 0
batch_size = 100
anihamde marked this conversation as resolved.
Show resolved Hide resolved
while len(self.feed_ids[i : i + batch_size]) > 0:
pyth_prices_latest += await self.get_pyth_prices_latest(
self.feed_ids[i : i + batch_size]
)
i += batch_size

return dict([(feed['feed_id'], feed) for feed in pyth_prices_latest])

async def ws_pyth_prices(self):
"""
Opens a websocket connection to Hermes for latest prices for all feed IDs in the class object.
"""
import json

import websockets
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a specific reason to do this? otherwise I recommend to import it at top-level to improve readability and maintainability

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think we did this so that we didn't import these when we were using non-websocket client--i think it was more relevant where we had the code before, I think here it's def better to move to top


async with websockets.connect(self.ws_endpoint) as ws:
while True:
# add new price feed ids to the ws subscription
if len(self.pending_feed_ids) > 0:
json_subscribe = {
"ids": self.pending_feed_ids,
"type": "subscribe",
"verbose": True,
"binary": True,
}
await ws.send(json.dumps(json_subscribe))
self.pending_feed_ids = []

msg = json.loads(await ws.recv())
if msg.get("type") == "response":
if msg.get("status") != "success":
raise Exception("Error in subscribing to websocket")
try:
if msg["type"] != "price_update":
continue

feed_id = msg["price_feed"]["id"]
new_feed = msg["price_feed"]

self.prices_dict[feed_id] = self.extract_price_feed(new_feed)

except:
raise Exception("Error in price_update message", msg)
anihamde marked this conversation as resolved.
Show resolved Hide resolved


async def main():
hermes_client = HermesClient([])
feed_ids = await hermes_client.get_price_feed_ids()
feed_ids_rel = feed_ids[:50]

hermes_client.add_feed_ids(feed_ids_rel)

prices_latest = await hermes_client.get_pyth_prices_latest(feed_ids_rel)

try:
price_at_time = await hermes_client.get_pyth_price_at_time(feed_ids[0], 1_700_000_000)
except Exception as e:
print(f"Error in get_pyth_price_at_time, {e}")

all_prices = await hermes_client.get_all_prices()

print("Starting web socket...")
ws_call = hermes_client.ws_pyth_prices()
asyncio.create_task(ws_call)

while True:
await asyncio.sleep(1)


if __name__ == "__main__":
asyncio.run(main())
anihamde marked this conversation as resolved.
Show resolved Hide resolved
17 changes: 16 additions & 1 deletion pythclient/price_feeds.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import base64
import binascii
from struct import unpack
from typing import List, Literal, Optional, Union, cast
from typing import List, Literal, Optional, Union, cast, TypedDict

from Crypto.Hash import keccak
from loguru import logger
Expand All @@ -17,6 +17,11 @@

MAX_MESSAGE_IN_SINGLE_UPDATE_DATA = 255

class PriceDict(TypedDict):
conf: str
expo: int
price: str
publish_time: int

class Price:
def __init__(self, conf, expo, price, publish_time) -> None:
Expand All @@ -35,6 +40,16 @@ def to_dict(self):
"price": self.price,
"publish_time": self.publish_time,
}

@staticmethod
def from_dict(price_dict: PriceDict):
return Price(
conf=int(price_dict["conf"]),
expo=price_dict["expo"],
price=int(price_dict["price"]),
publish_time=price_dict["publish_time"],
)



class PriceUpdate:
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from setuptools import setup

requirements = ['aiodns', 'aiohttp>=3.7.4', 'backoff', 'base58', 'flake8', 'loguru', 'typing-extensions', 'pytz', 'pycryptodome']
requirements = ['aiodns', 'aiohttp>=3.7.4', 'backoff', 'base58', 'flake8', 'loguru', 'typing-extensions', 'pytz', 'pycryptodome', 'httpx']

with open('README.md', 'r', encoding='utf-8') as fh:
long_description = fh.read()
Expand Down
15 changes: 15 additions & 0 deletions tests/test_hermes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from pythclient.hermes import HermesClient, PriceFeed

BTC_ID = "e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415b43" # BTC/USD
ETH_ID = "ff61491a931112ddf1bd8147cd1b641375f79f5825126d665480874634fd0ace" # ETH/USD

async def test_hermes_return_price_feed_object():
# Test that the hermes get request returns a dict with same keys as PriceFeed
hermes_client = HermesClient([])
hermes_client.add_feed_ids([BTC_ID, ETH_ID])

all_prices = await hermes_client.get_all_prices()

assert isinstance(all_prices, dict)
assert set(all_prices[BTC_ID].keys()) == set(PriceFeed.__annotations__.keys())
assert set(all_prices[ETH_ID].keys()) == set(PriceFeed.__annotations__.keys())
Loading