Streaming with Akka-Anorm generates a materialized value as Future that may never complete #545

Closed
gaeljw opened this issue Apr 26, 2023 · 1 comment

gaeljw commented Apr 26, 2023

Anorm Version (2.5.x / etc)

2.7.0

Operating System (Ubuntu 15.10 / MacOS 10.10 / Windows 10)

Linux ... 6.2.9-200.fc37.x86_64 #1 SMP PREEMPT_DYNAMIC Thu Mar 30 22:31:57 UTC 2023 x86_64 x86_64 x86_64 GNU/Linux

JDK (Oracle 1.8.0_72, OpenJDK 1.8.x, Azul Zing)

openjdk version "11.0.18" 2023-01-17
OpenJDK Runtime Environment (Red_Hat-11.0.18.0.10-1.fc37) (build 11.0.18+10)
OpenJDK 64-Bit Server VM (Red_Hat-11.0.18.0.10-1.fc37) (build 11.0.18+10, mixed mode, sharing)

Library Dependencies

Not relevant

Actual vs. Expected Behavior

We're using Anorm with Akka to stream the results of long-running SQL queries.

To do so, we have to manage the Connection manually (open it and close it when appropriate). Our initial implementation used the materialized value of Akka-Anorm to close the connection, like this:

implicit val connection: Connection = ???

AkkaStream
  .source(SQL(sqlQuery), rowParser)
  .mapMaterializedValue { rowsRead: Future[Int] =>
    rowsRead.onComplete {
      case Success(_) =>
        connection.close()
      case Failure(ex) =>
        connection.close()
    }
  }

However, we noticed that the rowsRead materialized value above never completes if the consumer of the stream closes it while the SQL query is still running and pulling data.

As a result, the connection is never closed, which eventually exhausts the connection pool when one is used.

More generally, the actual issue is that the materialized value may never complete.

Reproducible Test Case

I do have one in my project but I need to make it more minimal before sharing. I will update the issue if you feel it's needed.

The general idea to reproduce is to do something like this:

// Create a fake DB (H2 for instance) with just a few rows of data

val sqlQuery = "SELECT * FROM table"
val rowParser: RowParser = ???

val sharedKillSwitch = KillSwitches.shared("my-kill-switch")

val sqlSource = 
  AkkaStream
    .source(SQL(sqlQuery), rowParser)
    .mapMaterializedValue { rowsRead: Future[Int] =>
      rowsRead.onComplete {
        case Success(_) =>
          connection.close()
        case Failure(ex) =>
          connection.close()
      }
    }
    // Throttle to give us time to simulate the consumer of the data stopping
    .throttle(1, 2.seconds)
    .wireTap(_ => println("Source emitted one item..."))
    .via(sharedKillSwitch.flow)

val runResult: Future[Seq[???]] = sqlSource.runWith(Sink.seq)

// Wait for 2 items to have been emitted and simulate consumer stopping
Thread.sleep(3000)
println("Shutdown")
sharedKillSwitch.shutdown()

// Await.result requires a timeout; 10 seconds is an arbitrary choice here
val result = Await.result(runResult, 10.seconds)

// Assert that connection.isClosed is false even 10 seconds later (or add logs or AtomicBoolean somehow to show that the `onComplete` of the materialized value is never actually called)

Possible explanation

Looking at the source code, I believe this happens because the materialized value is backed by a Promise (https://github.com/playframework/anorm/blob/main/akka/src/main/scala/anorm/AkkaStream.scala#L122) that may never be completed. In the case described above, I believe the stream termination goes through the onDownstreamFinish method (https://github.com/playframework/anorm/blob/main/akka/src/main/scala/anorm/AkkaStream.scala#L182), and the Promise is left uncompleted on that path.
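
To illustrate the idea, here is a minimal sketch of a custom source stage whose materialized Future completes on every termination path, including downstream cancellation. This is not Anorm's actual code: CountingSource is a hypothetical name, and the stage reads from a plain Iterator instead of a JDBC result set. It assumes Akka 2.6 and relies on postStop, which Akka calls whenever the stage stops.

```scala
import akka.stream.{Attributes, Outlet, SourceShape}
import akka.stream.stage.{GraphStageLogic, GraphStageWithMaterializedValue, OutHandler}
import scala.concurrent.{Future, Promise}
import scala.util.control.NonFatal

// Hypothetical stage (illustration only): emits the items of an Iterator and
// materializes the number of items emitted.
final class CountingSource[A](items: Iterator[A])
    extends GraphStageWithMaterializedValue[SourceShape[A], Future[Int]] {

  val out: Outlet[A] = Outlet("CountingSource.out")
  val shape: SourceShape[A] = SourceShape(out)

  def createLogicAndMaterializedValue(attrs: Attributes): (GraphStageLogic, Future[Int]) = {
    val done = Promise[Int]()

    val logic = new GraphStageLogic(shape) with OutHandler {
      private var count = 0

      def onPull(): Unit =
        try {
          if (items.hasNext) {
            push(out, items.next())
            count += 1
          } else completeStage()
        } catch {
          case NonFatal(e) =>
            // Report the failure through the materialized value as well
            done.tryFailure(e)
            failStage(e)
        }

      // Called when the stage stops for any reason (normal completion,
      // failure, or downstream cancellation), so the promise always completes.
      // trySuccess is a no-op if the promise was already failed above.
      override def postStop(): Unit = {
        done.trySuccess(count)
        ()
      }

      setHandler(out, this)
    }

    (logic, done.future)
  }
}
```

With a stage written this way, Source.fromGraph(new CountingSource(...)) followed by .take(n) or a kill switch would still complete the materialized Future once the downstream cancels.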

@gaeljw gaeljw changed the title Streaming with Akka-Anorm generates a materialized value as Future that may never completes Streaming with Akka-Anorm generates a materialized value as Future that may never complete Apr 26, 2023

gaeljw commented Apr 26, 2023

Adding that a "workaround" is to use watchTermination on the Akka stream and close the connection when the stream completes.

...
  .watchTermination()((_, done) => {
     done.onComplete {
        case Success(_) =>
          connection.close()
        case Failure(ex) =>
          connection.close()
     }
     NotUsed // Or something else
  })

But, if I understand correctly, this means the connection is kept open until the stream completes, even if the SQL query itself no longer needs it. This could be undesirable depending on the downstream steps of the stream.
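
One possible mitigation, sketched below under assumptions: as far as I understand, the Future given to watchTermination completes when termination reaches that point in the graph, so attaching it directly to the Anorm source (before any downstream steps) may release the connection as soon as the source finishes or is cancelled. ConnectionClosing and closeOnTermination are hypothetical names, not part of Anorm or Akka.

```scala
import java.sql.Connection
import akka.stream.scaladsl.Source
import scala.concurrent.ExecutionContext

// Hypothetical helper (illustration only)
object ConnectionClosing {

  // Wraps a source so the connection is closed when this source terminates:
  // normal completion, failure, or downstream cancellation.
  // The original materialized value is passed through unchanged.
  def closeOnTermination[T, M](source: Source[T, M], connection: Connection)(
      implicit ec: ExecutionContext
  ): Source[T, M] =
    source.watchTermination() { (mat, done) =>
      // Close in both the success and the failure case
      done.onComplete(_ => connection.close())
      mat
    }
}
```

Usage would look like ConnectionClosing.closeOnTermination(AkkaStream.source(SQL(sqlQuery), rowParser), connection), with throttling, kill switches, and other downstream steps added after the wrapped source.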
