diff --git a/common/metrics/defs.go b/common/metrics/defs.go
index 1adf8c0e..52a9014f 100644
--- a/common/metrics/defs.go
+++ b/common/metrics/defs.go
@@ -953,6 +953,8 @@ const (
 	StorageReplicationJobMaxConsecutiveFailures
 	// StorageReplicationJobCurrentFailures is the number of failed job in current run
 	StorageReplicationJobCurrentFailures
+	// StorageReplicationJobCurrentSuccess is the number of successful jobs in the current run
+	StorageReplicationJobCurrentSuccess
 
 	// -- Controller metrics -- //
 
@@ -1233,6 +1235,7 @@ var metricDefs = map[ServiceIdx]map[int]metricDefinition{
 		StorageOutFlushTChannelLatency:              {Timer, "storage.out.flush-tchannel-latency"},
 		StorageReplicationJobMaxConsecutiveFailures: {Gauge, "storage.replication-job.max-consecutive-failures"},
 		StorageReplicationJobCurrentFailures:        {Gauge, "storage.replication-job.current-failures"},
+		StorageReplicationJobCurrentSuccess:         {Gauge, "storage.replication-job.current-success"},
 	},
 
 	// definitions for Controller metrics
diff --git a/services/storehost/replicationJobRunner.go b/services/storehost/replicationJobRunner.go
index bc6d4dec..49c93a68 100644
--- a/services/storehost/replicationJobRunner.go
+++ b/services/storehost/replicationJobRunner.go
@@ -118,6 +118,8 @@ func (runner *replicationJobRunner) run() {
 		return
 	}
 
+	defer atomic.StoreInt64(&runner.running, 0)
+
 	runner.logger.Info("replication run started")
 
 	listReq := &metadata.ListStoreExtentsStatsRequest{
@@ -138,6 +140,7 @@ func (runner *replicationJobRunner) run() {
 	openedForReplication := 0
 	primaryExtents := 0
 	secondaryExtents := 0
+	jobsStarted := 0
 	currentFailedJobs := make(map[string]struct{})
 	for _, extentStats := range res.GetExtentStatsList() {
 		extentID := extentStats.GetExtent().GetExtentUUID()
@@ -186,6 +189,7 @@ func (runner *replicationJobRunner) run() {
 				common.TagDst:  common.FmtDst(destID),
 				common.TagExt:  common.FmtExt(extentID),
 				common.TagStor: common.FmtStor(runner.storeID),
+				common.TagErr:  err,
 			}).Error(`Remote replication for extent failed`)
 			currentFailedJobs[extentID] = struct{}{}
 			continue
@@ -221,6 +225,7 @@ func (runner *replicationJobRunner) run() {
 				common.TagDst:  common.FmtDst(destID),
 				common.TagExt:  common.FmtExt(extentID),
 				common.TagStor: common.FmtStor(runner.storeID),
+				common.TagErr:  err,
 			}).Error(`Rereplication for extent failed`)
 			currentFailedJobs[extentID] = struct{}{}
 			continue
@@ -232,6 +237,8 @@ func (runner *replicationJobRunner) run() {
 			common.TagExt:  common.FmtExt(extentID),
 			common.TagStor: common.FmtStor(runner.storeID),
 		}).Info(`replication for extent started`)
+
+		jobsStarted++
 	}
 
 	var maxConsecutiveFailures int
@@ -259,14 +266,13 @@ func (runner *replicationJobRunner) run() {
 
 	runner.m3Client.UpdateGauge(metrics.ReplicateExtentScope, metrics.StorageReplicationJobCurrentFailures, int64(len(currentFailedJobs)))
 	runner.m3Client.UpdateGauge(metrics.ReplicateExtentScope, metrics.StorageReplicationJobMaxConsecutiveFailures, int64(maxConsecutiveFailures))
+	runner.m3Client.UpdateGauge(metrics.ReplicateExtentScope, metrics.StorageReplicationJobCurrentSuccess, int64(jobsStarted))
 
 	runner.logger.WithFields(bark.Fields{
-		`stats`: fmt.Sprintf(`total extents: %v, remote extents:%v, opened for replication: %v, primary: %v, secondary: %v, failed: %v`,
-			totalExtents, totalRemoteExtents, openedForReplication, primaryExtents, secondaryExtents, len(currentFailedJobs)),
+		`stats`: fmt.Sprintf(`total extents: %v, remote extents:%v, opened for replication: %v, primary: %v, secondary: %v, failed: %v, success: %v`,
+			totalExtents, totalRemoteExtents, openedForReplication, primaryExtents, secondaryExtents, len(currentFailedJobs), jobsStarted),
 		common.TagStor: common.FmtStor(runner.storeID),
 	}).Info(`replication run finished`)
-
-	atomic.StoreInt64(&runner.running, 0)
 }
 
 func (runner *replicationJobRunner) houseKeep() {