Kafka issue with ClassLoader not containing the CodecWrapper class #30

Merged: 24 commits, Dec 3, 2018

Commits
1438fbd
Fixing up PiExceptions
JeVaughan Oct 22, 2018
05df6bf
Fixed up serialisation for PiExceptionEvents, serialisation for stack…
JeVaughan Oct 22, 2018
3ebe79d
Fixed missing time parameter for PiEventCall
JeVaughan Oct 22, 2018
614ac00
Update version to 1.0.0
PetrosPapapa Oct 26, 2018
13150f2
Updated and improved tests.
PetrosPapapa Oct 26, 2018
4d2001b
Intermediary superclass for PiEvents with atomic process call referen…
JeVaughan Nov 5, 2018
639a6dd
Comments for StatelessMessage types.
JeVaughan Nov 5, 2018
2ea2438
Merge remote-tracking branch 'origin/kafka' into kafka
JeVaughan Nov 5, 2018
f1b4035
Added piiId id to the AnyMsg superclass.
JeVaughan Nov 5, 2018
892810e
Added comments to classes in the Tracked file.
JeVaughan Nov 5, 2018
15116ae
Added load test for KafkaExecutor
JeVaughan Nov 5, 2018
5b404b5
Various annotations
PetrosPapapa Nov 5, 2018
649f284
Improved KafkaExecutor saturation test
JeVaughan Nov 6, 2018
cfe16dc
Merge remote-tracking branch 'origin/kafka' into kafka
JeVaughan Nov 6, 2018
5416500
Kafka load test: Swapped try/catch and forloop to avoid stopping mid …
JeVaughan Nov 6, 2018
272309a
WIP: Test for partition rebalancing robustness
JeVaughan Nov 13, 2018
28ee971
Merge Master branch into Kafka
Nov 23, 2018
5fc093f
Test kafka executor using individual components
Nov 23, 2018
bfd7f5f
Added "baremetal" Executor interface kafka test.
Nov 23, 2018
c66710c
KafkaExecutor: Smallest test-case which throws a ConfigException
Nov 23, 2018
04d2434
Gahh, ExecutionContext threads use the wrong ClassLoader!
Nov 23, 2018
b2db84f
Forgotten import statement
Nov 23, 2018
675b0b8
Ensure the correct ClassLoader is used when creating KafkaProducers
Nov 23, 2018
1240d7b
Removed the out-of-date "mainClassLoader shouldBe threadClassLoader" …
Dec 3, 2018
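
The commit trail above narrows the failure down to the context ClassLoader: threads backing an ExecutionContext can carry a different context ClassLoader than the main thread, so a KafkaProducer built inside a Future cannot resolve its key/value serialiser (or CodecWrapper) classes and its constructor throws a ConfigException. Below is a minimal sketch of the mismatch the new tests check for, assuming only a standard global ExecutionContext (illustration, not part of the diff). The first diff that follows (the `Tracked` components file) adds the workaround; the second extends `KafkaExecutorTests` to cover it.

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ClassLoaderCheck extends App {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // ClassLoader visible to the main thread.
  val mainLoader = Thread.currentThread().getContextClassLoader

  // ClassLoader visible to a thread of the ExecutionContext.
  val threadLoader = Await.result(
    Future { Thread.currentThread().getContextClassLoader },
    1.second )

  // In the failing setup these differ, so classes loadable from the main
  // thread cannot be loaded from within the ExecutionContext thread.
  println( s"same ClassLoader: ${mainLoader == threadLoader}" )
}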
@@ -8,6 +8,7 @@ import akka.stream.scaladsl.{Sink, Source}
import akka.{Done, NotUsed}
import com.workflowfm.pew.stateless.StatelessMessages.AnyMsg
import com.workflowfm.pew.stateless.instances.kafka.settings.KafkaExecutorSettings
import org.apache.kafka.clients.producer.KafkaProducer
import org.bson.types.ObjectId

import scala.concurrent.{ExecutionContext, Future}
@@ -136,6 +137,33 @@ object Tracked {

consuming.tail.foldLeft[T[Seq[Msg]]]( foldStart )( combine )
}

/** Execute a function on the current thread whilst using a specific ClassLoader.
* Restores the previous ClassLoader before returning.
*
* @param tmpClassLoader ClassLoader to use during the function execution.
* @param fnWrapped Function to execute with a different ClassLoader.
* @tparam T The return type of `fnWrapped`
* @return The returned value of `fnWrapped`
*/
def withClassLoader[T]( tmpClassLoader: ClassLoader )( fnWrapped: => T ): T = {
val pushedClassLoader = Thread.currentThread().getContextClassLoader
try{
Thread.currentThread().setContextClassLoader( tmpClassLoader )
fnWrapped
} finally {
Thread.currentThread().setContextClassLoader( pushedClassLoader )
}
}

/** Create a KafkaProducer from settings whilst explicitly un-setting the ClassLoader.
* This bypasses errors encountered in the KafkaProducer constructor when the context
* ClassLoader of threads within an ExecutionContext cannot load the necessary key or
* value serialiser classes. Explicitly setting `null` causes the constructor to fall
* back to the Kafka ClassLoader, which should contain these classes.
*/
def createProducer[K, V]( settings: ProducerSettings[K, V] ): KafkaProducer[K, V]
= withClassLoader( null ) { settings.createKafkaProducer() }
}

/** A Tracked type which uses a `Committable` as tracking information.
@@ -155,7 +183,7 @@ trait HasCommittableSinks[T[X] <: HasCommittable[X]]
with TrackedMultiSink[T] {

override def sink[V <: AnyMsg]( implicit s: KafkaExecutorSettings ): Sink[HasCommittable[V], Future[Done]]
= Producer.commitableSink( s.psAllMessages )
= Producer.commitableSink( s.psAllMessages, Tracked.createProducer( s.psAllMessages ) )
.contramap( msg =>
ProducerMessage.Message(
s.record( msg.value ),
@@ -164,7 +192,7 @@ )
)

override def sinkMulti[V <: AnyMsg]( implicit s: KafkaExecutorSettings ): Sink[HasCommittable[Seq[V]], Future[Done]]
= Producer.commitableSink( s.psAllMessages )
= Producer.commitableSink( s.psAllMessages, Tracked.createProducer( s.psAllMessages ) )
.contramap( msgs =>
ProducerMessage.MultiMessage(
msgs.value.map(s.record).to,
@@ -215,7 +243,6 @@ object CommitTracked
msg.committableOffset
)
)

}

/** A Tracked wrapper superclass for Tracked types which know the partition of their inputs.
@@ -330,6 +357,7 @@ object Transaction
.map( msg => Transaction( msg.record.value(), msg.partitionOffset ) )

override def sink[V <: AnyMsg]( implicit s: KafkaExecutorSettings ): Sink[Transaction[V], Future[Done]] = {
// TODO: Construct the KafkaProducer with the correct ClassLoader
val id = ObjectId.get.toString
Transactional.sink( s.psAllMessages, id )
.contramap( msg =>
@@ -341,6 +369,7 @@ }
}

override def sinkMulti[V <: AnyMsg]( implicit s: KafkaExecutorSettings ): Sink[Transaction[Seq[V]], Future[Done]] = {
// TODO: Construct the KafkaProducer with the correct ClassLoader
val id = ObjectId.get.toString
Transactional.sink( s.psAllMessages, id )
.contramap( msgs =>
@@ -385,7 +414,7 @@ object Untracked
)

override def sink[Value <: AnyMsg]( implicit s: KafkaExecutorSettings ): Sink[Untracked[Value], Future[Done]]
= Producer.plainSink( s.psAllMessages )
= Producer.plainSink( s.psAllMessages, Tracked.createProducer( s.psAllMessages ) )
.contramap( (msg: Untracked[Value]) => s.record( msg.value ) )

}
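
The sink changes above all funnel producer creation through `Tracked.createProducer`, which wraps `settings.createKafkaProducer()` in `withClassLoader( null )` so that Kafka falls back to its own ClassLoader (via `Utils.getContextOrKafkaClassLoader`) when looking up serialisers. A rough usage sketch of the helper, assuming it sits alongside `Tracked` in the same package; the `sendOne` wrapper and the topic name are placeholders, and `settings` stands for a fully configured ProducerSettings such as the executor's `psAllMessages`:

import akka.kafka.ProducerSettings
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

def sendOne[K, V]( settings: ProducerSettings[K, V], key: K, value: V ): Unit = {
  // Build the producer with the context ClassLoader unset, as in the diff above,
  // so the serialiser classes resolve against Kafka's own ClassLoader.
  val producer: KafkaProducer[K, V] = Tracked.createProducer( settings )
  try producer.send( new ProducerRecord( "all-messages-topic", key, value ) )
  finally producer.close()
}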
80 changes: 78 additions & 2 deletions test/com/workflowfm/pew/stateless/KafkaExecutorTests.scala
@@ -1,8 +1,13 @@
package com.workflowfm.pew.stateless

import com.workflowfm.pew.stateless.StatelessMessages._
import akka.Done
import com.workflowfm.pew.stateless.StatelessMessages.{AnyMsg, _}
import com.workflowfm.pew.stateless.instances.kafka.components.KafkaConnectors
import com.workflowfm.pew._
import com.workflowfm.pew.{PromiseHandler, _}
import com.workflowfm.pew.stateless.components.{AtomicExecutor, Reducer, ResultListener}
import com.workflowfm.pew.stateless.instances.kafka.MinimalKafkaExecutor
import com.workflowfm.pew.stateless.instances.kafka.components.KafkaConnectors.{indyReducer, indySequencer, sendMessages}
import org.apache.kafka.common.utils.Utils
import org.bson.types.ObjectId
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
@@ -17,6 +22,77 @@ class KafkaExecutorTests extends PewTestSuite with KafkaTests {
// Ensure there are no outstanding messages before starting testing.
new MessageDrain( true )

lazy val mainClassLoader: ClassLoader = Thread.currentThread().getContextClassLoader
lazy val kafkaClassLoader: ClassLoader = Utils.getContextOrKafkaClassLoader
lazy val threadClassLoader: ClassLoader = await( Future.unit.map(_ => Thread.currentThread().getContextClassLoader ) )

it should "use the same ClassLoader for Kafka as the Main thread" in {
mainClassLoader shouldBe kafkaClassLoader
}

// Instead unset the class loader when creating KafkaProducers.
ignore should "use the same ClassLoader for the ExecutionContext threads as the Main thread" in {
mainClassLoader shouldBe threadClassLoader
}

it should "start a producer from inside a ExecutionContext" in {
implicit val settings = completeProcess.settings

val future: Future[Done] = Future { Thread.sleep(100) } map { _ =>
val pii = PiInstance( ObjectId.get, pbi, PiObject(1) )
sendMessages( ReduceRequest( pii, Seq() ) )
Done
}

await( future ) shouldBe Done
}

it should "execute an atomic PbI using a DIY KafkaExecutor" in {
implicit val settings = completeProcess.settings
val listener = new ResultListener

val c1 = KafkaConnectors.indyReducer( new Reducer )
val c2 = KafkaConnectors.indySequencer
val c3 = KafkaConnectors.indyAtomicExecutor( AtomicExecutor() )
val c4 = KafkaConnectors.uniqueResultListener( listener )

val pii = PiInstance( ObjectId.get, pbi, PiObject(1) )
sendMessages( ReduceRequest( pii, Seq() ) )

val handler = new PromiseHandler( "test", pii.id )
listener.subscribe( handler )

await( handler.promise.future ) should be ("PbISleptFor1s")
await( KafkaConnectors.shutdown( c1, c2, c3, c4 ) )

val msgsOf = new MessageDrain( true )
msgsOf[SequenceRequest] shouldBe empty
msgsOf[SequenceFailure] shouldBe empty
msgsOf[ReduceRequest] shouldBe empty
msgsOf[Assignment] shouldBe empty
msgsOf[PiiUpdate] shouldBe empty
}

it should "execute an atomic PbI using the baremetal Executor interface" in {
val ex = makeExecutor( completeProcess.settings )

val piiId = await( ex.init( pbi, Seq( PiObject(1) ) ) )
val handler = new PromiseHandler( "test", piiId )
ex.subscribe( handler )

val f1 = handler.promise.future
ex.start( piiId )

await( f1 ) should be ("PbISleptFor1s")
ex.syncShutdown()

val msgsOf = new MessageDrain( true )
msgsOf[SequenceRequest] shouldBe empty
msgsOf[SequenceFailure] shouldBe empty
msgsOf[ReduceRequest] shouldBe empty
msgsOf[Assignment] shouldBe empty
msgsOf[PiiUpdate] shouldBe empty }

it should "execute atomic PbI once" in {

val ex = makeExecutor( completeProcess.settings )