Skip to content

Commit

Permalink
[Java] Tidy up of Publication tail handling while trying to track dow…
Browse files Browse the repository at this point in the history
…n replay bug.
  • Loading branch information
mjpt777 committed Jul 3, 2017
1 parent fc24dc3 commit 0ce9b6a
Show file tree
Hide file tree
Showing 8 changed files with 107 additions and 144 deletions.
24 changes: 13 additions & 11 deletions aeron-client/src/main/java/io/aeron/ExclusivePublication.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,25 +105,27 @@ public class ExclusivePublication implements AutoCloseable
}

final int termLength = logBuffers.termLength();
this.maxPayloadLength = LogBufferDescriptor.mtuLength(logMetaDataBuffer) - HEADER_LENGTH;
this.maxMessageLength = FrameDescriptor.computeExclusiveMaxMessageLength(termLength);
this.conductor = clientConductor;
maxPayloadLength = LogBufferDescriptor.mtuLength(logMetaDataBuffer) - HEADER_LENGTH;
maxMessageLength = FrameDescriptor.computeExclusiveMaxMessageLength(termLength);
conductor = clientConductor;
this.channel = channel;
this.streamId = streamId;
this.sessionId = sessionId;
this.initialTermId = LogBufferDescriptor.initialTermId(logMetaDataBuffer);
this.logMetaDataBuffer = logMetaDataBuffer;
this.originalRegistrationId = originalRegistrationId;
this.registrationId = registrationId;
this.positionLimit = positionLimit;
this.logBuffers = logBuffers;
this.positionBitsToShift = Integer.numberOfTrailingZeros(termLength);
this.headerWriter = new HeaderWriter(defaultFrameHeader(logMetaDataBuffer));
this.activePartitionIndex = activePartitionIndex(logMetaDataBuffer);
positionBitsToShift = Integer.numberOfTrailingZeros(termLength);
headerWriter = new HeaderWriter(defaultFrameHeader(logMetaDataBuffer));
initialTermId = LogBufferDescriptor.initialTermId(logMetaDataBuffer);

final long rawTail = termAppenders[activePartitionIndex].rawTail();
final int activeIndex = activePartitionIndex(logMetaDataBuffer);
activePartitionIndex = activeIndex;

final long rawTail = rawTail(logMetaDataBuffer, activeIndex);
termId = termId(rawTail);
termOffset = termOffset(rawTail, termLength);
termOffset = termOffset(rawTail);
termBeginPosition = computeTermBeginPosition(termId, positionBitsToShift, initialTermId);
}

Expand Down Expand Up @@ -563,8 +565,8 @@ private long newPosition(final int resultingOffset)
termId = nextTermId;
termBeginPosition = computeTermBeginPosition(nextTermId, positionBitsToShift, initialTermId);

termAppenders[nextIndex].tailTermId(nextTermId);
LogBufferDescriptor.activePartitionIndexOrdered(logMetaDataBuffer, nextIndex);
initialiseTailWithTermId(logMetaDataBuffer, nextIndex, nextTermId);
activePartitionIndexOrdered(logMetaDataBuffer, nextIndex);

return ADMIN_ACTION;
}
Expand Down
6 changes: 3 additions & 3 deletions aeron-client/src/main/java/io/aeron/Publication.java
Original file line number Diff line number Diff line change
Expand Up @@ -507,16 +507,16 @@ void incRef()
private long newPosition(final int index, final int currentTail, final long position, final long result)
{
long newPosition = ADMIN_ACTION;
final int termOffset = TermAppender.termOffset(result);
final int termOffset = termOffset(result);
if (termOffset > 0)
{
newPosition = (position - currentTail) + termOffset;
}
else if (termOffset == TermAppender.TRIPPED)
{
final int nextIndex = nextPartitionIndex(index);
termAppenders[nextIndex].tailTermId(TermAppender.termId(result) + 1);
LogBufferDescriptor.activePartitionIndexOrdered(logMetaDataBuffer, nextIndex);
initialiseTailWithTermId(logMetaDataBuffer, nextIndex, termId(result) + 1);
activePartitionIndexOrdered(logMetaDataBuffer, nextIndex);
}

return newPosition;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,26 +71,6 @@ public ExclusiveTermAppender(
tailAddressOffset = metaDataBuffer.addressOffset() + tailCounterOffset;
}

/**
* Get the raw value of the current tail.
*
* @return the current tail value.
*/
public long rawTail()
{
return UnsafeAccess.UNSAFE.getLong(tailBuffer, tailAddressOffset);
}

/**
* Set the value for the tail counter.
*
* @param termId for the tail counter
*/
public void tailTermId(final int termId)
{
UnsafeAccess.UNSAFE.putLong(tailBuffer, tailAddressOffset, ((long)termId) << 32);
}

/**
* Claim length of a the term buffer for writing in the message with zero copy semantics.
*
Expand All @@ -113,9 +93,9 @@ public int claim(
final UnsafeBuffer termBuffer = this.termBuffer;
final int termLength = termBuffer.capacity();

putRawTailOrdered(termId, termOffset + alignedLength);

int resultingOffset = termOffset + alignedLength;
putRawTailOrdered(termId, resultingOffset);

if (resultingOffset > termLength)
{
resultingOffset = handleEndOfLogCondition(termBuffer, termOffset, header, termLength, termId);
Expand Down Expand Up @@ -149,9 +129,9 @@ public int appendPadding(
final UnsafeBuffer termBuffer = this.termBuffer;
final int termLength = termBuffer.capacity();

putRawTailOrdered(termId, termOffset + alignedLength);

int resultingOffset = termOffset + alignedLength;
putRawTailOrdered(termId, resultingOffset);

if (resultingOffset > termLength)
{
resultingOffset = handleEndOfLogCondition(termBuffer, termOffset, header, termLength, termId);
Expand Down Expand Up @@ -192,9 +172,9 @@ public int appendUnfragmentedMessage(
final UnsafeBuffer termBuffer = this.termBuffer;
final int termLength = termBuffer.capacity();

putRawTailOrdered(termId, termOffset + alignedLength);

int resultingOffset = termOffset + alignedLength;
putRawTailOrdered(termId, resultingOffset);

if (resultingOffset > termLength)
{
resultingOffset = handleEndOfLogCondition(termBuffer, termOffset, header, termLength, termId);
Expand Down Expand Up @@ -247,9 +227,9 @@ public int appendFragmentedMessage(
final UnsafeBuffer termBuffer = this.termBuffer;
final int termLength = termBuffer.capacity();

putRawTailOrdered(termId, termOffset + requiredLength);

int resultingOffset = termOffset + requiredLength;
putRawTailOrdered(termId, resultingOffset);

if (resultingOffset > termLength)
{
resultingOffset = handleEndOfLogCondition(termBuffer, termOffset, header, termLength, termId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -552,7 +552,7 @@ public static void rotateLog(final UnsafeBuffer logMetaDataBuffer, final int act
public static void initialiseTailWithTermId(
final UnsafeBuffer logMetaData, final int partitionIndex, final int termId)
{
logMetaData.putLong(TERM_TAIL_COUNTERS_OFFSET + (partitionIndex * SIZE_OF_LONG), ((long)termId) << 32);
logMetaData.putLong(TERM_TAIL_COUNTERS_OFFSET + (partitionIndex * SIZE_OF_LONG), packTail(termId, 0));
}

/**
Expand All @@ -563,7 +563,7 @@ public static void initialiseTailWithTermId(
*/
public static int termId(final long rawTail)
{
return (int)(rawTail >>> 32);
return (int)(rawTail >> 32);
}

/**
Expand All @@ -580,6 +580,17 @@ public static int termOffset(final long rawTail, final long termLength)
return (int)Math.min(tail, termLength);
}

/**
* The termOffset as a result of the append
*
* @param result into which the termOffset value has been packed.
* @return the termOffset after the append
*/
public static int termOffset(final long result)
{
return (int)result;
}

/**
* Pack a termId and termOffset into a raw tail value.
*
Expand All @@ -589,7 +600,7 @@ public static int termOffset(final long rawTail, final long termLength)
*/
public static long packTail(final int termId, final int termOffset)
{
return (((long)termId) << 32) + termOffset;
return ((long)termId << 32) | (termOffset & 0xFFFF_FFFFL);
}

/**
Expand All @@ -605,6 +616,18 @@ public static void rawTail(
logMetaDataBuffer.putLong(TERM_TAIL_COUNTERS_OFFSET + (SIZE_OF_LONG * partitionIndex), rawTail);
}

/**
* Get the raw value of the tail for the given partition.
*
* @param logMetaDataBuffer containing the tail counters.
* @param partitionIndex for the tail counter.
* @return the raw value of the tail for the current active partition.
*/
public static long rawTail(final UnsafeBuffer logMetaDataBuffer, final int partitionIndex)
{
return logMetaDataBuffer.getLong(TERM_TAIL_COUNTERS_OFFSET + (SIZE_OF_LONG * partitionIndex));
}

/**
* Set the raw value of the tail for the given partition.
*
Expand Down
49 changes: 3 additions & 46 deletions aeron-client/src/main/java/io/aeron/logbuffer/TermAppender.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import static io.aeron.logbuffer.FrameDescriptor.frameLengthOrdered;
import static io.aeron.logbuffer.FrameDescriptor.frameType;
import static io.aeron.logbuffer.LogBufferDescriptor.TERM_TAIL_COUNTERS_OFFSET;
import static io.aeron.logbuffer.LogBufferDescriptor.packTail;
import static io.aeron.logbuffer.LogBufferDescriptor.termId;
import static io.aeron.protocol.DataHeaderFlyweight.*;
import static org.agrona.BitUtil.SIZE_OF_LONG;
import static org.agrona.BitUtil.align;
Expand Down Expand Up @@ -89,16 +91,6 @@ public long rawTailVolatile()
return UnsafeAccess.UNSAFE.getLongVolatile(tailBuffer, tailAddressOffset);
}

/**
* Set the value for the tail counter.
*
* @param termId for the tail counter
*/
public void tailTermId(final int termId)
{
UnsafeAccess.UNSAFE.putLong(tailBuffer, tailAddressOffset, ((long)termId) << 32);
}

/**
* Claim length of a the term buffer for writing in the message with zero copy semantics.
*
Expand Down Expand Up @@ -262,41 +254,6 @@ public long appendFragmentedMessage(
return resultingOffset;
}


/**
* Pack the values for termOffset and termId into a long for returning on the stack.
*
* @param termId value to be packed.
* @param termOffset value to be packed.
* @return a long with both ints packed into it.
*/
public static long pack(final int termId, final int termOffset)
{
return ((long)termId << 32) | (termOffset & 0xFFFF_FFFFL);
}

/**
* The termOffset as a result of the append
*
* @param result into which the termOffset value has been packed.
* @return the termOffset after the append
*/
public static int termOffset(final long result)
{
return (int)result;
}

/**
* The termId in which the append operation took place.
*
* @param result into which the termId value has been packed.
* @return the termId in which the append operation took place.
*/
public static int termId(final long result)
{
return (int)(result >>> 32);
}

private long handleEndOfLogCondition(
final UnsafeBuffer termBuffer,
final long termOffset,
Expand All @@ -320,7 +277,7 @@ private long handleEndOfLogCondition(
}
}

return pack(termId, resultingOffset);
return packTail(termId, resultingOffset);
}

private long getAndAddRawTail(final int alignedLength)
Expand Down
30 changes: 15 additions & 15 deletions aeron-client/src/test/java/io/aeron/logbuffer/TermAppenderTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import org.agrona.concurrent.UnsafeBuffer;

import static io.aeron.logbuffer.LogBufferDescriptor.TERM_TAIL_COUNTERS_OFFSET;
import static io.aeron.logbuffer.LogBufferDescriptor.packTail;
import static io.aeron.logbuffer.LogBufferDescriptor.rawTailVolatile;
import static io.aeron.logbuffer.TermAppender.pack;
import static io.aeron.protocol.DataHeaderFlyweight.RESERVED_VALUE_OFFSET;
import static io.aeron.protocol.DataHeaderFlyweight.createDefaultHeader;
import static java.nio.ByteBuffer.allocateDirect;
Expand Down Expand Up @@ -67,10 +67,10 @@ public void shouldPackResult()
final int termId = 7;
final int termOffset = -1;

final long result = pack(termId, termOffset);
final long result = LogBufferDescriptor.packTail(termId, termOffset);

assertThat(TermAppender.termId(result), is(termId));
assertThat(TermAppender.termOffset(result), is(termOffset));
assertThat(LogBufferDescriptor.termId(result), is(termId));
assertThat(LogBufferDescriptor.termOffset(result), is(termOffset));
}

@Test
Expand All @@ -83,13 +83,13 @@ public void shouldAppendFrameToEmptyLog()
final int alignedFrameLength = align(frameLength, FRAME_ALIGNMENT);
final int tail = 0;

logMetaDataBuffer.putLong(TERM_TAIL_COUNTER_OFFSET, pack(TERM_ID, tail));
logMetaDataBuffer.putLong(TERM_TAIL_COUNTER_OFFSET, packTail(TERM_ID, tail));

assertThat(termAppender.appendUnfragmentedMessage(
headerWriter, buffer, 0, msgLength, RVS), is((long)alignedFrameLength));

assertThat(rawTailVolatile(logMetaDataBuffer, PARTITION_INDEX),
is(pack(TERM_ID, tail + alignedFrameLength)));
is(packTail(TERM_ID, tail + alignedFrameLength)));

final InOrder inOrder = inOrder(termBuffer, headerWriter);
inOrder.verify(headerWriter, times(1)).write(termBuffer, tail, frameLength, TERM_ID);
Expand All @@ -108,15 +108,15 @@ public void shouldAppendFrameTwiceToLog()
final int alignedFrameLength = align(frameLength, FRAME_ALIGNMENT);
int tail = 0;

logMetaDataBuffer.putLong(TERM_TAIL_COUNTER_OFFSET, pack(TERM_ID, tail));
logMetaDataBuffer.putLong(TERM_TAIL_COUNTER_OFFSET, packTail(TERM_ID, tail));

assertThat(termAppender.appendUnfragmentedMessage(
headerWriter, buffer, 0, msgLength, RVS), is((long)alignedFrameLength));
assertThat(termAppender.appendUnfragmentedMessage(
headerWriter, buffer, 0, msgLength, RVS), is((long)alignedFrameLength * 2));

assertThat(rawTailVolatile(logMetaDataBuffer, PARTITION_INDEX),
is(pack(TERM_ID, tail + (alignedFrameLength * 2))));
is(packTail(TERM_ID, tail + (alignedFrameLength * 2))));

final InOrder inOrder = inOrder(termBuffer, headerWriter);
inOrder.verify(headerWriter, times(1)).write(termBuffer, tail, frameLength, TERM_ID);
Expand All @@ -141,13 +141,13 @@ public void shouldPadLogAndTripWhenAppendingWithInsufficientRemainingCapacity()
final UnsafeBuffer buffer = new UnsafeBuffer(new byte[128]);
final int frameLength = TERM_BUFFER_LENGTH - tailValue;

logMetaDataBuffer.putLong(TERM_TAIL_COUNTER_OFFSET, pack(TERM_ID, tailValue));
logMetaDataBuffer.putLong(TERM_TAIL_COUNTER_OFFSET, packTail(TERM_ID, tailValue));

final long expectResult = pack(TERM_ID, TRIPPED);
final long expectResult = packTail(TERM_ID, TRIPPED);
assertThat(termAppender.appendUnfragmentedMessage(headerWriter, buffer, 0, msgLength, RVS), is(expectResult));

assertThat(rawTailVolatile(logMetaDataBuffer, PARTITION_INDEX),
is(pack(TERM_ID, tailValue + requiredFrameSize)));
is(packTail(TERM_ID, tailValue + requiredFrameSize)));

final InOrder inOrder = inOrder(termBuffer, headerWriter);
inOrder.verify(headerWriter, times(1)).write(termBuffer, tailValue, frameLength, TERM_ID);
Expand All @@ -165,13 +165,13 @@ public void shouldFragmentMessageOverTwoFrames()
final UnsafeBuffer buffer = new UnsafeBuffer(new byte[msgLength]);
int tail = 0;

logMetaDataBuffer.putLong(TERM_TAIL_COUNTER_OFFSET, pack(TERM_ID, tail));
logMetaDataBuffer.putLong(TERM_TAIL_COUNTER_OFFSET, packTail(TERM_ID, tail));

assertThat(termAppender.appendFragmentedMessage(
headerWriter, buffer, 0, msgLength, MAX_PAYLOAD_LENGTH, RVS), is((long)requiredCapacity));

assertThat(rawTailVolatile(logMetaDataBuffer, PARTITION_INDEX),
is(pack(TERM_ID, tail + requiredCapacity)));
is(packTail(TERM_ID, tail + requiredCapacity)));

final InOrder inOrder = inOrder(termBuffer, headerWriter);
inOrder.verify(headerWriter, times(1)).write(termBuffer, tail, MAX_FRAME_LENGTH, TERM_ID);
Expand All @@ -198,15 +198,15 @@ public void shouldClaimRegionForZeroCopyEncoding()
final int tail = 0;
final BufferClaim bufferClaim = new BufferClaim();

logMetaDataBuffer.putLong(TERM_TAIL_COUNTER_OFFSET, pack(TERM_ID, tail));
logMetaDataBuffer.putLong(TERM_TAIL_COUNTER_OFFSET, packTail(TERM_ID, tail));

assertThat(termAppender.claim(headerWriter, msgLength, bufferClaim), is((long)alignedFrameLength));

assertThat(bufferClaim.offset(), is(tail + headerLength));
assertThat(bufferClaim.length(), is(msgLength));

assertThat(rawTailVolatile(logMetaDataBuffer, PARTITION_INDEX),
is(pack(TERM_ID, tail + alignedFrameLength)));
is(packTail(TERM_ID, tail + alignedFrameLength)));

// Map flyweight or encode to buffer directly then call commit() when done
bufferClaim.commit();
Expand Down
Loading

0 comments on commit 0ce9b6a

Please sign in to comment.