diff --git a/ecosystem/cache/server.go b/ecosystem/cache/server.go index 9f3f8df1dc..fd867b303c 100644 --- a/ecosystem/cache/server.go +++ b/ecosystem/cache/server.go @@ -107,7 +107,7 @@ func (cs *CacheServer) Serve(ctx context.Context, if strings.HasPrefix(listenAddr, unixPrefix) { // Unix socket host, port, err := net.SplitHostPort(listenAddr) if err != nil { - utils.LavaFormatFatal("Failed to parse unix socket, provide address in this format unix:/tmp/example.sock: %v\n", err) + utils.LavaFormatFatal("Failed to parse unix socket, provide address in this format unix:/tmp/example.sock", err) return } @@ -115,26 +115,26 @@ func (cs *CacheServer) Serve(ctx context.Context, addr, err := net.ResolveUnixAddr(host, port) if err != nil { - utils.LavaFormatFatal("Failed to resolve unix socket address: %v\n", err) + utils.LavaFormatFatal("Failed to resolve unix socket address", err) return } lis, err = net.ListenUnix(host, addr) if err != nil { - utils.LavaFormatFatal("Faild to listen to unix socket listener: %v\n", err) + utils.LavaFormatFatal("Failed to listen to unix socket listener", err) return } // Set permissions for the Unix socket err = os.Chmod(port, 0o600) if err != nil { - utils.LavaFormatFatal("Failed to set permissions for Unix socket: %v\n", err) + utils.LavaFormatFatal("Failed to set permissions for Unix socket", err) return } } else { lis, err = net.Listen("tcp", listenAddr) if err != nil { - utils.LavaFormatFatal("Cache server failure setting up TCP listener: %v\n", err) + utils.LavaFormatFatal("Cache server failure setting up TCP listener", err) return } } diff --git a/go.mod b/go.mod index 67e086b80a..82fc25f95b 100644 --- a/go.mod +++ b/go.mod @@ -37,6 +37,7 @@ require ( github.com/fullstorydev/grpcurl v1.8.5 github.com/goccy/go-json v0.10.2 github.com/gogo/status v1.1.0 + github.com/golang/mock v1.6.0 github.com/golang/protobuf v1.5.4 github.com/itchyny/gojq v0.12.16 github.com/jhump/protoreflect v1.15.1 @@ -85,7 +86,6 @@ require ( github.com/go-logr/stdr v1.2.2 // indirect github.com/gogo/googleapis v1.4.1 // indirect github.com/golang/glog v1.2.0 // indirect - github.com/golang/mock v1.6.0 // indirect github.com/google/flatbuffers v1.12.1 // indirect github.com/google/go-cmp v0.6.0 // indirect github.com/google/s2a-go v0.1.7 // indirect diff --git a/protocol/integration/protocol_test.go b/protocol/integration/protocol_test.go index 048bafedda..33a69599f1 100644 --- a/protocol/integration/protocol_test.go +++ b/protocol/integration/protocol_test.go @@ -10,11 +10,13 @@ import ( "net/url" "os" "strconv" + "strings" "sync" "testing" "time" "github.com/gorilla/websocket" + "github.com/lavanet/lava/v3/ecosystem/cache" "github.com/lavanet/lava/v3/protocol/chainlib" "github.com/lavanet/lava/v3/protocol/chainlib/chainproxy/rpcInterfaceMessages" "github.com/lavanet/lava/v3/protocol/chaintracker" @@ -22,6 +24,7 @@ import ( "github.com/lavanet/lava/v3/protocol/lavaprotocol/finalizationconsensus" "github.com/lavanet/lava/v3/protocol/lavasession" "github.com/lavanet/lava/v3/protocol/metrics" + "github.com/lavanet/lava/v3/protocol/performance" "github.com/lavanet/lava/v3/protocol/provideroptimizer" "github.com/lavanet/lava/v3/protocol/rpcconsumer" "github.com/lavanet/lava/v3/protocol/rpcprovider" @@ -162,21 +165,33 @@ func createInMemoryRewardDb(specs []string) (*rewardserver.RewardDB, error) { return rewardDB, nil } -func createRpcConsumer(t *testing.T, ctx context.Context, specId string, apiInterface string, account sigs.Account, consumerListenAddress string, epoch uint64, pairingList map[uint64]*lavasession.ConsumerSessionsWithProvider, requiredResponses int, lavaChainID string) (*rpcconsumer.RPCConsumerServer, *mockConsumerStateTracker) { +type rpcConsumerOptions struct { + specId string + apiInterface string + account sigs.Account + consumerListenAddress string + epoch uint64 + pairingList map[uint64]*lavasession.ConsumerSessionsWithProvider + requiredResponses int + lavaChainID string + cacheListenAddress string +} + +func createRpcConsumer(t *testing.T, ctx context.Context, rpcConsumerOptions rpcConsumerOptions) (*rpcconsumer.RPCConsumerServer, *mockConsumerStateTracker) { serverHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { // Handle the incoming request and provide the desired response w.WriteHeader(http.StatusOK) }) - chainParser, _, chainFetcher, _, _, err := chainlib.CreateChainLibMocks(ctx, specId, apiInterface, serverHandler, nil, "../../", nil) + chainParser, _, chainFetcher, _, _, err := chainlib.CreateChainLibMocks(ctx, rpcConsumerOptions.specId, rpcConsumerOptions.apiInterface, serverHandler, nil, "../../", nil) require.NoError(t, err) require.NotNil(t, chainParser) require.NotNil(t, chainFetcher) rpcConsumerServer := &rpcconsumer.RPCConsumerServer{} rpcEndpoint := &lavasession.RPCEndpoint{ - NetworkAddress: consumerListenAddress, - ChainID: specId, - ApiInterface: apiInterface, + NetworkAddress: rpcConsumerOptions.consumerListenAddress, + ChainID: rpcConsumerOptions.specId, + ApiInterface: rpcConsumerOptions.apiInterface, TLSEnabled: false, HealthCheckPath: "", Geolocation: 1, @@ -187,13 +202,21 @@ func createRpcConsumer(t *testing.T, ctx context.Context, specId string, apiInte baseLatency := common.AverageWorldLatency / 2 optimizer := provideroptimizer.NewProviderOptimizer(provideroptimizer.STRATEGY_BALANCED, averageBlockTime, baseLatency, 2) consumerSessionManager := lavasession.NewConsumerSessionManager(rpcEndpoint, optimizer, nil, nil, "test", lavasession.NewActiveSubscriptionProvidersStorage()) - consumerSessionManager.UpdateAllProviders(epoch, pairingList) + consumerSessionManager.UpdateAllProviders(rpcConsumerOptions.epoch, rpcConsumerOptions.pairingList) + + var cache *performance.Cache = nil + if rpcConsumerOptions.cacheListenAddress != "" { + cache, err = performance.InitCache(ctx, rpcConsumerOptions.cacheListenAddress) + if err != nil { + t.Fatalf("Failed To Connect to cache at address %s: %v", rpcConsumerOptions.cacheListenAddress, err) + } + } - consumerConsistency := rpcconsumer.NewConsumerConsistency(specId) + consumerConsistency := rpcconsumer.NewConsumerConsistency(rpcConsumerOptions.specId) consumerCmdFlags := common.ConsumerCmdFlags{} rpcconsumerLogs, err := metrics.NewRPCConsumerLogs(nil, nil) require.NoError(t, err) - err = rpcConsumerServer.ServeRPCRequests(ctx, rpcEndpoint, consumerStateTracker, chainParser, finalizationConsensus, consumerSessionManager, requiredResponses, account.SK, lavaChainID, nil, rpcconsumerLogs, account.Addr, consumerConsistency, nil, consumerCmdFlags, false, nil, nil, nil) + err = rpcConsumerServer.ServeRPCRequests(ctx, rpcEndpoint, consumerStateTracker, chainParser, finalizationConsensus, consumerSessionManager, rpcConsumerOptions.requiredResponses, rpcConsumerOptions.account.SK, rpcConsumerOptions.lavaChainID, cache, rpcconsumerLogs, rpcConsumerOptions.account.Addr, consumerConsistency, nil, consumerCmdFlags, false, nil, nil, nil) require.NoError(t, err) // wait for consumer to finish initialization @@ -214,7 +237,19 @@ func createRpcConsumer(t *testing.T, ctx context.Context, specId string, apiInte return rpcConsumerServer, consumerStateTracker } -func createRpcProvider(t *testing.T, ctx context.Context, consumerAddress string, specId string, apiInterface string, listenAddress string, account sigs.Account, lavaChainID string, addons []string, providerUniqueId string) (*rpcprovider.RPCProviderServer, *lavasession.RPCProviderEndpoint, *ReplySetter, *MockChainFetcher, *MockReliabilityManager) { +type rpcProviderOptions struct { + consumerAddress string + specId string + apiInterface string + listenAddress string + account sigs.Account + lavaChainID string + addons []string + providerUniqueId string + cacheListenAddress string +} + +func createRpcProvider(t *testing.T, ctx context.Context, rpcProviderOptions rpcProviderOptions) (*rpcprovider.RPCProviderServer, *lavasession.RPCProviderEndpoint, *ReplySetter, *MockChainFetcher, *MockReliabilityManager) { replySetter := ReplySetter{ status: http.StatusOK, replyDataBuf: []byte(`{"reply": "REPLY-STUB"}`), @@ -234,16 +269,16 @@ func createRpcProvider(t *testing.T, ctx context.Context, consumerAddress string fmt.Fprint(w, string(data)) }) - chainParser, chainRouter, chainFetcher, _, endpoint, err := chainlib.CreateChainLibMocks(ctx, specId, apiInterface, serverHandler, nil, "../../", addons) + chainParser, chainRouter, chainFetcher, _, endpoint, err := chainlib.CreateChainLibMocks(ctx, rpcProviderOptions.specId, rpcProviderOptions.apiInterface, serverHandler, nil, "../../", rpcProviderOptions.addons) require.NoError(t, err) require.NotNil(t, chainParser) require.NotNil(t, chainFetcher) require.NotNil(t, chainRouter) - endpoint.NetworkAddress.Address = listenAddress + endpoint.NetworkAddress.Address = rpcProviderOptions.listenAddress rpcProviderServer := &rpcprovider.RPCProviderServer{} - if providerUniqueId != "" { - rpcProviderServer.SetProviderUniqueId(providerUniqueId) + if rpcProviderOptions.providerUniqueId != "" { + rpcProviderServer.SetProviderUniqueId(rpcProviderOptions.providerUniqueId) } rpcProviderEndpoint := &lavasession.RPCProviderEndpoint{ @@ -253,8 +288,8 @@ func createRpcProvider(t *testing.T, ctx context.Context, consumerAddress string CertPem: "", DisableTLS: false, }, - ChainID: specId, - ApiInterface: apiInterface, + ChainID: rpcProviderOptions.specId, + ApiInterface: rpcProviderOptions.apiInterface, Geolocation: 1, NodeUrls: []common.NodeUrl{ { @@ -263,22 +298,22 @@ func createRpcProvider(t *testing.T, ctx context.Context, consumerAddress string AuthConfig: common.AuthConfig{}, IpForwarding: false, Timeout: 0, - Addons: addons, + Addons: rpcProviderOptions.addons, SkipVerifications: []string{}, }, }, } - rewardDB, err := createInMemoryRewardDb([]string{specId}) + rewardDB, err := createInMemoryRewardDb([]string{rpcProviderOptions.specId}) require.NoError(t, err) _, averageBlockTime, blocksToFinalization, blocksInFinalizationData := chainParser.ChainBlockStats() - mockProviderStateTracker := mockProviderStateTracker{consumerAddressForPairing: consumerAddress, averageBlockTime: averageBlockTime} + mockProviderStateTracker := mockProviderStateTracker{consumerAddressForPairing: rpcProviderOptions.consumerAddress, averageBlockTime: averageBlockTime} rws := rewardserver.NewRewardServer(&mockProviderStateTracker, nil, rewardDB, "badger_test", 1, 10, nil) blockMemorySize, err := mockProviderStateTracker.GetEpochSizeMultipliedByRecommendedEpochNumToCollectPayment(ctx) require.NoError(t, err) providerSessionManager := lavasession.NewProviderSessionManager(rpcProviderEndpoint, blockMemorySize) providerPolicy := rpcprovider.GetAllAddonsAndExtensionsFromNodeUrlSlice(rpcProviderEndpoint.NodeUrls) - chainParser.SetPolicy(providerPolicy, specId, apiInterface) + chainParser.SetPolicy(providerPolicy, rpcProviderOptions.specId, rpcProviderOptions.apiInterface) blocksToSaveChainTracker := uint64(blocksToFinalization + blocksInFinalizationData) chainTrackerConfig := chaintracker.ChainTrackerConfig{ @@ -290,13 +325,21 @@ func createRpcProvider(t *testing.T, ctx context.Context, consumerAddress string Pmetrics: nil, } + var cache *performance.Cache = nil + if rpcProviderOptions.cacheListenAddress != "" { + cache, err = performance.InitCache(ctx, rpcProviderOptions.cacheListenAddress) + if err != nil { + t.Fatalf("Failed To Connect to cache at address %s: %v", rpcProviderOptions.cacheListenAddress, err) + } + } + mockChainFetcher := NewMockChainFetcher(1000, int64(blocksToSaveChainTracker), nil) chainTracker, err := chaintracker.NewChainTracker(ctx, mockChainFetcher, chainTrackerConfig) require.NoError(t, err) chainTracker.StartAndServe(ctx) - reliabilityManager := reliabilitymanager.NewReliabilityManager(chainTracker, &mockProviderStateTracker, account.Addr.String(), chainRouter, chainParser) + reliabilityManager := reliabilitymanager.NewReliabilityManager(chainTracker, &mockProviderStateTracker, rpcProviderOptions.account.Addr.String(), chainRouter, chainParser) mockReliabilityManager := NewMockReliabilityManager(reliabilityManager) - rpcProviderServer.ServeRPCRequests(ctx, rpcProviderEndpoint, chainParser, rws, providerSessionManager, mockReliabilityManager, account.SK, nil, chainRouter, &mockProviderStateTracker, account.Addr, lavaChainID, rpcprovider.DEFAULT_ALLOWED_MISSING_CU, nil, nil, nil, false) + rpcProviderServer.ServeRPCRequests(ctx, rpcProviderEndpoint, chainParser, rws, providerSessionManager, mockReliabilityManager, rpcProviderOptions.account.SK, cache, chainRouter, &mockProviderStateTracker, rpcProviderOptions.account.Addr, rpcProviderOptions.lavaChainID, rpcprovider.DEFAULT_ALLOWED_MISSING_CU, nil, nil, nil, false) listener := rpcprovider.NewProviderListener(ctx, rpcProviderEndpoint.NetworkAddress, "/health") err = listener.RegisterReceiver(rpcProviderServer, rpcProviderEndpoint) require.NoError(t, err) @@ -307,13 +350,22 @@ func createRpcProvider(t *testing.T, ctx context.Context, consumerAddress string return rpcProviderServer, endpoint, &replySetter, mockChainFetcher, mockReliabilityManager } +func createCacheServer(t *testing.T, ctx context.Context, listenAddress string) { + go func() { + cs := cache.CacheServer{CacheMaxCost: 2 * 1024 * 1024 * 1024} // taken from max-items default value + cs.InitCache(ctx, cache.DefaultExpirationTimeFinalized, cache.DefaultExpirationForNonFinalized, cache.DefaultExpirationNodeErrors, cache.DefaultExpirationBlocksHashesToHeights, "disabled", cache.DefaultExpirationTimeFinalizedMultiplier, cache.DefaultExpirationTimeNonFinalizedMultiplier) + cs.Serve(ctx, listenAddress) + }() + cacheServerUp := checkServerStatusWithTimeout("http://"+listenAddress, time.Second*7) + require.True(t, cacheServerUp) +} + func TestConsumerProviderBasic(t *testing.T) { ctx := context.Background() // can be any spec and api interface specId := "LAV1" - apiInterface := spectypes.APIInterfaceTendermintRPC + apiInterface := spectypes.APIInterfaceRest epoch := uint64(100) - requiredResponses := 1 lavaChainID := "lava" numProviders := 1 @@ -339,7 +391,17 @@ func TestConsumerProviderBasic(t *testing.T) { ctx := context.Background() providerDataI := providers[i] listenAddress := addressGen.GetAddress() - providers[i].server, providers[i].endpoint, providers[i].replySetter, providers[i].mockChainFetcher, _ = createRpcProvider(t, ctx, consumerAccount.Addr.String(), specId, apiInterface, listenAddress, providerDataI.account, lavaChainID, []string(nil), fmt.Sprintf("provider%d", i)) + rpcProviderOptions := rpcProviderOptions{ + consumerAddress: consumerAccount.Addr.String(), + specId: specId, + apiInterface: apiInterface, + listenAddress: listenAddress, + account: providerDataI.account, + lavaChainID: lavaChainID, + addons: []string(nil), + providerUniqueId: fmt.Sprintf("provider%d", i), + } + providers[i].server, providers[i].endpoint, providers[i].replySetter, providers[i].mockChainFetcher, _ = createRpcProvider(t, ctx, rpcProviderOptions) } for i := 0; i < numProviders; i++ { pairingList[uint64(i)] = &lavasession.ConsumerSessionsWithProvider{ @@ -357,10 +419,21 @@ func TestConsumerProviderBasic(t *testing.T) { PairingEpoch: epoch, } } - rpcconsumerServer, _ := createRpcConsumer(t, ctx, specId, apiInterface, consumerAccount, consumerListenAddress, epoch, pairingList, requiredResponses, lavaChainID) + + rpcConsumerOptions := rpcConsumerOptions{ + specId: specId, + apiInterface: apiInterface, + account: consumerAccount, + consumerListenAddress: consumerListenAddress, + epoch: epoch, + pairingList: pairingList, + requiredResponses: 1, + lavaChainID: lavaChainID, + } + rpcconsumerServer, _ := createRpcConsumer(t, ctx, rpcConsumerOptions) require.NotNil(t, rpcconsumerServer) client := http.Client{} - resp, err := client.Get("http://" + consumerListenAddress + "/status") + resp, err := client.Get("http://" + consumerListenAddress + "/cosmos/base/tendermint/v1beta1/blocks/latest") require.NoError(t, err) require.Equal(t, http.StatusOK, resp.StatusCode) bodyBytes, err := io.ReadAll(resp.Body) @@ -390,7 +463,6 @@ func TestConsumerProviderWithProviders(t *testing.T) { specId := "LAV1" apiInterface := spectypes.APIInterfaceTendermintRPC epoch := uint64(100) - requiredResponses := 1 lavaChainID := "lava" numProviders := 5 @@ -415,7 +487,17 @@ func TestConsumerProviderWithProviders(t *testing.T) { ctx := context.Background() providerDataI := providers[i] listenAddress := addressGen.GetAddress() - providers[i].server, providers[i].endpoint, providers[i].replySetter, providers[i].mockChainFetcher, _ = createRpcProvider(t, ctx, consumerAccount.Addr.String(), specId, apiInterface, listenAddress, providerDataI.account, lavaChainID, []string(nil), fmt.Sprintf("provider%d", i)) + rpcProviderOptions := rpcProviderOptions{ + consumerAddress: consumerAccount.Addr.String(), + specId: specId, + apiInterface: apiInterface, + listenAddress: listenAddress, + account: providerDataI.account, + lavaChainID: lavaChainID, + addons: []string(nil), + providerUniqueId: fmt.Sprintf("provider%d", i), + } + providers[i].server, providers[i].endpoint, providers[i].replySetter, providers[i].mockChainFetcher, _ = createRpcProvider(t, ctx, rpcProviderOptions) providers[i].replySetter.replyDataBuf = []byte(fmt.Sprintf(`{"reply": %d}`, i+1)) } for i := 0; i < numProviders; i++ { @@ -434,7 +516,18 @@ func TestConsumerProviderWithProviders(t *testing.T) { PairingEpoch: epoch, } } - rpcconsumerServer, _ := createRpcConsumer(t, ctx, specId, apiInterface, consumerAccount, consumerListenAddress, epoch, pairingList, requiredResponses, lavaChainID) + + rpcConsumerOptions := rpcConsumerOptions{ + specId: specId, + apiInterface: apiInterface, + account: consumerAccount, + consumerListenAddress: consumerListenAddress, + epoch: epoch, + pairingList: pairingList, + requiredResponses: 1, + lavaChainID: lavaChainID, + } + rpcconsumerServer, _ := createRpcConsumer(t, ctx, rpcConsumerOptions) require.NotNil(t, rpcconsumerServer) if play.scenario != 1 { counter := map[int]int{} @@ -524,7 +617,6 @@ func TestConsumerProviderTx(t *testing.T) { specId := "LAV1" apiInterface := spectypes.APIInterfaceRest epoch := uint64(100) - requiredResponses := 1 lavaChainID := "lava" numProviders := 5 @@ -550,7 +642,17 @@ func TestConsumerProviderTx(t *testing.T) { ctx := context.Background() providerDataI := providers[i] listenAddress := addressGen.GetAddress() - providers[i].server, providers[i].endpoint, providers[i].replySetter, providers[i].mockChainFetcher, _ = createRpcProvider(t, ctx, consumerAccount.Addr.String(), specId, apiInterface, listenAddress, providerDataI.account, lavaChainID, []string(nil), fmt.Sprintf("provider%d", i)) + rpcProviderOptions := rpcProviderOptions{ + consumerAddress: consumerAccount.Addr.String(), + specId: specId, + apiInterface: apiInterface, + listenAddress: listenAddress, + account: providerDataI.account, + lavaChainID: lavaChainID, + addons: []string(nil), + providerUniqueId: fmt.Sprintf("provider%d", i), + } + providers[i].server, providers[i].endpoint, providers[i].replySetter, providers[i].mockChainFetcher, _ = createRpcProvider(t, ctx, rpcProviderOptions) providers[i].replySetter.replyDataBuf = []byte(fmt.Sprintf(`{"result": %d}`, i+1)) } for i := 0; i < numProviders; i++ { @@ -569,7 +671,18 @@ func TestConsumerProviderTx(t *testing.T) { PairingEpoch: epoch, } } - rpcconsumerServer, _ := createRpcConsumer(t, ctx, specId, apiInterface, consumerAccount, consumerListenAddress, epoch, pairingList, requiredResponses, lavaChainID) + + rpcConsumerOptions := rpcConsumerOptions{ + specId: specId, + apiInterface: apiInterface, + account: consumerAccount, + consumerListenAddress: consumerListenAddress, + epoch: epoch, + pairingList: pairingList, + requiredResponses: 1, + lavaChainID: lavaChainID, + } + rpcconsumerServer, _ := createRpcConsumer(t, ctx, rpcConsumerOptions) require.NotNil(t, rpcconsumerServer) for i := 0; i < numProviders; i++ { @@ -631,7 +744,6 @@ func TestConsumerProviderJsonRpcWithNullID(t *testing.T) { specId := play.specId apiInterface := play.apiInterface epoch := uint64(100) - requiredResponses := 1 lavaChainID := "lava" numProviders := 5 @@ -656,7 +768,17 @@ func TestConsumerProviderJsonRpcWithNullID(t *testing.T) { ctx := context.Background() providerDataI := providers[i] listenAddress := addressGen.GetAddress() - providers[i].server, providers[i].endpoint, providers[i].replySetter, providers[i].mockChainFetcher, _ = createRpcProvider(t, ctx, consumerAccount.Addr.String(), specId, apiInterface, listenAddress, providerDataI.account, lavaChainID, []string(nil), fmt.Sprintf("provider%d", i)) + rpcProviderOptions := rpcProviderOptions{ + consumerAddress: consumerAccount.Addr.String(), + specId: specId, + apiInterface: apiInterface, + listenAddress: listenAddress, + account: providerDataI.account, + lavaChainID: lavaChainID, + addons: []string(nil), + providerUniqueId: fmt.Sprintf("provider%d", i), + } + providers[i].server, providers[i].endpoint, providers[i].replySetter, providers[i].mockChainFetcher, _ = createRpcProvider(t, ctx, rpcProviderOptions) providers[i].replySetter.replyDataBuf = []byte(fmt.Sprintf(`{"result": %d}`, i+1)) } for i := 0; i < numProviders; i++ { @@ -675,7 +797,18 @@ func TestConsumerProviderJsonRpcWithNullID(t *testing.T) { PairingEpoch: epoch, } } - rpcconsumerServer, _ := createRpcConsumer(t, ctx, specId, apiInterface, consumerAccount, consumerListenAddress, epoch, pairingList, requiredResponses, lavaChainID) + + rpcConsumerOptions := rpcConsumerOptions{ + specId: specId, + apiInterface: apiInterface, + account: consumerAccount, + consumerListenAddress: consumerListenAddress, + epoch: epoch, + pairingList: pairingList, + requiredResponses: 1, + lavaChainID: lavaChainID, + } + rpcconsumerServer, _ := createRpcConsumer(t, ctx, rpcConsumerOptions) require.NotNil(t, rpcconsumerServer) for i := 0; i < numProviders; i++ { @@ -741,7 +874,6 @@ func TestConsumerProviderSubscriptionsHappyFlow(t *testing.T) { specId := play.specId apiInterface := play.apiInterface epoch := uint64(100) - requiredResponses := 1 lavaChainID := "lava" numProviders := 5 @@ -766,7 +898,17 @@ func TestConsumerProviderSubscriptionsHappyFlow(t *testing.T) { ctx := context.Background() providerDataI := providers[i] listenAddress := addressGen.GetAddress() - providers[i].server, providers[i].endpoint, providers[i].replySetter, providers[i].mockChainFetcher, _ = createRpcProvider(t, ctx, consumerAccount.Addr.String(), specId, apiInterface, listenAddress, providerDataI.account, lavaChainID, []string(nil), fmt.Sprintf("provider%d", i)) + rpcProviderOptions := rpcProviderOptions{ + consumerAddress: consumerAccount.Addr.String(), + specId: specId, + apiInterface: apiInterface, + listenAddress: listenAddress, + account: providerDataI.account, + lavaChainID: lavaChainID, + addons: []string(nil), + providerUniqueId: fmt.Sprintf("provider%d", i), + } + providers[i].server, providers[i].endpoint, providers[i].replySetter, providers[i].mockChainFetcher, _ = createRpcProvider(t, ctx, rpcProviderOptions) providers[i].replySetter.replyDataBuf = []byte(fmt.Sprintf(`{"result": %d}`, i+1)) } for i := 0; i < numProviders; i++ { @@ -785,7 +927,18 @@ func TestConsumerProviderSubscriptionsHappyFlow(t *testing.T) { PairingEpoch: epoch, } } - rpcconsumerServer, _ := createRpcConsumer(t, ctx, specId, apiInterface, consumerAccount, consumerListenAddress, epoch, pairingList, requiredResponses, lavaChainID) + + rpcConsumerOptions := rpcConsumerOptions{ + specId: specId, + apiInterface: apiInterface, + account: consumerAccount, + consumerListenAddress: consumerListenAddress, + epoch: epoch, + pairingList: pairingList, + requiredResponses: 1, + lavaChainID: lavaChainID, + } + rpcconsumerServer, _ := createRpcConsumer(t, ctx, rpcConsumerOptions) require.NotNil(t, rpcconsumerServer) for i := 0; i < numProviders; i++ { @@ -855,7 +1008,6 @@ func TestSameProviderConflictBasicResponseCheck(t *testing.T) { specId := "LAV1" apiInterface := spectypes.APIInterfaceRest epoch := uint64(100) - requiredResponses := 1 lavaChainID := "lava" numProviders := play.numOfProviders @@ -883,7 +1035,17 @@ func TestSameProviderConflictBasicResponseCheck(t *testing.T) { ctx := context.Background() providerDataI := providers[i] listenAddress := addressGen.GetAddress() - providers[i].server, providers[i].endpoint, providers[i].replySetter, providers[i].mockChainFetcher, providers[i].mockReliabilityManager = createRpcProvider(t, ctx, consumerAccount.Addr.String(), specId, apiInterface, listenAddress, providerDataI.account, lavaChainID, []string(nil), fmt.Sprintf("provider%d", i)) + rpcProviderOptions := rpcProviderOptions{ + consumerAddress: consumerAccount.Addr.String(), + specId: specId, + apiInterface: apiInterface, + listenAddress: listenAddress, + account: providerDataI.account, + lavaChainID: lavaChainID, + addons: []string(nil), + providerUniqueId: fmt.Sprintf("provider%d", i), + } + providers[i].server, providers[i].endpoint, providers[i].replySetter, providers[i].mockChainFetcher, providers[i].mockReliabilityManager = createRpcProvider(t, ctx, rpcProviderOptions) providers[i].replySetter.replyDataBuf = []byte(fmt.Sprintf(`{"result": %d}`, i+1)) } @@ -903,7 +1065,18 @@ func TestSameProviderConflictBasicResponseCheck(t *testing.T) { PairingEpoch: epoch, } } - rpcconsumerServer, _ := createRpcConsumer(t, ctx, specId, apiInterface, consumerAccount, consumerListenAddress, epoch, pairingList, requiredResponses, lavaChainID) + + rpcConsumerOptions := rpcConsumerOptions{ + specId: specId, + apiInterface: apiInterface, + account: consumerAccount, + consumerListenAddress: consumerListenAddress, + epoch: epoch, + pairingList: pairingList, + requiredResponses: 1, + lavaChainID: lavaChainID, + } + rpcconsumerServer, _ := createRpcConsumer(t, ctx, rpcConsumerOptions) require.NotNil(t, rpcconsumerServer) // Set first provider as a "liar", to return wrong block hashes @@ -991,7 +1164,6 @@ func TestArchiveProvidersRetry(t *testing.T) { specId := "LAV1" apiInterface := spectypes.APIInterfaceRest epoch := uint64(100) - requiredResponses := 1 lavaChainID := "lava" numProviders := play.numOfProviders @@ -1023,7 +1195,18 @@ func TestArchiveProvidersRetry(t *testing.T) { if i+1 <= play.archiveProviders { addons = []string{"archive"} } - providers[i].server, providers[i].endpoint, providers[i].replySetter, providers[i].mockChainFetcher, providers[i].mockReliabilityManager = createRpcProvider(t, ctx, consumerAccount.Addr.String(), specId, apiInterface, listenAddress, providerDataI.account, lavaChainID, addons, fmt.Sprintf("provider%d", i)) + + rpcProviderOptions := rpcProviderOptions{ + consumerAddress: consumerAccount.Addr.String(), + specId: specId, + apiInterface: apiInterface, + listenAddress: listenAddress, + account: providerDataI.account, + lavaChainID: lavaChainID, + addons: addons, + providerUniqueId: fmt.Sprintf("provider%d", i), + } + providers[i].server, providers[i].endpoint, providers[i].replySetter, providers[i].mockChainFetcher, providers[i].mockReliabilityManager = createRpcProvider(t, ctx, rpcProviderOptions) providers[i].replySetter.replyDataBuf = []byte(`{"result": "success"}`) if i+1 <= play.nodeErrorProviders { providers[i].replySetter.replyDataBuf = []byte(`{"error": "failure", "message": "test", "code": "-32132"}`) @@ -1047,7 +1230,18 @@ func TestArchiveProvidersRetry(t *testing.T) { PairingEpoch: epoch, } } - rpcconsumerServer, _ := createRpcConsumer(t, ctx, specId, apiInterface, consumerAccount, consumerListenAddress, epoch, pairingList, requiredResponses, lavaChainID) + + rpcConsumerOptions := rpcConsumerOptions{ + specId: specId, + apiInterface: apiInterface, + account: consumerAccount, + consumerListenAddress: consumerListenAddress, + epoch: epoch, + pairingList: pairingList, + requiredResponses: 1, + lavaChainID: lavaChainID, + } + rpcconsumerServer, _ := createRpcConsumer(t, ctx, rpcConsumerOptions) require.NotNil(t, rpcconsumerServer) client := http.Client{Timeout: 1000 * time.Millisecond} @@ -1095,7 +1289,17 @@ func TestSameProviderConflictReport(t *testing.T) { ctx := context.Background() providerDataI := providers[i] listenAddress := addressGen.GetAddress() - providers[i].server, providers[i].endpoint, providers[i].replySetter, providers[i].mockChainFetcher, providers[i].mockReliabilityManager = createRpcProvider(t, ctx, consumerAccount.Addr.String(), specId, apiInterface, listenAddress, providerDataI.account, lavaChainID, []string(nil), fmt.Sprintf("provider%d", i)) + rpcProviderOptions := rpcProviderOptions{ + consumerAddress: consumerAccount.Addr.String(), + specId: specId, + apiInterface: apiInterface, + listenAddress: listenAddress, + account: providerDataI.account, + lavaChainID: lavaChainID, + addons: []string(nil), + providerUniqueId: fmt.Sprintf("provider%d", i), + } + providers[i].server, providers[i].endpoint, providers[i].replySetter, providers[i].mockChainFetcher, providers[i].mockReliabilityManager = createRpcProvider(t, ctx, rpcProviderOptions) providers[i].replySetter.replyDataBuf = []byte(fmt.Sprintf(`{"result": %d}`, i+1)) } } @@ -1129,7 +1333,6 @@ func TestSameProviderConflictReport(t *testing.T) { specId := "LAV1" apiInterface := spectypes.APIInterfaceRest epoch := uint64(100) - requiredResponses := 1 lavaChainID := "lava" numProviders := 1 @@ -1140,8 +1343,17 @@ func TestSameProviderConflictReport(t *testing.T) { initProvidersData(consumerAccount, providers, specId, apiInterface, lavaChainID) - pairingList := initPairingList(providers, epoch) - rpcconsumerServer, mockConsumerStateTracker := createRpcConsumer(t, ctx, specId, apiInterface, consumerAccount, consumerListenAddress, epoch, pairingList, requiredResponses, lavaChainID) + rpcConsumerOptions := rpcConsumerOptions{ + specId: specId, + apiInterface: apiInterface, + account: consumerAccount, + consumerListenAddress: consumerListenAddress, + epoch: epoch, + pairingList: initPairingList(providers, epoch), + requiredResponses: 1, + lavaChainID: lavaChainID, + } + rpcconsumerServer, mockConsumerStateTracker := createRpcConsumer(t, ctx, rpcConsumerOptions) conflictSent := false wg := sync.WaitGroup{} @@ -1202,7 +1414,6 @@ func TestSameProviderConflictReport(t *testing.T) { specId := "LAV1" apiInterface := spectypes.APIInterfaceRest epoch := uint64(100) - requiredResponses := 1 lavaChainID := "lava" numProviders := 2 @@ -1212,8 +1423,17 @@ func TestSameProviderConflictReport(t *testing.T) { initProvidersData(consumerAccount, providers, specId, apiInterface, lavaChainID) - pairingList := initPairingList(providers, epoch) - rpcconsumerServer, mockConsumerStateTracker := createRpcConsumer(t, ctx, specId, apiInterface, consumerAccount, consumerListenAddress, epoch, pairingList, requiredResponses, lavaChainID) + rpcConsumerOptions := rpcConsumerOptions{ + specId: specId, + apiInterface: apiInterface, + account: consumerAccount, + consumerListenAddress: consumerListenAddress, + epoch: epoch, + pairingList: initPairingList(providers, epoch), + requiredResponses: 1, + lavaChainID: lavaChainID, + } + rpcconsumerServer, mockConsumerStateTracker := createRpcConsumer(t, ctx, rpcConsumerOptions) twoProvidersConflictSent := false sameProviderConflictSent := false @@ -1285,7 +1505,6 @@ func TestConsumerProviderStatic(t *testing.T) { specId := "LAV1" apiInterface := spectypes.APIInterfaceTendermintRPC epoch := uint64(100) - requiredResponses := 1 lavaChainID := "lava" numProviders := 1 @@ -1311,7 +1530,17 @@ func TestConsumerProviderStatic(t *testing.T) { ctx := context.Background() providerDataI := providers[i] listenAddress := addressGen.GetAddress() - providers[i].server, providers[i].endpoint, providers[i].replySetter, providers[i].mockChainFetcher, _ = createRpcProvider(t, ctx, consumerAccount.Addr.String(), specId, apiInterface, listenAddress, providerDataI.account, lavaChainID, []string(nil), fmt.Sprintf("provider%d", i)) + rpcProviderOptions := rpcProviderOptions{ + consumerAddress: consumerAccount.Addr.String(), + specId: specId, + apiInterface: apiInterface, + listenAddress: listenAddress, + account: providerDataI.account, + lavaChainID: lavaChainID, + addons: []string(nil), + providerUniqueId: fmt.Sprintf("provider%d", i), + } + providers[i].server, providers[i].endpoint, providers[i].replySetter, providers[i].mockChainFetcher, _ = createRpcProvider(t, ctx, rpcProviderOptions) } // provider is static for i := 0; i < numProviders; i++ { @@ -1331,7 +1560,18 @@ func TestConsumerProviderStatic(t *testing.T) { StaticProvider: true, } } - rpcconsumerServer, _ := createRpcConsumer(t, ctx, specId, apiInterface, consumerAccount, consumerListenAddress, epoch, pairingList, requiredResponses, lavaChainID) + + rpcConsumerOptions := rpcConsumerOptions{ + specId: specId, + apiInterface: apiInterface, + account: consumerAccount, + consumerListenAddress: consumerListenAddress, + epoch: epoch, + pairingList: pairingList, + requiredResponses: 1, + lavaChainID: lavaChainID, + } + rpcconsumerServer, _ := createRpcConsumer(t, ctx, rpcConsumerOptions) require.NotNil(t, rpcconsumerServer) client := http.Client{} // consumer sends the relay to a provider with an address BANANA+%d so the provider needs to skip validations for this to work @@ -1351,3 +1591,359 @@ func TestConsumerProviderStatic(t *testing.T) { require.Equal(t, providers[0].replySetter.replyDataBuf, bodyBytes) resp.Body.Close() } + +func jsonRpcIdToInt(t *testing.T, rawID json.RawMessage) int { + var idInterface interface{} + err := json.Unmarshal(rawID, &idInterface) + require.NoError(t, err) + + id, ok := idInterface.(float64) + require.True(t, ok, idInterface) + return int(id) +} + +func TestConsumerProviderWithConsumerSideCache(t *testing.T) { + ctx := context.Background() + // can be any spec and api interface + specId := "LAV1" + apiInterface := spectypes.APIInterfaceTendermintRPC + epoch := uint64(100) + lavaChainID := "lava" + + consumerListenAddress := addressGen.GetAddress() + cacheListenAddress := addressGen.GetAddress() + pairingList := map[uint64]*lavasession.ConsumerSessionsWithProvider{} + type providerData struct { + account sigs.Account + endpoint *lavasession.RPCProviderEndpoint + replySetter *ReplySetter + } + + consumerAccount := sigs.GenerateDeterministicFloatingKey(randomizer) + providerAccount := sigs.GenerateDeterministicFloatingKey(randomizer) + provider := providerData{account: providerAccount} + listenAddress := addressGen.GetAddress() + rpcProviderOptions := rpcProviderOptions{ + consumerAddress: consumerAccount.Addr.String(), + specId: specId, + apiInterface: apiInterface, + listenAddress: listenAddress, + account: provider.account, + lavaChainID: lavaChainID, + addons: []string(nil), + providerUniqueId: "provider", + } + _, provider.endpoint, provider.replySetter, _, _ = createRpcProvider(t, ctx, rpcProviderOptions) + provider.replySetter.handler = func(req []byte, header http.Header) (data []byte, status int) { + var jsonRpcMessage rpcInterfaceMessages.JsonrpcMessage + err := json.Unmarshal(req, &jsonRpcMessage) + require.NoError(t, err, req) + + response := fmt.Sprintf(`{"jsonrpc":"2.0","result": {}, "id": %v}`, string(jsonRpcMessage.ID)) + return []byte(response), http.StatusOK + } + + pairingList[0] = &lavasession.ConsumerSessionsWithProvider{ + PublicLavaAddress: provider.account.Addr.String(), + Endpoints: []*lavasession.Endpoint{ + { + NetworkAddress: provider.endpoint.NetworkAddress.Address, + Enabled: true, + Geolocation: 1, + }, + }, + Sessions: map[int64]*lavasession.SingleConsumerSession{}, + MaxComputeUnits: 10000, + UsedComputeUnits: 0, + PairingEpoch: epoch, + } + + createCacheServer(t, ctx, cacheListenAddress) + + rpcConsumerOptions := rpcConsumerOptions{ + specId: specId, + apiInterface: apiInterface, + account: consumerAccount, + consumerListenAddress: consumerListenAddress, + epoch: epoch, + pairingList: pairingList, + requiredResponses: 1, + lavaChainID: lavaChainID, + cacheListenAddress: cacheListenAddress, + } + rpcconsumerServer, _ := createRpcConsumer(t, ctx, rpcConsumerOptions) + require.NotNil(t, rpcconsumerServer) + + client := http.Client{} + id := 0 + sendMessage := func(method string, params []string) http.Header { + // Get latest block + body := fmt.Sprintf(`{"jsonrpc":"2.0","method":"%v","params": [%v], "id":%v}`, method, strings.Join(params, ","), id) + resp, err := client.Post("http://"+consumerListenAddress, "application/json", bytes.NewBuffer([]byte(body))) + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + + bodyBytes, err := io.ReadAll(resp.Body) + require.NoError(t, err) + + var jsonRpcMessage rpcInterfaceMessages.JsonrpcMessage + err = json.Unmarshal(bodyBytes, &jsonRpcMessage) + require.NoError(t, err) + + respId := jsonRpcIdToInt(t, jsonRpcMessage.ID) + require.Equal(t, id, respId) + resp.Body.Close() + id++ + + return resp.Header + } + + // Get latest for sanity check + providerAddr := provider.account.Addr.String() + headers := sendMessage("status", []string{}) + require.Equal(t, providerAddr, headers.Get(common.PROVIDER_ADDRESS_HEADER_NAME)) + + // Get block, this should be cached for next time + headers = sendMessage("block", []string{"1000"}) + require.Equal(t, providerAddr, headers.Get(common.PROVIDER_ADDRESS_HEADER_NAME)) + + // Get block again, this time it should be from cache + headers = sendMessage("block", []string{"1000"}) + require.Equal(t, "Cached", headers.Get(common.PROVIDER_ADDRESS_HEADER_NAME)) +} + +func TestConsumerProviderWithProviderSideCache(t *testing.T) { + ctx := context.Background() + // can be any spec and api interface + specId := "LAV1" + apiInterface := spectypes.APIInterfaceTendermintRPC + epoch := uint64(100) + lavaChainID := "lava" + + consumerListenAddress := addressGen.GetAddress() + cacheListenAddress := addressGen.GetAddress() + pairingList := map[uint64]*lavasession.ConsumerSessionsWithProvider{} + type providerData struct { + account sigs.Account + endpoint *lavasession.RPCProviderEndpoint + replySetter *ReplySetter + } + + consumerAccount := sigs.GenerateDeterministicFloatingKey(randomizer) + providerAccount := sigs.GenerateDeterministicFloatingKey(randomizer) + provider := providerData{account: providerAccount} + providerListenAddress := addressGen.GetAddress() + + createCacheServer(t, ctx, cacheListenAddress) + testJsonRpcId := 42 + nodeRequestsCounter := 0 + rpcProviderOptions := rpcProviderOptions{ + consumerAddress: consumerAccount.Addr.String(), + specId: specId, + apiInterface: apiInterface, + listenAddress: providerListenAddress, + account: provider.account, + lavaChainID: lavaChainID, + addons: []string(nil), + providerUniqueId: "provider", + cacheListenAddress: cacheListenAddress, + } + _, provider.endpoint, provider.replySetter, _, _ = createRpcProvider(t, ctx, rpcProviderOptions) + provider.replySetter.handler = func(req []byte, header http.Header) (data []byte, status int) { + var jsonRpcMessage rpcInterfaceMessages.JsonrpcMessage + err := json.Unmarshal(req, &jsonRpcMessage) + require.NoError(t, err, req) + + reqId := jsonRpcIdToInt(t, jsonRpcMessage.ID) + if reqId == testJsonRpcId { + nodeRequestsCounter++ + } + + response := fmt.Sprintf(`{"jsonrpc":"2.0","result": {}, "id": %v}`, string(jsonRpcMessage.ID)) + return []byte(response), http.StatusOK + } + + pairingList[0] = &lavasession.ConsumerSessionsWithProvider{ + PublicLavaAddress: provider.account.Addr.String(), + Endpoints: []*lavasession.Endpoint{ + { + NetworkAddress: provider.endpoint.NetworkAddress.Address, + Enabled: true, + Geolocation: 1, + }, + }, + Sessions: map[int64]*lavasession.SingleConsumerSession{}, + MaxComputeUnits: 10000, + UsedComputeUnits: 0, + PairingEpoch: epoch, + } + + rpcConsumerOptions := rpcConsumerOptions{ + specId: specId, + apiInterface: apiInterface, + account: consumerAccount, + consumerListenAddress: consumerListenAddress, + epoch: epoch, + pairingList: pairingList, + requiredResponses: 1, + lavaChainID: lavaChainID, + } + rpcconsumerServer, _ := createRpcConsumer(t, ctx, rpcConsumerOptions) + require.NotNil(t, rpcconsumerServer) + + client := http.Client{} + sendMessage := func(method string, params []string) http.Header { + body := fmt.Sprintf(`{"jsonrpc":"2.0","method":"%v","params": [%v], "id":%v}`, method, strings.Join(params, ","), testJsonRpcId) + resp, err := client.Post("http://"+consumerListenAddress, "application/json", bytes.NewBuffer([]byte(body))) + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + + bodyBytes, err := io.ReadAll(resp.Body) + require.NoError(t, err) + + var jsonRpcMessage rpcInterfaceMessages.JsonrpcMessage + err = json.Unmarshal(bodyBytes, &jsonRpcMessage) + require.NoError(t, err) + + respId := jsonRpcIdToInt(t, jsonRpcMessage.ID) + require.Equal(t, testJsonRpcId, respId) + resp.Body.Close() + + return resp.Header + } + + // Get latest for sanity check + sendMessage("status", []string{}) + + for i := 0; i < 5; i++ { + // Get block + sendMessage("block", []string{"1000"}) + } + + // Verify node was called to only twice + require.Equal(t, 2, nodeRequestsCounter) +} + +func TestConsumerProviderWithConsumerAndProviderSideCache(t *testing.T) { + ctx := context.Background() + // can be any spec and api interface + specId := "LAV1" + apiInterface := spectypes.APIInterfaceTendermintRPC + epoch := uint64(100) + lavaChainID := "lava" + + consumerListenAddress := addressGen.GetAddress() + consumerCacheListenAddress := addressGen.GetAddress() + providerCacheListenAddress := addressGen.GetAddress() + + createCacheServer(t, ctx, consumerCacheListenAddress) + createCacheServer(t, ctx, providerCacheListenAddress) + + pairingList := map[uint64]*lavasession.ConsumerSessionsWithProvider{} + type providerData struct { + account sigs.Account + endpoint *lavasession.RPCProviderEndpoint + replySetter *ReplySetter + } + + consumerAccount := sigs.GenerateDeterministicFloatingKey(randomizer) + providerAccount := sigs.GenerateDeterministicFloatingKey(randomizer) + provider := providerData{account: providerAccount} + providerListenAddress := addressGen.GetAddress() + + testJsonRpcId := 42 + nodeRequestsCounter := 0 + rpcProviderOptions := rpcProviderOptions{ + consumerAddress: consumerAccount.Addr.String(), + specId: specId, + apiInterface: apiInterface, + listenAddress: providerListenAddress, + account: provider.account, + lavaChainID: lavaChainID, + addons: []string(nil), + providerUniqueId: "provider", + cacheListenAddress: providerCacheListenAddress, + } + _, provider.endpoint, provider.replySetter, _, _ = createRpcProvider(t, ctx, rpcProviderOptions) + provider.replySetter.handler = func(req []byte, header http.Header) (data []byte, status int) { + var jsonRpcMessage rpcInterfaceMessages.JsonrpcMessage + err := json.Unmarshal(req, &jsonRpcMessage) + require.NoError(t, err, req) + + reqId := jsonRpcIdToInt(t, jsonRpcMessage.ID) + if reqId == testJsonRpcId { + nodeRequestsCounter++ + } + + response := fmt.Sprintf(`{"jsonrpc":"2.0","result": {}, "id": %v}`, string(jsonRpcMessage.ID)) + return []byte(response), http.StatusOK + } + + pairingList[0] = &lavasession.ConsumerSessionsWithProvider{ + PublicLavaAddress: provider.account.Addr.String(), + Endpoints: []*lavasession.Endpoint{ + { + NetworkAddress: provider.endpoint.NetworkAddress.Address, + Enabled: true, + Geolocation: 1, + }, + }, + Sessions: map[int64]*lavasession.SingleConsumerSession{}, + MaxComputeUnits: 10000, + UsedComputeUnits: 0, + PairingEpoch: epoch, + } + + rpcConsumerOptions := rpcConsumerOptions{ + specId: specId, + apiInterface: apiInterface, + account: consumerAccount, + consumerListenAddress: consumerListenAddress, + epoch: epoch, + pairingList: pairingList, + requiredResponses: 1, + lavaChainID: lavaChainID, + cacheListenAddress: consumerCacheListenAddress, + } + rpcconsumerServer, _ := createRpcConsumer(t, ctx, rpcConsumerOptions) + require.NotNil(t, rpcconsumerServer) + + client := http.Client{} + sendMessage := func(method string, params []string) http.Header { + body := fmt.Sprintf(`{"jsonrpc":"2.0","method":"%v","params": [%v], "id":%v}`, method, strings.Join(params, ","), testJsonRpcId) + resp, err := client.Post("http://"+consumerListenAddress, "application/json", bytes.NewBuffer([]byte(body))) + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + + bodyBytes, err := io.ReadAll(resp.Body) + require.NoError(t, err) + + var jsonRpcMessage rpcInterfaceMessages.JsonrpcMessage + err = json.Unmarshal(bodyBytes, &jsonRpcMessage) + require.NoError(t, err) + + respId := jsonRpcIdToInt(t, jsonRpcMessage.ID) + require.Equal(t, testJsonRpcId, respId) + resp.Body.Close() + + return resp.Header + } + + providerAddr := provider.account.Addr.String() + // Get latest for sanity check + headers := sendMessage("status", []string{}) + require.Equal(t, providerAddr, headers.Get(common.PROVIDER_ADDRESS_HEADER_NAME)) + + // Get block, this should be cached for next time + headers = sendMessage("block", []string{"1000"}) + require.Equal(t, providerAddr, headers.Get(common.PROVIDER_ADDRESS_HEADER_NAME)) + + for i := 0; i < 5; i++ { + // Get block again, this time it should be from cache + headers = sendMessage("block", []string{"1000"}) + require.Equal(t, "Cached", headers.Get(common.PROVIDER_ADDRESS_HEADER_NAME)) + } + + // Verify node was called to only twice + require.Equal(t, 2, nodeRequestsCounter) +}