Skip to content

Commit

Permalink
[Java] Simplify synchronous connect.
Browse files Browse the repository at this point in the history
  • Loading branch information
vyazelenko committed Dec 19, 2024
1 parent 3938474 commit cd4f1fd
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -231,46 +231,25 @@ public static AeronArchive connect()
*/
public static AeronArchive connect(final Context ctx)
{
Subscription subscription = null;
ExclusivePublication publication = null;
AsyncConnect asyncConnect = null;
final AsyncConnect asyncConnect = asyncConnect(ctx);
try
{
ctx.conclude();

final Aeron aeron = ctx.aeron();
subscription = aeron.addSubscription(ctx.controlResponseChannel(), ctx.controlResponseStreamId());

checkAndSetupResponseChannel(ctx, subscription);

publication = aeron.addExclusivePublication(ctx.controlRequestChannel(), ctx.controlRequestStreamId());
final ControlResponsePoller controlResponsePoller = new ControlResponsePoller(subscription);

final ArchiveProxy archiveProxy = new ArchiveProxy(
publication,
ctx.idleStrategy(),
aeron.context().nanoClock(),
ctx.messageTimeoutNs(),
DEFAULT_RETRY_ATTEMPTS,
ctx.credentialsSupplier());

asyncConnect = new AsyncConnect(ctx, controlResponsePoller, archiveProxy);
final IdleStrategy idleStrategy = ctx.idleStrategy();
final AgentInvoker aeronClientInvoker = aeron.conductorAgentInvoker();
final AgentInvoker aeronClientInvoker = ctx.aeron().conductorAgentInvoker();
final AgentInvoker delegatingInvoker = ctx.agentInvoker();
int previousStep = asyncConnect.step();
AsyncConnect.State previousState = asyncConnect.state();

AeronArchive aeronArchive;
while (null == (aeronArchive = asyncConnect.poll()))
{
if (asyncConnect.step() == previousStep)
if (asyncConnect.state() == previousState)
{
idleStrategy.idle();
}
else
{
idleStrategy.reset();
previousStep = asyncConnect.step();
previousState = asyncConnect.state();
}

if (null != aeronClientInvoker)
Expand All @@ -286,21 +265,11 @@ public static AeronArchive connect(final Context ctx)

return aeronArchive;
}
catch (final ConcurrentConcludeException ex)
{
throw ex;
}
catch (final Exception ex)
{
if (!ctx.ownsAeronClient())
{
CloseHelper.quietClose(subscription);
CloseHelper.quietClose(publication);
}

CloseHelper.quietCloseAll(asyncConnect, ctx::close);

throw ex;
final Exception error = quietClose(ex, asyncConnect);
LangUtil.rethrowUnchecked(error);
return null;
}
}

Expand All @@ -324,21 +293,8 @@ public static AsyncConnect asyncConnect()
*/
public static AsyncConnect asyncConnect(final Context ctx)
{
try
{
ctx.conclude();

return new AsyncConnect(ctx);
}
catch (final ConcurrentConcludeException ex)
{
throw ex;
}
catch (final Exception ex)
{
ctx.close();
throw ex;
}
ctx.conclude();
return new AsyncConnect(ctx);
}

/**
Expand Down Expand Up @@ -3636,28 +3592,36 @@ public enum State
static final int PROTOCOL_VERSION_WITH_ARCHIVE_ID = SemanticVersion.compose(1, 11, 0);
private final Context ctx;
private final ControlResponsePoller controlResponsePoller;
private ArchiveProxy archiveProxy;
private final long deadlineNs;
private long publicationRegistrationId = Aeron.NULL_VALUE;
private long correlationId = Aeron.NULL_VALUE;
private long controlSessionId = Aeron.NULL_VALUE;
private byte[] encodedCredentialsFromChallenge = null;
private State state = State.ADD_PUBLICATION;
private ArchiveProxy archiveProxy;

AsyncConnect(final Context ctx)
{
this.ctx = ctx;
try
{
this.ctx = ctx;

final Aeron aeron = ctx.aeron();
final Aeron aeron = ctx.aeron();

controlResponsePoller = new ControlResponsePoller(
aeron.addSubscription(ctx.controlResponseChannel(), ctx.controlResponseStreamId()));
controlResponsePoller = new ControlResponsePoller(
aeron.addSubscription(ctx.controlResponseChannel(), ctx.controlResponseStreamId()));

checkAndSetupResponseChannel(ctx, controlResponsePoller.subscription());
checkAndSetupResponseChannel(ctx, controlResponsePoller.subscription());

publicationRegistrationId = aeron.asyncAddExclusivePublication(
ctx.controlRequestChannel(), ctx.controlRequestStreamId());
deadlineNs = aeron.context().nanoClock().nanoTime() + ctx.messageTimeoutNs();
publicationRegistrationId = aeron.asyncAddExclusivePublication(
ctx.controlRequestChannel(), ctx.controlRequestStreamId());
deadlineNs = aeron.context().nanoClock().nanoTime() + ctx.messageTimeoutNs();
}
catch (final Exception ex)
{
close();
throw ex;
}
}

AsyncConnect(
Expand All @@ -3678,12 +3642,14 @@ public void close()
{
if (State.DONE != state)
{
final ErrorHandler errorHandler = ctx.errorHandler();
CloseHelper.close(errorHandler, controlResponsePoller.subscription());
if (null != controlResponsePoller)
{
CloseHelper.close(ctx.errorHandler(), controlResponsePoller.subscription());
}

if (null != archiveProxy)
{
CloseHelper.close(errorHandler, archiveProxy.publication());
CloseHelper.close(ctx.errorHandler(), archiveProxy.publication());
}
else if (Aeron.NULL_VALUE != publicationRegistrationId)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.agrona.DirectBuffer;
import org.agrona.SemanticVersion;

import java.util.Objects;

/**
* Encapsulate the polling and decoding of archive control protocol response messages.
*/
Expand Down Expand Up @@ -76,7 +78,7 @@ public ControlResponsePoller(final Subscription subscription)
*/
public ControlResponsePoller(final Subscription subscription, final int fragmentLimit)
{
this.subscription = subscription;
this.subscription = Objects.requireNonNull(subscription);
this.fragmentLimit = fragmentLimit;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,85 @@ class AeronArchiveTest
private final ArchiveProxy archiveProxy = mock(ArchiveProxy.class);
private final ErrorHandler errorHandler = mock(ErrorHandler.class);

@Test
void asyncConnectedShouldConcludeContext()
{
final Context ctx = mock(Context.class);
final IllegalStateException expectedException = new IllegalStateException("test");
doThrow(expectedException).when(ctx).conclude();

final IllegalStateException actualException =
assertThrowsExactly(IllegalStateException.class, () -> AeronArchive.asyncConnect(ctx));
assertSame(expectedException, actualException);

verify(ctx).conclude();
verifyNoMoreInteractions(ctx);
}

@Test
void asyncConnectedShouldCloseContext()
{
final String responseChannel = "aeron:udp?endpoint=localhost:1234";
final int responseStreamId = 49;
final Context ctx = mock(Context.class);
when(ctx.aeron()).thenReturn(aeron);
when(ctx.controlResponseChannel()).thenReturn(responseChannel);
when(ctx.controlResponseStreamId()).thenReturn(responseStreamId);
final RuntimeException error = new RuntimeException("subscription");
when(aeron.addSubscription(responseChannel, responseStreamId)).thenThrow(error);

final RuntimeException actualException =
assertThrowsExactly(RuntimeException.class, () -> AeronArchive.asyncConnect(ctx));
assertSame(error, actualException);

final InOrder inOrder = inOrder(ctx, aeron);
inOrder.verify(ctx).conclude();
inOrder.verify(ctx).aeron();
inOrder.verify(ctx).controlResponseChannel();
inOrder.verify(ctx).controlResponseStreamId();
inOrder.verify(aeron).addSubscription(responseChannel, responseStreamId);
inOrder.verify(ctx).close();
inOrder.verifyNoMoreInteractions();
}

@Test
void asyncConnectedShouldCloseResourceInCaseOfExceptionUponStartup()
{
final String responseChannel = "aeron:udp?endpoint=localhost:0";
final int responseStreamId = 49;
final String requestChannel = "aeron:udp?endpoint=localhost:1234";
final int requestStreamId = -15;
final long pubId = -3275938475934759L;

final Context ctx = mock(Context.class);
when(ctx.aeron()).thenReturn(aeron);
when(ctx.controlResponseChannel()).thenReturn(responseChannel);
when(ctx.controlResponseStreamId()).thenReturn(responseStreamId);
when(ctx.controlRequestChannel()).thenReturn(requestChannel);
when(ctx.controlRequestStreamId()).thenReturn(requestStreamId);
final Subscription subscription = mock(Subscription.class);
when(aeron.addSubscription(responseChannel, responseStreamId)).thenReturn(subscription);
when(aeron.asyncAddExclusivePublication(requestChannel, requestStreamId)).thenReturn(pubId);
final IndexOutOfBoundsException error = new IndexOutOfBoundsException("exception");
when(aeron.context()).thenThrow(error);

final IndexOutOfBoundsException actualException =
assertThrowsExactly(IndexOutOfBoundsException.class, () -> AeronArchive.asyncConnect(ctx));
assertSame(error, actualException);

final InOrder inOrder = inOrder(ctx, aeron, subscription);
inOrder.verify(ctx).conclude();
inOrder.verify(ctx).aeron();
inOrder.verify(ctx).controlResponseChannel();
inOrder.verify(ctx).controlResponseStreamId();
inOrder.verify(aeron).addSubscription(responseChannel, responseStreamId);
inOrder.verify(aeron).asyncAddExclusivePublication(requestChannel, requestStreamId);
inOrder.verify(subscription).close();
inOrder.verify(aeron).asyncRemovePublication(pubId);
inOrder.verify(ctx).close();
inOrder.verifyNoMoreInteractions();
}

@Test
void closeNotOwningAeronClient()
{
Expand Down

0 comments on commit cd4f1fd

Please sign in to comment.