
Fix partition key computation for aggregation #158

Merged (1 commit, Aug 26, 2021)

Conversation

@jamiees2 (Contributor)

Signed-off-by: James Elias Sigurdarson <jamiees2@gmail.com>

Description of changes:

The aggregator function has a bug I was wondering about earlier, and I recently realized that it causes actual problems in our setup.

Some notes to provide context:
When producing a record and switching over the aggregator due to size limitations, we start a new aggregation record.
When aggregating into the complete record, we choose pkeys[0] to be the partition key of the Kinesis record.

The bug is that when we switch over to a new aggregation record, we don't regenerate the partition key. The partition key passed in stays the same for the entire call, and although we regenerate the key after returning, we have already added the first record to the aggregator under the wrong (old) partition key.

The end result is that the next aggregation record is produced with the same partition key as the previous aggregation record. Funnily enough, this is only a problem for the first aggregation record produced after the switch-over; after that, pkeys[0] will never match the remaining records.

This is a problem when the first record is right up against the 1 MB shard limit, because the next record is then guaranteed to end up in the same shard as the first one. As a result, we see an ErrCodeProvisionedThroughputExceededException, and the chunk can never be submitted.

Anyway, this fixes the issue by refactoring the random string generator into a class which can be passed around, and handing control over random string generation to the aggregator itself. This felt like the most maintainable solution, but it ends up refactoring the way we manage partition keys. The changes themselves aren't too bad: instead of returning a random string, getPartitionKey returns a (partitionKey, ok) tuple, which is handled differently by the aggregated and non-aggregated implementations.
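To illustrate the shape of this refactor, here is a minimal, hypothetical Go sketch of the (partitionKey, ok) idea; the type and function names below are illustrative assumptions, not the actual code in this PR:

```go
package main

import (
	"fmt"
	"math/rand"
)

// randomStringGenerator is a hypothetical stand-in for the refactored helper:
// the aggregator would hold one of these and call it whenever it starts a new
// aggregation record, so the key is regenerated on every switch-over.
type randomStringGenerator struct {
	rnd *rand.Rand
}

func (g randomStringGenerator) randomString(n int) string {
	const letters = "abcdefghijklmnopqrstuvwxyz"
	b := make([]byte, n)
	for i := range b {
		b[i] = letters[g.rnd.Intn(len(letters))]
	}
	return string(b)
}

// getPartitionKey (sketched signature, not the plugin's) returns (key, true)
// when a partition key can be derived from the record, and ("", false) when
// the aggregator should generate a random one itself.
func getPartitionKey(record map[string]string, keyField string) (string, bool) {
	key, ok := record[keyField]
	return key, ok
}

func main() {
	gen := randomStringGenerator{rnd: rand.New(rand.NewSource(42))}
	record := map[string]string{"message": "hello"}

	key, ok := getPartitionKey(record, "partition_key")
	if !ok {
		// No configured key: generate a fresh random one. The important part of
		// the fix is that this happens again for every new aggregation record.
		key = gen.randomString(8)
	}
	fmt.Println("partition key:", key)
}
```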

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

@jamiees2 requested a review from a team as a code owner on August 23, 2021 15:32
@jamiees2 (Contributor, Author) left a review:

add comments for code review

if !hasPartitionKey {
    if len(a.partitionKeys) > 0 {
        // Take the first partition key from the map, if any
        for k, _ := range a.partitionKeys {
@jamiees2 (Contributor, Author):

I'm open to suggestions for how to do this better; I'm not sure how to get keys out of a map properly.

@zhonghui12 (Contributor):

Could I ask what "first" means here? Does it mean the first element/key that was put into the map? If so, I'm not sure you can get it from the map through iteration.

@jamiees2 (Contributor, Author):

I think I generally just want any key that is in the map. Basically, we choose a "random" key for the record, but AFAICT this key does not matter, since the real partition key is just the first key; this is just a way to save space. I'll update the comment.
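For reference, grabbing an arbitrary key from a Go map is usually written exactly like this kind of range-and-break (a standalone sketch, not the PR's code):

```go
package main

import "fmt"

func main() {
	partitionKeys := map[string]uint64{"key-a": 0, "key-b": 1}

	// Map iteration order is unspecified in Go, so this picks an arbitrary key;
	// that is fine here because any existing key works equally well.
	var anyKey string
	for k := range partitionKeys {
		anyKey = k
		break
	}
	fmt.Println("chose:", anyKey)
}
```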

}
// Recompute field size, since it changed
pKeyIdx, _ = a.checkPartitionKey(partitionKey)
pkeyFieldSize = protowire.SizeVarint(pKeyIdx) + fieldNumberSize
@jamiees2 (Contributor, Author):

Also fixes a sizing bug: when we switch records, we may be slightly off on the size (it may shrink if the previous pKeyIdx was > 128).
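For context on why the size can change: varints use 7 bits per byte, so protowire.SizeVarint returns 1 byte for values up to 127 and 2 bytes from 128 to 16383. A quick standalone illustration (not code from this PR):

```go
package main

import (
	"fmt"

	"google.golang.org/protobuf/encoding/protowire"
)

func main() {
	// The encoded size of a varint grows at 128, 16384, and so on. If the
	// partition key index resets to a small value when a new aggregation
	// record starts, the pkey field size can shrink and must be recomputed.
	for _, idx := range []uint64{1, 127, 128, 16383, 16384} {
		fmt.Printf("index %5d -> %d byte(s)\n", idx, protowire.SizeVarint(idx))
	}
}
```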

@jamiees2 (Contributor, Author):

To be even clearer, the event sequence we were seeing was:

AddRecord("random key 1", "data 1") -> nil, nil
AddRecord("random key 1", "data 2") -> AggregatedRecords(pkey: "random key 1", records: ["data 1"]), nil
// regenerate aggregation key
AddRecord("random key 2", "data 3") -> nil, nil

Flush() -> AggregatedRecords(pkey: "random key 1", records: ["data 2", "data 3"]), nil

This is because the second AddRecord created a new record and added data 2 to it, but didn't regenerate the random partition key. When aggregating the second time, it used the first partition key, which will be random key 1 in this sequence.
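To make the intended post-fix behavior concrete, here is a toy aggregator (purely illustrative; the real plugin's types and signatures differ) that regenerates its partition key whenever it starts a new aggregation record:

```go
package main

import "fmt"

// toyAggregator is a deliberately simplified stand-in for the real aggregator:
// it "flushes" after maxRecords entries and regenerates its partition key each
// time it starts a new aggregation record, which is the behavior this PR fixes.
type toyAggregator struct {
	partitionKey string
	records      []string
	maxRecords   int
	keyCounter   int
}

func (a *toyAggregator) newPartitionKey() string {
	a.keyCounter++
	return fmt.Sprintf("random key %d", a.keyCounter)
}

// AddRecord returns the completed aggregation (key plus records) when the
// current one is full, and starts the next one with a fresh partition key.
func (a *toyAggregator) AddRecord(data string) (string, []string) {
	if a.partitionKey == "" {
		a.partitionKey = a.newPartitionKey()
	}
	if len(a.records) >= a.maxRecords {
		doneKey, doneRecords := a.partitionKey, a.records
		a.partitionKey = a.newPartitionKey() // the fix: regenerate on switch-over
		a.records = []string{data}
		return doneKey, doneRecords
	}
	a.records = append(a.records, data)
	return "", nil
}

func main() {
	agg := &toyAggregator{maxRecords: 1}

	k, recs := agg.AddRecord("data 1")
	fmt.Printf("%q %v\n", k, recs) // "" []  (nothing flushed yet)
	k, recs = agg.AddRecord("data 2")
	fmt.Printf("%q %v\n", k, recs) // "random key 1" [data 1]
	k, recs = agg.AddRecord("data 3")
	fmt.Printf("%q %v\n", k, recs) // "random key 2" [data 2]  (fresh key, not key 1)

	fmt.Printf("%q %v\n", agg.partitionKey, agg.records) // "random key 3" [data 3]
}
```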

@hossain-rayhan (Contributor):

Hi @jamiees2, just to be clear, does this change have a strong dependency on your previous PR #155? Should we revert that commit and merge them together?

@jamiees2 (Contributor, Author) commented Aug 23, 2021:

Oh, no, this was a separate bug we found today :) #155 is perfectly fine on its own.

@jamiees2 (Contributor, Author):

Funnily enough, this was the problem that led me to investigate what was behind #155. The size estimation was always incorrect if we had more than one partition key, which is what #155 fixed, but this fixes the reason we had more than one in the first place.

@hossain-rayhan (Contributor):

> Oh, no, this was a separate bug we found today :) #155 is perfectly fine on its own.

Thanks for confirming.

@zhonghui12 (Contributor) left a review:

Also, could I double-check that you've tested the code change in an end-to-end test?


@jamiees2 (Contributor, Author):

Yeah, this was causing tons of repeated records in our production environment, so we have already deployed a version built off this branch to mitigate the issue.

@jamiees2 (Contributor, Author):

If anybody's interested, this is the graph of how much Fluent Bit's retry metrics dropped when we deployed the change :D The retries were what was causing the duplicate records, and the immediate increase corresponds to a normal increase in traffic.

[image: graph of Fluent Bit's retry metrics dropping after the change was deployed]

@zhonghui12 (Contributor):

LGTM

@hossain-rayhan (Contributor) commented Aug 24, 2021:

> If anybody's interested, this is the graph of how much Fluent Bit's retry metrics dropped when we deployed the change :D The retries were what was causing the duplicate records, and the immediate increase corresponds to a normal increase in traffic.

Hey @jamiees2, this is great! Thanks for this contribution. Also, did you double-check to make sure no data was lost?

@jamiees2 (Contributor, Author):

Yup, the other metrics on the Kinesis end (records ingested) remained the same before and after the rollout

@PettitWesley (Contributor):

@zackwine FYI

@PettitWesley (Contributor) left a review:

Please squash the commits into one or improve the commit messages

Signed-off-by: James Elias Sigurdarson <jamiees2@gmail.com>
@zhonghui12 (Contributor):

Will include this one in our release this week

@zhonghui12 merged commit 35f3cc9 into aws:mainline on Aug 26, 2021.