From 51e7e060882e896d061f715797d5576600db5eed Mon Sep 17 00:00:00 2001 From: Daniel Wojda Date: Thu, 12 Apr 2018 19:35:35 +0100 Subject: [PATCH 1/4] Allow to pass built topology to MockedStreams --- src/main/scala/MockedStreams.scala | 24 ++++++++++++++---------- src/test/scala/MockedStreamsSpec.scala | 19 ++++++++++++++++++- 2 files changed, 32 insertions(+), 11 deletions(-) diff --git a/src/main/scala/MockedStreams.scala b/src/main/scala/MockedStreams.scala index 52eab63..8886251 100644 --- a/src/main/scala/MockedStreams.scala +++ b/src/main/scala/MockedStreams.scala @@ -19,7 +19,7 @@ package com.madewithtea.mockedstreams import java.util.{Properties, UUID} import org.apache.kafka.common.serialization.Serde -import org.apache.kafka.streams.{StreamsBuilder, StreamsConfig} +import org.apache.kafka.streams.{StreamsBuilder, StreamsConfig, Topology} import org.apache.kafka.streams.state.ReadOnlyWindowStore import org.apache.kafka.test.{ProcessorTopologyTestDriver => Driver} @@ -31,14 +31,23 @@ object MockedStreams { case class Record(topic: String, key: Array[Byte], value: Array[Byte]) - case class Builder(topology: Option[(StreamsBuilder => Unit)] = None, + case class Builder(topology: Option[() => Topology] = None, configuration: Properties = new Properties(), stateStores: Seq[String] = Seq(), inputs: List[Record] = List.empty) { def config(configuration: Properties) = this.copy(configuration = configuration) - def topology(func: (StreamsBuilder => Unit)) = this.copy(topology = Some(func)) + def topology(func: (StreamsBuilder => Unit)) = { + val buildTopology = () => { + val builder = new StreamsBuilder() + func(builder) + builder.build() + } + this.copy(topology = Some(buildTopology)) + } + + def withTopology(t: () => Topology) = this.copy(topology = Some(t)) def stores(stores: Seq[String]) = this.copy(stateStores = stores) @@ -93,14 +102,9 @@ object MockedStreams { props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") props.putAll(configuration) - val builder = new StreamsBuilder() - - topology match { - case Some(t) => t(builder) - case _ => throw new NoTopologySpecified - } + val t = topology.getOrElse(throw new NoTopologySpecified) - new Driver(new StreamsConfig(props), builder.build()) + new Driver(new StreamsConfig(props), t()) } private def produce(driver: Driver): Unit = { diff --git a/src/test/scala/MockedStreamsSpec.scala b/src/test/scala/MockedStreamsSpec.scala index c0eecf6..4e15b62 100644 --- a/src/test/scala/MockedStreamsSpec.scala +++ b/src/test/scala/MockedStreamsSpec.scala @@ -20,7 +20,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.serialization.Serdes import org.apache.kafka.streams.kstream._ import org.apache.kafka.streams.processor.TimestampExtractor -import org.apache.kafka.streams.{Consumed, StreamsBuilder, KeyValue, StreamsConfig} +import org.apache.kafka.streams._ import org.scalatest.{FlatSpec, Matchers} class MockedStreamsSpec extends FlatSpec with Matchers { @@ -156,6 +156,23 @@ class MockedStreamsSpec extends FlatSpec with Matchers { .shouldEqual(expectedCy.toMap) } + it should "accept already built topology" in { + import Fixtures.Uppercase._ + + def getTopology() = { + val builder = new StreamsBuilder() + topology(builder) + builder.build() + } + + val output = MockedStreams() + .withTopology(getTopology) + .input(InputTopic, strings, strings, input) + .output(OutputTopic, strings, strings, expected.size) + + output shouldEqual expected + } + class LastInitializer extends Initializer[Integer] { override def apply() = 0 } From 71edab1391d5d95d0a069e68cfc7579f42d3dab1 Mon Sep 17 00:00:00 2001 From: Eddpt Date: Mon, 14 May 2018 13:46:05 +0100 Subject: [PATCH 2/4] Update to Kafka 1.1.0 --- build.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.sbt b/build.sbt index 0b50f1a..3b22419 100644 --- a/build.sbt +++ b/build.sbt @@ -10,7 +10,7 @@ lazy val commonSettings = Seq( val scalaTestVersion = "3.0.4" val rocksDBVersion = "5.7.3" -val kafkaVersion = "1.0.1" +val kafkaVersion = "1.1.0" lazy val kafka = Seq( "org.apache.kafka" % "kafka-clients" % kafkaVersion, From 4f1d5318cb42f0b2d3848fd96e976f053671148c Mon Sep 17 00:00:00 2001 From: Jendrik Poloczek Date: Sat, 2 Jun 2018 12:36:05 +0200 Subject: [PATCH 3/4] updated CONTRIBUTORS; README.md and bumped versions in build.sbt --- CONTRIBUTORS.md | 1 + README.md | 3 ++- build.sbt | 8 ++++---- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/CONTRIBUTORS.md b/CONTRIBUTORS.md index f37ac8d..ba96900 100644 --- a/CONTRIBUTORS.md +++ b/CONTRIBUTORS.md @@ -3,3 +3,4 @@ * Hamidreza Afzali * Jendrik Poloczek * Svend Vanderveken +* Daniel Wojda diff --git a/README.md b/README.md index 801783e..c535402 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ [![codecov](https://codecov.io/gh/jpzk/mockedstreams/branch/master/graph/badge.svg)](https://codecov.io/gh/jpzk/mockedstreams) [![License](http://img.shields.io/:license-Apache%202-grey.svg)](http://www.apache.org/licenses/LICENSE-2.0.txt) [![GitHub stars](https://img.shields.io/github/stars/jpzk/mockedstreams.svg?style=flat)](https://github.com/jpzk/mockedstreams/stargazers) -Mocked Streams 1.6.0 [(git)](https://github.com/jpzk/mockedstreams) is a library for Scala 2.11 and 2.12 which allows you to **unit-test processing topologies** of [Kafka Streams](https://kafka.apache.org/documentation#streams) applications (since Apache Kafka >=0.10.1) **without Zookeeper and Kafka Brokers**. Further, you can use your favourite Scala testing framework e.g. [ScalaTest](http://www.scalatest.org/) and [Specs2](https://etorreborre.github.io/specs2/). Mocked Streams is located at the Maven Central Repository, therefore you just have to add the following to your [SBT dependencies](http://www.scala-sbt.org/0.13/docs/Library-Dependencies.html): +Mocked Streams 1.7.0 [(git)](https://github.com/jpzk/mockedstreams) is a library for Scala 2.11 and 2.12 which allows you to **unit-test processing topologies** of [Kafka Streams](https://kafka.apache.org/documentation#streams) applications (since Apache Kafka >=0.10.1) **without Zookeeper and Kafka Brokers**. Further, you can use your favourite Scala testing framework e.g. [ScalaTest](http://www.scalatest.org/) and [Specs2](https://etorreborre.github.io/specs2/). Mocked Streams is located at the Maven Central Repository, therefore you just have to add the following to your [SBT dependencies](http://www.scala-sbt.org/0.13/docs/Library-Dependencies.html): libraryDependencies += "com.madewithtea" %% "mockedstreams" % "1.6.0" % "test" @@ -12,6 +12,7 @@ Mocked Streams 1.6.0 [(git)](https://github.com/jpzk/mockedstreams) is a library | Mocked Streams Version | Apache Kafka Version | |------------- |-------------| +| 1.7.0 | 1.1.0.0 | | 1.6.0 | 1.0.1.0 | | 1.5.0 | 1.0.0.0 | | 1.4.0 | 0.11.0.1 | diff --git a/build.sbt b/build.sbt index 3b22419..599dd5a 100644 --- a/build.sbt +++ b/build.sbt @@ -1,14 +1,14 @@ lazy val commonSettings = Seq( organization := "com.madewithtea", - version := "1.6.0", - scalaVersion := "2.12.4", - crossScalaVersions := Seq("2.12.4","2.11.11"), + version := "1.7.0", + scalaVersion := "2.12.6", + crossScalaVersions := Seq("2.12.6","2.11.12"), description := "Topology Unit-Testing Library for Apache Kafka / Kafka Streams", organizationHomepage := Some(url("https://www.madewithtea.com")), scalacOptions := Seq("-Xexperimental")) -val scalaTestVersion = "3.0.4" +val scalaTestVersion = "3.0.5" val rocksDBVersion = "5.7.3" val kafkaVersion = "1.1.0" From 175056e4fa693279919067ee49b81ffe3514ae34 Mon Sep 17 00:00:00 2001 From: Jendrik Poloczek Date: Sat, 2 Jun 2018 12:39:21 +0200 Subject: [PATCH 4/4] added CHANGELOG entries; changed version number in README --- CHANGELOG.md | 7 +++++++ README.md | 2 +- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0147e7f..36bc100 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,12 @@ # Changelog +## Mocked Streams 1.7.0 + +* Bumping versions of dependencies +* Build against Apache Kafka 1.1.0 +* Daniel Wojda added new way of supplying topologies (withTopology) +* Added Daniel Wojda to CONTRIBUTORS.md + ## Mocked Streams 1.6.0 * Build against Apache Kafka 1.0.1 diff --git a/README.md b/README.md index c535402..4bc7e67 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,7 @@ Mocked Streams 1.7.0 [(git)](https://github.com/jpzk/mockedstreams) is a library for Scala 2.11 and 2.12 which allows you to **unit-test processing topologies** of [Kafka Streams](https://kafka.apache.org/documentation#streams) applications (since Apache Kafka >=0.10.1) **without Zookeeper and Kafka Brokers**. Further, you can use your favourite Scala testing framework e.g. [ScalaTest](http://www.scalatest.org/) and [Specs2](https://etorreborre.github.io/specs2/). Mocked Streams is located at the Maven Central Repository, therefore you just have to add the following to your [SBT dependencies](http://www.scala-sbt.org/0.13/docs/Library-Dependencies.html): - libraryDependencies += "com.madewithtea" %% "mockedstreams" % "1.6.0" % "test" + libraryDependencies += "com.madewithtea" %% "mockedstreams" % "1.7.0" % "test" ## Apache Kafka Compatibility