Skip to content
This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

Refactor the replication connections #280

Merged
merged 6 commits into from
Aug 30, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 52 additions & 52 deletions services/replicator/inconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,11 @@ type (
metricsScope int
perDestMetricsScope int

closeChannel chan struct{} // channel to indicate the connection should be closed
creditsCh chan int32 // channel to pass credits from readCreditsStream to writeMsgsStream
creditFlowExpiration time.Time // credit expiration is used to close the stream if we don't receive any credit for some period of time
creditsCh chan int32 // channel to pass credits from readCreditsStream to writeMsgsStream
creditFlowExpiration time.Time // credit expiration is used to close the stream if we don't receive any credit for some period of time

lk sync.Mutex
opened bool
closed bool
wg sync.WaitGroup
shutdownCh chan struct{}
}
)

Expand All @@ -75,68 +73,59 @@ func newInConnection(extUUID string, destPath string, stream storeStream.BStoreO
destM3Client: metrics.NewClientWithTags(m3Client, metrics.Replicator, common.GetDestinationTags(destPath, localLogger)),
metricsScope: metricsScope,
perDestMetricsScope: perDestMetricsScope,
closeChannel: make(chan struct{}),
creditsCh: make(chan int32, 5),
creditFlowExpiration: time.Now().Add(creditFlowTimeout),
shutdownCh: make(chan struct{}),
}

return conn
}

func (conn *inConnection) open() {
conn.lk.Lock()
defer conn.lk.Unlock()

if !conn.opened {
go conn.writeMsgsStream()
go conn.readCreditsStream()

conn.opened = true
}
conn.wg.Add(2)
go conn.writeMsgsStream()
go conn.readCreditsStream()
conn.logger.Info("in connection opened")
}

func (conn *inConnection) close() {
conn.lk.Lock()
defer conn.lk.Unlock()
func (conn *inConnection) WaitUntilDone() {
conn.wg.Wait()
}

if !conn.closed {
close(conn.closeChannel)
conn.closed = true
}
conn.logger.Info("in connection closed")
func (conn *inConnection) shutdown() {
close(conn.shutdownCh)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You might want to put a log line here .. so you know the reasons that the go-routines have gone, etc.

conn.logger.Info(`in connection shutdown`)
}

func (conn *inConnection) readCreditsStream() {
defer conn.wg.Done()
defer close(conn.creditsCh)
for {
select {
case <-conn.closeChannel:
msg, err := conn.stream.Read()
if err != nil {
conn.logger.WithField(common.TagErr, err).Info("read credit failed")
return
default:
msg, err := conn.stream.Read()
if err != nil {
conn.logger.WithField(common.TagErr, err).Info("read credit failed")
go conn.close()
return
}
}

conn.m3Client.AddCounter(conn.metricsScope, metrics.ReplicatorInConnCreditsReceived, int64(msg.GetCredits()))
conn.m3Client.AddCounter(conn.metricsScope, metrics.ReplicatorInConnCreditsReceived, int64(msg.GetCredits()))

// send this to writeMsgsPump which keeps track of the local credits
// Make this non-blocking because writeMsgsStream could be closed before this
select {
case conn.creditsCh <- msg.GetCredits():
default:
conn.logger.
WithField(`channelLen`, len(conn.creditsCh)).
WithField(`credits`, msg.GetCredits()).
Warn(`Dropped credits because of blocked channel`)
}
// send this to writeMsgsPump which keeps track of the local credits
// Make this non-blocking because writeMsgsStream could be closed before this
select {
case conn.creditsCh <- msg.GetCredits():
case <-conn.shutdownCh:
return
default:
conn.logger.
WithField(`channelLen`, len(conn.creditsCh)).
WithField(`credits`, msg.GetCredits()).
Warn(`Dropped credits because of blocked channel`)
}
}
}

func (conn *inConnection) writeMsgsStream() {
defer conn.wg.Done()
defer conn.stream.Done()

flushTicker := time.NewTicker(flushTimeout)
Expand All @@ -146,24 +135,31 @@ func (conn *inConnection) writeMsgsStream() {
for {
if localCredits == 0 {
select {
case credit := <-conn.creditsCh:
case credit, ok := <-conn.creditsCh:
if !ok {
conn.logger.Info(`internal credit channel closed`)
return
}
conn.extentCreditExpiration()
localCredits += credit
case <-time.After(creditFlowTimeout):
conn.logger.Warn("credit flow timeout")
if conn.isCreditFlowExpired() {
conn.logger.Warn("credit flow expired")
go conn.close()
return
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wait (and quit) on the "shutdownCh" here ..

}
case <-conn.closeChannel:
case <-conn.shutdownCh:
return
}
} else {
select {
case msg := <-conn.msgCh:
case msg, ok := <-conn.msgCh:
if !ok {
conn.logger.Info("msg channel closed")
return
}
if err := conn.stream.Write(msg); err != nil {
conn.logger.Error("write msg failed")
go conn.close()
return
}

Expand All @@ -182,15 +178,19 @@ func (conn *inConnection) writeMsgsStream() {
}

localCredits--
case credit := <-conn.creditsCh:
case credit, ok := <-conn.creditsCh:
if !ok {
conn.logger.Info(`internal credit channel closed`)
return
}
conn.extentCreditExpiration()
localCredits += credit
case <-flushTicker.C:
if err := conn.stream.Flush(); err != nil {
conn.logger.Error(`flush msg failed`)
go conn.close()
return
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also wait (and return) on the shutdownCh here ..

}
case <-conn.closeChannel:
case <-conn.shutdownCh:
return
}
}
Expand Down
Loading