Refactor the replication connections #280
Conversation
… and close the write pump
@kirg For the empty credit case (https://github.com/uber/cherami-server/pull/280/files#diff-f8daca02f7ff8d4e7df757fe00f19bc4L148), I think we can rely on the idle timeout (creditFlowTimeout) to eventually close the pumps, instead of introducing another notification channel.
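A rough sketch of that idea, assuming hypothetical names (creditsCh, shutdownCh, and creditFlowTimeout are modelled on the snippets in this PR; the rest is illustrative, not the actual code):

package replicator

import (
    "log"
    "time"
)

// creditFlowTimeout mirrors the idle timeout mentioned above; the value is illustrative.
const creditFlowTimeout = time.Minute

// writeMsgsPump is a hypothetical stand-in for the inConnection write pump.
// If the read pump closes creditsCh, or no credits arrive within
// creditFlowTimeout, the pump exits on its own -- so the empty-credit case
// needs no extra notification channel.
func writeMsgsPump(creditsCh <-chan int32, shutdownCh <-chan struct{}) {
    var localCredits int32
    for {
        select {
        case credit, ok := <-creditsCh:
            if !ok {
                return // read pump closed the channel
            }
            localCredits += credit
        case <-time.After(creditFlowTimeout):
            log.Println("credit flow timed out; closing write pump")
            return
        case <-shutdownCh:
            return
        }
    }
}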
    }
    conn.logger.Info("in connection closed")

func (conn *inConnection) shutdown() {
    close(conn.shutdownCh)
You might want to put a log line here .. so you know the reasons that the go-routines have gone, etc.
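Something along those lines, as a minimal sketch (the standard log package stands in here for the connection's actual logger, and the struct is trimmed to the one field the sketch needs):

package replicator

import "log"

// inConnection is trimmed down to the single field this sketch uses.
type inConnection struct {
    shutdownCh chan struct{}
}

// shutdown logs why the pumps are being torn down before signalling them,
// so the go-routines' exit can be traced in the logs.
func (conn *inConnection) shutdown() {
    log.Println("inConnection: shutdown requested, closing shutdownCh")
    close(conn.shutdownCh)
}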
services/replicator/inconn.go
Outdated
        }
    }
}

func (conn *inConnection) writeMsgsStream() {
    defer conn.stream.Done()
    defer conn.wg.Done()
you should probably have "defer conn.wg.Done()" before "conn.stream.Done()" ..
Good catch
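For anyone reading along, the reason the order matters: deferred calls run last-in-first-out, so registering wg.Done() first makes it fire last, after the stream has been released. A minimal sketch with a placeholder stream type (not the actual thrift stream interface):

package replicator

import "sync"

// doneStream is a placeholder for the stream's Done() method.
type doneStream interface {
    Done() error
}

type outConnection struct {
    wg     sync.WaitGroup
    stream doneStream
}

// writeCreditsStream shows the suggested defer order: defers run in LIFO
// order, so wg.Done() (registered first) runs last, after stream.Done().
// Anyone blocked on the wait group therefore only unblocks once the stream
// has actually been released.
func (conn *outConnection) writeCreditsStream() {
    defer conn.wg.Done()     // registered first, runs last
    defer conn.stream.Done() // registered last, runs first

    // ... credit pump loop elided ...
}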
            conn.extentCreditExpiration()
            localCredits += credit
        case <-time.After(creditFlowTimeout):
            conn.logger.Warn("credit flow timeout")
            if conn.isCreditFlowExpired() {
                conn.logger.Warn("credit flow expired")
                go conn.close()
                return
wait (and quit) on the "shutdownCh" here ..
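i.e. something like the sketch below; the same shutdownCh case also addresses the flush-ticker comment further down. The field names follow the snippets in this PR, everything else is illustrative:

package replicator

import (
    "log"
    "time"
)

const creditFlowTimeout = 10 * time.Minute // illustrative value

type outConnection struct {
    creditsCh  chan int32
    shutdownCh chan struct{}
}

// isCreditFlowExpired is a placeholder for the real expiry check.
func (conn *outConnection) isCreditFlowExpired() bool { return true }

// creditLoop sketches the suggested shape: every blocking select in the pump
// also has a case on shutdownCh, so a shutdown elsewhere unblocks the
// go-routine immediately instead of it waiting for the next timeout or tick.
func (conn *outConnection) creditLoop() {
    var localCredits int32
    for {
        select {
        case credit := <-conn.creditsCh:
            localCredits += credit
        case <-time.After(creditFlowTimeout):
            if conn.isCreditFlowExpired() {
                log.Println("credit flow expired, closing connection")
                return
            }
        case <-conn.shutdownCh:
            // shutdown requested: quit promptly
            return
        }
    }
}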
            conn.extentCreditExpiration()
            localCredits += credit
        case <-flushTicker.C:
            if err := conn.stream.Flush(); err != nil {
                conn.logger.Error(`flush msg failed`)
                go conn.close()
                return
also wait (and return) on the shutdownCh here ..
services/replicator/outconn.go
Outdated
}

func (conn *outConnection) writeCreditsStream() {
    defer conn.stream.Done()
    defer conn.wg.Done()
as with inconn, move this up one line ..
services/replicator/outconn.go
Outdated
                continue readloop
            conn.m3Client.IncCounter(conn.metricsScope, metrics.ReplicatorOutConnMsgRead)
            if rmc.GetType() == store.ReadMessageContentType_SEALED {
                conn.logger.WithField(`SequenceNumber`, rmc.GetSealed().GetSequenceNumber()).Info(`extent sealed`)
for debug/assertion purposes .. do you want to remember that you saw a "sealed" message .. and in case you don't get an error from the next read, log an error or something?
I removed that logic since we want the replicator to be a proxy only now?
I understand .. but do you think it might add value in case we are debugging issues, etc. I suspect we might never hit it though .. so I'll leave it to you.
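If it ever seems worth keeping, the debug bookkeeping could be as small as the sketch below. The types and channel names are illustrative, not the PR's actual ones; the idea is just to remember the seal and flag anything odd that follows:

package replicator

import "log"

// readMessage is a stand-in for the store read-message content; only the
// bit this sketch needs is modelled.
type readMessage struct {
    sealed bool
}

// readLoop remembers that a SEALED message was seen, and logs anything
// unexpected afterwards: more data after the seal, or an error without one.
func readLoop(msgs <-chan readMessage, errs <-chan error) {
    sawSealed := false
    for {
        select {
        case msg := <-msgs:
            if msg.sealed {
                sawSealed = true
                continue
            }
            if sawSealed {
                log.Println("unexpected message after extent was sealed")
            }
        case err := <-errs:
            if !sawSealed {
                log.Printf("stream ended with error before seal: %v", err)
            }
            return
        }
    }
}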
services/replicator/outconn.go
Outdated
                conn.logger.WithField(`Message`, msgErr.GetMessage()).Error(`received error from reading msg`)
                go conn.close()
                continue readloop
            conn.m3Client.IncCounter(conn.metricsScope, metrics.ReplicatorOutConnMsgRead)
this will count "SEALED" also as a message .. i think you should not; that way tallying message counts across services will be easier.
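In other words, roughly this reshuffle of the two lines above (same identifiers as the existing code, shown out of context as a sketch rather than the final change):

if rmc.GetType() == store.ReadMessageContentType_SEALED {
    // control message: log it, but do not count it as a data message
    conn.logger.WithField(`SequenceNumber`, rmc.GetSealed().GetSequenceNumber()).Info(`extent sealed`)
} else {
    // only real data messages feed the per-message counter, so the tallies
    // line up with the other services
    conn.m3Client.IncCounter(conn.metricsScope, metrics.ReplicatorOutConnMsgRead)
}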
<-outConn.closeChannel

outConn.WaitUntilDone()
inConn.WaitUntilDone()
i might suggest setting up a waitgroup within this function .. and passing a pointer to the waitgroup to inconn/outconn, etc that they increment/decrement .. and you can just do a wg.Wait() here that will automatically wait for both, etc.
Seems like a lack of encapsulation if we do it that way? (Although I saw some code in our codebase that does that.)
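For reference, the wait-group variant being suggested would look roughly like this; all names are hypothetical and the pumps are reduced to stubs. The trade-off raised above is visible here: the connections depend on a wait group they do not own.

package replicator

import "sync"

// pump is a stand-in for inConnection/outConnection; each pump decrements
// the shared wait group when its go-routine finishes.
type pump struct {
    wg *sync.WaitGroup
}

func (p *pump) open() {
    p.wg.Add(1)
    go func() {
        defer p.wg.Done()
        // ... pump loop elided ...
    }()
}

// runReplication wires two pumps to one wait group owned by the caller,
// so a single wg.Wait() replaces the per-connection WaitUntilDone calls.
func runReplication() {
    var wg sync.WaitGroup
    in := &pump{wg: &wg}
    out := &pump{wg: &wg}
    in.open()
    out.open()
    wg.Wait() // waits for both pumps
}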
<-outConn.closeChannel

outConn.WaitUntilDone()
inConn.WaitUntilDone()
same as previous comment ..
* Replicator: refactor the stream closure mechanism
* update
* refactor replicator connections
* outConn closes the msg channel in the read pump so that inConn can be notified and close the write pump
* address comments
* revert change on glide.lock
Problem to solve

Design philosophies
* The read pump reads from the stream; the write pump writes to the stream.
* The read pump communicates with the write pump over an internal channel: the read pump writes to the channel, and the write pump reads from it (a sketch of this layout follows at the end of this description).

Graceful shutdown sequence:

Other changes
Removed the error handling cases in outconn.go, since the same error handling is already done in store. The replicator now serves as a proxy only.
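The two-pump layout described in the bullets above, sketched end to end. Everything here is illustrative (field names, channel element type, buffer size, the fake message source); it shows the shape of the design, not the PR's actual code:

package replicator

import "sync"

// connection holds one read pump, one write pump, and the internal channel
// between them.
type connection struct {
    msgCh      chan string   // internal channel: read pump -> write pump
    shutdownCh chan struct{} // closed to force both pumps to exit
    wg         sync.WaitGroup
}

func newConnection() *connection {
    return &connection{
        msgCh:      make(chan string, 1024),
        shutdownCh: make(chan struct{}),
    }
}

func (c *connection) open() {
    c.wg.Add(2)
    go c.readPump()
    go c.writePump()
}

// readPump is the only writer of msgCh; when the stream ends it closes the
// channel, which tells the write pump to drain and exit (graceful shutdown).
func (c *connection) readPump() {
    defer c.wg.Done()
    defer close(c.msgCh)
    for _, m := range []string{"msg-1", "msg-2"} { // stand-in for stream reads
        select {
        case c.msgCh <- m:
        case <-c.shutdownCh:
            return
        }
    }
}

// writePump is the only reader of msgCh; it exits when the channel is closed
// (graceful) or when shutdownCh is closed (forced).
func (c *connection) writePump() {
    defer c.wg.Done()
    for {
        select {
        case m, ok := <-c.msgCh:
            if !ok {
                return
            }
            _ = m // stand-in for writing to the stream and flushing
        case <-c.shutdownCh:
            return
        }
    }
}

// WaitUntilDone blocks until both pumps have exited.
func (c *connection) WaitUntilDone() { c.wg.Wait() }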