Add Retry Behaviour to AWS Kinesis Data Firehose Sink #12835

Open
mattaltberg opened this issue May 24, 2022 · 14 comments
Labels
  • sink: aws_kinesis_firehose (Anything `aws_kinesis_firehose` sink related)
  • type: bug (A code related bug.)

Comments

@mattaltberg

A note for the community

  • Please vote on this issue by adding a 👍 reaction to the original issue to help the community and maintainers prioritize this request
  • If you are interested in working on this issue or have submitted a pull request, please leave a comment

Use Cases

Currently, if any records are throttled by AWS, Vector does not retry sending them to Firehose; the data is simply dropped. We should add retry behaviour to ensure those records are eventually delivered.

Attempted Solutions

No response

Proposal

No response

References

No response

Version

No response

@mattaltberg mattaltberg added the type: feature (A value-adding code addition that introduces new functionality.) label May 24, 2022
@jszwedko
Member

Hi @mattaltberg! What version of Vector were you using when you observed this behavior?

@jszwedko jszwedko added the sink: aws_kinesis_firehose (Anything `aws_kinesis_firehose` sink related) and type: bug (A code related bug.) labels and removed the type: feature (A value-adding code addition that introduces new functionality.) label May 24, 2022
@mattaltberg
Author

Hey @jszwedko, I'm using 0.16.1. I'll ping you again after I try using the latest version.

@jszwedko
Member

Thanks @mattaltberg. I am curious about the newest version (0.21.2), since 0.21.0 included changes to switch to the new AWS SDK and to handle throttling responses consistently.

@mattaltberg
Author

@jszwedko I tested with 0.21.2 and still got the same throttling issue with my Firehose stream. Is it possible that retry logic isn't applied to the Firehose sink?

@jszwedko
Member

Interesting. Are you seeing throttle messages in Vector's logs? Can you share the log output?

The aws_kinesis_firehose sink appears to use the same retry logic, which includes checking for throttling responses. It is wired up here:

impl RetryLogic for KinesisRetryLogic {
    type Error = SdkError<PutRecordBatchError>;
    type Response = KinesisResponse;

    fn is_retriable_error(&self, error: &Self::Error) -> bool {
        if let SdkError::ServiceError { err, raw: _ } = error {
            if let PutRecordBatchErrorKind::ServiceUnavailableException(_) = err.kind {
                return true;
            }
        }
        is_retriable_error(error)
    }
}

which calls:

vector/src/aws/mod.rs

Lines 26 to 63 in 498a4c8

pub fn is_retriable_error<T>(error: &SdkError<T>) -> bool {
    match error {
        SdkError::TimeoutError(_) | SdkError::DispatchFailure(_) => true,
        SdkError::ConstructionFailure(_) => false,
        SdkError::ResponseError { err: _, raw } | SdkError::ServiceError { err: _, raw } => {
            // This header is a direct indication that we should retry the request. Eventually it'd
            // be nice to actually schedule the retry after the given delay, but for now we just
            // check that it contains a positive value.
            let retry_header = raw.http().headers().get("x-amz-retry-after").is_some();

            // Certain 400-level responses will contain an error code indicating that the request
            // should be retried. Since we don't retry 400-level responses by default, we'll look
            // for these specifically before falling back to more general heuristics. Because AWS
            // services use a mix of XML and JSON response bodies and the AWS SDK doesn't give us
            // a parsed representation, we resort to a simple string match.
            //
            // S3: RequestTimeout
            // SQS: RequestExpired, ThrottlingException
            // ECS: RequestExpired, ThrottlingException
            // Kinesis: RequestExpired, ThrottlingException
            // Cloudwatch: RequestExpired, ThrottlingException
            //
            // Now just look for those when it's a client_error
            let re = RETRIABLE_CODES.get_or_init(|| {
                RegexSet::new(&["RequestTimeout", "RequestExpired", "ThrottlingException"])
                    .expect("invalid regex")
            });

            let status = raw.http().status();
            let response_body = String::from_utf8_lossy(raw.http().body().bytes().unwrap_or(&[]));

            retry_header
                || status.is_server_error()
                || status == http::StatusCode::TOO_MANY_REQUESTS
                || (status.is_client_error() && re.is_match(response_body.as_ref()))
        }
    }
}

@jszwedko
Member

If you are able to re-run with the environment variable VECTOR_LOG=aws_smithy_http=trace, that should output trace messages from the AWS SDK showing the responses coming from AWS.

@mattaltberg
Author

@jszwedko The oddness continues. I'm not seeing any throttling exceptions, but I am seeing plenty of ServiceUnavailableException errors:

2022-05-24T16:36:43.633946Z TRACE send_operation{operation="PutRecordBatch" service="firehose"}:load_response: aws_smithy_http::middleware: http_response=Response { status: 200, version: HTTP/1.1, headers: {"x-amzn-requestid": "c31f7dea-c015-1430-9818-763094685e86", "x-amz-id-2": "uWdgQME8lqQP+rR4lAa2AFXG4eTXIRgbYzNmTSZJmE8kiZAiJ00qDEgI+PKYE/MsJjpFtqlAg79oz2fC47BN28eYMPQEKAqf", "content-type": "application/x-amz-json-1.1", "content-length": "2124", "date": "Tue, 24 May 2022 16:36:43 GMT"}, body: b"{\"Encrypted\":false,\"FailedPutCount\":12,\"RequestResponses\":[{\"ErrorCode\":\"ServiceUnavailableException\",\"ErrorMessage\":\"Slow down.\"},{\"ErrorCode\":\"ServiceUnavailableException\",\"ErrorMessage\":\"Slow down.\"},{\"ErrorCode\":\"ServiceUnavailableException\",\"ErrorMessage\":\"Slow down.\"},{\"RecordId\":\"O9udtn6NoXmtLlk1cDIlGtceB7ZpAOhv2f+sBNg50TsSJMUTUHOz85GYSkSjnCvq05dr1OsEwwSzUPm9lGUmxtXPp50tzAPp5rUwaA3DV+3fBmxZjklJtrJf/kDCJJuRWdoTrkmIUjD9UZcxxANnTWZSaI3MyJr47ZJD9zyKI3190KmoCP8P8eXVB4Aor82/GAgCIqevqqU4lPb8r3dQvh292Us1BgDG\"},{\"RecordId\":\"pzfSBJvZEMAumH1LGhEw/Z29V5S8SfbVwQAUkfzkIehEA4EJeRD78kd0uEJRElbK5JQ0S3Xfc4WUanXNbWne8hCSNXik1DNeWhGQlFl9UiWcPsPHQrO8ccqLpPItB8ZhlHi8yf1iFT6795qF+a0rUROHUnxjuBKd+UmNGAfYeb0P5QqITX0jrB2B4gM9qRCpR1ZITJgKTUrGNqRuyFIfCmGTaHDa0haA\"},{\"ErrorCode\":\"ServiceUnavailableException\",\"ErrorMessage\":\"Slow down.\"},{\"ErrorCode\":\"ServiceUnavailableException\",\"ErrorMessage\":\"Slow down.\"},{\"ErrorCode\":\"ServiceUnavailableException\",\"ErrorMessage\":\"Slow down.\"},{\"RecordId\":\"NhvJnZSom65Mf3izcf65SujsuJRSjJ+PgUQRFBI1iddKqd3lx4SlR9J1SQVmoYj0G4op6ZrDbB3MqROoMWHIsUPB5CXFA713Ubg9Yo4KAkyQaUUiVWaXji/noADLL0US7/TR8f87SWlqc7eujli2rrM50GMGkeoP1txB6+MGxIH74MPWGNhaDd6dM1yDThBaUOVHt/excooPGSKq/M6gLeQrmen2l+z0\"},{\"RecordId\":\"2anxfuWOKQ2TMcILhDIWRIxwqjIc2f0kBULRuxUMZE0KWc4k9w9jET9/yn0i8XPd85hhF/d74oPw4r6eigYShOChieRVJ+b9eu7Eb8hgpomOxMlGPF/ytDWn4ktQgpCwO950kLN3hGfcVjsvvqfBTJEnZfy/bZB7Iv8x9/LeImfqb69Q7Vf5MkN9/ZqyuvMHfEIK9/BPah2++mR+BWQaYUIjHhqGGb/P\"},{\"ErrorCode\":\"ServiceUnavailableException\",\"ErrorMessage\":\"Slow down.\"},{\"ErrorCode\":\"ServiceUnavailableException\",\"ErrorMessage\":\"Slow down.\"},{\"ErrorCode\":\"ServiceUnavailableException\",\"ErrorMessage\":\"Slow down.\"},{\"ErrorCode\":\"ServiceUnavailableException\",\"ErrorMessage\":\"Slow down.\"},{\"RecordId\":\"P19NKA8A4/tnfqkEPV0phkFa40dDpsl2cSwiSXWVOHMYhoLVgmpqz4E+yh9LnDIYA7GQDzTjpHp0UQQI+TB7d4u8vZpk7Zg05JOTPhOlnPLmJVNbEnOUnOE/Gg5c60wxLkhsSlO8C24AsEzokNL0B2DMyLnIzW3SFzUFY8VcNCgOWsNG2KymDZ0REIwfRdYmJLbIjjwR4fG8xTjt4THCNoJg2KNg7W76\"},{\"ErrorCode\":\"ServiceUnavailableException\",\"ErrorMessage\":\"Slow down.\"},{\"ErrorCode\":\"ServiceUnavailableException\",\"ErrorMessage\":\"Slow down.\"}]}" }

The thing is, when I check the AWS Console, I can see a bunch of records in the Throttled Records chart, yet that throttling surprisingly doesn't show up in the SDK errors.

@jszwedko
Member

Interesting, thanks for sharing the response @mattaltberg! It seems like we should also be looking for ServiceUnavailableException in the code block I linked. cc @fuchsnj for thoughts.
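
A minimal sketch of what that could look like, assuming the same once-initialized RegexSet pattern shown above (the names here are illustrative, not a patch against current main):

    use regex::RegexSet;
    use std::sync::OnceLock;

    // Sketch: the retriable-code set shown above, extended with
    // ServiceUnavailableException so that Firehose "Slow down." responses
    // would also match the response-body heuristic.
    static RETRIABLE_CODES: OnceLock<RegexSet> = OnceLock::new();

    fn body_indicates_retry(response_body: &str) -> bool {
        let re = RETRIABLE_CODES.get_or_init(|| {
            RegexSet::new([
                "RequestTimeout",
                "RequestExpired",
                "ThrottlingException",
                "ServiceUnavailableException", // new: Firehose per-record throttling
            ])
            .expect("invalid regex")
        });
        re.is_match(response_body)
    }

Worth noting from the log above, though: those per-record failures came back with HTTP 200, and the body heuristic only runs on the SdkError path, so string-matching alone wouldn't catch the partial-failure case.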

@jszwedko
Member

Also noticing that the response contains one entry per record; in your log, some of the records did go through. It seems like we'll need to handle retrying only the failed subset of records (similar to the Elasticsearch sink). At the least, for an initial implementation, we could retry the full request.
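
For illustration, a rough sketch of the per-record approach, using hypothetical structs that mirror the PutRecordBatch response shape in the trace above (not Vector's or the AWS SDK's actual types):

    // Hypothetical shapes mirroring the PutRecordBatch JSON response above:
    // successful entries carry a RecordId, failed entries carry an ErrorCode
    // such as "ServiceUnavailableException".
    struct BatchEntryResult {
        record_id: Option<String>,
        error_code: Option<String>,
    }

    struct BatchResult {
        failed_put_count: usize,
        request_responses: Vec<BatchEntryResult>,
    }

    /// Return only the records whose entry reported an error, so just that
    /// subset can be re-enqueued for another attempt (with backoff).
    fn records_to_retry<T: Clone>(records: &[T], result: &BatchResult) -> Vec<T> {
        if result.failed_put_count == 0 {
            return Vec::new();
        }
        records
            .iter()
            .zip(&result.request_responses)
            .filter(|(_, entry)| entry.error_code.is_some())
            .map(|(record, _)| record.clone())
            .collect()
    }

As a stopgap, the full request could be retried whenever failed_put_count > 0 (the initial implementation mentioned above), at the cost of re-sending records that already succeeded.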

@mattaltberg
Author

Yeah, if you want more information, my setup uses a unix_stream socket, reading from /var/log/output.sock. The events then get transformed (a newline is added to the end of each message) before the Firehose sink gets them.

@mattaltberg
Author

Hey @jszwedko, any updates? I'm curious whether I'll need to keep raising my Firehose quota.

@jszwedko
Member

Hi @mattaltberg! Unfortunately nothing yet, but this is in the backlog.

@dengmingtong
Contributor

Has this issue been fixed yet? Or is there a plan to fix it?

@jszwedko
Member

It's not fixed yet, but a contributor has a PR open for it: #16771
