Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fast logs no hang #639

Open
wants to merge 59 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
be7fafb
feat: some treasury stuff
BobTheBuidler Oct 23, 2023
434def3
feat: delay curve loading
BobTheBuidler Oct 23, 2023
f986d71
chore: remove deprecated code
BobTheBuidler Oct 23, 2023
4ac8a42
chore: bump get_logs_asap concurrency
BobTheBuidler Oct 23, 2023
b9e7278
chore: add comment
BobTheBuidler Oct 23, 2023
2ff5bdd
feat: load strats in subthread to unblock loop
BobTheBuidler Oct 23, 2023
db6eb51
feat: drome apy previews
BobTheBuidler Oct 23, 2023
fd08797
feat: cleanup for prod
BobTheBuidler Oct 23, 2023
ebf8149
update make
0xBasically Oct 23, 2023
0292d91
update telegram msg
0xBasically Oct 23, 2023
5238e85
fix: labels
BobTheBuidler Oct 23, 2023
af440ee
chore: refactor
BobTheBuidler Oct 23, 2023
c73a26b
feat: factor in fees
BobTheBuidler Oct 23, 2023
c6cf9ad
feat: filter out lps with vaults
BobTheBuidler Oct 23, 2023
f275b1e
chore: cleanup
BobTheBuidler Oct 23, 2023
dd775d0
fix: ApyFees
BobTheBuidler Oct 23, 2023
d0d02f7
chore: refactor out points
BobTheBuidler Oct 23, 2023
16d2ffb
feat: refactor to prep for bal apy previews
BobTheBuidler Oct 23, 2023
5da48da
fix: BatchSizeError on base
BobTheBuidler Oct 23, 2023
0fd85a6
feat: filter 0 value transfers from treasury export
BobTheBuidler Oct 23, 2023
a50ae78
fix: BadRequest tg err
BobTheBuidler Oct 23, 2023
b774feb
feat: async log loading
BobTheBuidler Oct 23, 2023
aa5296d
feat: more stuff
BobTheBuidler Oct 23, 2023
38a3a68
chore: update .env.example
BobTheBuidler Oct 23, 2023
7c574a5
feat: more stuff
BobTheBuidler Oct 23, 2023
508a9f2
feat: ydb envs
BobTheBuidler Oct 23, 2023
17eb60f
feat: ydb link
BobTheBuidler Oct 23, 2023
0db72b7
fix: missing volume
BobTheBuidler Oct 23, 2023
7066e87
fix: missing volume
BobTheBuidler Oct 23, 2023
ea6a617
fix: change underscore to hyphen
BobTheBuidler Oct 23, 2023
6db0667
fix: missing await
BobTheBuidler Oct 23, 2023
86ce144
fix: missing await
BobTheBuidler Oct 23, 2023
a2964fa
fix: Decimal type err
BobTheBuidler Oct 23, 2023
8dd1cbf
fix: curve simple Decimal err
BobTheBuidler Oct 23, 2023
9154d47
fix: load_strategies
BobTheBuidler Oct 23, 2023
77e0a52
fix: missing s3 await
BobTheBuidler Oct 23, 2023
334e2fa
fix: missing awawit
BobTheBuidler Oct 23, 2023
9916e53
get_s3 type err
BobTheBuidler Oct 23, 2023
3c9063a
fix: Harvests
BobTheBuidler Oct 23, 2023
b0d1727
fix: missing commit
BobTheBuidler Oct 23, 2023
4ffb366
fix: type err
BobTheBuidler Oct 23, 2023
a096e1c
fix: registry loader time
BobTheBuidler Oct 23, 2023
9a53d56
fix: comparison err
BobTheBuidler Oct 23, 2023
81f31fa
fix: attr err
BobTheBuidler Oct 23, 2023
bcded02
fix: Decimal type errs
BobTheBuidler Oct 23, 2023
b08349d
fix: deduplicate internal transfers in db
BobTheBuidler Oct 23, 2023
4437c4c
fix: Decimals not json encadable in s3
BobTheBuidler Oct 23, 2023
38ea4cd
str not encodable
BobTheBuidler Oct 23, 2023
fa9e6b7
fix: yeth decimal type err
BobTheBuidler Oct 23, 2023
1472cd1
fix: active_vaults_at
BobTheBuidler Oct 23, 2023
0ba60e6
feat: support rkp3r in ypm
BobTheBuidler Oct 23, 2023
140fcfe
fix: s3
BobTheBuidler Oct 23, 2023
112bb10
fix: Decimal not json encodable
BobTheBuidler Oct 23, 2023
90e373e
fix: yeth type err
BobTheBuidler Oct 23, 2023
0145175
fix: broken import
BobTheBuidler Oct 23, 2023
f785150
fix: yeth type err
BobTheBuidler Oct 23, 2023
f7d5903
fix: missing commit
BobTheBuidler Oct 23, 2023
e9175a9
fix: type err
BobTheBuidler Oct 23, 2023
6e9ee6d
chore: ignore nft from treasury
BobTheBuidler Oct 23, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ export CONCURRENCY= # 1,2,3... Positive whole number for how many blocks get pro
export YPRICEAPI_URL= # YPriceAPI url
export YPRICEAPI_SIGNER= # Address you signed up for yPriceAPI on
export YPRICEAPI_SIGNATURE= # Signature from subscription
export YPRICEMAGIC_GETLOGS_DOP= # Max number of concurrent eth_getLogs calls
export YPRICEMAGIC_RECURSION_TIMEOUT= # Time in seconds till yPriceMagic Loop for pricing times out without returning data.
export SKIP_YPRICEAPI= # False or True. Defaults to True.

Expand Down
12 changes: 12 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,18 @@ apy-yeth-monitoring:

apy-yeth:
make up commands="yeth" network=eth filter=yeth

aerodrome-apy-previews:
make up commands="drome_apy_previews" network=base

aerodrome-apy-previews-monitoring:
make up commands="drome_apy_previews with_monitoring" network=base

velodrome-apy-previews:
make up commands="drome_apy_previews" network=optimism

velodrome-apy-previews-monitoring:
make up commands="drome_apy_previews with_monitoring" network=optimism

# revenue scripts
revenues:
Expand Down
109 changes: 6 additions & 103 deletions scripts/curve_apy_previews.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,19 @@
import dataclasses
import json
import logging
import os
import re
import shutil
from time import sleep, time
from datetime import datetime
import traceback

import boto3
import requests
import sentry_sdk
from brownie import ZERO_ADDRESS, chain
from brownie.exceptions import ContractNotFound
from multicall.utils import await_awaitable
from y import Contract, Network, PriceError
from y import Contract, Network
from y.exceptions import ContractNotVerified

from yearn.apy import Apy, ApyFees, ApyPoints, ApySamples, get_samples
from yearn.apy import Apy, ApyFees, ApyPoints, get_samples
from yearn.apy.curve.simple import Gauge, calculate_simple
from yearn.exceptions import EmptyS3Export
from yearn.helpers import s3, telegram

logger = logging.getLogger(__name__)
sentry_sdk.set_tag('script','curve_apy_previews')
Expand All @@ -33,7 +27,7 @@
def main():
gauges = _get_gauges()
data = _build_data(gauges)
_upload(data)
s3.upload('apy-previews', 'curve-factory', data)

def _build_data(gauges):
samples = get_samples()
Expand Down Expand Up @@ -142,99 +136,8 @@ def _get_gauges():
raise ValueError(f"Error fetching gauges from {url}")
attempts += 1
sleep(.1)


else:
raise ValueError(f"can't get curve gauges for unsupported network: {chain.id}")


def _upload(data):
print(json.dumps(data, sort_keys=True, indent=4))

file_name, s3_path = _get_export_paths("curve-factory")
with open(file_name, "w+") as f:
json.dump(data, f)

if os.getenv("DEBUG", None):
return

for item in _get_s3s():
s3 = item["s3"]
aws_bucket = item["aws_bucket"]
s3.upload_file(
file_name,
aws_bucket,
s3_path,
ExtraArgs={'ContentType': "application/json", 'CacheControl': "max-age=1800"},
)


def _get_s3s():
s3s = []
aws_buckets = os.environ.get("AWS_BUCKET").split(";")
aws_endpoint_urls = os.environ.get("AWS_ENDPOINT_URL").split(";")
aws_keys = os.environ.get("AWS_ACCESS_KEY").split(";")
aws_secrets = os.environ.get("AWS_ACCESS_SECRET").split(";")

for i in range(len(aws_buckets)):
aws_bucket = aws_buckets[i]
aws_endpoint_url = aws_endpoint_urls[i]
aws_key = aws_keys[i]
aws_secret = aws_secrets[i]
kwargs = {}
if aws_endpoint_url is not None:
kwargs["endpoint_url"] = aws_endpoint_url
if aws_key is not None:
kwargs["aws_access_key_id"] = aws_key
if aws_secret is not None:
kwargs["aws_secret_access_key"] = aws_secret

s3s.append(
{
"s3": boto3.client("s3", **kwargs),
"aws_bucket": aws_bucket
}
)

return s3s


def _get_export_paths(suffix):
out = "generated"
if os.path.isdir(out):
shutil.rmtree(out)
os.makedirs(out, exist_ok=True)

api_path = os.path.join("v1", "chains", f"{chain.id}", "apy-previews")

file_base_path = os.path.join(out, api_path)
os.makedirs(file_base_path, exist_ok=True)

file_name = os.path.join(file_base_path, suffix)
s3_path = os.path.join(api_path, suffix)
return file_name, s3_path


def with_monitoring():
if os.getenv("DEBUG", None):
main()
return
from telegram.ext import Updater

private_group = os.environ.get('TG_YFIREBOT_GROUP_INTERNAL')
public_group = os.environ.get('TG_YFIREBOT_GROUP_EXTERNAL')
updater = Updater(os.environ.get('TG_YFIREBOT'))
now = datetime.now()
message = f"`[{now}]`\n⚙️ Curve Previews API for {Network.name()} is updating..."
ping = updater.bot.send_message(chat_id=private_group, text=message, parse_mode="Markdown")
ping = ping.message_id
try:
main()
except Exception as error:
tb = traceback.format_exc()
now = datetime.now()
message = f"`[{now}]`\n🔥 Curve Previews API update for {Network.name()} failed!\n```\n{tb}\n```"[:4000]
updater.bot.send_message(chat_id=private_group, text=message, parse_mode="Markdown", reply_to_message_id=ping)
updater.bot.send_message(chat_id=public_group, text=message, parse_mode="Markdown")
raise error
message = f"✅ Curve Previews API update for {Network.name()} successful!"
updater.bot.send_message(chat_id=private_group, text=message, reply_to_message_id=ping)
telegram.run_job_with_monitoring('Curve Previews API', main)
169 changes: 169 additions & 0 deletions scripts/drome_apy_previews.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@

"""
This script produces a list of velodrome/aerodrome gauges for which vaults can be created
"""

import asyncio
import dataclasses
import logging
import os
from pprint import pformat
from time import time
from typing import List, Optional

import sentry_sdk
from brownie import ZERO_ADDRESS, chain
from msgspec import Struct
from multicall.utils import await_awaitable
from tqdm.asyncio import tqdm_asyncio
from y import Contract, Network, magic
from y.exceptions import ContractNotVerified
from y.time import get_block_timestamp_async

from yearn.apy import Apy, ApyFees, get_samples
from yearn.apy.common import SECONDS_PER_YEAR
from yearn.apy.curve.simple import Gauge
from yearn.apy.velo import COMPOUNDING
from yearn.debug import Debug
from yearn.helpers import s3, telegram
from yearn.v2.registry import Registry

logger = logging.getLogger(__name__)
sentry_sdk.set_tag('script','curve_apy_previews')

class Drome(Struct):
"""Holds various params for a drome deployment"""
label: str
job_name: str
sugar: str
voter: str
# A random vault to check fees
fee_checker: str

try:
drome = {
Network.Optimism: Drome(
label='velo',
job_name='Velodrome Previews API',
sugar='0x4D996E294B00cE8287C16A2b9A4e637ecA5c939f',
voter='0x41c914ee0c7e1a5edcd0295623e6dc557b5abf3c',
fee_checker='0xbC61B71562b01a3a4808D3B9291A3Bf743AB3361',
),
Network.Base: Drome(
label='aero',
job_name='Aerodrome Previews API',
sugar='0x2073D8035bB2b0F2e85aAF5a8732C6f397F9ff9b',
voter='0x16613524e02ad97eDfeF371bC883F2F5d6C480A5',
fee_checker='0xEcFc1e5BDa4d4191c9Cab053ec704347Db87Be5d',
),
}[chain.id]
except KeyError:
raise ValueError(f"there is no drome on unsupported network: {chain.id}")

fee_checker = Contract(drome.fee_checker)
performance_fee = fee_checker.performanceFee() / 1e4
management_fee = fee_checker.managementFee() / 1e4
fee_checker_strat = Contract(fee_checker.withdrawalQueue(0))

keep = fee_checker_strat.localKeepVELO() / 1e4
unkeep = 1 - keep

fees = ApyFees(performance=performance_fee, management=management_fee, keep_velo=keep)

def main():
data = await_awaitable(_build_data())
s3.upload('apy-previews', f'{drome.label}-factory', data)

async def _build_data():
start = int(time())
block = get_samples().now
data = [d for d in await tqdm_asyncio.gather(*[_build_data_for_lp(lp, block) for lp in await _get_lps_with_vault_potential()]) if d]
for d in data:
d['updated'] = start
print(data)
return data

async def _get_lps_with_vault_potential() -> List[dict]:
sugar_oracle = await Contract.coroutine(drome.sugar)
current_vaults = await Registry(include_experimental=False).vaults
current_underlyings = [str(vault.token) for vault in current_vaults]
return [lp for lp in await sugar_oracle.all.coroutine(999999999999999999999, 0, ZERO_ADDRESS) if lp[0] not in current_underlyings and lp[11] != ZERO_ADDRESS]

async def _build_data_for_lp(lp: dict, block: Optional[int] = None) -> Optional[dict]:
lp_token = lp[0]
gauge_name = lp[1]

try:
gauge = await _load_gauge(lp, block=block)
except ContractNotVerified as e:
return {
"gauge_name": gauge_name,
"apy": dataclasses.asdict(Apy("error:unverified", 0, 0, fees, error_reason=str(e))),
"block": block,
}

try:
apy = await _staking_apy(lp, gauge.gauge, block=block) if gauge.gauge_weight > 0 else Apy("zero_weight", 0, 0, fees)
except Exception as error:
logger.error(error)
logger.error(gauge)
apy = Apy("error", 0, 0, fees, error_reason=":".join(str(arg) for arg in error.args))

return {
"gauge_name": gauge_name,
"gauge_address": str(gauge.gauge),
"token0": lp[5],
"token1": lp[8],
"lp_token": lp_token,
"weight": str(gauge.gauge_weight),
"inflation_rate": str(gauge.gauge_inflation_rate),
"working_supply": str(gauge.gauge_working_supply),
"apy": dataclasses.asdict(apy),
"block": block,
}

async def _load_gauge(lp: dict, block: Optional[int] = None) -> Gauge:
lp_address = lp[0]
gauge_address = lp[11]
voter = await Contract.coroutine(drome.voter)
pool, gauge, weight = await asyncio.gather(
Contract.coroutine(lp_address),
Contract.coroutine(gauge_address),
voter.weights.coroutine(lp_address, block_identifier=block),
)
inflation_rate, working_supply = await asyncio.gather(
gauge.rewardRate.coroutine(block_identifier=block),
gauge.totalSupply.coroutine(block_identifier=block),
)
return Gauge(lp_address, pool, gauge, weight, inflation_rate, working_supply)

async def _staking_apy(lp: dict, staking_rewards: Contract, block: Optional[int]=None) -> float:
query_at_time = time() if block is None else await get_block_timestamp_async(block)

reward_token, rate, total_supply, end = await asyncio.gather(
staking_rewards.rewardToken.coroutine(block_identifier=block),
staking_rewards.rewardRate.coroutine(block_identifier=block),
staking_rewards.totalSupply.coroutine(block_identifier=block),
staking_rewards.periodFinish.coroutine(block_identifier=block),
)

rate *= unkeep

if end < query_at_time or total_supply == 0 or rate == 0:
return Apy(f"v2:{drome.label}_unpopular", gross_apr=0, net_apy=0, fees=fees)

pool_price, token_price = await asyncio.gather(
magic.get_price(lp[0], block=block, sync=False),
magic.get_price(reward_token, block=block, sync=False),
)

gross_apr = (SECONDS_PER_YEAR * (rate / 1e18) * token_price) / (pool_price * (total_supply / 1e18))

net_apr = gross_apr * (1 - performance_fee) - management_fee
net_apy = (1 + (net_apr / COMPOUNDING)) ** COMPOUNDING - 1
if os.getenv("DEBUG", None):
logger.info(pformat(Debug().collect_variables(locals())))
return Apy(f"v2:{drome.label}", gross_apr=gross_apr, net_apy=net_apy, fees=fees)

def with_monitoring():
telegram.run_job_with_monitoring(drome.job_name, main)
8 changes: 4 additions & 4 deletions scripts/exporters/transactions.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

warnings.simplefilter("ignore", BrownieEnvironmentWarning)

yearn = Yearn(load_strategies=False)
yearn = Yearn()

logger = logging.getLogger('yearn.transactions_exporter')

Expand All @@ -39,7 +39,7 @@
Network.Gnosis: 2_000_000,
Network.Arbitrum: 1_500_000,
Network.Optimism: 4_000_000,
Network.Base: 100_000,
Network.Base: 500_000,
}[chain.id]

FIRST_END_BLOCK = {
Expand Down Expand Up @@ -95,8 +95,8 @@ def process_and_cache_user_txs(last_saved_block=None):
from_address=cache_address(row['from']),
to_address=cache_address(row['to']),
amount = row.amount,
price = price,
value_usd = usd,
price = Decimal(price),
value_usd = Decimal(usd),
gas_used = row.gas_used,
gas_price = row.gas_price
)
Expand Down
11 changes: 7 additions & 4 deletions scripts/exporters/treasury_transactions.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,13 @@ def main() -> NoReturn:

@a_sync(default='sync')
async def load_new_txs(start_block: Block, end_block: Block) -> int:
futs = []
async for entry in treasury.ledger._get_and_yield(start_block, end_block):
futs.append(asyncio.create_task(insert_treasury_tx(entry)))
return sum(await tqdm_asyncio.gather(*futs, desc="Insert Txs to Postgres"))
futs = [
asyncio.create_task(insert_treasury_tx(entry))
async for entry in treasury.ledger._get_and_yield(start_block, end_block)
if not isinstance(entry, _Done) and entry.value
]
to_sort = sum(await tqdm_asyncio.gather(*futs, desc="Insert Txs to Postgres"))
return to_sort


# NOTE: Things get sketchy when we bump these higher
Expand Down
2 changes: 1 addition & 1 deletion scripts/exporters/wallets.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

logger = logging.getLogger('yearn.wallet_exporter')

yearn = Yearn(load_strategies=False, watch_events_forever=False)
yearn = Yearn()

# start: 2020-02-12 first iearn deployment
# start opti: 2022-01-01 an arbitrary start timestamp because the default start is < block 1 on opti and messes things up
Expand Down
Loading
Loading