Skip to content

Commit

Permalink
Merge pull request #29 from jpzk/release/v1.1.0
Browse files Browse the repository at this point in the history
Release/v1.7.0
  • Loading branch information
jpzk authored Jun 3, 2018
2 parents d699340 + 175056e commit 2686284
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 18 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# Changelog

## Mocked Streams 1.7.0

* Bumping versions of dependencies
* Build against Apache Kafka 1.1.0
* Daniel Wojda added new way of supplying topologies (withTopology)
* Added Daniel Wojda to CONTRIBUTORS.md

## Mocked Streams 1.6.0

* Build against Apache Kafka 1.0.1
Expand Down
1 change: 1 addition & 0 deletions CONTRIBUTORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@
* Hamidreza Afzali
* Jendrik Poloczek
* Svend Vanderveken
* Daniel Wojda
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@
[![codecov](https://codecov.io/gh/jpzk/mockedstreams/branch/master/graph/badge.svg)](https://codecov.io/gh/jpzk/mockedstreams) [![License](http://img.shields.io/:license-Apache%202-grey.svg)](http://www.apache.org/licenses/LICENSE-2.0.txt) [![GitHub stars](https://img.shields.io/github/stars/jpzk/mockedstreams.svg?style=flat)](https://github.com/jpzk/mockedstreams/stargazers)


Mocked Streams 1.6.0 [(git)](https://github.com/jpzk/mockedstreams) is a library for Scala 2.11 and 2.12 which allows you to **unit-test processing topologies** of [Kafka Streams](https://kafka.apache.org/documentation#streams) applications (since Apache Kafka >=0.10.1) **without Zookeeper and Kafka Brokers**. Further, you can use your favourite Scala testing framework e.g. [ScalaTest](http://www.scalatest.org/) and [Specs2](https://etorreborre.github.io/specs2/). Mocked Streams is located at the Maven Central Repository, therefore you just have to add the following to your [SBT dependencies](http://www.scala-sbt.org/0.13/docs/Library-Dependencies.html):
Mocked Streams 1.7.0 [(git)](https://github.com/jpzk/mockedstreams) is a library for Scala 2.11 and 2.12 which allows you to **unit-test processing topologies** of [Kafka Streams](https://kafka.apache.org/documentation#streams) applications (since Apache Kafka >=0.10.1) **without Zookeeper and Kafka Brokers**. Further, you can use your favourite Scala testing framework e.g. [ScalaTest](http://www.scalatest.org/) and [Specs2](https://etorreborre.github.io/specs2/). Mocked Streams is located at the Maven Central Repository, therefore you just have to add the following to your [SBT dependencies](http://www.scala-sbt.org/0.13/docs/Library-Dependencies.html):

libraryDependencies += "com.madewithtea" %% "mockedstreams" % "1.6.0" % "test"
libraryDependencies += "com.madewithtea" %% "mockedstreams" % "1.7.0" % "test"

## Apache Kafka Compatibility

| Mocked Streams Version | Apache Kafka Version |
|------------- |-------------|
| 1.7.0 | 1.1.0.0 |
| 1.6.0 | 1.0.1.0 |
| 1.5.0 | 1.0.0.0 |
| 1.4.0 | 0.11.0.1 |
Expand Down
10 changes: 5 additions & 5 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@

lazy val commonSettings = Seq(
organization := "com.madewithtea",
version := "1.6.0",
scalaVersion := "2.12.4",
crossScalaVersions := Seq("2.12.4","2.11.11"),
version := "1.7.0",
scalaVersion := "2.12.6",
crossScalaVersions := Seq("2.12.6","2.11.12"),
description := "Topology Unit-Testing Library for Apache Kafka / Kafka Streams",
organizationHomepage := Some(url("https://www.madewithtea.com")),
scalacOptions := Seq("-Xexperimental"))

val scalaTestVersion = "3.0.4"
val scalaTestVersion = "3.0.5"
val rocksDBVersion = "5.7.3"
val kafkaVersion = "1.0.1"
val kafkaVersion = "1.1.0"

lazy val kafka = Seq(
"org.apache.kafka" % "kafka-clients" % kafkaVersion,
Expand Down
24 changes: 14 additions & 10 deletions src/main/scala/MockedStreams.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package com.madewithtea.mockedstreams
import java.util.{Properties, UUID}

import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.{StreamsBuilder, StreamsConfig}
import org.apache.kafka.streams.{StreamsBuilder, StreamsConfig, Topology}
import org.apache.kafka.streams.state.ReadOnlyWindowStore
import org.apache.kafka.test.{ProcessorTopologyTestDriver => Driver}

Expand All @@ -31,14 +31,23 @@ object MockedStreams {

case class Record(topic: String, key: Array[Byte], value: Array[Byte])

case class Builder(topology: Option[(StreamsBuilder => Unit)] = None,
case class Builder(topology: Option[() => Topology] = None,
configuration: Properties = new Properties(),
stateStores: Seq[String] = Seq(),
inputs: List[Record] = List.empty) {

def config(configuration: Properties) = this.copy(configuration = configuration)

def topology(func: (StreamsBuilder => Unit)) = this.copy(topology = Some(func))
def topology(func: (StreamsBuilder => Unit)) = {
val buildTopology = () => {
val builder = new StreamsBuilder()
func(builder)
builder.build()
}
this.copy(topology = Some(buildTopology))
}

def withTopology(t: () => Topology) = this.copy(topology = Some(t))

def stores(stores: Seq[String]) = this.copy(stateStores = stores)

Expand Down Expand Up @@ -93,14 +102,9 @@ object MockedStreams {
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
props.putAll(configuration)

val builder = new StreamsBuilder()

topology match {
case Some(t) => t(builder)
case _ => throw new NoTopologySpecified
}
val t = topology.getOrElse(throw new NoTopologySpecified)

new Driver(new StreamsConfig(props), builder.build())
new Driver(new StreamsConfig(props), t())
}

private def produce(driver: Driver): Unit = {
Expand Down
19 changes: 18 additions & 1 deletion src/test/scala/MockedStreamsSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.kstream._
import org.apache.kafka.streams.processor.TimestampExtractor
import org.apache.kafka.streams.{Consumed, StreamsBuilder, KeyValue, StreamsConfig}
import org.apache.kafka.streams._
import org.scalatest.{FlatSpec, Matchers}

class MockedStreamsSpec extends FlatSpec with Matchers {
Expand Down Expand Up @@ -156,6 +156,23 @@ class MockedStreamsSpec extends FlatSpec with Matchers {
.shouldEqual(expectedCy.toMap)
}

it should "accept already built topology" in {
import Fixtures.Uppercase._

def getTopology() = {
val builder = new StreamsBuilder()
topology(builder)
builder.build()
}

val output = MockedStreams()
.withTopology(getTopology)
.input(InputTopic, strings, strings, input)
.output(OutputTopic, strings, strings, expected.size)

output shouldEqual expected
}

class LastInitializer extends Initializer[Integer] {
override def apply() = 0
}
Expand Down

0 comments on commit 2686284

Please sign in to comment.