Skip to content

Commit

Permalink
Add TestConsumerProviderWithConsumerAndProviderSideCache
Browse files Browse the repository at this point in the history
  • Loading branch information
shleikes committed Sep 20, 2024
1 parent 1e81594 commit fc757c8
Showing 1 changed file with 124 additions and 0 deletions.
124 changes: 124 additions & 0 deletions protocol/integration/protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1823,3 +1823,127 @@ func TestConsumerProviderWithProviderSideCache(t *testing.T) {
// Verify node was called to only twice
require.Equal(t, 2, nodeRequestsCounter)
}

func TestConsumerProviderWithConsumerAndProviderSideCache(t *testing.T) {
ctx := context.Background()
// can be any spec and api interface
specId := "LAV1"
apiInterface := spectypes.APIInterfaceTendermintRPC
epoch := uint64(100)
lavaChainID := "lava"

consumerListenAddress := addressGen.GetAddress()
consumerCacheListenAddress := addressGen.GetAddress()
providerCacheListenAddress := addressGen.GetAddress()

createCacheServer(t, ctx, consumerCacheListenAddress)
createCacheServer(t, ctx, providerCacheListenAddress)

pairingList := map[uint64]*lavasession.ConsumerSessionsWithProvider{}
type providerData struct {
account sigs.Account
endpoint *lavasession.RPCProviderEndpoint
replySetter *ReplySetter
}

consumerAccount := sigs.GenerateDeterministicFloatingKey(randomizer)
providerAccount := sigs.GenerateDeterministicFloatingKey(randomizer)
provider := providerData{account: providerAccount}
providerListenAddress := addressGen.GetAddress()

testJsonRpcId := 42
nodeRequestsCounter := 0
rpcProviderOptions := rpcProviderOptions{
consumerAddress: consumerAccount.Addr.String(),
specId: specId,
apiInterface: apiInterface,
listenAddress: providerListenAddress,
account: provider.account,
lavaChainID: lavaChainID,
addons: []string(nil),
providerUniqueId: "provider",
cacheListenAddress: providerCacheListenAddress,
}
_, provider.endpoint, provider.replySetter, _, _ = createRpcProvider(t, ctx, rpcProviderOptions)
provider.replySetter.handler = func(req []byte, header http.Header) (data []byte, status int) {
var jsonRpcMessage rpcInterfaceMessages.JsonrpcMessage
err := json.Unmarshal(req, &jsonRpcMessage)
require.NoError(t, err, req)

reqId := jsonRpcIdToInt(t, jsonRpcMessage.ID)
if reqId == testJsonRpcId {
nodeRequestsCounter++
}

response := fmt.Sprintf(`{"jsonrpc":"2.0","result": {}, "id": %v}`, string(jsonRpcMessage.ID))
return []byte(response), http.StatusOK
}

pairingList[0] = &lavasession.ConsumerSessionsWithProvider{
PublicLavaAddress: provider.account.Addr.String(),
Endpoints: []*lavasession.Endpoint{
{
NetworkAddress: provider.endpoint.NetworkAddress.Address,
Enabled: true,
Geolocation: 1,
},
},
Sessions: map[int64]*lavasession.SingleConsumerSession{},
MaxComputeUnits: 10000,
UsedComputeUnits: 0,
PairingEpoch: epoch,
}

rpcConsumerOptions := rpcConsumerOptions{
specId: specId,
apiInterface: apiInterface,
account: consumerAccount,
consumerListenAddress: consumerListenAddress,
epoch: epoch,
pairingList: pairingList,
requiredResponses: 1,
lavaChainID: lavaChainID,
cacheListenAddress: consumerCacheListenAddress,
}
rpcconsumerServer, _ := createRpcConsumer(t, ctx, rpcConsumerOptions)
require.NotNil(t, rpcconsumerServer)

client := http.Client{}
sendMessage := func(method string, params []string) http.Header {
body := fmt.Sprintf(`{"jsonrpc":"2.0","method":"%v","params": [%v], "id":%v}`, method, strings.Join(params, ","), testJsonRpcId)
resp, err := client.Post("http://"+consumerListenAddress, "application/json", bytes.NewBuffer([]byte(body)))
require.NoError(t, err)
require.Equal(t, http.StatusOK, resp.StatusCode)

bodyBytes, err := io.ReadAll(resp.Body)
require.NoError(t, err)

var jsonRpcMessage rpcInterfaceMessages.JsonrpcMessage
err = json.Unmarshal(bodyBytes, &jsonRpcMessage)
require.NoError(t, err)

respId := jsonRpcIdToInt(t, jsonRpcMessage.ID)
require.Equal(t, testJsonRpcId, respId)
resp.Body.Close()

return resp.Header
}

providerAddr := provider.account.Addr.String()
// Get latest for sanity check
headers := sendMessage("status", []string{})
require.Equal(t, providerAddr, headers.Get(common.PROVIDER_ADDRESS_HEADER_NAME))

// Get block, this should be cached for next time
headers = sendMessage("block", []string{"1000"})
require.Equal(t, providerAddr, headers.Get(common.PROVIDER_ADDRESS_HEADER_NAME))

for i := 0; i < 5; i++ {
// Get block again, this time it should be from cache
headers = sendMessage("block", []string{"1000"})
require.Equal(t, "Cached", headers.Get(common.PROVIDER_ADDRESS_HEADER_NAME))
}

// Verify node was called to only twice
require.Equal(t, 2, nodeRequestsCounter)
}

0 comments on commit fc757c8

Please sign in to comment.