diff --git a/common/metrics/defs.go b/common/metrics/defs.go index 3a67d3b71e6..8878bff490c 100644 --- a/common/metrics/defs.go +++ b/common/metrics/defs.go @@ -1944,9 +1944,11 @@ const ( ShardInfoTransferStandbyPendingTasksTimer ShardInfoTimerActivePendingTasksTimer ShardInfoTimerStandbyPendingTasksTimer + ShardInfoVisibilityPendingTasksTimer ShardInfoReplicationLagHistogram ShardInfoTransferLagHistogram ShardInfoTimerLagTimer + ShardInfoVisibilityLagHistogram ShardInfoTransferDiffHistogram ShardInfoTimerDiffTimer ShardInfoTransferFailoverInProgressHistogram @@ -2399,9 +2401,11 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{ ShardInfoTransferStandbyPendingTasksTimer: NewDimensionlessHistogramDef("shardinfo_transfer_standby_pending_task"), ShardInfoTimerActivePendingTasksTimer: NewDimensionlessHistogramDef("shardinfo_timer_active_pending_task"), ShardInfoTimerStandbyPendingTasksTimer: NewDimensionlessHistogramDef("shardinfo_timer_standby_pending_task"), + ShardInfoVisibilityPendingTasksTimer: NewDimensionlessHistogramDef("shardinfo_visibility_pending_task"), ShardInfoReplicationLagHistogram: NewDimensionlessHistogramDef("shardinfo_replication_lag"), ShardInfoTransferLagHistogram: NewDimensionlessHistogramDef("shardinfo_transfer_lag"), ShardInfoTimerLagTimer: NewTimerDef("shardinfo_timer_lag"), + ShardInfoVisibilityLagHistogram: NewDimensionlessHistogramDef("shardinfo_visibility_lag"), ShardInfoTransferDiffHistogram: NewDimensionlessHistogramDef("shardinfo_transfer_diff"), ShardInfoTimerDiffTimer: NewTimerDef("shardinfo_timer_diff"), ShardInfoTransferFailoverInProgressHistogram: NewDimensionlessHistogramDef("shardinfo_transfer_failover_in_progress"), diff --git a/service/history/queueAckMgr.go b/service/history/queueAckMgr.go index 72a3bb1815b..0324e8cb271 100644 --- a/service/history/queueAckMgr.go +++ b/service/history/queueAckMgr.go @@ -184,13 +184,17 @@ func (a *queueAckMgrImpl) updateQueueAckLevel() error { if pendingTasks > warnPendingTasks { a.logger.Warn("Too many pending tasks") } + + metricsScope := a.metricsClient.Scope(metrics.ShardInfoScope) switch a.options.MetricScope { case metrics.ReplicatorQueueProcessorScope: - a.metricsClient.RecordDistribution(metrics.ShardInfoScope, metrics.ShardInfoReplicationPendingTasksTimer, pendingTasks) + metricsScope.RecordDistribution(metrics.ShardInfoReplicationPendingTasksTimer, pendingTasks) case metrics.TransferActiveQueueProcessorScope: - a.metricsClient.RecordDistribution(metrics.ShardInfoScope, metrics.ShardInfoTransferActivePendingTasksTimer, pendingTasks) + metricsScope.RecordDistribution(metrics.ShardInfoTransferActivePendingTasksTimer, pendingTasks) case metrics.TransferStandbyQueueProcessorScope: - a.metricsClient.RecordDistribution(metrics.ShardInfoScope, metrics.ShardInfoTransferStandbyPendingTasksTimer, pendingTasks) + metricsScope.RecordDistribution(metrics.ShardInfoTransferStandbyPendingTasksTimer, pendingTasks) + case metrics.VisibilityQueueProcessorScope: + metricsScope.RecordDistribution(metrics.ShardInfoVisibilityPendingTasksTimer, pendingTasks) } MoveAckLevelLoop: diff --git a/service/history/shard/context_impl.go b/service/history/shard/context_impl.go index 433523b010c..7b98496d68c 100644 --- a/service/history/shard/context_impl.go +++ b/service/history/shard/context_impl.go @@ -1030,6 +1030,7 @@ func (s *ContextImpl) emitShardInfoMetricsLogsLocked() { replicationLag := s.transferMaxReadLevel - s.shardInfo.ReplicationAckLevel transferLag := s.transferMaxReadLevel - s.shardInfo.TransferAckLevel timerLag := time.Since(timestamp.TimeValue(s.shardInfo.TimerAckLevelTime)) + visibilityLag := s.transferMaxReadLevel - s.shardInfo.VisibilityAckLevel transferFailoverInProgress := len(s.shardInfo.TransferFailoverLevels) timerFailoverInProgress := len(s.shardInfo.TimerFailoverLevels) @@ -1046,15 +1047,17 @@ func (s *ContextImpl) emitShardInfoMetricsLogsLocked() { tag.ShardTransferAcks(s.shardInfo.ClusterTransferAckLevel)) } - s.GetMetricsClient().RecordDistribution(metrics.ShardInfoScope, metrics.ShardInfoTransferDiffHistogram, int(diffTransferLevel)) - s.GetMetricsClient().RecordTimer(metrics.ShardInfoScope, metrics.ShardInfoTimerDiffTimer, diffTimerLevel) + metricsScope := s.GetMetricsClient().Scope(metrics.ShardInfoScope) + metricsScope.RecordDistribution(metrics.ShardInfoTransferDiffHistogram, int(diffTransferLevel)) + metricsScope.RecordTimer(metrics.ShardInfoTimerDiffTimer, diffTimerLevel) - s.GetMetricsClient().RecordDistribution(metrics.ShardInfoScope, metrics.ShardInfoReplicationLagHistogram, int(replicationLag)) - s.GetMetricsClient().RecordDistribution(metrics.ShardInfoScope, metrics.ShardInfoTransferLagHistogram, int(transferLag)) - s.GetMetricsClient().RecordTimer(metrics.ShardInfoScope, metrics.ShardInfoTimerLagTimer, timerLag) + metricsScope.RecordDistribution(metrics.ShardInfoReplicationLagHistogram, int(replicationLag)) + metricsScope.RecordDistribution(metrics.ShardInfoTransferLagHistogram, int(transferLag)) + metricsScope.RecordTimer(metrics.ShardInfoTimerLagTimer, timerLag) + metricsScope.RecordDistribution(metrics.ShardInfoVisibilityLagHistogram, int(visibilityLag)) - s.GetMetricsClient().RecordDistribution(metrics.ShardInfoScope, metrics.ShardInfoTransferFailoverInProgressHistogram, transferFailoverInProgress) - s.GetMetricsClient().RecordDistribution(metrics.ShardInfoScope, metrics.ShardInfoTimerFailoverInProgressHistogram, timerFailoverInProgress) + metricsScope.RecordDistribution(metrics.ShardInfoTransferFailoverInProgressHistogram, transferFailoverInProgress) + metricsScope.RecordDistribution(metrics.ShardInfoTimerFailoverInProgressHistogram, timerFailoverInProgress) } func (s *ContextImpl) allocateTaskIDsLocked(