From 779fcad6c307788e0600933c7c35628bf9e7cd0c Mon Sep 17 00:00:00 2001 From: mjpt777 Date: Sat, 28 Oct 2017 10:30:51 +0100 Subject: [PATCH] [Java] Tidy up of term unblock logic. Issue #424. --- .../aeron/logbuffer/LogBufferUnblocker.java | 27 +++++++++++-------- .../logbuffer/LogBufferUnblockerTest.java | 10 +++---- .../java/io/aeron/driver/IpcPublication.java | 2 +- .../io/aeron/driver/NetworkPublication.java | 4 +-- 4 files changed, 24 insertions(+), 19 deletions(-) diff --git a/aeron-client/src/main/java/io/aeron/logbuffer/LogBufferUnblocker.java b/aeron-client/src/main/java/io/aeron/logbuffer/LogBufferUnblocker.java index a26b9c41ea..03b7d042c6 100644 --- a/aeron-client/src/main/java/io/aeron/logbuffer/LogBufferUnblocker.java +++ b/aeron-client/src/main/java/io/aeron/logbuffer/LogBufferUnblocker.java @@ -30,31 +30,36 @@ public class LogBufferUnblocker * @param termBuffers for current blockedOffset * @param logMetaDataBuffer for log buffer * @param blockedPosition to attempt to unblock + * @param termLength of the buffer for each term in the log * @return whether unblocked or not */ public static boolean unblock( - final UnsafeBuffer[] termBuffers, final UnsafeBuffer logMetaDataBuffer, final long blockedPosition) + final UnsafeBuffer[] termBuffers, + final UnsafeBuffer logMetaDataBuffer, + final long blockedPosition, + final int termLength) { - final int termLength = termBuffers[0].capacity(); final int positionBitsToShift = Integer.numberOfTrailingZeros(termLength); - final int activeTermCount = activeTermCount(logMetaDataBuffer); - final int expectedTermCount = (int)(blockedPosition >> positionBitsToShift); - final int index = indexByTermCount(expectedTermCount); - final long rawTail = rawTailVolatile(logMetaDataBuffer, index); - final int termId = termId(rawTail); - final int tailOffset = termOffset(rawTail, termLength); + final int blockedTermCount = (int)(blockedPosition >> positionBitsToShift); final int blockedOffset = computeTermOffsetFromPosition(blockedPosition, positionBitsToShift); + final int activeTermCount = activeTermCount(logMetaDataBuffer); - if (activeTermCount == (expectedTermCount - 1) && blockedOffset == 0) + if (activeTermCount == (blockedTermCount - 1) && blockedOffset == 0) { final int currentTermId = termId(rawTailVolatile(logMetaDataBuffer, indexByTermCount(activeTermCount))); return rotateLog(logMetaDataBuffer, activeTermCount, currentTermId); } - switch (TermUnblocker.unblock(logMetaDataBuffer, termBuffers[index], blockedOffset, tailOffset, termId)) + final int blockedIndex = indexByTermCount(blockedTermCount); + final long rawTail = rawTailVolatile(logMetaDataBuffer, blockedIndex); + final int termId = termId(rawTail); + final int tailOffset = termOffset(rawTail, termLength); + final UnsafeBuffer termBuffer = termBuffers[blockedIndex]; + + switch (TermUnblocker.unblock(logMetaDataBuffer, termBuffer, blockedOffset, tailOffset, termId)) { case UNBLOCKED_TO_END: - rotateLog(logMetaDataBuffer, activeTermCount, termId); + rotateLog(logMetaDataBuffer, blockedTermCount, termId); // fall through case UNBLOCKED: return true; diff --git a/aeron-client/src/test/java/io/aeron/logbuffer/LogBufferUnblockerTest.java b/aeron-client/src/test/java/io/aeron/logbuffer/LogBufferUnblockerTest.java index 9ec65b9ad7..7b7ab23382 100644 --- a/aeron-client/src/test/java/io/aeron/logbuffer/LogBufferUnblockerTest.java +++ b/aeron-client/src/test/java/io/aeron/logbuffer/LogBufferUnblockerTest.java @@ -67,7 +67,7 @@ public void shouldNotUnblockWhenPositionHasCompleteMessage() when(termBuffers[activeIndex].getIntVolatile(blockedOffset)).thenReturn(HEADER_LENGTH); - assertFalse(LogBufferUnblocker.unblock(termBuffers, logMetaDataBuffer, blockedPosition)); + assertFalse(LogBufferUnblocker.unblock(termBuffers, logMetaDataBuffer, blockedPosition, TERM_LENGTH)); final long rawTail = rawTailVolatile(logMetaDataBuffer); assertThat(computePosition(termId(rawTail), blockedOffset, positionBitsToShift, TERM_ID_1), @@ -84,7 +84,7 @@ public void shouldUnblockWhenPositionHasNonCommittedMessageAndTailWithinTerm() when(termBuffers[activeIndex].getIntVolatile(blockedOffset)).thenReturn(-messageLength); - assertTrue(LogBufferUnblocker.unblock(termBuffers, logMetaDataBuffer, blockedPosition)); + assertTrue(LogBufferUnblocker.unblock(termBuffers, logMetaDataBuffer, blockedPosition, TERM_LENGTH)); final long rawTail = rawTailVolatile(logMetaDataBuffer); assertThat(computePosition(termId(rawTail), blockedOffset + messageLength, positionBitsToShift, TERM_ID_1), @@ -103,7 +103,7 @@ public void shouldUnblockWhenPositionHasNonCommittedMessageAndTailAtEndOfTerm() logMetaDataBuffer.getAndAddLong(TERM_TAIL_COUNTER_OFFSET, TERM_LENGTH); - assertTrue(LogBufferUnblocker.unblock(termBuffers, logMetaDataBuffer, blockedPosition)); + assertTrue(LogBufferUnblocker.unblock(termBuffers, logMetaDataBuffer, blockedPosition, TERM_LENGTH)); final long rawTail = rawTailVolatile(logMetaDataBuffer); final int termId = termId(rawTail); @@ -121,7 +121,7 @@ public void shouldUnblockWhenPositionHasCommittedMessageAndTailAtEndOfTermButNot final int termTailCounterTwoOffset = TERM_TAIL_COUNTER_OFFSET + SIZE_OF_LONG; logMetaDataBuffer.getAndAddLong(TERM_TAIL_COUNTER_OFFSET, TERM_LENGTH); - assertTrue(LogBufferUnblocker.unblock(termBuffers, logMetaDataBuffer, blockedPosition)); + assertTrue(LogBufferUnblocker.unblock(termBuffers, logMetaDataBuffer, blockedPosition, TERM_LENGTH)); final long rawTail = rawTailVolatile(logMetaDataBuffer); final int termId = termId(rawTail); @@ -146,7 +146,7 @@ public void shouldUnblockWhenPositionHasNonCommittedMessageAndTailPastEndOfTerm( logMetaDataBuffer.putLong(TERM_TAIL_COUNTER_OFFSET, pack(TERM_ID_1, TERM_LENGTH + HEADER_LENGTH)); - assertTrue(LogBufferUnblocker.unblock(termBuffers, logMetaDataBuffer, blockedPosition)); + assertTrue(LogBufferUnblocker.unblock(termBuffers, logMetaDataBuffer, blockedPosition, TERM_LENGTH)); final long rawTail = rawTailVolatile(logMetaDataBuffer); final int termId = termId(rawTail); diff --git a/aeron-driver/src/main/java/io/aeron/driver/IpcPublication.java b/aeron-driver/src/main/java/io/aeron/driver/IpcPublication.java index 51879603a6..4190a9d290 100644 --- a/aeron-driver/src/main/java/io/aeron/driver/IpcPublication.java +++ b/aeron-driver/src/main/java/io/aeron/driver/IpcPublication.java @@ -331,7 +331,7 @@ private void checkForBlockedPublisher(final long timeNs) { if (timeNs > (timeOfLastConsumerPositionUpdateNs + unblockTimeoutNs)) { - if (LogBufferUnblocker.unblock(termBuffers, metaDataBuffer, consumerPosition)) + if (LogBufferUnblocker.unblock(termBuffers, metaDataBuffer, consumerPosition, termBufferLength)) { unblockedPublications.orderedIncrement(); } diff --git a/aeron-driver/src/main/java/io/aeron/driver/NetworkPublication.java b/aeron-driver/src/main/java/io/aeron/driver/NetworkPublication.java index f6512c42e8..da6653bafd 100644 --- a/aeron-driver/src/main/java/io/aeron/driver/NetworkPublication.java +++ b/aeron-driver/src/main/java/io/aeron/driver/NetworkPublication.java @@ -604,7 +604,7 @@ private void checkForBlockedPublisher(final long timeNs, final long senderPositi { if (timeNs > (timeOfLastActivityNs + unblockTimeoutNs)) { - if (LogBufferUnblocker.unblock(termBuffers, metaDataBuffer, senderPosition)) + if (LogBufferUnblocker.unblock(termBuffers, metaDataBuffer, senderPosition, termBufferLength)) { unblockedPublications.orderedIncrement(); } @@ -691,7 +691,7 @@ public void onTimeEvent(final long timeNs, final long timeMs, final DriverConduc final long producerPosition = producerPosition(); if (producerPosition > senderPosition) { - if (LogBufferUnblocker.unblock(termBuffers, metaDataBuffer, senderPosition)) + if (LogBufferUnblocker.unblock(termBuffers, metaDataBuffer, senderPosition, termBufferLength)) { unblockedPublications.orderedIncrement(); timeOfLastActivityNs = timeNs;