Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Kinesis KCL Source #434

Closed
wants to merge 1 commit into from
Closed

Conversation

aserrallerios
Copy link
Contributor

Kinesis Source using KCL library. Tests pending, early reviews are much welcome.

The motivation behind this is that it'll allow alpakka users to automatically handle sharding among clients of Kinesis streams, simplified checkpoint of records' sequence using Dynamo and the potential usage of the KPL library for higher throughput (with local dynamic buffering and batching).

@aserrallerios
Copy link
Contributor Author

aserrallerios commented Aug 8, 2017

I'm not entirely happy with the checkpoint part so far.

Since we're leaving the IRecordProcessor instance and it's thread pool, the checkpoint process is not synchronized anymore. That means, that the invocation of checkpoint() and the shutdown of the IRecordProcessor can occur in parallel.

My approach has been: let the user check whether the IRecordProcessor is shutdown or not on a best effort basis. Nonetheless, the invocation of checkpoint() is never guaranteed to proceed successfully, either because the IRecordProcessor has been shutdown or because of other cause.

https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/clientlibrary/interfaces/IRecordProcessorCheckpointer.java?utf8=%E2%9C%93#L51-L69

So, it'll be encouraged in the docs to use a supervision strategy to handle this failures.

WDYT?

@jaymell
Copy link

jaymell commented Aug 11, 2017

Good point. Given the potential for failed checkpoints, would it make sense to implement the "v2" IRecordProcessor interface?

http://docs.aws.amazon.com/streams/latest/dev/kinesis-record-processor-implementation-app-java.html#kinesis-record-processor-implementation-interface-java

I suppose the additional sequence info passed to the initialize method may be helpful in recovering from a premature shutdown.

@aserrallerios
Copy link
Contributor Author

Thanks for the link! I'll definitely have a look at the v2 and I'll migrate to it, but unfortunately it seems that the race conditions can still occur. I guess it's not a big deal, we'll just need to capture potential failures.

I'll update the PR with these changes later today.

@aserrallerios aserrallerios force-pushed the kcl-source branch 2 times, most recently from 0757128 to e0648c9 Compare August 11, 2017 10:47
@aserrallerios
Copy link
Contributor Author

Changes applied. @jaymell I'd love to get your feedback for the current code. Do you miss something from the current interface? I mean, parameters and Source/Flow signatures suit your needs?

@jaymell
Copy link

jaymell commented Aug 14, 2017

@aserrallerios So far it looks great. I haven't yet actually started working on integrating this with our current application -- should have more meaningful feedback once I do so, hopefully this week or next. Thanks again for sharing the PR.

@cwei-bgl
Copy link

Hi,
Thanks for enabling Kinesis stream for AKKA. I was using raw AWS Kinesis SDK before, but I would like to use a stream instead. I just wonder when this will be released or be merged so that snapshot is available?

Thanks.

@aserrallerios
Copy link
Contributor Author

The Source based on Kinesis SDK (not KCL) is already merged. So you can use it right now. The publisher Flow is on its way to be merged, and this PR is still to be finished (tests and more reviews from maintainers are missing) and I cannot work on it right now (I'm on vacation). So I won't expect it merged before mid Sept or Oct.

@cwei-bgl
Copy link

cwei-bgl commented Aug 23, 2017

@aserrallerios How do I checkpoint with the Source based on Kinesis SDK? I saw your new added class KinesisWorkerCheckpointSettings(1000, 10.seconds). Does this mean the existing one does not support checkpoint? Checkpointing is critical, otherwise, when an app restarts, it will start the processing from 24 hours old given a setting of ShardIteratorType.TRIM_HORIZON.

@aserrallerios
Copy link
Contributor Author

The Kinesis SDK doesn't offer any means of checkpointing record sequences, as far as I kown. You need to handle checkpoints manually, for example, using an external database.

@cwei-bgl
Copy link

@aserrallerios Got it, Thanks. I am trying your early implementation. The following line gives null pointer exception, as the worker is still null after the constructor is invoked.

KinesisWorkerSourceStage.createLogicAndMaterializedValue: logic -> logic.worker

@aserrallerios
Copy link
Contributor Author

Thnx @cwei-bgl! I'll work on it this weekend.

@aserrallerios
Copy link
Contributor Author

I've fixed the NPE, improved some code, added javadsl methods and added the test outline (pending implementation).

TODO:

  • test implementation
  • documentation (I'd rather wait until the KinesisFlow is merged, as it modifies the documentation page)

About the implementation itself, what do you guys prefer:

  • The Source taking as parameter a IRecordProcessorFactory => Worker; the stage fully manages worker's start and shutdown.
  • Source with materialized value IRecordProcessorFactory; the materialized value must be used by the caller to build a worker, and then start it and shutdown it when the stream completes/fails.

The current implementation is the first option, but it's easy to change. The implications are more "philosophical" if you ask me. Following the inversion of control principle, the second option would be the way to go, but forcing the user to do stuff with the materialized value seems very error prone and unfriendly to new users. WDYT?

@aserrallerios
Copy link
Contributor Author

aserrallerios commented Sep 1, 2017

An argument against using a materialized value of type IRecordProcessorFactory is that the new error handling with RestartSource prevents the usage of the materialized value, transforming a Source[A, _] into Source[A, NotUsed].

So the current implementation (IRecordProcessorFactory => Worker by parameter) seems to be superior to that alternative.

@aserrallerios aserrallerios changed the title Add Kinesis KCL Source WIP - Add Kinesis KCL Source Sep 27, 2017
@etspaceman
Copy link

What's the status on this PR? Seems to be ready to review at the very least?

@aserrallerios
Copy link
Contributor Author

This can be reviewed but I plan to rebase it to this branch:

#408

It's easier to finish documentation and stuff over that branch, and thus I've been working locally on that direction. I'll try to keep this very PR updated too, if early reviewers want to help (much appreciated). Another option is to use that PR for both publisher/worker. I'll ask there, lets see what the community thinks.

@aserrallerios
Copy link
Contributor Author

aserrallerios commented Oct 15, 2017

Ready to be reviewed. Both implementation and documentation are "complete". Just some hard-to-test tests are missing.

@etspaceman: I think I'll keep this separated from the Kinesis publisher PR, we rather keep things simple and separated. The community seems to be having a tough time reviewing all the pending stuff :(

@aserrallerios
Copy link
Contributor Author

I guess build is failing due to Cassandra tests, unrelated to this PR. Can anyone confirm?

@johanandren
Copy link
Member

I think it could be the instability tracked by #527

@johanandren
Copy link
Member

I think now it failed for Kinsesis as well.

@aserrallerios
Copy link
Contributor Author

Can you please point me where the problem is? I can only see the C* test error:

[error] (cassandra/test:test) sbt.TestsFailedException: Tests unsuccessful

@2m
Copy link
Member

2m commented Oct 23, 2017

There was an issue with cassandra tests, which is now fixed in master. Please rebase from the master and push rebased commits to this PR branch. This will kick in CI validation with the cassandra problem fixed.

@aserrallerios
Copy link
Contributor Author

Rebased to fix conflicts.

@johanandren
Copy link
Member

I'm afraid we may not be able to merge this because of the Amazon License the client pulls in.

@aserrallerios
Copy link
Contributor Author

I'm not an expert on lincenses and stuff, so keep me posted, because the PR can be reworked into something more generic that doesn't include the KCL dependency. Obviously it'd require extra work from the final user and the result wouldn't be as satisfactory...

@etspaceman
Copy link

@johanandren Can you indicate what parts of the license are concerning for this merge?

@raboof
Copy link
Member

raboof commented Nov 16, 2017

"The Work and any derivative works thereof only may be used or intended for use with the web services, computing platforms or applications provided by Amazon.com, Inc. or its affiliates, including Amazon Web Services, Inc." is scary to me - IANAL, but I'm pretty sure that means the derivative work (which in this case would be Alpakka) cannot be Apache-licensed anymore.

@raboof
Copy link
Member

raboof commented Nov 16, 2017

Some interesting discussion on the topic in https://issues.apache.org/jira/browse/LEGAL-198 (especially https://issues.apache.org/jira/browse/LEGAL-198?focusedCommentId=15618136#comment-15618136).

They decided it's OK for them since they only distribute that module as source, but that frankly seems questionable, and doesn't obviously apply to us since we also distribute binaries.

@aserrallerios
Copy link
Contributor Author

Can we leave that dependency as "provided" and tell the final user to add it to his/her project?

@etspaceman
Copy link

@johanandren @raboof - Any thoughts on @aserrallerios's mention here?

@johanandren
Copy link
Member

I'm not sure we dare to do that, but will check with "those who know better"™

@ennru ennru mentioned this pull request Dec 4, 2017
@johanandren
Copy link
Member

Sorry for the delay, but we have now checked with legal, and it is not possible to include this in Alpakka because of the dependency license. Alternative options are:

  • Publish the connector yourself and we will link to it from the Alpakka docs as an external connector
  • Use another client library with a more compatible license

Sorry about this, but not much else we can do.

@aserrallerios
Copy link
Contributor Author

What do you guys think about making a generic Source that works with "Worker-type" instances? That could be used with KCL.

Otherwise, can anyone think of a better suited repository for this PR?

@aserrallerios
Copy link
Contributor Author

Have a look at localstack's license. Can we do something similar?

@tunesmith
Copy link

What a bummer - is this dead in the water, or is it still an option to leave the KCL dependency as "provided"? For publishing the connector yourself, it seems like it would be a matter of forking this entire kinesis connector, right? I'm not sure what the easiest path is here.

Honestly, I don't understand why a code that uses the KCL library would be considered derivative. You're not actually creating a different (derivative) version of KCL itself.

@huntc
Copy link
Contributor

huntc commented Jan 18, 2018

Seems that the connector should reside in its own repo as @johanandren suggests.

@aserrallerios
Copy link
Contributor Author

I'll try to publish it for myself until a better place is found:

https://github.com/aserrallerios/kcl-akka-stream

Are you ok with it? Any other place you think it could fit?

@nilsga
Copy link
Contributor

nilsga commented Jan 19, 2018

Go for it 👍

@ennru
Copy link
Member

ennru commented Feb 13, 2018

Thank you @aserrallerios for putting effort into this Akka streams connector and offering it in your own repo, with #777 we'll link to it from the external connectors listing.

@ennru ennru closed this Feb 13, 2018
@ennru ennru added this to the invalid milestone Feb 13, 2018
@etspaceman
Copy link

etspaceman commented Apr 23, 2019

So, I know this is an old PR, but its worth noting that the KCL has updated their license to Apache 2.0. Might be worth reconsidering this module as an official alpakka module:

https://github.com/awslabs/amazon-kinesis-client/releases/tag/v1.10.0

@ennru
Copy link
Member

ennru commented Apr 24, 2019

Thank you for letting us know. With the Apache 2 license, we would be able to make it part of Alpakka. /cc @aserrallerios

@aserrallerios
Copy link
Contributor Author

That's awesome! Should I work on updating this very PR then?

@ennru
Copy link
Member

ennru commented Apr 24, 2019

Prefer a new PR, please.

@aserrallerios aserrallerios deleted the kcl-source branch April 26, 2019 09:56
@aserrallerios aserrallerios restored the kcl-source branch April 26, 2019 10:02
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.