Skip to content

Commit

Permalink
Add PekkoStream (#572)
Browse files Browse the repository at this point in the history
* Add PekkoStream

---------

Co-authored-by: Cédric Chantepie <cchantep@users.noreply.github.com>
  • Loading branch information
kaplanbar and cchantep authored Sep 9, 2023
1 parent b515a83 commit ae46a2d
Show file tree
Hide file tree
Showing 10 changed files with 476 additions and 16 deletions.
13 changes: 13 additions & 0 deletions akka/src/test/scala-2.12+/StreamTestKit.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/*
* Copyright (C) from 2022 The Play Framework Contributors <https://github.com/playframework>, 2011-2021 Lightbend Inc. <https://www.lightbend.com>
*/

package anorm

import akka.stream.Materializer
import akka.stream.testkit.scaladsl.{ StreamTestKit => AkkaTestKit }

private[anorm] object StreamTestKit {
def assertAllStagesStopped[T](f: => T)(implicit mat: Materializer): T =
AkkaTestKit.assertAllStagesStopped(f)
}
10 changes: 10 additions & 0 deletions akka/src/test/scala-2.12-/StreamTestKit.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/*
* Copyright (C) from 2022 The Play Framework Contributors <https://github.com/playframework>, 2011-2021 Lightbend Inc. <https://www.lightbend.com>
*/

package anorm

/* No valid testkit for 2.11 */
private[anorm] object StreamTestKit {
def assertAllStagesStopped[T](f: => T): T = f
}
4 changes: 1 addition & 3 deletions akka/src/test/scala/anorm/AkkaStreamSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,7 @@ final class AkkaStreamSpec(implicit ee: ExecutionEnv) extends org.specs2.mutable
implicit def materializer: Materializer =
akka.stream.ActorMaterializer.create(system)

// Akka-Contrib issue with Akka-Stream > 2.5.4
// import akka.stream.contrib.TestKit.assertAllStagesStopped
def assertAllStagesStopped[T](f: => T) = f
import StreamTestKit.assertAllStagesStopped

"Akka Stream" should {
"expose the query result as source" in assertAllStagesStopped {
Expand Down
100 changes: 88 additions & 12 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -261,11 +261,6 @@ lazy val akkaVer = Def.setting[String] {
}
}

val akkaContribVer = Def.setting[String] {
if (akkaVer.value.startsWith("2.5")) "0.11+4-91b2f9fa"
else "0.10"
}

lazy val `anorm-akka` = (project in file("akka"))
.settings(
mimaPreviousArtifacts := {
Expand All @@ -280,12 +275,85 @@ lazy val `anorm-akka` = (project in file("akka"))
},
libraryDependencies ++= Seq(
acolyte,
"org.scala-lang.modules" %% "scala-xml" % xmlVer.value % Test
) ++ specs2Test ++ Seq(
("com.typesafe.akka" %% "akka-stream-contrib" % akkaContribVer.value % Test)
.cross(CrossVersion.for3Use2_13)
.exclude("com.typesafe.akka", "*")
),
"org.scala-lang.modules" %% "scala-xml" % xmlVer.value % Test,
("com.typesafe.akka" %% "akka-stream-testkit" % akkaVer.value % Test).exclude("org.scala-lang.modules", "*")
) ++ specs2Test,
scalacOptions ++= {
if (scalaBinaryVersion.value == "3") {
Seq("-Wconf:cat=deprecation&msg=.*(onDownstreamFinish|ActorMaterializer).*:s")
} else {
Seq("-P:silencer:globalFilters=deprecated")
}
},
Test / unmanagedSourceDirectories += {
CrossVersion.partialVersion(scalaVersion.value) match {
case Some((2, n)) if n < 13 =>
(Test / sourceDirectory).value / "scala-2.13-"

case _ =>
(Test / sourceDirectory).value / "scala-2.13+"

}
},
Test / unmanagedSourceDirectories += {
CrossVersion.partialVersion(scalaVersion.value) match {
case Some((2, 11)) =>
(Test / sourceDirectory).value / "scala-2.12-"

case _ =>
(Test / sourceDirectory).value / "scala-2.12+"

}
}
)
.dependsOn(`anorm-core`)

lazy val pekkoVer = Def.setting[String]("1.0.1")

lazy val pekkoEnabled = Def.setting[Boolean] {
val v = scalaBinaryVersion.value

v != "2.11" && v != "2.12"
}

lazy val `anorm-pekko` = (project in file("pekko"))
.settings(
mimaPreviousArtifacts := Set.empty,
sourceDirectory := {
if (!pekkoEnabled.value) new java.io.File("/no/sources")
else sourceDirectory.value
},
publishArtifact := pekkoEnabled.value,
publish := Def.taskDyn {
val ver = scalaBinaryVersion.value
val go = publish.value

Def.task {
if (pekkoEnabled.value) {
go
}
}
}.value,
libraryDependencies ++= {
if (pekkoEnabled.value) {
Seq("pekko-testkit", "pekko-stream").map { m =>
("org.apache.pekko" %% m % pekkoVer.value % Provided).exclude("org.scala-lang.modules", "*")
}
} else {
Seq.empty
}
},
libraryDependencies ++= {
if (pekkoEnabled.value) {
Seq(
acolyte,
"org.scala-lang.modules" %% "scala-xml" % xmlVer.value % Test,
"org.apache.pekko" %% "pekko-stream-testkit" % pekkoVer.value % Test
) ++ specs2Test
} else {
Seq.empty
}
},
scalacOptions ++= {
if (scalaBinaryVersion.value == "3") {
Seq("-Wconf:cat=deprecation&msg=.*(onDownstreamFinish|ActorMaterializer).*:s")
Expand Down Expand Up @@ -362,7 +430,15 @@ lazy val `anorm-enumeratum` = (project in file("enumeratum"))

lazy val `anorm-parent` = (project in file("."))
.enablePlugins(ScalaUnidocPlugin)
.aggregate(`anorm-tokenizer`, `anorm-core`, `anorm-iteratee`, `anorm-akka`, `anorm-postgres`, `anorm-enumeratum`)
.aggregate(
`anorm-tokenizer`,
`anorm-core`,
`anorm-iteratee`,
`anorm-akka`,
`anorm-pekko`,
`anorm-postgres`,
`anorm-enumeratum`
)
.settings(
mimaPreviousArtifacts := Set.empty,
(Compile / headerSources) ++=
Expand Down
184 changes: 184 additions & 0 deletions pekko/src/main/scala/anorm/PekkoStream.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
/*
* Copyright (C) from 2022 The Play Framework Contributors <https://github.com/playframework>, 2011-2021 Lightbend Inc. <https://www.lightbend.com>
*/

package anorm

import java.sql.Connection

import scala.util.control.NonFatal

import scala.concurrent.{ Future, Promise }

import org.apache.pekko.stream.scaladsl.Source

/**
* Anorm companion for the Pekko Streams.
*
* @define materialization It materializes a [[scala.concurrent.Future]] of [[scala.Int]] containing the number of rows read from the source upon completion, and a possible exception if row parsing failed.
* @define sqlParam the SQL query
* @define connectionParam the JDBC connection, which must not be closed until the source is materialized.
* @define columnAliaserParam the column aliaser
*/
object PekkoStream {

/**
* Returns the rows parsed from the `sql` query as a reactive source.
*
* $materialization
*
* @tparam T the type of the result elements
* @param sql $sqlParam
* @param parser the result (row) parser
* @param as $columnAliaserParam
* @param connection $connectionParam
*
* {{{
* import java.sql.Connection
*
* import scala.concurrent.Future
*
* import org.apache.pekko.stream.scaladsl.Source
*
* import anorm._
*
* def resultSource(implicit con: Connection): Source[String, Future[Int]] = PekkoStream.source(SQL"SELECT * FROM Test", SqlParser.scalar[String], ColumnAliaser.empty)
* }}}
*/
@SuppressWarnings(Array("UnusedMethodParameter"))
def source[T](sql: => Sql, parser: RowParser[T], as: ColumnAliaser)(implicit
con: Connection
): Source[T, Future[Int]] = Source.fromGraph(new ResultSource[T](con, sql, as, parser))

/**
* Returns the rows parsed from the `sql` query as a reactive source.
*
* $materialization
*
* @tparam T the type of the result elements
* @param sql $sqlParam
* @param parser the result (row) parser
* @param connection $connectionParam
*/
@SuppressWarnings(Array("UnusedMethodParameter"))
def source[T](sql: => Sql, parser: RowParser[T])(implicit con: Connection): Source[T, Future[Int]] =
source[T](sql, parser, ColumnAliaser.empty)

/**
* Returns the result rows from the `sql` query as an enumerator.
* This is equivalent to `source[Row](sql, RowParser.successful, as)`.
*
* $materialization
*
* @param sql $sqlParam
* @param as $columnAliaserParam
* @param connection $connectionParam
*/
def source(sql: => Sql, as: ColumnAliaser)(implicit connection: Connection): Source[Row, Future[Int]] =
source(sql, RowParser.successful, as)

/**
* Returns the result rows from the `sql` query as an enumerator.
* This is equivalent to
* `source[Row](sql, RowParser.successful, ColumnAliaser.empty)`.
*
* $materialization
*
* @param sql $sqlParam
* @param connection $connectionParam
*/
def source(sql: => Sql)(implicit connnection: Connection): Source[Row, Future[Int]] =
source(sql, RowParser.successful, ColumnAliaser.empty)

// Internal stages

import org.apache.pekko.stream.stage.{ GraphStageLogic, GraphStageWithMaterializedValue, OutHandler }
import org.apache.pekko.stream.{ Attributes, Outlet, SourceShape }

import java.sql.ResultSet
import scala.util.{ Failure, Success }

private[anorm] class ResultSource[T](connection: Connection, sql: Sql, as: ColumnAliaser, parser: RowParser[T])
extends GraphStageWithMaterializedValue[SourceShape[T], Future[Int]] {

private[anorm] var resultSet: ResultSet = _

override val toString = "AnormQueryResult"
val out: Outlet[T] = Outlet(s"${toString}.out")
val shape: SourceShape[T] = SourceShape(out)

override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Int]) = {
val result = Promise[Int]()

val logic = new GraphStageLogic(shape) with OutHandler {
private var cursor: Option[Cursor] = None
private var counter: Int = 0

private def failWith(cause: Throwable): Unit = {
result.failure(cause)
fail(out, cause)
()
}

override def preStart(): Unit = {
try {
resultSet = sql.unsafeResultSet(connection)
nextCursor()
} catch {
case NonFatal(cause) => failWith(cause)
}
}

override def postStop() = release()

private def release(): Unit = {
val stmt: Option[java.sql.Statement] = {
if (resultSet != null && !resultSet.isClosed) {
val s = resultSet.getStatement
resultSet.close()
Option(s)
} else None
}

stmt.foreach { s =>
if (!s.isClosed) s.close()
}
}

private def nextCursor(): Unit = {
cursor = Sql.unsafeCursor(resultSet, sql.resultSetOnFirstRow, as)
}

def onPull(): Unit = cursor match {
case Some(c) =>
c.row.as(parser) match {
case Success(parsed) => {
counter += 1
push(out, parsed)
nextCursor()
}

case Failure(cause) =>
failWith(cause)
}

case _ => {
result.success(counter)
complete(out)
}
}

override def onDownstreamFinish() = {
result.tryFailure(new InterruptedException("Downstream finished"))
release()
super.onDownstreamFinish()
}

setHandler(out, this)
}

logic -> result.future
}
}

}
3 changes: 3 additions & 0 deletions pekko/src/test/resources/reference.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pekko {
loglevel = "OFF"
}
9 changes: 9 additions & 0 deletions pekko/src/test/scala-2.13+/PekkoCompat.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
/*
* Copyright (C) from 2022 The Play Framework Contributors <https://github.com/playframework>, 2011-2021 Lightbend Inc. <https://www.lightbend.com>
*/

package anorm

private[anorm] object PekkoCompat {
type Seq[T] = _root_.scala.collection.immutable.Seq[T]
}
9 changes: 9 additions & 0 deletions pekko/src/test/scala-2.13-/PekkoCompat.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
/*
* Copyright (C) from 2022 The Play Framework Contributors <https://github.com/playframework>, 2011-2021 Lightbend Inc. <https://www.lightbend.com>
*/

package anorm

private[anorm] object PekkoCompat {
type Seq[T] = _root_.scala.collection.Seq[T]
}
Loading

0 comments on commit ae46a2d

Please sign in to comment.