From 5b9eb18e20aaf28e7e6681fda6cb216070b9da1e Mon Sep 17 00:00:00 2001 From: Elad Gildnur Date: Thu, 19 Sep 2024 14:54:48 +0300 Subject: [PATCH 01/14] Add golang/mock as a direct requirement in go.mod --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 67e086b80a..82fc25f95b 100644 --- a/go.mod +++ b/go.mod @@ -37,6 +37,7 @@ require ( github.com/fullstorydev/grpcurl v1.8.5 github.com/goccy/go-json v0.10.2 github.com/gogo/status v1.1.0 + github.com/golang/mock v1.6.0 github.com/golang/protobuf v1.5.4 github.com/itchyny/gojq v0.12.16 github.com/jhump/protoreflect v1.15.1 @@ -85,7 +86,6 @@ require ( github.com/go-logr/stdr v1.2.2 // indirect github.com/gogo/googleapis v1.4.1 // indirect github.com/golang/glog v1.2.0 // indirect - github.com/golang/mock v1.6.0 // indirect github.com/google/flatbuffers v1.12.1 // indirect github.com/google/go-cmp v0.6.0 // indirect github.com/google/s2a-go v0.1.7 // indirect From 76d3239cc5ce0a3c8cae57079900e0947aba5d1a Mon Sep 17 00:00:00 2001 From: Elad Gildnur Date: Thu, 19 Sep 2024 17:31:42 +0300 Subject: [PATCH 02/14] Move all createRpcConsumer function arguments to a struct --- protocol/integration/protocol_test.go | 167 +++++++++++++++++++++----- 1 file changed, 137 insertions(+), 30 deletions(-) diff --git a/protocol/integration/protocol_test.go b/protocol/integration/protocol_test.go index 048bafedda..6ee4840af2 100644 --- a/protocol/integration/protocol_test.go +++ b/protocol/integration/protocol_test.go @@ -162,21 +162,32 @@ func createInMemoryRewardDb(specs []string) (*rewardserver.RewardDB, error) { return rewardDB, nil } -func createRpcConsumer(t *testing.T, ctx context.Context, specId string, apiInterface string, account sigs.Account, consumerListenAddress string, epoch uint64, pairingList map[uint64]*lavasession.ConsumerSessionsWithProvider, requiredResponses int, lavaChainID string) (*rpcconsumer.RPCConsumerServer, *mockConsumerStateTracker) { +type rpcConsumerOptions struct { + specId string + apiInterface string + account sigs.Account + consumerListenAddress string + epoch uint64 + pairingList map[uint64]*lavasession.ConsumerSessionsWithProvider + requiredResponses int + lavaChainID string +} + +func createRpcConsumer(t *testing.T, ctx context.Context, rpcConsumerOptions rpcConsumerOptions) (*rpcconsumer.RPCConsumerServer, *mockConsumerStateTracker) { serverHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { // Handle the incoming request and provide the desired response w.WriteHeader(http.StatusOK) }) - chainParser, _, chainFetcher, _, _, err := chainlib.CreateChainLibMocks(ctx, specId, apiInterface, serverHandler, nil, "../../", nil) + chainParser, _, chainFetcher, _, _, err := chainlib.CreateChainLibMocks(ctx, rpcConsumerOptions.specId, rpcConsumerOptions.apiInterface, serverHandler, nil, "../../", nil) require.NoError(t, err) require.NotNil(t, chainParser) require.NotNil(t, chainFetcher) rpcConsumerServer := &rpcconsumer.RPCConsumerServer{} rpcEndpoint := &lavasession.RPCEndpoint{ - NetworkAddress: consumerListenAddress, - ChainID: specId, - ApiInterface: apiInterface, + NetworkAddress: rpcConsumerOptions.consumerListenAddress, + ChainID: rpcConsumerOptions.specId, + ApiInterface: rpcConsumerOptions.apiInterface, TLSEnabled: false, HealthCheckPath: "", Geolocation: 1, @@ -187,13 +198,13 @@ func createRpcConsumer(t *testing.T, ctx context.Context, specId string, apiInte baseLatency := common.AverageWorldLatency / 2 optimizer := provideroptimizer.NewProviderOptimizer(provideroptimizer.STRATEGY_BALANCED, averageBlockTime, baseLatency, 2) consumerSessionManager := lavasession.NewConsumerSessionManager(rpcEndpoint, optimizer, nil, nil, "test", lavasession.NewActiveSubscriptionProvidersStorage()) - consumerSessionManager.UpdateAllProviders(epoch, pairingList) + consumerSessionManager.UpdateAllProviders(rpcConsumerOptions.epoch, rpcConsumerOptions.pairingList) - consumerConsistency := rpcconsumer.NewConsumerConsistency(specId) + consumerConsistency := rpcconsumer.NewConsumerConsistency(rpcConsumerOptions.specId) consumerCmdFlags := common.ConsumerCmdFlags{} rpcconsumerLogs, err := metrics.NewRPCConsumerLogs(nil, nil) require.NoError(t, err) - err = rpcConsumerServer.ServeRPCRequests(ctx, rpcEndpoint, consumerStateTracker, chainParser, finalizationConsensus, consumerSessionManager, requiredResponses, account.SK, lavaChainID, nil, rpcconsumerLogs, account.Addr, consumerConsistency, nil, consumerCmdFlags, false, nil, nil, nil) + err = rpcConsumerServer.ServeRPCRequests(ctx, rpcEndpoint, consumerStateTracker, chainParser, finalizationConsensus, consumerSessionManager, rpcConsumerOptions.requiredResponses, rpcConsumerOptions.account.SK, rpcConsumerOptions.lavaChainID, nil, rpcconsumerLogs, rpcConsumerOptions.account.Addr, consumerConsistency, nil, consumerCmdFlags, false, nil, nil, nil) require.NoError(t, err) // wait for consumer to finish initialization @@ -313,7 +324,6 @@ func TestConsumerProviderBasic(t *testing.T) { specId := "LAV1" apiInterface := spectypes.APIInterfaceTendermintRPC epoch := uint64(100) - requiredResponses := 1 lavaChainID := "lava" numProviders := 1 @@ -357,7 +367,18 @@ func TestConsumerProviderBasic(t *testing.T) { PairingEpoch: epoch, } } - rpcconsumerServer, _ := createRpcConsumer(t, ctx, specId, apiInterface, consumerAccount, consumerListenAddress, epoch, pairingList, requiredResponses, lavaChainID) + + rpcConsumerOptions := rpcConsumerOptions{ + specId: specId, + apiInterface: apiInterface, + account: consumerAccount, + consumerListenAddress: consumerListenAddress, + epoch: epoch, + pairingList: pairingList, + requiredResponses: 1, + lavaChainID: lavaChainID, + } + rpcconsumerServer, _ := createRpcConsumer(t, ctx, rpcConsumerOptions) require.NotNil(t, rpcconsumerServer) client := http.Client{} resp, err := client.Get("http://" + consumerListenAddress + "/status") @@ -390,7 +411,6 @@ func TestConsumerProviderWithProviders(t *testing.T) { specId := "LAV1" apiInterface := spectypes.APIInterfaceTendermintRPC epoch := uint64(100) - requiredResponses := 1 lavaChainID := "lava" numProviders := 5 @@ -434,7 +454,18 @@ func TestConsumerProviderWithProviders(t *testing.T) { PairingEpoch: epoch, } } - rpcconsumerServer, _ := createRpcConsumer(t, ctx, specId, apiInterface, consumerAccount, consumerListenAddress, epoch, pairingList, requiredResponses, lavaChainID) + + rpcConsumerOptions := rpcConsumerOptions{ + specId: specId, + apiInterface: apiInterface, + account: consumerAccount, + consumerListenAddress: consumerListenAddress, + epoch: epoch, + pairingList: pairingList, + requiredResponses: 1, + lavaChainID: lavaChainID, + } + rpcconsumerServer, _ := createRpcConsumer(t, ctx, rpcConsumerOptions) require.NotNil(t, rpcconsumerServer) if play.scenario != 1 { counter := map[int]int{} @@ -524,7 +555,6 @@ func TestConsumerProviderTx(t *testing.T) { specId := "LAV1" apiInterface := spectypes.APIInterfaceRest epoch := uint64(100) - requiredResponses := 1 lavaChainID := "lava" numProviders := 5 @@ -569,7 +599,18 @@ func TestConsumerProviderTx(t *testing.T) { PairingEpoch: epoch, } } - rpcconsumerServer, _ := createRpcConsumer(t, ctx, specId, apiInterface, consumerAccount, consumerListenAddress, epoch, pairingList, requiredResponses, lavaChainID) + + rpcConsumerOptions := rpcConsumerOptions{ + specId: specId, + apiInterface: apiInterface, + account: consumerAccount, + consumerListenAddress: consumerListenAddress, + epoch: epoch, + pairingList: pairingList, + requiredResponses: 1, + lavaChainID: lavaChainID, + } + rpcconsumerServer, _ := createRpcConsumer(t, ctx, rpcConsumerOptions) require.NotNil(t, rpcconsumerServer) for i := 0; i < numProviders; i++ { @@ -631,7 +672,6 @@ func TestConsumerProviderJsonRpcWithNullID(t *testing.T) { specId := play.specId apiInterface := play.apiInterface epoch := uint64(100) - requiredResponses := 1 lavaChainID := "lava" numProviders := 5 @@ -675,7 +715,18 @@ func TestConsumerProviderJsonRpcWithNullID(t *testing.T) { PairingEpoch: epoch, } } - rpcconsumerServer, _ := createRpcConsumer(t, ctx, specId, apiInterface, consumerAccount, consumerListenAddress, epoch, pairingList, requiredResponses, lavaChainID) + + rpcConsumerOptions := rpcConsumerOptions{ + specId: specId, + apiInterface: apiInterface, + account: consumerAccount, + consumerListenAddress: consumerListenAddress, + epoch: epoch, + pairingList: pairingList, + requiredResponses: 1, + lavaChainID: lavaChainID, + } + rpcconsumerServer, _ := createRpcConsumer(t, ctx, rpcConsumerOptions) require.NotNil(t, rpcconsumerServer) for i := 0; i < numProviders; i++ { @@ -741,7 +792,6 @@ func TestConsumerProviderSubscriptionsHappyFlow(t *testing.T) { specId := play.specId apiInterface := play.apiInterface epoch := uint64(100) - requiredResponses := 1 lavaChainID := "lava" numProviders := 5 @@ -785,7 +835,18 @@ func TestConsumerProviderSubscriptionsHappyFlow(t *testing.T) { PairingEpoch: epoch, } } - rpcconsumerServer, _ := createRpcConsumer(t, ctx, specId, apiInterface, consumerAccount, consumerListenAddress, epoch, pairingList, requiredResponses, lavaChainID) + + rpcConsumerOptions := rpcConsumerOptions{ + specId: specId, + apiInterface: apiInterface, + account: consumerAccount, + consumerListenAddress: consumerListenAddress, + epoch: epoch, + pairingList: pairingList, + requiredResponses: 1, + lavaChainID: lavaChainID, + } + rpcconsumerServer, _ := createRpcConsumer(t, ctx, rpcConsumerOptions) require.NotNil(t, rpcconsumerServer) for i := 0; i < numProviders; i++ { @@ -855,7 +916,6 @@ func TestSameProviderConflictBasicResponseCheck(t *testing.T) { specId := "LAV1" apiInterface := spectypes.APIInterfaceRest epoch := uint64(100) - requiredResponses := 1 lavaChainID := "lava" numProviders := play.numOfProviders @@ -903,7 +963,18 @@ func TestSameProviderConflictBasicResponseCheck(t *testing.T) { PairingEpoch: epoch, } } - rpcconsumerServer, _ := createRpcConsumer(t, ctx, specId, apiInterface, consumerAccount, consumerListenAddress, epoch, pairingList, requiredResponses, lavaChainID) + + rpcConsumerOptions := rpcConsumerOptions{ + specId: specId, + apiInterface: apiInterface, + account: consumerAccount, + consumerListenAddress: consumerListenAddress, + epoch: epoch, + pairingList: pairingList, + requiredResponses: 1, + lavaChainID: lavaChainID, + } + rpcconsumerServer, _ := createRpcConsumer(t, ctx, rpcConsumerOptions) require.NotNil(t, rpcconsumerServer) // Set first provider as a "liar", to return wrong block hashes @@ -991,7 +1062,6 @@ func TestArchiveProvidersRetry(t *testing.T) { specId := "LAV1" apiInterface := spectypes.APIInterfaceRest epoch := uint64(100) - requiredResponses := 1 lavaChainID := "lava" numProviders := play.numOfProviders @@ -1047,7 +1117,18 @@ func TestArchiveProvidersRetry(t *testing.T) { PairingEpoch: epoch, } } - rpcconsumerServer, _ := createRpcConsumer(t, ctx, specId, apiInterface, consumerAccount, consumerListenAddress, epoch, pairingList, requiredResponses, lavaChainID) + + rpcConsumerOptions := rpcConsumerOptions{ + specId: specId, + apiInterface: apiInterface, + account: consumerAccount, + consumerListenAddress: consumerListenAddress, + epoch: epoch, + pairingList: pairingList, + requiredResponses: 1, + lavaChainID: lavaChainID, + } + rpcconsumerServer, _ := createRpcConsumer(t, ctx, rpcConsumerOptions) require.NotNil(t, rpcconsumerServer) client := http.Client{Timeout: 1000 * time.Millisecond} @@ -1129,7 +1210,6 @@ func TestSameProviderConflictReport(t *testing.T) { specId := "LAV1" apiInterface := spectypes.APIInterfaceRest epoch := uint64(100) - requiredResponses := 1 lavaChainID := "lava" numProviders := 1 @@ -1140,8 +1220,17 @@ func TestSameProviderConflictReport(t *testing.T) { initProvidersData(consumerAccount, providers, specId, apiInterface, lavaChainID) - pairingList := initPairingList(providers, epoch) - rpcconsumerServer, mockConsumerStateTracker := createRpcConsumer(t, ctx, specId, apiInterface, consumerAccount, consumerListenAddress, epoch, pairingList, requiredResponses, lavaChainID) + rpcConsumerOptions := rpcConsumerOptions{ + specId: specId, + apiInterface: apiInterface, + account: consumerAccount, + consumerListenAddress: consumerListenAddress, + epoch: epoch, + pairingList: initPairingList(providers, epoch), + requiredResponses: 1, + lavaChainID: lavaChainID, + } + rpcconsumerServer, mockConsumerStateTracker := createRpcConsumer(t, ctx, rpcConsumerOptions) conflictSent := false wg := sync.WaitGroup{} @@ -1202,7 +1291,6 @@ func TestSameProviderConflictReport(t *testing.T) { specId := "LAV1" apiInterface := spectypes.APIInterfaceRest epoch := uint64(100) - requiredResponses := 1 lavaChainID := "lava" numProviders := 2 @@ -1212,8 +1300,17 @@ func TestSameProviderConflictReport(t *testing.T) { initProvidersData(consumerAccount, providers, specId, apiInterface, lavaChainID) - pairingList := initPairingList(providers, epoch) - rpcconsumerServer, mockConsumerStateTracker := createRpcConsumer(t, ctx, specId, apiInterface, consumerAccount, consumerListenAddress, epoch, pairingList, requiredResponses, lavaChainID) + rpcConsumerOptions := rpcConsumerOptions{ + specId: specId, + apiInterface: apiInterface, + account: consumerAccount, + consumerListenAddress: consumerListenAddress, + epoch: epoch, + pairingList: initPairingList(providers, epoch), + requiredResponses: 1, + lavaChainID: lavaChainID, + } + rpcconsumerServer, mockConsumerStateTracker := createRpcConsumer(t, ctx, rpcConsumerOptions) twoProvidersConflictSent := false sameProviderConflictSent := false @@ -1285,7 +1382,6 @@ func TestConsumerProviderStatic(t *testing.T) { specId := "LAV1" apiInterface := spectypes.APIInterfaceTendermintRPC epoch := uint64(100) - requiredResponses := 1 lavaChainID := "lava" numProviders := 1 @@ -1331,7 +1427,18 @@ func TestConsumerProviderStatic(t *testing.T) { StaticProvider: true, } } - rpcconsumerServer, _ := createRpcConsumer(t, ctx, specId, apiInterface, consumerAccount, consumerListenAddress, epoch, pairingList, requiredResponses, lavaChainID) + + rpcConsumerOptions := rpcConsumerOptions{ + specId: specId, + apiInterface: apiInterface, + account: consumerAccount, + consumerListenAddress: consumerListenAddress, + epoch: epoch, + pairingList: pairingList, + requiredResponses: 1, + lavaChainID: lavaChainID, + } + rpcconsumerServer, _ := createRpcConsumer(t, ctx, rpcConsumerOptions) require.NotNil(t, rpcconsumerServer) client := http.Client{} // consumer sends the relay to a provider with an address BANANA+%d so the provider needs to skip validations for this to work From 08db01a4a5344b44c69cd35bee4f6a9273fee203 Mon Sep 17 00:00:00 2001 From: Elad Gildnur Date: Thu, 19 Sep 2024 17:32:19 +0300 Subject: [PATCH 03/14] Small fix to one of the integration tests --- protocol/integration/protocol_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocol/integration/protocol_test.go b/protocol/integration/protocol_test.go index 6ee4840af2..2e411fe11e 100644 --- a/protocol/integration/protocol_test.go +++ b/protocol/integration/protocol_test.go @@ -322,7 +322,7 @@ func TestConsumerProviderBasic(t *testing.T) { ctx := context.Background() // can be any spec and api interface specId := "LAV1" - apiInterface := spectypes.APIInterfaceTendermintRPC + apiInterface := spectypes.APIInterfaceRest epoch := uint64(100) lavaChainID := "lava" From d293c610e5b55a164406f55393700f25437bb7df Mon Sep 17 00:00:00 2001 From: Elad Gildnur Date: Thu, 19 Sep 2024 17:33:05 +0300 Subject: [PATCH 04/14] Add option to add cache for consumer in integration tests --- protocol/integration/protocol_test.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/protocol/integration/protocol_test.go b/protocol/integration/protocol_test.go index 2e411fe11e..fa4d7efa16 100644 --- a/protocol/integration/protocol_test.go +++ b/protocol/integration/protocol_test.go @@ -22,6 +22,7 @@ import ( "github.com/lavanet/lava/v3/protocol/lavaprotocol/finalizationconsensus" "github.com/lavanet/lava/v3/protocol/lavasession" "github.com/lavanet/lava/v3/protocol/metrics" + "github.com/lavanet/lava/v3/protocol/performance" "github.com/lavanet/lava/v3/protocol/provideroptimizer" "github.com/lavanet/lava/v3/protocol/rpcconsumer" "github.com/lavanet/lava/v3/protocol/rpcprovider" @@ -171,6 +172,7 @@ type rpcConsumerOptions struct { pairingList map[uint64]*lavasession.ConsumerSessionsWithProvider requiredResponses int lavaChainID string + cacheListenAddress string } func createRpcConsumer(t *testing.T, ctx context.Context, rpcConsumerOptions rpcConsumerOptions) (*rpcconsumer.RPCConsumerServer, *mockConsumerStateTracker) { @@ -204,7 +206,14 @@ func createRpcConsumer(t *testing.T, ctx context.Context, rpcConsumerOptions rpc consumerCmdFlags := common.ConsumerCmdFlags{} rpcconsumerLogs, err := metrics.NewRPCConsumerLogs(nil, nil) require.NoError(t, err) - err = rpcConsumerServer.ServeRPCRequests(ctx, rpcEndpoint, consumerStateTracker, chainParser, finalizationConsensus, consumerSessionManager, rpcConsumerOptions.requiredResponses, rpcConsumerOptions.account.SK, rpcConsumerOptions.lavaChainID, nil, rpcconsumerLogs, rpcConsumerOptions.account.Addr, consumerConsistency, nil, consumerCmdFlags, false, nil, nil, nil) + var cache *performance.Cache = nil + if rpcConsumerOptions.cacheListenAddress != "" { + cache, err = performance.InitCache(ctx, rpcConsumerOptions.cacheListenAddress) + if err != nil { + t.Fatalf("Failed To Connect to cache at address %s: %v", rpcConsumerOptions.cacheListenAddress, err) + } + } + err = rpcConsumerServer.ServeRPCRequests(ctx, rpcEndpoint, consumerStateTracker, chainParser, finalizationConsensus, consumerSessionManager, rpcConsumerOptions.requiredResponses, rpcConsumerOptions.account.SK, rpcConsumerOptions.lavaChainID, cache, rpcconsumerLogs, rpcConsumerOptions.account.Addr, consumerConsistency, nil, consumerCmdFlags, false, nil, nil, nil) require.NoError(t, err) // wait for consumer to finish initialization From 39bcd27b81e040abfa36d6545ea71eb7de3db913 Mon Sep 17 00:00:00 2001 From: Elad Gildnur Date: Thu, 19 Sep 2024 17:33:53 +0300 Subject: [PATCH 05/14] Add consumer with cache integration test --- protocol/integration/protocol_test.go | 132 ++++++++++++++++++++++++++ 1 file changed, 132 insertions(+) diff --git a/protocol/integration/protocol_test.go b/protocol/integration/protocol_test.go index fa4d7efa16..9be6f70454 100644 --- a/protocol/integration/protocol_test.go +++ b/protocol/integration/protocol_test.go @@ -10,11 +10,13 @@ import ( "net/url" "os" "strconv" + "strings" "sync" "testing" "time" "github.com/gorilla/websocket" + "github.com/lavanet/lava/v3/ecosystem/cache" "github.com/lavanet/lava/v3/protocol/chainlib" "github.com/lavanet/lava/v3/protocol/chainlib/chainproxy/rpcInterfaceMessages" "github.com/lavanet/lava/v3/protocol/chaintracker" @@ -327,6 +329,16 @@ func createRpcProvider(t *testing.T, ctx context.Context, consumerAddress string return rpcProviderServer, endpoint, &replySetter, mockChainFetcher, mockReliabilityManager } +func createCacheServer(t *testing.T, ctx context.Context, listenAddress string) { + go func() { + cs := cache.CacheServer{CacheMaxCost: 2 * 1024 * 1024 * 1024} // taken from max-items default value + cs.InitCache(ctx, cache.DefaultExpirationTimeFinalized, cache.DefaultExpirationForNonFinalized, cache.DefaultExpirationNodeErrors, cache.DefaultExpirationBlocksHashesToHeights, "", cache.DefaultExpirationTimeFinalizedMultiplier, cache.DefaultExpirationTimeNonFinalizedMultiplier) + cs.Serve(ctx, listenAddress) + }() + consumerUp := checkServerStatusWithTimeout("http://"+listenAddress, time.Millisecond*61) + require.True(t, consumerUp) +} + func TestConsumerProviderBasic(t *testing.T) { ctx := context.Background() // can be any spec and api interface @@ -1467,3 +1479,123 @@ func TestConsumerProviderStatic(t *testing.T) { require.Equal(t, providers[0].replySetter.replyDataBuf, bodyBytes) resp.Body.Close() } + +func TestConsumerProviderWithConsumerSideCache(t *testing.T) { + ctx := context.Background() + // can be any spec and api interface + specId := "LAV1" + apiInterface := spectypes.APIInterfaceTendermintRPC + epoch := uint64(100) + lavaChainID := "lava" + + numProviders := 1 + + consumerListenAddress := addressGen.GetAddress() + cacheListenAddress := addressGen.GetAddress() + pairingList := map[uint64]*lavasession.ConsumerSessionsWithProvider{} + type providerData struct { + account sigs.Account + endpoint *lavasession.RPCProviderEndpoint + server *rpcprovider.RPCProviderServer + replySetter *ReplySetter + mockChainFetcher *MockChainFetcher + } + providers := []providerData{} + + for i := 0; i < numProviders; i++ { + account := sigs.GenerateDeterministicFloatingKey(randomizer) + providerDataI := providerData{account: account} + providers = append(providers, providerDataI) + } + + consumerAccount := sigs.GenerateDeterministicFloatingKey(randomizer) + for i := 0; i < numProviders; i++ { + ctx := context.Background() + providerDataI := providers[i] + listenAddress := addressGen.GetAddress() + providers[i].server, providers[i].endpoint, providers[i].replySetter, providers[i].mockChainFetcher, _ = createRpcProvider(t, ctx, consumerAccount.Addr.String(), specId, apiInterface, listenAddress, providerDataI.account, lavaChainID, []string(nil), fmt.Sprintf("provider%d", i)) + providers[i].replySetter.handler = func(req []byte, header http.Header) (data []byte, status int) { + var jsonRpcMessage rpcInterfaceMessages.JsonrpcMessage + err := json.Unmarshal(req, &jsonRpcMessage) + require.NoError(t, err, req) + + response := fmt.Sprintf(`{"jsonrpc":"2.0","result": {}, "id": %v}`, string(jsonRpcMessage.ID)) + return []byte(response), http.StatusOK + } + } + + for i := 0; i < numProviders; i++ { + pairingList[uint64(i)] = &lavasession.ConsumerSessionsWithProvider{ + PublicLavaAddress: providers[i].account.Addr.String(), + Endpoints: []*lavasession.Endpoint{ + { + NetworkAddress: providers[i].endpoint.NetworkAddress.Address, + Enabled: true, + Geolocation: 1, + }, + }, + Sessions: map[int64]*lavasession.SingleConsumerSession{}, + MaxComputeUnits: 10000, + UsedComputeUnits: 0, + PairingEpoch: epoch, + } + } + + createCacheServer(t, ctx, cacheListenAddress) + + rpcConsumerOptions := rpcConsumerOptions{ + specId: specId, + apiInterface: apiInterface, + account: consumerAccount, + consumerListenAddress: consumerListenAddress, + epoch: epoch, + pairingList: pairingList, + requiredResponses: 1, + lavaChainID: lavaChainID, + cacheListenAddress: cacheListenAddress, + } + rpcconsumerServer, _ := createRpcConsumer(t, ctx, rpcConsumerOptions) + require.NotNil(t, rpcconsumerServer) + + client := http.Client{} + id := 0 + sendMessage := func(method string, params []string) http.Header { + // Get latest block + body := fmt.Sprintf(`{"jsonrpc":"2.0","method":"%v","params": [%v], "id":%v}`, method, strings.Join(params, ","), id) + resp, err := client.Post("http://"+consumerListenAddress, "application/json", bytes.NewBuffer([]byte(body))) + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + + bodyBytes, err := io.ReadAll(resp.Body) + require.NoError(t, err) + + var jsonRpcMessage rpcInterfaceMessages.JsonrpcMessage + err = json.Unmarshal(bodyBytes, &jsonRpcMessage) + require.NoError(t, err) + + respId, idErr := rpcInterfaceMessages.IdFromRawMessage(jsonRpcMessage.ID) + if idErr != nil { + require.NoError(t, idErr) + } + + require.Equal(t, rpcInterfaceMessages.JSONRPCIntID(id), respId) + resp.Body.Close() + id++ + + return resp.Header + } + + // Get latest for sanity check + providerAddr := providers[0].account.Addr.String() + headers := sendMessage("status", []string{}) + fmt.Println(headers) + require.Equal(t, providerAddr, headers.Get(common.PROVIDER_ADDRESS_HEADER_NAME)) + + // Get block, this should be cached for next time + headers = sendMessage("block", []string{"1000"}) + require.Equal(t, providerAddr, headers.Get(common.PROVIDER_ADDRESS_HEADER_NAME)) + + // Get block again, this time it should be from cache + headers = sendMessage("block", []string{"1000"}) + require.Equal(t, "Cached", headers.Get(common.PROVIDER_ADDRESS_HEADER_NAME)) +} From db40e509829e69fa838e5049c2dca0faee4e635f Mon Sep 17 00:00:00 2001 From: Elad Gildnur Date: Fri, 20 Sep 2024 16:09:24 +0300 Subject: [PATCH 06/14] Small code clean for the test --- protocol/integration/protocol_test.go | 68 +++++++++++---------------- 1 file changed, 27 insertions(+), 41 deletions(-) diff --git a/protocol/integration/protocol_test.go b/protocol/integration/protocol_test.go index 9be6f70454..97b7cdc505 100644 --- a/protocol/integration/protocol_test.go +++ b/protocol/integration/protocol_test.go @@ -1488,57 +1488,43 @@ func TestConsumerProviderWithConsumerSideCache(t *testing.T) { epoch := uint64(100) lavaChainID := "lava" - numProviders := 1 - consumerListenAddress := addressGen.GetAddress() cacheListenAddress := addressGen.GetAddress() pairingList := map[uint64]*lavasession.ConsumerSessionsWithProvider{} type providerData struct { - account sigs.Account - endpoint *lavasession.RPCProviderEndpoint - server *rpcprovider.RPCProviderServer - replySetter *ReplySetter - mockChainFetcher *MockChainFetcher - } - providers := []providerData{} - - for i := 0; i < numProviders; i++ { - account := sigs.GenerateDeterministicFloatingKey(randomizer) - providerDataI := providerData{account: account} - providers = append(providers, providerDataI) + account sigs.Account + endpoint *lavasession.RPCProviderEndpoint + replySetter *ReplySetter } consumerAccount := sigs.GenerateDeterministicFloatingKey(randomizer) - for i := 0; i < numProviders; i++ { - ctx := context.Background() - providerDataI := providers[i] - listenAddress := addressGen.GetAddress() - providers[i].server, providers[i].endpoint, providers[i].replySetter, providers[i].mockChainFetcher, _ = createRpcProvider(t, ctx, consumerAccount.Addr.String(), specId, apiInterface, listenAddress, providerDataI.account, lavaChainID, []string(nil), fmt.Sprintf("provider%d", i)) - providers[i].replySetter.handler = func(req []byte, header http.Header) (data []byte, status int) { - var jsonRpcMessage rpcInterfaceMessages.JsonrpcMessage - err := json.Unmarshal(req, &jsonRpcMessage) - require.NoError(t, err, req) + providerAccount := sigs.GenerateDeterministicFloatingKey(randomizer) + provider := providerData{account: providerAccount} + listenAddress := addressGen.GetAddress() - response := fmt.Sprintf(`{"jsonrpc":"2.0","result": {}, "id": %v}`, string(jsonRpcMessage.ID)) - return []byte(response), http.StatusOK - } + _, provider.endpoint, provider.replySetter, _, _ = createRpcProvider(t, ctx, consumerAccount.Addr.String(), specId, apiInterface, listenAddress, provider.account, lavaChainID, []string(nil), "provider") + provider.replySetter.handler = func(req []byte, header http.Header) (data []byte, status int) { + var jsonRpcMessage rpcInterfaceMessages.JsonrpcMessage + err := json.Unmarshal(req, &jsonRpcMessage) + require.NoError(t, err, req) + + response := fmt.Sprintf(`{"jsonrpc":"2.0","result": {}, "id": %v}`, string(jsonRpcMessage.ID)) + return []byte(response), http.StatusOK } - for i := 0; i < numProviders; i++ { - pairingList[uint64(i)] = &lavasession.ConsumerSessionsWithProvider{ - PublicLavaAddress: providers[i].account.Addr.String(), - Endpoints: []*lavasession.Endpoint{ - { - NetworkAddress: providers[i].endpoint.NetworkAddress.Address, - Enabled: true, - Geolocation: 1, - }, + pairingList[0] = &lavasession.ConsumerSessionsWithProvider{ + PublicLavaAddress: provider.account.Addr.String(), + Endpoints: []*lavasession.Endpoint{ + { + NetworkAddress: provider.endpoint.NetworkAddress.Address, + Enabled: true, + Geolocation: 1, }, - Sessions: map[int64]*lavasession.SingleConsumerSession{}, - MaxComputeUnits: 10000, - UsedComputeUnits: 0, - PairingEpoch: epoch, - } + }, + Sessions: map[int64]*lavasession.SingleConsumerSession{}, + MaxComputeUnits: 10000, + UsedComputeUnits: 0, + PairingEpoch: epoch, } createCacheServer(t, ctx, cacheListenAddress) @@ -1586,7 +1572,7 @@ func TestConsumerProviderWithConsumerSideCache(t *testing.T) { } // Get latest for sanity check - providerAddr := providers[0].account.Addr.String() + providerAddr := provider.account.Addr.String() headers := sendMessage("status", []string{}) fmt.Println(headers) require.Equal(t, providerAddr, headers.Get(common.PROVIDER_ADDRESS_HEADER_NAME)) From c612c82f22137a15783935e9cb7244feb4aadd66 Mon Sep 17 00:00:00 2001 From: Elad Gildnur Date: Fri, 20 Sep 2024 16:20:29 +0300 Subject: [PATCH 07/14] Move all createRpcProvider function arguments to a struct --- protocol/integration/protocol_test.go | 159 ++++++++++++++++++++++---- 1 file changed, 135 insertions(+), 24 deletions(-) diff --git a/protocol/integration/protocol_test.go b/protocol/integration/protocol_test.go index 97b7cdc505..2ef47e1b39 100644 --- a/protocol/integration/protocol_test.go +++ b/protocol/integration/protocol_test.go @@ -236,7 +236,18 @@ func createRpcConsumer(t *testing.T, ctx context.Context, rpcConsumerOptions rpc return rpcConsumerServer, consumerStateTracker } -func createRpcProvider(t *testing.T, ctx context.Context, consumerAddress string, specId string, apiInterface string, listenAddress string, account sigs.Account, lavaChainID string, addons []string, providerUniqueId string) (*rpcprovider.RPCProviderServer, *lavasession.RPCProviderEndpoint, *ReplySetter, *MockChainFetcher, *MockReliabilityManager) { +type rpcProviderOptions struct { + consumerAddress string + specId string + apiInterface string + listenAddress string + account sigs.Account + lavaChainID string + addons []string + providerUniqueId string +} + +func createRpcProvider(t *testing.T, ctx context.Context, rpcProviderOptions rpcProviderOptions) (*rpcprovider.RPCProviderServer, *lavasession.RPCProviderEndpoint, *ReplySetter, *MockChainFetcher, *MockReliabilityManager) { replySetter := ReplySetter{ status: http.StatusOK, replyDataBuf: []byte(`{"reply": "REPLY-STUB"}`), @@ -256,16 +267,16 @@ func createRpcProvider(t *testing.T, ctx context.Context, consumerAddress string fmt.Fprint(w, string(data)) }) - chainParser, chainRouter, chainFetcher, _, endpoint, err := chainlib.CreateChainLibMocks(ctx, specId, apiInterface, serverHandler, nil, "../../", addons) + chainParser, chainRouter, chainFetcher, _, endpoint, err := chainlib.CreateChainLibMocks(ctx, rpcProviderOptions.specId, rpcProviderOptions.apiInterface, serverHandler, nil, "../../", rpcProviderOptions.addons) require.NoError(t, err) require.NotNil(t, chainParser) require.NotNil(t, chainFetcher) require.NotNil(t, chainRouter) - endpoint.NetworkAddress.Address = listenAddress + endpoint.NetworkAddress.Address = rpcProviderOptions.listenAddress rpcProviderServer := &rpcprovider.RPCProviderServer{} - if providerUniqueId != "" { - rpcProviderServer.SetProviderUniqueId(providerUniqueId) + if rpcProviderOptions.providerUniqueId != "" { + rpcProviderServer.SetProviderUniqueId(rpcProviderOptions.providerUniqueId) } rpcProviderEndpoint := &lavasession.RPCProviderEndpoint{ @@ -275,8 +286,8 @@ func createRpcProvider(t *testing.T, ctx context.Context, consumerAddress string CertPem: "", DisableTLS: false, }, - ChainID: specId, - ApiInterface: apiInterface, + ChainID: rpcProviderOptions.specId, + ApiInterface: rpcProviderOptions.apiInterface, Geolocation: 1, NodeUrls: []common.NodeUrl{ { @@ -285,22 +296,22 @@ func createRpcProvider(t *testing.T, ctx context.Context, consumerAddress string AuthConfig: common.AuthConfig{}, IpForwarding: false, Timeout: 0, - Addons: addons, + Addons: rpcProviderOptions.addons, SkipVerifications: []string{}, }, }, } - rewardDB, err := createInMemoryRewardDb([]string{specId}) + rewardDB, err := createInMemoryRewardDb([]string{rpcProviderOptions.specId}) require.NoError(t, err) _, averageBlockTime, blocksToFinalization, blocksInFinalizationData := chainParser.ChainBlockStats() - mockProviderStateTracker := mockProviderStateTracker{consumerAddressForPairing: consumerAddress, averageBlockTime: averageBlockTime} + mockProviderStateTracker := mockProviderStateTracker{consumerAddressForPairing: rpcProviderOptions.consumerAddress, averageBlockTime: averageBlockTime} rws := rewardserver.NewRewardServer(&mockProviderStateTracker, nil, rewardDB, "badger_test", 1, 10, nil) blockMemorySize, err := mockProviderStateTracker.GetEpochSizeMultipliedByRecommendedEpochNumToCollectPayment(ctx) require.NoError(t, err) providerSessionManager := lavasession.NewProviderSessionManager(rpcProviderEndpoint, blockMemorySize) providerPolicy := rpcprovider.GetAllAddonsAndExtensionsFromNodeUrlSlice(rpcProviderEndpoint.NodeUrls) - chainParser.SetPolicy(providerPolicy, specId, apiInterface) + chainParser.SetPolicy(providerPolicy, rpcProviderOptions.specId, rpcProviderOptions.apiInterface) blocksToSaveChainTracker := uint64(blocksToFinalization + blocksInFinalizationData) chainTrackerConfig := chaintracker.ChainTrackerConfig{ @@ -316,9 +327,9 @@ func createRpcProvider(t *testing.T, ctx context.Context, consumerAddress string chainTracker, err := chaintracker.NewChainTracker(ctx, mockChainFetcher, chainTrackerConfig) require.NoError(t, err) chainTracker.StartAndServe(ctx) - reliabilityManager := reliabilitymanager.NewReliabilityManager(chainTracker, &mockProviderStateTracker, account.Addr.String(), chainRouter, chainParser) + reliabilityManager := reliabilitymanager.NewReliabilityManager(chainTracker, &mockProviderStateTracker, rpcProviderOptions.account.Addr.String(), chainRouter, chainParser) mockReliabilityManager := NewMockReliabilityManager(reliabilityManager) - rpcProviderServer.ServeRPCRequests(ctx, rpcProviderEndpoint, chainParser, rws, providerSessionManager, mockReliabilityManager, account.SK, nil, chainRouter, &mockProviderStateTracker, account.Addr, lavaChainID, rpcprovider.DEFAULT_ALLOWED_MISSING_CU, nil, nil, nil, false) + rpcProviderServer.ServeRPCRequests(ctx, rpcProviderEndpoint, chainParser, rws, providerSessionManager, mockReliabilityManager, rpcProviderOptions.account.SK, nil, chainRouter, &mockProviderStateTracker, rpcProviderOptions.account.Addr, rpcProviderOptions.lavaChainID, rpcprovider.DEFAULT_ALLOWED_MISSING_CU, nil, nil, nil, false) listener := rpcprovider.NewProviderListener(ctx, rpcProviderEndpoint.NetworkAddress, "/health") err = listener.RegisterReceiver(rpcProviderServer, rpcProviderEndpoint) require.NoError(t, err) @@ -370,7 +381,17 @@ func TestConsumerProviderBasic(t *testing.T) { ctx := context.Background() providerDataI := providers[i] listenAddress := addressGen.GetAddress() - providers[i].server, providers[i].endpoint, providers[i].replySetter, providers[i].mockChainFetcher, _ = createRpcProvider(t, ctx, consumerAccount.Addr.String(), specId, apiInterface, listenAddress, providerDataI.account, lavaChainID, []string(nil), fmt.Sprintf("provider%d", i)) + rpcProviderOptions := rpcProviderOptions{ + consumerAddress: consumerAccount.Addr.String(), + specId: specId, + apiInterface: apiInterface, + listenAddress: listenAddress, + account: providerDataI.account, + lavaChainID: lavaChainID, + addons: []string(nil), + providerUniqueId: fmt.Sprintf("provider%d", i), + } + providers[i].server, providers[i].endpoint, providers[i].replySetter, providers[i].mockChainFetcher, _ = createRpcProvider(t, ctx, rpcProviderOptions) } for i := 0; i < numProviders; i++ { pairingList[uint64(i)] = &lavasession.ConsumerSessionsWithProvider{ @@ -456,7 +477,17 @@ func TestConsumerProviderWithProviders(t *testing.T) { ctx := context.Background() providerDataI := providers[i] listenAddress := addressGen.GetAddress() - providers[i].server, providers[i].endpoint, providers[i].replySetter, providers[i].mockChainFetcher, _ = createRpcProvider(t, ctx, consumerAccount.Addr.String(), specId, apiInterface, listenAddress, providerDataI.account, lavaChainID, []string(nil), fmt.Sprintf("provider%d", i)) + rpcProviderOptions := rpcProviderOptions{ + consumerAddress: consumerAccount.Addr.String(), + specId: specId, + apiInterface: apiInterface, + listenAddress: listenAddress, + account: providerDataI.account, + lavaChainID: lavaChainID, + addons: []string(nil), + providerUniqueId: fmt.Sprintf("provider%d", i), + } + providers[i].server, providers[i].endpoint, providers[i].replySetter, providers[i].mockChainFetcher, _ = createRpcProvider(t, ctx, rpcProviderOptions) providers[i].replySetter.replyDataBuf = []byte(fmt.Sprintf(`{"reply": %d}`, i+1)) } for i := 0; i < numProviders; i++ { @@ -601,7 +632,17 @@ func TestConsumerProviderTx(t *testing.T) { ctx := context.Background() providerDataI := providers[i] listenAddress := addressGen.GetAddress() - providers[i].server, providers[i].endpoint, providers[i].replySetter, providers[i].mockChainFetcher, _ = createRpcProvider(t, ctx, consumerAccount.Addr.String(), specId, apiInterface, listenAddress, providerDataI.account, lavaChainID, []string(nil), fmt.Sprintf("provider%d", i)) + rpcProviderOptions := rpcProviderOptions{ + consumerAddress: consumerAccount.Addr.String(), + specId: specId, + apiInterface: apiInterface, + listenAddress: listenAddress, + account: providerDataI.account, + lavaChainID: lavaChainID, + addons: []string(nil), + providerUniqueId: fmt.Sprintf("provider%d", i), + } + providers[i].server, providers[i].endpoint, providers[i].replySetter, providers[i].mockChainFetcher, _ = createRpcProvider(t, ctx, rpcProviderOptions) providers[i].replySetter.replyDataBuf = []byte(fmt.Sprintf(`{"result": %d}`, i+1)) } for i := 0; i < numProviders; i++ { @@ -717,7 +758,17 @@ func TestConsumerProviderJsonRpcWithNullID(t *testing.T) { ctx := context.Background() providerDataI := providers[i] listenAddress := addressGen.GetAddress() - providers[i].server, providers[i].endpoint, providers[i].replySetter, providers[i].mockChainFetcher, _ = createRpcProvider(t, ctx, consumerAccount.Addr.String(), specId, apiInterface, listenAddress, providerDataI.account, lavaChainID, []string(nil), fmt.Sprintf("provider%d", i)) + rpcProviderOptions := rpcProviderOptions{ + consumerAddress: consumerAccount.Addr.String(), + specId: specId, + apiInterface: apiInterface, + listenAddress: listenAddress, + account: providerDataI.account, + lavaChainID: lavaChainID, + addons: []string(nil), + providerUniqueId: fmt.Sprintf("provider%d", i), + } + providers[i].server, providers[i].endpoint, providers[i].replySetter, providers[i].mockChainFetcher, _ = createRpcProvider(t, ctx, rpcProviderOptions) providers[i].replySetter.replyDataBuf = []byte(fmt.Sprintf(`{"result": %d}`, i+1)) } for i := 0; i < numProviders; i++ { @@ -837,7 +888,17 @@ func TestConsumerProviderSubscriptionsHappyFlow(t *testing.T) { ctx := context.Background() providerDataI := providers[i] listenAddress := addressGen.GetAddress() - providers[i].server, providers[i].endpoint, providers[i].replySetter, providers[i].mockChainFetcher, _ = createRpcProvider(t, ctx, consumerAccount.Addr.String(), specId, apiInterface, listenAddress, providerDataI.account, lavaChainID, []string(nil), fmt.Sprintf("provider%d", i)) + rpcProviderOptions := rpcProviderOptions{ + consumerAddress: consumerAccount.Addr.String(), + specId: specId, + apiInterface: apiInterface, + listenAddress: listenAddress, + account: providerDataI.account, + lavaChainID: lavaChainID, + addons: []string(nil), + providerUniqueId: fmt.Sprintf("provider%d", i), + } + providers[i].server, providers[i].endpoint, providers[i].replySetter, providers[i].mockChainFetcher, _ = createRpcProvider(t, ctx, rpcProviderOptions) providers[i].replySetter.replyDataBuf = []byte(fmt.Sprintf(`{"result": %d}`, i+1)) } for i := 0; i < numProviders; i++ { @@ -964,7 +1025,17 @@ func TestSameProviderConflictBasicResponseCheck(t *testing.T) { ctx := context.Background() providerDataI := providers[i] listenAddress := addressGen.GetAddress() - providers[i].server, providers[i].endpoint, providers[i].replySetter, providers[i].mockChainFetcher, providers[i].mockReliabilityManager = createRpcProvider(t, ctx, consumerAccount.Addr.String(), specId, apiInterface, listenAddress, providerDataI.account, lavaChainID, []string(nil), fmt.Sprintf("provider%d", i)) + rpcProviderOptions := rpcProviderOptions{ + consumerAddress: consumerAccount.Addr.String(), + specId: specId, + apiInterface: apiInterface, + listenAddress: listenAddress, + account: providerDataI.account, + lavaChainID: lavaChainID, + addons: []string(nil), + providerUniqueId: fmt.Sprintf("provider%d", i), + } + providers[i].server, providers[i].endpoint, providers[i].replySetter, providers[i].mockChainFetcher, providers[i].mockReliabilityManager = createRpcProvider(t, ctx, rpcProviderOptions) providers[i].replySetter.replyDataBuf = []byte(fmt.Sprintf(`{"result": %d}`, i+1)) } @@ -1114,7 +1185,18 @@ func TestArchiveProvidersRetry(t *testing.T) { if i+1 <= play.archiveProviders { addons = []string{"archive"} } - providers[i].server, providers[i].endpoint, providers[i].replySetter, providers[i].mockChainFetcher, providers[i].mockReliabilityManager = createRpcProvider(t, ctx, consumerAccount.Addr.String(), specId, apiInterface, listenAddress, providerDataI.account, lavaChainID, addons, fmt.Sprintf("provider%d", i)) + + rpcProviderOptions := rpcProviderOptions{ + consumerAddress: consumerAccount.Addr.String(), + specId: specId, + apiInterface: apiInterface, + listenAddress: listenAddress, + account: providerDataI.account, + lavaChainID: lavaChainID, + addons: addons, + providerUniqueId: fmt.Sprintf("provider%d", i), + } + providers[i].server, providers[i].endpoint, providers[i].replySetter, providers[i].mockChainFetcher, providers[i].mockReliabilityManager = createRpcProvider(t, ctx, rpcProviderOptions) providers[i].replySetter.replyDataBuf = []byte(`{"result": "success"}`) if i+1 <= play.nodeErrorProviders { providers[i].replySetter.replyDataBuf = []byte(`{"error": "failure", "message": "test", "code": "-32132"}`) @@ -1197,7 +1279,17 @@ func TestSameProviderConflictReport(t *testing.T) { ctx := context.Background() providerDataI := providers[i] listenAddress := addressGen.GetAddress() - providers[i].server, providers[i].endpoint, providers[i].replySetter, providers[i].mockChainFetcher, providers[i].mockReliabilityManager = createRpcProvider(t, ctx, consumerAccount.Addr.String(), specId, apiInterface, listenAddress, providerDataI.account, lavaChainID, []string(nil), fmt.Sprintf("provider%d", i)) + rpcProviderOptions := rpcProviderOptions{ + consumerAddress: consumerAccount.Addr.String(), + specId: specId, + apiInterface: apiInterface, + listenAddress: listenAddress, + account: providerDataI.account, + lavaChainID: lavaChainID, + addons: []string(nil), + providerUniqueId: fmt.Sprintf("provider%d", i), + } + providers[i].server, providers[i].endpoint, providers[i].replySetter, providers[i].mockChainFetcher, providers[i].mockReliabilityManager = createRpcProvider(t, ctx, rpcProviderOptions) providers[i].replySetter.replyDataBuf = []byte(fmt.Sprintf(`{"result": %d}`, i+1)) } } @@ -1428,7 +1520,17 @@ func TestConsumerProviderStatic(t *testing.T) { ctx := context.Background() providerDataI := providers[i] listenAddress := addressGen.GetAddress() - providers[i].server, providers[i].endpoint, providers[i].replySetter, providers[i].mockChainFetcher, _ = createRpcProvider(t, ctx, consumerAccount.Addr.String(), specId, apiInterface, listenAddress, providerDataI.account, lavaChainID, []string(nil), fmt.Sprintf("provider%d", i)) + rpcProviderOptions := rpcProviderOptions{ + consumerAddress: consumerAccount.Addr.String(), + specId: specId, + apiInterface: apiInterface, + listenAddress: listenAddress, + account: providerDataI.account, + lavaChainID: lavaChainID, + addons: []string(nil), + providerUniqueId: fmt.Sprintf("provider%d", i), + } + providers[i].server, providers[i].endpoint, providers[i].replySetter, providers[i].mockChainFetcher, _ = createRpcProvider(t, ctx, rpcProviderOptions) } // provider is static for i := 0; i < numProviders; i++ { @@ -1501,8 +1603,17 @@ func TestConsumerProviderWithConsumerSideCache(t *testing.T) { providerAccount := sigs.GenerateDeterministicFloatingKey(randomizer) provider := providerData{account: providerAccount} listenAddress := addressGen.GetAddress() - - _, provider.endpoint, provider.replySetter, _, _ = createRpcProvider(t, ctx, consumerAccount.Addr.String(), specId, apiInterface, listenAddress, provider.account, lavaChainID, []string(nil), "provider") + rpcProviderOptions := rpcProviderOptions{ + consumerAddress: consumerAccount.Addr.String(), + specId: specId, + apiInterface: apiInterface, + listenAddress: listenAddress, + account: provider.account, + lavaChainID: lavaChainID, + addons: []string(nil), + providerUniqueId: "provider", + } + _, provider.endpoint, provider.replySetter, _, _ = createRpcProvider(t, ctx, rpcProviderOptions) provider.replySetter.handler = func(req []byte, header http.Header) (data []byte, status int) { var jsonRpcMessage rpcInterfaceMessages.JsonrpcMessage err := json.Unmarshal(req, &jsonRpcMessage) From e8a0e3c2234e558272deacce9a6e0168f951495a Mon Sep 17 00:00:00 2001 From: Elad Gildnur Date: Fri, 20 Sep 2024 17:06:31 +0300 Subject: [PATCH 08/14] Small fixes to TestConsumerProviderWithConsumerSideCache --- protocol/integration/protocol_test.go | 28 ++++++++++++++++----------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/protocol/integration/protocol_test.go b/protocol/integration/protocol_test.go index 2ef47e1b39..aebfcf5ba0 100644 --- a/protocol/integration/protocol_test.go +++ b/protocol/integration/protocol_test.go @@ -204,10 +204,6 @@ func createRpcConsumer(t *testing.T, ctx context.Context, rpcConsumerOptions rpc consumerSessionManager := lavasession.NewConsumerSessionManager(rpcEndpoint, optimizer, nil, nil, "test", lavasession.NewActiveSubscriptionProvidersStorage()) consumerSessionManager.UpdateAllProviders(rpcConsumerOptions.epoch, rpcConsumerOptions.pairingList) - consumerConsistency := rpcconsumer.NewConsumerConsistency(rpcConsumerOptions.specId) - consumerCmdFlags := common.ConsumerCmdFlags{} - rpcconsumerLogs, err := metrics.NewRPCConsumerLogs(nil, nil) - require.NoError(t, err) var cache *performance.Cache = nil if rpcConsumerOptions.cacheListenAddress != "" { cache, err = performance.InitCache(ctx, rpcConsumerOptions.cacheListenAddress) @@ -215,6 +211,11 @@ func createRpcConsumer(t *testing.T, ctx context.Context, rpcConsumerOptions rpc t.Fatalf("Failed To Connect to cache at address %s: %v", rpcConsumerOptions.cacheListenAddress, err) } } + + consumerConsistency := rpcconsumer.NewConsumerConsistency(rpcConsumerOptions.specId) + consumerCmdFlags := common.ConsumerCmdFlags{} + rpcconsumerLogs, err := metrics.NewRPCConsumerLogs(nil, nil) + require.NoError(t, err) err = rpcConsumerServer.ServeRPCRequests(ctx, rpcEndpoint, consumerStateTracker, chainParser, finalizationConsensus, consumerSessionManager, rpcConsumerOptions.requiredResponses, rpcConsumerOptions.account.SK, rpcConsumerOptions.lavaChainID, cache, rpcconsumerLogs, rpcConsumerOptions.account.Addr, consumerConsistency, nil, consumerCmdFlags, false, nil, nil, nil) require.NoError(t, err) @@ -1582,6 +1583,16 @@ func TestConsumerProviderStatic(t *testing.T) { resp.Body.Close() } +func jsonRpcIdToInt(t *testing.T, rawID json.RawMessage) int { + var idInterface interface{} + err := json.Unmarshal(rawID, &idInterface) + require.NoError(t, err) + + id, ok := idInterface.(float64) + require.True(t, ok, idInterface) + return int(id) +} + func TestConsumerProviderWithConsumerSideCache(t *testing.T) { ctx := context.Background() // can be any spec and api interface @@ -1670,12 +1681,8 @@ func TestConsumerProviderWithConsumerSideCache(t *testing.T) { err = json.Unmarshal(bodyBytes, &jsonRpcMessage) require.NoError(t, err) - respId, idErr := rpcInterfaceMessages.IdFromRawMessage(jsonRpcMessage.ID) - if idErr != nil { - require.NoError(t, idErr) - } - - require.Equal(t, rpcInterfaceMessages.JSONRPCIntID(id), respId) + respId := jsonRpcIdToInt(t, jsonRpcMessage.ID) + require.Equal(t, id, respId) resp.Body.Close() id++ @@ -1685,7 +1692,6 @@ func TestConsumerProviderWithConsumerSideCache(t *testing.T) { // Get latest for sanity check providerAddr := provider.account.Addr.String() headers := sendMessage("status", []string{}) - fmt.Println(headers) require.Equal(t, providerAddr, headers.Get(common.PROVIDER_ADDRESS_HEADER_NAME)) // Get block, this should be cached for next time From d2a0c3776dc83d3f8dbdfcdeb229b634da96e73a Mon Sep 17 00:00:00 2001 From: Elad Gildnur Date: Fri, 20 Sep 2024 17:06:42 +0300 Subject: [PATCH 09/14] Add TestConsumerProviderWithProviderSideCache --- protocol/integration/protocol_test.go | 139 ++++++++++++++++++++++++-- 1 file changed, 130 insertions(+), 9 deletions(-) diff --git a/protocol/integration/protocol_test.go b/protocol/integration/protocol_test.go index aebfcf5ba0..d404b850e9 100644 --- a/protocol/integration/protocol_test.go +++ b/protocol/integration/protocol_test.go @@ -238,14 +238,15 @@ func createRpcConsumer(t *testing.T, ctx context.Context, rpcConsumerOptions rpc } type rpcProviderOptions struct { - consumerAddress string - specId string - apiInterface string - listenAddress string - account sigs.Account - lavaChainID string - addons []string - providerUniqueId string + consumerAddress string + specId string + apiInterface string + listenAddress string + account sigs.Account + lavaChainID string + addons []string + providerUniqueId string + cacheListenAddress string } func createRpcProvider(t *testing.T, ctx context.Context, rpcProviderOptions rpcProviderOptions) (*rpcprovider.RPCProviderServer, *lavasession.RPCProviderEndpoint, *ReplySetter, *MockChainFetcher, *MockReliabilityManager) { @@ -324,13 +325,21 @@ func createRpcProvider(t *testing.T, ctx context.Context, rpcProviderOptions rpc Pmetrics: nil, } + var cache *performance.Cache = nil + if rpcProviderOptions.cacheListenAddress != "" { + cache, err = performance.InitCache(ctx, rpcProviderOptions.cacheListenAddress) + if err != nil { + t.Fatalf("Failed To Connect to cache at address %s: %v", rpcProviderOptions.cacheListenAddress, err) + } + } + mockChainFetcher := NewMockChainFetcher(1000, int64(blocksToSaveChainTracker), nil) chainTracker, err := chaintracker.NewChainTracker(ctx, mockChainFetcher, chainTrackerConfig) require.NoError(t, err) chainTracker.StartAndServe(ctx) reliabilityManager := reliabilitymanager.NewReliabilityManager(chainTracker, &mockProviderStateTracker, rpcProviderOptions.account.Addr.String(), chainRouter, chainParser) mockReliabilityManager := NewMockReliabilityManager(reliabilityManager) - rpcProviderServer.ServeRPCRequests(ctx, rpcProviderEndpoint, chainParser, rws, providerSessionManager, mockReliabilityManager, rpcProviderOptions.account.SK, nil, chainRouter, &mockProviderStateTracker, rpcProviderOptions.account.Addr, rpcProviderOptions.lavaChainID, rpcprovider.DEFAULT_ALLOWED_MISSING_CU, nil, nil, nil, false) + rpcProviderServer.ServeRPCRequests(ctx, rpcProviderEndpoint, chainParser, rws, providerSessionManager, mockReliabilityManager, rpcProviderOptions.account.SK, cache, chainRouter, &mockProviderStateTracker, rpcProviderOptions.account.Addr, rpcProviderOptions.lavaChainID, rpcprovider.DEFAULT_ALLOWED_MISSING_CU, nil, nil, nil, false) listener := rpcprovider.NewProviderListener(ctx, rpcProviderEndpoint.NetworkAddress, "/health") err = listener.RegisterReceiver(rpcProviderServer, rpcProviderEndpoint) require.NoError(t, err) @@ -1702,3 +1711,115 @@ func TestConsumerProviderWithConsumerSideCache(t *testing.T) { headers = sendMessage("block", []string{"1000"}) require.Equal(t, "Cached", headers.Get(common.PROVIDER_ADDRESS_HEADER_NAME)) } + +func TestConsumerProviderWithProviderSideCache(t *testing.T) { + ctx := context.Background() + // can be any spec and api interface + specId := "LAV1" + apiInterface := spectypes.APIInterfaceTendermintRPC + epoch := uint64(100) + lavaChainID := "lava" + + consumerListenAddress := addressGen.GetAddress() + cacheListenAddress := addressGen.GetAddress() + pairingList := map[uint64]*lavasession.ConsumerSessionsWithProvider{} + type providerData struct { + account sigs.Account + endpoint *lavasession.RPCProviderEndpoint + replySetter *ReplySetter + } + + consumerAccount := sigs.GenerateDeterministicFloatingKey(randomizer) + providerAccount := sigs.GenerateDeterministicFloatingKey(randomizer) + provider := providerData{account: providerAccount} + providerListenAddress := addressGen.GetAddress() + + createCacheServer(t, ctx, cacheListenAddress) + testJsonRpcId := 42 + nodeRequestsCounter := 0 + rpcProviderOptions := rpcProviderOptions{ + consumerAddress: consumerAccount.Addr.String(), + specId: specId, + apiInterface: apiInterface, + listenAddress: providerListenAddress, + account: provider.account, + lavaChainID: lavaChainID, + addons: []string(nil), + providerUniqueId: "provider", + cacheListenAddress: cacheListenAddress, + } + _, provider.endpoint, provider.replySetter, _, _ = createRpcProvider(t, ctx, rpcProviderOptions) + provider.replySetter.handler = func(req []byte, header http.Header) (data []byte, status int) { + var jsonRpcMessage rpcInterfaceMessages.JsonrpcMessage + err := json.Unmarshal(req, &jsonRpcMessage) + require.NoError(t, err, req) + + reqId := jsonRpcIdToInt(t, jsonRpcMessage.ID) + if reqId == testJsonRpcId { + nodeRequestsCounter++ + } + + response := fmt.Sprintf(`{"jsonrpc":"2.0","result": {}, "id": %v}`, string(jsonRpcMessage.ID)) + return []byte(response), http.StatusOK + } + + pairingList[0] = &lavasession.ConsumerSessionsWithProvider{ + PublicLavaAddress: provider.account.Addr.String(), + Endpoints: []*lavasession.Endpoint{ + { + NetworkAddress: provider.endpoint.NetworkAddress.Address, + Enabled: true, + Geolocation: 1, + }, + }, + Sessions: map[int64]*lavasession.SingleConsumerSession{}, + MaxComputeUnits: 10000, + UsedComputeUnits: 0, + PairingEpoch: epoch, + } + + rpcConsumerOptions := rpcConsumerOptions{ + specId: specId, + apiInterface: apiInterface, + account: consumerAccount, + consumerListenAddress: consumerListenAddress, + epoch: epoch, + pairingList: pairingList, + requiredResponses: 1, + lavaChainID: lavaChainID, + } + rpcconsumerServer, _ := createRpcConsumer(t, ctx, rpcConsumerOptions) + require.NotNil(t, rpcconsumerServer) + + client := http.Client{} + sendMessage := func(method string, params []string) http.Header { + body := fmt.Sprintf(`{"jsonrpc":"2.0","method":"%v","params": [%v], "id":%v}`, method, strings.Join(params, ","), testJsonRpcId) + resp, err := client.Post("http://"+consumerListenAddress, "application/json", bytes.NewBuffer([]byte(body))) + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + + bodyBytes, err := io.ReadAll(resp.Body) + require.NoError(t, err) + + var jsonRpcMessage rpcInterfaceMessages.JsonrpcMessage + err = json.Unmarshal(bodyBytes, &jsonRpcMessage) + require.NoError(t, err) + + respId := jsonRpcIdToInt(t, jsonRpcMessage.ID) + require.Equal(t, testJsonRpcId, respId) + resp.Body.Close() + + return resp.Header + } + + // Get latest for sanity check + sendMessage("status", []string{}) + + for i := 0; i < 5; i++ { + // Get block, this should be cached for next time + sendMessage("block", []string{"1000"}) + } + + // Verify node was called to only twice + require.Equal(t, 2, nodeRequestsCounter) +} From 90710539289041d019cc60b86e111aed3c235322 Mon Sep 17 00:00:00 2001 From: Elad Gildnur Date: Fri, 20 Sep 2024 17:22:53 +0300 Subject: [PATCH 10/14] Small fix to cache server --- ecosystem/cache/server.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/ecosystem/cache/server.go b/ecosystem/cache/server.go index 9f3f8df1dc..fd867b303c 100644 --- a/ecosystem/cache/server.go +++ b/ecosystem/cache/server.go @@ -107,7 +107,7 @@ func (cs *CacheServer) Serve(ctx context.Context, if strings.HasPrefix(listenAddr, unixPrefix) { // Unix socket host, port, err := net.SplitHostPort(listenAddr) if err != nil { - utils.LavaFormatFatal("Failed to parse unix socket, provide address in this format unix:/tmp/example.sock: %v\n", err) + utils.LavaFormatFatal("Failed to parse unix socket, provide address in this format unix:/tmp/example.sock", err) return } @@ -115,26 +115,26 @@ func (cs *CacheServer) Serve(ctx context.Context, addr, err := net.ResolveUnixAddr(host, port) if err != nil { - utils.LavaFormatFatal("Failed to resolve unix socket address: %v\n", err) + utils.LavaFormatFatal("Failed to resolve unix socket address", err) return } lis, err = net.ListenUnix(host, addr) if err != nil { - utils.LavaFormatFatal("Faild to listen to unix socket listener: %v\n", err) + utils.LavaFormatFatal("Failed to listen to unix socket listener", err) return } // Set permissions for the Unix socket err = os.Chmod(port, 0o600) if err != nil { - utils.LavaFormatFatal("Failed to set permissions for Unix socket: %v\n", err) + utils.LavaFormatFatal("Failed to set permissions for Unix socket", err) return } } else { lis, err = net.Listen("tcp", listenAddr) if err != nil { - utils.LavaFormatFatal("Cache server failure setting up TCP listener: %v\n", err) + utils.LavaFormatFatal("Cache server failure setting up TCP listener", err) return } } From 1e815943f9130983db2e19dffd14a3b60563348f Mon Sep 17 00:00:00 2001 From: Elad Gildnur Date: Fri, 20 Sep 2024 17:23:18 +0300 Subject: [PATCH 11/14] Disable metrics server for cache in tests --- protocol/integration/protocol_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/protocol/integration/protocol_test.go b/protocol/integration/protocol_test.go index d404b850e9..e46a217f84 100644 --- a/protocol/integration/protocol_test.go +++ b/protocol/integration/protocol_test.go @@ -353,7 +353,7 @@ func createRpcProvider(t *testing.T, ctx context.Context, rpcProviderOptions rpc func createCacheServer(t *testing.T, ctx context.Context, listenAddress string) { go func() { cs := cache.CacheServer{CacheMaxCost: 2 * 1024 * 1024 * 1024} // taken from max-items default value - cs.InitCache(ctx, cache.DefaultExpirationTimeFinalized, cache.DefaultExpirationForNonFinalized, cache.DefaultExpirationNodeErrors, cache.DefaultExpirationBlocksHashesToHeights, "", cache.DefaultExpirationTimeFinalizedMultiplier, cache.DefaultExpirationTimeNonFinalizedMultiplier) + cs.InitCache(ctx, cache.DefaultExpirationTimeFinalized, cache.DefaultExpirationForNonFinalized, cache.DefaultExpirationNodeErrors, cache.DefaultExpirationBlocksHashesToHeights, "disabled", cache.DefaultExpirationTimeFinalizedMultiplier, cache.DefaultExpirationTimeNonFinalizedMultiplier) cs.Serve(ctx, listenAddress) }() consumerUp := checkServerStatusWithTimeout("http://"+listenAddress, time.Millisecond*61) @@ -1816,7 +1816,7 @@ func TestConsumerProviderWithProviderSideCache(t *testing.T) { sendMessage("status", []string{}) for i := 0; i < 5; i++ { - // Get block, this should be cached for next time + // Get block sendMessage("block", []string{"1000"}) } From fc757c8b9170c17958387bac0b521910da072151 Mon Sep 17 00:00:00 2001 From: Elad Gildnur Date: Fri, 20 Sep 2024 17:23:26 +0300 Subject: [PATCH 12/14] Add TestConsumerProviderWithConsumerAndProviderSideCache --- protocol/integration/protocol_test.go | 124 ++++++++++++++++++++++++++ 1 file changed, 124 insertions(+) diff --git a/protocol/integration/protocol_test.go b/protocol/integration/protocol_test.go index e46a217f84..c6f61471a6 100644 --- a/protocol/integration/protocol_test.go +++ b/protocol/integration/protocol_test.go @@ -1823,3 +1823,127 @@ func TestConsumerProviderWithProviderSideCache(t *testing.T) { // Verify node was called to only twice require.Equal(t, 2, nodeRequestsCounter) } + +func TestConsumerProviderWithConsumerAndProviderSideCache(t *testing.T) { + ctx := context.Background() + // can be any spec and api interface + specId := "LAV1" + apiInterface := spectypes.APIInterfaceTendermintRPC + epoch := uint64(100) + lavaChainID := "lava" + + consumerListenAddress := addressGen.GetAddress() + consumerCacheListenAddress := addressGen.GetAddress() + providerCacheListenAddress := addressGen.GetAddress() + + createCacheServer(t, ctx, consumerCacheListenAddress) + createCacheServer(t, ctx, providerCacheListenAddress) + + pairingList := map[uint64]*lavasession.ConsumerSessionsWithProvider{} + type providerData struct { + account sigs.Account + endpoint *lavasession.RPCProviderEndpoint + replySetter *ReplySetter + } + + consumerAccount := sigs.GenerateDeterministicFloatingKey(randomizer) + providerAccount := sigs.GenerateDeterministicFloatingKey(randomizer) + provider := providerData{account: providerAccount} + providerListenAddress := addressGen.GetAddress() + + testJsonRpcId := 42 + nodeRequestsCounter := 0 + rpcProviderOptions := rpcProviderOptions{ + consumerAddress: consumerAccount.Addr.String(), + specId: specId, + apiInterface: apiInterface, + listenAddress: providerListenAddress, + account: provider.account, + lavaChainID: lavaChainID, + addons: []string(nil), + providerUniqueId: "provider", + cacheListenAddress: providerCacheListenAddress, + } + _, provider.endpoint, provider.replySetter, _, _ = createRpcProvider(t, ctx, rpcProviderOptions) + provider.replySetter.handler = func(req []byte, header http.Header) (data []byte, status int) { + var jsonRpcMessage rpcInterfaceMessages.JsonrpcMessage + err := json.Unmarshal(req, &jsonRpcMessage) + require.NoError(t, err, req) + + reqId := jsonRpcIdToInt(t, jsonRpcMessage.ID) + if reqId == testJsonRpcId { + nodeRequestsCounter++ + } + + response := fmt.Sprintf(`{"jsonrpc":"2.0","result": {}, "id": %v}`, string(jsonRpcMessage.ID)) + return []byte(response), http.StatusOK + } + + pairingList[0] = &lavasession.ConsumerSessionsWithProvider{ + PublicLavaAddress: provider.account.Addr.String(), + Endpoints: []*lavasession.Endpoint{ + { + NetworkAddress: provider.endpoint.NetworkAddress.Address, + Enabled: true, + Geolocation: 1, + }, + }, + Sessions: map[int64]*lavasession.SingleConsumerSession{}, + MaxComputeUnits: 10000, + UsedComputeUnits: 0, + PairingEpoch: epoch, + } + + rpcConsumerOptions := rpcConsumerOptions{ + specId: specId, + apiInterface: apiInterface, + account: consumerAccount, + consumerListenAddress: consumerListenAddress, + epoch: epoch, + pairingList: pairingList, + requiredResponses: 1, + lavaChainID: lavaChainID, + cacheListenAddress: consumerCacheListenAddress, + } + rpcconsumerServer, _ := createRpcConsumer(t, ctx, rpcConsumerOptions) + require.NotNil(t, rpcconsumerServer) + + client := http.Client{} + sendMessage := func(method string, params []string) http.Header { + body := fmt.Sprintf(`{"jsonrpc":"2.0","method":"%v","params": [%v], "id":%v}`, method, strings.Join(params, ","), testJsonRpcId) + resp, err := client.Post("http://"+consumerListenAddress, "application/json", bytes.NewBuffer([]byte(body))) + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + + bodyBytes, err := io.ReadAll(resp.Body) + require.NoError(t, err) + + var jsonRpcMessage rpcInterfaceMessages.JsonrpcMessage + err = json.Unmarshal(bodyBytes, &jsonRpcMessage) + require.NoError(t, err) + + respId := jsonRpcIdToInt(t, jsonRpcMessage.ID) + require.Equal(t, testJsonRpcId, respId) + resp.Body.Close() + + return resp.Header + } + + providerAddr := provider.account.Addr.String() + // Get latest for sanity check + headers := sendMessage("status", []string{}) + require.Equal(t, providerAddr, headers.Get(common.PROVIDER_ADDRESS_HEADER_NAME)) + + // Get block, this should be cached for next time + headers = sendMessage("block", []string{"1000"}) + require.Equal(t, providerAddr, headers.Get(common.PROVIDER_ADDRESS_HEADER_NAME)) + + for i := 0; i < 5; i++ { + // Get block again, this time it should be from cache + headers = sendMessage("block", []string{"1000"}) + require.Equal(t, "Cached", headers.Get(common.PROVIDER_ADDRESS_HEADER_NAME)) + } + + // Verify node was called to only twice + require.Equal(t, 2, nodeRequestsCounter) +} From 582fba1b9131d0094d69ca824af367eb9a3b4257 Mon Sep 17 00:00:00 2001 From: Elad Gildnur Date: Sun, 22 Sep 2024 12:09:15 +0300 Subject: [PATCH 13/14] Attempt to fix test --- protocol/integration/protocol_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/protocol/integration/protocol_test.go b/protocol/integration/protocol_test.go index c6f61471a6..3bed27fae1 100644 --- a/protocol/integration/protocol_test.go +++ b/protocol/integration/protocol_test.go @@ -356,8 +356,8 @@ func createCacheServer(t *testing.T, ctx context.Context, listenAddress string) cs.InitCache(ctx, cache.DefaultExpirationTimeFinalized, cache.DefaultExpirationForNonFinalized, cache.DefaultExpirationNodeErrors, cache.DefaultExpirationBlocksHashesToHeights, "disabled", cache.DefaultExpirationTimeFinalizedMultiplier, cache.DefaultExpirationTimeNonFinalizedMultiplier) cs.Serve(ctx, listenAddress) }() - consumerUp := checkServerStatusWithTimeout("http://"+listenAddress, time.Millisecond*61) - require.True(t, consumerUp) + cacheServerUp := checkServerStatusWithTimeout("http://"+listenAddress, time.Second*7) + require.True(t, cacheServerUp) } func TestConsumerProviderBasic(t *testing.T) { From 0ac63792b84de00c9618bed434115a2bd857cb6b Mon Sep 17 00:00:00 2001 From: Elad Gildnur Date: Sun, 22 Sep 2024 12:24:57 +0300 Subject: [PATCH 14/14] Small fix to a different test --- protocol/integration/protocol_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocol/integration/protocol_test.go b/protocol/integration/protocol_test.go index 3bed27fae1..33a69599f1 100644 --- a/protocol/integration/protocol_test.go +++ b/protocol/integration/protocol_test.go @@ -433,7 +433,7 @@ func TestConsumerProviderBasic(t *testing.T) { rpcconsumerServer, _ := createRpcConsumer(t, ctx, rpcConsumerOptions) require.NotNil(t, rpcconsumerServer) client := http.Client{} - resp, err := client.Get("http://" + consumerListenAddress + "/status") + resp, err := client.Get("http://" + consumerListenAddress + "/cosmos/base/tendermint/v1beta1/blocks/latest") require.NoError(t, err) require.Equal(t, http.StatusOK, resp.StatusCode) bodyBytes, err := io.ReadAll(resp.Body)