From 704c2682eea7a7c832b4b43d539697642b5245fe Mon Sep 17 00:00:00 2001 From: kaplanbar Date: Fri, 18 Aug 2023 14:44:59 +0200 Subject: [PATCH] Add PekkoStream --- build.sbt | 40 +++- pekko/src/main/scala/anorm/PekkoStream.scala | 182 ++++++++++++++++++ pekko/src/test/resources/reference.conf | 3 + pekko/src/test/scala-2.13+/PekkoCompat.scala | 9 + pekko/src/test/scala-2.13-/PekkoCompat.scala | 9 + .../test/scala/anorm/PekkoStreamSpec.scala | 160 +++++++++++++++ 6 files changed, 402 insertions(+), 1 deletion(-) create mode 100644 pekko/src/main/scala/anorm/PekkoStream.scala create mode 100644 pekko/src/test/resources/reference.conf create mode 100644 pekko/src/test/scala-2.13+/PekkoCompat.scala create mode 100644 pekko/src/test/scala-2.13-/PekkoCompat.scala create mode 100644 pekko/src/test/scala/anorm/PekkoStreamSpec.scala diff --git a/build.sbt b/build.sbt index 00483d76..13700fe9 100644 --- a/build.sbt +++ b/build.sbt @@ -255,6 +255,8 @@ lazy val akkaVer = Def.setting[String] { } } +lazy val pekkoVer = Def.setting[String]("1.0.1") + val akkaContribVer = Def.setting[String] { if (akkaVer.value.startsWith("2.5")) "0.11+4-91b2f9fa" else "0.10" @@ -300,6 +302,42 @@ lazy val `anorm-akka` = (project in file("akka")) ) .dependsOn(`anorm-core`) +lazy val `anorm-pekko` = (project in file("pekko")) + .settings( + mimaPreviousArtifacts := { + if (scalaBinaryVersion.value == "3") { + Set.empty + } else { + mimaPreviousArtifacts.value + } + }, + libraryDependencies ++= Seq("pekko-testkit", "pekko-stream").map { m => + ("org.apache.pekko" %% m % pekkoVer.value % Provided).exclude("org.scala-lang.modules", "*") + }, + libraryDependencies ++= Seq( + acolyte, + "org.scala-lang.modules" %% "scala-xml" % xmlVer.value % Test + ) ++ specs2Test, + scalacOptions ++= { + if (scalaBinaryVersion.value == "3") { + Seq("-Wconf:cat=deprecation&msg=.*(onDownstreamFinish|ActorMaterializer).*:s") + } else { + Seq("-P:silencer:globalFilters=deprecated") + } + }, + Test / unmanagedSourceDirectories ++= { + CrossVersion.partialVersion(scalaVersion.value) match { + case Some((2, n)) if n < 13 => + Seq((Test / sourceDirectory).value / "scala-2.13-") + + case _ => + Seq((Test / sourceDirectory).value / "scala-2.13+") + + } + } + ) + .dependsOn(`anorm-core`) + // --- lazy val pgVer = sys.env.get("POSTGRES_VERSION").getOrElse("42.6.0") @@ -356,7 +394,7 @@ lazy val `anorm-enumeratum` = (project in file("enumeratum")) lazy val `anorm-parent` = (project in file(".")) .enablePlugins(ScalaUnidocPlugin) - .aggregate(`anorm-tokenizer`, `anorm-core`, `anorm-iteratee`, `anorm-akka`, `anorm-postgres`, `anorm-enumeratum`) + .aggregate(`anorm-tokenizer`, `anorm-core`, `anorm-iteratee`, `anorm-akka`, `anorm-pekko`, `anorm-postgres`, `anorm-enumeratum`) .settings( mimaPreviousArtifacts := Set.empty, (Compile / headerSources) ++= diff --git a/pekko/src/main/scala/anorm/PekkoStream.scala b/pekko/src/main/scala/anorm/PekkoStream.scala new file mode 100644 index 00000000..65d0d606 --- /dev/null +++ b/pekko/src/main/scala/anorm/PekkoStream.scala @@ -0,0 +1,182 @@ +/* + * Copyright (C) from 2022 The Play Framework Contributors , 2011-2021 Lightbend Inc. + */ + +package anorm + +import org.apache.pekko.stream.scaladsl.Source + +import java.sql.Connection +import scala.concurrent.{ Future, Promise } +import scala.util.control.NonFatal + +/** + * Anorm companion for the Pekko Streams. + * + * @define materialization It materializes a [[scala.concurrent.Future]] of [[scala.Int]] containing the number of rows read from the source upon completion, and a possible exception if row parsing failed. + * @define sqlParam the SQL query + * @define connectionParam the JDBC connection, which must not be closed until the source is materialized. + * @define columnAliaserParam the column aliaser + */ +object PekkoStream { + + /** + * Returns the rows parsed from the `sql` query as a reactive source. + * + * $materialization + * + * @tparam T the type of the result elements + * @param sql $sqlParam + * @param parser the result (row) parser + * @param as $columnAliaserParam + * @param connection $connectionParam + * + * {{{ + * import java.sql.Connection + * + * import scala.concurrent.Future + * + * import org.apache.pekko.stream.scaladsl.Source + * + * import anorm._ + * + * def resultSource(implicit con: Connection): Source[String, Future[Int]] = PekkoStream.source(SQL"SELECT * FROM Test", SqlParser.scalar[String], ColumnAliaser.empty) + * }}} + */ + @SuppressWarnings(Array("UnusedMethodParameter")) + def source[T](sql: => Sql, parser: RowParser[T], as: ColumnAliaser)(implicit + con: Connection + ): Source[T, Future[Int]] = Source.fromGraph(new ResultSource[T](con, sql, as, parser)) + + /** + * Returns the rows parsed from the `sql` query as a reactive source. + * + * $materialization + * + * @tparam T the type of the result elements + * @param sql $sqlParam + * @param parser the result (row) parser + * @param connection $connectionParam + */ + @SuppressWarnings(Array("UnusedMethodParameter")) + def source[T](sql: => Sql, parser: RowParser[T])(implicit con: Connection): Source[T, Future[Int]] = + source[T](sql, parser, ColumnAliaser.empty) + + /** + * Returns the result rows from the `sql` query as an enumerator. + * This is equivalent to `source[Row](sql, RowParser.successful, as)`. + * + * $materialization + * + * @param sql $sqlParam + * @param as $columnAliaserParam + * @param connection $connectionParam + */ + def source(sql: => Sql, as: ColumnAliaser)(implicit connection: Connection): Source[Row, Future[Int]] = + source(sql, RowParser.successful, as) + + /** + * Returns the result rows from the `sql` query as an enumerator. + * This is equivalent to + * `source[Row](sql, RowParser.successful, ColumnAliaser.empty)`. + * + * $materialization + * + * @param sql $sqlParam + * @param connection $connectionParam + */ + def source(sql: => Sql)(implicit connnection: Connection): Source[Row, Future[Int]] = + source(sql, RowParser.successful, ColumnAliaser.empty) + + // Internal stages + + import org.apache.pekko.stream.stage.{ GraphStageLogic, GraphStageWithMaterializedValue, OutHandler } + import org.apache.pekko.stream.{ Attributes, Outlet, SourceShape } + + import java.sql.ResultSet + import scala.util.{ Failure, Success } + + private[anorm] class ResultSource[T](connection: Connection, sql: Sql, as: ColumnAliaser, parser: RowParser[T]) + extends GraphStageWithMaterializedValue[SourceShape[T], Future[Int]] { + + private[anorm] var resultSet: ResultSet = _ + + override val toString = "AnormQueryResult" + val out: Outlet[T] = Outlet(s"${toString}.out") + val shape: SourceShape[T] = SourceShape(out) + + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Int]) = { + val result = Promise[Int]() + + val logic = new GraphStageLogic(shape) with OutHandler { + private var cursor: Option[Cursor] = None + private var counter: Int = 0 + + private def failWith(cause: Throwable): Unit = { + result.failure(cause) + fail(out, cause) + () + } + + override def preStart(): Unit = { + try { + resultSet = sql.unsafeResultSet(connection) + nextCursor() + } catch { + case NonFatal(cause) => failWith(cause) + } + } + + override def postStop() = release() + + private def release(): Unit = { + val stmt: Option[java.sql.Statement] = { + if (resultSet != null && !resultSet.isClosed) { + val s = resultSet.getStatement + resultSet.close() + Option(s) + } else None + } + + stmt.foreach { s => + if (!s.isClosed) s.close() + } + } + + private def nextCursor(): Unit = { + cursor = Sql.unsafeCursor(resultSet, sql.resultSetOnFirstRow, as) + } + + def onPull(): Unit = cursor match { + case Some(c) => + c.row.as(parser) match { + case Success(parsed) => { + counter += 1 + push(out, parsed) + nextCursor() + } + + case Failure(cause) => + failWith(cause) + } + + case _ => { + result.success(counter) + complete(out) + } + } + + override def onDownstreamFinish() = { + result.tryFailure(new InterruptedException("Downstream finished")) + release() + super.onDownstreamFinish() + } + + setHandler(out, this) + } + + logic -> result.future + } + } + +} diff --git a/pekko/src/test/resources/reference.conf b/pekko/src/test/resources/reference.conf new file mode 100644 index 00000000..c9673757 --- /dev/null +++ b/pekko/src/test/resources/reference.conf @@ -0,0 +1,3 @@ +akka { + loglevel = "OFF" +} \ No newline at end of file diff --git a/pekko/src/test/scala-2.13+/PekkoCompat.scala b/pekko/src/test/scala-2.13+/PekkoCompat.scala new file mode 100644 index 00000000..e81331d1 --- /dev/null +++ b/pekko/src/test/scala-2.13+/PekkoCompat.scala @@ -0,0 +1,9 @@ +/* + * Copyright (C) from 2022 The Play Framework Contributors , 2011-2021 Lightbend Inc. + */ + +package anorm + +private[anorm] object PekkoCompat { + type Seq[T] = _root_.scala.collection.immutable.Seq[T] +} diff --git a/pekko/src/test/scala-2.13-/PekkoCompat.scala b/pekko/src/test/scala-2.13-/PekkoCompat.scala new file mode 100644 index 00000000..59c192cf --- /dev/null +++ b/pekko/src/test/scala-2.13-/PekkoCompat.scala @@ -0,0 +1,9 @@ +/* + * Copyright (C) from 2022 The Play Framework Contributors , 2011-2021 Lightbend Inc. + */ + +package anorm + +private[anorm] object PekkoCompat { + type Seq[T] = _root_.scala.collection.Seq[T] +} diff --git a/pekko/src/test/scala/anorm/PekkoStreamSpec.scala b/pekko/src/test/scala/anorm/PekkoStreamSpec.scala new file mode 100644 index 00000000..9dcaac4b --- /dev/null +++ b/pekko/src/test/scala/anorm/PekkoStreamSpec.scala @@ -0,0 +1,160 @@ +/* + * Copyright (C) from 2022 The Play Framework Contributors , 2011-2021 Lightbend Inc. + */ + +package anorm + +import java.sql.{ Connection, ResultSet } + +import scala.collection.immutable.Seq + +import scala.concurrent.Future +import scala.concurrent.duration._ + +import org.apache.pekko.actor.ActorSystem + +import org.apache.pekko.stream.Materializer +import org.apache.pekko.stream.scaladsl.{ Keep, Sink, Source } + +import acolyte.jdbc.AcolyteDSL.withQueryResult +import acolyte.jdbc.Implicits._ +import acolyte.jdbc.QueryResult +import acolyte.jdbc.RowLists.stringList + +import org.specs2.concurrent.ExecutionEnv + +final class PekkoStreamSpec(implicit ee: ExecutionEnv) extends org.specs2.mutable.Specification { + + "Pekko Stream".title + + implicit lazy val system: ActorSystem = ActorSystem("anorm-tests") + + implicit def materializer: Materializer = + org.apache.pekko.stream.ActorMaterializer.create(system) + + def assertAllStagesStopped[T](f: => T) = f + + "Pekko Stream" should { + "expose the query result as source" in assertAllStagesStopped { + withQueryResult(stringList :+ "A" :+ "B" :+ "C") { implicit con => + PekkoStream + .source(SQL"SELECT * FROM Test", SqlParser.scalar[String]) + .runWith(Sink.seq[String]) must beTypedEqualTo( + Seq("A", "B", "C") + ).await(0, 5.seconds) + } + } + + "be done if the stream run through" in { + withQueryResult(stringList :+ "A" :+ "B" :+ "C") { implicit con => + PekkoStream + .source(SQL"SELECT * FROM Test", SqlParser.scalar[String]) + .toMat(Sink.ignore)(Keep.left) + .run() must beTypedEqualTo(3).await(0, 3.seconds) + } + } + + "fail materialized value on finished downstream" in assertAllStagesStopped { + val list = stringList :+ "A" :+ "B" :+ "C" + + withQueryResult(list.withCycling(true)) { implicit con => + val killSwitch = org.apache.pekko.stream.KillSwitches.shared("cycling-switch") + + PekkoStream + .source(SQL"SELECT * FROM Test", SqlParser.scalar[String]) + + val p = scala.concurrent.Promise[Int]() + + val res = PekkoStream + .source(SQL"SELECT * FROM Test", SqlParser.scalar[String]) + .mapMaterializedValue(p.completeWith) + .via(killSwitch.flow) + .runWith(Sink.ignore) + .flatMap(_ => p.future) + + Thread.sleep(2000) + killSwitch.shutdown() + + res must throwA[java.util.concurrent.ExecutionException].like { + case e => e.getCause must beAnInstanceOf[InterruptedException] + }.await + } + } + + "manage resources" >> { + def run[T](sink: Sink[String, T])(implicit c: Connection) = { + val graph = source(SQL"SELECT * FROM Test", SqlParser.scalar[String]) + + Source + .fromGraph(graph) + .runWith(sink.mapMaterializedValue { _ => + Option(graph.resultSet) + }) + } + + def runAsync[T](sink: Sink[String, Future[T]])(implicit c: Connection) = { + val graph = source(SQL"SELECT * FROM Test", SqlParser.scalar[String]) + + Source.fromGraph(graph).runWith(sink).map { _ => graph.resultSet } + } + + "on success" in assertAllStagesStopped { + withQueryResult(stringList :+ "A" :+ "B" :+ "C") { implicit con => + runAsync(Sink.seq[String]) must beLike[ResultSet] { + case rs => + (rs.isClosed must beTrue).and(rs.getStatement.isClosed must beTrue).and(con.isClosed must beFalse) + }.await(0, 5.seconds) + } + } + + "on cancellation" in withQueryResult(stringList :+ "A" :+ "B" :+ "C") { implicit con => + assertAllStagesStopped { + val rSet: Option[ResultSet] = run(Sink.cancelled[String]) + + (rSet must beNone).or(rSet must beSome[ResultSet].which { rs => + (rs.isClosed must beTrue).and(rs.getStatement.isClosed must beTrue) + }) + } + } + + "on failed initialization" in { + import java.sql.SQLException + + withQueryResult(QueryResult.Nil) { implicit con => + val failingSql = new Sql { + import java.sql.PreparedStatement + + def unsafeStatement( + connection: Connection, + generatedColumn: String, + generatedColumns: PekkoCompat.Seq[String] + ): PreparedStatement = ??? + + def unsafeStatement(connection: Connection, getGeneratedKeys: Boolean): PreparedStatement = + throw new SQLException("Init failure") + + def resultSetOnFirstRow: Boolean = ??? + } + + val graph = source(failingSql, SqlParser.scalar[String]) + val mat = Source.fromGraph(graph).toMat(Sink.ignore)(Keep.left).run() + + mat must throwA[SQLException]("Init failure").awaitFor(3.seconds) + } + } + + "on failure" in withQueryResult(stringList :+ "A" :+ "B" :+ "C") { implicit con => + assertAllStagesStopped { + val rSet = run(Sink.reduce[String] { (_, _) => sys.error("Foo") }) + + (rSet must beNone).or(rSet must beSome[ResultSet].which { rs => + (rs must beNull).or(rs.isClosed must beTrue) + }) + } + } + } + } + + def source[T](sql: Sql, parser: RowParser[T])(implicit connection: Connection) = + new PekkoStream.ResultSource[T](connection, sql, ColumnAliaser.empty, parser) +}