diff --git a/core/src/settings.ts b/core/src/settings.ts index 1c843ccd2..14d547112 100644 --- a/core/src/settings.ts +++ b/core/src/settings.ts @@ -31,6 +31,9 @@ export const DATA_FEED_SERVICE_NAME = 'Aggregator' export const VRF_SERVICE_NAME = 'VRF' export const REQUEST_RESPONSE_SERVICE_NAME = 'RequestResponse' +// Data Feed +export const MAX_DATA_STALENESS = 5_000 + // BullMQ export const REMOVE_ON_COMPLETE = 500 export const REMOVE_ON_FAIL = 1_000 diff --git a/core/src/types.ts b/core/src/types.ts index 702f6ef8b..4965b0e98 100644 --- a/core/src/types.ts +++ b/core/src/types.ts @@ -284,7 +284,7 @@ export interface IAggregator { export interface IAggregate { id: bigint - timestamp: string | Date + timestamp: string value: bigint aggregatorId: bigint } diff --git a/core/src/worker/api.ts b/core/src/worker/api.ts index 484107962..f10d0be9a 100644 --- a/core/src/worker/api.ts +++ b/core/src/worker/api.ts @@ -15,7 +15,7 @@ export const AGGREGATOR_ENDPOINT = buildUrl(ORAKL_NETWORK_API_URL, 'aggregator') * * @param {string} aggregator hash * @param {Logger} logger - * @return {number} the latest aggregated value + * @return {IAggregate} metadata about the latest aggregate * @exception {FailedToGetAggregate} */ export async function fetchDataFeed({ @@ -27,8 +27,7 @@ }): Promise<IAggregate> { try { const url = buildUrl(AGGREGATE_ENDPOINT, `${aggregatorHash}/latest`) - const response = (await axios.get(url))?.data - return response + return (await axios.get(url))?.data } catch (e) { logger.error(e) throw new OraklError(OraklErrorCode.FailedToGetAggregate) } diff --git a/core/src/worker/data-feed.ts b/core/src/worker/data-feed.ts index 7292a146f..5d8fcd99e 100644 --- a/core/src/worker/data-feed.ts +++ b/core/src/worker/data-feed.ts @@ -25,7 +25,8 @@ import { SUBMIT_HEARTBEAT_QUEUE_SETTINGS, WORKER_AGGREGATOR_QUEUE_NAME, WORKER_CHECK_HEARTBEAT_QUEUE_NAME, - CHECK_HEARTBEAT_QUEUE_SETTINGS + CHECK_HEARTBEAT_QUEUE_SETTINGS, + MAX_DATA_STALENESS } from
'../settings' import { buildSubmissionRoundJobId, buildHeartbeatJobId } from '../utils' import { oracleRoundStateCall } from './data-feed.utils' @@ -171,7 +172,8 @@ function aggregatorJob(submitHeartbeatQueue: Queue, reporterQueue: Queue, _logge logger }) - const { value: submission } = await fetchDataFeed({ aggregatorHash, logger }) + const { timestamp, value: submission } = await fetchDataFeed({ aggregatorHash, logger }) + logger.debug({ aggregatorHash, fetchedAt: timestamp, submission }, 'Latest data aggregate') // Submit heartbeat const outDataSubmitHeartbeat: IAggregatorSubmitHeartbeatWorker = { @@ -183,31 +185,41 @@ function aggregatorJob(submitHeartbeatQueue: Queue, reporterQueue: Queue, _logge ...SUBMIT_HEARTBEAT_QUEUE_SETTINGS }) - // Report submission - const outDataReporter: IAggregatorWorkerReporter = { - oracleAddress, - submission, - roundId, - workerSource - } - logger.debug(outDataReporter, 'outDataReporter') - await reporterQueue.add(workerSource, outDataReporter, { - jobId: buildSubmissionRoundJobId({ + // Check on data staleness + const now = Date.now() + const fetchedAt = Date.parse(timestamp) + const dataStaleness = Math.max(0, now - fetchedAt) + logger.debug(`Data staleness ${dataStaleness} ms`) + + if (dataStaleness > MAX_DATA_STALENESS) { + logger.warn(`Data became stale (> ${MAX_DATA_STALENESS}). Not reporting.`) + } else { + // Report submission + const outDataReporter: IAggregatorWorkerReporter = { oracleAddress, + submission, roundId, - deploymentName: DEPLOYMENT_NAME - }), - removeOnComplete: REMOVE_ON_COMPLETE, - // Reporter job can fail, and should be either retried or - // removed. We need to remove the job after repeated failure - // to prevent deadlock when running with a single node operator. - // After removing the job on failure, we can resubmit the job - // with the same unique ID representing the submission for - // specific aggregator on specific round. - // - // FIXME Rethink! 
- removeOnFail: true - }) + workerSource + } + logger.debug(outDataReporter, 'outDataReporter') + await reporterQueue.add(workerSource, outDataReporter, { + jobId: buildSubmissionRoundJobId({ + oracleAddress, + roundId, + deploymentName: DEPLOYMENT_NAME + }), + removeOnComplete: REMOVE_ON_COMPLETE, + // Reporter job can fail, and should be either retried or + // removed. We need to remove the job after repeated failure + // to prevent deadlock when running with a single node operator. + // After removing the job on failure, we can resubmit the job + // with the same unique ID representing the submission for + // specific aggregator on specific round. + // + // FIXME Rethink! + removeOnFail: true + }) + } } catch (e) { // `FailedToFetchFromDataFeed` exception can be raised from `prepareDataForReporter`. // `aggregatorJob` is being triggered by either `fixed` or `event` worker.