Skip to content

Commit

Permalink
streamline lumberjack mode behavior on send failure
Browse files Browse the repository at this point in the history
have all send modes behave the same on failed send attempts:
- drop message if maximum number of attempts reached
- block infinitely (until closed) if max attempts is set to 0

Fix possible deadlock in load balancer when all workers start failing
at same time by buffering retry messages.
  • Loading branch information
urso committed Sep 25, 2015
1 parent c420211 commit fca48f5
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 34 deletions.
94 changes: 61 additions & 33 deletions outputs/lumberjack/modes.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,12 @@ type singleConnectionMode struct {

closed bool // mode closed flag to break publisher loop

timeout time.Duration // connection timeout
waitRetry time.Duration // wait time until reconnect
maxAttempts int // maximum number of configured send attempts
timeout time.Duration // connection timeout
waitRetry time.Duration // wait time until reconnect

// maximum number of configured send attempts. If set to 0, publisher will
// block until event has been successfully published.
maxAttempts int
}

// failOverConnectionMode connects to at most one host by random and swap to
Expand All @@ -79,9 +82,12 @@ type failOverConnectionMode struct {

closed bool // mode closed flag to break publisher loop

timeout time.Duration // connection timeout
waitRetry time.Duration // wait time until trying a new connection
maxAttempts int // maximum number of configured send attempts
timeout time.Duration // connection timeout
waitRetry time.Duration // wait time until trying a new connection

// maximum number of configured send attempts. If set to 0, publisher will
// block until event has been successfully published.
maxAttempts int
}

// loadBalancerMode balances the sending of events between multiple connections.
Expand Down Expand Up @@ -110,14 +116,23 @@ type loadBalancerMode struct {
timeout time.Duration // send/retry timeout. Every timeout is a failed send attempt
waitRetry time.Duration // duration to wait during re-connection attempts

maxAttempts int // maximum number of configured send attempts
// maximum number of configured send attempts. If set to 0, publisher will
// block until event has been successfully published.
maxAttempts int

// waitGroup + signaling channel for handling shutdown
wg sync.WaitGroup
done chan struct{}

work chan eventsMessage // work channel with new events to be published
retries chan eventsMessage // work channel for fail send attempts being forwarded to other workers
// channels for forwarding work items to workers.
// The work channel is used by publisher to insert new events
// into the load balancer. The work channel is synchronous blocking until timeout
// for one worker available.
// The retries channel is used to forward failed send attempts to other workers.
// The retries channel is buffered to mitigate possible deadlocks when all
// workers become unresponsive.
work chan eventsMessage
retries chan eventsMessage
}

type eventsMessage struct {
Expand Down Expand Up @@ -321,7 +336,7 @@ func newLoadBalancerMode(
maxAttempts: maxAttempts,

work: make(chan eventsMessage),
retries: make(chan eventsMessage),
retries: make(chan eventsMessage, len(clients)*2),
done: make(chan struct{}),
}
m.start(clients)
Expand Down Expand Up @@ -377,37 +392,29 @@ func (m *loadBalancerMode) start(clients []ProtocolClient) {
func (m *loadBalancerMode) onMessage(client ProtocolClient, msg eventsMessage) {
published := 0
events := msg.events
send := 0
for published < len(events) {
n, err := client.PublishEvents(events[published:])
if err != nil {
// retry only non-confirmed subset of events in batch
msg.events = msg.events[published:]

// reset attempt count if subset of message has been send
if send > 0 {
msg.attemptsLeft = m.maxAttempts + 1
}
m.onFail(msg)
return
}
published += n
send++
}
outputs.SignalCompleted(msg.signaler)
}

func (m *loadBalancerMode) onFail(msg eventsMessage) {
for {
msg.attemptsLeft--
if msg.attemptsLeft <= 0 {
outputs.SignalFailed(msg.signaler)
return
}

select {
case <-m.done:
// shutting down -> mark message as failed and return
outputs.SignalFailed(msg.signaler)
return
case m.retries <- msg: // forward message to another worker
return
case <-time.After(m.timeout):
// another failed send
}
if ok := m.forwardEvent(m.retries, msg); !ok {
outputs.SignalFailed(msg.signaler)
}
}

Expand All @@ -427,12 +434,33 @@ func (m *loadBalancerMode) PublishEvents(
events: events,
}

select {
case m.work <- msg:
case <-time.After(m.timeout):
// failed send attempt if no worker is available to pick up message
// within configured time limit.
m.onFail(msg)
if ok := m.forwardEvent(m.work, msg); !ok {
outputs.SignalFailed(msg.signaler)
}
return nil
}

func (m *loadBalancerMode) forwardEvent(
ch chan eventsMessage,
msg eventsMessage,
) bool {
if m.maxAttempts == 0 {
select {
case ch <- msg:
return true
case <-m.done: // shutdown
return false
}
} else {
for ; msg.attemptsLeft > 0; msg.attemptsLeft-- {
select {
case ch <- msg:
return true
case <-m.done: // shutdown
return false
case <-time.After(m.timeout):
}
}
}
return false
}
6 changes: 5 additions & 1 deletion outputs/lumberjack/modes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ func TestSingleSend(t *testing.T) {
connect: connectOK,
publish: collectPublish(&collected),
},
3,
0,
100*time.Millisecond,
)
Expand All @@ -181,9 +182,10 @@ func TestSingleConnectFailConnect(t *testing.T) {
&mockClient{
connected: false,
close: closeOK,
connect: failConnect(5, errFail),
connect: failConnect(2, errFail),
publish: collectPublish(&collected),
},
3,
0,
100*time.Millisecond,
)
Expand All @@ -207,6 +209,7 @@ func TestFailoverSingleSend(t *testing.T) {
publish: collectPublish(&collected),
},
},
3,
0,
100*time.Millisecond,
)
Expand All @@ -233,6 +236,7 @@ func TestFailoverFlakyConnections(t *testing.T) {
publish: publishTimeoutEvery(2, collectPublish(&collected)),
},
},
3,
1*time.Millisecond,
100*time.Millisecond,
)
Expand Down

0 comments on commit fca48f5

Please sign in to comment.