diff --git a/engine/chunk/src/main/java/io/deephaven/chunk/ByteChunkToOutputStreamAdapter.java b/engine/chunk/src/main/java/io/deephaven/chunk/ByteChunkToOutputStreamAdapter.java new file mode 100644 index 00000000000..ea2ab47def2 --- /dev/null +++ b/engine/chunk/src/main/java/io/deephaven/chunk/ByteChunkToOutputStreamAdapter.java @@ -0,0 +1,12 @@ +package io.deephaven.chunk; + +import io.deephaven.chunk.attributes.Any; + +import java.io.IOException; +import java.io.OutputStream; + +public class ByteChunkToOutputStreamAdapter { + public static void write(OutputStream stream, ByteChunk chunk, int srcOffset, int length) throws IOException { + stream.write(chunk.data, chunk.offset + srcOffset, length); + } +} diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/remote/ConstructSnapshot.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/remote/ConstructSnapshot.java index c1b3e82e1b7..3a5d6c06b28 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/remote/ConstructSnapshot.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/remote/ConstructSnapshot.java @@ -8,6 +8,7 @@ import io.deephaven.base.log.LogOutput; import io.deephaven.base.log.LogOutputAppendable; import io.deephaven.base.verify.Assert; +import io.deephaven.chunk.util.pools.ChunkPoolConstants; import io.deephaven.configuration.Configuration; import io.deephaven.datastructures.util.CollectionUtil; import io.deephaven.engine.rowset.*; @@ -79,6 +80,8 @@ public NoSnapshotAllowedException(String reason) { private static final int MAX_CONCURRENT_ATTEMPT_DURATION_MILLIS = Configuration.getInstance() .getIntegerWithDefault("ConstructSnapshot.maxConcurrentAttemptDurationMillis", 5000); + public static final int SNAPSHOT_CHUNK_SIZE = 1 << ChunkPoolConstants.LARGEST_POOLED_CHUNK_LOG2_CAPACITY; + /** * Holder for thread-local state. */ @@ -1316,8 +1319,6 @@ public static boolean serializeAllTable(final boolean usePrev, snapshot.rowsIncluded = snapshot.rowsAdded.copy(); } - LongSizedDataStructure.intSize("construct snapshot", snapshot.rowsIncluded.size()); - final Map sourceMap = table.getColumnSourceMap(); final String[] columnSources = sourceMap.keySet().toArray(CollectionUtil.ZERO_LENGTH_STRING_ARRAY); @@ -1340,16 +1341,19 @@ public static boolean serializeAllTable(final boolean usePrev, final RowSet rows = columnIsEmpty ? RowSetFactory.empty() : snapshot.rowsIncluded; // Note: cannot use shared context across several calls of differing lengths and no sharing necessary // when empty - acd.data = getSnapshotDataAsChunk(columnSource, columnIsEmpty ? null : sharedContext, rows, usePrev); + acd.data = + getSnapshotDataAsChunkList(columnSource, columnIsEmpty ? 
null : sharedContext, rows, usePrev); acd.type = columnSource.getType(); acd.componentType = columnSource.getComponentType(); + acd.chunkType = columnSource.getChunkType(); final BarrageMessage.ModColumnData mcd = new BarrageMessage.ModColumnData(); snapshot.modColumnData[ii] = mcd; mcd.rowsModified = RowSetFactory.empty(); - mcd.data = getSnapshotDataAsChunk(columnSource, null, RowSetFactory.empty(), usePrev); + mcd.data = getSnapshotDataAsChunkList(columnSource, null, RowSetFactory.empty(), usePrev); mcd.type = acd.type; mcd.componentType = acd.componentType; + mcd.chunkType = columnSource.getChunkType(); } } @@ -1430,6 +1434,52 @@ private static WritableChunk getSnapshotDataAsChunk(final ColumnSour } } + private static ArrayList> getSnapshotDataAsChunkList(final ColumnSource columnSource, + final SharedContext sharedContext, final RowSet rowSet, final boolean usePrev) { + final ColumnSource sourceToUse = ReinterpretUtils.maybeConvertToPrimitive(columnSource); + long offset = 0; + final long size = rowSet.size(); + final ArrayList> result = new ArrayList<>(); + + if (size == 0) { + return result; + } + + final int maxChunkSize = (int) Math.min(size, SNAPSHOT_CHUNK_SIZE); + + try (final ColumnSource.FillContext context = sourceToUse.makeFillContext(maxChunkSize, sharedContext); + final RowSequence.Iterator it = rowSet.getRowSequenceIterator()) { + int chunkSize = maxChunkSize; + while (it.hasMore()) { + final RowSequence reducedRowSet = it.getNextRowSequenceWithLength(chunkSize); + final ChunkType chunkType = sourceToUse.getChunkType(); + + // create a new chunk + WritableChunk currentChunk = chunkType.makeWritableChunk(chunkSize); + + if (usePrev) { + sourceToUse.fillPrevChunk(context, currentChunk, reducedRowSet); + } else { + sourceToUse.fillChunk(context, currentChunk, reducedRowSet); + } + + // add the chunk to the current list + result.add(currentChunk); + + // increment the offset for the next chunk (using the actual values written) + offset += currentChunk.size(); + + // recompute the size of the next chunk + if (size - offset > maxChunkSize) { + chunkSize = maxChunkSize; + } else { + chunkSize = (int) (size - offset); + } + } + } + return result; + } + /** * Estimate the size of a complete table snapshot in bytes. 
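The list-of-chunks construction above replaces the old single-chunk snapshot path, so a column's snapshot data is no longer bounded by one int-sized chunk: rows are consumed in slices of at most SNAPSHOT_CHUNK_SIZE, with a shorter final slice. A rough standalone sketch of that slicing arithmetic follows (the class, the method, and the 1 << 20 cap used in main are hypothetical, not part of this change):

import java.util.Arrays;

public final class SliceSizes {
    // Sizes of the chunks produced when totalRows are read in slices of at most maxChunkSize.
    public static int[] sliceSizes(long totalRows, int maxChunkSize) {
        if (totalRows == 0) {
            return new int[0];
        }
        final int numSlices = (int) ((totalRows + maxChunkSize - 1) / maxChunkSize);
        final int[] sizes = new int[numSlices];
        for (int i = 0; i < numSlices; i++) {
            final long remaining = totalRows - (long) i * maxChunkSize;
            sizes[i] = (int) Math.min(remaining, maxChunkSize);
        }
        return sizes;
    }

    public static void main(String[] args) {
        // 10,000,000 rows with a hypothetical 1 << 20 cap -> nine full slices and a final slice of 562,816
        System.out.println(Arrays.toString(sliceSizes(10_000_000L, 1 << 20)));
    }
}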
* diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/util/BarrageMessage.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/BarrageMessage.java index c2b9ff4be1a..fb331bce18f 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/util/BarrageMessage.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/BarrageMessage.java @@ -5,12 +5,14 @@ package io.deephaven.engine.table.impl.util; import io.deephaven.chunk.Chunk; +import io.deephaven.chunk.ChunkType; import io.deephaven.chunk.attributes.Values; import io.deephaven.chunk.util.pools.PoolableChunk; import io.deephaven.engine.rowset.RowSet; import io.deephaven.engine.rowset.RowSetShiftData; import io.deephaven.util.SafeCloseable; +import java.util.ArrayList; import java.util.BitSet; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; @@ -28,13 +30,15 @@ public static class ModColumnData { public RowSet rowsModified; public Class type; public Class componentType; - public Chunk data; + public ArrayList> data; + public ChunkType chunkType; } public static class AddColumnData { public Class type; public Class componentType; - public Chunk data; + public ArrayList> data; + public ChunkType chunkType; } public long firstSeq = -1; @@ -94,9 +98,13 @@ public void close() { continue; } - if (acd.data instanceof PoolableChunk) { - ((PoolableChunk) acd.data).close(); + for (Chunk chunk : acd.data) { + if (chunk instanceof PoolableChunk) { + ((PoolableChunk) chunk).close(); + } } + + acd.data.clear(); } } if (modColumnData != null) { @@ -105,12 +113,13 @@ public void close() { continue; } - if (mcd.rowsModified != null) { - mcd.rowsModified.close(); - } - if (mcd.data instanceof PoolableChunk) { - ((PoolableChunk) mcd.data).close(); + for (Chunk chunk : mcd.data) { + if (chunk instanceof PoolableChunk) { + ((PoolableChunk) chunk).close(); + } } + + mcd.data.clear(); } } } diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableTest.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableTest.java index 49d81aa54f9..fa2f69129c9 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableTest.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableTest.java @@ -2404,8 +2404,10 @@ public void testUngroupConstructSnapshotOfBoxedNull() { try (final BarrageMessage snap = ConstructSnapshot.constructBackplaneSnapshot(this, (BaseTable) ungrouped)) { assertEquals(snap.rowsAdded, i(0, 1, 2)); - assertEquals(snap.addColumnData[0].data.asIntChunk().get(0), io.deephaven.util.QueryConstants.NULL_INT); - assertEquals(snap.addColumnData[1].data.asIntChunk().get(2), io.deephaven.util.QueryConstants.NULL_INT); + assertEquals(snap.addColumnData[0].data.get(0).asIntChunk().get(0), + io.deephaven.util.QueryConstants.NULL_INT); + assertEquals(snap.addColumnData[1].data.get(0).asIntChunk().get(2), + io.deephaven.util.QueryConstants.NULL_INT); } } diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/VarBinaryChunkInputStreamGenerator.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/VarBinaryChunkInputStreamGenerator.java index 4650e1652db..d0f71863615 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/VarBinaryChunkInputStreamGenerator.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/VarBinaryChunkInputStreamGenerator.java @@ -7,35 +7,180 @@ import 
com.google.common.io.LittleEndianDataOutputStream; import gnu.trove.iterator.TLongIterator; import io.deephaven.UncheckedDeephavenException; -import io.deephaven.chunk.WritableChunk; +import io.deephaven.chunk.*; import io.deephaven.chunk.attributes.ChunkPositions; import io.deephaven.chunk.attributes.Values; +import io.deephaven.chunk.util.pools.ChunkPoolConstants; import io.deephaven.extensions.barrage.util.StreamReaderOptions; +import io.deephaven.util.SafeCloseable; import io.deephaven.util.datastructures.LongSizedDataStructure; -import io.deephaven.chunk.IntChunk; -import io.deephaven.chunk.ObjectChunk; -import io.deephaven.chunk.WritableIntChunk; -import io.deephaven.chunk.WritableLongChunk; -import io.deephaven.chunk.WritableObjectChunk; import io.deephaven.chunk.util.pools.PoolableChunk; import io.deephaven.engine.rowset.RowSet; -import io.deephaven.extensions.barrage.util.BarrageProtoUtil; import org.apache.commons.lang3.mutable.MutableInt; import org.apache.commons.lang3.mutable.MutableLong; +import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import java.io.DataInput; import java.io.IOException; import java.io.OutputStream; +import java.util.ArrayList; import java.util.Iterator; public class VarBinaryChunkInputStreamGenerator extends BaseChunkInputStreamGenerator> { private static final String DEBUG_NAME = "ObjectChunkInputStream Serialization"; + private static final int BYTE_CHUNK_SIZE = 1 << ChunkPoolConstants.LARGEST_POOLED_CHUNK_LOG2_CAPACITY; private final Appender appendItem; - private byte[] bytes; - private WritableIntChunk offsets; + public static class ByteStorage extends OutputStream implements SafeCloseable { + + private final WritableLongChunk offsets; + private final ArrayList> byteChunks; + + /** + * The total number of bytes written to this output stream + */ + private long writtenTotalByteCount = 0L; + /** + * The total number of bytes written to the current ByteChunk + */ + private int activeChunkByteCount = 0; + /** + * The ByteChunk to which we are currently writing + */ + private WritableByteChunk activeChunk = null; + + public ByteStorage(int size) { + offsets = WritableLongChunk.makeWritableChunk(size); + byteChunks = new ArrayList<>(); + + // create an initial chunk for data storage. it might not be needed, but eliminates testing on every + // write operation and the costs for creating and disposing from the pool are minimal + byteChunks.add(activeChunk = WritableByteChunk.makeWritableChunk(BYTE_CHUNK_SIZE)); + } + + public boolean isEmpty() { + return writtenTotalByteCount == 0; + } + + /** + * Writes the specified byte to the underlying {@code ByteChunk}. + * + * @param b the byte to be written. + */ + public synchronized void write(int b) throws IOException { + // do the write + activeChunk.set(activeChunkByteCount++, (byte)b); + + // increment the offset + writtenTotalByteCount += 1; + + // allocate a new chunk when needed + if (activeChunkByteCount == BYTE_CHUNK_SIZE) { + byteChunks.add(activeChunk = WritableByteChunk.makeWritableChunk(BYTE_CHUNK_SIZE)); + activeChunkByteCount = 0; + } + } + + /** + * Writes {@code len} bytes from the specified byte array + * starting at offset {@code off} to the underlying {@code ByteChunk}. + * + * @param b the data. + * @param off the start offset in the data. + * @param len the number of bytes to write. 
+ * @throws IndexOutOfBoundsException if {@code off} is negative, + * {@code len} is negative, or {@code len} is greater than + * {@code b.length - off} + */ + public synchronized void write(@NotNull byte[] b, int off, int len) throws IOException { + int remaining = len; + while (remaining > 0) { + final int writeLen = Math.min(remaining, BYTE_CHUNK_SIZE - activeChunkByteCount); + + // do the write + activeChunk.copyFromTypedArray(b, off, activeChunkByteCount, writeLen); + + // increment the counts + writtenTotalByteCount += writeLen; + activeChunkByteCount += writeLen; + off += writeLen; + + remaining -= writeLen; + + // allocate a new chunk when needed + if (activeChunkByteCount == BYTE_CHUNK_SIZE) { + byteChunks.add(activeChunk = WritableByteChunk.makeWritableChunk(BYTE_CHUNK_SIZE)); + activeChunkByteCount = 0; + } + } + } + + public long size() { + return writtenTotalByteCount; + } + + /*** + * computes the size of the payload from sPos to ePos (inclusive) + * + * @param sPos the first data item to include in this payload + * @param ePos the last data item to include in this payload + * @return number of bytes in the payload + */ + public long getPayloadSize(int sPos, int ePos) { + return offsets.get(ePos + 1) - offsets.get(sPos); + } + + /*** + * write payload from sPos to ePos (inclusive) to the output stream + * + * @param dos the data output stream to populate with data + * @param sPos the first data item to include in this payload + * @param ePos the last data item to include in this payload + * @return number of bytes written to the outputstream + * @throws IOException if there is a problem writing to the output stream + */ + public long writePayload(LittleEndianDataOutputStream dos, int sPos, int ePos) throws IOException { + final long writeLen = getPayloadSize(sPos, ePos); + long remainingBytes = writeLen; + + long startBytePos = offsets.get(sPos); + while (remainingBytes > 0) { + final int chunkIdx = (int)(startBytePos / BYTE_CHUNK_SIZE); + final int byteIdx = (int)(startBytePos % BYTE_CHUNK_SIZE); + + final ByteChunk chunk = byteChunks.get(chunkIdx); + + final int len = (int) Math.min(remainingBytes, BYTE_CHUNK_SIZE - byteIdx); + + // do the write (using the stream adapter utility) + ByteChunkToOutputStreamAdapter.write(dos, chunk, byteIdx, len); + + // increment the offsets + startBytePos += len; + remainingBytes -= len; + } + return writeLen; + } + + @Override + public void close() { + try { + super.close(); + } catch (IOException e) { + // ignore this error + } + + // close the offset and byte chunks + offsets.close(); + for (WritableByteChunk chunk : byteChunks) { + chunk.close(); + } + } + } + + private ByteStorage byteStorage = null; public interface Appender { void append(OutputStream out, T item) throws IOException; @@ -52,23 +197,19 @@ public interface Mapper { } private synchronized void computePayload() throws IOException { - if (bytes != null) { + if (byteStorage != null) { return; } + byteStorage = new ByteStorage(chunk.size() == 0 ? 0 : (chunk.size() + 1)); - offsets = WritableIntChunk.makeWritableChunk(chunk.size() == 0 ? 
0 : (chunk.size() + 1)); - - try (final BarrageProtoUtil.ExposedByteArrayOutputStream baos = new BarrageProtoUtil.ExposedByteArrayOutputStream()) { - if (chunk.size() > 0) { - offsets.set(0, 0); - } - for (int i = 0; i < chunk.size(); ++i) { - if (chunk.get(i) != null) { - appendItem.append(baos, chunk.get(i)); - } - offsets.set(i + 1, baos.size()); + if (chunk.size() > 0) { + byteStorage.offsets.set(0, 0); + } + for (int i = 0; i < chunk.size(); ++i) { + if (chunk.get(i) != null) { + appendItem.append(byteStorage, chunk.get(i)); } - bytes = baos.peekBuffer(); + byteStorage.offsets.set(i + 1, byteStorage.size()); } } @@ -78,30 +219,26 @@ public void close() { if (chunk instanceof PoolableChunk) { ((PoolableChunk) chunk).close(); } - if (offsets != null) { - offsets.close(); + if (byteStorage != null) { + byteStorage.close(); } } } @Override - public DrainableColumn getInputStream(final StreamReaderOptions options, final @Nullable RowSet subset) throws IOException { + public DrainableColumn getInputStream(final StreamReaderOptions options, final @Nullable RowSet subset) + throws IOException { computePayload(); - return new ObjectChunkInputStream(options, offsets, bytes, subset); + return new ObjectChunkInputStream(options, subset); } private class ObjectChunkInputStream extends BaseChunkInputStream { + private int cachedSize = -1; - private final byte[] myBytes; - private final IntChunk myOffsets; private ObjectChunkInputStream( - final StreamReaderOptions options, - final IntChunk myOffsets, - final byte[] myBytes, final RowSet subset) { + final StreamReaderOptions options, final RowSet subset) { super(chunk, options, subset); - this.myBytes = myBytes; - this.myOffsets = myOffsets; } private int cachedNullCount = -1; @@ -111,7 +248,7 @@ public int nullCount() { if (cachedNullCount == -1) { cachedNullCount = 0; subset.forAllRowKeys(i -> { - if (chunk.get((int)i) == null) { + if (chunk.get((int) i) == null) { ++cachedNullCount; } }); @@ -131,7 +268,7 @@ public void visitBuffers(final BufferListener listener) { listener.noteLogicalBuffer(sendValidityBuffer() ? getValidityMapSerializationSizeFor(numElements) : 0); // offsets - long numOffsetBytes = Integer.BYTES * (((long)numElements) + (numElements > 0 ? 1 : 0)); + long numOffsetBytes = Integer.BYTES * (((long) numElements) + (numElements > 0 ? 
1 : 0)); final long bytesExtended = numOffsetBytes & REMAINDER_MOD_8_MASK; if (bytesExtended > 0) { numOffsetBytes += 8 - bytesExtended; @@ -141,9 +278,7 @@ public void visitBuffers(final BufferListener listener) { // payload final MutableLong numPayloadBytes = new MutableLong(); subset.forAllRowKeyRanges((s, e) -> { - // account for payload - numPayloadBytes.add(myOffsets.get(LongSizedDataStructure.intSize(DEBUG_NAME, e + 1))); - numPayloadBytes.subtract(myOffsets.get(LongSizedDataStructure.intSize(DEBUG_NAME, s))); + numPayloadBytes.add(byteStorage.getPayloadSize((int) s, (int) e)); }); final long payloadExtended = numPayloadBytes.longValue() & REMAINDER_MOD_8_MASK; if (payloadExtended > 0) { @@ -155,30 +290,31 @@ public void visitBuffers(final BufferListener listener) { @Override protected int getRawSize() { if (cachedSize == -1) { - cachedSize = 0; + MutableLong totalCachedSize = new MutableLong(0L); if (sendValidityBuffer()) { - cachedSize += getValidityMapSerializationSizeFor(subset.intSize(DEBUG_NAME)); + totalCachedSize.add(getValidityMapSerializationSizeFor(subset.intSize(DEBUG_NAME))); } // there are n+1 offsets; it is not assumed first offset is zero - if (!subset.isEmpty() && subset.size() == myOffsets.size() - 1) { - cachedSize += myOffsets.size() * Integer.BYTES; - cachedSize += myOffsets.get(subset.intSize(DEBUG_NAME)) - myOffsets.get(0); - } else { - cachedSize += subset.isEmpty() ? 0 : Integer.BYTES; // account for the n+1 offset + if (!subset.isEmpty() && subset.size() == byteStorage.offsets.size() - 1) { + totalCachedSize.add(byteStorage.offsets.size() * (long) Integer.BYTES); + totalCachedSize.add(byteStorage.size()); + } else { + totalCachedSize.add(subset.isEmpty() ? 0 : Integer.BYTES); // account for the n+1 offset subset.forAllRowKeyRanges((s, e) -> { // account for offsets - cachedSize += (e - s + 1) * Integer.BYTES; + totalCachedSize.add((e - s + 1) * Integer.BYTES); + // account for payload - cachedSize += myOffsets.get(LongSizedDataStructure.intSize(DEBUG_NAME, e + 1)); - cachedSize -= myOffsets.get(LongSizedDataStructure.intSize(DEBUG_NAME, s)); + totalCachedSize.add(byteStorage.getPayloadSize((int)s, (int)e)); }); } if (!subset.isEmpty() && (subset.size() & 0x1) == 0) { // then we must also align offset array - cachedSize += Integer.BYTES; + totalCachedSize.add(Integer.BYTES); } + cachedSize = LongSizedDataStructure.intSize(DEBUG_NAME, totalCachedSize.longValue()); } return cachedSize; } @@ -223,11 +359,9 @@ public int drainTo(final OutputStream outputStream) throws IOException { dos.writeInt(0); final MutableInt logicalSize = new MutableInt(); - subset.forAllRowKeys((rawRow) -> { + subset.forAllRowKeys((idx) -> { try { - final int rowEnd = LongSizedDataStructure.intSize(DEBUG_NAME, rawRow + 1); - final int size = myOffsets.get(rowEnd) - myOffsets.get(rowEnd - 1); - logicalSize.add(size); + logicalSize.add(byteStorage.getPayloadSize((int) idx, (int) idx)); dos.writeInt(logicalSize.intValue()); } catch (final IOException e) { throw new UncheckedDeephavenException("couldn't drain data to OutputStream", e); @@ -244,11 +378,7 @@ public int drainTo(final OutputStream outputStream) throws IOException { final MutableLong payloadLen = new MutableLong(); subset.forAllRowKeyRanges((s, e) -> { try { - // we have already int-size verified all rows in the RowSet - final int startOffset = myOffsets.get((int) s); - final int endOffset = myOffsets.get((int) e + 1); - dos.write(myBytes, startOffset, endOffset - startOffset); - payloadLen.add(endOffset - startOffset); + 
payloadLen.add(byteStorage.writePayload(dos, (int) s, (int) e)); } catch (final IOException err) { throw new UncheckedDeephavenException("couldn't drain data to OutputStream", err); } diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/table/BarrageTable.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/table/BarrageTable.java index 4d1701c50ea..881da43e36f 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/table/BarrageTable.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/table/BarrageTable.java @@ -9,6 +9,7 @@ import gnu.trove.list.linked.TLongLinkedList; import io.deephaven.base.verify.Assert; import io.deephaven.chunk.attributes.Values; +import io.deephaven.chunk.util.pools.ChunkPoolConstants; import io.deephaven.configuration.Configuration; import io.deephaven.engine.rowset.WritableRowSet; import io.deephaven.engine.table.*; @@ -62,6 +63,8 @@ public class BarrageTable extends QueryTable implements BarrageMessage.Listener, private static final Logger log = LoggerFactory.getLogger(BarrageTable.class); + private static final int BATCH_SIZE = 1 << ChunkPoolConstants.LARGEST_POOLED_CHUNK_LOG2_CAPACITY; + private final UpdateSourceRegistrar registrar; private final NotificationQueue notificationQueue; private final ScheduledExecutorService executorService; @@ -71,7 +74,7 @@ public class BarrageTable extends QueryTable implements BarrageMessage.Listener, private final Stats stats; /** the capacity that the destSources been set to */ - private int capacity = 0; + private long capacity = 0; /** the reinterpreted destination writable sources */ private final WritableColumnSource[] destSources; /** we compact the parent table's key-space and instead redirect; ideal for viewport */ @@ -312,23 +315,60 @@ private UpdateCoalescer processUpdate(final BarrageMessage update, final UpdateC } if (update.rowsIncluded.isNonempty()) { - try (final ChunkSink.FillFromContext redirContext = - rowRedirection.makeFillFromContext(update.rowsIncluded.intSize()); - final RowSet destinationRowSet = getFreeRows(update.rowsIncluded.size())) { - // Update redirection mapping: - rowRedirection.fillFromChunk(redirContext, destinationRowSet.asRowKeyChunk(), - update.rowsIncluded); - - // Update data chunk-wise: - for (int ii = 0; ii < update.addColumnData.length; ++ii) { - if (isSubscribedColumn(ii)) { - final Chunk data = update.addColumnData[ii].data; - Assert.eq(data.size(), "delta.includedAdditions.size()", destinationRowSet.size(), - "destinationRowSet.size()"); - try (final ChunkSink.FillFromContext ctxt = - destSources[ii].makeFillFromContext(destinationRowSet.intSize())) { - destSources[ii].fillFromChunk(ctxt, data, destinationRowSet); + // perform the addition operations in batches for efficiency + final int addBatchSize = (int) Math.min(update.rowsIncluded.size(), BATCH_SIZE); + + if (mightBeInitialSnapshot) { + // ensure the data sources have at least the incoming capacity. 
The sources can auto-resize but + // we know the initial snapshot size and can resize immediately + capacity = update.rowsIncluded.size(); + for (final WritableColumnSource source : destSources) { + source.ensureCapacity(capacity); + } + freeset.insertRange(0, capacity - 1); + } + + // this will hold all the free rows allocated for the included rows + final WritableRowSet destinationRowSet = RowSetFactory.empty(); + + // update the table with the rowsIncluded set (in manageable batch sizes) + try (final RowSequence.Iterator rowsIncludedIterator = update.rowsIncluded.getRowSequenceIterator(); + final ChunkSink.FillFromContext redirContext = + rowRedirection.makeFillFromContext(addBatchSize)) { + while (rowsIncludedIterator.hasMore()) { + final RowSequence rowsToRedirect = + rowsIncludedIterator.getNextRowSequenceWithLength(addBatchSize); + try (final RowSet newRows = getFreeRows(rowsToRedirect.intSize())) { + // Update redirection mapping: + rowRedirection.fillFromChunk(redirContext, newRows.asRowKeyChunk(), rowsToRedirect); + // add these rows to the final destination set + destinationRowSet.insert(newRows); + } + } + } + + // update the column sources (in manageable batch sizes) + for (int ii = 0; ii < update.addColumnData.length; ++ii) { + if (isSubscribedColumn(ii)) { + final BarrageMessage.AddColumnData column = update.addColumnData[ii]; + try (final ChunkSink.FillFromContext fillContext = + destSources[ii].makeFillFromContext(addBatchSize); + final RowSequence.Iterator destIterator = destinationRowSet.getRowSequenceIterator()) { + // grab the matching rows from each chunk + for (final Chunk chunk : column.data) { + // track where we are in the current chunk + int chunkOffset = 0; + while (chunkOffset < chunk.size()) { + // don't overrun the chunk boundary + int effectiveBatchSize = Math.min(addBatchSize, chunk.size() - chunkOffset); + final RowSequence chunkKeys = + destIterator.getNextRowSequenceWithLength(effectiveBatchSize); + Chunk slicedChunk = chunk.slice(chunkOffset, effectiveBatchSize); + destSources[ii].fillFromChunk(fillContext, slicedChunk, chunkKeys); + chunkOffset += effectiveBatchSize; + } } + Assert.assertion(!destIterator.hasMore(), "not all rowsIncluded were processed"); } } } @@ -341,21 +381,33 @@ private UpdateCoalescer processUpdate(final BarrageMessage update, final UpdateC continue; } + // perform the modification operations in batches for efficiency + final int modBatchSize = (int) Math.min(column.rowsModified.size(), BATCH_SIZE); modifiedColumnSet.setColumnWithIndex(ii); - try (final ChunkSource.FillContext redirContext = - rowRedirection.makeFillContext(column.rowsModified.intSize(), null); - final WritableLongChunk keys = - WritableLongChunk.makeWritableChunk(column.rowsModified.intSize())) { - rowRedirection.fillChunk(redirContext, keys, column.rowsModified); - for (int i = 0; i < keys.size(); ++i) { - Assert.notEquals(keys.get(i), "keys[i]", RowSequence.NULL_ROW_KEY, "RowSet.NULL_ROW_KEY"); - } - - try (final ChunkSink.FillFromContext ctxt = - destSources[ii].makeFillFromContext(keys.size())) { - destSources[ii].fillFromChunkUnordered(ctxt, column.data, keys); + try (final ChunkSource.FillContext redirContext = rowRedirection.makeFillContext(modBatchSize, null); + final ChunkSink.FillFromContext fillContext = destSources[ii].makeFillFromContext(modBatchSize); + final WritableLongChunk keys = WritableLongChunk.makeWritableChunk(modBatchSize); + final RowSequence.Iterator destIterator = column.rowsModified.getRowSequenceIterator()) { + + // grab the matching 
rows from each chunk + for (final Chunk chunk : column.data) { + // track where we are in the current chunk + int chunkOffset = 0; + while (chunkOffset < chunk.size()) { + // don't overrun the chunk boundary + int effectiveBatchSize = Math.min(modBatchSize, chunk.size() - chunkOffset); + final RowSequence chunkKeys = destIterator.getNextRowSequenceWithLength(effectiveBatchSize); + // fill the key chunk with the keys from this rowset + rowRedirection.fillChunk(redirContext, keys, chunkKeys); + Chunk slicedChunk = chunk.slice(chunkOffset, effectiveBatchSize); + + destSources[ii].fillFromChunkUnordered(fillContext, slicedChunk, keys); + + chunkOffset += effectiveBatchSize; + } } + Assert.assertion(!destIterator.hasMore(), "not all rowsModified were processed"); } } @@ -390,15 +442,15 @@ private RowSet getFreeRows(long size) { if (size <= 0) { return RowSetFactory.empty(); } - boolean needsResizing = false; if (capacity == 0) { - capacity = Integer.highestOneBit((int) Math.max(size * 2, 8)); + capacity = Long.highestOneBit(Math.max(size * 2, 8)); freeset = RowSetFactory.flat(capacity); needsResizing = true; } else if (freeset.size() < size) { - int usedSlots = (int) (capacity - freeset.size()); - int prevCapacity = capacity; + long usedSlots = capacity - freeset.size(); + long prevCapacity = capacity; + do { capacity *= 2; } while ((capacity - usedSlots) < size); @@ -412,7 +464,7 @@ private RowSet getFreeRows(long size) { } } - final RowSet result = freeset.subSetByPositionRange(0, (int) size); + final RowSet result = freeset.subSetByPositionRange(0, size); Assert.assertion(result.size() == size, "result.size() == size"); freeset.removeRange(0, result.lastRowKey()); return result; @@ -424,18 +476,26 @@ private void freeRows(final RowSet rowsToFree) { } // Note: these are NOT OrderedRowKeys until after the call to .sort() - try (final WritableLongChunk redirectedRows = - WritableLongChunk.makeWritableChunk(rowsToFree.intSize("BarrageTable"))) { - redirectedRows.setSize(0); + final int chunkSize = (int) Math.min(rowsToFree.size(), BATCH_SIZE); + + try (final WritableLongChunk redirectedRows = WritableLongChunk.makeWritableChunk(chunkSize); + final RowSequence.Iterator rowsToFreeIterator = rowsToFree.getRowSequenceIterator()) { - rowsToFree.forAllRowKeys(next -> { - final long prevIndex = rowRedirection.remove(next); - Assert.assertion(prevIndex != -1, "prevIndex != -1", prevIndex, "prevIndex", next, "next"); - redirectedRows.add(prevIndex); - }); + while (rowsToFreeIterator.hasMore()) { - redirectedRows.sort(); // now they're truly ordered - freeset.insert(redirectedRows, 0, redirectedRows.size()); + final RowSequence chunkRowsToFree = rowsToFreeIterator.getNextRowSequenceWithLength(chunkSize); + + redirectedRows.setSize(0); + + chunkRowsToFree.forAllRowKeys(next -> { + final long prevIndex = rowRedirection.remove(next); + Assert.assertion(prevIndex != -1, "prevIndex != -1", prevIndex, "prevIndex", next, "next"); + redirectedRows.add(prevIndex); + }); + + redirectedRows.sort(); // now they're truly ordered + freeset.insert(redirectedRows, 0, redirectedRows.size()); + } } } @@ -586,7 +646,8 @@ public static BarrageTable make( final boolean isViewPort) { final ColumnDefinition[] columns = tableDefinition.getColumns(); final WritableColumnSource[] writableSources = new WritableColumnSource[columns.length]; - final WritableRowRedirection rowRedirection = WritableRowRedirection.FACTORY.createRowRedirection(8); + final WritableRowRedirection rowRedirection = + new 
LongColumnSourceWritableRowRedirection(new LongSparseArraySource()); final LinkedHashMap> finalColumns = makeColumns(columns, writableSources, rowRedirection); diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageStreamReader.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageStreamReader.java index 76b94caa43e..360d65f9347 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageStreamReader.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageStreamReader.java @@ -33,6 +33,7 @@ import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.BitSet; import java.util.Iterator; import java.util.function.LongConsumer; @@ -41,12 +42,18 @@ public class BarrageStreamReader implements StreamReader { private static final Logger log = LoggerFactory.getLogger(BarrageStreamReader.class); + // We would like to use jdk.internal.util.ArraysSupport.MAX_ARRAY_LENGTH, but it is not exported + private static final int MAX_CHUNK_SIZE = Integer.MAX_VALUE - 8; + private final LongConsumer deserializeTmConsumer; - private int numAddRowsRead = 0; - private int numModRowsRead = 0; - private int numAddBatchesRemaining = 0; - private int numModBatchesRemaining = 0; + private long numAddRowsRead = 0; + private long numAddRowsTotal = 0; + private long numModRowsRead = 0; + private long numModRowsTotal = 0; + + private long lastAddStartIndex = 0; + private long lastModStartIndex = 0; private BarrageMessage msg = null; public BarrageStreamReader(final LongConsumer deserializeTmConsumer) { @@ -83,8 +90,8 @@ public BarrageMessage safelyParseFrom(final StreamReaderOptions options, } else if (wrapper.msgType() == BarrageMessageType.BarrageUpdateMetadata) { if (msg != null) { throw new IllegalStateException( - "Previous message was not complete; pending " + numAddBatchesRemaining - + " add batches and " + numModBatchesRemaining + " mod batches"); + "Previous message was not complete; pending " + (numAddRowsTotal - numAddRowsRead) + + " add rows and " + (numModRowsTotal - numModRowsRead) + " mod rows"); } final BarrageUpdateMetadata metadata = @@ -97,14 +104,8 @@ public BarrageMessage safelyParseFrom(final StreamReaderOptions options, numAddRowsRead = 0; numModRowsRead = 0; - numAddBatchesRemaining = metadata.numAddBatches(); - numModBatchesRemaining = metadata.numModBatches(); - if (numAddBatchesRemaining < 0 || numModBatchesRemaining < 0) { - throw new IllegalStateException( - "Found negative number of record batches in barrage metadata: " - + numAddBatchesRemaining + " add batches and " + numModBatchesRemaining - + " mod batches"); - } + lastAddStartIndex = 0; + lastModStartIndex = 0; if (msg.isSnapshot) { final ByteBuffer effectiveViewport = metadata.effectiveViewportAsByteBuffer(); @@ -130,17 +131,32 @@ public BarrageMessage safelyParseFrom(final StreamReaderOptions options, msg.addColumnData[ci] = new BarrageMessage.AddColumnData(); msg.addColumnData[ci].type = columnTypes[ci]; msg.addColumnData[ci].componentType = componentTypes[ci]; + msg.addColumnData[ci].data = new ArrayList<>(); + + // create an initial chunk of the correct size + final int chunkSize = (int) (Math.min(msg.rowsIncluded.size(), MAX_CHUNK_SIZE)); + msg.addColumnData[ci].data.add(columnChunkTypes[ci].makeWritableChunk(chunkSize)); } + numAddRowsTotal = msg.rowsIncluded.size(); // if this message is a snapshot response (vs. 
subscription) then mod columns may be empty + numModRowsTotal = 0; msg.modColumnData = new BarrageMessage.ModColumnData[metadata.modColumnNodesLength()]; for (int ci = 0; ci < msg.modColumnData.length; ++ci) { msg.modColumnData[ci] = new BarrageMessage.ModColumnData(); msg.modColumnData[ci].type = columnTypes[ci]; msg.modColumnData[ci].componentType = componentTypes[ci]; + msg.modColumnData[ci].data = new ArrayList<>(); final BarrageModColumnMetadata mcd = metadata.modColumnNodes(ci); msg.modColumnData[ci].rowsModified = extractIndex(mcd.modifiedRowsAsByteBuffer()); + + // create an initial chunk of the correct size + final int chunkSize = (int) (Math.min(msg.modColumnData[ci].rowsModified.size(), + MAX_CHUNK_SIZE)); + msg.modColumnData[ci].data.add(columnChunkTypes[ci].makeWritableChunk(chunkSize)); + + numModRowsTotal = Math.max(numModRowsTotal, msg.modColumnData[ci].rowsModified.size()); } } @@ -165,30 +181,10 @@ public BarrageMessage safelyParseFrom(final StreamReaderOptions options, throw new IllegalStateException("Only know how to decode Schema/BarrageRecordBatch messages"); } - // snapshots may not provide metadata, generate it now + // throw an error when no app metadata (snapshots now provide by default) if (msg == null) { - msg = new BarrageMessage(); - - // generate a default set of column selectors - msg.snapshotColumns = expectedColumns; - - // create and fill the add column metadata from the schema - msg.addColumnData = new BarrageMessage.AddColumnData[columnTypes.length]; - for (int ci = 0; ci < msg.addColumnData.length; ++ci) { - msg.addColumnData[ci] = new BarrageMessage.AddColumnData(); - msg.addColumnData[ci].type = columnTypes[ci]; - msg.addColumnData[ci].componentType = componentTypes[ci]; - } - - // no mod column data - msg.modColumnData = new BarrageMessage.ModColumnData[0]; - - // generate empty row sets - msg.rowsRemoved = RowSetFactory.empty(); - msg.shifted = RowSetShiftData.EMPTY; - - msg.isSnapshot = true; - numAddBatchesRemaining = 1; + throw new IllegalStateException( + "Missing app metadata tag; cannot decode using BarrageStreamReader"); } bodyParsed = true; @@ -219,30 +215,81 @@ public BarrageMessage safelyParseFrom(final StreamReaderOptions options, } final TLongIterator bufferInfoIter = bufferInfo.iterator(); - final boolean isAddBatch = numAddBatchesRemaining > 0; - if (isAddBatch) { - --numAddBatchesRemaining; - } else { - --numModBatchesRemaining; - } - - if (isAddBatch) { - final int numRowsTotal = msg.rowsIncluded.intSize("BarrageStreamReader"); + // add and mod rows are never combined in a batch. all added rows must be received before the first + // mod rows will be received. 
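Both the ByteStorage class above and the record-batch loop that follows accumulate incoming data into a list of bounded chunks rather than one contiguous array, starting a new chunk whenever the current one fills. A minimal standalone sketch of that accumulation pattern, using plain long[] buffers in place of WritableChunks (all names hypothetical, not part of this change):

import java.util.ArrayList;
import java.util.List;

public final class ChunkedAppender {
    private final int chunkCapacity;
    private final List<long[]> chunks = new ArrayList<>();
    private int writePos = 0; // next free slot in the last chunk

    public ChunkedAppender(int chunkCapacity) {
        this.chunkCapacity = chunkCapacity;
        // allocate an initial chunk up front; it may go unused but avoids a test on every write
        chunks.add(new long[chunkCapacity]);
    }

    public void append(long[] batch) {
        int copied = 0;
        while (copied < batch.length) {
            if (writePos == chunkCapacity) {
                // current chunk is full; roll over to a fresh one
                chunks.add(new long[chunkCapacity]);
                writePos = 0;
            }
            final long[] target = chunks.get(chunks.size() - 1);
            final int len = Math.min(batch.length - copied, chunkCapacity - writePos);
            System.arraycopy(batch, copied, target, writePos, len);
            writePos += len;
            copied += len;
        }
    }

    public long size() {
        // every chunk except the last is full by construction
        return (long) (chunks.size() - 1) * chunkCapacity + writePos;
    }
}

The running total here plays the role of numAddRowsRead and numModRowsRead in the reader: a message is complete only once every expected add row and mod row has been appended.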
+ if (numAddRowsRead < numAddRowsTotal) { for (int ci = 0; ci < msg.addColumnData.length; ++ci) { - msg.addColumnData[ci].data = ChunkInputStreamGenerator.extractChunkFromInputStream(options, - columnChunkTypes[ci], columnTypes[ci], componentTypes[ci], fieldNodeIter, - bufferInfoIter, ois, (WritableChunk) msg.addColumnData[ci].data, - numAddRowsRead, numRowsTotal); + final BarrageMessage.AddColumnData acd = msg.addColumnData[ci]; + + final long remaining = numAddRowsTotal - numAddRowsRead; + if (batch.length() > remaining) { + throw new IllegalStateException( + "Batch length exceeded the expected number of rows from app metadata"); + } + + // select the current chunk size and read the size + int lastChunkIndex = acd.data.size() - 1; + WritableChunk chunk = (WritableChunk) acd.data.get(lastChunkIndex); + int chunkSize = acd.data.get(lastChunkIndex).size(); + + final int chunkOffset; + long rowOffset = numAddRowsRead - lastAddStartIndex; + // reading the rows from this batch might overflow the existing chunk + if (rowOffset + batch.length() > chunkSize) { + lastAddStartIndex += chunkSize; + + // create a new chunk before trying to write again + chunkSize = (int) (Math.min(remaining, MAX_CHUNK_SIZE)); + + chunk = columnChunkTypes[ci].makeWritableChunk(chunkSize); + acd.data.add(chunk); + + chunkOffset = 0; + ++lastChunkIndex; + } else { + chunkOffset = (int) rowOffset; + } + + // fill the chunk with data and assign back into the array + acd.data.set(lastChunkIndex, + ChunkInputStreamGenerator.extractChunkFromInputStream(options, columnChunkTypes[ci], + columnTypes[ci], componentTypes[ci], fieldNodeIter, bufferInfoIter, ois, + chunk, chunkOffset, chunkSize)); } numAddRowsRead += batch.length(); } else { for (int ci = 0; ci < msg.modColumnData.length; ++ci) { final BarrageMessage.ModColumnData mcd = msg.modColumnData[ci]; - final int numModdedRows = mcd.rowsModified.intSize("BarrageStreamReader"); - mcd.data = ChunkInputStreamGenerator.extractChunkFromInputStream(options, - columnChunkTypes[ci], columnTypes[ci], componentTypes[ci], fieldNodeIter, - bufferInfoIter, ois, (WritableChunk) mcd.data, numModRowsRead, - numModdedRows); + + long remaining = mcd.rowsModified.size() - numModRowsRead; + + // need to add the batch row data to the column chunks + int lastChunkIndex = mcd.data.size() - 1; + WritableChunk chunk = (WritableChunk) mcd.data.get(lastChunkIndex); + int chunkSize = chunk.size(); + + final int chunkOffset; + long rowOffset = numModRowsRead - lastModStartIndex; + // this batch might overflow the chunk + if (rowOffset + Math.min(remaining, batch.length()) > chunkSize) { + lastModStartIndex += chunkSize; + + // create a new chunk before trying to write again + chunkSize = (int) (Math.min(remaining, MAX_CHUNK_SIZE)); + chunk = columnChunkTypes[ci].makeWritableChunk(chunkSize); + mcd.data.add(chunk); + + chunkOffset = 0; + ++lastChunkIndex; + } else { + chunkOffset = (int) rowOffset; + } + + // fill the chunk with data and assign back into the array + mcd.data.set(lastChunkIndex, + ChunkInputStreamGenerator.extractChunkFromInputStream(options, columnChunkTypes[ci], + columnTypes[ci], componentTypes[ci], fieldNodeIter, bufferInfoIter, ois, + chunk, chunkOffset, chunkSize)); } numModRowsRead += batch.length(); } @@ -259,7 +306,7 @@ public BarrageMessage safelyParseFrom(final StreamReaderOptions options, } deserializeTmConsumer.accept(System.nanoTime() - startDeserTm); - if (numAddBatchesRemaining + numModBatchesRemaining == 0) { + if (numAddRowsRead == numAddRowsTotal && numModRowsRead == 
numModRowsTotal) { final BarrageMessage retval = msg; msg = null; return retval; diff --git a/java-client/barrage-examples/src/main/java/io/deephaven/client/examples/SubscribeExampleBase.java b/java-client/barrage-examples/src/main/java/io/deephaven/client/examples/SubscribeExampleBase.java index a569a2e5ff3..9790c199d1f 100644 --- a/java-client/barrage-examples/src/main/java/io/deephaven/client/examples/SubscribeExampleBase.java +++ b/java-client/barrage-examples/src/main/java/io/deephaven/client/examples/SubscribeExampleBase.java @@ -25,10 +25,10 @@ abstract class SubscribeExampleBase extends BarrageClientExampleBase { TableUpdateListener listener; @CommandLine.Option(names = {"--tail"}, required = false, description = "Tail viewport size") - int tailSize = 0; + long tailSize = 0; @CommandLine.Option(names = {"--head"}, required = false, description = "Header viewport size") - int headerSize = 0; + long headerSize = 0; static class Mode { @CommandLine.Option(names = {"-b", "--batch"}, required = true, description = "Batch mode") diff --git a/java-client/flight-examples/src/main/java/io/deephaven/client/examples/GetDirectTable.java b/java-client/flight-examples/src/main/java/io/deephaven/client/examples/GetDirectTable.java index 62a6087b955..946791ff6ea 100644 --- a/java-client/flight-examples/src/main/java/io/deephaven/client/examples/GetDirectTable.java +++ b/java-client/flight-examples/src/main/java/io/deephaven/client/examples/GetDirectTable.java @@ -16,9 +16,13 @@ class GetDirectTable extends FlightExampleBase { protected void execute(FlightSession flight) throws Exception { try (final FlightStream stream = flight.stream(ticket)) { System.out.println(stream.getSchema()); + long tableRows = 0L; while (stream.next()) { - System.out.println(stream.getRoot().contentToTSVString()); + int batchRows = stream.getRoot().getRowCount(); + System.out.println(" batch received: " + batchRows + " rows"); + tableRows += batchRows; } + System.out.println("Table received: " + tableRows + " rows"); } } diff --git a/server/src/main/java/io/deephaven/server/arrow/ArrowFlightUtil.java b/server/src/main/java/io/deephaven/server/arrow/ArrowFlightUtil.java index b8960f4667a..4906f3581ca 100644 --- a/server/src/main/java/io/deephaven/server/arrow/ArrowFlightUtil.java +++ b/server/src/main/java/io/deephaven/server/arrow/ArrowFlightUtil.java @@ -53,10 +53,7 @@ import java.io.InputStream; import java.nio.ByteBuffer; import java.nio.ByteOrder; -import java.util.ArrayDeque; -import java.util.BitSet; -import java.util.Iterator; -import java.util.Queue; +import java.util.*; import java.util.concurrent.ScheduledExecutorService; import static io.deephaven.extensions.barrage.util.BarrageProtoUtil.DEFAULT_SER_OPTIONS; @@ -174,16 +171,17 @@ public void onNext(final InputStream request) { for (int ci = 0; ci < numColumns; ++ci) { final BarrageMessage.AddColumnData acd = new BarrageMessage.AddColumnData(); msg.addColumnData[ci] = acd; + msg.addColumnData[ci].data = new ArrayList<>(); final int factor = (columnConversionFactors == null) ? 
1 : columnConversionFactors[ci]; try { - acd.data = ChunkInputStreamGenerator.extractChunkFromInputStream(options, factor, + acd.data.add(ChunkInputStreamGenerator.extractChunkFromInputStream(options, factor, columnChunkTypes[ci], columnTypes[ci], componentTypes[ci], fieldNodeIter, - bufferInfoIter, mi.inputStream, null, 0, 0); + bufferInfoIter, mi.inputStream, null, 0, 0)); } catch (final IOException unexpected) { throw new UncheckedDeephavenException(unexpected); } - if (acd.data.size() != numRowsAdded) { + if (acd.data.get(0).size() != numRowsAdded) { throw GrpcUtil.statusRuntimeException(Code.INVALID_ARGUMENT, "Inconsistent num records per column: " + numRowsAdded + " != " + acd.data.size()); } diff --git a/server/src/main/java/io/deephaven/server/barrage/BarrageMessageProducer.java b/server/src/main/java/io/deephaven/server/barrage/BarrageMessageProducer.java index a65223d9783..0a6131438d9 100644 --- a/server/src/main/java/io/deephaven/server/barrage/BarrageMessageProducer.java +++ b/server/src/main/java/io/deephaven/server/barrage/BarrageMessageProducer.java @@ -48,6 +48,8 @@ import io.deephaven.util.datastructures.LongSizedDataStructure; import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; +import org.apache.commons.lang3.mutable.MutableInt; +import org.apache.commons.lang3.mutable.MutableLong; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.HdrHistogram.Histogram; @@ -62,6 +64,8 @@ import java.util.function.IntFunction; import java.util.stream.Stream; +import static io.deephaven.engine.table.impl.remote.ConstructSnapshot.SNAPSHOT_CHUNK_SIZE; + /** * The server-side implementation of a Barrage replication source. * @@ -1613,18 +1617,25 @@ private BarrageMessage aggregateUpdatesInRange(final int startDelta, final int e for (int ci = 0; ci < downstream.addColumnData.length; ++ci) { final ColumnSource deltaColumn = deltaColumns[ci]; final BarrageMessage.AddColumnData adds = new BarrageMessage.AddColumnData(); + adds.data = new ArrayList<>(); + adds.chunkType = deltaColumn.getChunkType(); + downstream.addColumnData[ci] = adds; if (addColumnSet.get(ci)) { - final int chunkCapacity = localAdded.intSize("serializeItems"); - final WritableChunk chunk = - deltaColumn.getChunkType().makeWritableChunk(chunkCapacity); - try (final ChunkSource.FillContext fc = deltaColumn.makeFillContext(chunkCapacity)) { - deltaColumn.fillChunk(fc, chunk, localAdded); + // create data chunk(s) for the added row data + try (final RowSequence.Iterator it = localAdded.getRowSequenceIterator()) { + while (it.hasMore()) { + final RowSequence rs = + it.getNextRowSequenceWithLength(SNAPSHOT_CHUNK_SIZE); + final int chunkCapacity = rs.intSize("serializeItems"); + final WritableChunk chunk = adds.chunkType.makeWritableChunk(chunkCapacity); + try (final ChunkSource.FillContext fc = deltaColumn.makeFillContext(chunkCapacity)) { + deltaColumn.fillChunk(fc, chunk, rs); + } + adds.data.add(chunk); + } } - adds.data = chunk; - } else { - adds.data = deltaColumn.getChunkType().getEmptyChunk(); } adds.type = deltaColumn.getType(); @@ -1633,26 +1644,33 @@ private BarrageMessage aggregateUpdatesInRange(final int startDelta, final int e for (int ci = 0; ci < downstream.modColumnData.length; ++ci) { final ColumnSource deltaColumn = deltaColumns[ci]; - final BarrageMessage.ModColumnData modifications = new BarrageMessage.ModColumnData(); - downstream.modColumnData[ci] = modifications; + final BarrageMessage.ModColumnData mods = new BarrageMessage.ModColumnData(); 
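Going the other way, reading a contiguous range back out of data that has been split across fixed-size chunks, is what ByteStorage.writePayload does earlier in this diff: a flat byte position resolves to a chunk index (pos / BYTE_CHUNK_SIZE) and an offset within that chunk (pos % BYTE_CHUNK_SIZE). A minimal standalone sketch of that addressing, including a copy that may span chunk boundaries (hypothetical names, not part of this change):

import java.util.List;

public final class ChunkedRangeCopy {
    // Copy [start, start + len) out of data stored as equal-size chunks into dest.
    public static void copyRange(List<byte[]> chunks, int chunkSize, long start, int len, byte[] dest) {
        int destPos = 0;
        long pos = start;
        while (destPos < len) {
            final byte[] chunk = chunks.get((int) (pos / chunkSize));
            final int offset = (int) (pos % chunkSize);
            final int n = Math.min(len - destPos, chunkSize - offset);
            System.arraycopy(chunk, offset, dest, destPos, n);
            destPos += n;
            pos += n;
        }
    }
}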
+ mods.data = new ArrayList<>(); + mods.chunkType = deltaColumn.getChunkType(); + downstream.modColumnData[ci] = mods; if (modColumnSet.get(ci)) { - modifications.rowsModified = firstDelta.recordedMods.copy(); - - final int chunkCapacity = localModified.intSize("serializeItems"); - final WritableChunk chunk = - deltaColumn.getChunkType().makeWritableChunk(chunkCapacity); - try (final ChunkSource.FillContext fc = deltaColumn.makeFillContext(chunkCapacity)) { - deltaColumn.fillChunk(fc, chunk, localModified); + mods.rowsModified = firstDelta.recordedMods.copy(); + + // create data chunk(s) for the added row data + try (final RowSequence.Iterator it = localModified.getRowSequenceIterator()) { + while (it.hasMore()) { + final RowSequence rs = + it.getNextRowSequenceWithLength(SNAPSHOT_CHUNK_SIZE); + final int chunkCapacity = rs.intSize("serializeItems"); + final WritableChunk chunk = mods.chunkType.makeWritableChunk(chunkCapacity); + try (final ChunkSource.FillContext fc = deltaColumn.makeFillContext(chunkCapacity)) { + deltaColumn.fillChunk(fc, chunk, rs); + } + mods.data.add(chunk); + } } - modifications.data = chunk; } else { - modifications.rowsModified = RowSetFactory.empty(); - modifications.data = deltaColumn.getChunkType().getEmptyChunk(); + mods.rowsModified = RowSetFactory.empty(); } - modifications.type = deltaColumn.getType(); - modifications.componentType = deltaColumn.getComponentType(); + mods.type = deltaColumn.getType(); + mods.componentType = deltaColumn.getComponentType(); } } else { // We must coalesce these updates. @@ -1700,8 +1718,8 @@ private BarrageMessage aggregateUpdatesInRange(final int startDelta, final int e final class ColumnInfo { final WritableRowSet modified = RowSetFactory.empty(); final WritableRowSet recordedMods = RowSetFactory.empty(); - long[] addedMapping; - long[] modifiedMapping; + ArrayList addedMappings = new ArrayList<>(); + ArrayList modifiedMappings = new ArrayList<>(); } final HashMap infoCache = new HashMap<>(); @@ -1734,15 +1752,28 @@ final class ColumnInfo { retval.modified.remove(coalescer.added); retval.recordedMods.remove(coalescer.added); - retval.addedMapping = new long[localAdded.intSize()]; - retval.modifiedMapping = new long[retval.recordedMods.intSize()]; - Arrays.fill(retval.addedMapping, RowSequence.NULL_ROW_KEY); - Arrays.fill(retval.modifiedMapping, RowSequence.NULL_ROW_KEY); + try (final RowSequence.Iterator it = localAdded.getRowSequenceIterator()) { + while (it.hasMore()) { + final RowSequence rs = it.getNextRowSequenceWithLength(SNAPSHOT_CHUNK_SIZE); + long[] addedMapping = new long[rs.intSize()]; + Arrays.fill(addedMapping, RowSequence.NULL_ROW_KEY); + retval.addedMappings.add(addedMapping); + } + } + + try (final RowSequence.Iterator it = retval.recordedMods.getRowSequenceIterator()) { + while (it.hasMore()) { + final RowSequence rs = it.getNextRowSequenceWithLength(SNAPSHOT_CHUNK_SIZE); + long[] modifiedMapping = new long[rs.intSize()]; + Arrays.fill(modifiedMapping, RowSequence.NULL_ROW_KEY); + retval.modifiedMappings.add(modifiedMapping); + } + } final WritableRowSet unfilledAdds = localAdded.isEmpty() ? RowSetFactory.empty() - : RowSetFactory.fromRange(0, retval.addedMapping.length - 1); + : RowSetFactory.flat(localAdded.size()); final WritableRowSet unfilledMods = retval.recordedMods.isEmpty() ? 
RowSetFactory.empty() - : RowSetFactory.fromRange(0, retval.modifiedMapping.length - 1); + : RowSetFactory.flat(retval.recordedMods.size()); final WritableRowSet addedRemaining = localAdded.copy(); final WritableRowSet modifiedRemaining = retval.recordedMods.copy(); @@ -1772,7 +1803,7 @@ final class ColumnInfo { } applyRedirMapping(rowsToFill, sourceRows, - addedMapping ? retval.addedMapping : retval.modifiedMapping); + addedMapping ? retval.addedMappings : retval.modifiedMappings); } }; @@ -1815,19 +1846,21 @@ final class ColumnInfo { for (int ci = 0; ci < downstream.addColumnData.length; ++ci) { final ColumnSource deltaColumn = deltaColumns[ci]; final BarrageMessage.AddColumnData adds = new BarrageMessage.AddColumnData(); + adds.data = new ArrayList<>(); + adds.chunkType = deltaColumn.getChunkType(); + downstream.addColumnData[ci] = adds; if (addColumnSet.get(ci)) { final ColumnInfo info = getColumnInfo.apply(ci); - final WritableChunk chunk = - deltaColumn.getChunkType().makeWritableChunk(info.addedMapping.length); - try (final ChunkSource.FillContext fc = deltaColumn.makeFillContext(info.addedMapping.length)) { - ((FillUnordered) deltaColumn).fillChunkUnordered(fc, chunk, - LongChunk.chunkWrap(info.addedMapping)); + for (long[] addedMapping : info.addedMappings) { + final WritableChunk chunk = adds.chunkType.makeWritableChunk(addedMapping.length); + try (final ChunkSource.FillContext fc = deltaColumn.makeFillContext(addedMapping.length)) { + ((FillUnordered) deltaColumn).fillChunkUnordered(fc, chunk, + LongChunk.chunkWrap(addedMapping)); + } + adds.data.add(chunk); } - adds.data = chunk; - } else { - adds.data = deltaColumn.getChunkType().getEmptyChunk(); } adds.type = deltaColumn.getType(); @@ -1837,28 +1870,29 @@ final class ColumnInfo { int numActualModCols = 0; for (int i = 0; i < downstream.modColumnData.length; ++i) { final ColumnSource sourceColumn = deltaColumns[i]; - final BarrageMessage.ModColumnData modifications = new BarrageMessage.ModColumnData(); - downstream.modColumnData[numActualModCols++] = modifications; + final BarrageMessage.ModColumnData mods = new BarrageMessage.ModColumnData(); + mods.data = new ArrayList<>(); + mods.chunkType = sourceColumn.getChunkType(); + + downstream.modColumnData[numActualModCols++] = mods; if (modColumnSet.get(i)) { final ColumnInfo info = getColumnInfo.apply(i); - modifications.rowsModified = info.recordedMods.copy(); - - final WritableChunk chunk = - sourceColumn.getChunkType().makeWritableChunk(info.modifiedMapping.length); - try (final ChunkSource.FillContext fc = sourceColumn.makeFillContext(info.modifiedMapping.length)) { - ((FillUnordered) sourceColumn).fillChunkUnordered(fc, chunk, - LongChunk.chunkWrap(info.modifiedMapping)); + mods.rowsModified = info.recordedMods.copy(); + for (long[] modifiedMapping : info.modifiedMappings) { + final WritableChunk chunk = mods.chunkType.makeWritableChunk(modifiedMapping.length); + try (final ChunkSource.FillContext fc = sourceColumn.makeFillContext(modifiedMapping.length)) { + ((FillUnordered) sourceColumn).fillChunkUnordered(fc, chunk, + LongChunk.chunkWrap(modifiedMapping)); + } + mods.data.add(chunk); } - - modifications.data = chunk; } else { - modifications.rowsModified = RowSetFactory.empty(); - modifications.data = sourceColumn.getChunkType().getEmptyChunk(); + mods.rowsModified = RowSetFactory.empty(); } - modifications.type = sourceColumn.getType(); - modifications.componentType = sourceColumn.getComponentType(); + mods.type = sourceColumn.getType(); + mods.componentType = 
sourceColumn.getComponentType(); } } @@ -1872,14 +1906,28 @@ final class ColumnInfo { } // Updates provided mapping so that mapping[i] returns values.get(i) for all i in keys. - private static void applyRedirMapping(final RowSet keys, final RowSet values, final long[] mapping) { + private static void applyRedirMapping(final RowSet keys, final RowSet values, final ArrayList mappings) { Assert.eq(keys.size(), "keys.size()", values.size(), "values.size()"); - Assert.leq(keys.size(), "keys.size()", mapping.length, "mapping.length"); + MutableLong mapCount = new MutableLong(0L); + mappings.forEach((arr) -> mapCount.add(arr.length)); + Assert.leq(keys.size(), "keys.size()", mapCount.longValue(), "mapping.length"); + + // we need to track our progress through multiple mapping arrays + MutableLong arrOffset = new MutableLong(0L); + MutableInt arrIdx = new MutableInt(0); + final RowSet.Iterator vit = values.iterator(); keys.forAllRowKeys(lkey -> { - final int key = LongSizedDataStructure.intSize("applyRedirMapping", lkey); - Assert.eq(mapping[key], "mapping[key]", RowSequence.NULL_ROW_KEY, "RowSet.NULL_ROW_KEY"); - mapping[key] = vit.nextLong(); + long[] mapping = mappings.get(arrIdx.intValue()); + int keyIdx = LongSizedDataStructure.intSize("applyRedirMapping", lkey - arrOffset.longValue()); + + Assert.eq(mapping[keyIdx], "mapping[keyIdx]", RowSequence.NULL_ROW_KEY, "RowSet.NULL_ROW_KEY"); + mapping[keyIdx] = vit.nextLong(); + + if (keyIdx == mapping.length - 1) { + arrOffset.add(mapping.length); + arrIdx.add(1); + } }); } diff --git a/server/src/main/java/io/deephaven/server/barrage/BarrageStreamGenerator.java b/server/src/main/java/io/deephaven/server/barrage/BarrageStreamGenerator.java index 8b53dfda1c6..7e41dfa7582 100644 --- a/server/src/main/java/io/deephaven/server/barrage/BarrageStreamGenerator.java +++ b/server/src/main/java/io/deephaven/server/barrage/BarrageStreamGenerator.java @@ -10,7 +10,11 @@ import com.google.protobuf.WireFormat; import gnu.trove.list.array.TIntArrayList; import io.deephaven.UncheckedDeephavenException; -import io.deephaven.barrage.flatbuf.*; +import io.deephaven.barrage.flatbuf.BarrageMessageType; +import io.deephaven.barrage.flatbuf.BarrageMessageWrapper; +import io.deephaven.barrage.flatbuf.BarrageModColumnMetadata; +import io.deephaven.barrage.flatbuf.BarrageUpdateMetadata; +import io.deephaven.base.verify.Assert; import io.deephaven.chunk.ChunkType; import io.deephaven.chunk.WritableChunk; import io.deephaven.chunk.WritableLongChunk; @@ -18,17 +22,14 @@ import io.deephaven.chunk.sized.SizedChunk; import io.deephaven.chunk.sized.SizedLongChunk; import io.deephaven.configuration.Configuration; -import io.deephaven.engine.rowset.RowSet; -import io.deephaven.engine.rowset.RowSetBuilderSequential; -import io.deephaven.engine.rowset.RowSetFactory; -import io.deephaven.engine.rowset.RowSetShiftData; -import io.deephaven.engine.rowset.WritableRowSet; +import io.deephaven.engine.rowset.*; import io.deephaven.engine.rowset.impl.ExternalizableRowSetUtils; import io.deephaven.engine.table.TableDefinition; import io.deephaven.engine.table.impl.util.BarrageMessage; import io.deephaven.extensions.barrage.BarragePerformanceLog; import io.deephaven.extensions.barrage.BarrageSubscriptionOptions; import io.deephaven.extensions.barrage.BarrageSnapshotOptions; +import io.deephaven.extensions.barrage.BarrageSubscriptionOptions; import io.deephaven.extensions.barrage.chunk.ChunkInputStreamGenerator; import 
io.deephaven.extensions.barrage.util.BarrageProtoUtil.ExposedByteArrayOutputStream; import io.deephaven.extensions.barrage.util.BarrageUtil; @@ -39,6 +40,7 @@ import io.deephaven.proto.flight.util.MessageHelper; import io.deephaven.util.SafeCloseable; import io.deephaven.util.datastructures.LongSizedDataStructure; +import io.deephaven.util.datastructures.SizeException; import io.grpc.Drainable; import org.apache.arrow.flatbuf.Buffer; import org.apache.arrow.flatbuf.FieldNode; @@ -54,12 +56,10 @@ import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; -import java.util.ArrayDeque; -import java.util.BitSet; -import java.util.Map; -import java.util.Objects; +import java.util.*; import java.util.function.Consumer; +import static io.deephaven.engine.table.impl.remote.ConstructSnapshot.SNAPSHOT_CHUNK_SIZE; import static io.deephaven.extensions.barrage.chunk.BaseChunkInputStreamGenerator.PADDING_BUFFER; public class BarrageStreamGenerator implements @@ -70,6 +70,14 @@ public class BarrageStreamGenerator implements private static final int DEFAULT_BATCH_SIZE = Configuration.getInstance() .getIntegerForClassWithDefault(BarrageStreamGenerator.class, "batchSize", Integer.MAX_VALUE); + // defaults to a small value that is likely to succeed and provide data for following batches + private static final int DEFAULT_INITIAL_BATCH_SIZE = Configuration.getInstance() + .getIntegerForClassWithDefault(BarrageStreamGenerator.class, "initialBatchSize", 4096); + + // default to 100MB to match 100MB java-client and w2w default incoming limits + private static final int DEFAULT_MESSAGE_SIZE_LIMIT = Configuration.getInstance() + .getIntegerForClassWithDefault(BarrageStreamGenerator.class, "maxOutboundMessageSize", 100 * 1024 * 1024); + public interface View { void forEachStream(Consumer visitor) throws IOException; @@ -105,14 +113,51 @@ public View getSchemaView(final TableDefinition table, } } + public static class ChunkListInputStreamGenerator implements SafeCloseable { + public ChunkInputStreamGenerator[] generators; + public ChunkInputStreamGenerator emptyGenerator; + + ChunkListInputStreamGenerator(BarrageMessage.AddColumnData acd) { + // create an input stream generator for each chunk + generators = new ChunkInputStreamGenerator[acd.data.size()]; + + for (int i = 0; i < acd.data.size(); ++i) { + generators[i] = ChunkInputStreamGenerator.makeInputStreamGenerator( + acd.data.get(i).getChunkType(), acd.type, acd.componentType, acd.data.get(i)); + } + emptyGenerator = ChunkInputStreamGenerator.makeInputStreamGenerator( + acd.chunkType, acd.type, acd.componentType, acd.chunkType.getEmptyChunk()); + } + + ChunkListInputStreamGenerator(BarrageMessage.ModColumnData mcd) { + // create an input stream generator for each chunk + generators = new ChunkInputStreamGenerator[mcd.data.size()]; + + for (int i = 0; i < mcd.data.size(); ++i) { + generators[i] = ChunkInputStreamGenerator.makeInputStreamGenerator( + mcd.chunkType, mcd.type, mcd.componentType, mcd.data.get(i)); + } + emptyGenerator = ChunkInputStreamGenerator.makeInputStreamGenerator( + mcd.chunkType, mcd.type, mcd.componentType, mcd.chunkType.getEmptyChunk()); + } + + @Override + public void close() { + for (int i = 0; i < generators.length; i++) { + generators[i].close(); + generators[i] = null; + } + emptyGenerator.close(); + } + } + public static class ModColumnData { public final RowSetGenerator rowsModified; - public final ChunkInputStreamGenerator data; + public final ChunkListInputStreamGenerator data; ModColumnData(final 
BarrageMessage.ModColumnData col) throws IOException { rowsModified = new RowSetGenerator(col.rowsModified); - data = ChunkInputStreamGenerator.makeInputStreamGenerator( - col.data.getChunkType(), col.type, col.componentType, col.data); + data = new ChunkListInputStreamGenerator(col); } } @@ -130,7 +175,8 @@ public static class ModColumnData { public final RowSetGenerator rowsRemoved; public final RowSetShiftDataGenerator shifted; - public final ChunkInputStreamGenerator[] addColumnData; + public final ChunkListInputStreamGenerator[] addColumnData; + public int addGeneratorCount = 0; public final ModColumnData[] modColumnData; /** @@ -154,12 +200,13 @@ public BarrageStreamGenerator(final BarrageMessage message, rowsRemoved = new RowSetGenerator(message.rowsRemoved); shifted = new RowSetShiftDataGenerator(message.shifted); - addColumnData = new ChunkInputStreamGenerator[message.addColumnData.length]; + addColumnData = new ChunkListInputStreamGenerator[message.addColumnData.length]; + for (int i = 0; i < message.addColumnData.length; ++i) { - final BarrageMessage.AddColumnData acd = message.addColumnData[i]; - addColumnData[i] = ChunkInputStreamGenerator.makeInputStreamGenerator( - acd.data.getChunkType(), acd.type, acd.componentType, acd.data); + addColumnData[i] = new ChunkListInputStreamGenerator(message.addColumnData[i]); + addGeneratorCount = Math.max(addGeneratorCount, addColumnData[i].generators.length); } + modColumnData = new ModColumnData[message.modColumnData.length]; for (int i = 0; i < modColumnData.length; ++i) { modColumnData[i] = new ModColumnData(message.modColumnData[i]); @@ -189,7 +236,7 @@ public void close() { rowsRemoved.close(); if (addColumnData != null) { - for (final ChunkInputStreamGenerator in : addColumnData) { + for (final ChunkListInputStreamGenerator in : addColumnData) { in.close(); } } @@ -243,8 +290,8 @@ public static class SubView implements View { public final boolean reverseViewport; public final RowSet keyspaceViewport; public final BitSet subscribedColumns; - public final long numAddBatches; - public final long numModBatches; + public final long numAddRows; + public final long numModRows; public final RowSet addRowOffsets; public final RowSet addRowKeys; public final RowSet[] modRowOffsets; @@ -264,8 +311,6 @@ public SubView(final BarrageStreamGenerator generator, this.keyspaceViewport = keyspaceViewport; this.subscribedColumns = subscribedColumns; - final int batchSize = batchSize(); - if (keyspaceViewport != null) { this.modRowOffsets = new WritableRowSet[generator.modColumnData.length]; } else { @@ -286,7 +331,7 @@ public SubView(final BarrageStreamGenerator generator, numModRows = Math.max(numModRows, mcd.rowsModified.original.size()); } } - numModBatches = (numModRows + batchSize - 1) / batchSize; + this.numModRows = numModRows; if (keyspaceViewport != null) { addRowKeys = keyspaceViewport.intersect(generator.rowsIncluded.original); @@ -300,36 +345,38 @@ public SubView(final BarrageStreamGenerator generator, addRowOffsets = RowSetFactory.flat(generator.rowsAdded.original.size()); } - // require an add batch if there are no mod batches - final long needsAddBatch = this.numModBatches == 0 ? 
1 : 0; - numAddBatches = Math.max(needsAddBatch, (addRowOffsets.size() + batchSize - 1) / batchSize); + this.numAddRows = addRowOffsets.size(); } @Override public void forEachStream(Consumer visitor) throws IOException { final long startTm = System.nanoTime(); - long bytesWritten = 0; ByteBuffer metadata = generator.getSubscriptionMetadata(this); - long offset = 0; - final long batchSize = batchSize(); - for (long ii = 0; ii < numAddBatches; ++ii) { - final InputStream is = generator.getInputStream( - this, offset, offset + batchSize, metadata, generator::appendAddColumns); - bytesWritten += is.available(); - visitor.accept(is); - offset += batchSize; - metadata = null; - } - offset = 0; - for (long ii = 0; ii < numModBatches; ++ii) { + MutableLong bytesWritten = new MutableLong(0L); + + // batch size is maximum, will write fewer rows when needed + int maxBatchSize = batchSize(); + + final MutableInt actualBatchSize = new MutableInt(); + + if (numAddRows == 0 && numModRows == 0) { + // we still need to send a message containing metadata when there are no rows final InputStream is = generator.getInputStream( - this, offset, offset + batchSize, metadata, generator::appendModColumns); - bytesWritten += is.available(); + this, 0, 0, actualBatchSize, metadata, generator::appendAddColumns); + bytesWritten.add(is.available()); visitor.accept(is); - offset += batchSize; - metadata = null; + generator.writeConsumer.onWrite(bytesWritten.longValue(), System.nanoTime() - startTm); + return; } + // send the add batches (if any) + generator.processBatches(visitor, this, numAddRows, maxBatchSize, metadata, generator::appendAddColumns, + bytesWritten); + + // send the mod batches (if any) but don't send metadata twice + generator.processBatches(visitor, this, numModRows, maxBatchSize, numAddRows > 0 ? 
null : metadata, + generator::appendModColumns, bytesWritten); + // clean up the helper indexes addRowOffsets.close(); addRowKeys.close(); @@ -338,7 +385,7 @@ public void forEachStream(Consumer visitor) throws IOException { modViewport.close(); } } - generator.writeConsumer.onWrite(bytesWritten, System.nanoTime() - startTm); + generator.writeConsumer.onWrite(bytesWritten.longValue(), System.nanoTime() - startTm); } private int batchSize() { @@ -407,7 +454,7 @@ public static class SnapshotView implements View { public final boolean reverseViewport; public final RowSet keyspaceViewport; public final BitSet subscribedColumns; - public final long numAddBatches; + public final long numAddRows; public final RowSet addRowOffsets; public SnapshotView(final BarrageStreamGenerator generator, @@ -424,8 +471,6 @@ public SnapshotView(final BarrageStreamGenerator generator, this.keyspaceViewport = keyspaceViewport; this.subscribedColumns = subscribedColumns; - final int batchSize = batchSize(); - // precompute add row offsets if (keyspaceViewport != null) { try (WritableRowSet intersect = keyspaceViewport.intersect(generator.rowsIncluded.original)) { @@ -436,27 +481,29 @@ public SnapshotView(final BarrageStreamGenerator generator, } // require a batch to at least send the metadata - numAddBatches = Math.max(1, (addRowOffsets.size() + batchSize - 1) / batchSize); + numAddRows = addRowOffsets.size(); } @Override public void forEachStream(Consumer visitor) throws IOException { final long startTm = System.nanoTime(); - long bytesWritten = 0; ByteBuffer metadata = generator.getSnapshotMetadata(this); - long offset = 0; - final long batchSize = batchSize(); - for (long ii = 0; ii < numAddBatches; ++ii) { - final InputStream is = generator.getInputStream( - this, offset, offset + batchSize, metadata, generator::appendAddColumns); - bytesWritten += is.available(); - visitor.accept(is); - offset += batchSize; - metadata = null; + MutableLong bytesWritten = new MutableLong(0L); + + // batch size is maximum, will write fewer rows when needed + int maxBatchSize = batchSize(); + final MutableInt actualBatchSize = new MutableInt(); + if (numAddRows == 0) { + // we still need to send a message containing metadata when there are no rows + visitor.accept(generator.getInputStream( + this, 0, 0, actualBatchSize, metadata, generator::appendAddColumns)); + } else { + // send the add batches + generator.processBatches(visitor, this, numAddRows, maxBatchSize, metadata, generator::appendAddColumns, + bytesWritten); } - addRowOffsets.close(); - generator.writeConsumer.onWrite(bytesWritten, System.nanoTime() - startTm); + generator.writeConsumer.onWrite(bytesWritten.longValue(), System.nanoTime() - startTm); } private int batchSize() { @@ -526,24 +573,27 @@ public RowSet modRowOffsets(int col) { @FunctionalInterface private interface ColumnVisitor { - long visit(final View view, final long startRange, final long endRange, + long visit(final View view, final long startRange, final int targetBatchSize, final Consumer addStream, final ChunkInputStreamGenerator.FieldNodeListener fieldNodeListener, final ChunkInputStreamGenerator.BufferListener bufferListener) throws IOException; } /** - * Returns an InputStream of the message filtered to the viewport. + * Returns an InputStream of the message filtered to the viewport. This function accepts `targetBatchSize` but may + * actually write fewer rows than the target (when crossing an internal chunk boundary, e.g.) 
* * @param view the view of the overall chunk to generate a RecordBatch for - * @param startRange the start of the batch in position space w.r.t. the view (inclusive) - * @param endRange the end of the batch in position space w.r.t. the view (exclusive) + * @param offset the start of the batch in position space w.r.t. the view (inclusive) + * @param targetBatchSize the target (and maximum) batch size to use for this message + * @param actualBatchSize the number of rows actually sent in this batch (will be <= targetBatchSize) * @param metadata the optional flight data metadata to attach to the message * @param columnVisitor the helper method responsible for appending the payload columns to the RecordBatch * @return an InputStream ready to be drained by GRPC */ - private InputStream getInputStream(final View view, final long startRange, final long endRange, - final ByteBuffer metadata, final ColumnVisitor columnVisitor) throws IOException { + private InputStream getInputStream(final View view, final long offset, final int targetBatchSize, + final MutableInt actualBatchSize, final ByteBuffer metadata, final ColumnVisitor columnVisitor) + throws IOException { final ArrayDeque streams = new ArrayDeque<>(); final MutableInt size = new MutableInt(); @@ -595,7 +645,9 @@ private InputStream getInputStream(final View view, final long startRange, final bufferInfos.ensureCapacityPreserve(bufferInfos.get().size() + 1); bufferInfos.get().add(length); }; - numRows = columnVisitor.visit(view, startRange, endRange, addStream, fieldNodeListener, bufferListener); + + numRows = columnVisitor.visit(view, offset, targetBatchSize, addStream, fieldNodeListener, bufferListener); + actualBatchSize.setValue(numRows); final WritableChunk noChunk = nodeOffsets.get(); RecordBatch.startNodesVector(header, noChunk.size()); @@ -660,31 +712,112 @@ private static int createByteVector(final FlatBufferBuilder builder, final byte[ return builder.endVector(); } - private long appendAddColumns(final View view, final long startRange, final long endRange, - final Consumer addStream, - final ChunkInputStreamGenerator.FieldNodeListener fieldNodeListener, - final ChunkInputStreamGenerator.BufferListener bufferListener) throws IOException { + private void processBatches(Consumer visitor, final View view, + final long numRows, final int maxBatchSize, ByteBuffer metadata, + final ColumnVisitor columnVisitor, final MutableLong bytesWritten) throws IOException { + long offset = 0; + MutableInt actualBatchSize = new MutableInt(); + int batchSize = DEFAULT_INITIAL_BATCH_SIZE; + + while (offset < numRows) { + try { + final InputStream is = getInputStream( + view, offset, batchSize, actualBatchSize, metadata, columnVisitor); + int bytesToWrite = is.available(); + + // treat this as a hard limit, exceeding fails a client or w2w (unless we are sending a single + // row then we must send and let it potentially fail) + if (bytesToWrite < DEFAULT_MESSAGE_SIZE_LIMIT || batchSize == 1) { + // let's write the data + visitor.accept(is); + + bytesWritten.add(bytesToWrite); + offset += actualBatchSize.intValue(); + metadata = null; + } else { + // can't write this, so close the input stream and retry + is.close(); + } + // recompute the batch limit for the next message + int bytesPerRow = bytesToWrite / actualBatchSize.intValue(); + if (bytesPerRow > 0) { + int rowLimit = DEFAULT_MESSAGE_SIZE_LIMIT / bytesPerRow; - try (final RowSet myAddedOffsets = view.addRowOffsets().subSetByPositionRange(startRange, endRange)) { - // add the add-column streams - for 
(final ChunkInputStreamGenerator col : addColumnData) { - final ChunkInputStreamGenerator.DrainableColumn drainableColumn = - col.getInputStream(view.options(), myAddedOffsets); - drainableColumn.visitFieldNodes(fieldNodeListener); - drainableColumn.visitBuffers(bufferListener); + // add some margin for abnormal cell contents + batchSize = Math.min(maxBatchSize, Math.max(1, (int) ((double) rowLimit * 0.9))); + } + } catch (SizeException ex) { + // was an overflow in the ChunkInputStream generator (probably VarBinary). We can't compute the + // correct number of rows from this failure, so cut batch size in half and try again. This may + // occur multiple times until the size is restricted properly + if (batchSize == 1) { + // this row exceeds internal limits and can never be sent + throw (new UncheckedDeephavenException( + "BarrageStreamGenerator - single row (" + offset + ") exceeds transmissible size", ex)); + } + batchSize = Math.max(1, batchSize / 2); + } + } + } - // Add the drainable last as it is allowed to immediately close a row set the visitors need - addStream.accept(drainableColumn); + private long appendAddColumns(final View view, final long startRange, final int targetBatchSize, + final Consumer addStream, final ChunkInputStreamGenerator.FieldNodeListener fieldNodeListener, + final ChunkInputStreamGenerator.BufferListener bufferListener) throws IOException { + long endRange = startRange + targetBatchSize; + int chunkIdx = 0; + if (addGeneratorCount > 0) { + // identify the chunk that holds this startRange (NOTE: will be same for all columns) + chunkIdx = (int) (startRange / SNAPSHOT_CHUNK_SIZE); + // verify the selected chunk index is valid + Assert.assertion(chunkIdx >= 0 && chunkIdx < addGeneratorCount, "appendAddColumns - chunk lookup failed"); + // adjust the batch size if we would cross a chunk boundary + endRange = Math.min((long) (chunkIdx + 1) * SNAPSHOT_CHUNK_SIZE, endRange); + } + try (final WritableRowSet myAddedOffsets = view.addRowOffsets().subSetByPositionRange(startRange, endRange); + final RowSet adjustedOffsets = + myAddedOffsets.shift((long) chunkIdx * -SNAPSHOT_CHUNK_SIZE)) { + // every column must write to the stream + for (final ChunkListInputStreamGenerator data : addColumnData) { + if (myAddedOffsets.isEmpty() || data.generators.length == 0) { + // use an empty generator to publish the column data + try (final RowSet empty = RowSetFactory.empty()) { + final ChunkInputStreamGenerator.DrainableColumn drainableColumn = + data.emptyGenerator.getInputStream(view.options(), empty); + drainableColumn.visitFieldNodes(fieldNodeListener); + drainableColumn.visitBuffers(bufferListener); + + // Add the drainable last as it is allowed to immediately close a row set the visitors need + addStream.accept(drainableColumn); + } + } else { + final ChunkInputStreamGenerator generator = data.generators[chunkIdx]; + final ChunkInputStreamGenerator.DrainableColumn drainableColumn = + generator.getInputStream(view.options(), adjustedOffsets); + drainableColumn.visitFieldNodes(fieldNodeListener); + drainableColumn.visitBuffers(bufferListener); + // Add the drainable last as it is allowed to immediately close a row set the visitors need + addStream.accept(drainableColumn); + } } return myAddedOffsets.size(); } } - private long appendModColumns(final View view, final long startRange, final long endRange, + private long appendModColumns(final View view, final long startRange, final int targetBatchSize, final Consumer addStream, final ChunkInputStreamGenerator.FieldNodeListener 
fieldNodeListener, final ChunkInputStreamGenerator.BufferListener bufferListener) throws IOException { + long endRange = startRange + targetBatchSize; + int[] columnChunkIdx = new int[modColumnData.length]; + // for each column identify the chunk that holds this startRange + for (int ii = 0; ii < modColumnData.length; ++ii) { + final ModColumnData mcd = modColumnData[ii]; + int chunkIdx = (int) (startRange / SNAPSHOT_CHUNK_SIZE); + // adjust the batch size if we would cross a chunk boundary + endRange = Math.min((long) (chunkIdx + 1) * SNAPSHOT_CHUNK_SIZE, endRange); + columnChunkIdx[ii] = chunkIdx; + } // now add mod-column streams, and write the mod column indexes long numRows = 0; for (int ii = 0; ii < modColumnData.length; ++ii) { @@ -704,16 +837,31 @@ private long appendModColumns(final View view, final long startRange, final long } } numRows = Math.max(numRows, myModOffsets.size()); - try { - final ChunkInputStreamGenerator.DrainableColumn drainableColumn = - mcd.data.getInputStream(view.options(), myModOffsets); - - drainableColumn.visitFieldNodes(fieldNodeListener); - drainableColumn.visitBuffers(bufferListener); - - // See comment in appendAddColumns - addStream.accept(drainableColumn); + if (myModOffsets.isEmpty() || mcd.data.generators.length == 0) { + // use the empty generator to publish the column data + try (final RowSet empty = RowSetFactory.empty()) { + final ChunkInputStreamGenerator.DrainableColumn drainableColumn = + mcd.data.emptyGenerator.getInputStream(view.options(), empty); + drainableColumn.visitFieldNodes(fieldNodeListener); + drainableColumn.visitBuffers(bufferListener); + // Add the drainable last as it is allowed to immediately close a row set the visitors need + addStream.accept(drainableColumn); + } + } else { + final int chunkIdx = columnChunkIdx[ii]; + final ChunkInputStreamGenerator generator = mcd.data.generators[chunkIdx]; + // normalize to the chunk offsets + try (final WritableRowSet adjustedOffsets = + myModOffsets.shift((long) chunkIdx * -SNAPSHOT_CHUNK_SIZE)) { + final ChunkInputStreamGenerator.DrainableColumn drainableColumn = + generator.getInputStream(view.options(), adjustedOffsets); + drainableColumn.visitFieldNodes(fieldNodeListener); + drainableColumn.visitBuffers(bufferListener); + // Add the drainable last as it is allowed to immediately close a row set the visitors need + addStream.accept(drainableColumn); + } + } } finally { myModOffsets.close(); } @@ -775,10 +923,10 @@ private ByteBuffer getSubscriptionMetadata(final SubView view) throws IOExceptio final int nodesOffset = metadata.endVector(); BarrageUpdateMetadata.startBarrageUpdateMetadata(metadata); - BarrageUpdateMetadata.addNumAddBatches(metadata, - LongSizedDataStructure.intSize("BarrageStreamGenerator", view.numAddBatches)); - BarrageUpdateMetadata.addNumModBatches(metadata, - LongSizedDataStructure.intSize("BarrageStreamGenerator", view.numModBatches)); + BarrageUpdateMetadata.addNumAddBatches(metadata, LongSizedDataStructure.intSize("BarrageStreamGenerator", + (view.numAddRows + view.batchSize() - 1) / view.batchSize())); + BarrageUpdateMetadata.addNumModBatches(metadata, LongSizedDataStructure.intSize("BarrageStreamGenerator", + (view.numModRows + view.batchSize() - 1) / view.batchSize())); BarrageUpdateMetadata.addIsSnapshot(metadata, isSnapshot); BarrageUpdateMetadata.addFirstSeq(metadata, firstSeq); BarrageUpdateMetadata.addLastSeq(metadata, lastSeq); @@ -830,8 +978,8 @@ private ByteBuffer getSnapshotMetadata(final SnapshotView view) throws IOExcepti } 
BarrageUpdateMetadata.startBarrageUpdateMetadata(metadata); - BarrageUpdateMetadata.addNumAddBatches(metadata, - LongSizedDataStructure.intSize("BarrageStreamGenerator", view.numAddBatches)); + BarrageUpdateMetadata.addNumAddBatches(metadata, LongSizedDataStructure.intSize("BarrageStreamGenerator", + (view.numAddRows + view.batchSize() - 1) / view.batchSize())); BarrageUpdateMetadata.addNumModBatches(metadata, 0); BarrageUpdateMetadata.addIsSnapshot(metadata, isSnapshot); BarrageUpdateMetadata.addFirstSeq(metadata, firstSeq); @@ -1044,12 +1192,12 @@ public int drainTo(final OutputStream outputStream) throws IOException { } @Override - public int available() throws IOException { + public int available() throws SizeException, IOException { int total = 0; for (final InputStream stream : streams) { total += stream.available(); if (total < 0) { - throw new IllegalStateException("drained message is too large; exceeds Integer.MAX_VALUE"); + throw new SizeException("drained message is too large; exceeds Integer.MAX_VALUE", total); } } return total;
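
Editor's note: the snapshot payload is now split into fixed-size chunks of SNAPSHOT_CHUNK_SIZE rows, and both appendAddColumns and appendModColumns translate a batch's view-relative position range into a chunk index plus chunk-local offsets (the startRange / SNAPSHOT_CHUNK_SIZE division and the negative shift in the diff). The standalone sketch below repeats that arithmetic with an assumed illustrative chunk size rather than the real ChunkPoolConstants-derived constant:

    // Sketch of the chunk-index / chunk-local-offset arithmetic; the chunk size here is an
    // assumed illustrative value, not the real SNAPSHOT_CHUNK_SIZE.
    public class ChunkOffsetMathSketch {
        static final long SNAPSHOT_CHUNK_SIZE = 1L << 16; // assumption for the example

        public static void main(String[] args) {
            final long startRange = 150_000;  // first row position of this batch within the view
            final int targetBatchSize = 4_096;

            // identify the chunk that holds startRange
            final int chunkIdx = (int) (startRange / SNAPSHOT_CHUNK_SIZE);

            // clamp the batch so it does not cross into the next chunk
            final long endRange = Math.min((chunkIdx + 1L) * SNAPSHOT_CHUNK_SIZE, startRange + targetBatchSize);

            // shift into chunk-local space, mirroring myAddedOffsets.shift(chunkIdx * -SNAPSHOT_CHUNK_SIZE)
            final long localStart = startRange - chunkIdx * SNAPSHOT_CHUNK_SIZE;
            final long localEnd = endRange - chunkIdx * SNAPSHOT_CHUNK_SIZE;

            System.out.printf("chunk %d, local rows [%d, %d)%n", chunkIdx, localStart, localEnd);
        }
    }

Because all add columns are chunked identically by the same snapshot pass, a single chunkIdx serves every add column (as the code comment notes), while appendModColumns tracks one index per modified column.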
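
Editor's note: each column's data now arrives as a list of chunks, so the stream generator keeps one per-chunk input-stream generator plus a shared empty generator that is used whenever a column contributes no rows to a batch (every column must still emit a field node for the RecordBatch). The generic sketch below mirrors that selection logic with hypothetical types; it is not the Deephaven API:

    import java.util.ArrayList;
    import java.util.List;

    // Generic sketch of the ChunkListInputStreamGenerator pattern: per-chunk generators plus
    // a shared empty fallback. All names are illustrative.
    public class ChunkListPatternSketch {
        interface Generator { String drain(long localStart, long localEnd); }

        static class ChunkList {
            final List<Generator> generators = new ArrayList<>();
            final Generator emptyGenerator = (s, e) -> "empty column payload";

            Generator forBatch(int chunkIdx, boolean batchIsEmpty) {
                return (batchIsEmpty || generators.isEmpty()) ? emptyGenerator : generators.get(chunkIdx);
            }
        }

        public static void main(String[] args) {
            ChunkList col = new ChunkList();
            col.generators.add((s, e) -> "chunk 0 rows [" + s + ", " + e + ")");
            col.generators.add((s, e) -> "chunk 1 rows [" + s + ", " + e + ")");
            System.out.println(col.forBatch(1, false).drain(0, 128));
            System.out.println(col.forBatch(1, true).drain(0, 0));
        }
    }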
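
Editor's note: applyRedirMapping now fills a list of long[] mapping arrays instead of a single array, tracking a cumulative offset and an array index and advancing to the next array once the last slot of the current one is written. A self-contained sketch of that walk, using plain arrays in place of the RowSet iteration:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    // Standalone sketch of walking a list of mapping arrays: keys are consumed in ascending
    // order, and writing the last slot of the current array advances to the next one.
    public class RedirMappingSketch {
        public static void main(String[] args) {
            final long NULL_ROW_KEY = -1;
            List<long[]> mappings = new ArrayList<>();
            mappings.add(new long[] {NULL_ROW_KEY, NULL_ROW_KEY, NULL_ROW_KEY});
            mappings.add(new long[] {NULL_ROW_KEY, NULL_ROW_KEY});

            long[] keys = {0, 1, 2, 3, 4};      // flat positions across all arrays
            long[] values = {100, 101, 102, 103, 104};

            long arrOffset = 0;
            int arrIdx = 0;
            for (int i = 0; i < keys.length; ++i) {
                long[] mapping = mappings.get(arrIdx);
                int keyIdx = (int) (keys[i] - arrOffset);   // position within the current array
                mapping[keyIdx] = values[i];
                if (keyIdx == mapping.length - 1) {         // filled the last slot; move on
                    arrOffset += mapping.length;
                    arrIdx++;
                }
            }
            mappings.forEach(m -> System.out.println(Arrays.toString(m)));
        }
    }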
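
Editor's note: processBatches resizes batches at runtime: the first batch uses the small initialBatchSize default, later batches are resized from the observed bytes-per-row against the 100MB outbound limit with a ~10% margin, an oversized batch is discarded and retried, and a SizeException halves the batch size. The simplified sketch below reproduces only the sizing arithmetic, with fabricated byte counts, and omits the stream plumbing and the SizeException path:

    // Simplified sketch of the adaptive batch sizing; measureBatch() fabricates byte counts
    // for illustration and stands in for InputStream#available().
    public class AdaptiveBatchSketch {
        static final int MESSAGE_SIZE_LIMIT = 100 * 1024 * 1024; // matches the 100MB default
        static final int INITIAL_BATCH_SIZE = 4096;

        public static void main(String[] args) {
            long numRows = 1_000_000;
            int maxBatchSize = Integer.MAX_VALUE;
            long offset = 0;
            int batchSize = INITIAL_BATCH_SIZE;

            while (offset < numRows) {
                int rows = (int) Math.min(batchSize, numRows - offset);
                long bytesToWrite = measureBatch(rows);
                if (bytesToWrite < MESSAGE_SIZE_LIMIT || rows == 1) {
                    offset += rows;                           // "send" the batch
                }                                             // otherwise retry with a smaller batch
                long bytesPerRow = bytesToWrite / rows;
                if (bytesPerRow > 0) {
                    long rowLimit = MESSAGE_SIZE_LIMIT / bytesPerRow;
                    batchSize = (int) Math.min(maxBatchSize, Math.max(1, (long) (rowLimit * 0.9)));
                }
            }
            System.out.println("final batch size: " + batchSize);
        }

        static long measureBatch(int rows) {
            return rows * 512L; // assume ~512 bytes per row for the sketch
        }
    }

Note that appendAddColumns and appendModColumns may clamp a batch at a chunk boundary, so the rows actually written can be fewer than the target; that is why getInputStream reports the count back through actualBatchSize and processBatches advances offset by that value.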
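
Editor's note: because the views now track row counts rather than precomputed batch counts, the subscription and snapshot metadata derive numAddBatches and numModBatches on the fly with a ceiling division by the batch size. A tiny worked example of that expression:

    // Ceiling division used for the metadata batch counts: ceil(numRows / batchSize)
    // written as (numRows + batchSize - 1) / batchSize.
    public class BatchCountSketch {
        public static void main(String[] args) {
            long numAddRows = 10_500;
            long batchSize = 4_096;
            long numAddBatches = (numAddRows + batchSize - 1) / batchSize; // = 3
            System.out.println(numAddBatches);
        }
    }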