Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added cache ttl refresh on heartbeat message for TP/ICAP #3210

Merged
merged 5 commits into from
Mar 8, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/long-zebras-fold.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@chainlink/tp-adapter': patch
'@chainlink/icap-adapter': patch
---

Add support for cache TTL refresh on heartbeat messages
4 changes: 4 additions & 0 deletions packages/sources/icap/docs/known-issues.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,7 @@ wscat -c 'ws://json.mktdata.portal.apac.parametasolutions.com:12000'
```

- If credentials work for a single connection, open a second terminal and run the same commands while the first is still running. The expected behaviour is that both terminals should fire out a massive amount of price data.

### CACHE_MAX_AGE interaction with Heartbeat messages

If `CACHE_MAX_AGE` is set below a current heartbeat interval (60000ms), the extended cache TTL feature for out-of-market-hours that relies on heartbeats will not work.
karen-stepanyan marked this conversation as resolved.
Show resolved Hide resolved
4 changes: 4 additions & 0 deletions packages/sources/tp/docs/known-issues.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,7 @@ wscat -c 'ws://json.mktdata.portal.apac.parametasolutions.com:12000'
```

- If credentials work for a single connection, open a second terminal and run the same commands while the first is still running. The expected behaviour is that both terminals should fire out a massive amount of price data.

### CACHE_MAX_AGE interaction with Heartbeat messages

If `CACHE_MAX_AGE` is set below a current heartbeat interval (60000ms), the extended cache TTL feature for out-of-market-hours that relies on heartbeats will not work.
37 changes: 36 additions & 1 deletion packages/sources/tp/src/transport/price.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import Decimal from 'decimal.js'
import { WebSocketTransport } from '@chainlink/external-adapter-framework/transports/websocket'
import { makeLogger, ProviderResult } from '@chainlink/external-adapter-framework/util'
import { BaseEndpointTypes, GeneratePriceOptions } from '../endpoint/price'
import { calculateCacheKey } from '@chainlink/external-adapter-framework/cache'

const logger = makeLogger('TpIcapPrice')

Expand Down Expand Up @@ -32,8 +33,28 @@ const isNum = (i: number | undefined) => typeof i === 'number'

let providerDataStreamEstablishedUnixMs: number

/*
TP and ICAP EAs currently do not receive asset prices during off-market hours. When a heartbeat message is received during these hours,
we update the TTL of cache entries that EA is requested to provide a price during off-market hours.
*/
const updateTTL = async (transport: WebSocketTransport<WsTransportTypes>) => {
// Get current active entries in the subscription set
const sSet = await transport.subscriptionSet.getAll()
// For each entry in sSet, calculate the cache key and try to update the ttl of an entry
sSet.forEach((param) => {
const key = calculateCacheKey({
transportName: transport.name,
data: param,
adapterName: transport.responseCache.adapterName,
endpointName: transport.responseCache.endpointName,
adapterSettings: transport.responseCache.adapterSettings,
})
transport.responseCache.cache.setTTL(key, transport.responseCache.adapterSettings.CACHE_MAX_AGE)
karen-stepanyan marked this conversation as resolved.
Show resolved Hide resolved
})
}

export const generateTransport = (generatePriceOptions: GeneratePriceOptions) => {
return new WebSocketTransport<WsTransportTypes>({
const tpTransport = new WebSocketTransport<WsTransportTypes>({
url: ({ adapterSettings: { WS_API_ENDPOINT } }) => WS_API_ENDPOINT,
handlers: {
open: (connection, { adapterSettings: { WS_API_USERNAME, WS_API_PASSWORD } }) => {
Expand Down Expand Up @@ -71,6 +92,19 @@ export const generateTransport = (generatePriceOptions: GeneratePriceOptions) =>
return []
}

// Check for a heartbeat message, refresh the TTLs of all requested entries in the cache
if (rec.includes('HBHHH')) {
const stream = rec.slice(22, 24)
if (stream === generatePriceOptions.streamName) {
logger.debug({
msg: 'Received heartbeat message from WS, updating TTLs of active entries',
message,
})
updateTTL(tpTransport)
return []
}
}

const stream = rec.slice(31, 34)
if (stream !== generatePriceOptions.streamName) {
logger.debug({
Expand Down Expand Up @@ -118,4 +152,5 @@ export const generateTransport = (generatePriceOptions: GeneratePriceOptions) =>
},
},
})
return tpTransport
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,3 +96,17 @@ exports[`Price Endpoint should return price for specific source 1`] = `
},
}
`;

exports[`Price Endpoint should update the ttl after heartbeat is received 1`] = `
{
"data": {
"result": 1.0539,
},
"result": 1.0539,
"statusCode": 200,
"timestamps": {
"providerDataReceivedUnixMs": 1018,
"providerDataStreamEstablishedUnixMs": 1010,
},
}
`;
12 changes: 11 additions & 1 deletion packages/sources/tp/test/integration/adapter.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {
setEnvVariables,
mockWebSocketProvider,
MockWebsocketServer,
runAllUntilTime,
} from '@chainlink/external-adapter-framework/util/testing-utils'
import FakeTimers from '@sinonjs/fake-timers'
import { mockPriceWebSocketServer } from './fixtures'
Expand All @@ -14,7 +15,7 @@ describe('Price Endpoint', () => {
const wsEndpoint = 'ws://localhost:9090'
let oldEnv: NodeJS.ProcessEnv
const data = {
base: 'JPY',
base: 'EUR',
quote: 'USD',
}

Expand All @@ -23,6 +24,7 @@ describe('Price Endpoint', () => {
process.env['WS_API_USERNAME'] = 'test-username'
process.env['WS_API_PASSWORD'] = 'test-password'
process.env['WS_API_ENDPOINT'] = wsEndpoint
process.env['WS_SUBSCRIPTION_UNRESPONSIVE_TTL'] = '180000'

// Start mock web socket server
mockWebSocketProvider(WebSocketClassProvider)
Expand Down Expand Up @@ -85,4 +87,12 @@ describe('Price Endpoint', () => {
const response = await testAdapter.request({ base: 'EUR' })
expect(response.json()).toMatchSnapshot()
})
it('should update the ttl after heartbeat is received', async () => {
// The cache tll is 90 seconds. Mocked heartbeat message is sent after 10s after connection which should
// update the ttl and therefore after 91 seconds (from the initial message) we can access the asset
await runAllUntilTime(testAdapter.clock, 91000)
const response = await testAdapter.request({ base: 'EUR', quote: 'USD' })
expect(response.statusCode).toBe(200)
expect(response.json()).toMatchSnapshot()
})
})
11 changes: 11 additions & 0 deletions packages/sources/tp/test/integration/fixtures.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,14 @@ export const mockStalePriceResponse = {
},
}

export const mockHeartbeatResponse = {
msg: 'sub',
pro: 'OMM',
rec: 'HBHHH:GBL.BIL.QTE.RTM!TP',
sta: 1,
fvs: { TIMACT: '23:01:00', GV1_TIME: '23:01:13' },
}

export const adapterResponse = {
result: 1.0539,
statusCode: 200,
Expand All @@ -120,6 +128,9 @@ export const mockPriceWebSocketServer = (URL: string): MockWebsocketServer => {
socket.send(JSON.stringify(mockInversePriceResponse))
socket.send(JSON.stringify(mockICPriceResponse))
socket.send(JSON.stringify(mockSeparateSourcePriceResponse))
setTimeout(() => {
socket.send(JSON.stringify(mockHeartbeatResponse))
}, 10000)
})
})
return mockWsServer
Expand Down
Loading