Skip to content
This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

Commit

Permalink
Ensure replication connections do not leak (#269)
Browse files Browse the repository at this point in the history
  • Loading branch information
Kiran RG authored Aug 9, 2017
1 parent a172b94 commit 138cd6e
Showing 1 changed file with 23 additions and 7 deletions.
30 changes: 23 additions & 7 deletions services/storehost/replicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,8 @@ func (t *ReplicationJob) replicationPump() {
var credits int32
lastCreditUpdate := time.Now()

var done bool

pump:
for {
// see if we can get some some credits
Expand All @@ -328,13 +330,20 @@ pump:
readMsg, err := stream.Read()

if err != nil {

log.WithFields(bark.Fields{
`reason`: `stream closed`,
`replMsgs`: replMsgs,
common.TagErr: err,
}).Info(`replicationPump done`)
t.err = fmt.Errorf(`stopped`)
break pump

} else if done {
// we came back here to give a chance for the connection to gracefully
// shutdown, with a read-error; if it did not, its likely we saw a local
// error (as opposed to a server-side error), so get out anyway.
break pump
}

switch readMsg.GetType() {
Expand All @@ -360,7 +369,8 @@ pump:

t.m3Client.IncCounter(metrics.ReplicateExtentScope, metrics.StorageFailures)
t.err = fmt.Errorf("%v failed writing seal-key: %v", t.extentID, err)
break pump
done = true
continue pump
}

x.setSealSeqNum(sealSeqNum) // save sealSeqNum
Expand All @@ -375,7 +385,8 @@ pump:
t.updateReplicationStatus(t.extentID.String(), shared.ExtentReplicaReplicationStatus_DONE)

t.err = nil // no error
break pump
done = true
continue pump

case store.ReadMessageContentType_ERROR:
t.err = readMsg.GetError()
Expand All @@ -384,7 +395,8 @@ pump:
common.TagErr: t.err,
`replMsgs`: replMsgs,
}).Error(`replicationPump done`)
break pump
done = true
continue pump

default:
t.err = fmt.Errorf("unknown ReadMessageContentType: %v", readMsg.GetType())
Expand All @@ -394,7 +406,8 @@ pump:
`readmsg-type`: readMsg.GetType(),
`replMsgs`: replMsgs,
}).Error(`replicationPump done`)
break pump
done = true
continue pump
}

msg := readMsg.GetMessage()
Expand Down Expand Up @@ -432,7 +445,8 @@ pump:

t.m3Client.IncCounter(metrics.ReplicateExtentScope, metrics.StorageFailures)
t.err = fmt.Errorf("%v msg key=%x less than last-key=%x", t.extentID, key, lastMessageKey)
break pump
done = true
continue pump
}

lastMessageKey = key
Expand All @@ -455,7 +469,8 @@ pump:

t.m3Client.IncCounter(metrics.ReplicateExtentScope, metrics.StorageFailures)
t.err = fmt.Errorf("%v error serializing message (seqnum=%x key=%x): %v", t.extentID, msgSeqNum, key, err)
break pump
done = true
continue pump
}

if replicateDebug {
Expand Down Expand Up @@ -486,7 +501,8 @@ pump:
t.m3Client.IncCounter(metrics.ReplicateExtentScope, metrics.StorageFailures)
t.m3Client.IncCounter(metrics.ReplicateExtentScope, metrics.StorageStoreFailures)
t.err = fmt.Errorf("%v error writing message to store (key=%x, val=%d bytes): %v", t.extentID, key, len(val), err)
break pump
done = true
continue pump
}

replMsgs++ // keep a count of the messages written during this "session"
Expand Down

0 comments on commit 138cd6e

Please sign in to comment.