From cd4f1fd957a0a69964eafaf862cad022b1cf1c70 Mon Sep 17 00:00:00 2001 From: Dmytro Vyazelenko <696855+vyazelenko@users.noreply.github.com> Date: Thu, 19 Dec 2024 16:31:42 +0100 Subject: [PATCH] [Java] Simplify synchronous connect. --- .../io/aeron/archive/client/AeronArchive.java | 98 ++++++------------- .../archive/client/ControlResponsePoller.java | 4 +- .../archive/client/AeronArchiveTest.java | 79 +++++++++++++++ 3 files changed, 114 insertions(+), 67 deletions(-) diff --git a/aeron-archive/src/main/java/io/aeron/archive/client/AeronArchive.java b/aeron-archive/src/main/java/io/aeron/archive/client/AeronArchive.java index efa6f7a085..67ec52db24 100644 --- a/aeron-archive/src/main/java/io/aeron/archive/client/AeronArchive.java +++ b/aeron-archive/src/main/java/io/aeron/archive/client/AeronArchive.java @@ -231,46 +231,25 @@ public static AeronArchive connect() */ public static AeronArchive connect(final Context ctx) { - Subscription subscription = null; - ExclusivePublication publication = null; - AsyncConnect asyncConnect = null; + final AsyncConnect asyncConnect = asyncConnect(ctx); try { - ctx.conclude(); - - final Aeron aeron = ctx.aeron(); - subscription = aeron.addSubscription(ctx.controlResponseChannel(), ctx.controlResponseStreamId()); - - checkAndSetupResponseChannel(ctx, subscription); - - publication = aeron.addExclusivePublication(ctx.controlRequestChannel(), ctx.controlRequestStreamId()); - final ControlResponsePoller controlResponsePoller = new ControlResponsePoller(subscription); - - final ArchiveProxy archiveProxy = new ArchiveProxy( - publication, - ctx.idleStrategy(), - aeron.context().nanoClock(), - ctx.messageTimeoutNs(), - DEFAULT_RETRY_ATTEMPTS, - ctx.credentialsSupplier()); - - asyncConnect = new AsyncConnect(ctx, controlResponsePoller, archiveProxy); final IdleStrategy idleStrategy = ctx.idleStrategy(); - final AgentInvoker aeronClientInvoker = aeron.conductorAgentInvoker(); + final AgentInvoker aeronClientInvoker = ctx.aeron().conductorAgentInvoker(); final AgentInvoker delegatingInvoker = ctx.agentInvoker(); - int previousStep = asyncConnect.step(); + AsyncConnect.State previousState = asyncConnect.state(); AeronArchive aeronArchive; while (null == (aeronArchive = asyncConnect.poll())) { - if (asyncConnect.step() == previousStep) + if (asyncConnect.state() == previousState) { idleStrategy.idle(); } else { idleStrategy.reset(); - previousStep = asyncConnect.step(); + previousState = asyncConnect.state(); } if (null != aeronClientInvoker) @@ -286,21 +265,11 @@ public static AeronArchive connect(final Context ctx) return aeronArchive; } - catch (final ConcurrentConcludeException ex) - { - throw ex; - } catch (final Exception ex) { - if (!ctx.ownsAeronClient()) - { - CloseHelper.quietClose(subscription); - CloseHelper.quietClose(publication); - } - - CloseHelper.quietCloseAll(asyncConnect, ctx::close); - - throw ex; + final Exception error = quietClose(ex, asyncConnect); + LangUtil.rethrowUnchecked(error); + return null; } } @@ -324,21 +293,8 @@ public static AsyncConnect asyncConnect() */ public static AsyncConnect asyncConnect(final Context ctx) { - try - { - ctx.conclude(); - - return new AsyncConnect(ctx); - } - catch (final ConcurrentConcludeException ex) - { - throw ex; - } - catch (final Exception ex) - { - ctx.close(); - throw ex; - } + ctx.conclude(); + return new AsyncConnect(ctx); } /** @@ -3636,28 +3592,36 @@ public enum State static final int PROTOCOL_VERSION_WITH_ARCHIVE_ID = SemanticVersion.compose(1, 11, 0); private final Context ctx; private final ControlResponsePoller controlResponsePoller; - private ArchiveProxy archiveProxy; private final long deadlineNs; private long publicationRegistrationId = Aeron.NULL_VALUE; private long correlationId = Aeron.NULL_VALUE; private long controlSessionId = Aeron.NULL_VALUE; private byte[] encodedCredentialsFromChallenge = null; private State state = State.ADD_PUBLICATION; + private ArchiveProxy archiveProxy; AsyncConnect(final Context ctx) { - this.ctx = ctx; + try + { + this.ctx = ctx; - final Aeron aeron = ctx.aeron(); + final Aeron aeron = ctx.aeron(); - controlResponsePoller = new ControlResponsePoller( - aeron.addSubscription(ctx.controlResponseChannel(), ctx.controlResponseStreamId())); + controlResponsePoller = new ControlResponsePoller( + aeron.addSubscription(ctx.controlResponseChannel(), ctx.controlResponseStreamId())); - checkAndSetupResponseChannel(ctx, controlResponsePoller.subscription()); + checkAndSetupResponseChannel(ctx, controlResponsePoller.subscription()); - publicationRegistrationId = aeron.asyncAddExclusivePublication( - ctx.controlRequestChannel(), ctx.controlRequestStreamId()); - deadlineNs = aeron.context().nanoClock().nanoTime() + ctx.messageTimeoutNs(); + publicationRegistrationId = aeron.asyncAddExclusivePublication( + ctx.controlRequestChannel(), ctx.controlRequestStreamId()); + deadlineNs = aeron.context().nanoClock().nanoTime() + ctx.messageTimeoutNs(); + } + catch (final Exception ex) + { + close(); + throw ex; + } } AsyncConnect( @@ -3678,12 +3642,14 @@ public void close() { if (State.DONE != state) { - final ErrorHandler errorHandler = ctx.errorHandler(); - CloseHelper.close(errorHandler, controlResponsePoller.subscription()); + if (null != controlResponsePoller) + { + CloseHelper.close(ctx.errorHandler(), controlResponsePoller.subscription()); + } if (null != archiveProxy) { - CloseHelper.close(errorHandler, archiveProxy.publication()); + CloseHelper.close(ctx.errorHandler(), archiveProxy.publication()); } else if (Aeron.NULL_VALUE != publicationRegistrationId) { diff --git a/aeron-archive/src/main/java/io/aeron/archive/client/ControlResponsePoller.java b/aeron-archive/src/main/java/io/aeron/archive/client/ControlResponsePoller.java index 6740ca80fc..510ebbabd2 100644 --- a/aeron-archive/src/main/java/io/aeron/archive/client/ControlResponsePoller.java +++ b/aeron-archive/src/main/java/io/aeron/archive/client/ControlResponsePoller.java @@ -24,6 +24,8 @@ import org.agrona.DirectBuffer; import org.agrona.SemanticVersion; +import java.util.Objects; + /** * Encapsulate the polling and decoding of archive control protocol response messages. */ @@ -76,7 +78,7 @@ public ControlResponsePoller(final Subscription subscription) */ public ControlResponsePoller(final Subscription subscription, final int fragmentLimit) { - this.subscription = subscription; + this.subscription = Objects.requireNonNull(subscription); this.fragmentLimit = fragmentLimit; } diff --git a/aeron-archive/src/test/java/io/aeron/archive/client/AeronArchiveTest.java b/aeron-archive/src/test/java/io/aeron/archive/client/AeronArchiveTest.java index debd496b92..1ed5f6a37e 100644 --- a/aeron-archive/src/test/java/io/aeron/archive/client/AeronArchiveTest.java +++ b/aeron-archive/src/test/java/io/aeron/archive/client/AeronArchiveTest.java @@ -51,6 +51,85 @@ class AeronArchiveTest private final ArchiveProxy archiveProxy = mock(ArchiveProxy.class); private final ErrorHandler errorHandler = mock(ErrorHandler.class); + @Test + void asyncConnectedShouldConcludeContext() + { + final Context ctx = mock(Context.class); + final IllegalStateException expectedException = new IllegalStateException("test"); + doThrow(expectedException).when(ctx).conclude(); + + final IllegalStateException actualException = + assertThrowsExactly(IllegalStateException.class, () -> AeronArchive.asyncConnect(ctx)); + assertSame(expectedException, actualException); + + verify(ctx).conclude(); + verifyNoMoreInteractions(ctx); + } + + @Test + void asyncConnectedShouldCloseContext() + { + final String responseChannel = "aeron:udp?endpoint=localhost:1234"; + final int responseStreamId = 49; + final Context ctx = mock(Context.class); + when(ctx.aeron()).thenReturn(aeron); + when(ctx.controlResponseChannel()).thenReturn(responseChannel); + when(ctx.controlResponseStreamId()).thenReturn(responseStreamId); + final RuntimeException error = new RuntimeException("subscription"); + when(aeron.addSubscription(responseChannel, responseStreamId)).thenThrow(error); + + final RuntimeException actualException = + assertThrowsExactly(RuntimeException.class, () -> AeronArchive.asyncConnect(ctx)); + assertSame(error, actualException); + + final InOrder inOrder = inOrder(ctx, aeron); + inOrder.verify(ctx).conclude(); + inOrder.verify(ctx).aeron(); + inOrder.verify(ctx).controlResponseChannel(); + inOrder.verify(ctx).controlResponseStreamId(); + inOrder.verify(aeron).addSubscription(responseChannel, responseStreamId); + inOrder.verify(ctx).close(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + void asyncConnectedShouldCloseResourceInCaseOfExceptionUponStartup() + { + final String responseChannel = "aeron:udp?endpoint=localhost:0"; + final int responseStreamId = 49; + final String requestChannel = "aeron:udp?endpoint=localhost:1234"; + final int requestStreamId = -15; + final long pubId = -3275938475934759L; + + final Context ctx = mock(Context.class); + when(ctx.aeron()).thenReturn(aeron); + when(ctx.controlResponseChannel()).thenReturn(responseChannel); + when(ctx.controlResponseStreamId()).thenReturn(responseStreamId); + when(ctx.controlRequestChannel()).thenReturn(requestChannel); + when(ctx.controlRequestStreamId()).thenReturn(requestStreamId); + final Subscription subscription = mock(Subscription.class); + when(aeron.addSubscription(responseChannel, responseStreamId)).thenReturn(subscription); + when(aeron.asyncAddExclusivePublication(requestChannel, requestStreamId)).thenReturn(pubId); + final IndexOutOfBoundsException error = new IndexOutOfBoundsException("exception"); + when(aeron.context()).thenThrow(error); + + final IndexOutOfBoundsException actualException = + assertThrowsExactly(IndexOutOfBoundsException.class, () -> AeronArchive.asyncConnect(ctx)); + assertSame(error, actualException); + + final InOrder inOrder = inOrder(ctx, aeron, subscription); + inOrder.verify(ctx).conclude(); + inOrder.verify(ctx).aeron(); + inOrder.verify(ctx).controlResponseChannel(); + inOrder.verify(ctx).controlResponseStreamId(); + inOrder.verify(aeron).addSubscription(responseChannel, responseStreamId); + inOrder.verify(aeron).asyncAddExclusivePublication(requestChannel, requestStreamId); + inOrder.verify(subscription).close(); + inOrder.verify(aeron).asyncRemovePublication(pubId); + inOrder.verify(ctx).close(); + inOrder.verifyNoMoreInteractions(); + } + @Test void closeNotOwningAeronClient() {