
AWS Kinesis KCL streams support #1667

Merged 3 commits into akka:master on Jan 20, 2020
Conversation

@aserrallerios (Contributor)

Adds the KCL Source and record checkpointer Flow/Sink.

Updated version of the old PR: #434

@aserrallerios (Contributor, Author)

Check it out @julianhowarth

@aserrallerios (Contributor, Author)

FTP tests are failing. All clear in Kinesis.

@aserrallerios (Contributor, Author) commented May 3, 2019

I've improved the shard termination, as discussed with @julianhowarth in aserrallerios/kcl-akka-stream#13 (comment).

Please take a careful look at this commit in particular.

Summary: I removed the grace period, so the ShardProcessor will never commit (and close) the shard on shardEnded before the latest record emitted by that ShardProcessor has been committed. The only additional synchronization point is in the ShardProcessor. Committing a record never blocks (though of course it may fail if the lease is lost or there is any other unexpected failure).

@ennru (Member) left a comment

Thank you for offering to move "back home" to Alpakka, and sorry for keeping you waiting for feedback.
I know too little about Kinesis, so I have a few questions...

@ennru (Member) left a comment

Dependency trouble... But Alpakka 1.1 is not far away.

"software.amazon.awssdk" % "kinesis" % AwsSdk2Version, // ApacheV2
"software.amazon.awssdk" % "dynamodb" % AwsSdk2Version, // ApacheV2
"software.amazon.awssdk" % "cloudwatch" % AwsSdk2Version, // ApacheV2
"software.amazon.kinesis" % "amazon-kinesis-client" % "2.2.0", // Amazon Software License
Member:

The comment is where it all started... They have changed to Apache 2 now: https://github.com/awslabs/amazon-kinesis-client/releases
Even https://github.com/awslabs/amazon-kinesis-client/blob/master/LICENSE.txt

The dynamodb and cloudwatch dependencies are transitive dependencies of amazon-kinesis-client, so they don't need to be listed.

What I'm more worried about is that this pulls in protobuf-java as well. We cannot have Alpakka Kinesis start pulling that in in a patch version, so this PR needs to wait for Alpakka 1.1, I'm afraid.

@ennru added the dependency-change label (for PRs changing the version of a dependency) on May 15, 2019
@ennru added this to the 1.1.0 milestone on May 15, 2019
@ennru modified the milestones: 1.1.0 → 2.0.0 on Jun 28, 2019
@ennru (Member) left a comment

It would be great if you could pick this up again.

@aserrallerios (Contributor, Author) commented Aug 10, 2019

Moved to Akka's Source.setup and removed the code that called Await.result on every queue.offer.

@aserrallerios (Contributor, Author)

I had to make a change:

After the switch to Source.setup, the inner materialized value is wrapped in a Future[_]. As the inner MV was already a Future[Scheduler], the final MV now becomes Future[Future[Scheduler]].

In Scala 2.12 I can use Future.flatten to flatten it without an ExecutionContext, but in older versions I cannot.

So I will drop the MV (use NotUsed). If someone finds a way to reconcile a Future[_] MV with SetupStage, please tell me and I will add the MV back.
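A minimal sketch of the nesting in question (the inner source is a stand-in; Source.setup wraps whatever it materializes in a Future):

```scala
import akka.stream.scaladsl.Source
import scala.concurrent.Future
import software.amazon.kinesis.coordinator.Scheduler
import software.amazon.kinesis.retrieval.KinesisClientRecord

// Stand-in for the real source, which materializes to a Future[Scheduler]:
def innerSource: Source[KinesisClientRecord, Future[Scheduler]] = ???

// Source.setup wraps the inner materialized value in another Future:
val outer: Source[KinesisClientRecord, Future[Future[Scheduler]]] =
  Source.setup((_, _) => innerSource)

// On Scala 2.12+ the nesting can be flattened without an ExecutionContext;
// Scala 2.11 has no Future.flatten, hence the problem described above:
val flattened: Source[KinesisClientRecord, Future[Scheduler]] =
  outer.mapMaterializedValue(_.flatten)
```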

@aserrallerios (Contributor, Author)

mqtt/jms tests failed.

@ennru (Member) left a comment

Didn't think of it before, but we'd want to move Kinesis to AWS SDK 2 before pushing this forward.

docs/src/main/paradox/kinesis.md
project/Dependencies.scala
@aserrallerios (Contributor, Author)

Anything I can do to get this merged?

@ennru (Member) commented Dec 17, 2019

Thank you for the ping.
Now that the whole Kinesis connector is on AWS SDK v2, it would be great to warm this up.
Please resolve the conflicts and I'll have another look.

@aserrallerios (Contributor, Author)

Done. Build fails elsewhere.

@ennru (Member) left a comment

Looks good. I'd like to see a bit more about the differences between Kinesis Data Streams and the KCL on the doc page.
Maybe borrow something from https://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-kcl.html


```scala
import scala.concurrent.duration._

final class KinesisSchedulerSourceSettings(val bufferSize: Int, val backpressureTimeout: FiniteDuration) {
```
Member:

Suggested change:
```diff
- final class KinesisSchedulerSourceSettings(val bufferSize: Int, val backpressureTimeout: FiniteDuration) {
+ final class KinesisSchedulerSourceSettings private (val bufferSize: Int, val backpressureTimeout: FiniteDuration) {
```

Contributor Author:

Please check changes in KinesisSchedulerSettings.

```scala
val retrievalConfig =
  configsBuilder.retrievalConfig
    .retrievalFactory(
      new SynchronousBlockingRetrievalFactory(streamName, kinesisClient, new SimpleRecordsFetcherFactory, 1000)
```
Member:

This constructor is deprecated, the new version takes kinesisRequestTimeout.

Contributor Author:

I removed unnecessary code from snippets.

```scala
implicit val executor =
  ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(1000))

KinesisSchedulerSource(builder, schedulerSourceSettings).to(Sink.ignore)
```
Member:

This example looks a bit disturbing, would it actually do anything useful?

Contributor Author:

Added some stream logic.
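For reference, stream logic of this shape looks roughly like the sketch below against the connector's Scala DSL. Package paths, the `defaults` settings, and `checkpointRecordsFlow` are taken from the connector as merged and should be treated as assumptions; the scheduler builder is application-specific.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.alpakka.kinesis.{KinesisSchedulerCheckpointSettings, KinesisSchedulerSourceSettings}
import akka.stream.alpakka.kinesis.scaladsl.KinesisSchedulerSource
import akka.stream.scaladsl.Sink
import scala.concurrent.Future
import software.amazon.kinesis.coordinator.Scheduler
import software.amazon.kinesis.processor.ShardRecordProcessorFactory

implicit val system: ActorSystem = ActorSystem()

// Building the KCL Scheduler is application-specific; see the KCL docs.
def schedulerBuilder(factory: ShardRecordProcessorFactory): Scheduler = ???

val done: Future[Done] =
  KinesisSchedulerSource(schedulerBuilder, KinesisSchedulerSourceSettings.defaults)
    .map { record =>
      // application logic on each CommittableRecord goes here
      record
    }
    .via(KinesisSchedulerSource.checkpointRecordsFlow(KinesisSchedulerCheckpointSettings.defaults))
    .runWith(Sink.ignore)
```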


```scala
object KinesisSchedulerSourceSettings {

  val defaultInstance = new KinesisSchedulerSourceSettings(1000, 1.minute)
```
Member:

Return the default from parameterless apply and create methods.

Contributor Author:

Should I add a create method in each companion object here?

Comment on lines 41 to 48
```scala
// This implementation will try to checkpoint every Record with the original
// checkpointer. Other option would be to keep a reference of the latest
// checkpointer passed to this instance using any of these methods:
// * processRecords
// * shutdownRequested
// * shardEnded
val checkpoint = (record: KinesisClientRecord) =>
  processRecordsInput.checkpointer().checkpoint(record.sequenceNumber(), record.subSequenceNumber())
```
Member:

Would that be something the user would want to decide?

Contributor Author:

Do you mean using the "original" checkpointer vs. using the "latest" checkpointer?

I think it's a low-level decision that the library should make for the user. It seems to work both ways, although that could change in the future. I'll try to improve the code a bit, by the way.

@aserrallerios (Contributor, Author)

Please have a look now. The documentation improvements are still missing.

@ennru (Member) left a comment

Getting there, I dug a bit deeper and came up with more suggestions.
(FYI: I'll be off for the next 3 weeks.)

```scala
    extends CommittableRecord(record, batchData, shardData) {
  private def checkpoint(): Unit =
    checkpointer.checkpoint(record.sequenceNumber(), record.subSequenceNumber())
  private def checkpointAndRelease(): Unit = { checkpoint(); semaphore.release() }
```
Member:

Suggested change:
```diff
- private def checkpointAndRelease(): Unit = { checkpoint(); semaphore.release() }
+ private def checkpointAndRelease(): Unit = {
+   checkpoint()
+   semaphore.release()
+ }
```

Would it make sense to move the if (isLatestRecord) into a single checkpoint() method?

```scala
new InternalCommittableRecord(
  record,
  batchData,
  isLatestRecord = processRecordsInput.isAtShardEnd && index + 1 == numberOfRecords
```
Member:

This sounds more like the "last" record.

```scala
  callback: CommittableRecord => Unit
) extends ShardRecordProcessor {

  private val semaphore = new Semaphore(1)
```
Member:

Please add a comment about what the semaphore is blocking and why. Which dispatcher will it run on?

```scala
self = getStageActor(awaitingRecords)
val newRecordCallback: CommittableRecord => Unit = {
  semaphore.tryAcquire(backpressureTimeout.length, backpressureTimeout.unit)
  self.ref ! NewRecord(_)
```
Member:

This is OK, but most other Alpakka connectors use async callbacks instead.
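For context, the async-callback pattern the reviewer refers to looks roughly like this. A minimal, self-contained sketch; the stage name, the buffering policy, and how the callback is handed to the record processor are illustrative, not the PR's code:

```scala
import akka.stream.{Attributes, Outlet, SourceShape}
import akka.stream.stage.{AsyncCallback, GraphStage, GraphStageLogic, OutHandler}
import scala.collection.mutable

// Sketch of a callback-driven source stage: external (KCL) threads call
// `invoke` on the AsyncCallback; the handler body runs inside the stage.
final class CallbackSourceSketch[T] extends GraphStage[SourceShape[T]] {
  val out: Outlet[T] = Outlet("CallbackSourceSketch.out")
  override val shape: SourceShape[T] = SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with OutHandler {
      private val buffer = mutable.Queue.empty[T]

      // Runs on the stage's dispatcher, so it may safely touch stage state.
      val newRecord: AsyncCallback[T] = getAsyncCallback[T] { record =>
        buffer.enqueue(record)
        if (isAvailable(out)) push(out, buffer.dequeue())
      }
      // A real stage would hand `newRecord.invoke _` to the record processor
      // factory in preStart(), instead of a stage actor reference.

      override def onPull(): Unit =
        if (buffer.nonEmpty) push(out, buffer.dequeue())

      setHandler(out, this)
    }
}
```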

Contributor Author:

Please check the async-callback logic now; it's a bit less fair than before, as I turned some of the intra-stage calls synchronous. I expect this to give higher performance.

This can still be changed to be fully asynchronous to maximize fairness.

```scala
 */
def apply(
    schedulerBuilder: ShardRecordProcessorFactory => Scheduler,
    settings: KinesisSchedulerSourceSettings = KinesisSchedulerSourceSettings.defaultInstance
```
Member:

Default parameters hinder binary-compatible evolution. Passing the defaults explicitly is better.
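To illustrate the concern with a generic sketch (all names hypothetical): a Scala default argument compiles to a synthetic `method$default$n` member that compiled callers link against, so explicit overloads evolve more safely.

```scala
object OverloadSketch {
  final case class Settings(bufferSize: Int)
  val defaults: Settings = Settings(1000)

  // Instead of: def source(settings: Settings = defaults): String
  // A default argument would compile to a synthetic `source$default$1`
  // method; explicit overloads keep both entry points binary-stable.
  def source(): String = source(defaults)
  def source(settings: Settings): String =
    s"source with buffer=${settings.bufferSize}"
}
```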

Contributor Author:

Are you sure we want to remove default parameters? They are all over the Kinesis API...

Member:

Yes, please remove them on these new methods.

Comment on lines 74 to 79
```scala
Flow[CommittableRecord]
  .map {
    case record if record.canBeCheckpointed =>
      record
        .tryToCheckpoint()
        .recover({
          case _: ShutdownException => Done
        })
        .get
    case _ => Done
  }
  .addAttributes(Attributes(ActorAttributes.IODispatcher))
```
Member:

If you move this to a ShardProcessor object, the semaphore and the IO dispatcher show together and are easier to understand.


```scala
split
  .out(0)
  .map(_.max)
```
Member:

Might be good to point to the ordering defined in CommittableRecord.

```scala
  }
  .addAttributes(Attributes(ActorAttributes.IODispatcher))
) ~> join.in0
split.out(1) ~> join.in1
```
Member:

If you need to drop down into the DSL, use it all the way.

```scala
split.out(0) ~> checkpoint ~> join.in0
split.out(1) ~> join.in1

join.out ~> flatten ~> result
```

```scala
Flow[CommittableRecord]
  .groupBy(MAX_KINESIS_SHARDS, _.processorData.shardId)
  .groupedWithin(settings.maxBatchSize, settings.maxBatchWait)
  .via(GraphDSL.create() { implicit b =>
```
Member:

Put the DSL in a val so you can give it a descriptive name.
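A sketch of that suggestion (shapes and element types are placeholders, not the PR's exact checkpoint graph):

```scala
import akka.NotUsed
import akka.stream.FlowShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Zip}

// Name the graph by assigning the GraphDSL block to a val, then use it
// with .via(...). Seq[Int] stands in for batches of CommittableRecord.
val checkpointAndJoin: Flow[Seq[Int], (Int, Seq[Int]), NotUsed] =
  Flow.fromGraph(GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._

    val split = b.add(Broadcast[Seq[Int]](2))
    // Stand-in for the checkpoint flow: commit the greatest record of each
    // batch (cf. the ordering defined on CommittableRecord).
    val checkpoint = b.add(Flow[Seq[Int]].map(_.max))
    val join = b.add(Zip[Int, Seq[Int]]())

    split.out(0) ~> checkpoint ~> join.in0
    split.out(1) ~> join.in1

    FlowShape(split.in, join.out)
  })
```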

@aserrallerios (Contributor, Author)

The checkpoint coordination is tricky. Let me explain.

The issue is that when the Scheduler notifies the ShardProcessor that the shard has ended, you must checkpoint before returning control to the caller. That has to happen synchronously, of course.

```java
/**
 * The checkpointer used to record that the record processor has completed the shard.
 *
 * The record processor <b>must</b> call {@link RecordProcessorCheckpointer#checkpoint()} before returning from
 * {@link ShardRecordProcessor#shardEnded(ShardEndedInput)}. Failing to do so will trigger the Scheduler to retry
 * shutdown until a successful checkpoint occurs.
 */
```

As we checkpoint asynchronously, this needs to be coordinated by other means: we lock the shardEnded method until all "regular" records have been checkpointed through the stream mechanism.

I'll put a comment on the semaphore, but the shardEnded method must block; otherwise we'd be "closing" the shard before the "regular" records are consumed.

An alternative implementation would be to give the final shardEnded checkpoint a grace period, but that could still leave records unconsumed.

I'm open to suggestions here.
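A simplified sketch of this coordination (names illustrative, not the merged code): the permit is taken when the last record of an ending shard is emitted and only released once the stream has checkpointed it, so shardEnded cannot perform the final shard-end checkpoint too early.

```scala
import java.util.concurrent.Semaphore

// One instance per ShardRecordProcessor.
final class ShardEndCoordination {
  private val semaphore = new Semaphore(1)

  // Called by the processor when it emits the last record of an ending shard.
  def lastRecordEmitted(): Unit = semaphore.acquire()

  // Called by the checkpoint flow after committing that last record.
  def lastRecordCheckpointed(): Unit = semaphore.release()

  // Invoked by the KCL Scheduler; must checkpoint before returning.
  def shardEnded(checkpointShard: () => Unit): Unit = {
    semaphore.acquire() // blocks until the last record has been committed
    try checkpointShard()
    finally semaphore.release()
  }
}
```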

@aserrallerios (Contributor, Author)

Made all the checkpoint logic a bit more explicit. Added comments and documentation to methods.

Only the kinesis.md part is missing.

@ennru (Member) left a comment

Thank you for adding more context about the required blocking.

```scala
 */
def apply(
    schedulerBuilder: ShardRecordProcessorFactory => Scheduler,
    settings: KinesisSchedulerSourceSettings = KinesisSchedulerSourceSettings.defaultInstance
```
Member:

Yes, please remove them on these new methods.

```scala
/**
 * Java API
 */
def create(maxBatchSize: Int, maxBatchWait: FiniteDuration): KinesisSchedulerCheckpointSettings =
```
@ennru (Member), Jan 13, 2020:

Same as below.

Comment on lines 35 to 45
```scala
def apply(bufferSize: Int, backpressureTimeout: java.time.Duration): KinesisSchedulerSourceSettings =
  KinesisSchedulerSourceSettings(bufferSize, FiniteDuration.apply(backpressureTimeout.toMillis, MILLISECONDS))

def apply: KinesisSchedulerSourceSettings = KinesisSchedulerSourceSettings(1000, 1.minute)

val defaultInstance: KinesisSchedulerSourceSettings = KinesisSchedulerSourceSettings.apply

/**
 * Java API
 */
def create(bufferSize: Int, backpressureTimeout: FiniteDuration): KinesisSchedulerSourceSettings =
  new KinesisSchedulerSourceSettings(bufferSize, backpressureTimeout)
```
Member:

There's no reason for java.time.Duration in apply, only in the Java API.
You should create the instance in val defaultInstance and return that from apply.
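A sketch of the suggested companion-object shape (simplified class name; a stand-in, not the PR's code): build the default once in a val, return it from the parameterless factories, and accept java.time.Duration only in the Java API.

```scala
import scala.concurrent.duration._

final class SettingsSketch(val bufferSize: Int, val backpressureTimeout: FiniteDuration)

object SettingsSketch {
  // Build the default once and return it from the parameterless factories.
  val defaultInstance: SettingsSketch = new SettingsSketch(1000, 1.minute)

  def apply(): SettingsSketch = defaultInstance
  def apply(bufferSize: Int, backpressureTimeout: FiniteDuration): SettingsSketch =
    new SettingsSketch(bufferSize, backpressureTimeout)

  /** Java API: java.time.Duration belongs here, not in apply. */
  def create(): SettingsSketch = defaultInstance
  def create(bufferSize: Int, backpressureTimeout: java.time.Duration): SettingsSketch =
    new SettingsSketch(bufferSize, backpressureTimeout.toMillis.millis)
}
```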

Comment on lines 34 to 37
```scala
def create(
    schedulerBuilder: SchedulerBuilder
): Source[CommittableRecord, CompletionStage[Scheduler]] =
  create(schedulerBuilder, KinesisSchedulerSourceSettings.defaultInstance)
```
Member:

Yes, this is the same as the default arguments in the Scala DSL. I think it is better to make passing the default instance look as nice as possible (maybe call it just defaults?).

@ennru (Member) commented Jan 20, 2020

@aserrallerios Any chance to push this the last bit?

@aserrallerios (Contributor, Author)

Yes, let me see if I can get through it today.

@aserrallerios (Contributor, Author)

@ennru can you please remind me of the suggestions regarding the kinesis.md file? I'm not able to find them.

```scala
) ++ Seq(
  "software.amazon.awssdk" % "kinesis" % AwsSdk2Version, // ApacheV2
  "software.amazon.awssdk" % "firehose" % AwsSdk2Version, // ApacheV2
  "software.amazon.awssdk" % "dynamodb" % AwsSdk2Version, // ApacheV2
```
Member:

DynamoDB is only used in tests, right?

Contributor Author:

It's used by the KCL library; it's a transitive dependency. I should probably remove it.

Member:

Ah, OK.
Might be better to pull it transitively, so please remove it.

@ennru (Member) commented Jan 20, 2020

I believe the docs are OK.
We switched to ScalaTest 3.1.0, which renames a few things. It would be great if you could rebase and fix those, as sketched below. (But if you are too short on time, I can do that.)
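For reference, the typical ScalaTest 3.1.0 renames: the test styles moved into dedicated packages and gained an "Any" prefix (the spec class here is a hypothetical example).

```scala
// before 3.1.0:
//   org.scalatest.WordSpec  => org.scalatest.wordspec.AnyWordSpec
//   org.scalatest.FlatSpec  => org.scalatest.flatspec.AnyFlatSpec
//   org.scalatest.Matchers  => org.scalatest.matchers.should.Matchers
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

class ExampleSpec extends AnyWordSpec with Matchers {
  "an example" should {
    "compile against ScalaTest 3.1" in {
      1 + 1 shouldBe 2
    }
  }
}
```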

@aserrallerios (Contributor, Author)

Rebased and squashed (I can push the old history if needed).

@ennru added the p:new label on Jan 20, 2020
@ennru (Member) left a comment

LGTM.

@ennru changed the title from "AWS Kinesis KCL streams" to "AWS Kinesis KCL streams support" on Jan 20, 2020
@ennru merged commit 86be456 into akka:master on Jan 20, 2020
@ennru (Member) commented Jan 20, 2020

A long time coming! Thank you for your work on this.

@seglo (Member) commented Mar 24, 2020

Hi @aserrallerios. We're observing a transient failure in a Kinesis test case; you can find runtime details in #2219. The test in question uses a lot of Thread.sleeps and asynchronous code in Futures that are never awaited. When I run it locally it fails ~50% of the time. Can you take a look at this test? I spent some time troubleshooting it this afternoon but could not determine where the additional checkpointing was being called from (it must be from the AWS libs).

@aserrallerios (Contributor, Author)

Will have a look later today. Thanks!

@seglo (Member) commented Mar 25, 2020

Thanks a lot @aserrallerios!

Labels: dependency-change, documentation, p:kinesis, p:new