Skip to content

Commit

Permalink
Removing sigUnWait from controller
Browse files Browse the repository at this point in the history
  • Loading branch information
ycombinator committed Apr 21, 2020
1 parent 6e10331 commit 6a6680c
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 17 deletions.
1 change: 0 additions & 1 deletion libbeat/publisher/pipeline/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,6 @@ func (c *outputController) Set(outGrp outputs.Group) {

// restart consumer (potentially blocked by retryer)
c.consumer.sigContinue()
c.consumer.sigUnWait()

c.observer.updateOutputGroup()
}
Expand Down
47 changes: 31 additions & 16 deletions libbeat/publisher/pipeline/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,14 +168,7 @@ func (r *retryer) loop() {
active = buffer[0]
activeSize = len(active.Events())
if !consumerBlocked {
consumerBlocked = blockConsumer(numOutputs, len(buffer))
if consumerBlocked {
log.Info("retryer: send wait signal to consumer")
if r.consumer != nil {
r.consumer.sigWait()
}
log.Info(" done")
}
consumerBlocked = r.checkConsumerBlock(numOutputs, len(buffer))
}
}

Expand All @@ -193,27 +186,49 @@ func (r *retryer) loop() {
}

if consumerBlocked {
consumerBlocked = blockConsumer(numOutputs, len(buffer))
if !consumerBlocked {
log.Info("retryer: send unwait-signal to consumer")
if r.consumer != nil {
r.consumer.sigUnWait()
}
log.Info(" done")
}
consumerBlocked = r.checkConsumerBlock(numOutputs, len(buffer))
}

case sig := <-r.sig:
switch sig.tag {
case sigRetryerOutputAdded:
numOutputs++
if consumerBlocked {
consumerBlocked = r.checkConsumerBlock(numOutputs, len(buffer))
}
case sigRetryerOutputRemoved:
numOutputs--
if !consumerBlocked {
consumerBlocked = r.checkConsumerBlock(numOutputs, len(buffer))
}
}
}
}
}

func (r *retryer) checkConsumerBlock(numOutputs, numBatches int) bool {
consumerBlocked := blockConsumer(numOutputs, numBatches)
if r.consumer == nil {
return consumerBlocked
}

if consumerBlocked {
r.logger.Info("retryer: send wait signal to consumer")
if r.consumer != nil {
r.consumer.sigWait()
}
r.logger.Info(" done")
} else {
r.logger.Info("retryer: send unwait signal to consumer")
if r.consumer != nil {
r.consumer.sigUnWait()
}
r.logger.Info(" done")
}

return consumerBlocked
}

func blockConsumer(numOutputs, numBatches int) bool {
return numBatches/3 >= numOutputs
}

0 comments on commit 6a6680c

Please sign in to comment.