Skip to content

Commit

Permalink
[Java] Tidy up of term unblock logic. Issue #424.
Browse files Browse the repository at this point in the history
  • Loading branch information
mjpt777 committed Oct 28, 2017
1 parent e30cc3a commit 779fcad
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,31 +30,36 @@ public class LogBufferUnblocker
* @param termBuffers for current blockedOffset
* @param logMetaDataBuffer for log buffer
* @param blockedPosition to attempt to unblock
* @param termLength of the buffer for each term in the log
* @return whether unblocked or not
*/
public static boolean unblock(
final UnsafeBuffer[] termBuffers, final UnsafeBuffer logMetaDataBuffer, final long blockedPosition)
final UnsafeBuffer[] termBuffers,
final UnsafeBuffer logMetaDataBuffer,
final long blockedPosition,
final int termLength)
{
final int termLength = termBuffers[0].capacity();
final int positionBitsToShift = Integer.numberOfTrailingZeros(termLength);
final int activeTermCount = activeTermCount(logMetaDataBuffer);
final int expectedTermCount = (int)(blockedPosition >> positionBitsToShift);
final int index = indexByTermCount(expectedTermCount);
final long rawTail = rawTailVolatile(logMetaDataBuffer, index);
final int termId = termId(rawTail);
final int tailOffset = termOffset(rawTail, termLength);
final int blockedTermCount = (int)(blockedPosition >> positionBitsToShift);
final int blockedOffset = computeTermOffsetFromPosition(blockedPosition, positionBitsToShift);
final int activeTermCount = activeTermCount(logMetaDataBuffer);

if (activeTermCount == (expectedTermCount - 1) && blockedOffset == 0)
if (activeTermCount == (blockedTermCount - 1) && blockedOffset == 0)
{
final int currentTermId = termId(rawTailVolatile(logMetaDataBuffer, indexByTermCount(activeTermCount)));
return rotateLog(logMetaDataBuffer, activeTermCount, currentTermId);
}

switch (TermUnblocker.unblock(logMetaDataBuffer, termBuffers[index], blockedOffset, tailOffset, termId))
final int blockedIndex = indexByTermCount(blockedTermCount);
final long rawTail = rawTailVolatile(logMetaDataBuffer, blockedIndex);
final int termId = termId(rawTail);
final int tailOffset = termOffset(rawTail, termLength);
final UnsafeBuffer termBuffer = termBuffers[blockedIndex];

switch (TermUnblocker.unblock(logMetaDataBuffer, termBuffer, blockedOffset, tailOffset, termId))
{
case UNBLOCKED_TO_END:
rotateLog(logMetaDataBuffer, activeTermCount, termId);
rotateLog(logMetaDataBuffer, blockedTermCount, termId);
// fall through
case UNBLOCKED:
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public void shouldNotUnblockWhenPositionHasCompleteMessage()

when(termBuffers[activeIndex].getIntVolatile(blockedOffset)).thenReturn(HEADER_LENGTH);

assertFalse(LogBufferUnblocker.unblock(termBuffers, logMetaDataBuffer, blockedPosition));
assertFalse(LogBufferUnblocker.unblock(termBuffers, logMetaDataBuffer, blockedPosition, TERM_LENGTH));

final long rawTail = rawTailVolatile(logMetaDataBuffer);
assertThat(computePosition(termId(rawTail), blockedOffset, positionBitsToShift, TERM_ID_1),
Expand All @@ -84,7 +84,7 @@ public void shouldUnblockWhenPositionHasNonCommittedMessageAndTailWithinTerm()

when(termBuffers[activeIndex].getIntVolatile(blockedOffset)).thenReturn(-messageLength);

assertTrue(LogBufferUnblocker.unblock(termBuffers, logMetaDataBuffer, blockedPosition));
assertTrue(LogBufferUnblocker.unblock(termBuffers, logMetaDataBuffer, blockedPosition, TERM_LENGTH));

final long rawTail = rawTailVolatile(logMetaDataBuffer);
assertThat(computePosition(termId(rawTail), blockedOffset + messageLength, positionBitsToShift, TERM_ID_1),
Expand All @@ -103,7 +103,7 @@ public void shouldUnblockWhenPositionHasNonCommittedMessageAndTailAtEndOfTerm()

logMetaDataBuffer.getAndAddLong(TERM_TAIL_COUNTER_OFFSET, TERM_LENGTH);

assertTrue(LogBufferUnblocker.unblock(termBuffers, logMetaDataBuffer, blockedPosition));
assertTrue(LogBufferUnblocker.unblock(termBuffers, logMetaDataBuffer, blockedPosition, TERM_LENGTH));

final long rawTail = rawTailVolatile(logMetaDataBuffer);
final int termId = termId(rawTail);
Expand All @@ -121,7 +121,7 @@ public void shouldUnblockWhenPositionHasCommittedMessageAndTailAtEndOfTermButNot
final int termTailCounterTwoOffset = TERM_TAIL_COUNTER_OFFSET + SIZE_OF_LONG;
logMetaDataBuffer.getAndAddLong(TERM_TAIL_COUNTER_OFFSET, TERM_LENGTH);

assertTrue(LogBufferUnblocker.unblock(termBuffers, logMetaDataBuffer, blockedPosition));
assertTrue(LogBufferUnblocker.unblock(termBuffers, logMetaDataBuffer, blockedPosition, TERM_LENGTH));

final long rawTail = rawTailVolatile(logMetaDataBuffer);
final int termId = termId(rawTail);
Expand All @@ -146,7 +146,7 @@ public void shouldUnblockWhenPositionHasNonCommittedMessageAndTailPastEndOfTerm(

logMetaDataBuffer.putLong(TERM_TAIL_COUNTER_OFFSET, pack(TERM_ID_1, TERM_LENGTH + HEADER_LENGTH));

assertTrue(LogBufferUnblocker.unblock(termBuffers, logMetaDataBuffer, blockedPosition));
assertTrue(LogBufferUnblocker.unblock(termBuffers, logMetaDataBuffer, blockedPosition, TERM_LENGTH));

final long rawTail = rawTailVolatile(logMetaDataBuffer);
final int termId = termId(rawTail);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ private void checkForBlockedPublisher(final long timeNs)
{
if (timeNs > (timeOfLastConsumerPositionUpdateNs + unblockTimeoutNs))
{
if (LogBufferUnblocker.unblock(termBuffers, metaDataBuffer, consumerPosition))
if (LogBufferUnblocker.unblock(termBuffers, metaDataBuffer, consumerPosition, termBufferLength))
{
unblockedPublications.orderedIncrement();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -604,7 +604,7 @@ private void checkForBlockedPublisher(final long timeNs, final long senderPositi
{
if (timeNs > (timeOfLastActivityNs + unblockTimeoutNs))
{
if (LogBufferUnblocker.unblock(termBuffers, metaDataBuffer, senderPosition))
if (LogBufferUnblocker.unblock(termBuffers, metaDataBuffer, senderPosition, termBufferLength))
{
unblockedPublications.orderedIncrement();
}
Expand Down Expand Up @@ -691,7 +691,7 @@ public void onTimeEvent(final long timeNs, final long timeMs, final DriverConduc
final long producerPosition = producerPosition();
if (producerPosition > senderPosition)
{
if (LogBufferUnblocker.unblock(termBuffers, metaDataBuffer, senderPosition))
if (LogBufferUnblocker.unblock(termBuffers, metaDataBuffer, senderPosition, termBufferLength))
{
unblockedPublications.orderedIncrement();
timeOfLastActivityNs = timeNs;
Expand Down

0 comments on commit 779fcad

Please sign in to comment.