From fc757c8b9170c17958387bac0b521910da072151 Mon Sep 17 00:00:00 2001 From: Elad Gildnur Date: Fri, 20 Sep 2024 17:23:26 +0300 Subject: [PATCH] Add TestConsumerProviderWithConsumerAndProviderSideCache --- protocol/integration/protocol_test.go | 124 ++++++++++++++++++++++++++ 1 file changed, 124 insertions(+) diff --git a/protocol/integration/protocol_test.go b/protocol/integration/protocol_test.go index e46a217f84..c6f61471a6 100644 --- a/protocol/integration/protocol_test.go +++ b/protocol/integration/protocol_test.go @@ -1823,3 +1823,127 @@ func TestConsumerProviderWithProviderSideCache(t *testing.T) { // Verify node was called to only twice require.Equal(t, 2, nodeRequestsCounter) } + +func TestConsumerProviderWithConsumerAndProviderSideCache(t *testing.T) { + ctx := context.Background() + // can be any spec and api interface + specId := "LAV1" + apiInterface := spectypes.APIInterfaceTendermintRPC + epoch := uint64(100) + lavaChainID := "lava" + + consumerListenAddress := addressGen.GetAddress() + consumerCacheListenAddress := addressGen.GetAddress() + providerCacheListenAddress := addressGen.GetAddress() + + createCacheServer(t, ctx, consumerCacheListenAddress) + createCacheServer(t, ctx, providerCacheListenAddress) + + pairingList := map[uint64]*lavasession.ConsumerSessionsWithProvider{} + type providerData struct { + account sigs.Account + endpoint *lavasession.RPCProviderEndpoint + replySetter *ReplySetter + } + + consumerAccount := sigs.GenerateDeterministicFloatingKey(randomizer) + providerAccount := sigs.GenerateDeterministicFloatingKey(randomizer) + provider := providerData{account: providerAccount} + providerListenAddress := addressGen.GetAddress() + + testJsonRpcId := 42 + nodeRequestsCounter := 0 + rpcProviderOptions := rpcProviderOptions{ + consumerAddress: consumerAccount.Addr.String(), + specId: specId, + apiInterface: apiInterface, + listenAddress: providerListenAddress, + account: provider.account, + lavaChainID: lavaChainID, + addons: []string(nil), + providerUniqueId: "provider", + cacheListenAddress: providerCacheListenAddress, + } + _, provider.endpoint, provider.replySetter, _, _ = createRpcProvider(t, ctx, rpcProviderOptions) + provider.replySetter.handler = func(req []byte, header http.Header) (data []byte, status int) { + var jsonRpcMessage rpcInterfaceMessages.JsonrpcMessage + err := json.Unmarshal(req, &jsonRpcMessage) + require.NoError(t, err, req) + + reqId := jsonRpcIdToInt(t, jsonRpcMessage.ID) + if reqId == testJsonRpcId { + nodeRequestsCounter++ + } + + response := fmt.Sprintf(`{"jsonrpc":"2.0","result": {}, "id": %v}`, string(jsonRpcMessage.ID)) + return []byte(response), http.StatusOK + } + + pairingList[0] = &lavasession.ConsumerSessionsWithProvider{ + PublicLavaAddress: provider.account.Addr.String(), + Endpoints: []*lavasession.Endpoint{ + { + NetworkAddress: provider.endpoint.NetworkAddress.Address, + Enabled: true, + Geolocation: 1, + }, + }, + Sessions: map[int64]*lavasession.SingleConsumerSession{}, + MaxComputeUnits: 10000, + UsedComputeUnits: 0, + PairingEpoch: epoch, + } + + rpcConsumerOptions := rpcConsumerOptions{ + specId: specId, + apiInterface: apiInterface, + account: consumerAccount, + consumerListenAddress: consumerListenAddress, + epoch: epoch, + pairingList: pairingList, + requiredResponses: 1, + lavaChainID: lavaChainID, + cacheListenAddress: consumerCacheListenAddress, + } + rpcconsumerServer, _ := createRpcConsumer(t, ctx, rpcConsumerOptions) + require.NotNil(t, rpcconsumerServer) + + client := http.Client{} + sendMessage := func(method string, params []string) http.Header { + body := fmt.Sprintf(`{"jsonrpc":"2.0","method":"%v","params": [%v], "id":%v}`, method, strings.Join(params, ","), testJsonRpcId) + resp, err := client.Post("http://"+consumerListenAddress, "application/json", bytes.NewBuffer([]byte(body))) + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + + bodyBytes, err := io.ReadAll(resp.Body) + require.NoError(t, err) + + var jsonRpcMessage rpcInterfaceMessages.JsonrpcMessage + err = json.Unmarshal(bodyBytes, &jsonRpcMessage) + require.NoError(t, err) + + respId := jsonRpcIdToInt(t, jsonRpcMessage.ID) + require.Equal(t, testJsonRpcId, respId) + resp.Body.Close() + + return resp.Header + } + + providerAddr := provider.account.Addr.String() + // Get latest for sanity check + headers := sendMessage("status", []string{}) + require.Equal(t, providerAddr, headers.Get(common.PROVIDER_ADDRESS_HEADER_NAME)) + + // Get block, this should be cached for next time + headers = sendMessage("block", []string{"1000"}) + require.Equal(t, providerAddr, headers.Get(common.PROVIDER_ADDRESS_HEADER_NAME)) + + for i := 0; i < 5; i++ { + // Get block again, this time it should be from cache + headers = sendMessage("block", []string{"1000"}) + require.Equal(t, "Cached", headers.Get(common.PROVIDER_ADDRESS_HEADER_NAME)) + } + + // Verify node was called to only twice + require.Equal(t, 2, nodeRequestsCounter) +}