Add Kinesis batch publisher with retries
aserrallerios committed Aug 27, 2017
1 parent 78726bd commit 4440968
Showing 14 changed files with 636 additions and 41 deletions.
42 changes: 35 additions & 7 deletions docs/src/main/paradox/kinesis.md
Original file line number Diff line number Diff line change
@@ -35,7 +35,7 @@ Gradle

## Usage

Sources provided by this connector need a `AmazonKinesisAsync` instance to consume messages from a shard.
Sources and Flows provided by this connector need an `AmazonKinesisAsync` instance to consume messages from a shard or to publish them to a stream.

@@@ note
The `AmazonKinesisAsync` instance you supply is thread-safe and can be shared amongst multiple `GraphStages`. As a result, individual `GraphStages` will not automatically shut down the supplied client when they complete.
@@ -60,10 +60,10 @@ Java
The `KinesisSource` creates one `GraphStage` per shard. Reading from a shard requires an instance of `ShardSettings`.

Scala
: @@snip (../../../../kinesis/src/test/scala/akka/stream/alpakka/kinesis/scaladsl/Examples.scala) { #settings }
: @@snip (../../../../kinesis/src/test/scala/akka/stream/alpakka/kinesis/scaladsl/Examples.scala) { #source-settings }

Java
: @@snip (../../../../kinesis/src/test/java/akka/stream/alpakka/kinesis/javadsl/Examples.java) { #settings }
: @@snip (../../../../kinesis/src/test/java/akka/stream/alpakka/kinesis/javadsl/Examples.java) { #source-settings }

You have the choice of reading from a single shard, or reading from multiple shards. In the case of multiple shards the results of running a separate `GraphStage` for each shard will be merged together.

@@ -74,20 +74,48 @@ The `GraphStage` associated with a shard will remain open until the graph is sto
For a single shard you simply provide the settings for a single shard.

Scala
: @@snip (../../../../kinesis/src/test/scala/akka/stream/alpakka/kinesis/scaladsl/Examples.scala) { #single }
: @@snip (../../../../kinesis/src/test/scala/akka/stream/alpakka/kinesis/scaladsl/Examples.scala) { #source-single }

Java
: @@snip (../../../../kinesis/src/test/java/akka/stream/alpakka/kinesis/javadsl/Examples.java) { #single }
: @@snip (../../../../kinesis/src/test/java/akka/stream/alpakka/kinesis/javadsl/Examples.java) { #source-single }

You can merge multiple shards by providing a list of settings.

Scala
: @@snip (../../../../kinesis/src/test/scala/akka/stream/alpakka/kinesis/scaladsl/Examples.scala) { #list }
: @@snip (../../../../kinesis/src/test/scala/akka/stream/alpakka/kinesis/scaladsl/Examples.scala) { #source-list }

Java
: @@snip (../../../../kinesis/src/test/java/akka/stream/alpakka/kinesis/javadsl/Examples.java) { #list }
: @@snip (../../../../kinesis/src/test/java/akka/stream/alpakka/kinesis/javadsl/Examples.java) { #source-list }

The constructed `Source` will return [Record](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_Record.html)
objects by calling [GetRecords](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html) at the specified interval and according to the downstream demand.

### Using the Flow

The `KinesisFlow` publishes messages into a Kinesis stream using each message's partition key and body. It batches records dynamically, can issue several requests in parallel, and retries failed records. These features are necessary to achieve the best possible write throughput to the stream. The Flow outputs the result of publishing each record.

@@@ warning
Batching has a drawback: message order cannot be guaranteed, because some records within a single batch may fail to be published and are only written on a later retry. For the same reason, the order of the Flow's output may differ from the order of its input.

For details, see the AWS developer guide section on [writing multiple records with PutRecords](http://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-sdk.html#kinesis-using-sdk-java-putrecords) and the [PutRecords API reference](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html).
@@@
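
To make the ordering caveat concrete, here is a minimal sketch of how a partially failed batch splits into stored and retried records. `RequestEntry` and `ResultEntry` are hypothetical stand-ins for the AWS SDK's `PutRecordsRequestEntry` and `PutRecordsResultEntry`, not the connector's own types. The sketch relies on PutRecords returning one result entry per request entry, in request order.

```scala
object PartialFailureSketch {
  // Hypothetical stand-ins for the AWS SDK request and result entries.
  final case class RequestEntry(partitionKey: String, data: String)
  final case class ResultEntry(errorCode: Option[String])

  // Pair each request with its result (same index) and split the batch into
  // (records stored by this call, records that must be retried).
  def split(requests: Seq[RequestEntry],
            results: Seq[ResultEntry]): (Seq[RequestEntry], Seq[RequestEntry]) = {
    val (failed, stored) =
      requests.zip(results).partition { case (_, res) => res.errorCode.isDefined }
    (stored.map(_._1), failed.map(_._1))
  }

  def main(args: Array[String]): Unit = {
    val batch = Seq(RequestEntry("k1", "a"), RequestEntry("k2", "b"), RequestEntry("k3", "c"))
    val results = Seq(
      ResultEntry(None),
      ResultEntry(Some("ProvisionedThroughputExceededException")),
      ResultEntry(None)
    )
    val (stored, toRetry) = split(batch, results)
    println(s"stored now: ${stored.map(_.data)}, retried later: ${toRetry.map(_.data)}")
  }
}
```

In this example the second record is only written on a later attempt, after the third record has already been stored, so the stream no longer reflects the input order.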

Publishing to a Kinesis stream requires an instance of `KinesisFlowSettings`. A default instance with sensible values is available, as is a method that derives the settings from the number of shards in the stream:

Scala
: @@snip (../../../../kinesis/src/test/scala/akka/stream/alpakka/kinesis/scaladsl/Examples.scala) { #flow-settings }

Java
: @@snip (../../../../kinesis/src/test/java/akka/stream/alpakka/kinesis/javadsl/Examples.java) { #flow-settings }

@@@ warning
The throughput settings `maxRecordsPerSecond` and `maxBytesPerSecond` are vital for minimizing server errors (such as `ProvisionedThroughputExceededException`) and retries, and thus for achieving a higher publication rate.
@@@
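
As a rough illustration of how shard-based settings can be derived, the sketch below scales the per-shard write limits by the shard count. The constants are the ones named in this commit's `KinesisFlowSettings` companion object. `ThroughputSettings` is a hypothetical, trimmed-down stand-in for `KinesisFlowSettings`, and the sketch assumes the record limit feeds `maxRecordsPerSecond` and the byte limit feeds `maxBytesPerSecond`.

```scala
object ShardLimitsSketch {
  // Limits as named in this commit: PutRecords batch cap and per-shard write limits.
  val MaxRecordsPerRequest = 500
  val MaxRecordsPerShardPerSecond = 1000
  val MaxBytesPerShardPerSecond = 1000000

  // Hypothetical stand-in holding only the throughput-related fields.
  final case class ThroughputSettings(parallelism: Int,
                                      maxBatchSize: Int,
                                      maxRecordsPerSecond: Int,
                                      maxBytesPerSecond: Int)

  // Scale every per-shard limit by the number of shards in the stream.
  def byNumberOfShards(shards: Int): ThroughputSettings = {
    require(shards >= 1, "a stream has at least one shard")
    ThroughputSettings(
      // Full batches per second that one shard can absorb, times the shard count.
      parallelism = shards * (MaxRecordsPerShardPerSecond / MaxRecordsPerRequest),
      maxBatchSize = MaxRecordsPerRequest,
      maxRecordsPerSecond = shards * MaxRecordsPerShardPerSecond,
      maxBytesPerSecond = shards * MaxBytesPerShardPerSecond
    )
  }

  def main(args: Array[String]): Unit =
    println(byNumberOfShards(4))
}
```

For a four-shard stream this gives a parallelism of 8, batches of up to 500 records, 4000 records per second and 4000000 bytes per second. Staying at or below these values keeps the Flow inside what the shards accept, which is what reduces throttling errors and retries.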

The Flow can now be created.

Scala
: @@snip (../../../../kinesis/src/test/scala/akka/stream/alpakka/kinesis/scaladsl/Examples.scala) { #flow }

Java
: @@snip (../../../../kinesis/src/test/java/akka/stream/alpakka/kinesis/javadsl/Examples.java) { #flow }
@@ -0,0 +1,23 @@
/*
* Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.alpakka.kinesis

import com.amazonaws.services.kinesis.model.PutRecordsResultEntry

import scala.util.control.NoStackTrace

object KinesisErrors {

sealed trait KinesisSourceError extends NoStackTrace
case object NoShardsError extends KinesisSourceError
case object GetShardIteratorError extends KinesisSourceError
case object GetRecordsError extends KinesisSourceError

sealed trait KinesisFlowErrors extends NoStackTrace
case class FailurePublishingRecords(e: Exception) extends RuntimeException(e) with KinesisFlowErrors
case class ErrorPublishingRecords(attempts: Int, records: Seq[PutRecordsResultEntry])
extends RuntimeException(s"Unable to publish records after $attempts attempts")
with KinesisFlowErrors

}
@@ -0,0 +1,52 @@
/*
* Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.alpakka.kinesis

import KinesisFlowSettings._
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration._
import scala.language.postfixOps

case class KinesisFlowSettings(parallelism: Int,
maxBatchSize: Int,
maxRecordsPerSecond: Int,
maxBytesPerSecond: Int,
maxRetries: Int,
backoffStrategy: RetryBackoffStrategy,
retryInitialTimeout: FiniteDuration) {
require(
maxBatchSize >= 1 && maxBatchSize <= 500,
"Limit must be between 1 and 500. See: http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html"
)
require(maxRecordsPerSecond >= 1)
require(maxBytesPerSecond >= 1)
require(maxRetries >= 0)
}

object KinesisFlowSettings {

private val MAX_RECORDS_PER_REQUEST = 500
private val MAX_RECORDS_PER_SHARD_PER_SECOND = 1000
private val MAX_BYTES_PER_SHARD_PER_SECOND = 1000000

sealed trait RetryBackoffStrategy
case object Exponential extends RetryBackoffStrategy
case object Lineal extends RetryBackoffStrategy

val exponential = Exponential
val lineal = Lineal

val defaultInstance = byNumberOfShards(1)

def byNumberOfShards(shards: Int): KinesisFlowSettings =
KinesisFlowSettings(
parallelism = shards * (MAX_RECORDS_PER_SHARD_PER_SECOND / MAX_RECORDS_PER_REQUEST),
maxBatchSize = MAX_RECORDS_PER_REQUEST,
maxRecordsPerSecond = shards * MAX_RECORDS_PER_SHARD_PER_SECOND,
maxBytesPerSecond = shards * MAX_BYTES_PER_SHARD_PER_SECOND,
maxRetries = 5,
backoffStrategy = Exponential,
retryInitialTimeout = 100 millis
)
}
@@ -0,0 +1,184 @@
/*
* Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.alpakka.kinesis

import akka.stream.alpakka.kinesis.KinesisErrors.{ErrorPublishingRecords, FailurePublishingRecords}
import akka.stream.alpakka.kinesis.KinesisFlowSettings.{Exponential, Lineal, RetryBackoffStrategy}
import akka.stream.alpakka.kinesis.KinesisFlowStage._
import akka.stream.stage._
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import com.amazonaws.handlers.AsyncHandler
import com.amazonaws.services.kinesis.AmazonKinesisAsync
import com.amazonaws.services.kinesis.model.{
PutRecordsRequest,
PutRecordsRequestEntry,
PutRecordsResult,
PutRecordsResultEntry
}

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration._
import scala.concurrent.{Future, Promise}
import scala.util.{Failure, Success, Try}

import scala.language.postfixOps

private[kinesis] final class KinesisFlowStage(
streamName: String,
maxRetries: Int,
backoffStrategy: RetryBackoffStrategy,
retryInitialTimeout: FiniteDuration
)(implicit kinesisClient: AmazonKinesisAsync)
extends GraphStage[FlowShape[Seq[PutRecordsRequestEntry], Future[PutRecordsResult]]] {

private val in = Inlet[Seq[PutRecordsRequestEntry]]("records")
private val out = Outlet[Future[PutRecordsResult]]("result")
override val shape = FlowShape(in, out)

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new TimerGraphStageLogic(shape) with StageLogging {

private val retryBaseInMillis = retryInitialTimeout.toMillis

private var completionState: Option[Try[Unit]] = None

private val pendingRequests: mutable.Queue[(Int, Seq[PutRecordsRequestEntry])] = mutable.Queue.empty
private var resultCallback: AsyncCallback[Result] = _
private var inFlight = 0

private var retryToken = 0
private val waitingRetries: mutable.HashMap[Int, Result] = mutable.HashMap.empty

private def tryToExecute() =
if (pendingRequests.nonEmpty && isAvailable(out)) {
log.debug("Executing PutRecords call")
inFlight += 1
val (attempt, records) = pendingRequests.dequeue()
push(
out,
putRecords(
streamName,
records,
recordsToRetry => resultCallback.invoke(Result(attempt, recordsToRetry))
)
)
}

private def handleResult(result: Result): Unit = result match {
case Result(_, Nil) =>
log.debug("PutRecords call finished successfully")
inFlight -= 1
tryToExecute()
if (!hasBeenPulled(in)) tryPull(in)
checkForCompletion()
case Result(attempt, errors) if attempt > maxRetries =>
log.debug("PutRecords call finished with partial errors after {} attempts", attempt)
failStage(ErrorPublishingRecords(attempt, errors.map(_._1)))
case x @ Result(attempt, _) =>
log.debug("PutRecords call finished with partial errors; scheduling retry")
inFlight -= 1
waitingRetries.put(retryToken, x)
scheduleOnce(retryToken, backoffStrategy match {
case Exponential => scala.math.pow(retryBaseInMillis, attempt).toInt millis
case Lineal => retryInitialTimeout * attempt
})
retryToken += 1
}

private def checkForCompletion() =
if (inFlight == 0 && pendingRequests.isEmpty && waitingRetries.isEmpty && isClosed(in)) {
completionState match {
case Some(Success(_)) => completeStage()
case Some(Failure(ex)) => failStage(ex)
case None => failStage(new IllegalStateException("Stage completed, but there is no info about status"))
}
}

override def preStart() = {
pull(in)
resultCallback = getAsyncCallback[Result](handleResult)
}

override protected def onTimer(timerKey: Any) =
waitingRetries.remove(timerKey.asInstanceOf[Int]) foreach { result =>
log.debug("New PutRecords retry attempt available")
pendingRequests.enqueue(result.attempt + 1 -> result.recordsToRetry.map(_._2))
tryToExecute()
}

setHandler(out, new OutHandler {
override def onPull() = {
tryToExecute()
if (waitingRetries.isEmpty && !hasBeenPulled(in)) tryPull(in)
}
})

setHandler(
in,
new InHandler {

override def onUpstreamFinish() = {
completionState = Some(Success(()))
checkForCompletion()
}

override def onUpstreamFailure(ex: Throwable) = {
completionState = Some(Failure(ex))
checkForCompletion()
}

override def onPush() = {
log.debug("New PutRecords request available")
pendingRequests.enqueue(1 -> grab(in))
tryToExecute()
}
}
)
}

}

object KinesisFlowStage {

private def putRecords(
streamName: String,
recordEntries: Seq[PutRecordsRequestEntry],
retryRecordsCallback: Seq[(PutRecordsResultEntry, PutRecordsRequestEntry)] => Unit
)(implicit kinesisClient: AmazonKinesisAsync): Future[PutRecordsResult] = {

val p = Promise[PutRecordsResult]

kinesisClient
.putRecordsAsync(
new PutRecordsRequest()
.withStreamName(streamName)
.withRecords(recordEntries.asJavaCollection),
new AsyncHandler[PutRecordsRequest, PutRecordsResult] {

override def onError(exception: Exception): Unit =
p.failure(FailurePublishingRecords(exception))

override def onSuccess(request: PutRecordsRequest, result: PutRecordsResult): Unit = {
if (result.getFailedRecordCount > 0) {
retryRecordsCallback(
result.getRecords.asScala
.zip(request.getRecords.asScala)
.filter(_._1.getErrorCode != null)
)
} else {
retryRecordsCallback(Nil)
}
p.success(result)
}
}
)

p.future
}

private case class Result(attempt: Int, recordsToRetry: Seq[(PutRecordsResultEntry, PutRecordsRequestEntry)])

}
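
The retry branch of `handleResult` above chooses its delay from the backoff strategy. The following sketch reproduces those two delay expressions in isolation so the resulting schedule can be inspected. `BackoffSketch` and its `delay` helper are illustrative names and not part of the commit.

```scala
import scala.concurrent.duration._

object BackoffSketch {
  sealed trait Strategy
  case object Exponential extends Strategy
  case object Lineal extends Strategy

  // Mirrors the delay expressions in the stage's retry branch:
  // Exponential raises the initial timeout (in milliseconds) to the power of the attempt,
  // Lineal multiplies the initial timeout by the attempt.
  def delay(strategy: Strategy, initial: FiniteDuration, attempt: Int): FiniteDuration =
    strategy match {
      case Exponential => math.pow(initial.toMillis.toDouble, attempt.toDouble).toInt.millis
      case Lineal      => initial * attempt.toLong
    }

  def main(args: Array[String]): Unit =
    for (attempt <- 1 to 5) {
      val exp = delay(Exponential, 100.millis, attempt)
      val lin = delay(Lineal, 100.millis, attempt)
      println(s"attempt $attempt: exponential=$exp lineal=$lin")
    }
}
```

With the default initial timeout of 100 milliseconds, the exponential expression grows by a factor of 100 per attempt: 100 ms, then 10 s, then roughly 17 minutes. By the fifth attempt the value no longer fits in an `Int` and `.toInt` clamps it to `Int.MaxValue` milliseconds. Anyone relying on the exponential strategy may want to check that this schedule matches their expectations.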

This file was deleted.

@@ -7,7 +7,7 @@ import java.util.concurrent.Future

import akka.actor.ActorRef
import akka.stream.alpakka.kinesis.KinesisSourceStage._
import akka.stream.alpakka.kinesis.{KinesisSourceErrors => Errors}
import akka.stream.alpakka.kinesis.{KinesisErrors => Errors}
import akka.stream.stage.GraphStageLogic.StageActor
import akka.stream.stage._
import akka.stream.{Attributes, Outlet, SourceShape}
@@ -0,0 +1,23 @@
/*
* Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.alpakka.kinesis.javadsl

import akka.NotUsed
import akka.stream.alpakka.kinesis.{scaladsl, KinesisFlowSettings}
import akka.stream.javadsl.Flow
import com.amazonaws.services.kinesis.AmazonKinesisAsync
import com.amazonaws.services.kinesis.model.{PutRecordsRequestEntry, PutRecordsResultEntry}

object KinesisFlow {

def apply(streamName: String,
kinesisClient: AmazonKinesisAsync): Flow[PutRecordsRequestEntry, PutRecordsResultEntry, NotUsed] =
apply(streamName, KinesisFlowSettings.defaultInstance, kinesisClient)

def apply(streamName: String,
settings: KinesisFlowSettings,
kinesisClient: AmazonKinesisAsync): Flow[PutRecordsRequestEntry, PutRecordsResultEntry, NotUsed] =
(scaladsl.KinesisFlow.apply(streamName, settings)(kinesisClient)).asJava

}
@@ -3,13 +3,11 @@
*/
package akka.stream.alpakka.kinesis.javadsl

import java.util.concurrent.Future

import akka.NotUsed
import akka.stream.alpakka.kinesis.{scaladsl, ShardSettings}
import akka.stream.javadsl.Source
import com.amazonaws.services.kinesis.AmazonKinesisAsync
import com.amazonaws.services.kinesis.model.{DescribeStreamResult, Record}
import com.amazonaws.services.kinesis.model.Record

import scala.collection.JavaConverters._
