diff --git a/protocol/chainlib/consumer_ws_subscription_manager_test.go b/protocol/chainlib/consumer_ws_subscription_manager_test.go index c549cb6772..ef66d83782 100644 --- a/protocol/chainlib/consumer_ws_subscription_manager_test.go +++ b/protocol/chainlib/consumer_ws_subscription_manager_test.go @@ -715,7 +715,7 @@ func CreateConsumerSessionManager(chainID, apiInterface, consumerPublicAddress s baseLatency := common.AverageWorldLatency / 2 // we want performance to be half our timeout or better return lavasession.NewConsumerSessionManager( &lavasession.RPCEndpoint{NetworkAddress: "stub", ChainID: chainID, ApiInterface: apiInterface, TLSEnabled: false, HealthCheckPath: "/", Geolocation: 0}, - provideroptimizer.NewProviderOptimizer(provideroptimizer.STRATEGY_BALANCED, 0, baseLatency, 1), + provideroptimizer.NewProviderOptimizer(provideroptimizer.STRATEGY_BALANCED, 0, baseLatency, 1, nil), nil, nil, consumerPublicAddress, lavasession.NewActiveSubscriptionProvidersStorage(), ) diff --git a/protocol/integration/protocol_test.go b/protocol/integration/protocol_test.go index c3318de15c..fc1038858e 100644 --- a/protocol/integration/protocol_test.go +++ b/protocol/integration/protocol_test.go @@ -185,7 +185,7 @@ func createRpcConsumer(t *testing.T, ctx context.Context, specId string, apiInte finalizationConsensus := finalizationconsensus.NewFinalizationConsensus(rpcEndpoint.ChainID) _, averageBlockTime, _, _ := chainParser.ChainBlockStats() baseLatency := common.AverageWorldLatency / 2 - optimizer := provideroptimizer.NewProviderOptimizer(provideroptimizer.STRATEGY_BALANCED, averageBlockTime, baseLatency, 2) + optimizer := provideroptimizer.NewProviderOptimizer(provideroptimizer.STRATEGY_BALANCED, averageBlockTime, baseLatency, 2, nil) consumerSessionManager := lavasession.NewConsumerSessionManager(rpcEndpoint, optimizer, nil, nil, "test", lavasession.NewActiveSubscriptionProvidersStorage()) consumerSessionManager.UpdateAllProviders(epoch, pairingList) diff --git a/protocol/lavasession/consumer_session_manager.go b/protocol/lavasession/consumer_session_manager.go index 30c83c6c5f..70b571c24f 100644 --- a/protocol/lavasession/consumer_session_manager.go +++ b/protocol/lavasession/consumer_session_manager.go @@ -638,7 +638,7 @@ func (csm *ConsumerSessionManager) getValidProviderAddresses(ignoredProvidersLis if stateful == common.CONSISTENCY_SELECT_ALL_PROVIDERS && csm.providerOptimizer.Strategy() != provideroptimizer.STRATEGY_COST { providers = csm.getTopTenProvidersForStatefulCalls(validAddresses, ignoredProvidersList) } else { - providers = csm.providerOptimizer.ChooseProvider(validAddresses, ignoredProvidersList, cu, requestedBlock, OptimizerPerturbation) + providers = csm.providerOptimizer.ChooseProvider(validAddresses, ignoredProvidersList, cu, requestedBlock, OptimizerPerturbation, csm.currentEpoch) for _, chosenProvider := range providers { go csm.consumerMetricsManager.UpdateProviderChosenByOptimizerCount(csm.rpcEndpoint.ChainID, csm.rpcEndpoint.ApiInterface, chosenProvider, csm.currentEpoch) } diff --git a/protocol/lavasession/consumer_session_manager_test.go b/protocol/lavasession/consumer_session_manager_test.go index b22f0c2e61..82341e2d2c 100644 --- a/protocol/lavasession/consumer_session_manager_test.go +++ b/protocol/lavasession/consumer_session_manager_test.go @@ -162,7 +162,7 @@ func TestEndpointSortingFlow(t *testing.T) { func CreateConsumerSessionManager() *ConsumerSessionManager { rand.InitRandomSeed() baseLatency := common.AverageWorldLatency / 2 // we want performance to be half our timeout or better - return NewConsumerSessionManager(&RPCEndpoint{"stub", "stub", "stub", false, "/", 0}, provideroptimizer.NewProviderOptimizer(provideroptimizer.STRATEGY_BALANCED, 0, baseLatency, 1), nil, nil, "lava@test", NewActiveSubscriptionProvidersStorage()) + return NewConsumerSessionManager(&RPCEndpoint{"stub", "stub", "stub", false, "/", 0}, provideroptimizer.NewProviderOptimizer(provideroptimizer.STRATEGY_BALANCED, 0, baseLatency, 1, nil), nil, nil, "lava@test", NewActiveSubscriptionProvidersStorage()) } func TestMain(m *testing.M) { diff --git a/protocol/lavasession/consumer_types.go b/protocol/lavasession/consumer_types.go index ef9b61e829..bbacce1588 100644 --- a/protocol/lavasession/consumer_types.go +++ b/protocol/lavasession/consumer_types.go @@ -72,7 +72,7 @@ type ProviderOptimizer interface { AppendProbeRelayData(providerAddress string, latency time.Duration, success bool) AppendRelayFailure(providerAddress string) AppendRelayData(providerAddress string, latency time.Duration, isHangingApi bool, cu, syncBlock uint64) - ChooseProvider(allAddresses []string, ignoredProviders map[string]struct{}, cu uint64, requestedBlock int64, perturbationPercentage float64) (addresses []string) + ChooseProvider(allAddresses []string, ignoredProviders map[string]struct{}, cu uint64, requestedBlock int64, perturbationPercentage float64, epoch uint64) (addresses []string) GetExcellenceQoSReportForProvider(string) (*pairingtypes.QualityOfServiceReport, *pairingtypes.QualityOfServiceReport) Strategy() provideroptimizer.Strategy } diff --git a/protocol/metrics/consumer_metrics_manager.go b/protocol/metrics/consumer_metrics_manager.go index 3ce6e3ddb7..2074795920 100644 --- a/protocol/metrics/consumer_metrics_manager.go +++ b/protocol/metrics/consumer_metrics_manager.go @@ -39,6 +39,7 @@ type ConsumerMetricsManager struct { latencyMetric *prometheus.GaugeVec qosMetric *prometheus.GaugeVec qosExcellenceMetric *prometheus.GaugeVec + qosMetricPerChoosing *prometheus.GaugeVec LatestBlockMetric *prometheus.GaugeVec LatestProviderRelay *prometheus.GaugeVec virtualEpochMetric *prometheus.GaugeVec @@ -110,6 +111,11 @@ func NewConsumerMetricsManager(options ConsumerMetricsManagerOptions) *ConsumerM Help: "The QOS metrics per provider excellence", }, []string{"spec", "provider_address", "epoch", "qos_metric"}) + qosMetricPerChoosing := prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "lava_consumer_qos_metrics_per_choosing", + Help: "The QOS metrics per provider, per each time we choose a provider", + }, []string{"spec", "apiInterface", "provider_address", "epoch", "chosen", "qos_metric"}) + latestBlockMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{ Name: "lava_consumer_latest_provider_block", Help: "The latest block reported by provider", @@ -189,6 +195,7 @@ func NewConsumerMetricsManager(options ConsumerMetricsManagerOptions) *ConsumerM prometheus.MustRegister(latencyMetric) prometheus.MustRegister(qosMetric) prometheus.MustRegister(qosExcellenceMetric) + prometheus.MustRegister(qosMetricPerChoosing) prometheus.MustRegister(latestBlockMetric) prometheus.MustRegister(latestProviderRelay) prometheus.MustRegister(virtualEpochMetric) @@ -213,6 +220,7 @@ func NewConsumerMetricsManager(options ConsumerMetricsManagerOptions) *ConsumerM latencyMetric: latencyMetric, qosMetric: qosMetric, qosExcellenceMetric: qosExcellenceMetric, + qosMetricPerChoosing: qosMetricPerChoosing, LatestBlockMetric: latestBlockMetric, LatestProviderRelay: latestProviderRelay, providerRelays: map[string]uint64{}, @@ -467,6 +475,23 @@ func (pme *ConsumerMetricsManager) UpdateProviderChosenByOptimizerCount(chainId pme.providerChosenByOptimizerCount.WithLabelValues(chainId, apiInterface, providerAddress, strconv.FormatUint(epoch, 10)).Inc() } +func (pme *ConsumerMetricsManager) UpdateOptimizerChoosingProviderWithData( + chainId string, + apiInterface string, + providerAddress string, + epoch uint64, + chosen bool, + providerDataScore ProviderDataScore, +) { + if pme == nil { + return + } + + pme.qosMetricPerChoosing.WithLabelValues(chainId, apiInterface, providerAddress, strconv.FormatUint(epoch, 10), strconv.FormatBool(chosen), AvailabilityLabel).Set(providerDataScore.Availability) + pme.qosMetricPerChoosing.WithLabelValues(chainId, apiInterface, providerAddress, strconv.FormatUint(epoch, 10), strconv.FormatBool(chosen), SyncLabel).Set(providerDataScore.Sync) + pme.qosMetricPerChoosing.WithLabelValues(chainId, apiInterface, providerAddress, strconv.FormatUint(epoch, 10), strconv.FormatBool(chosen), LatencyLabel).Set(providerDataScore.Latency) +} + func SetVersionInner(protocolVersionMetric *prometheus.GaugeVec, version string) { var major, minor, patch int _, err := fmt.Sscanf(version, "%d.%d.%d", &major, &minor, &patch) diff --git a/protocol/metrics/consumer_optimizer_data_collector.go b/protocol/metrics/consumer_optimizer_data_collector.go new file mode 100644 index 0000000000..7725e1f85c --- /dev/null +++ b/protocol/metrics/consumer_optimizer_data_collector.go @@ -0,0 +1,94 @@ +package metrics + +import ( + "context" + "sync" + "time" +) + +type ProviderDataScore struct { + Availability float64 + Sync float64 + Latency float64 +} + +type ProviderDataWithChosen struct { + providerData ProviderDataScore + epoch uint64 + chosen bool +} + +type ConsumerOptimizerDataCollector struct { + latestProvidersOptimizerData map[string]ProviderDataWithChosen + interval time.Duration + consumerMetricsManager *ConsumerMetricsManager + chainId string + apiInterface string + lock sync.RWMutex +} + +func NewConsumerOptimizerDataCollector(chainId, apiInterface string, interval time.Duration, consumerMetricsManager *ConsumerMetricsManager) *ConsumerOptimizerDataCollector { + return &ConsumerOptimizerDataCollector{ + latestProvidersOptimizerData: make(map[string]ProviderDataWithChosen), + chainId: chainId, + interval: interval, + consumerMetricsManager: consumerMetricsManager, + apiInterface: apiInterface, + } +} + +func (codc *ConsumerOptimizerDataCollector) SetProviderData(providerAddress string, epoch uint64, chosen bool, availability, sync, latency float64) { + if codc == nil { + return + } + + codc.lock.Lock() + defer codc.lock.Unlock() + codc.latestProvidersOptimizerData[providerAddress] = ProviderDataWithChosen{ + providerData: ProviderDataScore{ + Availability: availability, + Sync: sync, + Latency: latency, + }, + epoch: epoch, + chosen: chosen, + } +} + +func (codc *ConsumerOptimizerDataCollector) Start(ctx context.Context) { + if codc == nil { + return + } + + go func() { + ticker := time.NewTicker(codc.interval) + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + codc.collectProvidersData() + } + } + }() +} + +func (codc *ConsumerOptimizerDataCollector) collectProvidersData() { + if codc == nil { + return + } + + codc.lock.RLock() + defer codc.lock.RUnlock() + + for providerAddress, providerData := range codc.latestProvidersOptimizerData { + codc.consumerMetricsManager.UpdateOptimizerChoosingProviderWithData( + codc.chainId, + codc.apiInterface, + providerAddress, + providerData.epoch, + providerData.chosen, + providerData.providerData, + ) + } +} diff --git a/protocol/provideroptimizer/provider_optimizer.go b/protocol/provideroptimizer/provider_optimizer.go index 6936c87fb5..cb79b33195 100644 --- a/protocol/provideroptimizer/provider_optimizer.go +++ b/protocol/provideroptimizer/provider_optimizer.go @@ -9,6 +9,7 @@ import ( sdk "github.com/cosmos/cosmos-sdk/types" "github.com/dgraph-io/ristretto" "github.com/lavanet/lava/v3/protocol/common" + "github.com/lavanet/lava/v3/protocol/metrics" "github.com/lavanet/lava/v3/utils" "github.com/lavanet/lava/v3/utils/lavaslices" "github.com/lavanet/lava/v3/utils/rand" @@ -49,6 +50,9 @@ type ProviderOptimizer struct { baseWorldLatency time.Duration wantedNumProvidersInConcurrency uint latestSyncData ConcurrentBlockStore + consumerOptimizerDataCollector *metrics.ConsumerOptimizerDataCollector + chainId string + apiInterface string } type ProviderData struct { @@ -74,10 +78,12 @@ const ( func (po *ProviderOptimizer) AppendRelayFailure(providerAddress string) { po.appendRelayData(providerAddress, 0, false, false, 0, 0, time.Now()) + // Add to prometheus } func (po *ProviderOptimizer) AppendRelayData(providerAddress string, latency time.Duration, isHangingApi bool, cu, syncBlock uint64) { po.appendRelayData(providerAddress, latency, isHangingApi, true, cu, syncBlock, time.Now()) + // Add to prometheus } func (po *ProviderOptimizer) appendRelayData(providerAddress string, latency time.Duration, isHangingApi, success bool, cu, syncBlock uint64, sampleTime time.Time) { @@ -124,6 +130,7 @@ func (po *ProviderOptimizer) AppendProbeRelayData(providerAddress string, latenc } po.providersStorage.Set(providerAddress, providerData, 1) + // Add to prometheus utils.LavaFormatTrace("probe update", utils.LogAttr("providerAddress", providerAddress), utils.LogAttr("latency", latency), @@ -132,7 +139,7 @@ func (po *ProviderOptimizer) AppendProbeRelayData(providerAddress string, latenc } // returns a sub set of selected providers according to their scores, perturbation factor will be added to each score in order to randomly select providers that are not always on top -func (po *ProviderOptimizer) ChooseProvider(allAddresses []string, ignoredProviders map[string]struct{}, cu uint64, requestedBlock int64, perturbationPercentage float64) (addresses []string) { +func (po *ProviderOptimizer) ChooseProvider(allAddresses []string, ignoredProviders map[string]struct{}, cu uint64, requestedBlock int64, perturbationPercentage float64, epoch uint64) (addresses []string) { returnedProviders := make([]string, 1) // location 0 is always the best score latencyScore := math.MaxFloat64 // smaller = better i.e less latency syncScore := math.MaxFloat64 // smaller = better i.e less sync lag @@ -181,10 +188,14 @@ func (po *ProviderOptimizer) ChooseProvider(allAddresses []string, ignoredProvid returnedProviders[0] = providerAddress // best provider is always on position 0 latencyScore = latencyScoreCurrent syncScore = syncScoreCurrent + po.consumerOptimizerDataCollector.SetProviderData(providerAddress, epoch, true, providerData.Availability.Num/providerData.Availability.Denom, syncScoreCurrent, latencyScoreCurrent) continue } if po.shouldExplore(len(returnedProviders), numProviders) { returnedProviders = append(returnedProviders, providerAddress) + po.consumerOptimizerDataCollector.SetProviderData(providerAddress, epoch, true, providerData.Availability.Num/providerData.Availability.Denom, syncScoreCurrent, latencyScoreCurrent) + } else { + po.consumerOptimizerDataCollector.SetProviderData(providerAddress, epoch, false, providerData.Availability.Num/providerData.Availability.Denom, syncScoreCurrent, latencyScoreCurrent) } } @@ -456,7 +467,7 @@ func (po *ProviderOptimizer) getRelayStatsTimes(providerAddress string) []time.T return nil } -func NewProviderOptimizer(strategy Strategy, averageBlockTIme, baseWorldLatency time.Duration, wantedNumProvidersInConcurrency uint) *ProviderOptimizer { +func NewProviderOptimizer(strategy Strategy, averageBlockTIme, baseWorldLatency time.Duration, wantedNumProvidersInConcurrency uint, consumerOptimizerDataCollector *metrics.ConsumerOptimizerDataCollector) *ProviderOptimizer { cache, err := ristretto.NewCache(&ristretto.Config{NumCounters: CacheNumCounters, MaxCost: CacheMaxCost, BufferItems: 64, IgnoreInternalCost: true}) if err != nil { utils.LavaFormatFatal("failed setting up cache for queries", err) @@ -469,7 +480,16 @@ func NewProviderOptimizer(strategy Strategy, averageBlockTIme, baseWorldLatency // overwrite wantedNumProvidersInConcurrency = 1 } - return &ProviderOptimizer{strategy: strategy, providersStorage: cache, averageBlockTime: averageBlockTIme, baseWorldLatency: baseWorldLatency, providerRelayStats: relayCache, wantedNumProvidersInConcurrency: wantedNumProvidersInConcurrency} + + return &ProviderOptimizer{ + strategy: strategy, + providersStorage: cache, + averageBlockTime: averageBlockTIme, + baseWorldLatency: baseWorldLatency, + providerRelayStats: relayCache, + wantedNumProvidersInConcurrency: wantedNumProvidersInConcurrency, + consumerOptimizerDataCollector: consumerOptimizerDataCollector, + } } // calculate the probability a random variable with a poisson distribution diff --git a/protocol/provideroptimizer/provider_optimizer_test.go b/protocol/provideroptimizer/provider_optimizer_test.go index 31a4df2f53..5242749471 100644 --- a/protocol/provideroptimizer/provider_optimizer_test.go +++ b/protocol/provideroptimizer/provider_optimizer_test.go @@ -40,7 +40,7 @@ func (posc *providerOptimizerSyncCache) Set(key, value interface{}, cost int64) func setupProviderOptimizer(maxProvidersCount int) *ProviderOptimizer { averageBlockTIme := TEST_AVERAGE_BLOCK_TIME baseWorldLatency := TEST_BASE_WORLD_LATENCY - return NewProviderOptimizer(STRATEGY_BALANCED, averageBlockTIme, baseWorldLatency, 1) + return NewProviderOptimizer(STRATEGY_BALANCED, averageBlockTIme, baseWorldLatency, 1, nil) } type providersGenerator struct { @@ -141,16 +141,16 @@ func TestProviderOptimizerBasic(t *testing.T) { requestBlock := int64(1000) pertrubationPercentage := 0.0 - returnedProviders := providerOptimizer.ChooseProvider(providersGen.providersAddresses, nil, requestCU, requestBlock, pertrubationPercentage) + returnedProviders := providerOptimizer.ChooseProvider(providersGen.providersAddresses, nil, requestCU, requestBlock, pertrubationPercentage, uint64(requestBlock)) require.Equal(t, 1, len(returnedProviders)) providerOptimizer.AppendProbeRelayData(providersGen.providersAddresses[1], TEST_BASE_WORLD_LATENCY*3, true) time.Sleep(4 * time.Millisecond) - returnedProviders = providerOptimizer.ChooseProvider(providersGen.providersAddresses, nil, requestCU, requestBlock, pertrubationPercentage) + returnedProviders = providerOptimizer.ChooseProvider(providersGen.providersAddresses, nil, requestCU, requestBlock, pertrubationPercentage, uint64(requestBlock)) require.Equal(t, 1, len(returnedProviders)) require.NotEqual(t, returnedProviders[0], providersGen.providersAddresses[1]) // we shouldn't pick the wrong provider providerOptimizer.AppendProbeRelayData(providersGen.providersAddresses[0], TEST_BASE_WORLD_LATENCY/2, true) time.Sleep(4 * time.Millisecond) - returnedProviders = providerOptimizer.ChooseProvider(providersGen.providersAddresses, nil, requestCU, requestBlock, pertrubationPercentage) + returnedProviders = providerOptimizer.ChooseProvider(providersGen.providersAddresses, nil, requestCU, requestBlock, pertrubationPercentage, uint64(requestBlock)) require.Equal(t, 1, len(returnedProviders)) require.Equal(t, providersGen.providersAddresses[0], returnedProviders[0]) // we should pick the best provider } @@ -165,11 +165,11 @@ func TestProviderOptimizerBasicRelayData(t *testing.T) { syncBlock := uint64(requestBlock) providerOptimizer.AppendRelayData(providersGen.providersAddresses[1], TEST_BASE_WORLD_LATENCY*4, false, requestCU, syncBlock) - returnedProviders := providerOptimizer.ChooseProvider(providersGen.providersAddresses, nil, requestCU, requestBlock, pertrubationPercentage) + returnedProviders := providerOptimizer.ChooseProvider(providersGen.providersAddresses, nil, requestCU, requestBlock, pertrubationPercentage, uint64(requestBlock)) require.Equal(t, 1, len(returnedProviders)) require.NotEqual(t, returnedProviders[0], providersGen.providersAddresses[1]) // we shouldn't pick the wrong provider providerOptimizer.AppendRelayData(providersGen.providersAddresses[0], TEST_BASE_WORLD_LATENCY/4, false, requestCU, syncBlock) - returnedProviders = providerOptimizer.ChooseProvider(providersGen.providersAddresses, nil, requestCU, requestBlock, pertrubationPercentage) + returnedProviders = providerOptimizer.ChooseProvider(providersGen.providersAddresses, nil, requestCU, requestBlock, pertrubationPercentage, uint64(requestBlock)) require.Equal(t, 1, len(returnedProviders)) require.Equal(t, providersGen.providersAddresses[0], returnedProviders[0]) // we should pick the best provider } @@ -192,10 +192,10 @@ func TestProviderOptimizerAvailability(t *testing.T) { providerOptimizer.AppendProbeRelayData(providersGen.providersAddresses[i], TEST_BASE_WORLD_LATENCY, false) } time.Sleep(4 * time.Millisecond) - returnedProviders := providerOptimizer.ChooseProvider(providersGen.providersAddresses, nil, requestCU, requestBlock, pertrubationPercentage) + returnedProviders := providerOptimizer.ChooseProvider(providersGen.providersAddresses, nil, requestCU, requestBlock, pertrubationPercentage, uint64(requestBlock)) require.Equal(t, 1, len(returnedProviders)) require.Equal(t, providersGen.providersAddresses[skipIndex], returnedProviders[0]) - returnedProviders = providerOptimizer.ChooseProvider(providersGen.providersAddresses, map[string]struct{}{providersGen.providersAddresses[skipIndex]: {}}, requestCU, requestBlock, pertrubationPercentage) + returnedProviders = providerOptimizer.ChooseProvider(providersGen.providersAddresses, map[string]struct{}{providersGen.providersAddresses[skipIndex]: {}}, requestCU, requestBlock, pertrubationPercentage, uint64(requestBlock)) require.Equal(t, 1, len(returnedProviders)) require.NotEqual(t, providersGen.providersAddresses[skipIndex], returnedProviders[0]) } @@ -217,10 +217,10 @@ func TestProviderOptimizerAvailabilityRelayData(t *testing.T) { providerOptimizer.AppendRelayFailure(providersGen.providersAddresses[i]) } time.Sleep(4 * time.Millisecond) - returnedProviders := providerOptimizer.ChooseProvider(providersGen.providersAddresses, nil, requestCU, requestBlock, pertrubationPercentage) + returnedProviders := providerOptimizer.ChooseProvider(providersGen.providersAddresses, nil, requestCU, requestBlock, pertrubationPercentage, uint64(requestBlock)) require.Equal(t, 1, len(returnedProviders)) require.Equal(t, providersGen.providersAddresses[skipIndex], returnedProviders[0]) - returnedProviders = providerOptimizer.ChooseProvider(providersGen.providersAddresses, map[string]struct{}{providersGen.providersAddresses[skipIndex]: {}}, requestCU, requestBlock, pertrubationPercentage) + returnedProviders = providerOptimizer.ChooseProvider(providersGen.providersAddresses, map[string]struct{}{providersGen.providersAddresses[skipIndex]: {}}, requestCU, requestBlock, pertrubationPercentage, uint64(requestBlock)) require.Equal(t, 1, len(returnedProviders)) require.NotEqual(t, providersGen.providersAddresses[skipIndex], returnedProviders[0]) } @@ -247,11 +247,11 @@ func TestProviderOptimizerAvailabilityBlockError(t *testing.T) { providerOptimizer.AppendRelayData(providersGen.providersAddresses[i], TEST_BASE_WORLD_LATENCY, false, requestCU, syncBlock-1) // update that he doesn't have the latest requested block } time.Sleep(4 * time.Millisecond) - returnedProviders := providerOptimizer.ChooseProvider(providersGen.providersAddresses, nil, requestCU, requestBlock, pertrubationPercentage) + returnedProviders := providerOptimizer.ChooseProvider(providersGen.providersAddresses, nil, requestCU, requestBlock, pertrubationPercentage, uint64(requestBlock)) require.Equal(t, 1, len(returnedProviders)) require.Equal(t, providersGen.providersAddresses[chosenIndex], returnedProviders[0]) // now try to get a previous block, our chosenIndex should be inferior in latency and blockError chance should be the same - returnedProviders = providerOptimizer.ChooseProvider(providersGen.providersAddresses, nil, requestCU, requestBlock-1, pertrubationPercentage) + returnedProviders = providerOptimizer.ChooseProvider(providersGen.providersAddresses, nil, requestCU, requestBlock-1, pertrubationPercentage, uint64(requestBlock)) require.Equal(t, 1, len(returnedProviders)) require.NotEqual(t, providersGen.providersAddresses[chosenIndex], returnedProviders[0]) } @@ -307,7 +307,7 @@ func TestProviderOptimizerStrategiesProviderCount(t *testing.T) { testProvidersCount := func(iterations int) float64 { exploration := 0.0 for i := 0; i < iterations; i++ { - returnedProviders := providerOptimizer.ChooseProvider(providersGen.providersAddresses, nil, requestCU, requestBlock, pertrubationPercentage) + returnedProviders := providerOptimizer.ChooseProvider(providersGen.providersAddresses, nil, requestCU, requestBlock, pertrubationPercentage, uint64(requestBlock)) if len(returnedProviders) > 1 { exploration++ } @@ -357,11 +357,11 @@ func TestProviderOptimizerSyncScore(t *testing.T) { sampleTime = sampleTime.Add(time.Millisecond * 5) } time.Sleep(4 * time.Millisecond) - returnedProviders := providerOptimizer.ChooseProvider(providersGen.providersAddresses, nil, requestCU, requestBlock, pertrubationPercentage) + returnedProviders := providerOptimizer.ChooseProvider(providersGen.providersAddresses, nil, requestCU, requestBlock, pertrubationPercentage, uint64(requestBlock)) require.Equal(t, 1, len(returnedProviders)) require.Equal(t, providersGen.providersAddresses[chosenIndex], returnedProviders[0]) // we should pick the best sync score - returnedProviders = providerOptimizer.ChooseProvider(providersGen.providersAddresses, nil, requestCU, int64(syncBlock), pertrubationPercentage) + returnedProviders = providerOptimizer.ChooseProvider(providersGen.providersAddresses, nil, requestCU, int64(syncBlock), pertrubationPercentage, uint64(requestBlock)) require.Equal(t, 1, len(returnedProviders)) require.NotEqual(t, providersGen.providersAddresses[chosenIndex], returnedProviders[0]) // sync score doesn't matter now } @@ -423,25 +423,25 @@ func TestProviderOptimizerStrategiesScoring(t *testing.T) { time.Sleep(4 * time.Millisecond) providerOptimizer.strategy = STRATEGY_BALANCED // a balanced strategy should pick provider 2 because of it's high availability - returnedProviders := providerOptimizer.ChooseProvider(providersGen.providersAddresses, nil, requestCU, requestBlock, pertrubationPercentage) + returnedProviders := providerOptimizer.ChooseProvider(providersGen.providersAddresses, nil, requestCU, requestBlock, pertrubationPercentage, uint64(requestBlock)) require.Equal(t, 1, len(returnedProviders)) require.Equal(t, providersGen.providersAddresses[2], returnedProviders[0]) providerOptimizer.strategy = STRATEGY_COST // with a cost strategy we expect the same as balanced - returnedProviders = providerOptimizer.ChooseProvider(providersGen.providersAddresses, nil, requestCU, requestBlock, pertrubationPercentage) + returnedProviders = providerOptimizer.ChooseProvider(providersGen.providersAddresses, nil, requestCU, requestBlock, pertrubationPercentage, uint64(requestBlock)) require.Equal(t, 1, len(returnedProviders)) require.Equal(t, providersGen.providersAddresses[2], returnedProviders[0]) providerOptimizer.strategy = STRATEGY_LATENCY // latency strategy should pick the best latency - returnedProviders = providerOptimizer.ChooseProvider(providersGen.providersAddresses, map[string]struct{}{providersGen.providersAddresses[2]: {}}, requestCU, requestBlock, pertrubationPercentage) + returnedProviders = providerOptimizer.ChooseProvider(providersGen.providersAddresses, map[string]struct{}{providersGen.providersAddresses[2]: {}}, requestCU, requestBlock, pertrubationPercentage, uint64(requestBlock)) require.Equal(t, 1, len(returnedProviders)) require.Equal(t, providersGen.providersAddresses[0], returnedProviders[0]) providerOptimizer.strategy = STRATEGY_SYNC_FRESHNESS // freshness strategy should pick the most advanced provider - returnedProviders = providerOptimizer.ChooseProvider(providersGen.providersAddresses, map[string]struct{}{providersGen.providersAddresses[2]: {}}, requestCU, requestBlock, pertrubationPercentage) + returnedProviders = providerOptimizer.ChooseProvider(providersGen.providersAddresses, map[string]struct{}{providersGen.providersAddresses[2]: {}}, requestCU, requestBlock, pertrubationPercentage, uint64(requestBlock)) require.Equal(t, 1, len(returnedProviders)) require.Equal(t, providersGen.providersAddresses[1], returnedProviders[0]) } @@ -495,11 +495,11 @@ func TestProviderOptimizerPerturbation(t *testing.T) { t.Logf("rand seed %d", seed) same := 0 pickFaults := 0 - chosenProvider := providerOptimizer.ChooseProvider(providersGen.providersAddresses, nil, requestCU, requestBlock, 0)[0] + chosenProvider := providerOptimizer.ChooseProvider(providersGen.providersAddresses, nil, requestCU, requestBlock, 0, syncBlock)[0] runs := 1000 // runs := 0 for i := 0; i < runs; i++ { - returnedProviders := providerOptimizer.ChooseProvider(providersGen.providersAddresses, nil, requestCU, requestBlock, pertrubationPercentage) + returnedProviders := providerOptimizer.ChooseProvider(providersGen.providersAddresses, nil, requestCU, requestBlock, pertrubationPercentage, syncBlock) require.Equal(t, 1, len(returnedProviders)) if chosenProvider == returnedProviders[0] { same++ diff --git a/protocol/rpcconsumer/rpcconsumer.go b/protocol/rpcconsumer/rpcconsumer.go index cd5e9eb4e7..487e65358d 100644 --- a/protocol/rpcconsumer/rpcconsumer.go +++ b/protocol/rpcconsumer/rpcconsumer.go @@ -49,9 +49,13 @@ const ( ) var ( - Yaml_config_properties = []string{"network-address", "chain-id", "api-interface"} - RelaysHealthEnableFlagDefault = true - RelayHealthIntervalFlagDefault = 5 * time.Minute + Yaml_config_properties = []string{"network-address", "chain-id", "api-interface"} + RelaysHealthEnableFlagDefault = true + RelayHealthIntervalFlagDefault = 5 * time.Minute + CollectOptimizerProviderDataFlag = false + CollectOptimizerProviderDataFlagName = "collect-optimizer-provider-data" + OptimizerProviderDataCollectionIntervalFlag = 1 * time.Second + OptimizerProviderDataCollectionIntervalFlagNam = "optimizer-provider-data-collection-interval" ) type strategyValue struct { @@ -232,9 +236,14 @@ func (rpcc *RPCConsumer) Start(ctx context.Context, options *rpcConsumerStartOpt defer chainMutexes[chainID].Unlock() value, exists := optimizers.Load(chainID) if !exists { - // doesn't exist for this chain create a new one + // if doesn't exist for this chain, create a new one + var consumerOptimizerDataCollector *metrics.ConsumerOptimizerDataCollector + if CollectOptimizerProviderDataFlag { + consumerOptimizerDataCollector = metrics.NewConsumerOptimizerDataCollector(chainID, rpcEndpoint.ApiInterface, OptimizerProviderDataCollectionIntervalFlag, consumerMetricsManager) // TODO: From flag + consumerOptimizerDataCollector.Start(ctx) + } baseLatency := common.AverageWorldLatency / 2 // we want performance to be half our timeout or better - optimizer = provideroptimizer.NewProviderOptimizer(options.strategy, averageBlockTime, baseLatency, options.maxConcurrentProviders) + optimizer = provideroptimizer.NewProviderOptimizer(options.strategy, averageBlockTime, baseLatency, options.maxConcurrentProviders, consumerOptimizerDataCollector) optimizers.Store(chainID, optimizer) } else { var ok bool @@ -620,6 +629,8 @@ rpcconsumer consumer_examples/full_consumer_example.yml --cache-be "127.0.0.1:77 cmdRPCConsumer.Flags().DurationVar(&updaters.TimeOutForFetchingLavaBlocks, common.TimeOutForFetchingLavaBlocksFlag, time.Second*5, "setting the timeout for fetching lava blocks") cmdRPCConsumer.Flags().String(common.UseStaticSpecFlag, "", "load offline spec provided path to spec file, used to test specs before they are proposed on chain") cmdRPCConsumer.Flags().IntVar(&relayCountOnNodeError, common.SetRelayCountOnNodeErrorFlag, 2, "set the number of retries attempt on node errors") + cmdRPCConsumer.Flags().BoolVar(&CollectOptimizerProviderDataFlag, CollectOptimizerProviderDataFlagName, CollectOptimizerProviderDataFlag, "enables collection of data from the provider optimizer") + cmdRPCConsumer.Flags().DurationVar(&OptimizerProviderDataCollectionIntervalFlag, OptimizerProviderDataCollectionIntervalFlagNam, OptimizerProviderDataCollectionIntervalFlag, "sets the interval for collecting data from the provider optimizer") common.AddRollingLogConfig(cmdRPCConsumer) return cmdRPCConsumer } diff --git a/protocol/rpcconsumer/rpcconsumer_server_test.go b/protocol/rpcconsumer/rpcconsumer_server_test.go index 639e03cb5d..78912edde8 100644 --- a/protocol/rpcconsumer/rpcconsumer_server_test.go +++ b/protocol/rpcconsumer/rpcconsumer_server_test.go @@ -54,7 +54,7 @@ func createRpcConsumer(t *testing.T, ctrl *gomock.Controller, ctx context.Contex finalizationConsensus := finalizationconsensus.NewFinalizationConsensus(rpcEndpoint.ChainID) _, averageBlockTime, _, _ := chainParser.ChainBlockStats() baseLatency := common.AverageWorldLatency / 2 - optimizer := provideroptimizer.NewProviderOptimizer(provideroptimizer.STRATEGY_BALANCED, averageBlockTime, baseLatency, 2) + optimizer := provideroptimizer.NewProviderOptimizer(provideroptimizer.STRATEGY_BALANCED, averageBlockTime, baseLatency, 2, nil) consumerSessionManager := lavasession.NewConsumerSessionManager(rpcEndpoint, optimizer, nil, nil, "test", lavasession.NewActiveSubscriptionProvidersStorage()) consumerSessionManager.UpdateAllProviders(epoch, map[uint64]*lavasession.ConsumerSessionsWithProvider{ epoch: {