From fa0e95796c873b0e13c94f3c999c3f7d57be105b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?C=C3=A9dric=20Chantepie?= Date: Thu, 27 Apr 2023 22:51:36 +0200 Subject: [PATCH] Close #545 - Fail materialized value on finished downstream --- akka/src/main/scala/anorm/AkkaStream.scala | 1 + .../src/test/scala/anorm/AkkaStreamSpec.scala | 27 +++++++++++++++++++ build.sbt | 4 +-- 3 files changed, 30 insertions(+), 2 deletions(-) diff --git a/akka/src/main/scala/anorm/AkkaStream.scala b/akka/src/main/scala/anorm/AkkaStream.scala index 2a02dd26..3471a985 100644 --- a/akka/src/main/scala/anorm/AkkaStream.scala +++ b/akka/src/main/scala/anorm/AkkaStream.scala @@ -180,6 +180,7 @@ object AkkaStream { } override def onDownstreamFinish() = { + result.tryFailure(new InterruptedException("Downstream finished")) release() super.onDownstreamFinish() } diff --git a/akka/src/test/scala/anorm/AkkaStreamSpec.scala b/akka/src/test/scala/anorm/AkkaStreamSpec.scala index 4a4dcbcf..b209f1a7 100644 --- a/akka/src/test/scala/anorm/AkkaStreamSpec.scala +++ b/akka/src/test/scala/anorm/AkkaStreamSpec.scala @@ -56,6 +56,33 @@ final class AkkaStreamSpec(implicit ee: ExecutionEnv) extends org.specs2.mutable } } + "fail materialized value on finished downstream" in assertAllStagesStopped { + val list = stringList :+ "A" :+ "B" :+ "C" + + withQueryResult(list.withCycling(true)) { implicit con => + val killSwitch = akka.stream.KillSwitches.shared("cycling-switch") + + AkkaStream + .source(SQL"SELECT * FROM Test", SqlParser.scalar[String]) + + val p = scala.concurrent.Promise[Int]() + + val res = AkkaStream + .source(SQL"SELECT * FROM Test", SqlParser.scalar[String]) + .mapMaterializedValue(p.completeWith) + .via(killSwitch.flow) + .runWith(Sink.ignore) + .flatMap(_ => p.future) + + Thread.sleep(2000) + killSwitch.shutdown() + + res must throwA[java.util.concurrent.ExecutionException].like { + case e => e.getCause must beAnInstanceOf[InterruptedException] + }.await + } + } + "manage resources" >> { def run[T](sink: Sink[String, T])(implicit c: Connection) = { val graph = source(SQL"SELECT * FROM Test", SqlParser.scalar[String]) diff --git a/build.sbt b/build.sbt index 49996abf..989ea92c 100644 --- a/build.sbt +++ b/build.sbt @@ -23,8 +23,8 @@ val specs2Test = Seq( ).map("org.specs2" %% _ % "4.10.6" % Test cross (CrossVersion.for3Use2_13)) .map(_.exclude("org.scala-lang.modules", "*")) -lazy val acolyteVersion = "1.2.5" -lazy val acolyte = "org.eu.acolyte" %% "jdbc-scala" % acolyteVersion % Test +lazy val acolyteVersion = "1.2.7-58199735-SNAPSHOT" +lazy val acolyte = ("org.eu.acolyte" %% "jdbc-scala" % acolyteVersion % Test).changing() ThisBuild / resolvers ++= Seq("Tatami Snapshots".at("https://raw.github.com/cchantep/tatami/master/snapshots"))