feat: Barrage Refactor Read/Write Chunk Factories #6065

Status: Open. Wants to merge 75 commits into base: main. The diff below shows changes from 2 of the 75 commits.

Commits (75):
28b25d1
Barrage: Refactor Read/Write Chunk Factories
nbauernfeind Sep 16, 2024
8876208
Ryan's feedback and bug fixes
nbauernfeind Sep 16, 2024
27df3d3
dunno .. stuf
nbauernfeind Oct 16, 2024
1852b0a
Restore initial commit from grpc-java, plus a few local changes
niloc132 Oct 21, 2024
68b925a
Guard writing payload as hex if FINEST is enabled
niloc132 Oct 21, 2024
e88c47e
Apply upstream "Fix AsyncServletOutputStreamWriterConcurrencyTest
niloc132 Oct 21, 2024
f9a19fc
Apply upstream "Avoid flushing headers when the server returns a single
niloc132 Oct 21, 2024
4733524
Apply upstream "servlet: introduce ServletServerBuilder.buildServlet()"
niloc132 Oct 21, 2024
06e63ec
Bump grpc vers, add inprocess dep for tests
niloc132 Oct 21, 2024
09ade64
Merge branch 'main' into grpc-history-replay
niloc132 Oct 28, 2024
c8af47c
Apply https://github.com/deephaven/deephaven-core/pull/6301
niloc132 Oct 28, 2024
57c8008
Bump to 1.65.1 to better match arrow 18
niloc132 Nov 1, 2024
cbf8ab2
Merge remote-tracking branch 'colin/grpc-history-replay' into vp_simp…
nbauernfeind Nov 6, 2024
85f604f
Version Upgrades; MavenLocal
nbauernfeind Nov 6, 2024
70a0207
Implement Simplified Viewport Table Updates in BMP/BT
nbauernfeind Nov 8, 2024
0089d62
Ryan's Synchronous Review
nbauernfeind Nov 9, 2024
485746d
Merge remote-tracking branch 'upstream/main' into vp_simplification
nbauernfeind Nov 9, 2024
ad8de73
Remove SNAPSHOT version and mavenLocal references
nbauernfeind Nov 11, 2024
02ce2ad
Fixes removed/added rows in most VP cases
nbauernfeind Nov 12, 2024
da23e2b
Bug fixes around viewport snapshot rowsRemoved and rowsAdded
nbauernfeind Nov 12, 2024
299f56e
Bugfix for correct growing VP logic
nbauernfeind Nov 12, 2024
9d6f389
remaining java side fixes
nbauernfeind Nov 13, 2024
fd5aced
Ryan's feedback on javaserver/client impls
nbauernfeind Nov 14, 2024
53b1eed
Inline Feedback from VC w/Ryan
nbauernfeind Nov 14, 2024
6e7fe94
Do not propagate modifies for any repainted rows
nbauernfeind Nov 14, 2024
d568eb7
Minor cleanup from personal review
nbauernfeind Nov 14, 2024
6653ca6
Ryan's feedback latest round.
nbauernfeind Nov 14, 2024
44cdf93
jsAPI mostly complete; looking for tree table issue
nbauernfeind Nov 15, 2024
d549d79
Fixes for jsapi and HierarchicalTable
nbauernfeind Nov 15, 2024
b4d5b69
Lazily compute rowset encoding
nbauernfeind Nov 15, 2024
6c12314
Fixup jsapi tests
nbauernfeind Nov 15, 2024
f9be6e5
Quick round feedback
nbauernfeind Nov 15, 2024
4252622
spotless
nbauernfeind Nov 15, 2024
2767def
Double checked locking fixes
nbauernfeind Nov 15, 2024
78c4cb7
Ryan's final review
nbauernfeind Nov 15, 2024
ea6f898
Clarify strategy on who owns RowSets passed into getSubView
nbauernfeind Nov 15, 2024
3eeb628
npe fix
nbauernfeind Nov 15, 2024
84a6100
Bugfix if HT is empty or viewport past end of table
nbauernfeind Nov 16, 2024
476ae65
Colin's feedback
nbauernfeind Nov 16, 2024
738cb11
Limit jsapi data change event to prev and curr table sizes
nbauernfeind Nov 16, 2024
38f320c
Merge branch 'vp_simplification' into barrage_types
nbauernfeind Nov 16, 2024
7a351a2
Merge compilation fixes
nbauernfeind Nov 16, 2024
58d34e2
Merge remote-tracking branch 'upstream/main' into barrage_types
nbauernfeind Nov 16, 2024
68f080b
Fix for #5258
nbauernfeind Nov 16, 2024
0f088c8
Merge remote-tracking branch 'upstream/main' into barrage_types
nbauernfeind Nov 18, 2024
ea513bb
Merge remote-tracking branch 'upstream/main' into barrage_types
nbauernfeind Nov 18, 2024
6526a9f
Merge remote-tracking branch 'upstream/main' into barrage_types
nbauernfeind Nov 21, 2024
66261b0
tmp some fixes
nbauernfeind Nov 21, 2024
bf9495c
tmp disable cpp test
nbauernfeind Nov 21, 2024
fe02980
broken map support
nbauernfeind Nov 25, 2024
c9e8fc6
fix: Ensure that rollup and tree snapshot tests succeed reliably (#6407)
rcaudy Nov 21, 2024
e26d6f1
fix: return type for IcebergCatalogAdapter::load_table in python (#6408)
malhotrashivam Nov 21, 2024
2ee3dca
fix: Close Jetty h2 streams with RST_STREAM and no error code (#6401)
niloc132 Nov 21, 2024
1deee97
fix: Apply auto-locking in time_window() (#6411)
jmao-denver Nov 21, 2024
c31c48a
ci: Add cpwright to CODEOWNERS for /py, protos, gwt, and function lib…
cpwright Nov 21, 2024
3b13fc5
chore: Update projects that use Arrow 18 to require Java 11 (#6417)
niloc132 Nov 22, 2024
2222fd9
Add sketch of CommandGetSqlInfo
devinrsmith Nov 22, 2024
6a9be96
checkpoint
nbauernfeind Nov 29, 2024
4ba36f0
Fix #6216
nbauernfeind Nov 29, 2024
65d596d
java client support for column as list feature
nbauernfeind Dec 2, 2024
efa5dee
finish getSqlInfo up to new schema
nbauernfeind Dec 2, 2024
00b66a7
was able to acquire all union child writers
nbauernfeind Dec 3, 2024
a219b84
impl most of union writer; need non-nullable support
nbauernfeind Dec 3, 2024
675e745
Can officially write sql info
nbauernfeind Dec 4, 2024
4eadb96
Dense Union Reader + fixedChunkLength fixes
nbauernfeind Dec 4, 2024
4ae05b8
remove stubs for type mapping
nbauernfeind Dec 4, 2024
be9f2df
Bug fixes
nbauernfeind Dec 4, 2024
c5000e7
Merge remote-tracking branch 'upstream/main' into barrage_types
nbauernfeind Dec 4, 2024
2dba609
More BugFixes
nbauernfeind Dec 5, 2024
9c3db73
fix go tests
nbauernfeind Dec 5, 2024
2ab1088
revert: RST_STREAM(cancel) fix for gRPC, this seems to be breaking JS…
niloc132 Nov 22, 2024
ae71435
Charles' Feedback
nbauernfeind Dec 16, 2024
0400086
More Feedback + Tests + BugFixes
nbauernfeind Dec 16, 2024
1e68d61
spotless + pull casting out of loops
nbauernfeind Dec 16, 2024
fe1eb5c
WebReaderFactory support for ListView and FixedSizeList
nbauernfeind Dec 17, 2024
Files changed:
@@ -56,6 +56,8 @@ test_that("is_static returns the correct value", {
})

test_that("nrow returns the correct number of rows", {
+ skip()
Contributor:
Is this just temporary until you work with Corey to fix?

Member Author:

These few test skip()s will be committed with this PR to enable independent PRs between Corey and me.

The issue is that the C++ client cannot handle the data type explicitly uploaded by the R client in this test (DH now assumes you want to round-trip the schema).


data <- setup()

expect_equal(nrow(data$th1), nrow(data$df1))
@@ -67,6 +69,8 @@ test_that("nrow returns the correct number of rows", {
})

test_that("ncol returns the correct number of columns", {
+ skip()
+
data <- setup()

expect_equal(ncol(data$th1), ncol(data$df1))
@@ -78,6 +82,8 @@ test_that("ncol returns the correct number of columns", {
})

test_that("dim returns the correct dimension", {
+ skip()
+
data <- setup()

expect_equal(dim(data$th1), dim(data$df1))
@@ -218,7 +218,7 @@ public interface Table extends
*/
String BARRAGE_PERFORMANCE_KEY_ATTRIBUTE = "BarragePerformanceTableKey";
/**
- * Set this to control the schema used for barrage serialization.
+ * Set an Apache Arrow POJO Schema to this attribute to control the column encoding used for barrage serialization.
*/
String BARRAGE_SCHEMA_ATTRIBUTE = "BarrageSchema";

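For orientation, a hedged sketch of how a client might use this attribute. withAttributes and BARRAGE_SCHEMA_ATTRIBUTE are from the Deephaven Table API shown in this hunk; the helper name, the column name, and the chosen Arrow type are illustrative assumptions only.

import io.deephaven.engine.table.Table;
import org.apache.arrow.vector.types.FloatingPointPrecision;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;

import java.util.List;
import java.util.Map;

// Hypothetical helper: request float32 encoding for a "Price" column by
// attaching an explicit Arrow POJO Schema to the table's attributes.
static Table withFloat32Price(final Table table) {
    final Schema schema = new Schema(List.of(
            Field.nullable("Price", new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE))));
    return table.withAttributes(Map.of(Table.BARRAGE_SCHEMA_ATTRIBUTE, schema));
}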
@@ -37,7 +37,7 @@ public WritableChunk<T> get() {

/**
* Ensure the underlying chunk has a capacity of at least {@code capacity}.
- *
+ * <p>
* The data and size of the returned chunk are undefined.
*
* @param capacity the minimum capacity for the chunk.
@@ -56,9 +56,9 @@ public WritableChunk<T> ensureCapacity(int capacity) {

/**
* Ensure the underlying chunk has a capacity of at least {@code capacity}.
- *
+ * <p>
If the chunk has existing data, then it is copied to the new chunk.
- *
+ * <p>
* If the underlying chunk already exists, then the size of the chunk is the original size. If the chunk did not
* exist, then the size of the returned chunk is zero.
*
@@ -361,7 +361,13 @@ public enum CopyAttributeOperation {
CopyAttributeOperation.Preview));

tempMap.put(BARRAGE_SCHEMA_ATTRIBUTE, EnumSet.of(
- CopyAttributeOperation.Filter));
+ CopyAttributeOperation.Filter,
+ CopyAttributeOperation.FirstBy,
+ CopyAttributeOperation.Flatten,
+ CopyAttributeOperation.LastBy,
+ CopyAttributeOperation.PartitionBy,
+ CopyAttributeOperation.Reverse,
+ CopyAttributeOperation.Sort));

attributeToCopySet = Collections.unmodifiableMap(tempMap);
}
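A hedged illustration of what the expanded copy set buys, reusing the hypothetical withFloat32Price helper from the sketch above; "parent" is an assumed table in scope. Before this change only Filter carried the attribute to child tables.

// With Sort now in the copy set, a sorted child keeps the same barrage
// encoding as its parent; previously the attribute was dropped here.
final Table encoded = withFloat32Price(parent);
final Table sorted = encoded.sort("Price");
System.out.println(sorted.getAttribute(Table.BARRAGE_SCHEMA_ATTRIBUTE) != null); // true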
@@ -1411,12 +1411,7 @@ private static boolean snapshotColumnsParallel(
final ExecutionContext executionContext,
@NotNull final BarrageMessage snapshot) {
final JobScheduler jobScheduler = new OperationInitializerJobScheduler();
- final CompletableFuture<Void> waitForParallelSnapshot = new CompletableFuture<>() {
- @Override
- public boolean completeExceptionally(Throwable ex) {
- return super.completeExceptionally(ex);
- }
- };
+ final CompletableFuture<Void> waitForParallelSnapshot = new CompletableFuture<>();
jobScheduler.iterateParallel(
executionContext,
logOutput -> logOutput.append("snapshotColumnsParallel"),
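Context for this simplification: the deleted override only forwarded to super, so the anonymous subclass was behaviorally identical to a plain CompletableFuture (presumably a leftover debugging hook). A minimal standalone demonstration:

import java.util.concurrent.CompletableFuture;

public final class PassThroughOverrideDemo {
    public static void main(String[] args) {
        // The overridden method only forwards to super, so both futures
        // behave identically when completed exceptionally.
        final CompletableFuture<Void> plain = new CompletableFuture<>();
        final CompletableFuture<Void> wrapped = new CompletableFuture<>() {
            @Override
            public boolean completeExceptionally(Throwable ex) {
                return super.completeExceptionally(ex); // pure pass-through
            }
        };
        plain.completeExceptionally(new RuntimeException("boom"));
        wrapped.completeExceptionally(new RuntimeException("boom"));
        System.out.println(plain.isCompletedExceptionally());   // true
        System.out.println(wrapped.isCompletedExceptionally()); // true
    }
}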
@@ -100,8 +100,10 @@ public static Builder newBuilder(final String name) {

public static final String DEFAULT_TARGET_CYCLE_DURATION_MILLIS_PROP =
"PeriodicUpdateGraph.targetCycleDurationMillis";
- public static final int DEFAULT_TARGET_CYCLE_DURATION_MILLIS =
- Configuration.getInstance().getIntegerWithDefault(DEFAULT_TARGET_CYCLE_DURATION_MILLIS_PROP, 1000);

+ public static int getDefaultTargetCycleDurationMillis() {
+ return Configuration.getInstance().getIntegerWithDefault(DEFAULT_TARGET_CYCLE_DURATION_MILLIS_PROP, 1000);
+ }

private final long defaultTargetCycleDurationMillis;
private volatile long targetCycleDurationMillis;
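The replaced constant read the configuration once at class initialization; the new getter re-reads it on every call, which (presumably) lets properties set after class load take effect. A standalone sketch of that difference, with an invented property name, not Deephaven code:

public final class ConfigReadTiming {
    // Read exactly once, when this class initializes; later changes to the
    // system property are never observed.
    static final int EAGER = Integer.getInteger("demo.cycle.millis", 1000);

    // Read on every call; observes properties set after class load
    // (useful, e.g., for tests that configure the property late).
    static int lazy() {
        return Integer.getInteger("demo.cycle.millis", 1000);
    }

    public static void main(String[] args) {
        System.setProperty("demo.cycle.millis", "250");
        System.out.println(EAGER);  // 1000: captured before the property was set
        System.out.println(lazy()); // 250: sees the current property value
    }
}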
@@ -255,7 +257,7 @@ public boolean isCycleOnBudget(long cycleTimeNanos) {
* Resets the run cycle time to the default target configured via the {@link Builder} setting.
*
* @implNote If the {@link Builder#targetCycleDurationMillis(long)} property is not set, this value defaults to
- * {@link Builder#DEFAULT_TARGET_CYCLE_DURATION_MILLIS_PROP} which defaults to 1000ms.
+ * {@link #DEFAULT_TARGET_CYCLE_DURATION_MILLIS_PROP} which defaults to 1000ms.
*/
@SuppressWarnings("unused")
public void resetTargetCycleDuration() {
@@ -1169,7 +1171,7 @@ public static PeriodicUpdateGraph getInstance(final String name) {
public static final class Builder {
private final boolean allowUnitTestMode =
Configuration.getInstance().getBooleanWithDefault(ALLOW_UNIT_TEST_MODE_PROP, false);
- private long targetCycleDurationMillis = DEFAULT_TARGET_CYCLE_DURATION_MILLIS;
+ private long targetCycleDurationMillis = getDefaultTargetCycleDurationMillis();
private long minimumCycleDurationToLogNanos = DEFAULT_MINIMUM_CYCLE_DURATION_TO_LOG_NANOSECONDS;

private String name;
@@ -901,7 +901,7 @@ private void processBatches(Consumer<DefensiveDrainable> visitor, final RecordBa
}
}

- private static int findWriterForOffset(final ChunkWriter.Context<?>[] chunks, final long offset) {
+ private static int findWriterForOffset(final ChunkWriter.Context[] chunks, final long offset) {
// fast path for smaller updates
if (chunks.length <= 1) {
return 0;
@@ -949,7 +949,7 @@ private int appendAddColumns(final RecordBatchMessageView view, final long start
endPos = Long.MAX_VALUE;
}
if (addColumnData[0].chunks().length != 0) {
- final ChunkWriter.Context<?> writer = addColumnData[0].chunks()[chunkIdx];
+ final ChunkWriter.Context writer = addColumnData[0].chunks()[chunkIdx];
endPos = Math.min(endPos, writer.getLastRowOffset());
shift = -writer.getRowOffset();
}
@@ -981,9 +981,9 @@
// Add the drainable last as it is allowed to immediately close a row set the visitors need
addStream.accept(drainableColumn);
} else {
- final ChunkWriter.Context<Chunk<Values>> chunk = chunkListWriter.chunks()[chunkIdx];
+ final ChunkWriter.Context context = chunkListWriter.chunks()[chunkIdx];
final ChunkWriter.DrainableColumn drainableColumn = chunkListWriter.writer().getInputStream(
- chunk,
+ context,
shift == 0 ? myAddedOffsets : adjustedOffsets,
view.options());
drainableColumn.visitFieldNodes(fieldNodeListener);
@@ -1008,18 +1008,18 @@ private int appendModColumns(final RecordBatchMessageView view, final long start
// adjust the batch size if we would cross a chunk boundary
for (int ii = 0; ii < modColumnData.length; ++ii) {
final ModColumnWriter mcd = modColumnData[ii];
- final ChunkWriter.Context<?>[] chunks = mcd.chunkListWriter.chunks();
- if (chunks.length == 0) {
+ final ChunkWriter.Context[] contexts = mcd.chunkListWriter.chunks();
+ if (contexts.length == 0) {
continue;
}

final RowSet modOffsets = view.modRowOffsets(ii);
// if all mods are being sent, then offsets yield an identity mapping
final long startPos = modOffsets != null ? modOffsets.get(startRange) : startRange;
if (startPos != RowSet.NULL_ROW_KEY) {
- final int chunkIdx = findWriterForOffset(chunks, startPos);
- if (chunkIdx < chunks.length - 1) {
- maxLength = Math.min(maxLength, chunks[chunkIdx].getLastRowOffset() + 1 - startPos);
+ final int chunkIdx = findWriterForOffset(contexts, startPos);
+ if (chunkIdx < contexts.length - 1) {
+ maxLength = Math.min(maxLength, contexts[chunkIdx].getLastRowOffset() + 1 - startPos);
}
columnChunkIdx[ii] = chunkIdx;
}
@@ -1029,7 +1029,7 @@
long numRows = 0;
for (int ii = 0; ii < modColumnData.length; ++ii) {
final ModColumnWriter mcd = modColumnData[ii];
- final ChunkWriter.Context<Chunk<Values>> chunk = mcd.chunkListWriter.chunks().length == 0
+ final ChunkWriter.Context context = mcd.chunkListWriter.chunks().length == 0
? null
: mcd.chunkListWriter.chunks()[columnChunkIdx[ii]];

@@ -1046,8 +1046,8 @@
// if all mods are being sent, then offsets yield an identity mapping
startPos = startRange;
endPos = startRange + maxLength - 1;
- if (chunk != null) {
- endPos = Math.min(endPos, chunk.getLastRowOffset());
+ if (context != null) {
+ endPos = Math.min(endPos, context.getLastRowOffset());
}
}

@@ -1065,7 +1065,7 @@
numRows = Math.max(numRows, myModOffsets.size());

try {
- final int numElements = chunk == null ? 0 : myModOffsets.intSize("BarrageStreamWriterImpl");
+ final int numElements = context == null ? 0 : myModOffsets.intSize("BarrageStreamWriterImpl");
if (view.options().columnsAsList()) {
// if we are sending columns as a list, we need to add the list buffers before each column
final SingleElementListHeaderWriter listHeader =
@@ -1084,11 +1084,11 @@
// Add the drainable last as it is allowed to immediately close a row set the visitors need
addStream.accept(drainableColumn);
} else {
- final long shift = -chunk.getRowOffset();
+ final long shift = -context.getRowOffset();
// normalize to the chunk offsets
try (final WritableRowSet adjustedOffsets = shift == 0 ? null : myModOffsets.shift(shift)) {
final ChunkWriter.DrainableColumn drainableColumn = mcd.chunkListWriter.writer().getInputStream(
- chunk, shift == 0 ? myModOffsets : adjustedOffsets, view.options());
+ context, shift == 0 ? myModOffsets : adjustedOffsets, view.options());
drainableColumn.visitFieldNodes(fieldNodeListener);
drainableColumn.visitBuffers(bufferListener);
// Add the drainable last as it is allowed to immediately close a row set the visitors need
@@ -12,31 +12,30 @@
import java.io.IOException;
import java.util.List;

- public class ChunkListWriter<SourceChunkType extends Chunk<Values>> implements SafeCloseable {
- private final ChunkWriter<SourceChunkType> writer;
- private final ChunkWriter.Context<SourceChunkType>[] contexts;
+ public class ChunkListWriter<SOURCE_CHUNK_TYPE extends Chunk<Values>> implements SafeCloseable {
+ private final ChunkWriter<SOURCE_CHUNK_TYPE> writer;
+ private final ChunkWriter.Context[] contexts;

public ChunkListWriter(
- final ChunkWriter<SourceChunkType> writer,
- final List<SourceChunkType> chunks) {
+ final ChunkWriter<SOURCE_CHUNK_TYPE> writer,
+ final List<SOURCE_CHUNK_TYPE> chunks) {
this.writer = writer;

- // noinspection unchecked
- this.contexts = (ChunkWriter.Context<SourceChunkType>[]) new ChunkWriter.Context[chunks.size()];
+ this.contexts = new ChunkWriter.Context[chunks.size()];

long rowOffset = 0;
for (int i = 0; i < chunks.size(); ++i) {
- final SourceChunkType valuesChunk = chunks.get(i);
+ final SOURCE_CHUNK_TYPE valuesChunk = chunks.get(i);
this.contexts[i] = writer.makeContext(valuesChunk, rowOffset);
rowOffset += valuesChunk.size();
}
}

- public ChunkWriter<SourceChunkType> writer() {
+ public ChunkWriter<SOURCE_CHUNK_TYPE> writer() {
return writer;
}

- public ChunkWriter.Context<SourceChunkType>[] chunks() {
+ public ChunkWriter.Context[] chunks() {
return contexts;
}

@@ -46,8 +45,8 @@ public ChunkWriter.DrainableColumn empty(@NotNull final BarrageOptions options)

@Override
public void close() {
- for (final ChunkWriter.Context<SourceChunkType> context : contexts) {
- context.decrementReferenceCount();
+ for (final ChunkWriter.Context context : contexts) {
+ context.close();
}
}
}
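A hedged sketch of the calling pattern after this change, mirroring appendAddColumns above; listWriter, rowOffsets, options, and the two listeners are assumed to be in scope.

// Stream each tracked chunk through its writer; Context no longer carries a
// type parameter, and releasing now goes through close() rather than
// decrementReferenceCount().
for (final ChunkWriter.Context context : listWriter.chunks()) {
    final ChunkWriter.DrainableColumn column =
            listWriter.writer().getInputStream(context, rowOffsets, options);
    column.visitFieldNodes(fieldNodeListener);
    column.visitBuffers(bufferListener);
    addStream.accept(column);
}
listWriter.close(); // closes every context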
@@ -5,8 +5,13 @@

import io.deephaven.chunk.ChunkType;
import io.deephaven.chunk.WritableChunk;
import io.deephaven.chunk.WritableLongChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.util.datastructures.LongSizedDataStructure;
import org.jetbrains.annotations.NotNull;

import java.io.DataInput;
import java.io.IOException;
import java.util.function.Function;
import java.util.function.IntFunction;

@@ -35,4 +40,25 @@ public static ChunkType getChunkTypeFor(final Class<?> dest) {
}
return ChunkType.fromElementType(dest);
}

+ protected static void readValidityBuffer(
+ @NotNull final DataInput is,
+ final int numValidityLongs,
+ final long validityBufferLength,
+ @NotNull final WritableLongChunk<Values> isValid,
+ @NotNull final String DEBUG_NAME) throws IOException {
+ // Read validity buffer:
+ int jj = 0;
+ for (; jj < Math.min(numValidityLongs, validityBufferLength / 8); ++jj) {
+ isValid.set(jj, is.readLong());
+ }
+ final long valBufRead = jj * 8L;
+ if (valBufRead < validityBufferLength) {
+ is.skipBytes(LongSizedDataStructure.intSize(DEBUG_NAME, validityBufferLength - valBufRead));
+ }
+ // we support short validity buffers
+ for (; jj < numValidityLongs; ++jj) {
+ isValid.set(jj, -1); // -1 is bit-wise representation of all ones
+ }
+ }
}
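For orientation, a hedged sketch (assumed caller pattern, not code from this PR) of consuming the validity words this helper fills in. Arrow validity bitmaps are LSB-first, so bit (ii & 63) of word (ii >> 6) covers row ii; rows past a short buffer read as valid because the tail words are set to -1 (all ones).

import io.deephaven.chunk.WritableLongChunk;
import io.deephaven.chunk.attributes.Values;

// Assumed caller pattern: test whether row ii was marked non-null.
static boolean isRowValid(final WritableLongChunk<Values> isValid, final int ii) {
    return (isValid.get(ii >> 6) & (1L << (ii & 63))) != 0;
}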