Support client fragmentation over Barrage (#2069)
Clients can enable fragmentation by setting batchSize on the BarrageSubscriptionOptions.
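As a rough illustration, enabling it from client code might look like this (a minimal sketch: the builder-style construction and the import path are assumptions based on the immutables pattern these options use; only the batchSize setting itself comes from this commit, and 4096 is an arbitrary value):

import io.deephaven.extensions.barrage.BarrageSubscriptionOptions;

// Sketch only: cap each Barrage record batch at 4096 rows so that large
// snapshots and deltas are fragmented across multiple messages.
final BarrageSubscriptionOptions options = BarrageSubscriptionOptions.builder()
        .batchSize(4096)
        .build();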
nbauernfeind committed Mar 11, 2022
1 parent a3ee62d commit aed5d84
Showing 44 changed files with 1,039 additions and 514 deletions.


ChunkPoolConstants.java
@@ -7,13 +7,10 @@
 /**
  * Repository for constants used by {@link ChunkPool} implementations.
  */
-final class ChunkPoolConstants {
+public final class ChunkPoolConstants {
 
     static final int SMALLEST_POOLED_CHUNK_LOG2_CAPACITY = 5;
-    // NB: It's probably best for this to allow Barrage delta chunks to be poolable. See
-    // BarrageMessageProducer.DELTA_CHUNK_SIZE.
-    @VisibleForTesting
-    static final int LARGEST_POOLED_CHUNK_LOG2_CAPACITY = 16;
+    public static final int LARGEST_POOLED_CHUNK_LOG2_CAPACITY = 16;
     static final int NUM_POOLED_CHUNK_CAPACITIES =
             LARGEST_POOLED_CHUNK_LOG2_CAPACITY - SMALLEST_POOLED_CHUNK_LOG2_CAPACITY + 1;
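Making these constants public lets Barrage size its message chunks so they remain poolable: the largest pooled chunk holds 2^16 = 65,536 elements, and there are 16 - 5 + 1 = 12 pooled capacities in all. The next diff updates the byte-specialized chunk reader so it can fill a caller-supplied chunk at an offset.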
@@ -11,6 +11,7 @@

 import gnu.trove.iterator.TLongIterator;
 import io.deephaven.chunk.ObjectChunk;
+import io.deephaven.chunk.WritableChunk;
 import io.deephaven.chunk.attributes.Values;
 import io.deephaven.chunk.util.pools.PoolableChunk;
 import io.deephaven.engine.rowset.RowSet;
@@ -19,7 +20,6 @@
 import io.deephaven.extensions.barrage.util.StreamReaderOptions;
 import io.deephaven.util.datastructures.LongSizedDataStructure;
 import io.deephaven.chunk.ByteChunk;
-import io.deephaven.chunk.Chunk;
 import io.deephaven.chunk.WritableByteChunk;
 import io.deephaven.chunk.WritableLongChunk;
 import io.deephaven.util.type.TypeUtils;
@@ -161,29 +161,42 @@ public interface ByteConversion {
         ByteConversion IDENTITY = (byte a) -> a;
     }

-    static Chunk<Values> extractChunkFromInputStream(
+    static WritableChunk<Values> extractChunkFromInputStream(
             final int elementSize,
             final StreamReaderOptions options,
             final Iterator<FieldNodeInfo> fieldNodeIter,
             final TLongIterator bufferInfoIter,
-            final DataInput is) throws IOException {
+            final DataInput is,
+            final WritableChunk<Values> outChunk,
+            final int outOffset,
+            final int totalRows) throws IOException {
         return extractChunkFromInputStreamWithConversion(
-                elementSize, options, ByteConversion.IDENTITY, fieldNodeIter, bufferInfoIter, is);
+                elementSize, options, ByteConversion.IDENTITY, fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
     }

-    static Chunk<Values> extractChunkFromInputStreamWithConversion(
+    static WritableChunk<Values> extractChunkFromInputStreamWithConversion(
             final int elementSize,
             final StreamReaderOptions options,
             final ByteConversion conversion,
             final Iterator<FieldNodeInfo> fieldNodeIter,
             final TLongIterator bufferInfoIter,
-            final DataInput is) throws IOException {
+            final DataInput is,
+            final WritableChunk<Values> outChunk,
+            final int outOffset,
+            final int totalRows) throws IOException {

         final FieldNodeInfo nodeInfo = fieldNodeIter.next();
         final long validityBuffer = bufferInfoIter.next();
         final long payloadBuffer = bufferInfoIter.next();

-        final WritableByteChunk<Values> chunk = WritableByteChunk.makeWritableChunk(nodeInfo.numElements);
+        final WritableByteChunk<Values> chunk;
+        if (outChunk != null) {
+            chunk = outChunk.asWritableByteChunk();
+        } else {
+            final int numRows = Math.max(totalRows, nodeInfo.numElements);
+            chunk = WritableByteChunk.makeWritableChunk(numRows);
+            chunk.setSize(numRows);
+        }

         if (nodeInfo.numElements == 0) {
             return chunk;
@@ -214,9 +227,9 @@ static Chunk<Values> extractChunkFromInputStreamWithConversion(
         }

         if (options.useDeephavenNulls()) {
-            useDeephavenNulls(conversion, is, nodeInfo, chunk);
+            useDeephavenNulls(conversion, is, nodeInfo, chunk, outOffset);
         } else {
-            useValidityBuffer(elementSize, conversion, is, nodeInfo, chunk, isValid);
+            useValidityBuffer(elementSize, conversion, is, nodeInfo, chunk, outOffset, isValid);
         }

         final long overhangPayload = payloadBuffer - payloadRead;
@@ -225,24 +238,24 @@ static Chunk<Values> extractChunkFromInputStreamWithConversion(
             }
         }

-        chunk.setSize(nodeInfo.numElements);
         return chunk;
     }

     private static void useDeephavenNulls(
             final ByteConversion conversion,
             final DataInput is,
             final FieldNodeInfo nodeInfo,
-            final WritableByteChunk<Values> chunk) throws IOException {
+            final WritableByteChunk<Values> chunk,
+            final int offset) throws IOException {
         if (conversion == ByteConversion.IDENTITY) {
             for (int ii = 0; ii < nodeInfo.numElements; ++ii) {
-                chunk.set(ii, is.readByte());
+                chunk.set(offset + ii, is.readByte());
             }
         } else {
             for (int ii = 0; ii < nodeInfo.numElements; ++ii) {
                 final byte in = is.readByte();
                 final byte out = in == NULL_BYTE ? in : conversion.apply(in);
-                chunk.set(ii, out);
+                chunk.set(offset + ii, out);
             }
         }
     }
@@ -253,6 +266,7 @@ private static void useValidityBuffer(
             final DataInput is,
             final FieldNodeInfo nodeInfo,
             final WritableByteChunk<Values> chunk,
+            final int offset,
             final WritableLongChunk<Values> isValid) throws IOException {
         final int numElements = nodeInfo.numElements;
         final int numValidityWords = (numElements + 63) / 64;
@@ -267,11 +281,11 @@ private static void useValidityBuffer(
                 if ((validityWord & 1) == 1) {
                     if (pendingSkips > 0) {
                         is.skipBytes(pendingSkips * elementSize);
-                        chunk.fillWithNullValue(ei, pendingSkips);
+                        chunk.fillWithNullValue(offset + ei, pendingSkips);
                         ei += pendingSkips;
                         pendingSkips = 0;
                     }
-                    chunk.set(ei++, conversion.apply(is.readByte()));
+                    chunk.set(offset + ei++, conversion.apply(is.readByte()));
                     validityWord >>= 1;
                     bitsLeftInThisWord--;
                 } else {
@@ -285,7 +299,7 @@ private static void useValidityBuffer(

         if (pendingSkips > 0) {
             is.skipBytes(pendingSkips * elementSize);
-            chunk.fillWithNullValue(ei, pendingSkips);
+            chunk.fillWithNullValue(offset + ei, pendingSkips);
         }
     }
 }
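The new outChunk/outOffset/totalRows parameters are what make fragmentation work on the read side: the first fragment allocates a chunk sized for totalRows (the row count of the whole logical message), and each later fragment writes into that same chunk at a growing offset. A hypothetical caller loop (the Fragment type and its accessors are invented for illustration; extractChunkFromInputStream has exactly the signature introduced above):

// Sketch: splice every fragment of one logical message into a single chunk.
WritableChunk<Values> chunk = null; // null on the first call => allocate totalRows
int outOffset = 0;
for (final Fragment fragment : fragments) {
    chunk = extractChunkFromInputStream(
            Byte.BYTES, options,
            fragment.fieldNodeIter(), fragment.bufferInfoIter(), fragment.payload(),
            chunk, outOffset, totalRows);
    outOffset += fragment.numRows();
}
// chunk now holds totalRows values assembled from all fragments.

The char-specialized reader below receives the identical change; the rest of the commit's 44 changed files presumably replicate it for the other chunk types.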
@@ -6,6 +6,7 @@

 import gnu.trove.iterator.TLongIterator;
 import io.deephaven.chunk.ObjectChunk;
+import io.deephaven.chunk.WritableChunk;
 import io.deephaven.chunk.attributes.Values;
 import io.deephaven.chunk.util.pools.PoolableChunk;
 import io.deephaven.engine.rowset.RowSet;
@@ -14,7 +15,6 @@
 import io.deephaven.extensions.barrage.util.StreamReaderOptions;
 import io.deephaven.util.datastructures.LongSizedDataStructure;
 import io.deephaven.chunk.CharChunk;
-import io.deephaven.chunk.Chunk;
 import io.deephaven.chunk.WritableCharChunk;
 import io.deephaven.chunk.WritableLongChunk;
 import io.deephaven.util.type.TypeUtils;
@@ -156,29 +156,42 @@ public interface CharConversion {
         CharConversion IDENTITY = (char a) -> a;
     }

-    static Chunk<Values> extractChunkFromInputStream(
+    static WritableChunk<Values> extractChunkFromInputStream(
             final int elementSize,
             final StreamReaderOptions options,
             final Iterator<FieldNodeInfo> fieldNodeIter,
             final TLongIterator bufferInfoIter,
-            final DataInput is) throws IOException {
+            final DataInput is,
+            final WritableChunk<Values> outChunk,
+            final int outOffset,
+            final int totalRows) throws IOException {
         return extractChunkFromInputStreamWithConversion(
-                elementSize, options, CharConversion.IDENTITY, fieldNodeIter, bufferInfoIter, is);
+                elementSize, options, CharConversion.IDENTITY, fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
     }

-    static Chunk<Values> extractChunkFromInputStreamWithConversion(
+    static WritableChunk<Values> extractChunkFromInputStreamWithConversion(
             final int elementSize,
             final StreamReaderOptions options,
             final CharConversion conversion,
             final Iterator<FieldNodeInfo> fieldNodeIter,
             final TLongIterator bufferInfoIter,
-            final DataInput is) throws IOException {
+            final DataInput is,
+            final WritableChunk<Values> outChunk,
+            final int outOffset,
+            final int totalRows) throws IOException {

         final FieldNodeInfo nodeInfo = fieldNodeIter.next();
         final long validityBuffer = bufferInfoIter.next();
         final long payloadBuffer = bufferInfoIter.next();

-        final WritableCharChunk<Values> chunk = WritableCharChunk.makeWritableChunk(nodeInfo.numElements);
+        final WritableCharChunk<Values> chunk;
+        if (outChunk != null) {
+            chunk = outChunk.asWritableCharChunk();
+        } else {
+            final int numRows = Math.max(totalRows, nodeInfo.numElements);
+            chunk = WritableCharChunk.makeWritableChunk(numRows);
+            chunk.setSize(numRows);
+        }

         if (nodeInfo.numElements == 0) {
             return chunk;
@@ -209,9 +222,9 @@ static Chunk<Values> extractChunkFromInputStreamWithConversion(
         }

         if (options.useDeephavenNulls()) {
-            useDeephavenNulls(conversion, is, nodeInfo, chunk);
+            useDeephavenNulls(conversion, is, nodeInfo, chunk, outOffset);
         } else {
-            useValidityBuffer(elementSize, conversion, is, nodeInfo, chunk, isValid);
+            useValidityBuffer(elementSize, conversion, is, nodeInfo, chunk, outOffset, isValid);
         }

         final long overhangPayload = payloadBuffer - payloadRead;
@@ -220,24 +233,24 @@ static Chunk<Values> extractChunkFromInputStreamWithConversion(
             }
         }

-        chunk.setSize(nodeInfo.numElements);
         return chunk;
     }

     private static void useDeephavenNulls(
             final CharConversion conversion,
             final DataInput is,
             final FieldNodeInfo nodeInfo,
-            final WritableCharChunk<Values> chunk) throws IOException {
+            final WritableCharChunk<Values> chunk,
+            final int offset) throws IOException {
         if (conversion == CharConversion.IDENTITY) {
             for (int ii = 0; ii < nodeInfo.numElements; ++ii) {
-                chunk.set(ii, is.readChar());
+                chunk.set(offset + ii, is.readChar());
             }
         } else {
             for (int ii = 0; ii < nodeInfo.numElements; ++ii) {
                 final char in = is.readChar();
                 final char out = in == NULL_CHAR ? in : conversion.apply(in);
-                chunk.set(ii, out);
+                chunk.set(offset + ii, out);
             }
         }
     }
@@ -248,6 +261,7 @@ private static void useValidityBuffer(
             final DataInput is,
             final FieldNodeInfo nodeInfo,
             final WritableCharChunk<Values> chunk,
+            final int offset,
             final WritableLongChunk<Values> isValid) throws IOException {
         final int numElements = nodeInfo.numElements;
         final int numValidityWords = (numElements + 63) / 64;
@@ -262,11 +276,11 @@ private static void useValidityBuffer(
                 if ((validityWord & 1) == 1) {
                     if (pendingSkips > 0) {
                         is.skipBytes(pendingSkips * elementSize);
-                        chunk.fillWithNullValue(ei, pendingSkips);
+                        chunk.fillWithNullValue(offset + ei, pendingSkips);
                         ei += pendingSkips;
                         pendingSkips = 0;
                     }
-                    chunk.set(ei++, conversion.apply(is.readChar()));
+                    chunk.set(offset + ei++, conversion.apply(is.readChar()));
                     validityWord >>= 1;
                     bitsLeftInThisWord--;
                 } else {
@@ -280,7 +294,7 @@ private static void useValidityBuffer(

         if (pendingSkips > 0) {
             is.skipBytes(pendingSkips * elementSize);
-            chunk.fillWithNullValue(ei, pendingSkips);
+            chunk.fillWithNullValue(offset + ei, pendingSkips);
         }
     }
 }