Skip to content

Commit

Permalink
Fix akka#3023
Browse files Browse the repository at this point in the history
Offer new cached connection when previous one has been closed and released
  • Loading branch information
bturos committed Oct 19, 2023
1 parent 0f18616 commit 56ce51c
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -367,13 +367,12 @@ final class AmqpCachedConnectionProvider private (val provider: AmqpConnectionPr
def withAutomaticRelease(automaticRelease: Boolean): AmqpCachedConnectionProvider =
copy(automaticRelease = automaticRelease)

private lazy val connection = provider.get

@tailrec
override def get: Connection = state.get match {
case Empty =>
if (state.compareAndSet(Empty, Connecting)) {
try {
val connection = provider.get
if (!state.compareAndSet(Connecting, Connected(connection, 1)))
throw new ConcurrentModificationException(
"Unexpected concurrent modification while creating the connection."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,4 +196,21 @@ class AmqpConnectionProvidersSpec extends AmqpSpec {
catch { case e: Throwable => e shouldBe an[ConnectException] }
}
}

"The AMQP Reusable Connection Provider" should {
"open new connection when previous one is forced to close and released" in {
val connectionFactory = new ConnectionFactory()
val connectionProvider = AmqpConnectionFactoryConnectionProvider(connectionFactory)
.withHostAndPort("localhost", 5672)
val reusableConnectionProvider = AmqpCachedConnectionProvider(connectionProvider)
val originalConnection = reusableConnectionProvider.get

originalConnection.isOpen shouldBe true
originalConnection.abort(1, "Forced close")
reusableConnectionProvider.release(originalConnection)

val newConnection = reusableConnectionProvider.get
newConnection should not be (originalConnection)
}
}
}

0 comments on commit 56ce51c

Please sign in to comment.