Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Barrage Refactor Read/Write Chunk Factories #6065

Open
wants to merge 75 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 70 commits
Commits
Show all changes
75 commits
Select commit Hold shift + click to select a range
28b25d1
Barrage: Refactor Read/Write Chunk Factories
nbauernfeind Sep 16, 2024
8876208
Ryan's feedback and bug fixes
nbauernfeind Sep 16, 2024
27df3d3
dunno .. stuf
nbauernfeind Oct 16, 2024
1852b0a
Restore initial commit from grpc-java, plus a few local changes
niloc132 Oct 21, 2024
68b925a
Guard writing payload as hex if FINEST is enabled
niloc132 Oct 21, 2024
e88c47e
Apply upstream "Fix AsyncServletOutputStreamWriterConcurrencyTest
niloc132 Oct 21, 2024
f9a19fc
Apply upstream "Avoid flushing headers when the server returns a single
niloc132 Oct 21, 2024
4733524
Apply upstream "servlet: introduce ServletServerBuilder.buildServlet()"
niloc132 Oct 21, 2024
06e63ec
Bump grpc vers, add inprocess dep for tests
niloc132 Oct 21, 2024
09ade64
Merge branch 'main' into grpc-history-replay
niloc132 Oct 28, 2024
c8af47c
Apply https://github.com/deephaven/deephaven-core/pull/6301
niloc132 Oct 28, 2024
57c8008
Bump to 1.65.1 to better match arrow 18
niloc132 Nov 1, 2024
cbf8ab2
Merge remote-tracking branch 'colin/grpc-history-replay' into vp_simp…
nbauernfeind Nov 6, 2024
85f604f
Version Upgrades; MavenLocal
nbauernfeind Nov 6, 2024
70a0207
Implement Simplified Viewport Table Updates in BMP/BT
nbauernfeind Nov 8, 2024
0089d62
Ryan's Synchronous Review
nbauernfeind Nov 9, 2024
485746d
Merge remote-tracking branch 'upstream/main' into vp_simplification
nbauernfeind Nov 9, 2024
ad8de73
Remove SNAPSHOT version and mavenLocal references
nbauernfeind Nov 11, 2024
02ce2ad
Fixes removed/added rows in most VP cases
nbauernfeind Nov 12, 2024
da23e2b
Bug fixes around viewport snapshot rowsRemoved and rowsAdded
nbauernfeind Nov 12, 2024
299f56e
Bugfix for correct growing VP logic
nbauernfeind Nov 12, 2024
9d6f389
remaining java side fixes
nbauernfeind Nov 13, 2024
fd5aced
Ryan's feedback on javaserver/client impls
nbauernfeind Nov 14, 2024
53b1eed
Inline Feedback from VC w/Ryan
nbauernfeind Nov 14, 2024
6e7fe94
Do not propagate modifies for any repainted rows
nbauernfeind Nov 14, 2024
d568eb7
Minor cleanup from personal review
nbauernfeind Nov 14, 2024
6653ca6
Ryan's feedback latest round.
nbauernfeind Nov 14, 2024
44cdf93
jsAPI mostly complete; looking for tree table issue
nbauernfeind Nov 15, 2024
d549d79
Fixes for jsapi and HierarchicalTable
nbauernfeind Nov 15, 2024
b4d5b69
Lazily compute rowset encoding
nbauernfeind Nov 15, 2024
6c12314
Fixup jsapi tests
nbauernfeind Nov 15, 2024
f9be6e5
Quick round feedback
nbauernfeind Nov 15, 2024
4252622
spotless
nbauernfeind Nov 15, 2024
2767def
Double checked locking fixes
nbauernfeind Nov 15, 2024
78c4cb7
Ryan's final review
nbauernfeind Nov 15, 2024
ea6f898
Clarify strategy on who owns RowSets passed into getSubView
nbauernfeind Nov 15, 2024
3eeb628
npe fix
nbauernfeind Nov 15, 2024
84a6100
Bugfix if HT is empty or viewport past end of table
nbauernfeind Nov 16, 2024
476ae65
Colin's feedback
nbauernfeind Nov 16, 2024
738cb11
Limit jsapi data change event to prev and curr table sizes
nbauernfeind Nov 16, 2024
38f320c
Merge branch 'vp_simplification' into barrage_types
nbauernfeind Nov 16, 2024
7a351a2
Merge compilation fixes
nbauernfeind Nov 16, 2024
58d34e2
Merge remote-tracking branch 'upstream/main' into barrage_types
nbauernfeind Nov 16, 2024
68f080b
Fix for #5258
nbauernfeind Nov 16, 2024
0f088c8
Merge remote-tracking branch 'upstream/main' into barrage_types
nbauernfeind Nov 18, 2024
ea513bb
Merge remote-tracking branch 'upstream/main' into barrage_types
nbauernfeind Nov 18, 2024
6526a9f
Merge remote-tracking branch 'upstream/main' into barrage_types
nbauernfeind Nov 21, 2024
66261b0
tmp some fixes
nbauernfeind Nov 21, 2024
bf9495c
tmp disable cpp test
nbauernfeind Nov 21, 2024
fe02980
broken map support
nbauernfeind Nov 25, 2024
c9e8fc6
fix: Ensure that rollup and tree snapshot tests succeed reliably (#6407)
rcaudy Nov 21, 2024
e26d6f1
fix: return type for IcebergCatalogAdapter::load_table in python (#6408)
malhotrashivam Nov 21, 2024
2ee3dca
fix: Close Jetty h2 streams with RST_STREAM and no error code (#6401)
niloc132 Nov 21, 2024
1deee97
fix: Apply auto-locking in time_window() (#6411)
jmao-denver Nov 21, 2024
c31c48a
ci: Add cpwright to CODEOWNERS for /py, protos, gwt, and function lib…
cpwright Nov 21, 2024
3b13fc5
chore: Update projects that use Arrow 18 to require Java 11 (#6417)
niloc132 Nov 22, 2024
2222fd9
Add sketch of CommandGetSqlInfo
devinrsmith Nov 22, 2024
6a9be96
checkpoint
nbauernfeind Nov 29, 2024
4ba36f0
Fix #6216
nbauernfeind Nov 29, 2024
65d596d
java client support for column as list feature
nbauernfeind Dec 2, 2024
efa5dee
finish getSqlInfo up to new schema
nbauernfeind Dec 2, 2024
00b66a7
was able to acquire all union child writers
nbauernfeind Dec 3, 2024
a219b84
impl most of union writer; need non-nullable support
nbauernfeind Dec 3, 2024
675e745
Can officially write sql info
nbauernfeind Dec 4, 2024
4eadb96
Dense Union Reader + fixedChunkLength fixes
nbauernfeind Dec 4, 2024
4ae05b8
remove stubs for type mapping
nbauernfeind Dec 4, 2024
be9f2df
Bug fixes
nbauernfeind Dec 4, 2024
c5000e7
Merge remote-tracking branch 'upstream/main' into barrage_types
nbauernfeind Dec 4, 2024
2dba609
More BugFixes
nbauernfeind Dec 5, 2024
9c3db73
fix go tests
nbauernfeind Dec 5, 2024
2ab1088
revert: RST_STREAM(cancel) fix for gRPC, this seems to be breaking JS…
niloc132 Nov 22, 2024
ae71435
Charles' Feedback
nbauernfeind Dec 16, 2024
0400086
More Feedback + Tests + BugFixes
nbauernfeind Dec 16, 2024
1e68d61
spotless + pull casting out of loops
nbauernfeind Dec 16, 2024
fe1eb5c
WebReaderFactory support for ListView and FixedSizeList
nbauernfeind Dec 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp-client/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ def testCppClient = Docker.registerDockerTask(project, 'testCppClient') {
environmentVariable 'DH_HOST', deephavenDocker.containerName.get()
environmentVariable 'DH_PORT', '10000'
}
waitTimeMinutes = 1
containerDependencies.dependsOn = [deephavenDocker.healthyTask]
containerDependencies.finalizedBy = deephavenDocker.endTask
network = deephavenDocker.networkName.get()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,15 @@ public <Other> ColumnDefinition<Other> withDataType(@NotNull final Class<Other>
: fromGenericType(name, newDataType, componentType, columnType);
}

public <Other> ColumnDefinition<Other> withDataType(
@NotNull final Class<Other> newDataType,
@Nullable final Class<?> newComponentType) {
// noinspection unchecked
return dataType == newDataType && componentType == newComponentType
? (ColumnDefinition<Other>) this
: fromGenericType(name, newDataType, newComponentType, columnType);
}

public ColumnDefinition<?> withName(@NotNull final String newName) {
return newName.equals(name) ? this : new ColumnDefinition<>(newName, dataType, componentType, columnType);
}
Expand Down
4 changes: 4 additions & 0 deletions engine/api/src/main/java/io/deephaven/engine/table/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,10 @@ public interface Table extends
* Set this attribute to enable collection of barrage performance stats.
*/
String BARRAGE_PERFORMANCE_KEY_ATTRIBUTE = "BarragePerformanceTableKey";
/**
* Set this to control the schema used for barrage serialization.
nbauernfeind marked this conversation as resolved.
Show resolved Hide resolved
*/
String BARRAGE_SCHEMA_ATTRIBUTE = "BarrageSchema";

// -----------------------------------------------------------------------------------------------------------------
// ColumnSources for fetching data by row key
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
// @formatter:off
package io.deephaven.chunk;

import io.deephaven.util.QueryConstants;
import io.deephaven.util.type.ArrayTypeUtils;
import io.deephaven.chunk.attributes.Any;

Expand Down Expand Up @@ -74,6 +75,12 @@ public final boolean get(int index) {
return data[offset + index];
}

// region isNull
public final boolean isNull(int index) {
return false;
}
// endregion isNull

@Override
public BooleanChunk<ATTR> slice(int offset, int capacity) {
ChunkHelpers.checkSliceArgs(size, offset, capacity);
Expand Down
7 changes: 7 additions & 0 deletions engine/chunk/src/main/java/io/deephaven/chunk/ByteChunk.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
// @formatter:off
package io.deephaven.chunk;

import io.deephaven.util.QueryConstants;
import io.deephaven.util.type.ArrayTypeUtils;
import io.deephaven.chunk.attributes.Any;

Expand Down Expand Up @@ -78,6 +79,12 @@ public final byte get(int index) {
return data[offset + index];
}

// region isNull
public final boolean isNull(int index) {
return data[offset + index] == QueryConstants.NULL_BYTE;
}
// endregion isNull

@Override
public ByteChunk<ATTR> slice(int offset, int capacity) {
ChunkHelpers.checkSliceArgs(size, offset, capacity);
Expand Down
7 changes: 7 additions & 0 deletions engine/chunk/src/main/java/io/deephaven/chunk/CharChunk.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
//
package io.deephaven.chunk;

import io.deephaven.util.QueryConstants;
import io.deephaven.util.type.ArrayTypeUtils;
import io.deephaven.chunk.attributes.Any;

Expand Down Expand Up @@ -73,6 +74,12 @@ public final char get(int index) {
return data[offset + index];
}

// region isNull
public final boolean isNull(int index) {
return data[offset + index] == QueryConstants.NULL_CHAR;
}
// endregion isNull

@Override
public CharChunk<ATTR> slice(int offset, int capacity) {
ChunkHelpers.checkSliceArgs(size, offset, capacity);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
// @formatter:off
package io.deephaven.chunk;

import io.deephaven.util.QueryConstants;
import io.deephaven.util.type.ArrayTypeUtils;
import io.deephaven.chunk.attributes.Any;

Expand Down Expand Up @@ -77,6 +78,12 @@ public final double get(int index) {
return data[offset + index];
}

// region isNull
public final boolean isNull(int index) {
return data[offset + index] == QueryConstants.NULL_DOUBLE;
}
// endregion isNull

@Override
public DoubleChunk<ATTR> slice(int offset, int capacity) {
ChunkHelpers.checkSliceArgs(size, offset, capacity);
Expand Down
7 changes: 7 additions & 0 deletions engine/chunk/src/main/java/io/deephaven/chunk/FloatChunk.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
// @formatter:off
package io.deephaven.chunk;

import io.deephaven.util.QueryConstants;
import io.deephaven.util.type.ArrayTypeUtils;
import io.deephaven.chunk.attributes.Any;

Expand Down Expand Up @@ -77,6 +78,12 @@ public final float get(int index) {
return data[offset + index];
}

// region isNull
public final boolean isNull(int index) {
return data[offset + index] == QueryConstants.NULL_FLOAT;
}
// endregion isNull

@Override
public FloatChunk<ATTR> slice(int offset, int capacity) {
ChunkHelpers.checkSliceArgs(size, offset, capacity);
Expand Down
7 changes: 7 additions & 0 deletions engine/chunk/src/main/java/io/deephaven/chunk/IntChunk.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
// @formatter:off
package io.deephaven.chunk;

import io.deephaven.util.QueryConstants;
import io.deephaven.util.type.ArrayTypeUtils;
import io.deephaven.chunk.attributes.Any;

Expand Down Expand Up @@ -77,6 +78,12 @@ public final int get(int index) {
return data[offset + index];
}

// region isNull
public final boolean isNull(int index) {
return data[offset + index] == QueryConstants.NULL_INT;
}
// endregion isNull

@Override
public IntChunk<ATTR> slice(int offset, int capacity) {
ChunkHelpers.checkSliceArgs(size, offset, capacity);
Expand Down
7 changes: 7 additions & 0 deletions engine/chunk/src/main/java/io/deephaven/chunk/LongChunk.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
// @formatter:off
package io.deephaven.chunk;

import io.deephaven.util.QueryConstants;
import io.deephaven.util.type.ArrayTypeUtils;
import io.deephaven.chunk.attributes.Any;

Expand Down Expand Up @@ -77,6 +78,12 @@ public final long get(int index) {
return data[offset + index];
}

// region isNull
public final boolean isNull(int index) {
return data[offset + index] == QueryConstants.NULL_LONG;
}
// endregion isNull

@Override
public LongChunk<ATTR> slice(int offset, int capacity) {
ChunkHelpers.checkSliceArgs(size, offset, capacity);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
// @formatter:off
package io.deephaven.chunk;

import io.deephaven.util.QueryConstants;
import io.deephaven.util.type.ArrayTypeUtils;
import io.deephaven.chunk.attributes.Any;

Expand Down Expand Up @@ -77,6 +78,12 @@ public final T get(int index) {
return data[offset + index];
}

// region isNull
public final boolean isNull(int index) {
return data[offset + index] == null;
}
// endregion isNull

@Override
public ObjectChunk<T, ATTR> slice(int offset, int capacity) {
ChunkHelpers.checkSliceArgs(size, offset, capacity);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
* {@link Chunk} that may have its backing storage reset to a slice of that belonging to another {@link Chunk} or a
* native array.
*/
public interface ResettableReadOnlyChunk<ATTR_BASE extends Any> extends ResettableChunk<ATTR_BASE>, PoolableChunk {
public interface ResettableReadOnlyChunk<ATTR_BASE extends Any>
extends ResettableChunk<ATTR_BASE>, PoolableChunk<ATTR_BASE> {

/**
* Reset the data and bounds of this chunk to a range or sub-range of the specified {@link Chunk}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
* {@link WritableChunk} or a native array.
*/
public interface ResettableWritableChunk<ATTR_BASE extends Any>
extends ResettableChunk<ATTR_BASE>, WritableChunk<ATTR_BASE>, PoolableChunk {
extends ResettableChunk<ATTR_BASE>, WritableChunk<ATTR_BASE>, PoolableChunk<ATTR_BASE> {

@Override
<ATTR extends ATTR_BASE> WritableChunk<ATTR> resetFromChunk(WritableChunk<ATTR> other, int offset, int capacity);
Expand Down
7 changes: 7 additions & 0 deletions engine/chunk/src/main/java/io/deephaven/chunk/ShortChunk.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
// @formatter:off
package io.deephaven.chunk;

import io.deephaven.util.QueryConstants;
import io.deephaven.util.type.ArrayTypeUtils;
import io.deephaven.chunk.attributes.Any;

Expand Down Expand Up @@ -77,6 +78,12 @@ public final short get(int index) {
return data[offset + index];
}

// region isNull
public final boolean isNull(int index) {
return data[offset + index] == QueryConstants.NULL_SHORT;
}
// endregion isNull

@Override
public ShortChunk<ATTR> slice(int offset, int capacity) {
ChunkHelpers.checkSliceArgs(size, offset, capacity);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
*
* @param <ATTR> Descriptive attribute that applies to the elements stored within this WritableChunk
*/
public interface WritableChunk<ATTR extends Any> extends Chunk<ATTR>, PoolableChunk {
public interface WritableChunk<ATTR extends Any> extends Chunk<ATTR>, PoolableChunk<ATTR> {
@Override
WritableChunk<ATTR> slice(int offset, int capacity);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@
package io.deephaven.chunk.util.pools;

import io.deephaven.chunk.Chunk;
import io.deephaven.chunk.attributes.Any;
import io.deephaven.util.SafeCloseable;

/**
* Marker interface for {@link Chunk} subclasses that can be kept with in a {@link ChunkPool}, and whose
* {@link #close()} method will return them to the appropriate pool.
*/
public interface PoolableChunk extends SafeCloseable {
public interface PoolableChunk<ATTR extends Any> extends Chunk<ATTR>, SafeCloseable {
}
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,9 @@ public enum CopyAttributeOperation {
CopyAttributeOperation.Flatten, // add flatten for now because web flattens all views
CopyAttributeOperation.Preview));

tempMap.put(BARRAGE_SCHEMA_ATTRIBUTE, EnumSet.of(
nbauernfeind marked this conversation as resolved.
Show resolved Hide resolved
CopyAttributeOperation.Filter));

attributeToCopySet = Collections.unmodifiableMap(tempMap);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1411,7 +1411,12 @@ private static boolean snapshotColumnsParallel(
final ExecutionContext executionContext,
@NotNull final BarrageMessage snapshot) {
final JobScheduler jobScheduler = new OperationInitializerJobScheduler();
final CompletableFuture<Void> waitForParallelSnapshot = new CompletableFuture<>();
final CompletableFuture<Void> waitForParallelSnapshot = new CompletableFuture<>() {
@Override
public boolean completeExceptionally(Throwable ex) {
return super.completeExceptionally(ex);
nbauernfeind marked this conversation as resolved.
Show resolved Hide resolved
}
};
jobScheduler.iterateParallel(
executionContext,
logOutput -> logOutput.append("snapshotColumnsParallel"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package io.deephaven.engine.table.impl.sources;

import io.deephaven.chunk.ChunkType;
import io.deephaven.engine.table.ColumnDefinition;
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.engine.table.WritableColumnSource;
import org.jetbrains.annotations.NotNull;
Expand Down Expand Up @@ -212,6 +213,27 @@ public static ColumnSource<?>[] maybeConvertToPrimitive(@NotNull final ColumnSou
return result;
}

/**
* If {@code columnDefinition.getDataType()} or {@code columnDefinition.getComponentType} are something that we
* prefer to handle as a primitive, do the appropriate conversion.
*
* @param columnDefinition The column definition to convert
* @return if possible, {@code columnDefinition} converted to a primitive, otherewise {@code columnDefinition}
*/
@NotNull
public static ColumnDefinition<?> maybeConvertToPrimitive(@NotNull final ColumnDefinition<?> columnDefinition) {
final Class<?> dataType = ReinterpretUtils.maybeConvertToPrimitiveDataType(columnDefinition.getDataType());
Class<?> componentType = columnDefinition.getComponentType();
if (componentType != null) {
componentType = ReinterpretUtils.maybeConvertToPrimitiveDataType(componentType);
}
if (columnDefinition.getDataType() == dataType
&& columnDefinition.getComponentType() == componentType) {
return columnDefinition;
}
return columnDefinition.withDataType(dataType, componentType);
}

/**
* If {@code source} is something that we prefer to handle as a primitive, do the appropriate conversion.
*
Expand Down Expand Up @@ -265,6 +287,7 @@ public static ChunkType maybeConvertToWritablePrimitiveChunkType(@NotNull final
}
if (dataType == Instant.class) {
// Note that storing ZonedDateTime as a primitive is lossy on the time zone.
// TODO (https://github.com/deephaven/deephaven-core/issues/5241): Inconsistent handling of ZonedDateTime
return ChunkType.Long;
}
return ChunkType.fromElementType(dataType);
Expand All @@ -283,6 +306,8 @@ public static Class<?> maybeConvertToPrimitiveDataType(@NotNull final Class<?> d
return byte.class;
}
if (dataType == Instant.class || dataType == ZonedDateTime.class) {
// Note: not all ZonedDateTime sources are convertible to long, so this doesn't match column source behavior
nbauernfeind marked this conversation as resolved.
Show resolved Hide resolved
// TODO (https://github.com/deephaven/deephaven-core/issues/5241): Inconsistent handling of ZonedDateTime
return long.class;
}
return dataType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ public static Builder newBuilder(final String name) {

public static final String DEFAULT_TARGET_CYCLE_DURATION_MILLIS_PROP =
"PeriodicUpdateGraph.targetCycleDurationMillis";
public static final int DEFAULT_TARGET_CYCLE_DURATION_MILLIS =
Configuration.getInstance().getIntegerWithDefault(DEFAULT_TARGET_CYCLE_DURATION_MILLIS_PROP, 1000);

private final long defaultTargetCycleDurationMillis;
private volatile long targetCycleDurationMillis;
private final ThreadInitializationFactory threadInitializationFactory;
Expand Down Expand Up @@ -1166,8 +1169,7 @@ public static PeriodicUpdateGraph getInstance(final String name) {
public static final class Builder {
private final boolean allowUnitTestMode =
Configuration.getInstance().getBooleanWithDefault(ALLOW_UNIT_TEST_MODE_PROP, false);
private long targetCycleDurationMillis =
Configuration.getInstance().getIntegerWithDefault(DEFAULT_TARGET_CYCLE_DURATION_MILLIS_PROP, 1000);
private long targetCycleDurationMillis = DEFAULT_TARGET_CYCLE_DURATION_MILLIS;
nbauernfeind marked this conversation as resolved.
Show resolved Hide resolved
private long minimumCycleDurationToLogNanos = DEFAULT_MINIMUM_CYCLE_DURATION_TO_LOG_NANOSECONDS;

private String name;
Expand Down
Loading
Loading