Skip to content

Commit

Permalink
move rewards update loop out of zinnia loop (#462)
Browse files Browse the repository at this point in the history
* move rewards update loop out of zinnia loop

* fix: initialise `lastRewardsScheduledForAddress`

Signed-off-by: Miroslav Bajtoš <oss@bajtos.net>

* fix: print full details for event handler errors

Signed-off-by: Miroslav Bajtoš <oss@bajtos.net>

---------

Signed-off-by: Miroslav Bajtoš <oss@bajtos.net>
Co-authored-by: Miroslav Bajtoš <oss@bajtos.net>
  • Loading branch information
juliangruber and bajtos authored May 14, 2024
1 parent 9d112af commit bdb853f
Show file tree
Hide file tree
Showing 5 changed files with 165 additions and 130 deletions.
45 changes: 42 additions & 3 deletions commands/station.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@ import { getStationId } from '../lib/station-id.js'
import pRetry from 'p-retry'
import { fetch } from 'undici'
import { ethAddressFromDelegated } from '@glif/filecoin-address'
import { formatEther } from 'ethers'
import { ethers, formatEther } from 'ethers'
import { Obj } from '../lib/obj.js'
import { runUpdateRewardsLoop } from '../lib/rewards.js'
import { runUpdateContractsLoop } from '../lib/contracts.js'
import { fileURLToPath } from 'node:url'

const {
FIL_WALLET_ADDRESS,
Expand Down Expand Up @@ -94,8 +98,33 @@ export const station = async ({ json, experimental }) => {
console.error('No experimental modules available at this point')
}

const lastTotalJobsCompleted = new Obj(0)
const lastRewardsScheduledForAddress = new Obj(0n)
const contracts = new Obj()

const fetchRequest = new ethers.FetchRequest(
'https://api.node.glif.io/rpc/v1'
)
fetchRequest.setHeader(
'Authorization',
'Bearer RXQ2SKH/BVuwN7wisZh3b5uXStGPj1JQIrIWD+rxF0Y='
)
const provider = new ethers.JsonRpcProvider(
fetchRequest,
null,
{ batchMaxCount: 1 }
)
const abi = JSON.parse(
await fs.readFile(
fileURLToPath(new URL('../lib/abi.json', import.meta.url)),
'utf8'
)
)

await Promise.all([
zinniaRuntime.run({
provider,
abi,
STATION_ID,
FIL_WALLET_ADDRESS: ethAddress,
ethAddress,
Expand All @@ -112,9 +141,19 @@ export const station = async ({ json, experimental }) => {
source: activity.source || 'Zinnia'
})
},
onMetrics: m => metrics.submit('zinnia', m)
onMetrics: m => metrics.submit('zinnia', m),
lastTotalJobsCompleted,
lastRewardsScheduledForAddress
}),
runPingLoop({ STATION_ID }),
runMachinesLoop({ STATION_ID })
runMachinesLoop({ STATION_ID }),
runUpdateContractsLoop({ provider, abi, contracts }),
runUpdateRewardsLoop({
contracts,
ethAddress,
onMetrics: m => metrics.submit('zinnia', m),
lastTotalJobsCompleted,
lastRewardsScheduledForAddress
})
])
}
51 changes: 51 additions & 0 deletions lib/contracts.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import timers from 'node:timers/promises'
import pRetry from 'p-retry'
import * as Name from 'w3name'
import { ethers } from 'ethers'

const {
// https://github.com/filecoin-station/contract-addresses
CONTRACT_ADDRESSES_IPNS_KEY = 'k51qzi5uqu5dmaqrefqazad0ca8b24fb79zlacfjw2awdt5gjf2cr6jto5jyqe'
} = process.env

export const runUpdateContractsLoop = async ({ provider, abi, contracts }) => {
while (true) {
const delay = 10 * 60 * 1000 // 10 minutes
const jitter = Math.random() * 20_000 - 10_000 // +- 10 seconds
try {
await timers.setTimeout(delay + jitter)
} catch (err) {
if (err.name === 'AbortError') return
throw err
}
contracts.set(await getContractsWithRetry({ provider, abi }))
}
}

async function getContractsWithRetry ({ provider, abi }) {
const contractAddresses = await pRetry(getContractAddresses, {
retries: 10,
onFailedAttempt: err => {
console.error(err)
console.error('Failed to get contract addresses. Retrying...')
if (String(err).includes('You are being rate limited')) {
const delaySeconds = 60 + (Math.random() * 60)
// Don't DDOS the w3name services
console.error(
`Rate limited. Waiting ${delaySeconds} seconds...`
)
return timers.setTimeout(delaySeconds * 1000)
}
}
})
console.error(`Meridian contract addresses: ${contractAddresses.join(', ')}`)
return contractAddresses.map(address => {
return new ethers.Contract(address, abi, provider)
})
}

async function getContractAddresses () {
const name = Name.parse(CONTRACT_ADDRESSES_IPNS_KEY)
const revision = await Name.resolve(name)
return revision.value.split('\n').filter(Boolean)
}
13 changes: 13 additions & 0 deletions lib/obj.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
export class Obj {
constructor (value = null) {
this._value = value
}

set (val) {
this._value = val
}

get () {
return this._value
}
}
31 changes: 31 additions & 0 deletions lib/rewards.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import timers from 'node:timers/promises'

export const runUpdateRewardsLoop = async ({ contracts, ethAddress, onMetrics, lastTotalJobsCompleted, lastRewardsScheduledForAddress }) => {
while (true) {
while (!contracts.get()) {
await timers.setTimeout(1000)
}
const contractRewards = await Promise.all(contracts.get().map(async contract => {
return getScheduledRewardsWithFallback(contract, ethAddress)
}))
const totalRewards = contractRewards.reduce((a, b) => a + b, 0n)
onMetrics({
totalJobsCompleted: lastTotalJobsCompleted.get(),
rewardsScheduledForAddress: totalRewards
})
lastRewardsScheduledForAddress.set(totalRewards)

const delay = 10 * 60 * 1000 // 10 minutes
const jitter = Math.random() * 20_000 - 10_000 // +- 10 seconds
await timers.setTimeout(delay + jitter)
}
}

async function getScheduledRewardsWithFallback (contract, ethAddress) {
try {
return await contract.rewardsScheduledFor(ethAddress)
} catch (err) {
console.error('Failed to get scheduled rewards:', err.stack)
return 0n
}
}
Loading

0 comments on commit bdb853f

Please sign in to comment.