Skip to content

Commit

Permalink
Add the lava_consumer_qos_metrics_per_choosing metric
Browse files Browse the repository at this point in the history
  • Loading branch information
shleikes committed Sep 9, 2024
1 parent b3d8e1d commit 23b8586
Show file tree
Hide file tree
Showing 11 changed files with 185 additions and 35 deletions.
2 changes: 1 addition & 1 deletion protocol/chainlib/consumer_ws_subscription_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -715,7 +715,7 @@ func CreateConsumerSessionManager(chainID, apiInterface, consumerPublicAddress s
baseLatency := common.AverageWorldLatency / 2 // we want performance to be half our timeout or better
return lavasession.NewConsumerSessionManager(
&lavasession.RPCEndpoint{NetworkAddress: "stub", ChainID: chainID, ApiInterface: apiInterface, TLSEnabled: false, HealthCheckPath: "/", Geolocation: 0},
provideroptimizer.NewProviderOptimizer(provideroptimizer.STRATEGY_BALANCED, 0, baseLatency, 1),
provideroptimizer.NewProviderOptimizer(provideroptimizer.STRATEGY_BALANCED, 0, baseLatency, 1, nil),
nil, nil, consumerPublicAddress,
lavasession.NewActiveSubscriptionProvidersStorage(),
)
Expand Down
2 changes: 1 addition & 1 deletion protocol/integration/protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func createRpcConsumer(t *testing.T, ctx context.Context, specId string, apiInte
finalizationConsensus := finalizationconsensus.NewFinalizationConsensus(rpcEndpoint.ChainID)
_, averageBlockTime, _, _ := chainParser.ChainBlockStats()
baseLatency := common.AverageWorldLatency / 2
optimizer := provideroptimizer.NewProviderOptimizer(provideroptimizer.STRATEGY_BALANCED, averageBlockTime, baseLatency, 2)
optimizer := provideroptimizer.NewProviderOptimizer(provideroptimizer.STRATEGY_BALANCED, averageBlockTime, baseLatency, 2, nil)
consumerSessionManager := lavasession.NewConsumerSessionManager(rpcEndpoint, optimizer, nil, nil, "test", lavasession.NewActiveSubscriptionProvidersStorage())
consumerSessionManager.UpdateAllProviders(epoch, pairingList)

Expand Down
2 changes: 1 addition & 1 deletion protocol/lavasession/consumer_session_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -638,7 +638,7 @@ func (csm *ConsumerSessionManager) getValidProviderAddresses(ignoredProvidersLis
if stateful == common.CONSISTENCY_SELECT_ALL_PROVIDERS && csm.providerOptimizer.Strategy() != provideroptimizer.STRATEGY_COST {
providers = csm.getTopTenProvidersForStatefulCalls(validAddresses, ignoredProvidersList)
} else {
providers = csm.providerOptimizer.ChooseProvider(validAddresses, ignoredProvidersList, cu, requestedBlock, OptimizerPerturbation)
providers = csm.providerOptimizer.ChooseProvider(validAddresses, ignoredProvidersList, cu, requestedBlock, OptimizerPerturbation, csm.currentEpoch)
for _, chosenProvider := range providers {
go csm.consumerMetricsManager.UpdateProviderChosenByOptimizerCount(csm.rpcEndpoint.ChainID, csm.rpcEndpoint.ApiInterface, chosenProvider, csm.currentEpoch)
}
Expand Down
2 changes: 1 addition & 1 deletion protocol/lavasession/consumer_session_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func TestEndpointSortingFlow(t *testing.T) {
func CreateConsumerSessionManager() *ConsumerSessionManager {
rand.InitRandomSeed()
baseLatency := common.AverageWorldLatency / 2 // we want performance to be half our timeout or better
return NewConsumerSessionManager(&RPCEndpoint{"stub", "stub", "stub", false, "/", 0}, provideroptimizer.NewProviderOptimizer(provideroptimizer.STRATEGY_BALANCED, 0, baseLatency, 1), nil, nil, "lava@test", NewActiveSubscriptionProvidersStorage())
return NewConsumerSessionManager(&RPCEndpoint{"stub", "stub", "stub", false, "/", 0}, provideroptimizer.NewProviderOptimizer(provideroptimizer.STRATEGY_BALANCED, 0, baseLatency, 1, nil), nil, nil, "lava@test", NewActiveSubscriptionProvidersStorage())
}

func TestMain(m *testing.M) {
Expand Down
2 changes: 1 addition & 1 deletion protocol/lavasession/consumer_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ type ProviderOptimizer interface {
AppendProbeRelayData(providerAddress string, latency time.Duration, success bool)
AppendRelayFailure(providerAddress string)
AppendRelayData(providerAddress string, latency time.Duration, isHangingApi bool, cu, syncBlock uint64)
ChooseProvider(allAddresses []string, ignoredProviders map[string]struct{}, cu uint64, requestedBlock int64, perturbationPercentage float64) (addresses []string)
ChooseProvider(allAddresses []string, ignoredProviders map[string]struct{}, cu uint64, requestedBlock int64, perturbationPercentage float64, epoch uint64) (addresses []string)
GetExcellenceQoSReportForProvider(string) (*pairingtypes.QualityOfServiceReport, *pairingtypes.QualityOfServiceReport)
Strategy() provideroptimizer.Strategy
}
Expand Down
25 changes: 25 additions & 0 deletions protocol/metrics/consumer_metrics_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type ConsumerMetricsManager struct {
latencyMetric *prometheus.GaugeVec
qosMetric *prometheus.GaugeVec
qosExcellenceMetric *prometheus.GaugeVec
qosMetricPerChoosing *prometheus.GaugeVec
LatestBlockMetric *prometheus.GaugeVec
LatestProviderRelay *prometheus.GaugeVec
virtualEpochMetric *prometheus.GaugeVec
Expand Down Expand Up @@ -110,6 +111,11 @@ func NewConsumerMetricsManager(options ConsumerMetricsManagerOptions) *ConsumerM
Help: "The QOS metrics per provider excellence",
}, []string{"spec", "provider_address", "epoch", "qos_metric"})

qosMetricPerChoosing := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "lava_consumer_qos_metrics_per_choosing",
Help: "The QOS metrics per provider, per each time we choose a provider",
}, []string{"spec", "apiInterface", "provider_address", "epoch", "chosen", "qos_metric"})

latestBlockMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "lava_consumer_latest_provider_block",
Help: "The latest block reported by provider",
Expand Down Expand Up @@ -189,6 +195,7 @@ func NewConsumerMetricsManager(options ConsumerMetricsManagerOptions) *ConsumerM
prometheus.MustRegister(latencyMetric)
prometheus.MustRegister(qosMetric)
prometheus.MustRegister(qosExcellenceMetric)
prometheus.MustRegister(qosMetricPerChoosing)
prometheus.MustRegister(latestBlockMetric)
prometheus.MustRegister(latestProviderRelay)
prometheus.MustRegister(virtualEpochMetric)
Expand All @@ -213,6 +220,7 @@ func NewConsumerMetricsManager(options ConsumerMetricsManagerOptions) *ConsumerM
latencyMetric: latencyMetric,
qosMetric: qosMetric,
qosExcellenceMetric: qosExcellenceMetric,
qosMetricPerChoosing: qosMetricPerChoosing,
LatestBlockMetric: latestBlockMetric,
LatestProviderRelay: latestProviderRelay,
providerRelays: map[string]uint64{},
Expand Down Expand Up @@ -467,6 +475,23 @@ func (pme *ConsumerMetricsManager) UpdateProviderChosenByOptimizerCount(chainId
pme.providerChosenByOptimizerCount.WithLabelValues(chainId, apiInterface, providerAddress, strconv.FormatUint(epoch, 10)).Inc()
}

// UpdateOptimizerChoosingProviderWithData reports the optimizer's QoS scores
// (availability, sync, latency) for a single provider on the
// qosMetricPerChoosing gauge, labeled by spec, api interface, provider,
// epoch, and whether the provider was actually chosen.
// Safe to call on a nil receiver (no-op), so metrics can be disabled.
func (pme *ConsumerMetricsManager) UpdateOptimizerChoosingProviderWithData(
	chainId string,
	apiInterface string,
	providerAddress string,
	epoch uint64,
	chosen bool,
	providerDataScore ProviderDataScore,
) {
	if pme == nil {
		return
	}

	// Convert the shared labels once instead of per metric.
	epochLabel := strconv.FormatUint(epoch, 10)
	chosenLabel := strconv.FormatBool(chosen)
	setScore := func(qosMetric string, value float64) {
		pme.qosMetricPerChoosing.WithLabelValues(chainId, apiInterface, providerAddress, epochLabel, chosenLabel, qosMetric).Set(value)
	}

	setScore(AvailabilityLabel, providerDataScore.Availability)
	setScore(SyncLabel, providerDataScore.Sync)
	setScore(LatencyLabel, providerDataScore.Latency)
}

func SetVersionInner(protocolVersionMetric *prometheus.GaugeVec, version string) {
var major, minor, patch int
_, err := fmt.Sscanf(version, "%d.%d.%d", &major, &minor, &patch)
Expand Down
94 changes: 94 additions & 0 deletions protocol/metrics/consumer_optimizer_data_collector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package metrics

import (
"context"
"sync"
"time"
)

// ProviderDataScore holds the optimizer's QoS scores for a single provider.
// Values are written by the optimizer and exported as prometheus gauges.
type ProviderDataScore struct {
	Availability float64 // availability score (ratio-like value computed by the optimizer)
	Sync         float64 // sync score (smaller is better per the optimizer's scoring)
	Latency      float64 // latency score (smaller is better per the optimizer's scoring)
}

// ProviderDataWithChosen pairs a provider's QoS scores with the epoch they
// were observed in and whether the optimizer selected the provider.
type ProviderDataWithChosen struct {
	providerData ProviderDataScore // latest scores recorded for the provider
	epoch        uint64            // epoch at which the scores were recorded
	chosen       bool              // true if the optimizer picked this provider in that choosing round
}

// ConsumerOptimizerDataCollector buffers the most recent per-provider
// optimizer scores and periodically flushes them to the metrics manager.
// The lock guards latestProvidersOptimizerData, which is written by the
// optimizer (SetProviderData) and read by the periodic collector goroutine.
type ConsumerOptimizerDataCollector struct {
	latestProvidersOptimizerData map[string]ProviderDataWithChosen // provider address -> latest recorded scores
	interval                     time.Duration                     // how often Start's goroutine flushes data to metrics
	consumerMetricsManager       *ConsumerMetricsManager           // destination for the exported gauges
	chainId                      string                            // spec/chain id label attached to every metric
	apiInterface                 string                            // api interface label attached to every metric
	lock                         sync.RWMutex                      // protects latestProvidersOptimizerData
}

// NewConsumerOptimizerDataCollector builds a collector that stores the latest
// per-provider optimizer scores and, once started, pushes them to the given
// metrics manager every interval.
func NewConsumerOptimizerDataCollector(chainId, apiInterface string, interval time.Duration, consumerMetricsManager *ConsumerMetricsManager) *ConsumerOptimizerDataCollector {
	collector := &ConsumerOptimizerDataCollector{
		latestProvidersOptimizerData: map[string]ProviderDataWithChosen{},
		chainId:                      chainId,
		apiInterface:                 apiInterface,
		interval:                     interval,
		consumerMetricsManager:       consumerMetricsManager,
	}
	return collector
}

// SetProviderData records the most recent optimizer scores for a provider,
// overwriting any previously stored entry for the same address.
// Safe to call on a nil receiver (no-op), so the collector is optional.
func (codc *ConsumerOptimizerDataCollector) SetProviderData(providerAddress string, epoch uint64, chosen bool, availability, sync, latency float64) {
	if codc == nil {
		return
	}

	// Build the entry outside the critical section; only the map write
	// needs the lock.
	entry := ProviderDataWithChosen{
		providerData: ProviderDataScore{
			Availability: availability,
			Sync:          sync,
			Latency:       latency,
		},
		epoch:  epoch,
		chosen: chosen,
	}

	codc.lock.Lock()
	defer codc.lock.Unlock()
	codc.latestProvidersOptimizerData[providerAddress] = entry
}

// Start launches a background goroutine that flushes the collected provider
// data to the metrics manager every codc.interval, until ctx is cancelled.
// Safe to call on a nil receiver (no-op).
func (codc *ConsumerOptimizerDataCollector) Start(ctx context.Context) {
	if codc == nil {
		return
	}

	go func() {
		ticker := time.NewTicker(codc.interval)
		// Bug fix: the ticker was never stopped, leaking its resources for
		// the lifetime of the process after ctx cancellation.
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				codc.collectProvidersData()
			}
		}
	}()
}

// collectProvidersData pushes the latest known optimizer scores of every
// tracked provider to the metrics manager. Called periodically from the
// goroutine spawned by Start; takes a read lock while iterating the map.
func (codc *ConsumerOptimizerDataCollector) collectProvidersData() {
	if codc == nil {
		return
	}

	codc.lock.RLock()
	defer codc.lock.RUnlock()

	for address, data := range codc.latestProvidersOptimizerData {
		codc.consumerMetricsManager.UpdateOptimizerChoosingProviderWithData(codc.chainId, codc.apiInterface, address, data.epoch, data.chosen, data.providerData)
	}
}
26 changes: 23 additions & 3 deletions protocol/provideroptimizer/provider_optimizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/dgraph-io/ristretto"
"github.com/lavanet/lava/v3/protocol/common"
"github.com/lavanet/lava/v3/protocol/metrics"
"github.com/lavanet/lava/v3/utils"
"github.com/lavanet/lava/v3/utils/lavaslices"
"github.com/lavanet/lava/v3/utils/rand"
Expand Down Expand Up @@ -49,6 +50,9 @@ type ProviderOptimizer struct {
baseWorldLatency time.Duration
wantedNumProvidersInConcurrency uint
latestSyncData ConcurrentBlockStore
consumerOptimizerDataCollector *metrics.ConsumerOptimizerDataCollector
chainId string
apiInterface string
}

type ProviderData struct {
Expand All @@ -74,10 +78,12 @@ const (

// AppendRelayFailure records a failed relay for the provider: zero latency,
// success=false, zero cu and sync block, timestamped now.
func (po *ProviderOptimizer) AppendRelayFailure(providerAddress string) {
	po.appendRelayData(providerAddress, 0, false, false, 0, 0, time.Now())
	// TODO: also report this failure to prometheus
}

// AppendRelayData records a successful relay for the provider with the
// observed latency, hanging-api flag, compute units, and sync block,
// timestamped now.
func (po *ProviderOptimizer) AppendRelayData(providerAddress string, latency time.Duration, isHangingApi bool, cu, syncBlock uint64) {
	po.appendRelayData(providerAddress, latency, isHangingApi, true, cu, syncBlock, time.Now())
	// TODO: also report this relay to prometheus
}

func (po *ProviderOptimizer) appendRelayData(providerAddress string, latency time.Duration, isHangingApi, success bool, cu, syncBlock uint64, sampleTime time.Time) {
Expand Down Expand Up @@ -124,6 +130,7 @@ func (po *ProviderOptimizer) AppendProbeRelayData(providerAddress string, latenc
}
po.providersStorage.Set(providerAddress, providerData, 1)

// Add to prometheus
utils.LavaFormatTrace("probe update",
utils.LogAttr("providerAddress", providerAddress),
utils.LogAttr("latency", latency),
Expand All @@ -132,7 +139,7 @@ func (po *ProviderOptimizer) AppendProbeRelayData(providerAddress string, latenc
}

// returns a sub set of selected providers according to their scores, perturbation factor will be added to each score in order to randomly select providers that are not always on top
func (po *ProviderOptimizer) ChooseProvider(allAddresses []string, ignoredProviders map[string]struct{}, cu uint64, requestedBlock int64, perturbationPercentage float64) (addresses []string) {
func (po *ProviderOptimizer) ChooseProvider(allAddresses []string, ignoredProviders map[string]struct{}, cu uint64, requestedBlock int64, perturbationPercentage float64, epoch uint64) (addresses []string) {
returnedProviders := make([]string, 1) // location 0 is always the best score
latencyScore := math.MaxFloat64 // smaller = better i.e less latency
syncScore := math.MaxFloat64 // smaller = better i.e less sync lag
Expand Down Expand Up @@ -181,10 +188,14 @@ func (po *ProviderOptimizer) ChooseProvider(allAddresses []string, ignoredProvid
returnedProviders[0] = providerAddress // best provider is always on position 0
latencyScore = latencyScoreCurrent
syncScore = syncScoreCurrent
po.consumerOptimizerDataCollector.SetProviderData(providerAddress, epoch, true, providerData.Availability.Num/providerData.Availability.Denom, syncScoreCurrent, latencyScoreCurrent)
continue
}
if po.shouldExplore(len(returnedProviders), numProviders) {
returnedProviders = append(returnedProviders, providerAddress)
po.consumerOptimizerDataCollector.SetProviderData(providerAddress, epoch, true, providerData.Availability.Num/providerData.Availability.Denom, syncScoreCurrent, latencyScoreCurrent)
} else {
po.consumerOptimizerDataCollector.SetProviderData(providerAddress, epoch, false, providerData.Availability.Num/providerData.Availability.Denom, syncScoreCurrent, latencyScoreCurrent)
}
}

Expand Down Expand Up @@ -456,7 +467,7 @@ func (po *ProviderOptimizer) getRelayStatsTimes(providerAddress string) []time.T
return nil
}

func NewProviderOptimizer(strategy Strategy, averageBlockTIme, baseWorldLatency time.Duration, wantedNumProvidersInConcurrency uint) *ProviderOptimizer {
func NewProviderOptimizer(strategy Strategy, averageBlockTIme, baseWorldLatency time.Duration, wantedNumProvidersInConcurrency uint, consumerOptimizerDataCollector *metrics.ConsumerOptimizerDataCollector) *ProviderOptimizer {
cache, err := ristretto.NewCache(&ristretto.Config{NumCounters: CacheNumCounters, MaxCost: CacheMaxCost, BufferItems: 64, IgnoreInternalCost: true})
if err != nil {
utils.LavaFormatFatal("failed setting up cache for queries", err)
Expand All @@ -469,7 +480,16 @@ func NewProviderOptimizer(strategy Strategy, averageBlockTIme, baseWorldLatency
// overwrite
wantedNumProvidersInConcurrency = 1
}
return &ProviderOptimizer{strategy: strategy, providersStorage: cache, averageBlockTime: averageBlockTIme, baseWorldLatency: baseWorldLatency, providerRelayStats: relayCache, wantedNumProvidersInConcurrency: wantedNumProvidersInConcurrency}

return &ProviderOptimizer{
strategy: strategy,
providersStorage: cache,
averageBlockTime: averageBlockTIme,
baseWorldLatency: baseWorldLatency,
providerRelayStats: relayCache,
wantedNumProvidersInConcurrency: wantedNumProvidersInConcurrency,
consumerOptimizerDataCollector: consumerOptimizerDataCollector,
}
}

// calculate the probability a random variable with a poisson distribution
Expand Down
Loading

0 comments on commit 23b8586

Please sign in to comment.