Skip to content
This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

Commit

Permalink
store bug fix: ReplicationJobRunner should always mark running as fal…
Browse files Browse the repository at this point in the history
…se after the function exits (#267)
  • Loading branch information
datoug authored Aug 8, 2017
1 parent c12bb7f commit 871f90e
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 4 deletions.
3 changes: 3 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -953,6 +953,8 @@ const (
StorageReplicationJobMaxConsecutiveFailures
// StorageReplicationJobCurrentFailures is the number of failed job in current run
StorageReplicationJobCurrentFailures
// StorageReplicationJobCurrentSuccess is the number of success job in current run
StorageReplicationJobCurrentSuccess

// -- Controller metrics -- //

Expand Down Expand Up @@ -1233,6 +1235,7 @@ var metricDefs = map[ServiceIdx]map[int]metricDefinition{
StorageOutFlushTChannelLatency: {Timer, "storage.out.flush-tchannel-latency"},
StorageReplicationJobMaxConsecutiveFailures: {Gauge, "storage.replication-job.max-consecutive-failures"},
StorageReplicationJobCurrentFailures: {Gauge, "storage.replication-job.current-failures"},
StorageReplicationJobCurrentSuccess: {Gauge, "storage.replication-job.current-success"},
},

// definitions for Controller metrics
Expand Down
14 changes: 10 additions & 4 deletions services/storehost/replicationJobRunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ func (runner *replicationJobRunner) run() {
return
}

defer atomic.StoreInt64(&runner.running, 0)

runner.logger.Info("replication run started")

listReq := &metadata.ListStoreExtentsStatsRequest{
Expand All @@ -138,6 +140,7 @@ func (runner *replicationJobRunner) run() {
openedForReplication := 0
primaryExtents := 0
secondaryExtents := 0
jobsStarted := 0
currentFailedJobs := make(map[string]struct{})
for _, extentStats := range res.GetExtentStatsList() {
extentID := extentStats.GetExtent().GetExtentUUID()
Expand Down Expand Up @@ -186,6 +189,7 @@ func (runner *replicationJobRunner) run() {
common.TagDst: common.FmtDst(destID),
common.TagExt: common.FmtExt(extentID),
common.TagStor: common.FmtStor(runner.storeID),
common.TagErr: err,
}).Error(`Remote replication for extent failed`)
currentFailedJobs[extentID] = struct{}{}
continue
Expand Down Expand Up @@ -221,6 +225,7 @@ func (runner *replicationJobRunner) run() {
common.TagDst: common.FmtDst(destID),
common.TagExt: common.FmtExt(extentID),
common.TagStor: common.FmtStor(runner.storeID),
common.TagErr: err,
}).Error(`Rereplication for extent failed`)
currentFailedJobs[extentID] = struct{}{}
continue
Expand All @@ -232,6 +237,8 @@ func (runner *replicationJobRunner) run() {
common.TagExt: common.FmtExt(extentID),
common.TagStor: common.FmtStor(runner.storeID),
}).Info(`replication for extent started`)

jobsStarted++
}

var maxConsecutiveFailures int
Expand Down Expand Up @@ -259,14 +266,13 @@ func (runner *replicationJobRunner) run() {

runner.m3Client.UpdateGauge(metrics.ReplicateExtentScope, metrics.StorageReplicationJobCurrentFailures, int64(len(currentFailedJobs)))
runner.m3Client.UpdateGauge(metrics.ReplicateExtentScope, metrics.StorageReplicationJobMaxConsecutiveFailures, int64(maxConsecutiveFailures))
runner.m3Client.UpdateGauge(metrics.ReplicateExtentScope, metrics.StorageReplicationJobCurrentSuccess, int64(jobsStarted))

runner.logger.WithFields(bark.Fields{
`stats`: fmt.Sprintf(`total extents: %v, remote extents:%v, opened for replication: %v, primary: %v, secondary: %v, failed: %v`,
totalExtents, totalRemoteExtents, openedForReplication, primaryExtents, secondaryExtents, len(currentFailedJobs)),
`stats`: fmt.Sprintf(`total extents: %v, remote extents:%v, opened for replication: %v, primary: %v, secondary: %v, failed: %v, success: %v`,
totalExtents, totalRemoteExtents, openedForReplication, primaryExtents, secondaryExtents, len(currentFailedJobs), jobsStarted),
common.TagStor: common.FmtStor(runner.storeID),
}).Info(`replication run finished`)

atomic.StoreInt64(&runner.running, 0)
}

func (runner *replicationJobRunner) houseKeep() {
Expand Down

0 comments on commit 871f90e

Please sign in to comment.