Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check on data staleness to prevent submitting outdated data #486

Merged
merged 2 commits into from
Apr 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions core/src/settings.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ export const DATA_FEED_SERVICE_NAME = 'Aggregator'
export const VRF_SERVICE_NAME = 'VRF'
export const REQUEST_RESPONSE_SERVICE_NAME = 'RequestResponse'

// Data Feed
// Maximum accepted age (in milliseconds) of a fetched aggregate.
// The data-feed worker compares `Date.now() - Date.parse(timestamp)`
// against this value and skips reporting when the data is older.
export const MAX_DATA_STALENESS = 5_000

// BullMQ
// Queue history retention: number of completed / failed jobs BullMQ
// keeps before trimming them from the queue.
export const REMOVE_ON_COMPLETE = 500
export const REMOVE_ON_FAIL = 1_000
Expand Down
2 changes: 1 addition & 1 deletion core/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ export interface IAggregator {

/**
 * Latest aggregate returned by the Orakl Network API
 * (`aggregate/{aggregatorHash}/latest`, see worker/api.ts).
 *
 * NOTE: the diff residue in this span declared `timestamp` twice
 * (`string | Date` removed, `string` added) — a duplicate-property
 * error in TypeScript. Resolved to the post-merge shape: the API
 * returns the timestamp as an ISO-8601 string, which the data-feed
 * worker parses with `Date.parse` for the staleness check.
 */
export interface IAggregate {
  id: bigint
  // ISO-8601 timestamp string; parsed with Date.parse() by the worker
  timestamp: string
  value: bigint
  aggregatorId: bigint
}
5 changes: 2 additions & 3 deletions core/src/worker/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ export const AGGREGATOR_ENDPOINT = buildUrl(ORAKL_NETWORK_API_URL, 'aggregator')
*
 * @param {string} aggregatorHash
* @param {Logger} logger
* @return {number} the latest aggregated value
* @return {IAggregate} metadata about the latest aggregate
* @exception {FailedToGetAggregate}
*/
export async function fetchDataFeed({
Expand All @@ -27,8 +27,7 @@ export async function fetchDataFeed({
}): Promise<IAggregate> {
try {
const url = buildUrl(AGGREGATE_ENDPOINT, `${aggregatorHash}/latest`)
const response = (await axios.get(url))?.data
return response
return (await axios.get(url))?.data
} catch (e) {
logger.error(e)
throw new OraklError(OraklErrorCode.FailedToGetAggregate)
Expand Down
62 changes: 37 additions & 25 deletions core/src/worker/data-feed.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ import {
SUBMIT_HEARTBEAT_QUEUE_SETTINGS,
WORKER_AGGREGATOR_QUEUE_NAME,
WORKER_CHECK_HEARTBEAT_QUEUE_NAME,
CHECK_HEARTBEAT_QUEUE_SETTINGS
CHECK_HEARTBEAT_QUEUE_SETTINGS,
MAX_DATA_STALENESS
} from '../settings'
import { buildSubmissionRoundJobId, buildHeartbeatJobId } from '../utils'
import { oracleRoundStateCall } from './data-feed.utils'
Expand Down Expand Up @@ -171,7 +172,8 @@ function aggregatorJob(submitHeartbeatQueue: Queue, reporterQueue: Queue, _logge
logger
})

const { value: submission } = await fetchDataFeed({ aggregatorHash, logger })
const { timestamp, value: submission } = await fetchDataFeed({ aggregatorHash, logger })
logger.debug({ aggregatorHash, fetchedAt: timestamp, submission }, 'Latest data aggregate')

// Submit heartbeat
const outDataSubmitHeartbeat: IAggregatorSubmitHeartbeatWorker = {
Expand All @@ -183,31 +185,41 @@ function aggregatorJob(submitHeartbeatQueue: Queue, reporterQueue: Queue, _logge
...SUBMIT_HEARTBEAT_QUEUE_SETTINGS
})

// Report submission
const outDataReporter: IAggregatorWorkerReporter = {
oracleAddress,
submission,
roundId,
workerSource
}
logger.debug(outDataReporter, 'outDataReporter')
await reporterQueue.add(workerSource, outDataReporter, {
jobId: buildSubmissionRoundJobId({
// Check on data staleness
const now = Date.now()
const fetchedAt = Date.parse(timestamp)
const dataStaleness = Math.max(0, now - fetchedAt)
logger.debug(`Data staleness ${dataStaleness} ms`)

if (dataStaleness > MAX_DATA_STALENESS) {
logger.warn(`Data became stale (> ${MAX_DATA_STALENESS}). Not reporting.`)
} else {
// Report submission
const outDataReporter: IAggregatorWorkerReporter = {
oracleAddress,
submission,
roundId,
deploymentName: DEPLOYMENT_NAME
}),
removeOnComplete: REMOVE_ON_COMPLETE,
// Reporter job can fail, and should be either retried or
// removed. We need to remove the job after repeated failure
// to prevent deadlock when running with a single node operator.
// After removing the job on failure, we can resubmit the job
// with the same unique ID representing the submission for
// specific aggregator on specific round.
//
// FIXME Rethink!
removeOnFail: true
})
workerSource
}
logger.debug(outDataReporter, 'outDataReporter')
await reporterQueue.add(workerSource, outDataReporter, {
jobId: buildSubmissionRoundJobId({
oracleAddress,
roundId,
deploymentName: DEPLOYMENT_NAME
}),
removeOnComplete: REMOVE_ON_COMPLETE,
// Reporter job can fail, and should be either retried or
// removed. We need to remove the job after repeated failure
// to prevent deadlock when running with a single node operator.
// After removing the job on failure, we can resubmit the job
// with the same unique ID representing the submission for
// specific aggregator on specific round.
//
// FIXME Rethink!
removeOnFail: true
})
}
} catch (e) {
// `FailedToFetchFromDataFeed` exception can be raised from `prepareDataForReporter`.
// `aggregatorJob` is being triggered by either `fixed` or `event` worker.
Expand Down