Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added cache ttl refresh on heartbeat message for Finnhub #3246

Merged
merged 5 commits into from
Apr 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/sixty-parents-march.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@chainlink/finnhub-secondary-adapter': patch
'@chainlink/finnhub-adapter': patch
---

Added support for cache TTL refresh on heartbeat messages for 'forex' endpoint
17 changes: 12 additions & 5 deletions packages/sources/finnhub-secondary/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@

This document was generated automatically. Please see [README Generator](../../scripts#readme-generator) for more info.

## Known Issues

### CACHE_MAX_AGE interaction with Heartbeat messages

If `CACHE_MAX_AGE` is set below a current heartbeat interval (60000ms), the extended cache TTL feature for out-of-market-hours that relies on heartbeats will not work.

## Environment Variables

| Required? | Name | Description | Type | Options | Default |
Expand Down Expand Up @@ -36,11 +42,12 @@ Supported names for this endpoint are: `commodities`, `commodity-quote`, `common

### Input Params

| Required? | Name | Aliases | Description | Type | Options | Default | Depends On | Not Valid With |
| :-------: | :------: | :------------: | :--------------------------------------------: | :----: | :-----: | :-----: | :--------: | :------------: |
| ✅ | base | `coin`, `from` | The symbol of symbols of the currency to query | string | | | | |
| | quote | `market`, `to` | The symbol of the currency to convert to | string | | | | |
| | exchange | | The exchange to fetch data for | string | | | | |
| Required? | Name | Aliases | Description | Type | Options | Default | Depends On | Not Valid With |
| :-------: | :----------: | :------------: | :-------------------------------------------------------------------------------------------: | :----: | :-----: | :-----: | :--------: | :------------: |
| ✅ | base | `coin`, `from` | The symbol of symbols of the currency to query | string | | | | |
| | quote | `market`, `to` | The symbol of the currency to convert to | string | | | | |
| | exchange | | The exchange to fetch data for | string | | | | |
| | endpointName | | Is set automatically based on request endpoint alias/name. Modifying this value has no effect | string | | | | |

### Example

Expand Down
5 changes: 5 additions & 0 deletions packages/sources/finnhub-secondary/docs/known-issues.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
## Known Issues

### CACHE_MAX_AGE interaction with Heartbeat messages

If `CACHE_MAX_AGE` is set below a current heartbeat interval (60000ms), the extended cache TTL feature for out-of-market-hours that relies on heartbeats will not work.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ exports[`websocket should return success for full symbols 1`] = `
"result": 1.098455,
"statusCode": 200,
"timestamps": {
"providerDataReceivedUnixMs": 1018,
"providerDataReceivedUnixMs": 2028,
"providerDataStreamEstablishedUnixMs": 1010,
"providerIndicatedTimeUnixMs": 1641035471111,
},
Expand All @@ -23,7 +23,7 @@ exports[`websocket should return success for inverted pairs 1`] = `
"result": 0.007010066455429998,
"statusCode": 200,
"timestamps": {
"providerDataReceivedUnixMs": 4048,
"providerDataReceivedUnixMs": 5058,
"providerDataStreamEstablishedUnixMs": 1010,
"providerIndicatedTimeUnixMs": 1641035471111,
},
Expand All @@ -38,7 +38,7 @@ exports[`websocket should return success for requests with base and quote 1`] =
"result": 1.098455,
"statusCode": 200,
"timestamps": {
"providerDataReceivedUnixMs": 2028,
"providerDataReceivedUnixMs": 3038,
"providerDataStreamEstablishedUnixMs": 1010,
"providerIndicatedTimeUnixMs": 1641035471111,
},
Expand All @@ -53,7 +53,22 @@ exports[`websocket should return success for standard pairs, when pair has inver
"result": 142.652,
"statusCode": 200,
"timestamps": {
"providerDataReceivedUnixMs": 3038,
"providerDataReceivedUnixMs": 4048,
"providerDataStreamEstablishedUnixMs": 1010,
"providerIndicatedTimeUnixMs": 1641035471111,
},
}
`;

exports[`websocket should update the ttl of forex params after heartbeat is received 1`] = `
{
"data": {
"result": 1.098455,
},
"result": 1.098455,
"statusCode": 200,
"timestamps": {
"providerDataReceivedUnixMs": 1018,
"providerDataStreamEstablishedUnixMs": 1010,
"providerIndicatedTimeUnixMs": 1641035471111,
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {
TestAdapter,
mockWebSocketProvider,
setEnvVariables,
runAllUntilTime,
} from '@chainlink/external-adapter-framework/util/testing-utils'
import FakeTimers from '@sinonjs/fake-timers'

Expand Down Expand Up @@ -32,6 +33,9 @@ const mockWebSocketServer = (url: string) => {
type: 'trade',
}),
)
setTimeout(() => {
socket.send(JSON.stringify({ type: 'ping' }))
}, 10000)
})
})
return mockWsServer
Expand All @@ -42,6 +46,7 @@ describe('websocket', () => {

const data = {
base: 'OANDA:EUR_USD',
endpoint: 'forex',
}

let spy: jest.SpyInstance
Expand Down Expand Up @@ -118,4 +123,16 @@ describe('websocket', () => {
expect(response.statusCode).toBe(200)
expect(response.json()).toMatchSnapshot()
})

it('should update the ttl of forex params after heartbeat is received', async () => {
await runAllUntilTime(testAdapter.clock, 93000)
const expiredCacheResponse = await testAdapter.request({ base: 'JPY', quote: 'USD' })
expect(expiredCacheResponse.statusCode).toBe(504)

// The cache ttl is 90 seconds. Mocked heartbeat message is sent after 10s after connection which should
// update the ttl and therefore after 93 seconds (from the initial message) we can access the asset
const response = await testAdapter.request(data)
expect(response.statusCode).toBe(200)
expect(response.json()).toMatchSnapshot()
})
})
17 changes: 12 additions & 5 deletions packages/sources/finnhub/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@

This document was generated automatically. Please see [README Generator](../../scripts#readme-generator) for more info.

## Known Issues

### CACHE_MAX_AGE interaction with Heartbeat messages

If `CACHE_MAX_AGE` is set below a current heartbeat interval (60000ms), the extended cache TTL feature for out-of-market-hours that relies on heartbeats will not work.

## Environment Variables

| Required? | Name | Description | Type | Options | Default |
Expand Down Expand Up @@ -36,11 +42,12 @@ Supported names for this endpoint are: `commodities`, `commodity-quote`, `common

### Input Params

| Required? | Name | Aliases | Description | Type | Options | Default | Depends On | Not Valid With |
| :-------: | :------: | :------------: | :--------------------------------------------: | :----: | :-----: | :-----: | :--------: | :------------: |
| ✅ | base | `coin`, `from` | The symbol of symbols of the currency to query | string | | | | |
| | quote | `market`, `to` | The symbol of the currency to convert to | string | | | | |
| | exchange | | The exchange to fetch data for | string | | | | |
| Required? | Name | Aliases | Description | Type | Options | Default | Depends On | Not Valid With |
| :-------: | :----------: | :------------: | :-------------------------------------------------------------------------------------------: | :----: | :-----: | :-----: | :--------: | :------------: |
| ✅ | base | `coin`, `from` | The symbol of symbols of the currency to query | string | | | | |
| | quote | `market`, `to` | The symbol of the currency to convert to | string | | | | |
| | exchange | | The exchange to fetch data for | string | | | | |
| | endpointName | | Is set automatically based on request endpoint alias/name. Modifying this value has no effect | string | | | | |

### Example

Expand Down
5 changes: 5 additions & 0 deletions packages/sources/finnhub/docs/known-issues.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
## Known Issues

### CACHE_MAX_AGE interaction with Heartbeat messages

If `CACHE_MAX_AGE` is set below a current heartbeat interval (60000ms), the extended cache TTL feature for out-of-market-hours that relies on heartbeats will not work.
15 changes: 14 additions & 1 deletion packages/sources/finnhub/src/endpoint/quote.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ export const inputParameters = new InputParameters(
description: 'The exchange to fetch data for',
required: false,
},
endpointName: {
type: 'string',
description:
'Is set automatically based on request endpoint alias/name. Providing a custom value has no effect',
required: false,
},
karen-stepanyan marked this conversation as resolved.
Show resolved Hide resolved
},
[
{
Expand Down Expand Up @@ -135,6 +141,13 @@ const validateExchange = (req: AdapterRequest<typeof inputParameters.validated>)
}
}

// Explicitly sets the requested endpoint alias into requestContext.data.endpointName, so it is available in the subscription set
const setEndpointName = (req: AdapterRequest<typeof inputParameters.validated>) => {
req.requestContext.data.endpointName = (
req.body.data as unknown as typeof inputParameters.validated & { endpoint?: string }
).endpoint
}

export const buildQuoteEndpoint = (overrides?: Record<string, string>) =>
new PriceEndpoint<BaseEndpointTypes>({
name: 'quote',
Expand All @@ -152,7 +165,7 @@ export const buildQuoteEndpoint = (overrides?: Record<string, string>) =>
.register('rest', httpTransport),
defaultTransport: 'rest',
customRouter: (_req, adapterConfig) => (adapterConfig.WS_ENABLED ? 'ws' : 'rest'),
requestTransforms: [requestTransform, validateExchange],
requestTransforms: [requestTransform, validateExchange, setEndpointName],
inputParameters: inputParameters,
overrides,
})
Expand Down
29 changes: 26 additions & 3 deletions packages/sources/finnhub/src/transport/quote-ws.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import { BaseEndpointTypes, buildSymbol } from '../endpoint/quote'
import { WebsocketReverseMappingTransport } from '@chainlink/external-adapter-framework/transports'
import {
WebsocketReverseMappingTransport,
WebSocketTransport,
} from '@chainlink/external-adapter-framework/transports'
import { ProviderResult, makeLogger } from '@chainlink/external-adapter-framework/util'
import { parseResult } from './utils'

Expand All @@ -10,6 +13,10 @@ type WsMessageError = {
msg: string
}

type WsMessageHeartbeat = {
type: 'ping'
}

type WsMessageTrade = {
type: 'trade'
data: {
Expand All @@ -21,24 +28,40 @@ type WsMessageTrade = {
}[]
}

type WsMessage = WsMessageError | WsMessageTrade
type WsMessage = WsMessageError | WsMessageTrade | WsMessageHeartbeat

type WsEndpointTypes = BaseEndpointTypes & {
Provider: {
WsMessage: WsMessage
}
}
/*
Finnhub EA currently does not receive asset prices during off-market hours. When a heartbeat message is received during these hours,
we update the TTL of **forex** cache entries that EA is requested to provide a price during off-market hours.
*/
const updateTTL = async (transport: WebSocketTransport<WsEndpointTypes>, ttl: number) => {
const allParams = await transport.subscriptionSet.getAll()
const forexParams = allParams.filter((p) => p.endpointName === 'forex')
transport.responseCache.writeTTL(transport.name, forexParams, ttl)
}

export const wsTransport = new WebsocketReverseMappingTransport<WsEndpointTypes, string>({
url: ({ adapterSettings }) =>
`${adapterSettings.WS_API_ENDPOINT}?token=${adapterSettings.API_KEY}`,
handlers: {
message: (message) => {
message: (message, context) => {
if (message.type === 'error') {
logger.error(message.msg)
return
}

// Check for a heartbeat message, refresh the TTLs of all requested entries in the cache
if (message.type === 'ping') {
wsTransport.lastMessageReceivedAt = Date.now()
updateTTL(wsTransport, context.adapterSettings.CACHE_MAX_AGE)
return []
}

if (message.type === 'trade') {
const results: ProviderResult<WsEndpointTypes>[] = []

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ exports[`websocket should return success for full symbols 1`] = `
"result": 1.098455,
"statusCode": 200,
"timestamps": {
"providerDataReceivedUnixMs": 1018,
"providerDataReceivedUnixMs": 2028,
"providerDataStreamEstablishedUnixMs": 1010,
"providerIndicatedTimeUnixMs": 1641035471111,
},
Expand All @@ -23,7 +23,7 @@ exports[`websocket should return success for inverted pairs 1`] = `
"result": 0.007010066455429998,
"statusCode": 200,
"timestamps": {
"providerDataReceivedUnixMs": 4048,
"providerDataReceivedUnixMs": 5058,
"providerDataStreamEstablishedUnixMs": 1010,
"providerIndicatedTimeUnixMs": 1641035471111,
},
Expand All @@ -38,7 +38,7 @@ exports[`websocket should return success for requests with base and quote 1`] =
"result": 1.098455,
"statusCode": 200,
"timestamps": {
"providerDataReceivedUnixMs": 2028,
"providerDataReceivedUnixMs": 3038,
"providerDataStreamEstablishedUnixMs": 1010,
"providerIndicatedTimeUnixMs": 1641035471111,
},
Expand All @@ -53,7 +53,22 @@ exports[`websocket should return success for standard pairs, when pair has inver
"result": 142.652,
"statusCode": 200,
"timestamps": {
"providerDataReceivedUnixMs": 3038,
"providerDataReceivedUnixMs": 4048,
"providerDataStreamEstablishedUnixMs": 1010,
"providerIndicatedTimeUnixMs": 1641035471111,
},
}
`;

exports[`websocket should update the ttl of forex params after heartbeat is received 1`] = `
{
"data": {
"result": 1.098455,
},
"result": 1.098455,
"statusCode": 200,
"timestamps": {
"providerDataReceivedUnixMs": 1018,
"providerDataStreamEstablishedUnixMs": 1010,
"providerIndicatedTimeUnixMs": 1641035471111,
},
Expand Down
17 changes: 17 additions & 0 deletions packages/sources/finnhub/test/integration/adapter-ws.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {
TestAdapter,
mockWebSocketProvider,
setEnvVariables,
runAllUntilTime,
} from '@chainlink/external-adapter-framework/util/testing-utils'
import FakeTimers from '@sinonjs/fake-timers'

Expand Down Expand Up @@ -32,6 +33,9 @@ const mockWebSocketServer = (url: string) => {
type: 'trade',
}),
)
setTimeout(() => {
socket.send(JSON.stringify({ type: 'ping' }))
}, 10000)
})
})
return mockWsServer
Expand All @@ -42,6 +46,7 @@ describe('websocket', () => {

const data = {
base: 'FHFX:EUR-USD',
endpoint: 'forex',
}

let spy: jest.SpyInstance
Expand Down Expand Up @@ -118,4 +123,16 @@ describe('websocket', () => {
expect(response.statusCode).toBe(200)
expect(response.json()).toMatchSnapshot()
})

it('should update the ttl of forex params after heartbeat is received', async () => {
await runAllUntilTime(testAdapter.clock, 93000)
const expiredCacheResponse = await testAdapter.request({ base: 'EUR', quote: 'USD' })
expect(expiredCacheResponse.statusCode).toBe(504)

// The cache ttl is 90 seconds. Mocked heartbeat message is sent after 10s after connection which should
// update the ttl and therefore after 93 seconds (from the initial message) we can access the asset
const response = await testAdapter.request(data)
expect(response.statusCode).toBe(200)
expect(response.json()).toMatchSnapshot()
})
})
Loading