Skip to content

Commit

Permalink
Added cache ttl refresh on heartbeat message for TP/ICAP (#3210)
Browse files Browse the repository at this point in the history
* add cache ttl refresh on heartbeat message

* changeset

* update known issues

---------

Co-authored-by: cl-ea <93770670+cl-ea@users.noreply.github.com>
  • Loading branch information
karen-stepanyan and cl-ea committed Mar 8, 2024
1 parent 240f191 commit 1fb155a
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 2 deletions.
6 changes: 6 additions & 0 deletions .changeset/long-zebras-fold.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@chainlink/tp-adapter': patch
'@chainlink/icap-adapter': patch
---

Add support for cache TTL refresh on heartbeat messages
4 changes: 4 additions & 0 deletions packages/sources/icap/docs/known-issues.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,7 @@ wscat -c 'ws://json.mktdata.portal.apac.parametasolutions.com:12000'
```

- If credentials work for a single connection, open a second terminal and run the same commands while the first is still running. The expected behaviour is that both terminals should fire out a massive amount of price data.

### CACHE_MAX_AGE interaction with Heartbeat messages

If `CACHE_MAX_AGE` is set below a current heartbeat interval (60000ms), the extended cache TTL feature for out-of-market-hours that relies on heartbeats will not work.
4 changes: 4 additions & 0 deletions packages/sources/tp/docs/known-issues.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,7 @@ wscat -c 'ws://json.mktdata.portal.apac.parametasolutions.com:12000'
```

- If credentials work for a single connection, open a second terminal and run the same commands while the first is still running. The expected behaviour is that both terminals should fire out a massive amount of price data.

### CACHE_MAX_AGE interaction with Heartbeat messages

If `CACHE_MAX_AGE` is set below a current heartbeat interval (60000ms), the extended cache TTL feature for out-of-market-hours that relies on heartbeats will not work.
37 changes: 36 additions & 1 deletion packages/sources/tp/src/transport/price.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import Decimal from 'decimal.js'
import { WebSocketTransport } from '@chainlink/external-adapter-framework/transports/websocket'
import { makeLogger, ProviderResult } from '@chainlink/external-adapter-framework/util'
import { BaseEndpointTypes, GeneratePriceOptions } from '../endpoint/price'
import { calculateCacheKey } from '@chainlink/external-adapter-framework/cache'

const logger = makeLogger('TpIcapPrice')

Expand Down Expand Up @@ -32,8 +33,28 @@ const isNum = (i: number | undefined) => typeof i === 'number'

let providerDataStreamEstablishedUnixMs: number

/*
TP and ICAP EAs currently do not receive asset prices during off-market hours. When a heartbeat message is received during these hours,
we update the TTL of cache entries that EA is requested to provide a price during off-market hours.
*/
const updateTTL = async (transport: WebSocketTransport<WsTransportTypes>) => {
// Get current active entries in the subscription set
const sSet = await transport.subscriptionSet.getAll()
// For each entry in sSet, calculate the cache key and try to update the ttl of an entry
sSet.forEach((param) => {
const key = calculateCacheKey({
transportName: transport.name,
data: param,
adapterName: transport.responseCache.adapterName,
endpointName: transport.responseCache.endpointName,
adapterSettings: transport.responseCache.adapterSettings,
})
transport.responseCache.cache.setTTL(key, transport.responseCache.adapterSettings.CACHE_MAX_AGE)
})
}

export const generateTransport = (generatePriceOptions: GeneratePriceOptions) => {
return new WebSocketTransport<WsTransportTypes>({
const tpTransport = new WebSocketTransport<WsTransportTypes>({
url: ({ adapterSettings: { WS_API_ENDPOINT } }) => WS_API_ENDPOINT,
handlers: {
open: (connection, { adapterSettings: { WS_API_USERNAME, WS_API_PASSWORD } }) => {
Expand Down Expand Up @@ -71,6 +92,19 @@ export const generateTransport = (generatePriceOptions: GeneratePriceOptions) =>
return []
}

// Check for a heartbeat message, refresh the TTLs of all requested entries in the cache
if (rec.includes('HBHHH')) {
const stream = rec.slice(22, 24)
if (stream === generatePriceOptions.streamName) {
logger.debug({
msg: 'Received heartbeat message from WS, updating TTLs of active entries',
message,
})
updateTTL(tpTransport)
return []
}
}

const stream = rec.slice(31, 34)
if (stream !== generatePriceOptions.streamName) {
logger.debug({
Expand Down Expand Up @@ -118,4 +152,5 @@ export const generateTransport = (generatePriceOptions: GeneratePriceOptions) =>
},
},
})
return tpTransport
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,3 +96,17 @@ exports[`Price Endpoint should return price for specific source 1`] = `
},
}
`;

exports[`Price Endpoint should update the ttl after heartbeat is received 1`] = `
{
"data": {
"result": 1.0539,
},
"result": 1.0539,
"statusCode": 200,
"timestamps": {
"providerDataReceivedUnixMs": 1018,
"providerDataStreamEstablishedUnixMs": 1010,
},
}
`;
12 changes: 11 additions & 1 deletion packages/sources/tp/test/integration/adapter.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {
setEnvVariables,
mockWebSocketProvider,
MockWebsocketServer,
runAllUntilTime,
} from '@chainlink/external-adapter-framework/util/testing-utils'
import FakeTimers from '@sinonjs/fake-timers'
import { mockPriceWebSocketServer } from './fixtures'
Expand All @@ -14,7 +15,7 @@ describe('Price Endpoint', () => {
const wsEndpoint = 'ws://localhost:9090'
let oldEnv: NodeJS.ProcessEnv
const data = {
base: 'JPY',
base: 'EUR',
quote: 'USD',
}

Expand All @@ -23,6 +24,7 @@ describe('Price Endpoint', () => {
process.env['WS_API_USERNAME'] = 'test-username'
process.env['WS_API_PASSWORD'] = 'test-password'
process.env['WS_API_ENDPOINT'] = wsEndpoint
process.env['WS_SUBSCRIPTION_UNRESPONSIVE_TTL'] = '180000'

// Start mock web socket server
mockWebSocketProvider(WebSocketClassProvider)
Expand Down Expand Up @@ -85,4 +87,12 @@ describe('Price Endpoint', () => {
const response = await testAdapter.request({ base: 'EUR' })
expect(response.json()).toMatchSnapshot()
})
it('should update the ttl after heartbeat is received', async () => {
// The cache tll is 90 seconds. Mocked heartbeat message is sent after 10s after connection which should
// update the ttl and therefore after 91 seconds (from the initial message) we can access the asset
await runAllUntilTime(testAdapter.clock, 91000)
const response = await testAdapter.request({ base: 'EUR', quote: 'USD' })
expect(response.statusCode).toBe(200)
expect(response.json()).toMatchSnapshot()
})
})
11 changes: 11 additions & 0 deletions packages/sources/tp/test/integration/fixtures.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,14 @@ export const mockStalePriceResponse = {
},
}

export const mockHeartbeatResponse = {
msg: 'sub',
pro: 'OMM',
rec: 'HBHHH:GBL.BIL.QTE.RTM!TP',
sta: 1,
fvs: { TIMACT: '23:01:00', GV1_TIME: '23:01:13' },
}

export const adapterResponse = {
result: 1.0539,
statusCode: 200,
Expand All @@ -120,6 +128,9 @@ export const mockPriceWebSocketServer = (URL: string): MockWebsocketServer => {
socket.send(JSON.stringify(mockInversePriceResponse))
socket.send(JSON.stringify(mockICPriceResponse))
socket.send(JSON.stringify(mockSeparateSourcePriceResponse))
setTimeout(() => {
socket.send(JSON.stringify(mockHeartbeatResponse))
}, 10000)
})
})
return mockWsServer
Expand Down

0 comments on commit 1fb155a

Please sign in to comment.