From d421bdef8ff75db179f0508aa64e25737668a23d Mon Sep 17 00:00:00 2001 From: Dmytro Vyazelenko <696855+vyazelenko@users.noreply.github.com> Date: Wed, 18 Dec 2024 14:05:50 +0100 Subject: [PATCH] [Java] Emit WARN event when ControlSession is closed abruptly + add reason to the ControlSession state transition log + increase default stale session check interval to 1s. --- .../io/aeron/agent/ArchiveEventDissector.java | 5 ++ .../io/aeron/agent/ArchiveEventEncoder.java | 16 +++-- .../io/aeron/agent/ArchiveEventLogger.java | 12 ++-- .../io/aeron/agent/ArchiveInterceptor.java | 5 +- .../io/aeron/agent/CommonEventEncoder.java | 5 +- .../agent/ArchiveEventDissectorTest.java | 5 +- .../aeron/agent/ArchiveEventEncoderTest.java | 16 +++-- .../aeron/agent/ArchiveEventLoggerTest.java | 7 ++- .../AbstractListRecordingsSession.java | 2 +- .../main/java/io/aeron/archive/Archive.java | 6 +- .../io/aeron/archive/ArchiveConductor.java | 18 +++--- .../aeron/archive/ControlResponseProxy.java | 15 +++-- .../java/io/aeron/archive/ControlSession.java | 63 ++++++++++--------- .../aeron/archive/ControlSessionAdapter.java | 7 ++- .../CreateReplayPublicationSession.java | 2 +- .../aeron/archive/DeleteSegmentsSession.java | 2 +- .../ListRecordingSubscriptionsSession.java | 2 +- .../io/aeron/archive/RecordingSession.java | 2 +- .../java/io/aeron/archive/ReplaySession.java | 2 +- .../io/aeron/archive/ReplicationSession.java | 2 +- .../main/java/io/aeron/archive/Session.java | 2 +- .../java/io/aeron/archive/ArchiveTest.java | 24 +++++-- 22 files changed, 130 insertions(+), 90 deletions(-) diff --git a/aeron-agent/src/main/java/io/aeron/agent/ArchiveEventDissector.java b/aeron-agent/src/main/java/io/aeron/agent/ArchiveEventDissector.java index 0a20917c31..d214ef8d7b 100644 --- a/aeron-agent/src/main/java/io/aeron/agent/ArchiveEventDissector.java +++ b/aeron-agent/src/main/java/io/aeron/agent/ArchiveEventDissector.java @@ -606,7 +606,12 @@ static void dissectControlSessionStateChange( builder.append(": controlSessionId=").append(controlSessionId); builder.append(" "); + absoluteOffset += buffer.getStringAscii(absoluteOffset, builder, LITTLE_ENDIAN); + absoluteOffset += SIZE_OF_INT; + + builder.append(" reason=\""); buffer.getStringAscii(absoluteOffset, builder, LITTLE_ENDIAN); + builder.append("\""); } static void dissectReplaySessionError( diff --git a/aeron-agent/src/main/java/io/aeron/agent/ArchiveEventEncoder.java b/aeron-agent/src/main/java/io/aeron/agent/ArchiveEventEncoder.java index 76972fdbf3..6d64d8b5aa 100644 --- a/aeron-agent/src/main/java/io/aeron/agent/ArchiveEventEncoder.java +++ b/aeron-agent/src/main/java/io/aeron/agent/ArchiveEventEncoder.java @@ -131,26 +131,32 @@ static > int replicationSessionStateChangeLength(final E from, return stateTransitionStringLength(from, to) + (4 * SIZE_OF_LONG) + (SIZE_OF_INT + reason.length()); } - static > int encodeSessionStateChange( + static > int encodeControlSessionStateChange( final UnsafeBuffer encodingBuffer, final int offset, final int captureLength, final int length, final E from, final E to, - final long id) + final long id, + final String reason) { int encodedLength = encodeLogHeader(encodingBuffer, offset, captureLength, length); encodingBuffer.putLong(offset + encodedLength, id, LITTLE_ENDIAN); encodedLength += SIZE_OF_LONG; - return encodeTrailingStateChange(encodingBuffer, offset, encodedLength, captureLength, from, to); + encodedLength += encodeStateChange(encodingBuffer, offset + encodedLength, from, to); + + encodedLength += encodeTrailingString(encodingBuffer, offset + encodedLength, + captureLength + LOG_HEADER_LENGTH - encodedLength, reason); + + return encodedLength; } - static > int sessionStateChangeLength(final E from, final E to) + static > int sessionStateChangeLength(final E from, final E to, final String reason) { - return stateTransitionStringLength(from, to) + SIZE_OF_LONG; + return stateTransitionStringLength(from, to) + SIZE_OF_LONG + (SIZE_OF_INT + reason.length()); } static void encodeReplaySessionError( diff --git a/aeron-agent/src/main/java/io/aeron/agent/ArchiveEventLogger.java b/aeron-agent/src/main/java/io/aeron/agent/ArchiveEventLogger.java index 5802166a68..f10f9eaad6 100644 --- a/aeron-agent/src/main/java/io/aeron/agent/ArchiveEventLogger.java +++ b/aeron-agent/src/main/java/io/aeron/agent/ArchiveEventLogger.java @@ -259,13 +259,15 @@ public > void logReplicationSessionStateChange( * @param oldState before the change. * @param newState after the change. * @param controlSessionId identity for the control session on the Archive. + * @param reason a string indicating the reason for the state change. */ public > void logControlSessionStateChange( final E oldState, final E newState, - final long controlSessionId) + final long controlSessionId, + final String reason) { - final int length = sessionStateChangeLength(oldState, newState); + final int length = sessionStateChangeLength(oldState, newState, reason); final int captureLength = captureLength(length); final int encodedLength = encodedLength(captureLength); final ManyToOneRingBuffer ringBuffer = this.ringBuffer; @@ -275,15 +277,15 @@ public > void logControlSessionStateChange( { try { - encodeSessionStateChange( + encodeControlSessionStateChange( (UnsafeBuffer)ringBuffer.buffer(), index, captureLength, length, oldState, newState, - controlSessionId - ); + controlSessionId, + reason); } finally { diff --git a/aeron-agent/src/main/java/io/aeron/agent/ArchiveInterceptor.java b/aeron-agent/src/main/java/io/aeron/agent/ArchiveInterceptor.java index 6b2a8253a0..98b7c08ae0 100644 --- a/aeron-agent/src/main/java/io/aeron/agent/ArchiveInterceptor.java +++ b/aeron-agent/src/main/java/io/aeron/agent/ArchiveInterceptor.java @@ -105,9 +105,10 @@ static void logReplicationSessionDone( static class ControlSessionStateChange { @Advice.OnMethodEnter - static > void logStateChange(final E oldState, final E newState, final long controlSessionId) + static > void logStateChange( + final E oldState, final E newState, final long controlSessionId, final String reason) { - LOGGER.logControlSessionStateChange(oldState, newState, controlSessionId); + LOGGER.logControlSessionStateChange(oldState, newState, controlSessionId, reason); } } diff --git a/aeron-agent/src/main/java/io/aeron/agent/CommonEventEncoder.java b/aeron-agent/src/main/java/io/aeron/agent/CommonEventEncoder.java index c355fbee88..e8bc5eb60c 100644 --- a/aeron-agent/src/main/java/io/aeron/agent/CommonEventEncoder.java +++ b/aeron-agent/src/main/java/io/aeron/agent/CommonEventEncoder.java @@ -162,10 +162,7 @@ static int encode( } static > int encodeStateChange( - final UnsafeBuffer encodingBuffer, - final int offset, - final E from, - final E to) + final UnsafeBuffer encodingBuffer, final int offset, final E from, final E to) { int encodedLength = 0; diff --git a/aeron-agent/src/test/java/io/aeron/agent/ArchiveEventDissectorTest.java b/aeron-agent/src/test/java/io/aeron/agent/ArchiveEventDissectorTest.java index c19f0fd056..2523c292ea 100644 --- a/aeron-agent/src/test/java/io/aeron/agent/ArchiveEventDissectorTest.java +++ b/aeron-agent/src/test/java/io/aeron/agent/ArchiveEventDissectorTest.java @@ -906,13 +906,14 @@ void controlSessionStateChange() { internalEncodeLogHeader(buffer, 0, 10, 20, () -> 1_500_000_000L); buffer.putLong(LOG_HEADER_LENGTH, -10_000_000_000L, LITTLE_ENDIAN); - buffer.putStringAscii(LOG_HEADER_LENGTH + SIZE_OF_LONG, "x -> y"); + final int length = buffer.putStringAscii(LOG_HEADER_LENGTH + SIZE_OF_LONG, "x -> y"); + buffer.putStringAscii(LOG_HEADER_LENGTH + SIZE_OF_LONG + length, "the very reason to report"); dissectControlSessionStateChange(buffer, 0, builder); assertEquals("[1.500000000] " + CONTEXT + ": " + CONTROL_SESSION_STATE_CHANGE.name() + " [10/20]:" + " controlSessionId=-10000000000" + - " x -> y", + " x -> y reason=\"the very reason to report\"", builder.toString()); } diff --git a/aeron-agent/src/test/java/io/aeron/agent/ArchiveEventEncoderTest.java b/aeron-agent/src/test/java/io/aeron/agent/ArchiveEventEncoderTest.java index 5c7b7ea7b7..a3f1afd50d 100644 --- a/aeron-agent/src/test/java/io/aeron/agent/ArchiveEventEncoderTest.java +++ b/aeron-agent/src/test/java/io/aeron/agent/ArchiveEventEncoderTest.java @@ -42,20 +42,21 @@ enum State private final UnsafeBuffer buffer = new UnsafeBuffer(new byte[MAX_EVENT_LENGTH]); @Test - void testEncodeSessionStateChange() + void testEncodeControlSessionStateChange() { final int offset = 24; final TimeUnit from = DAYS; final TimeUnit to = MILLISECONDS; final long sessionId = Long.MAX_VALUE; final String payload = from.name() + STATE_SEPARATOR + to.name(); - final int length = payload.length() + SIZE_OF_LONG + SIZE_OF_INT; + final String reason = "test reason for now"; + final int length = payload.length() + SIZE_OF_LONG + SIZE_OF_INT + SIZE_OF_INT + reason.length(); final int captureLength = captureLength(length); - final int encodedLength = encodeSessionStateChange( - buffer, offset, captureLength, length, from, to, sessionId); + final int encodedLength = encodeControlSessionStateChange( + buffer, offset, captureLength, length, from, to, sessionId, reason); - assertEquals(encodedLength(sessionStateChangeLength(from, to)), encodedLength); + assertEquals(encodedLength(sessionStateChangeLength(from, to, reason)), encodedLength); assertEquals(captureLength, buffer.getInt(offset, LITTLE_ENDIAN)); assertEquals(length, buffer.getInt(offset + SIZE_OF_INT, LITTLE_ENDIAN)); assertNotEquals(0, buffer.getLong(offset + SIZE_OF_INT * 2, LITTLE_ENDIAN)); @@ -69,8 +70,11 @@ void testSessionStateChangeLength() final ChronoUnit from = ChronoUnit.ERAS; final ChronoUnit to = ChronoUnit.MILLENNIA; final String payload = from.name() + STATE_SEPARATOR + to.name(); + final String reason = "hfskhflkdhfldshlfkhllkshflkhsldfhaslkfhsaklhflksahdflsahlhalks"; - assertEquals(payload.length() + SIZE_OF_LONG + SIZE_OF_INT, sessionStateChangeLength(from, to)); + assertEquals( + payload.length() + SIZE_OF_LONG + SIZE_OF_INT * 2 + reason.length(), + sessionStateChangeLength(from, to, reason)); } @Test diff --git a/aeron-agent/src/test/java/io/aeron/agent/ArchiveEventLoggerTest.java b/aeron-agent/src/test/java/io/aeron/agent/ArchiveEventLoggerTest.java index a88b05424a..ab16c1d0e4 100644 --- a/aeron-agent/src/test/java/io/aeron/agent/ArchiveEventLoggerTest.java +++ b/aeron-agent/src/test/java/io/aeron/agent/ArchiveEventLoggerTest.java @@ -184,15 +184,18 @@ void logControlSessionStateChange() final ChronoUnit to = ChronoUnit.MICROS; final long id = 555_000_000_000L; final String payload = from.name() + STATE_SEPARATOR + to.name(); - final int captureLength = SIZE_OF_LONG + SIZE_OF_INT + payload.length(); + final String reason = "test reason to check"; + final int captureLength = SIZE_OF_LONG + SIZE_OF_INT + payload.length() + SIZE_OF_INT + reason.length(); - logger.logControlSessionStateChange(from, to, id); + logger.logControlSessionStateChange(from, to, id, reason); verifyLogHeader( logBuffer, offset, CONTROL_SESSION_STATE_CHANGE.toEventCodeId(), captureLength, captureLength); assertEquals(id, logBuffer.getLong(encodedMsgOffset(offset + LOG_HEADER_LENGTH), LITTLE_ENDIAN)); assertEquals( payload, logBuffer.getStringAscii(encodedMsgOffset(offset + LOG_HEADER_LENGTH + SIZE_OF_LONG))); + assertEquals(reason, logBuffer.getStringAscii(encodedMsgOffset( + offset + LOG_HEADER_LENGTH + SIZE_OF_LONG + SIZE_OF_INT + payload.length()))); } @Test diff --git a/aeron-archive/src/main/java/io/aeron/archive/AbstractListRecordingsSession.java b/aeron-archive/src/main/java/io/aeron/archive/AbstractListRecordingsSession.java index 15955e302f..c2b41c6fb0 100644 --- a/aeron-archive/src/main/java/io/aeron/archive/AbstractListRecordingsSession.java +++ b/aeron-archive/src/main/java/io/aeron/archive/AbstractListRecordingsSession.java @@ -50,7 +50,7 @@ abstract class AbstractListRecordingsSession implements Session /** * {@inheritDoc} */ - public void abort() + public void abort(final String reason) { isDone = true; } diff --git a/aeron-archive/src/main/java/io/aeron/archive/Archive.java b/aeron-archive/src/main/java/io/aeron/archive/Archive.java index 30d74a2309..1e4af13964 100644 --- a/aeron-archive/src/main/java/io/aeron/archive/Archive.java +++ b/aeron-archive/src/main/java/io/aeron/archive/Archive.java @@ -447,10 +447,10 @@ public static final class Configuration /** * Default time interval in nanoseconds for checking session liveness. * - * @see #CONNECT_TIMEOUT_PROP_NAME + * @see #SESSION_LIVENESS_CHECK_INTERVAL_PROP_NAME */ - @Config(defaultType = DefaultType.LONG, defaultLong = 100 * 1000 * 1000) - public static final long SESSION_LIVENESS_CHECK_INTERVAL_DEFAULT_NS = TimeUnit.MILLISECONDS.toNanos(100); + @Config(defaultType = DefaultType.LONG, defaultLong = 1000L * 1000 * 1000) + public static final long SESSION_LIVENESS_CHECK_INTERVAL_DEFAULT_NS = TimeUnit.SECONDS.toNanos(1); /** * How long a replay publication should linger after all data is sent. Longer linger can help avoid tail loss. diff --git a/aeron-archive/src/main/java/io/aeron/archive/ArchiveConductor.java b/aeron-archive/src/main/java/io/aeron/archive/ArchiveConductor.java index e6cd4ca600..a363a9490b 100644 --- a/aeron-archive/src/main/java/io/aeron/archive/ArchiveConductor.java +++ b/aeron-archive/src/main/java/io/aeron/archive/ArchiveConductor.java @@ -874,7 +874,7 @@ void stopReplay(final long correlationId, final long replaySessionId, final Cont final ReplaySession replaySession = replaySessionByIdMap.get(replaySessionId); if (null != replaySession) { - replaySession.abort(); + replaySession.abort("stop replay"); } controlSession.sendOkResponse(correlationId); @@ -886,7 +886,7 @@ void stopAllReplays(final long correlationId, final long recordingId, final Cont { if (NULL_VALUE == recordingId || replaySession.recordingId() == recordingId) { - replaySession.abort(); + replaySession.abort("stop all replays"); } } @@ -1136,7 +1136,7 @@ void stopRecordingByIdentity(final long correlationId, final long recordingId, f if (null != recordingSession) { - recordingSession.abort(); + recordingSession.abort("stop recording by identity"); final long subscriptionId = recordingSession.subscription().registrationId(); final Subscription subscription = removeRecordingSubscription(subscriptionId); @@ -1187,7 +1187,7 @@ void closeRecordingSession(final RecordingSession session) if (subscriptionRefCountMap.decrementAndGet(subscriptionId) <= 0 || session.isAutoStop()) { - closeAndRemoveRecordingSubscription(subscription); + closeAndRemoveRecordingSubscription(subscription, "close recording session"); } closeSession(session); ctx.recordingSessionCounter().decrementOrdered(); @@ -1317,7 +1317,7 @@ void stopReplication(final long correlationId, final long replicationId, final C } else { - session.abort(); + session.abort("stop replication"); controlSession.sendOkResponse(correlationId); } } @@ -1567,7 +1567,7 @@ private void abortRecordingSessionAndCloseSubscription(final Subscription subscr { if (subscription == session.subscription()) { - session.abort(); + session.abort("stop recording"); } } @@ -1922,7 +1922,7 @@ private void extendRecordingSession( errorHandler.onError(ex); if (autoStop) { - closeAndRemoveRecordingSubscription(image.subscription()); + closeAndRemoveRecordingSubscription(image.subscription(), ex.getMessage()); } } } @@ -2370,7 +2370,7 @@ private boolean eraseRemainingSegment( return true; } - private void closeAndRemoveRecordingSubscription(final Subscription subscription) + private void closeAndRemoveRecordingSubscription(final Subscription subscription, final String reason) { final long subscriptionId = subscription.registrationId(); subscriptionRefCountMap.remove(subscriptionId); @@ -2379,7 +2379,7 @@ private void closeAndRemoveRecordingSubscription(final Subscription subscription { if (subscription == session.subscription()) { - session.abort(); + session.abort(reason); } } diff --git a/aeron-archive/src/main/java/io/aeron/archive/ControlResponseProxy.java b/aeron-archive/src/main/java/io/aeron/archive/ControlResponseProxy.java index 2d9a083edc..2125639c13 100644 --- a/aeron-archive/src/main/java/io/aeron/archive/ControlResponseProxy.java +++ b/aeron-archive/src/main/java/io/aeron/archive/ControlResponseProxy.java @@ -18,7 +18,7 @@ import io.aeron.Publication; import io.aeron.Subscription; import io.aeron.archive.client.AeronArchive; -import io.aeron.archive.client.ArchiveException; +import io.aeron.archive.client.ArchiveEvent; import io.aeron.archive.codecs.*; import io.aeron.exceptions.AeronException; import io.aeron.logbuffer.BufferClaim; @@ -218,21 +218,20 @@ private static void checkResult(final ControlSession session, final long result) { if (result == Publication.NOT_CONNECTED) { - session.abort(); - throw new ArchiveException( - "response publication is not connected: " + session, AeronException.Category.WARN); + session.abort("response publication is not connected"); + throw new ArchiveEvent("response publication is not connected: " + session); } if (result == Publication.CLOSED) { - session.abort(); - throw new ArchiveException("response publication is closed: " + session); + session.abort("response publication is closed"); + throw new ArchiveEvent("response publication is closed: " + session, AeronException.Category.ERROR); } if (result == Publication.MAX_POSITION_EXCEEDED) { - session.abort(); - throw new ArchiveException("response publication at max position: " + session); + session.abort("response publication at max position"); + throw new ArchiveEvent("response publication at max position: " + session, AeronException.Category.ERROR); } } diff --git a/aeron-archive/src/main/java/io/aeron/archive/ControlSession.java b/aeron-archive/src/main/java/io/aeron/archive/ControlSession.java index 08a35a9a58..09d7ab521d 100644 --- a/aeron-archive/src/main/java/io/aeron/archive/ControlSession.java +++ b/aeron-archive/src/main/java/io/aeron/archive/ControlSession.java @@ -18,6 +18,7 @@ import io.aeron.Aeron; import io.aeron.ExclusivePublication; import io.aeron.Subscription; +import io.aeron.archive.client.ArchiveEvent; import io.aeron.archive.codecs.ControlResponseCode; import io.aeron.archive.codecs.RecordingSignal; import io.aeron.archive.codecs.SourceLocation; @@ -39,10 +40,12 @@ */ final class ControlSession implements Session { + static final String SESSION_CLOSED_MSG = "session closed"; private static final long RESEND_INTERVAL_MS = 200L; private static final String SESSION_REJECTED_MSG = "authentication rejected"; private final Thread conductorThread; private long sessionLivenessCheckDeadlineMs; + private String abortReason; enum State { @@ -71,7 +74,6 @@ enum State private final ControlSessionAdapter controlSessionAdapter; private final String invalidVersionMessage; private State state = State.INIT; - private boolean isInactive = false; ControlSession( final long controlSessionId, @@ -117,12 +119,13 @@ public long sessionId() /** * {@inheritDoc} */ - public void abort() + public void abort(final String reason) { - state(State.DONE); + abortReason = reason; + state(State.DONE, reason); if (null != activeListing) { - activeListing.abort(); + activeListing.abort(reason); } } @@ -133,7 +136,7 @@ public void close() { if (null != activeListing) { - activeListing.abort(); + activeListing.abort(abortReason); } if (null == controlPublication) @@ -150,6 +153,12 @@ public void close() { conductor.context().controlSessionsCounter().decrementOrdered(); } + + if (null != abortReason && !SESSION_CLOSED_MSG.equals(abortReason)) + { + conductor.errorHandler.onError(new ArchiveEvent( + "controlSessionId=" + controlSessionId + " terminated: " + abortReason)); + } } /** @@ -170,7 +179,10 @@ public int doWork() if (hasNoActivity(nowMs)) { - state(State.INACTIVE); + abortReason = State.ACTIVE == state ? + "failed to send response for more than connectTimeoutMs=" + connectTimeoutMs : + "failed to establish initial connection: state=" + state; + state(State.INACTIVE, abortReason); workCount++; } @@ -214,8 +226,7 @@ public int doWork() break; case INACTIVE: - isInactive = true; - state(State.DONE); + state(State.DONE, "inactive"); break; case DONE: @@ -225,11 +236,6 @@ public int doWork() return workCount; } - boolean isInactive() - { - return isInactive; - } - byte[] encodedPrincipal() { return encodedPrincipal; @@ -776,19 +782,19 @@ int maxPayloadLength() void challenged() { - state(State.CHALLENGED); + state(State.CHALLENGED, "challenged"); } void authenticate(final byte[] encodedPrincipal) { this.encodedPrincipal = encodedPrincipal; activityDeadlineMs = Aeron.NULL_VALUE; - state(State.AUTHENTICATED); + state(State.AUTHENTICATED, "authenticated"); } void reject() { - state(State.REJECTED); + state(State.REJECTED, SESSION_REJECTED_MSG); } private void assertCalledOnConductorThread() @@ -821,7 +827,7 @@ private int waitForPublication(final long nowMs) { controlPublication = publication; activityDeadlineMs = nowMs + connectTimeoutMs; - state(State.CONNECTING); + state(State.CONNECTING, "connecting"); workCount += 1; } @@ -834,7 +840,7 @@ private int waitForConnection(final long nowMs) if (controlPublication.isConnected()) { - state(State.CONNECTED); + state(State.CONNECTED, "connected"); workCount += 1; } @@ -904,7 +910,8 @@ private int sendResponses(final long nowMs) if (!controlPublication.isConnected()) { - state(State.INACTIVE); + abortReason = "control publication not connected"; + state(State.INACTIVE, abortReason); workCount++; } else @@ -946,12 +953,7 @@ private int sendReject(final long nowMs) { resendDeadlineMs = nowMs + RESEND_INTERVAL_MS; controlResponseProxy.sendResponse( - controlSessionId, - correlationId, - AUTHENTICATION_REJECTED, - ERROR, - SESSION_REJECTED_MSG, - this); + controlSessionId, correlationId, AUTHENTICATION_REJECTED, ERROR, SESSION_REJECTED_MSG, this); workCount += 1; } @@ -976,19 +978,20 @@ private void attemptToActivate() { if (State.AUTHENTICATED == state && null == invalidVersionMessage) { - state(State.ACTIVE); + state(State.ACTIVE, "active"); } } - private void state(final State state) + private void state(final State state, final String reason) { - logStateChange(this.state, state, controlSessionId); + logStateChange(this.state, state, controlSessionId, reason); this.state = state; } - private void logStateChange(final State oldState, final State newState, final long controlSessionId) + private void logStateChange( + final State oldState, final State newState, final long controlSessionId, final String reason) { -// System.out.println(controlSessionId + ": " + oldState + " -> " + newState); +// System.out.println(controlSessionId + ": " + oldState + " -> " + newState + ", reason=\"" + reason + "\""); } public String toString() diff --git a/aeron-archive/src/main/java/io/aeron/archive/ControlSessionAdapter.java b/aeron-archive/src/main/java/io/aeron/archive/ControlSessionAdapter.java index d0fd98e443..fcdc322b8f 100644 --- a/aeron-archive/src/main/java/io/aeron/archive/ControlSessionAdapter.java +++ b/aeron-archive/src/main/java/io/aeron/archive/ControlSessionAdapter.java @@ -127,7 +127,7 @@ public void onFragment(final DirectBuffer buffer, final int offset, final int le final SessionInfo info = controlSessionByIdMap.get(controlSessionId); if (null != info) { - info.controlSession.abort(); + info.controlSession.abort(ControlSession.SESSION_CLOSED_MSG); } break; } @@ -1053,7 +1053,10 @@ void abortControlSessionByImage(final Image image) { if (info.image == image) { - info.controlSession.abort(); + info.controlSession.abort("request publication image unavailable:" + + " image.correlationId=" + image.correlationId() + + " image.sessionId=" + image.sessionId() + + " channel=" + image.subscription().channel()); break; } } diff --git a/aeron-archive/src/main/java/io/aeron/archive/CreateReplayPublicationSession.java b/aeron-archive/src/main/java/io/aeron/archive/CreateReplayPublicationSession.java index c82f5fa696..0b74430711 100644 --- a/aeron-archive/src/main/java/io/aeron/archive/CreateReplayPublicationSession.java +++ b/aeron-archive/src/main/java/io/aeron/archive/CreateReplayPublicationSession.java @@ -86,7 +86,7 @@ public void close() /** * {@inheritDoc} */ - public void abort() + public void abort(final String reason) { isDone = true; } diff --git a/aeron-archive/src/main/java/io/aeron/archive/DeleteSegmentsSession.java b/aeron-archive/src/main/java/io/aeron/archive/DeleteSegmentsSession.java index bec34e495d..e42a93f553 100644 --- a/aeron-archive/src/main/java/io/aeron/archive/DeleteSegmentsSession.java +++ b/aeron-archive/src/main/java/io/aeron/archive/DeleteSegmentsSession.java @@ -79,7 +79,7 @@ public void close() /** * {@inheritDoc} */ - public void abort() + public void abort(final String reason) { } diff --git a/aeron-archive/src/main/java/io/aeron/archive/ListRecordingSubscriptionsSession.java b/aeron-archive/src/main/java/io/aeron/archive/ListRecordingSubscriptionsSession.java index 42b7e44c99..401b56191e 100644 --- a/aeron-archive/src/main/java/io/aeron/archive/ListRecordingSubscriptionsSession.java +++ b/aeron-archive/src/main/java/io/aeron/archive/ListRecordingSubscriptionsSession.java @@ -63,7 +63,7 @@ public void close() /** * {@inheritDoc} */ - public void abort() + public void abort(final String reason) { isDone = true; } diff --git a/aeron-archive/src/main/java/io/aeron/archive/RecordingSession.java b/aeron-archive/src/main/java/io/aeron/archive/RecordingSession.java index eb9b2886d2..91b31e1240 100644 --- a/aeron-archive/src/main/java/io/aeron/archive/RecordingSession.java +++ b/aeron-archive/src/main/java/io/aeron/archive/RecordingSession.java @@ -98,7 +98,7 @@ public boolean isDone() /** * {@inheritDoc} */ - public void abort() + public void abort(final String reason) { isAborted = true; } diff --git a/aeron-archive/src/main/java/io/aeron/archive/ReplaySession.java b/aeron-archive/src/main/java/io/aeron/archive/ReplaySession.java index 2d9e980a04..9cc71d0d83 100644 --- a/aeron-archive/src/main/java/io/aeron/archive/ReplaySession.java +++ b/aeron-archive/src/main/java/io/aeron/archive/ReplaySession.java @@ -215,7 +215,7 @@ public int doWork() /** * {@inheritDoc} */ - public void abort() + public void abort(final String reason) { isAborted = true; } diff --git a/aeron-archive/src/main/java/io/aeron/archive/ReplicationSession.java b/aeron-archive/src/main/java/io/aeron/archive/ReplicationSession.java index d223384b8b..0d3d5c9a89 100644 --- a/aeron-archive/src/main/java/io/aeron/archive/ReplicationSession.java +++ b/aeron-archive/src/main/java/io/aeron/archive/ReplicationSession.java @@ -163,7 +163,7 @@ public boolean isDone() /** * {@inheritDoc} */ - public void abort() + public void abort(final String reason) { this.state(State.DONE, "abort"); } diff --git a/aeron-archive/src/main/java/io/aeron/archive/Session.java b/aeron-archive/src/main/java/io/aeron/archive/Session.java index a67da5e58a..61ad88caf3 100644 --- a/aeron-archive/src/main/java/io/aeron/archive/Session.java +++ b/aeron-archive/src/main/java/io/aeron/archive/Session.java @@ -22,7 +22,7 @@ */ interface Session { - void abort(); + void abort(String reason); boolean isDone(); diff --git a/aeron-archive/src/test/java/io/aeron/archive/ArchiveTest.java b/aeron-archive/src/test/java/io/aeron/archive/ArchiveTest.java index c99d06c16b..3cae951356 100644 --- a/aeron-archive/src/test/java/io/aeron/archive/ArchiveTest.java +++ b/aeron-archive/src/test/java/io/aeron/archive/ArchiveTest.java @@ -25,12 +25,14 @@ import io.aeron.archive.Archive.Context; import io.aeron.archive.checksum.Checksum; import io.aeron.archive.client.AeronArchive; +import io.aeron.archive.client.ArchiveEvent; import io.aeron.archive.client.ArchiveException; import io.aeron.archive.client.RecordingSubscriptionDescriptorConsumer; import io.aeron.archive.codecs.TruncateRecordingRequestDecoder; import io.aeron.archive.status.RecordingPos; import io.aeron.driver.MediaDriver; import io.aeron.driver.ThreadingMode; +import io.aeron.exceptions.AeronException; import io.aeron.logbuffer.BufferClaim; import io.aeron.logbuffer.FragmentHandler; import io.aeron.logbuffer.FrameDescriptor; @@ -44,6 +46,7 @@ import io.aeron.test.Tests; import org.agrona.CloseHelper; import org.agrona.DirectBuffer; +import org.agrona.ErrorHandler; import org.agrona.IoUtil; import org.agrona.concurrent.IdleStrategy; import org.agrona.concurrent.ManyToOneConcurrentLinkedQueue; @@ -60,6 +63,7 @@ import org.junit.jupiter.params.provider.CsvSource; import org.junit.jupiter.params.provider.EnumSource; import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.ArgumentCaptor; import java.io.ByteArrayOutputStream; import java.io.File; @@ -91,8 +95,7 @@ import static org.hamcrest.Matchers.*; import static org.junit.jupiter.api.Assertions.*; import static org.junit.jupiter.params.provider.EnumSource.Mode.EXCLUDE; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; @ExtendWith(InterruptingTestCallback.class) @SuppressWarnings("try") @@ -867,6 +870,7 @@ private static int calculateFragmentedMessageLength(final Publication publicatio void shouldTimeoutInactiveArchiveClients(final String controlRequestChannel, final String controlResponseChannel) { final long archiveId = -743746574; + final ErrorHandler errorHandler = mock(ErrorHandler.class); try (MediaDriver driver = MediaDriver.launch(new MediaDriver.Context() .aeronDirectoryName(CommonContext.generateRandomDirName()) .statusMessageTimeoutNs(TimeUnit.MILLISECONDS.toNanos(80)) @@ -881,8 +885,9 @@ void shouldTimeoutInactiveArchiveClients(final String controlRequestChannel, fin .archiveId(archiveId) .archiveDir(tmpDir.resolve("archive").toFile()) .aeronDirectoryName(driver.context().aeronDirectoryName()) - .connectTimeoutNs(TimeUnit.MILLISECONDS.toNanos(500)) - .sessionLivenessCheckIntervalNs(TimeUnit.MILLISECONDS.toNanos(1)))) + .connectTimeoutNs(TimeUnit.MILLISECONDS.toNanos(678)) + .sessionLivenessCheckIntervalNs(TimeUnit.MILLISECONDS.toNanos(1)) + .errorHandler(errorHandler))) { final AeronArchive.Context ctx = new AeronArchive.Context() .aeronDirectoryName(driver.context().aeronDirectoryName()) @@ -914,6 +919,17 @@ void shouldTimeoutInactiveArchiveClients(final String controlRequestChannel, fin endNs - startNs, greaterThanOrEqualTo(timeToFillResponseWindowNs + archive.context().connectTimeoutNs())); + assertEquals(1, archive.context().errorCounter().get()); + final ArgumentCaptor captor = ArgumentCaptor.forClass(Throwable.class); + verify(errorHandler, timeout(1000)).onError(captor.capture()); + final ArchiveEvent event = assertInstanceOf(ArchiveEvent.class, captor.getValue()); + assertEquals(AeronException.Category.WARN, event.category()); + assertEquals( + "WARN - controlSessionId=" + client2.controlSessionId() + " terminated: " + + "failed to send response for more than connectTimeoutMs=" + + TimeUnit.NANOSECONDS.toMillis(archive.context().connectTimeoutNs()), + event.getMessage()); + while (client2.controlResponsePoller().subscription().isConnected()) { assertNull(client1.pollForErrorResponse());