Add Kinesis batch publisher with retries #408

Merged
merged 4 commits into from
Oct 30, 2017

Conversation

aserrallerios
Contributor

This adds a Kinesis publisher with batching and retries. It does not ensure publish order (due to batching and retries; I plan to add more publishers that ensure total ordering or ordering by partition key).

More tests and documentation will be added and the stage logic may be refactored/simplified.

Feedback will be much appreciated!

@raboof
Contributor

raboof commented Jul 17, 2017

Travis reports 4 compile errors when building with Scala 2.11 - might be good to look into?

@aserrallerios
Contributor Author

Fixed :)

@iamthiago

@aserrallerios Today I had to create something very similar for my pipeline. Unfortunately, only the Source part is present on the master branch, so I created this:

https://gist.github.com/thiagoandrade6/4fb8684bb2409995f781151c42e349ac

It is completely based on the SNS and Lambda sub projects. IMHO perhaps you should adapt your code to look like that, but as I said, it's only my opinion :)

@aserrallerios
Contributor Author

aserrallerios commented Jul 19, 2017

Or instead you can review the PR and ask for changes if you think something is missing/wrong :)

Your Flow lacks parallelism and retries, both implemented in this PR. This PR also includes throttling and other configuration parameters.

@iamthiago

Don't worry, I just like the way SNS and Lambda implement their GraphStages.

But indeed, my flow is just simple enough to run in my application today and is not really production-ready for everybody. I just wanted to show it because I couldn't wait for your PR to be approved :)

@aserrallerios
Contributor Author

Yeah, but adding features to a stage makes it more complex and not quite as comparable.

If you cannot wait, please review :), also @raboof and @areyouspiffy feedback would be great.

I'll try to add the missing tests/docs as soon as possible.


import scala.util.control.NoStackTrace

object KinesisFlowErrors {
Contributor

If we renamed this KinesisErrors I could also put KinesisSourceErrors in here as well, but it's up to you if you think they're better off in separate files. Same with settings.

Contributor Author

Done.

About the settings, I'd rather keep them separate; mixing all those companion objects, validations and constructors could get messy. Also consider that we may have more Kinesis Sources/Flows in the future with different settings.

@brianmowen
Contributor

👍 Good work!

@aserrallerios
Contributor Author

I think it's feature-complete, ready to be reviewed. Feel free to ask for changes or new stuff.

@nvrs
Contributor

nvrs commented Aug 23, 2017

+1 for this, good work.

}

private def checkForCompletion() =
if (inFlight <= 0 && pendingRequests.isEmpty && waitingRetries.isEmpty && isClosed(in)) {
Contributor

should inFlight ever be able to become < 0? Perhaps log a warning then?

Contributor Author

No, it cannot happen. I'll change it to == 0.
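
For readers following along, the completion condition discussed in this review thread can be sketched in isolation. This is only an illustrative model, not the stage's code: `State`, `canComplete` and the use of plain counters (the real stage holds collections for `pendingRequests` and `waitingRetries`) are assumptions, and the `require` is just one possible way to surface a negative `inFlight` instead of silently tolerating it.

```scala
object CompletionSketch {
  // Hypothetical snapshot of what the stage tracks; counters stand in for the real collections.
  final case class State(inFlight: Int, pendingRequests: Int, waitingRetries: Int, upstreamClosed: Boolean) {
    // A negative in-flight count would indicate a bookkeeping bug, so fail loudly.
    require(inFlight >= 0, s"inFlight must never be negative, was $inFlight")
  }

  // The stage may complete only when nothing is in flight, nothing is queued,
  // no retry is scheduled, and upstream has finished.
  def canComplete(s: State): Boolean =
    s.inFlight == 0 && s.pendingRequests == 0 && s.waitingRetries == 0 && s.upstreamClosed
}
```

With the invariant enforced separately, `== 0` states the intent more directly than `<= 0`.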

@@ -60,10 +60,10 @@ Java
The `KinesisSource` creates one `GraphStage` per shard. Reading from a shard requires an instance of `ShardSettings`.

Scala
: @@snip (../../../../kinesis/src/test/scala/akka/stream/alpakka/kinesis/scaladsl/Examples.scala) { #settings }
: @@snip (../../../../kinesis/src/test/scala/akka/stream/alpakka/kinesis/scaladsl/Examples.scala) { #source-settings }
Contributor

Would be nice to add the root of the project as a variable like we do for akka, but that's for another PR

Contributor

@raboof raboof left a comment

Generally LGTM, including tests and documentation. Haven't worked with Kinesis myself though so can't really comment on specifics.

@aserrallerios
Contributor Author

Fixed-up commits.

@eduardojld

@aserrallerios I took the liberty of reviewing your code, and it looks great. In general there is not much difference from what I was doing, but there is one thing I would not do, and it's the retries: I do not think they should be a responsibility of the flow. You even put in some nice things like batching and throttling, so I'm not sure it would be a good decision to make retries part of the flow.

Other than that I would just add a KinesisSink to complete the whole picture like in SQS

@aserrallerios
Contributor Author

aserrallerios commented Aug 29, 2017

Thanks for the review @eduardojld. I'll add a Sink version of the publisher.

About the retries: note that the code doesn't retry "internal" or "unknown" server errors, only partial errors, i.e. records that failed to publish among other records that were successfully published. For example, if you PUT 500 records, the first 100 may succeed and the remaining 400 may fail. We have found that handling these partial errors is mandatory if you really need to get your PUT throughput near the available throughput limit (which depends on the number of shards), as they are quite common. Removing the retry logic makes it nearly unusable if high throughput is required.

An alternative to internal retries would be to publish failed records downstream, forcing the user to decide what to do with them. But honestly I cannot think of any scenario other than the user wanting to feed those records into the same "publish records flow" again to retry them, producing a cycle in the graph. I'd rather do it internally unless the community finds strong arguments for the alternative, as it would increase the complexity of the end-user graphs (with graph cycles!).

You can always disable retries with a settings parameter :D
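
The partial-failure handling described above can be illustrated with a minimal, synchronous sketch. It is not the stage's implementation: `Record`, `Result`, `failedRecords`, `publish` and the `put` function are hypothetical stand-ins (no AWS SDK types are used), and there is no batching, backoff or throttling. The only Kinesis behaviour it relies on is that a `PutRecords` response contains one result entry per request entry, in the same order.

```scala
import scala.annotation.tailrec

object PartialRetrySketch {
  // Hypothetical, simplified stand-ins for a request entry and its result entry.
  final case class Record(partitionKey: String, data: String)
  final case class Result(errorCode: Option[String]) // None means the record was accepted

  // Results are positionally aligned with the request, so zip them
  // and keep only the records whose result carries an error code.
  def failedRecords(sent: Seq[Record], results: Seq[Result]): Seq[Record] =
    sent.zip(results).collect { case (record, Result(Some(_))) => record }

  // Resend only the failed subset until it is empty or the retry budget is spent.
  // Returns the records that still failed; `put` stands in for the batch call.
  @tailrec
  def publish(batch: Seq[Record], maxRetries: Int)(put: Seq[Record] => Seq[Result]): Seq[Record] = {
    val failed = failedRecords(batch, put(batch))
    if (failed.isEmpty || maxRetries <= 0) failed
    else publish(failed, maxRetries - 1)(put)
  }
}
```

Passing `maxRetries = 0` corresponds to disabling retries: the failed records are simply returned to the caller, which is roughly what the "emit failures downstream" alternative would look like.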

@aserrallerios
Contributor Author

Added sink version.

@aserrallerios
Contributor Author

aserrallerios commented Oct 3, 2017

Guys, more reviews, anything before accepting it?

@etspaceman

It looks like there are some conflicts with your branch @aserrallerios. Those will likely have to be resolved prior to a merge.

@aserrallerios
Contributor Author

Thanks @etspaceman 😅

Cherry-picked

@nvrs
Contributor

nvrs commented Oct 24, 2017

Hi @aserrallerios, the merge of #535, which changed the formatting of many files including KinesisSourceErrors, is now causing a conflict since you have renamed that file to KinesisErrors.

Do you need any help with this? I am keen on getting this for the next release :)

@aserrallerios
Contributor Author

Rebased & formatted, thanks @nvrs!

@nvrs
Contributor

nvrs commented Oct 25, 2017

Hi @raboof , do you think this needs more reviewers before accepting the PR?

@aserrallerios
Contributor Author

aserrallerios commented Oct 25, 2017

Guys, I've been testing with Graphs that self-restart on failures, and I have one question: does the initialization code of the StageLogic execute on each restart of the Stage, or just the code in the preStart method? (For example, when using akka's RestartSource.withBackoff.)

If the answer is that only the code in preStart executes, I'll need to initialize all variables of the StageLogic inside the preStart method, to avoid the Stage malfunctioning after Graph restarts.

@aserrallerios
Contributor Author

It does no harm anyway, so I've done it: changed the initialization of the Stage's variables and made some minor changes to improve code clarity in the Stage.
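
The "initialize state in preStart" pattern mentioned here can be sketched without Akka. This is a plain-Scala model under stated assumptions: `PublisherLogic`, `offer`, `inFlightCount` and `pendingRecords` are hypothetical names, and the real logic extends Akka's `GraphStageLogic` and overrides its `preStart()`. The sketch makes no claim about what Akka does on restart; it only shows that resetting state in the start hook is safe either way.

```scala
object RestartStateSketch {
  // Hypothetical stand-in for the stage logic; not an Akka class.
  final class PublisherLogic {
    // Fields get placeholder defaults here; the authoritative reset happens in preStart.
    private var inFlight: Int = 0
    private var pending: Vector[String] = Vector.empty

    // Called at the start of every run, so each run begins from a clean state
    // whether or not the instance itself was re-created.
    def preStart(): Unit = {
      inFlight = 0
      pending = Vector.empty
    }

    def offer(record: String): Unit = {
      pending = pending :+ record
      inFlight += 1
    }

    def inFlightCount: Int = inFlight
    def pendingRecords: Vector[String] = pending
  }
}
```

If the logic instance is in fact re-created on each restart, the reset in `preStart` is redundant but harmless, which matches the "no harm" reasoning above.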

@aserrallerios aserrallerios force-pushed the kinesis-publisher branch 2 times, most recently from 3629d6d to 218f7ff Compare October 26, 2017 09:20
@raboof raboof merged commit 6cff13a into akka:master Oct 30, 2017
@aserrallerios aserrallerios deleted the kinesis-publisher branch April 26, 2019 09:56
7 participants