Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(kinesis sinks): implement full retry of partial failures in firehose/streams #16771

Conversation

jasongoodwin
Copy link
Contributor

@jasongoodwin jasongoodwin commented Mar 11, 2023

  • Has configuration, but not per each of firehose/streams (probably should be per each, not just in base)
  • It passes the failed count up for each of firehose/streams, then uses config to see if the request should be retried.
    I have to test this, but looks okay. It's a little different than the ES implementation, but the change is fairly minimal.
    Only wart is the way that the request size is "augmented" into the KinesisResponse after the call to call.

Hopefully looks alright - feel free to drop any feedback and I'll fix it up.
I think the config per each of streams/firehose is likely necessary to release this.

I reviewed/marked up the PR to help draw attention to these items and clarify.

--------- Further discussion/tickets -------

Handling of partial failures for firehose is open here: #359
Note that this issues is just the whole retry similar to what ES does (#140
)
I'd happily talk to someone about how I might improve this to handle only the partial failures (@decklyndubs on Discord - I'm in your server.) I have to look a little deeper but my fear in doing that is that a record gets indefinitely stuck and should be dropped, so in my mind there may need to be some separate retry policy in the sink. This design issue is broad and relates to multiple sinks such as ES that has partial failures.

Some other related tickets.
#7659
#9451
#9861

@jasongoodwin jasongoodwin requested a review from a team March 11, 2023 20:49
@bits-bot
Copy link

bits-bot commented Mar 11, 2023

CLA assistant check
All committers have signed the CLA.

@netlify
Copy link

netlify bot commented Mar 11, 2023

Deploy Preview for vrl-playground canceled.

Name Link
🔨 Latest commit 1d2caea
🔍 Latest deploy log https://app.netlify.com/sites/vrl-playground/deploys/640d1f376a1ee20008cf32fe

@netlify
Copy link

netlify bot commented Mar 11, 2023

Deploy Preview for vector-project canceled.

Name Link
🔨 Latest commit 1d2caea
🔍 Latest deploy log https://app.netlify.com/sites/vector-project/deploys/640d1f37068f200008638a20

@github-actions github-actions bot added the domain: sinks Anything related to the Vector's sinks label Mar 11, 2023
@@ -58,6 +58,13 @@ pub struct KinesisSinkBaseConfig {
#[serde(default)]
pub auth: AwsAuthentication,

/// Whether or not to retry successful requests containing partial failures.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably want per-sink config. This is in the "base" only

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 especially if we're only supporting it for streams.

let msg = format!("partial error count {}", response.failure_count);
return RetryAction::Retry(msg.into());
} else {
RetryAction::DontRetry("ok".into())
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should these contain anything different? New to the project.

fn should_retry_response(&self, response: &Self::Response) -> RetryAction {
if self.retry_partial && response.failure_count > 0 {
let msg = format!("partial error count {}", response.failure_count);
return RetryAction::Retry(msg.into());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixme: doesn't need a return

.map(|output: PutRecordBatchOutput| KinesisResponse {
count: rec_count,
failure_count: output.failed_put_count().unwrap_or(0) as usize,
events_byte_size: 0,
Copy link
Contributor Author

@jasongoodwin jasongoodwin Mar 11, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the events size isn't available here. Wasn't sure the best way to modify this - will think about it. I may just return the failure count for now, and build the KinesisResponse in the Service

@fuchsnj fuchsnj added the sink: aws_kinesis_streams Anything `aws_kinesis_streams` sink related label Mar 13, 2023
@spencergilbert spencergilbert requested a review from a team March 13, 2023 19:38
@spencergilbert
Copy link
Contributor

Thanks @jasongoodwin! I'm hoping to have this reviewed by tomorrow, appreciate the pre-review you already provided 😄

@jasongoodwin
Copy link
Contributor Author

Thanks @jasongoodwin! I'm hoping to have this reviewed by tomorrow, appreciate the pre-review you already provided 😄

Yeah it needs a few things likely - but what I'm really hoping to accomplish is to do partial retry - for firehose this pr could potentially create a lot of duplication which is a risk. Streams is okay as it can deduplicate.

If you can give some insight into how i might implement partial retry, would defo appreciate it.
Eg I can get the records that failed, no problem. How would I pass this up so that the sink can batch just the failed records on the retry while using the existing retry logic? Having a hard time understanding some of those pieces.

@jasongoodwin
Copy link
Contributor Author

jasongoodwin commented Mar 15, 2023

I'll try to clean this up a bit over the weekend after reviewing/thinking about it a bit.

I closed the related pr. #16703
I'm interested in taking a stab at the partial retry feature - thanks for the feedback on that one - it gives me some leads to analyze the code a bit more.

@spencergilbert
Copy link
Contributor

I failed to have this reviewed, but I'll do my best to leave my thoughts before you take another look this weekend

@jasongoodwin
Copy link
Contributor Author

jasongoodwin commented Mar 16, 2023

It's okay - I can see some things clean up after sitting on it. Only thing really at this point is:

  1. what do you think about the configuration?
  2. should this even be implemented for firehose? What's the risk of duplication and egregious expense on a lot of retries?
  3. the KinesisResponse is built and then aggregated as it's passed out and I think that's not great.

@spencergilbert
Copy link
Contributor

Sorry for the delay - I'm planning on dedicating a chunk of time Wednesday on this.

Copy link
Contributor

@spencergilbert spencergilbert left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. what do you think about the configuration?

I think the configuration is fine. If we don't implement for firehose we can drop the note, and if there is a suggestion for how to protect against dedupes for streams - we could include that suggestion (as we do for Elasticsearch).

  1. should this even be implemented for firehose? What's the risk of duplication and egregious expense on a lot of retries?

I think if there's no way for firehose to de-dupe these we should only implement it for streams (which can de-dupe?).

@@ -58,6 +58,13 @@ pub struct KinesisSinkBaseConfig {
#[serde(default)]
pub auth: AwsAuthentication,

/// Whether or not to retry successful requests containing partial failures.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 especially if we're only supporting it for streams.


fn should_retry_response(&self, response: &Self::Response) -> RetryAction {
if self.retry_partial && response.failure_count > 0 {
let msg = format!("partial error count {}", response.failure_count);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to include the error type and reason if we can pull that out of the response reasonably.

@@ -1,3 +1,5 @@
use crate::sinks::aws_kinesis::KinesisResponse;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be moved into the use super:: line below.

Comment on lines +80 to +84
.map(|output: PutRecordsOutput| KinesisResponse {
count: rec_count,
failure_count: output.failed_record_count().unwrap_or(0) as usize,
events_byte_size: 0,
})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It definitely feels better to me to do this in the service.rs

@jasongoodwin
Copy link
Contributor Author

Great thanks for the review! I'll have to rebuild some context to fix this up, but I think we can get this over the line.

@spencergilbert
Copy link
Contributor

Hey @jasongoodwin - just noticed this was still hanging around, wanted to check in and see how things were going.

spencergilbert added a commit that referenced this pull request Jun 16, 2023
…hose/streams (#17535)

This PR is from #16771 PR.
Refactor some action checking.

closes: #17424

---------

Signed-off-by: Spencer Gilbert <spencer.gilbert@datadoghq.com>
Co-authored-by: Jason Goodwin <jgoodwin@bluecatnetworks.com>
Co-authored-by: Jason Goodwin <jay.michael.goodwin@gmail.com>
Co-authored-by: Spencer Gilbert <spencer.gilbert@datadoghq.com>
@jszwedko
Copy link
Member

Superceded by #17535

@jszwedko jszwedko closed this Jun 28, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
domain: sinks Anything related to the Vector's sinks sink: aws_kinesis_streams Anything `aws_kinesis_streams` sink related
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants