Streamz is a resource combinator library for scalaz-stream. It allows `Process` instances to consume from and produce to:
- Apache Camel endpoints
- Akka Persistence journals and snapshot stores
- Akka Stream flows (reactive streams) with full back-pressure support.
resolvers += "scalaz at bintray" at "http://dl.bintray.com/scalaz/releases"
resolvers += "krasserm at bintray" at "http://dl.bintray.com/krasserm/maven"
// transitively depends on akka-camel 2.3.5
libraryDependencies += "com.github.krasserm" %% "streamz-akka-camel" % "0.1"
// transitively depends on akka-persistence-experimental 2.3.5
libraryDependencies += "com.github.krasserm" %% "streamz-akka-persistence" % "0.1"
// transitively depends on akka-stream-experimental 0.6
libraryDependencies += "com.github.krasserm" %% "streamz-akka-stream" % "0.1"
The combinators for Apache Camel are provided by the `streamz-akka-camel` module:

```scala
import akka.actor.ActorSystem
import scalaz.concurrent.Task
import scalaz.stream.Process

import streamz.akka.camel._

implicit val system = ActorSystem("example")

val p: Process[Task,Unit] =
  // receive from endpoint
  receive[String]("seda:q1")
  // in-only message exchange with endpoint and continue stream with in-message
  .sendW("seda:q3")
  // in-out message exchange with endpoint and continue stream with out-message
  .request[Int]("bean:service?method=length")
  // in-only message exchange with endpoint
  .send("seda:q2")

// create concurrent task from process
val t: Task[Unit] = p.run

// run task (side effects only here) ...
t.run
```
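For a quick test, messages can be fed into `seda:q1` with a plain Camel `ProducerTemplate`, obtained from akka-camel's `CamelExtension` (a sketch; this uses the Apache Camel API directly and is not part of streamz):

```scala
import akka.camel.CamelExtension

// Obtain a ProducerTemplate from the CamelContext managed by akka-camel
val template = CamelExtension(system).context.createProducerTemplate()

// Send test messages to the endpoint consumed by the process above
Seq("hello", "streamz").foreach(template.sendBody("seda:q1", _))
```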
An implicit `ActorSystem` must be in scope because the combinator implementation depends on Akka Camel. A discrete stream starting from a Camel endpoint can be created with `receive`, where its type parameter is used to convert received message bodies to the given type using a Camel type converter, if needed:
```scala
val p1: Process[Task,String] = receive[String]("seda:q1")
```
Streams can also be targeted at Camel endpoints. The `send` and `sendW` combinators initiate in-only message exchanges with a Camel endpoint:
```scala
val p2: Process[Task,Unit] = p1.send("seda:q2")
val p3: Process[Task,String] = p1.sendW("seda:q3")
```
The `request` combinator initiates in-out message exchanges with a Camel endpoint, where its type parameter is used to convert response message bodies to the given type using a Camel type converter, if needed:
```scala
val p4: Process[Task,Int] = p1.request[Int]("bean:service?method=length")
```
These combinators are compatible with all available Camel endpoints and all `Process` combinators (and now supersede the scalaz-camel project). A more concrete example of how to process files from an FTP server is available here.
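As an illustrative sketch (the FTP endpoint URI, its options, and the transformation are made up here, not taken from that example), such a stream combines `receive` with ordinary `Process` combinators:

```scala
import akka.actor.ActorSystem
import scalaz.concurrent.Task
import scalaz.stream.Process

import streamz.akka.camel._

implicit val system = ActorSystem("ftp-example")

// Hypothetical endpoint URI: poll files from an FTP inbox as strings
val files: Process[Task, Unit] =
  receive[String]("ftp://example.org/inbox?username=anonymous")
    // derive a result from each file body (made-up transformation)
    .map(body => body.length.toString)
    // in-only message exchange with a (made-up) result queue
    .send("seda:file-sizes")
```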
A discrete stream of persistent events (that are written by an `akka.persistence.PersistentActor`) can be created with `replay`:
```scala
import akka.actor.ActorSystem
import scalaz.concurrent.Task
import scalaz.std.string._
import scalaz.stream.Process

import streamz.akka.persistence._

implicit val system = ActorSystem("example")

// replay "processor-1" events starting from scratch (= sequence number 1)
val p1: Process[Task, Event[Any]] = replay("processor-1")

// replay "processor-1" events starting from sequence number 3
val p2: Process[Task, Event[Any]] = replay("processor-1", from = 3L)
```
where `Event` is defined as

```scala
package streamz.akka.persistence

case class Event[A](persistenceId: String, sequenceNr: Long, data: A)
```
The created `Process[Task, Event[Any]]` not only replays already journaled events but also emits new events that `processor-1` is going to write. Assuming journaled events `Event("processor-1", 1L, "a")`, `Event("processor-1", 2L, "b")`, `Event("processor-1", 3L, "c")` and `Event("processor-1", 4L, "d")`, `p1` produces `Event("processor-1", 1L, "a")`, `Event("processor-1", 2L, "b")`, `Event("processor-1", 3L, "c")`, `Event("processor-1", 4L, "d")`, ... and `p2` produces `Event("processor-1", 3L, "c")`, `Event("processor-1", 4L, "d")`, ...
State can be accumulated with scalaz-stream's `scan`:

```scala
val p3: Process[Task, String] = p1.scan("")((acc, evt) => acc + evt.data)
```
so that `p3` produces `""`, `"a"`, `"ab"`, `"abc"`, `"abcd"`, and so on. State accumulation starting from a snapshot can be done with `snapshot`:
```scala
val p4: Process[Task, String] = for {
  s @ Snapshot(md, data) <- snapshot[String]("processor-1")
  currentState <- replay(md.persistenceId, s.nextSequenceNr).scan(data)((acc, evt) => acc + evt.data)
} yield currentState
```
Additionally assuming that a snapshot `"ab"` has already been written by `processor-1` (after events `Event("processor-1", 1L, "a")` and `Event("processor-1", 2L, "b")`), `p4` produces `"ab"`, `"abc"`, `"abcd"`, and so on. Finally, writing to an Akka Persistence journal from a stream can be done with `journal`:
```scala
val p5: Process[Task, Unit] = Process("a", "b", "c").journal("processor-2")
```
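Events written this way can be read back with the `replay` combinator shown above; for example (a sketch, assuming `p5` has already been run):

```scala
// Replay what p5 wrote to "processor-2" and project out the event data
val p6: Process[Task, String] = replay("processor-2").map(_.data.toString)
```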
The following examples connect processes to Akka Stream flows; they use these imports and definitions (full source code here):
```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.Flow
import akka.stream.{FlowMaterializer, MaterializerSettings}

import scalaz.concurrent.Task
import scalaz.stream._

import streamz.akka.stream._

implicit val system = ActorSystem("example")
implicit val materializer = FlowMaterializer(MaterializerSettings())
```
A process can publish its elements to an internally created (managed) flow with `publish`:

```scala
// Create process
val p1: Process[Task, Int] = Process.emitAll(1 to 20)
// Compose process with (managed) flow
val p2: Process[Task, Unit] = p1.publish() { flow: Flow[Int] =>
  // Customize flow (done when running process)
  flow.foreach(println)
}
// Run process
p2.run.run
```
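The flow passed to `publish` can be customized with any flow combinator before it is materialized; for example (a sketch, assuming the `map` combinator of the akka-stream 0.6 `Flow` API):

```scala
// Sketch: double each element inside the managed flow before printing
val p2b: Process[Task, Unit] = p1.publish() { flow: Flow[Int] =>
  flow.map(_ * 2).foreach(println)
}
```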
Alternatively, a process can be turned into a reactive streams publisher with `publisher` and connected to an externally created (un-managed) flow:

```scala
// Create process
val p1: Process[Task, Int] = Process.emitAll(1 to 20)
// Create publisher (= process adapter)
val (p2, publisher) = p1.publisher()
// Create (un-managed) flow from publisher
Flow(publisher).foreach(println)
// Run process
p2.run.run
```
In the other direction, a process can consume from a flow with `subscribe`:

```scala
// Create flow
val f1: Flow[Int] = Flow(1 to 20)
// Create process that subscribes to the flow
val p1: Process[Task, Int] = subscribe(f1)
// Run process
p1.runLog.run.foreach(println)
```
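Since all combinators operate on plain `Process` values, the modules compose freely; for example (a sketch with a made-up endpoint URI, assuming the Camel and Akka Stream imports and implicits above are in scope):

```scala
import streamz.akka.camel._

// Consume messages from a Camel endpoint and publish them
// to a managed Akka Stream flow
val combined: Process[Task, Unit] =
  receive[String]("seda:q1").publish() { flow: Flow[String] =>
    flow.foreach(println)
  }
```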