Adding unit tests for publisher output #17460
Conversation
Pinging @elastic/integrations-services (Team:Services)
rand.Seed(time.Now().UnixNano())

wqu := makeWorkQueue()
client := &mockNetworkClient{}
The reason I'm only testing with an outputs.NetworkClient here is that it results in a netClientWorker below (instead of a clientWorker). And it appears that only in the case of a netClientWorker do we cancel the batch if the worker was closed before the batch was published:
beats/libbeat/publisher/pipeline/output.go
Line 120 in 8d43169
batch.Cancelled()
Are we missing a similar check + batch cancellation for clientWorker inside this loop?
beats/libbeat/publisher/pipeline/output.go
Lines 69 to 75 in 8d43169
for batch := range w.qu {
	w.observer.outBatchSend(len(batch.events))
	if err := w.client.Publish(batch); err != nil {
		break
	}
}
For reloading, I would say we're missing the Cancel for the client worker. clientWorker is only used for the console and file outputs, which I don't think are available via Fleet, but better to have the code correct.
Could the reason be that the console and file outputs do not support a 'Stop/Close' method? Maybe we need to touch those as well?
Just noticed the difference is the Connect method. Yeah, clientWorker should also cancel batches in case it is replaced.
Implemented in 1dd819f2ab787081389564534ea927ef17e086a8.
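For illustration, a minimal sketch of what such a cancellation check inside the clientWorker loop could look like; the closed flag and its atomic type are assumptions here and may not match the actual commit:

```go
func (w *clientWorker) run() {
	for batch := range w.qu {
		// Assumed: w.closed is an atomic flag set by Close(), mirroring the
		// check that netClientWorker already performs before publishing.
		if w.closed.Load() {
			batch.Cancelled()
			continue
		}
		w.observer.outBatchSend(len(batch.events))
		if err := w.client.Publish(batch); err != nil {
			break
		}
	}
}
```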
for name, test := range tests {
	t.Run(name, func(t *testing.T) {
		rand.Seed(time.Now().UnixNano())
Tip: print the seed via t.Log and add a CLI flag to configure a static seed. In case randomized testing fails, it should be reproducible.
Implemented in bd38221da24c14ae7797b2a48603688ac7a591d1.
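A self-contained sketch of the idea (the flag and helper names here are illustrative; the PR's actual code defines a SeedFlag variable, shown further below):

```go
package pipeline

import (
	"flag"
	"math/rand"
	"testing"
	"time"
)

var seedFlag = flag.Int64("seed", 0, "Randomization seed (0 means time-based)")

// initSeed seeds math/rand either from the -seed CLI flag or from the clock,
// and logs the value so a failing randomized run can be replayed exactly.
func initSeed(t *testing.T) {
	seed := *seedFlag
	if seed == 0 {
		seed = time.Now().UnixNano()
	}
	t.Logf("reproduce with: go test -run '%s' -seed=%d", t.Name(), seed)
	rand.Seed(seed)
}
```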
		wqu <- batch
	}()
}
wg.Wait()
For randomized testing, testing/quick is your friend.
Implemented in 94b133a94.
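For context, a self-contained sketch of how testing/quick drives a randomized property; the property below is a trivial stand-in, not the PR's real check:

```go
package pipeline

import (
	"testing"
	"testing/quick"
)

func TestQuickSketch(t *testing.T) {
	// testing/quick calls the property with randomized arguments and reports
	// the first input for which it returns false.
	property := func(batchSizes []uint8) bool {
		var total int
		for _, n := range batchSizes {
			total += int(n)
		}
		// Trivial invariant standing in for "all generated events get published".
		return total >= 0
	}
	if err := quick.Check(property, &quick.Config{MaxCount: 50}); err != nil {
		t.Error(err)
	}
}
```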
// Make sure that all events have eventually been published
c := test.client.(interface{ Published() int })
assert.Equal(t, numEvents.Load(), c.Published())
Better to have an assert loop with a timeout instead of a hard sleep.
Implemented in 9b1cf7133.
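A minimal sketch of such a polling helper; the name waitUntilTrue and the timeout value are assumptions for illustration:

```go
package pipeline

import "time"

// waitUntilTrue polls fn until it returns true or the timeout expires.
// Compared to a fixed sleep, it returns as soon as the condition holds and
// only fails after the full timeout has really been exhausted.
func waitUntilTrue(timeout time.Duration, fn func() bool) bool {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		if fn() {
			return true
		}
		time.Sleep(10 * time.Millisecond)
	}
	return false
}
```

The test can then assert that the published count eventually reaches numEvents.Load() within a generous timeout instead of sleeping a fixed amount.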
	numEvents.Add(len(batch.Events()))

	wqu <- batch
}()
We are only moving batches into the work queue here. Why do we need a goroutine for this? In case the queue/worker blocks, the test will time out (the default test timeout in Go is 10 min).
Yeah, good point. I was trying to simulate as much as possible how the publisher is being used in reality — e.g. multiple FB inputs concurrently trying to publish batches of events. However, the work queue is an unbuffered channel so trying to concurrently send batches to it is pointless 🤦♂. I will remove the goroutine.
Fixed in 33af88a42.
	assert.Equal(t, numEvents.Load(), client.Published())
}

type mockClient struct{ published int }
We run our tests with -race. The way this type is used, we should either use an atomic or protect operations via a mutex.
Fixed in 33391da48.
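One possible shape of the fix, sketched with a mutex and assuming the test file's existing sync and libbeat publisher imports; the real change may differ:

```go
type mockClient struct {
	mu        sync.RWMutex
	published int
}

// Published is safe to call from the test while the worker goroutine is
// still publishing, which matters when running with -race.
func (c *mockClient) Published() int {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.published
}

func (c *mockClient) Publish(batch publisher.Batch) error {
	c.mu.Lock()
	c.published += len(batch.Events())
	c.mu.Unlock()
	batch.ACK()
	return nil
}
```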
@urso I've addressed all your review feedback. It's my first time using …
(force-pushed from b47a623 to e9e20c5)
@urso I could use some help here with the …

Essentially this means that all batches are being consumed by the worker before … So I tried a couple things (see the last few commits) to try and leave some batches in the queue when …

Related: I can only see this test failing on Travis CI right now, as the …
(force-pushed from cc2d4b3 to 8bb19ee)
// Block publishing
if c.publishLimit > 0 && c.published >= c.publishLimit {
	batch.Retry() // to simulate not acking
	return nil
}
@urso WDYT about this logic to emulate blocking publishing?
This emulation is not blocking, but a failing output. The batch is not acked, but this Retry + return nil will signal that the output is ready to consume another batch.
A blocking simulation would require you to wait for some signal (e.g. via a control channel).
At one point I had a sleep in here between the retry and return. The idea then was that the first client would be closed before the sleep finished.
A control channel is better than a sleep. Once the first client is closed I can close the control channel to remove the block. However, the Retry (before waiting to consume from the control channel) will still be needed, otherwise the final publish count doesn't add up to the expected total number of events.
Implemented blocking with control channel in 508b606.
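A sketch of the control-channel approach being described; the unblock field and the exact flow are illustrative rather than necessarily what 508b606 does:

```go
// Assumed field on mockClient: unblock chan struct{}, closed by the test
// once the first worker has been shut down.
func (c *mockClient) Publish(batch publisher.Batch) error {
	c.mu.Lock()
	limitReached := c.publishLimit > 0 && c.published >= c.publishLimit
	if !limitReached {
		c.published += uint(len(batch.Events()))
	}
	c.mu.Unlock()

	if limitReached {
		// Give the batch back so the final published count still adds up,
		// then genuinely block until the test signals it is safe to continue.
		batch.Retry()
		<-c.unblock
		return nil
	}

	batch.ACK()
	return nil
}
```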
		total := published + client.Published()
		return numEvents.Load() == total
	})
}, &quick.Config{MaxCount: 50})
Keeping MaxCount at 50 seems to keep Travis happy. Before that, the build job was being killed because it was taking too long.
Before introducing quick check, the count was actually 1 :)
This is some kind of stress test. Unfortunately, stress tests don't sit well with Travis; we have had bad performance issues with the queue stress tests as well. I think long term we should not have stress tests run by Travis, but have a separate job running those for even longer. For 'some' simple unit testing, a count of 1 might be OK.
True, good point :)
So would you recommend leaving this at 50, lowering it to 1, or maybe somewhere in between? I ask because, while 50 is working at the moment, I'm worried whether it'll become a source of flakiness. I don't think there's a way to know for sure until Travis runs this several times, though?
Well, it's difficult to find the right value. Maybe set it to 25, so we have some more headroom.
		numEvents.Add(uint(len(batch.Events())))
		wqu <- batch
	}()
}
Instead of creating a goroutine per batch, how about creating a single goroutine that executes the for loop? This would actually be closer to how the work queue is used.
Sure, I will do that. Just out of curiosity, say there are multiple Filebeat inputs — don't they each get their own goroutine sending to the same publisher work queue?
Implemented in 94f3445.
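Roughly the shape that suggestion leads to; wqu and numEvents are the test's own names, while the batches slice is assumed, so this is a fragment rather than a standalone test:

```go
// One feeder goroutine pushes every batch into the (unbuffered) work queue,
// while the worker drains it from the other side; this mirrors how a real
// producer feeds the pipeline more closely than one goroutine per batch.
go func() {
	for _, batch := range batches {
		numEvents.Add(uint(len(batch.Events())))
		wqu <- batch
	}
}()
// The test then waits (with a timeout) until client.Published() matches numEvents.Load().
```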
	SeedFlag = flag.Int64("seed", 0, "Randomization seed")
)

func TestPublish(t *testing.T) {
Maybe we can rename the test? We don't test Publish; the tests seem to check the behavior of the clientWorkers.
Done in be4bf09.
type mockClient struct {
	mu           sync.RWMutex
	publishLimit uint
	published    uint
Maybe we can exert more control by replacing published and publishLimit with a func. The func could decide when to block and when to continue. This way the test logic would be more contained within the test. WDYT?
Implemented mockable publish behavior function in 508b606.
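A sketch of what that func-based mock could look like; the publishFn name and signature are assumptions, and the actual change is the one in 508b606:

```go
type mockClient struct {
	mu        sync.RWMutex
	published uint
	// publishFn decides, per batch, whether to ACK, Retry, or block on a
	// channel, and returns the number of events it considers published.
	publishFn func(batch publisher.Batch) uint
}

func (c *mockClient) Publish(batch publisher.Batch) error {
	n := c.publishFn(batch)
	c.mu.Lock()
	c.published += n
	c.mu.Unlock()
	return nil
}
```

Each test case can then supply its own publishFn, keeping the blocking/retry logic next to the scenario it belongs to.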
Travis CI is green and Jenkins CI failures are unrelated. Merging.
* Adding unit tests for publisher output
* Adding another unit test (TODO)
* Adding unit test for closing worker midway
* Reorganizing imports
* Output PRNG seed + provide flag to specify seed
* Cancel batch with netClient if it is closed
* Use waitUntil loop instead of hard sleep
* Making mockClient threadsafe
* Removing goroutine from happy path unit test
* Using testing/quick
* Increase batch sizes in tests
* Adding sleep to ensure some batches are still at time of close
* Experiment with slightly higher sleep time
* Moving sleep to publish time
* Increase publish latency
* Increasing publish latency again
* Removing publishLatency
* Fix timeout to large value
* Make first client block after publishing X events
* Actually block publishing
* Reduce number of batches to prevent running out of memory
* Bumping up # of batches
* Bumping up # of batches again
* Try different strategy - publish 80% of events
* Cranking up sleep time in publish blocking
* Only publish first 20% of events
* Make sure to return batch for retrying
* Adding debugging statements to see what's happening in Travis
* More robust to race conditions
* Restricting quick iterations to 50 to see if that helps in Travis
* Put entire loop into goroutine
* Renaming tests
* Emulate blocking + mockable publish behavior
* Removing retry and return
* Clarify intent with comment
* Setting # of quick iterations to 25
What does this PR do?
Adds unit tests for the publisher output functionality in libbeat.
Why is it important?
It checks that the publisher output publishes the expected number of events under various scenarios.
Checklist
I have made corresponding changes to the documentation
I have made corresponding changes to the default configuration files
I have added an entry in CHANGELOG.next.asciidoc or CHANGELOG-developer.next.asciidoc.
Related PRs