Skip to content

Commit

Permalink
Rename Worker to Scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
aserrallerios committed May 2, 2019
1 parent 20764b2 commit a6d8a90
Show file tree
Hide file tree
Showing 9 changed files with 164 additions and 161 deletions.
20 changes: 10 additions & 10 deletions docs/src/main/paradox/kinesis.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,35 +125,35 @@ Scala
Java
: @@snip [snip](/kinesis/src/test/java/docs/javadsl/KinesisSnippets.java) { #flow-sink }

# AWS KCL Worker Source & checkpointer
# AWS KCL Scheduler Source & checkpointer

The KCL Source can read from several shards and rebalance automatically when other Workers are started or stopped. It also handles record sequence checkpoints.
The KCL Source can read from several shards and rebalance automatically when other Schedulers are started or stopped. It also handles record sequence checkpoints.

For more information about KCL please visit the [official documentation](http://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-kcl.html).

## Usage

The KCL Worker Source needs to create and manage Worker instances in order to consume records from Kinesis Streams.
The KCL Scheduler Source needs to create and manage Scheduler instances in order to consume records from Kinesis Streams.

In order to use it, you need to provide a Worker builder and the Source settings:
In order to use it, you need to provide a Scheduler builder and the Source settings:

Scala
: @@snip (/kinesis/src/test/scala/docs/scaladsl/KclSnippets.scala) { #worker-settings }
: @@snip (/kinesis/src/test/scala/docs/scaladsl/KclSnippets.scala) { #scheduler-settings }

Java
: @@snip (/kinesis/src/test/java/docs/javadsl/KclSnippets.java) { #worker-settings }
: @@snip (/kinesis/src/test/java/docs/javadsl/KclSnippets.java) { #scheduler-settings }

The Source also needs an `ExecutionContext` to run the Worker's thread and to commit/checkpoint records. Then the Source can be created as usual:
The Source also needs an `ExecutionContext` to run the Scheduler's thread and to commit/checkpoint records. Then the Source can be created as usual:

Scala
: @@snip (/kinesis/src/test/scala/docs/scaladsl/KclSnippets.scala) { #worker-source }
: @@snip (/kinesis/src/test/scala/docs/scaladsl/KclSnippets.scala) { #scheduler-source }

Java
: @@snip (/kinesis/src/test/java/docs/javadsl/KclSnippets.java) { #worker-source }
: @@snip (/kinesis/src/test/java/docs/javadsl/KclSnippets.java) { #scheduler-source }

## Committing records

The KCL Worker Source publishes messages downstream that can be committed in order to mark progression of consumers by shard. This process can be done manually or using the provided checkpointer Flow/Sink.
The KCL Scheduler Source publishes messages downstream that can be committed in order to mark progression of consumers by shard. This process can be done manually or using the provided checkpointer Flow/Sink.

In order to use the Flow/Sink you must provide additional checkpoint settings:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
package akka.stream.alpakka.kinesis

object Errors {
sealed class KinesisWorkerSourceError(err: Throwable) extends Throwable(err)
sealed class KinesisSchedulerSourceError(err: Throwable) extends Throwable(err)

case class WorkerUnexpectedShutdown(cause: Throwable) extends KinesisWorkerSourceError(cause)
case class SchedulerUnexpectedShutdown(cause: Throwable) extends KinesisSchedulerSourceError(cause)

case class BackpressureTimeout(cause: Throwable) extends KinesisWorkerSourceError(cause)
case class BackpressureTimeout(cause: Throwable) extends KinesisSchedulerSourceError(cause)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright (C) 2016-2019 Lightbend Inc. <http://www.lightbend.com>
*/

package akka.stream.alpakka.kinesis

import scala.concurrent.duration._

case class KinesisSchedulerSourceSettings(bufferSize: Int,
terminateStreamGracePeriod: FiniteDuration,
backpressureTimeout: FiniteDuration) {
require(
bufferSize >= 1,
"Buffer size must be greater than 0; use size 1 to disable stage buffering"
)
}
case class KinesisSchedulerCheckpointSettings(maxBatchSize: Int, maxBatchWait: FiniteDuration) {
require(
maxBatchSize >= 1,
"Batch size must be greater than 0; use size 1 to commit records one at a time"
)
}

object KinesisSchedulerSourceSettings {

val defaultInstance = KinesisSchedulerSourceSettings(1000, 1.minute, 1.minute)

/**
* Java API
*/
def create(bufferSize: Int,
terminateStreamGracePeriod: FiniteDuration,
backpressureTimeout: FiniteDuration): KinesisSchedulerSourceSettings =
KinesisSchedulerSourceSettings(bufferSize, terminateStreamGracePeriod, backpressureTimeout)

}

object KinesisSchedulerCheckpointSettings {

val defaultInstance = KinesisSchedulerCheckpointSettings(1000, 10.seconds)

/**
* Java API
*/
def create(maxBatchSize: Int, maxBatchWait: FiniteDuration): KinesisSchedulerCheckpointSettings =
KinesisSchedulerCheckpointSettings(maxBatchSize, maxBatchWait)

}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -15,40 +15,40 @@ import software.amazon.kinesis.retrieval.KinesisClientRecord

import scala.concurrent.ExecutionContext

object KinesisWorkerSource {
object KinesisSchedulerSource {

abstract class WorkerBuilder {
abstract class SchedulerBuilder {
def build(r: ShardRecordProcessorFactory): Scheduler
}

def create(
workerBuilder: WorkerBuilder,
settings: KinesisWorkerSourceSettings,
workerExecutor: Executor
schedulerBuilder: SchedulerBuilder,
settings: KinesisSchedulerSourceSettings,
schedulerExecutor: Executor
): Source[CommittableRecord, Scheduler] =
scaladsl.KinesisWorkerSource
.apply(workerBuilder.build, settings)(ExecutionContext.fromExecutor(workerExecutor))
scaladsl.KinesisSchedulerSource
.apply(schedulerBuilder.build, settings)(ExecutionContext.fromExecutor(schedulerExecutor))
.asJava

def create(
workerBuilder: WorkerBuilder,
workerExecutor: Executor
schedulerBuilder: SchedulerBuilder,
schedulerExecutor: Executor
): Source[CommittableRecord, Scheduler] =
create(workerBuilder, KinesisWorkerSourceSettings.defaultInstance, workerExecutor)
create(schedulerBuilder, KinesisSchedulerSourceSettings.defaultInstance, schedulerExecutor)

def checkpointRecordsFlow(
settings: KinesisWorkerCheckpointSettings
settings: KinesisSchedulerCheckpointSettings
): Flow[CommittableRecord, KinesisClientRecord, NotUsed] =
scaladsl.KinesisWorkerSource.checkpointRecordsFlow(settings).asJava
scaladsl.KinesisSchedulerSource.checkpointRecordsFlow(settings).asJava

def checkpointRecordsFlow(): Flow[CommittableRecord, KinesisClientRecord, NotUsed] =
checkpointRecordsFlow(KinesisWorkerCheckpointSettings.defaultInstance)
checkpointRecordsFlow(KinesisSchedulerCheckpointSettings.defaultInstance)

def checkpointRecordsSink(
settings: KinesisWorkerCheckpointSettings
settings: KinesisSchedulerCheckpointSettings
): Sink[CommittableRecord, NotUsed] =
scaladsl.KinesisWorkerSource.checkpointRecordsSink(settings).asJava
scaladsl.KinesisSchedulerSource.checkpointRecordsSink(settings).asJava

def checkpointRecordsSink(): Sink[CommittableRecord, NotUsed] =
checkpointRecordsSink(KinesisWorkerCheckpointSettings.defaultInstance)
checkpointRecordsSink(KinesisSchedulerCheckpointSettings.defaultInstance)
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ import akka.stream.Supervision.{Resume, Stop}
import akka.stream._
import akka.stream.scaladsl.{Flow, GraphDSL, Keep, Sink, Source, Zip}
import akka.{Done, NotUsed}
import akka.stream.alpakka.kinesis.Errors.{BackpressureTimeout, WorkerUnexpectedShutdown}
import akka.stream.alpakka.kinesis.Errors.{BackpressureTimeout, SchedulerUnexpectedShutdown}
import akka.stream.alpakka.kinesis.{
CommittableRecord,
KinesisWorkerCheckpointSettings,
KinesisWorkerSourceSettings,
KinesisSchedulerCheckpointSettings,
KinesisSchedulerSourceSettings,
ShardProcessor
}
import software.amazon.kinesis.coordinator.Scheduler
Expand All @@ -27,19 +27,19 @@ import scala.concurrent.{Await, ExecutionContext, Future}
import scala.util.control.Exception
import scala.util.{Failure, Success}

object KinesisWorkerSource {
object KinesisSchedulerSource {

def apply(
workerBuilder: ShardRecordProcessorFactory => Scheduler,
settings: KinesisWorkerSourceSettings = KinesisWorkerSourceSettings.defaultInstance
)(implicit workerExecutor: ExecutionContext): Source[CommittableRecord, Scheduler] =
schedulerBuilder: ShardRecordProcessorFactory => Scheduler,
settings: KinesisSchedulerSourceSettings = KinesisSchedulerSourceSettings.defaultInstance
)(implicit schedulerExecutor: ExecutionContext): Source[CommittableRecord, Scheduler] =
Source
.queue[CommittableRecord](settings.bufferSize, OverflowStrategy.backpressure)
.watchTermination()(Keep.both)
.mapMaterializedValue {
case (queue, watch) =>
val semaphore = new Semaphore(1, true)
val worker = workerBuilder(
val scheduler = schedulerBuilder(
new ShardRecordProcessorFactory {
override def shardRecordProcessor(): ShardRecordProcessor =
new ShardProcessor(
Expand All @@ -55,17 +55,17 @@ object KinesisWorkerSource {
}
)

Future(worker.run()).onComplete {
Future(scheduler.run()).onComplete {
case Failure(ex) =>
queue.fail(WorkerUnexpectedShutdown(ex))
queue.fail(SchedulerUnexpectedShutdown(ex))
case Success(_) => queue.complete()
}
watch.onComplete(_ => Future(worker.shutdown()))
worker
watch.onComplete(_ => Future(scheduler.shutdown()))
scheduler
}

def checkpointRecordsFlow(
settings: KinesisWorkerCheckpointSettings = KinesisWorkerCheckpointSettings.defaultInstance
settings: KinesisSchedulerCheckpointSettings = KinesisSchedulerCheckpointSettings.defaultInstance
): Flow[CommittableRecord, KinesisClientRecord, NotUsed] =
Flow[CommittableRecord]
.groupBy(MAX_KINESIS_SHARDS, _.shardId)
Expand Down Expand Up @@ -99,7 +99,7 @@ object KinesisWorkerSource {
})

def checkpointRecordsSink(
settings: KinesisWorkerCheckpointSettings = KinesisWorkerCheckpointSettings.defaultInstance
settings: KinesisSchedulerCheckpointSettings = KinesisSchedulerCheckpointSettings.defaultInstance
): Sink[CommittableRecord, NotUsed] =
checkpointRecordsFlow(settings).to(Sink.ignore)

Expand Down
30 changes: 15 additions & 15 deletions kinesis/src/test/java/docs/javadsl/KclSnippets.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import akka.stream.alpakka.kinesis.javadsl.KinesisSource;
import akka.stream.alpakka.kinesis.javadsl.KinesisFlow;
import akka.stream.alpakka.kinesis.javadsl.KinesisSink;
import akka.stream.alpakka.kinesis.javadsl.KinesisWorkerSource;
import akka.stream.alpakka.kinesis.javadsl.KinesisSchedulerSource;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
Expand Down Expand Up @@ -74,32 +74,32 @@ public class KclSnippets {
KinesisSink.create("streamName", amazonKinesisAsync);
// #flow-sink

// #worker-settings
final KinesisWorkerSource.WorkerBuilder workerBuilder =
new KinesisWorkerSource.WorkerBuilder() {
// #scheduler-settings
final KinesisSchedulerSource.SchedulerBuilder schedulerBuilder =
new KinesisSchedulerSource.SchedulerBuilder() {
@Override
public Scheduler build(ShardRecordProcessorFactory r) {
return null; // build your own Scheduler here
}
};
final KinesisWorkerSourceSettings workerSettings =
KinesisWorkerSourceSettings.create(
final KinesisSchedulerSourceSettings schedulerSettings =
KinesisSchedulerSourceSettings.create(
1000,
FiniteDuration.apply(1L, TimeUnit.SECONDS),
FiniteDuration.apply(10L, TimeUnit.SECONDS));
// #worker-settings
// #scheduler-settings

// #worker-source
final Executor workerExecutor = Executors.newFixedThreadPool(100);
final Source<CommittableRecord, Scheduler> workerSource =
KinesisWorkerSource.create(workerBuilder, workerSettings, workerExecutor);
// #worker-source
// #scheduler-source
final Executor schedulerExecutor = Executors.newFixedThreadPool(100);
final Source<CommittableRecord, Scheduler> schedulerSource =
KinesisSchedulerSource.create(schedulerBuilder, schedulerSettings, schedulerExecutor);
// #scheduler-source

// #checkpoint
final KinesisWorkerCheckpointSettings checkpointSettings =
KinesisWorkerCheckpointSettings.create(1000, FiniteDuration.apply(30L, TimeUnit.SECONDS));
final KinesisSchedulerCheckpointSettings checkpointSettings =
KinesisSchedulerCheckpointSettings.create(1000, FiniteDuration.apply(30L, TimeUnit.SECONDS));
final Flow<CommittableRecord, KinesisClientRecord, NotUsed> checkpointFlow =
KinesisWorkerSource.checkpointRecordsFlow(checkpointSettings);
KinesisSchedulerSource.checkpointRecordsFlow(checkpointSettings);
// #checkpoint

}
Loading

0 comments on commit a6d8a90

Please sign in to comment.