Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support very large tables #2287

Merged
merged 61 commits into from
May 17, 2022
Merged
Show file tree
Hide file tree
Changes from 57 commits
Commits
Show all changes
61 commits
Select commit Hold shift + click to select a range
4c744ca
working for data tests, failing others
lbooker42 Mar 3, 2022
c8b4bc9
spotless checkin
lbooker42 Mar 4, 2022
c006ab3
woohoo, _all_ tests passing
lbooker42 Mar 4, 2022
554cff1
time slicing added (beta code)
lbooker42 Mar 8, 2022
37a25cf
wip, BarrageSubscriptionImpl has blocking subscription
lbooker42 Mar 9, 2022
ea2e87f
spotless apply
lbooker42 Mar 9, 2022
4b0623a
updated BarrageSubscriptionImpl to block while subscription grows
lbooker42 Mar 9, 2022
b04dbb0
minor fix
lbooker42 Mar 9, 2022
9223c7a
cosmetic fixes
lbooker42 Mar 9, 2022
9999fc9
merged with upstream batching code
lbooker42 Mar 14, 2022
8000923
tests pass except testAllUniqueXXX
lbooker42 Mar 17, 2022
ac7cae2
all tests passing
lbooker42 Mar 21, 2022
bf79c58
nearly complete
lbooker42 Mar 21, 2022
529701c
Minor changes, fixed a memory leak in buildPostSnapshotViewports
lbooker42 Mar 22, 2022
811e3c7
Merge branch 'deephaven:main' into lab-grow-viewport
lbooker42 Mar 22, 2022
8b89c4a
Merge branch 'lab-grow-viewport' of github.com:lbooker42/deephaven-co…
lbooker42 Mar 22, 2022
9391ad5
Attempted to avoid a deadlock
lbooker42 Mar 22, 2022
df25d40
most comments done, tests still passing
lbooker42 Mar 30, 2022
ac6308b
final comments addressed
lbooker42 Mar 30, 2022
9460bac
spotless checks
lbooker42 Mar 30, 2022
b628654
minor change to BarrageUtils
lbooker42 Mar 31, 2022
e9e9421
final PR comments addressed
lbooker42 Mar 31, 2022
6a12d57
added tracking for addRowKeys
lbooker42 Mar 31, 2022
5ae5cfc
docs and PR comments
lbooker42 Apr 5, 2022
19a5b7e
Merge branch 'deephaven:main' into lab-grow-viewport
lbooker42 Apr 6, 2022
34eb31b
improved listener visibility in example
lbooker42 Apr 6, 2022
1f04db8
updated documentation
lbooker42 Apr 7, 2022
a0fa9ee
spotless apply
lbooker42 Apr 7, 2022
02cb7b8
initial push
lbooker42 Apr 19, 2022
9c5db55
updated architecture, removed ChunkList structure
lbooker42 Apr 19, 2022
2dc78b7
merged with main
lbooker42 Apr 19, 2022
5d1ece2
Merge branch 'deephaven:main' into lab-large-tables
lbooker42 Apr 20, 2022
7d8b737
passes tests
lbooker42 Apr 21, 2022
3a64e9c
spotless applied
lbooker42 Apr 21, 2022
8d6606e
merged upstream
lbooker42 Apr 21, 2022
33b75ba
addressed many PR comments
lbooker42 Apr 21, 2022
68ebcc9
fixed bug in VarBinary stream generator
lbooker42 Apr 21, 2022
8d8c167
fixed bug in BSG that wrote multiple column data
lbooker42 Apr 22, 2022
238226b
corrected bug in BSG column generation
lbooker42 Apr 22, 2022
1b52162
output message size enforcing
lbooker42 Apr 22, 2022
cdd15bb
renamed config item to be consisten with others
lbooker42 Apr 22, 2022
e5c4ac7
better comments
lbooker42 Apr 25, 2022
55e2a5d
Merge branch 'deephaven:main' into lab-large-tables
lbooker42 Apr 25, 2022
02c1471
PR comments partially addressed
lbooker42 Apr 29, 2022
cfdba3c
spotless applied
lbooker42 Apr 29, 2022
275d1d8
fixed bug
lbooker42 Apr 29, 2022
fb6c561
tests passing, still work to do
lbooker42 Apr 29, 2022
bf509e5
updated ByteStorage class to extend OutputStream
lbooker42 May 2, 2022
4d19767
PR comments addressed, ready for review
lbooker42 May 2, 2022
0667ad0
reverted the outbound message limit to 100 (from 99) MB
lbooker42 May 2, 2022
88fbb61
merge conflicts resolved
lbooker42 May 3, 2022
3bea459
Merge branch 'deephaven:main' into lab-large-tables
lbooker42 May 12, 2022
4e7a041
Merge branch 'deephaven:main' into lab-large-tables
lbooker42 May 12, 2022
e2e188f
finishing up PR
lbooker42 May 12, 2022
bf76b45
consolidation of constants
lbooker42 May 12, 2022
2a8434b
spotless and exception message correction
lbooker42 May 12, 2022
2d0836d
bug fix in VarBinaryChunkInputStreamGenerator.java'
lbooker42 May 12, 2022
07d2218
Merge branch 'deephaven:main' into lab-large-tables
lbooker42 May 17, 2022
41f9166
PR comments addressed
lbooker42 May 17, 2022
d7d890d
Merge branch 'nightly/lab-large-tables' into lab-large-tables
lbooker42 May 17, 2022
6228a5f
minor edit of comment
lbooker42 May 17, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package io.deephaven.chunk;

import io.deephaven.chunk.attributes.Any;

import java.io.IOException;
import java.io.OutputStream;

public class ByteChunkToOutputStreamAdapter {
public static void write(OutputStream stream, ByteChunk<? extends Any> chunk, int srcOffset, int length) throws IOException {
stream.write(chunk.data, chunk.offset + srcOffset, length);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.deephaven.base.log.LogOutput;
import io.deephaven.base.log.LogOutputAppendable;
import io.deephaven.base.verify.Assert;
import io.deephaven.chunk.util.pools.ChunkPoolConstants;
import io.deephaven.configuration.Configuration;
import io.deephaven.datastructures.util.CollectionUtil;
import io.deephaven.engine.rowset.*;
Expand Down Expand Up @@ -79,6 +80,8 @@ public NoSnapshotAllowedException(String reason) {
private static final int MAX_CONCURRENT_ATTEMPT_DURATION_MILLIS = Configuration.getInstance()
.getIntegerWithDefault("ConstructSnapshot.maxConcurrentAttemptDurationMillis", 5000);

public static final int SNAPSHOT_CHUNK_SIZE = 1 << ChunkPoolConstants.LARGEST_POOLED_CHUNK_LOG2_CAPACITY;

/**
* Holder for thread-local state.
*/
Expand Down Expand Up @@ -1316,8 +1319,6 @@ public static boolean serializeAllTable(final boolean usePrev,
snapshot.rowsIncluded = snapshot.rowsAdded.copy();
}

LongSizedDataStructure.intSize("construct snapshot", snapshot.rowsIncluded.size());

final Map<String, ? extends ColumnSource> sourceMap = table.getColumnSourceMap();
final String[] columnSources = sourceMap.keySet().toArray(CollectionUtil.ZERO_LENGTH_STRING_ARRAY);

Expand All @@ -1340,16 +1341,19 @@ public static boolean serializeAllTable(final boolean usePrev,
final RowSet rows = columnIsEmpty ? RowSetFactory.empty() : snapshot.rowsIncluded;
// Note: cannot use shared context across several calls of differing lengths and no sharing necessary
// when empty
acd.data = getSnapshotDataAsChunk(columnSource, columnIsEmpty ? null : sharedContext, rows, usePrev);
acd.data =
getSnapshotDataAsChunkList(columnSource, columnIsEmpty ? null : sharedContext, rows, usePrev);
acd.type = columnSource.getType();
acd.componentType = columnSource.getComponentType();
acd.chunkType = columnSource.getChunkType();

final BarrageMessage.ModColumnData mcd = new BarrageMessage.ModColumnData();
snapshot.modColumnData[ii] = mcd;
mcd.rowsModified = RowSetFactory.empty();
mcd.data = getSnapshotDataAsChunk(columnSource, null, RowSetFactory.empty(), usePrev);
mcd.data = getSnapshotDataAsChunkList(columnSource, null, RowSetFactory.empty(), usePrev);
mcd.type = acd.type;
mcd.componentType = acd.componentType;
mcd.chunkType = columnSource.getChunkType();
}
}

Expand Down Expand Up @@ -1430,6 +1434,52 @@ private static <T> WritableChunk<Values> getSnapshotDataAsChunk(final ColumnSour
}
}

private static <T> ArrayList<Chunk<Values>> getSnapshotDataAsChunkList(final ColumnSource<T> columnSource,
final SharedContext sharedContext, final RowSet rowSet, final boolean usePrev) {
final ColumnSource<?> sourceToUse = ReinterpretUtils.maybeConvertToPrimitive(columnSource);
long offset = 0;
final long size = rowSet.size();
final ArrayList<Chunk<Values>> result = new ArrayList<>();

if (size == 0) {
return result;
}

final int maxChunkSize = (int) Math.min(size, SNAPSHOT_CHUNK_SIZE);

try (final ColumnSource.FillContext context = sourceToUse.makeFillContext(maxChunkSize, sharedContext);
final RowSequence.Iterator it = rowSet.getRowSequenceIterator()) {
int chunkSize = maxChunkSize;
while (it.hasMore()) {
final RowSequence reducedRowSet = it.getNextRowSequenceWithLength(chunkSize);
final ChunkType chunkType = sourceToUse.getChunkType();

// create a new chunk
WritableChunk<Values> currentChunk = chunkType.makeWritableChunk(chunkSize);

if (usePrev) {
sourceToUse.fillPrevChunk(context, currentChunk, reducedRowSet);
} else {
sourceToUse.fillChunk(context, currentChunk, reducedRowSet);
}

// add the chunk to the current list
result.add(currentChunk);

// increment the offset for the next chunk (using the actual values written)
offset += currentChunk.size();

// recompute the size of the next chunk
if (size - offset > maxChunkSize) {
chunkSize = maxChunkSize;
} else {
chunkSize = (int) (size - offset);
}
}
}
return result;
}

/**
* Estimate the size of a complete table snapshot in bytes.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@
package io.deephaven.engine.table.impl.util;

import io.deephaven.chunk.Chunk;
import io.deephaven.chunk.ChunkType;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.chunk.util.pools.PoolableChunk;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.rowset.RowSetShiftData;
import io.deephaven.util.SafeCloseable;

import java.util.ArrayList;
import java.util.BitSet;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

Expand All @@ -28,13 +30,15 @@ public static class ModColumnData {
public RowSet rowsModified;
public Class<?> type;
public Class<?> componentType;
public Chunk<Values> data;
public ArrayList<Chunk<Values>> data;
public ChunkType chunkType;
}

public static class AddColumnData {
public Class<?> type;
public Class<?> componentType;
public Chunk<Values> data;
public ArrayList<Chunk<Values>> data;
public ChunkType chunkType;
}

public long firstSeq = -1;
Expand Down Expand Up @@ -94,9 +98,13 @@ public void close() {
continue;
}

if (acd.data instanceof PoolableChunk) {
((PoolableChunk) acd.data).close();
for (Chunk<Values> chunk : acd.data) {
if (chunk instanceof PoolableChunk) {
((PoolableChunk) chunk).close();
}
}

acd.data.clear();
}
}
if (modColumnData != null) {
Expand All @@ -105,12 +113,13 @@ public void close() {
continue;
}

if (mcd.rowsModified != null) {
mcd.rowsModified.close();
}
if (mcd.data instanceof PoolableChunk) {
((PoolableChunk) mcd.data).close();
for (Chunk<Values> chunk : mcd.data) {
if (chunk instanceof PoolableChunk) {
((PoolableChunk) chunk).close();
}
}

mcd.data.clear();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2404,8 +2404,10 @@ public void testUngroupConstructSnapshotOfBoxedNull() {

try (final BarrageMessage snap = ConstructSnapshot.constructBackplaneSnapshot(this, (BaseTable) ungrouped)) {
assertEquals(snap.rowsAdded, i(0, 1, 2));
assertEquals(snap.addColumnData[0].data.asIntChunk().get(0), io.deephaven.util.QueryConstants.NULL_INT);
assertEquals(snap.addColumnData[1].data.asIntChunk().get(2), io.deephaven.util.QueryConstants.NULL_INT);
assertEquals(snap.addColumnData[0].data.get(0).asIntChunk().get(0),
io.deephaven.util.QueryConstants.NULL_INT);
assertEquals(snap.addColumnData[1].data.get(0).asIntChunk().get(2),
io.deephaven.util.QueryConstants.NULL_INT);
}
}

Expand Down
Loading