Skip to content

Commit

Permalink
fixup! WIP - Add Kinesis KCL Source
Browse files Browse the repository at this point in the history
  • Loading branch information
aserrallerios committed Aug 11, 2017
1 parent 607c9aa commit e0648c9
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 64 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,19 @@ package akka.stream.alpakka.kinesis
import java.util
import java.util.concurrent.Semaphore

import akka.stream.alpakka.kinesis.KinesisWorkerSourceStage._
import akka.stream.alpakka.kinesis.worker.{CommittableRecord, IRecordProcessor}
import akka.stream.stage._
import akka.stream.{Attributes, Outlet, SourceShape}
import com.amazonaws.services.kinesis.clientlibrary.interfaces.{
IRecordProcessor,
IRecordProcessorCheckpointer,
IRecordProcessorFactory
}
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.{ShutdownReason, Worker}
import com.amazonaws.services.kinesis.model.Record
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker

import scala.collection.JavaConverters._
import scala.concurrent.{ExecutionContext, Future}

class KinesisWorkerSourceStage(
bufferSize: Int,
workerBuilder: IRecordProcessorFactory => Worker,
workerExecutor: ExecutionContext
) extends GraphStageWithMaterializedValue[SourceShape[CommittableRecord], Worker] {
workerBuilder: IRecordProcessorFactory => Worker
)(implicit executor: ExecutionContext)
extends GraphStageWithMaterializedValue[SourceShape[CommittableRecord], Worker] {

private val out: Outlet[CommittableRecord] = Outlet("records")
override val shape: SourceShape[CommittableRecord] = SourceShape(out)
Expand Down Expand Up @@ -57,14 +51,14 @@ class KinesisWorkerSourceStage(
worker = workerBuilder(
new IRecordProcessorFactory {
override def createProcessor(): IRecordProcessor =
new IRecordProcessorWithCallback(callback.invoke)(workerExecutor)
new IRecordProcessor(callback.invoke)
}
)
Future(worker.run())(workerExecutor)
Future(worker.run())
}

override def postStop(): Unit =
worker.shutdown()
Future(worker.shutdown())

setHandler(out, new OutHandler {
override def onPull(): Unit =
Expand All @@ -73,28 +67,3 @@ class KinesisWorkerSourceStage(
}

}

object KinesisWorkerSourceStage {

class IRecordProcessorWithCallback(callback: CommittableRecord => Unit)(implicit executionContext: ExecutionContext)
extends IRecordProcessor { self: IRecordProcessor =>
var shardId: String = _
var shutdown: Option[ShutdownReason] = None

override def initialize(shardId: String): Unit =
this.shardId = shardId
override def processRecords(records: util.List[Record], checkpointer: IRecordProcessorCheckpointer): Unit =
records.asScala.foreach(
record =>
callback(new CommittableRecord(shardId, record) {
override def isProcessorShutdown: Option[ShutdownReason] =
shutdown
override def checkpoint(): Future[Unit] =
Future(checkpointer.checkpoint(record))
})
)
override def shutdown(checkpointer: IRecordProcessorCheckpointer, reason: ShutdownReason): Unit =
shutdown = Some(reason)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@
package akka.stream.alpakka.kinesis.scaladsl

import akka.NotUsed
import akka.stream.alpakka.kinesis.worker.CommittableRecord
import akka.stream.alpakka.kinesis.{
CommittableRecord,
KinesisWorkerCheckpointSettings,
KinesisWorkerSourceSettings,
KinesisWorkerSourceStage
}
import akka.stream.{scaladsl, FlowShape}
import akka.stream.scaladsl.{Flow, GraphDSL, Source, Zip}
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker
import com.amazonaws.services.kinesis.model.Record

Expand All @@ -25,7 +25,7 @@ object KinesisWorkerSource {
workerBuilder: IRecordProcessorFactory => Worker,
settings: KinesisWorkerSourceSettings = KinesisWorkerSourceSettings.defaultInstance
)(implicit workerExecutor: ExecutionContext): Source[CommittableRecord, Worker] =
Source.fromGraph(new KinesisWorkerSourceStage(settings.bufferSize, workerBuilder, workerExecutor))
Source.fromGraph(new KinesisWorkerSourceStage(settings.bufferSize, workerBuilder))

def checkpointRecords(
settings: KinesisWorkerCheckpointSettings = KinesisWorkerCheckpointSettings.defaultInstance
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.alpakka.kinesis.worker

import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber
import com.amazonaws.services.kinesis.model.Record

import scala.concurrent.{ExecutionContext, Future}

class CommittableRecord(
val shardId: String,
val extendedSequenceNumber: ExtendedSequenceNumber,
val millisBehindLatest: Long,
val record: Record,
recordProcessor: IRecordProcessor,
checkpointer: IRecordProcessorCheckpointer
)(implicit executor: ExecutionContext) {

val sequenceNumber = record.getSequenceNumber

def shutdown: Option[ShutdownReason] =
recordProcessor.shutdown
def checkpoint(): Future[Unit] =
Future(checkpointer.checkpoint(record))

}

object CommittableRecord {

implicit val order: Ordering[CommittableRecord] = Ordering.by(_.sequenceNumber)

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.alpakka.kinesis.worker

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason
import com.amazonaws.services.kinesis.clientlibrary.types.{
ExtendedSequenceNumber,
InitializationInput,
ProcessRecordsInput,
ShutdownInput
}

import scala.collection.JavaConverters._
import scala.concurrent.ExecutionContext

private[kinesis] class IRecordProcessor(
callback: CommittableRecord => Unit
)(implicit executionContext: ExecutionContext)
extends com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor {
private var shardId: String = _
private var extendedSequenceNumber: ExtendedSequenceNumber = _

var shutdown: Option[ShutdownReason] = None

override def initialize(initializationInput: InitializationInput): Unit = {
this.shardId = initializationInput.getShardId
this.extendedSequenceNumber = initializationInput.getExtendedSequenceNumber
}

override def processRecords(processRecordsInput: ProcessRecordsInput): Unit =
processRecordsInput.getRecords.asScala.foreach { record =>
callback(
new CommittableRecord(
shardId,
extendedSequenceNumber,
processRecordsInput.getMillisBehindLatest,
record,
recordProcessor = this,
processRecordsInput.getCheckpointer
)
)
}

override def shutdown(shutdownInput: ShutdownInput): Unit =
shutdown = Some(shutdownInput.getShutdownReason)

}

0 comments on commit e0648c9

Please sign in to comment.