Skip to content
This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

Commit

Permalink
Refactor the replication connections (#280)
Browse files Browse the repository at this point in the history
* Replictor: refactor the stream closure mechenism

* update

* refactor replicator connections

* outConn close msg channel in read pump so that inConn can be notified and close the write pump

* address comments

* revert change on glide.lock
  • Loading branch information
datoug authored Aug 30, 2017
1 parent 213124a commit 7486ebf
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 178 deletions.
104 changes: 52 additions & 52 deletions services/replicator/inconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,11 @@ type (
metricsScope int
perDestMetricsScope int

closeChannel chan struct{} // channel to indicate the connection should be closed
creditsCh chan int32 // channel to pass credits from readCreditsStream to writeMsgsStream
creditFlowExpiration time.Time // credit expiration is used to close the stream if we don't receive any credit for some period of time
creditsCh chan int32 // channel to pass credits from readCreditsStream to writeMsgsStream
creditFlowExpiration time.Time // credit expiration is used to close the stream if we don't receive any credit for some period of time

lk sync.Mutex
opened bool
closed bool
wg sync.WaitGroup
shutdownCh chan struct{}
}
)

Expand All @@ -75,68 +73,59 @@ func newInConnection(extUUID string, destPath string, stream storeStream.BStoreO
destM3Client: metrics.NewClientWithTags(m3Client, metrics.Replicator, common.GetDestinationTags(destPath, localLogger)),
metricsScope: metricsScope,
perDestMetricsScope: perDestMetricsScope,
closeChannel: make(chan struct{}),
creditsCh: make(chan int32, 5),
creditFlowExpiration: time.Now().Add(creditFlowTimeout),
shutdownCh: make(chan struct{}),
}

return conn
}

func (conn *inConnection) open() {
conn.lk.Lock()
defer conn.lk.Unlock()

if !conn.opened {
go conn.writeMsgsStream()
go conn.readCreditsStream()

conn.opened = true
}
conn.wg.Add(2)
go conn.writeMsgsStream()
go conn.readCreditsStream()
conn.logger.Info("in connection opened")
}

func (conn *inConnection) close() {
conn.lk.Lock()
defer conn.lk.Unlock()
func (conn *inConnection) WaitUntilDone() {
conn.wg.Wait()
}

if !conn.closed {
close(conn.closeChannel)
conn.closed = true
}
conn.logger.Info("in connection closed")
func (conn *inConnection) shutdown() {
close(conn.shutdownCh)
conn.logger.Info(`in connection shutdown`)
}

func (conn *inConnection) readCreditsStream() {
defer conn.wg.Done()
defer close(conn.creditsCh)
for {
select {
case <-conn.closeChannel:
msg, err := conn.stream.Read()
if err != nil {
conn.logger.WithField(common.TagErr, err).Info("read credit failed")
return
default:
msg, err := conn.stream.Read()
if err != nil {
conn.logger.WithField(common.TagErr, err).Info("read credit failed")
go conn.close()
return
}
}

conn.m3Client.AddCounter(conn.metricsScope, metrics.ReplicatorInConnCreditsReceived, int64(msg.GetCredits()))
conn.m3Client.AddCounter(conn.metricsScope, metrics.ReplicatorInConnCreditsReceived, int64(msg.GetCredits()))

// send this to writeMsgsPump which keeps track of the local credits
// Make this non-blocking because writeMsgsStream could be closed before this
select {
case conn.creditsCh <- msg.GetCredits():
default:
conn.logger.
WithField(`channelLen`, len(conn.creditsCh)).
WithField(`credits`, msg.GetCredits()).
Warn(`Dropped credits because of blocked channel`)
}
// send this to writeMsgsPump which keeps track of the local credits
// Make this non-blocking because writeMsgsStream could be closed before this
select {
case conn.creditsCh <- msg.GetCredits():
case <-conn.shutdownCh:
return
default:
conn.logger.
WithField(`channelLen`, len(conn.creditsCh)).
WithField(`credits`, msg.GetCredits()).
Warn(`Dropped credits because of blocked channel`)
}
}
}

func (conn *inConnection) writeMsgsStream() {
defer conn.wg.Done()
defer conn.stream.Done()

flushTicker := time.NewTicker(flushTimeout)
Expand All @@ -146,24 +135,31 @@ func (conn *inConnection) writeMsgsStream() {
for {
if localCredits == 0 {
select {
case credit := <-conn.creditsCh:
case credit, ok := <-conn.creditsCh:
if !ok {
conn.logger.Info(`internal credit channel closed`)
return
}
conn.extentCreditExpiration()
localCredits += credit
case <-time.After(creditFlowTimeout):
conn.logger.Warn("credit flow timeout")
if conn.isCreditFlowExpired() {
conn.logger.Warn("credit flow expired")
go conn.close()
return
}
case <-conn.closeChannel:
case <-conn.shutdownCh:
return
}
} else {
select {
case msg := <-conn.msgCh:
case msg, ok := <-conn.msgCh:
if !ok {
conn.logger.Info("msg channel closed")
return
}
if err := conn.stream.Write(msg); err != nil {
conn.logger.Error("write msg failed")
go conn.close()
return
}

Expand All @@ -182,15 +178,19 @@ func (conn *inConnection) writeMsgsStream() {
}

localCredits--
case credit := <-conn.creditsCh:
case credit, ok := <-conn.creditsCh:
if !ok {
conn.logger.Info(`internal credit channel closed`)
return
}
conn.extentCreditExpiration()
localCredits += credit
case <-flushTicker.C:
if err := conn.stream.Flush(); err != nil {
conn.logger.Error(`flush msg failed`)
go conn.close()
return
}
case <-conn.closeChannel:
case <-conn.shutdownCh:
return
}
}
Expand Down
Loading

0 comments on commit 7486ebf

Please sign in to comment.