Skip to content

Commit

Permalink
Close playframework#545 - Fail materialized value on finished downstream
Browse files Browse the repository at this point in the history
  • Loading branch information
cchantep committed Apr 27, 2023
1 parent 93ecb29 commit fa0e957
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 2 deletions.
1 change: 1 addition & 0 deletions akka/src/main/scala/anorm/AkkaStream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ object AkkaStream {
}

override def onDownstreamFinish() = {
result.tryFailure(new InterruptedException("Downstream finished"))
release()
super.onDownstreamFinish()
}
Expand Down
27 changes: 27 additions & 0 deletions akka/src/test/scala/anorm/AkkaStreamSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,33 @@ final class AkkaStreamSpec(implicit ee: ExecutionEnv) extends org.specs2.mutable
}
}

"fail materialized value on finished downstream" in assertAllStagesStopped {
val list = stringList :+ "A" :+ "B" :+ "C"

withQueryResult(list.withCycling(true)) { implicit con =>
val killSwitch = akka.stream.KillSwitches.shared("cycling-switch")

AkkaStream
.source(SQL"SELECT * FROM Test", SqlParser.scalar[String])

val p = scala.concurrent.Promise[Int]()

val res = AkkaStream
.source(SQL"SELECT * FROM Test", SqlParser.scalar[String])
.mapMaterializedValue(p.completeWith)
.via(killSwitch.flow)
.runWith(Sink.ignore)
.flatMap(_ => p.future)

Thread.sleep(2000)
killSwitch.shutdown()

res must throwA[java.util.concurrent.ExecutionException].like {
case e => e.getCause must beAnInstanceOf[InterruptedException]
}.await
}
}

"manage resources" >> {
def run[T](sink: Sink[String, T])(implicit c: Connection) = {
val graph = source(SQL"SELECT * FROM Test", SqlParser.scalar[String])
Expand Down
4 changes: 2 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ val specs2Test = Seq(
).map("org.specs2" %% _ % "4.10.6" % Test cross (CrossVersion.for3Use2_13))
.map(_.exclude("org.scala-lang.modules", "*"))

lazy val acolyteVersion = "1.2.5"
lazy val acolyte = "org.eu.acolyte" %% "jdbc-scala" % acolyteVersion % Test
lazy val acolyteVersion = "1.2.7-58199735-SNAPSHOT"
lazy val acolyte = ("org.eu.acolyte" %% "jdbc-scala" % acolyteVersion % Test).changing()

ThisBuild / resolvers ++= Seq("Tatami Snapshots".at("https://raw.github.com/cchantep/tatami/master/snapshots"))

Expand Down

0 comments on commit fa0e957

Please sign in to comment.