Skip to content

Commit

Permalink
Barrage: Refactor Read/Write Chunk Factories
Browse files Browse the repository at this point in the history
  • Loading branch information
nbauernfeind committed Sep 16, 2024
1 parent 564b146 commit c4e970d
Show file tree
Hide file tree
Showing 116 changed files with 6,757 additions and 4,334 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
// @formatter:off
package io.deephaven.chunk;

import io.deephaven.util.QueryConstants;
import io.deephaven.util.type.ArrayTypeUtils;
import io.deephaven.chunk.attributes.Any;

Expand Down Expand Up @@ -74,6 +75,10 @@ public final boolean get(int index) {
return data[offset + index];
}

public final boolean isNullAt(int index) {
return data[offset + index] == QueryConstants.NULL_BOOLEAN;
}

@Override
public BooleanChunk<ATTR> slice(int offset, int capacity) {
ChunkHelpers.checkSliceArgs(size, offset, capacity);
Expand Down
5 changes: 5 additions & 0 deletions engine/chunk/src/main/java/io/deephaven/chunk/ByteChunk.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
// @formatter:off
package io.deephaven.chunk;

import io.deephaven.util.QueryConstants;
import io.deephaven.util.type.ArrayTypeUtils;
import io.deephaven.chunk.attributes.Any;

Expand Down Expand Up @@ -78,6 +79,10 @@ public final byte get(int index) {
return data[offset + index];
}

public final boolean isNullAt(int index) {
return data[offset + index] == QueryConstants.NULL_BYTE;
}

@Override
public ByteChunk<ATTR> slice(int offset, int capacity) {
ChunkHelpers.checkSliceArgs(size, offset, capacity);
Expand Down
5 changes: 5 additions & 0 deletions engine/chunk/src/main/java/io/deephaven/chunk/CharChunk.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
//
package io.deephaven.chunk;

import io.deephaven.util.QueryConstants;
import io.deephaven.util.type.ArrayTypeUtils;
import io.deephaven.chunk.attributes.Any;

Expand Down Expand Up @@ -73,6 +74,10 @@ public final char get(int index) {
return data[offset + index];
}

public final boolean isNullAt(int index) {
return data[offset + index] == QueryConstants.NULL_CHAR;
}

@Override
public CharChunk<ATTR> slice(int offset, int capacity) {
ChunkHelpers.checkSliceArgs(size, offset, capacity);
Expand Down
6 changes: 6 additions & 0 deletions engine/chunk/src/main/java/io/deephaven/chunk/Chunk.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,12 @@ default void copyToBuffer(int srcOffset, @NotNull Buffer destBuffer, int destOff
*/
int size();

/**
* @return whether The value offset is null
* @param index The index to check
*/
boolean isNullAt(int index);

/**
* @return The underlying chunk type
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
// @formatter:off
package io.deephaven.chunk;

import io.deephaven.util.QueryConstants;
import io.deephaven.util.type.ArrayTypeUtils;
import io.deephaven.chunk.attributes.Any;

Expand Down Expand Up @@ -77,6 +78,10 @@ public final double get(int index) {
return data[offset + index];
}

public final boolean isNullAt(int index) {
return data[offset + index] == QueryConstants.NULL_DOUBLE;
}

@Override
public DoubleChunk<ATTR> slice(int offset, int capacity) {
ChunkHelpers.checkSliceArgs(size, offset, capacity);
Expand Down
5 changes: 5 additions & 0 deletions engine/chunk/src/main/java/io/deephaven/chunk/FloatChunk.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
// @formatter:off
package io.deephaven.chunk;

import io.deephaven.util.QueryConstants;
import io.deephaven.util.type.ArrayTypeUtils;
import io.deephaven.chunk.attributes.Any;

Expand Down Expand Up @@ -77,6 +78,10 @@ public final float get(int index) {
return data[offset + index];
}

public final boolean isNullAt(int index) {
return data[offset + index] == QueryConstants.NULL_FLOAT;
}

@Override
public FloatChunk<ATTR> slice(int offset, int capacity) {
ChunkHelpers.checkSliceArgs(size, offset, capacity);
Expand Down
5 changes: 5 additions & 0 deletions engine/chunk/src/main/java/io/deephaven/chunk/IntChunk.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
// @formatter:off
package io.deephaven.chunk;

import io.deephaven.util.QueryConstants;
import io.deephaven.util.type.ArrayTypeUtils;
import io.deephaven.chunk.attributes.Any;

Expand Down Expand Up @@ -77,6 +78,10 @@ public final int get(int index) {
return data[offset + index];
}

public final boolean isNullAt(int index) {
return data[offset + index] == QueryConstants.NULL_INT;
}

@Override
public IntChunk<ATTR> slice(int offset, int capacity) {
ChunkHelpers.checkSliceArgs(size, offset, capacity);
Expand Down
5 changes: 5 additions & 0 deletions engine/chunk/src/main/java/io/deephaven/chunk/LongChunk.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
// @formatter:off
package io.deephaven.chunk;

import io.deephaven.util.QueryConstants;
import io.deephaven.util.type.ArrayTypeUtils;
import io.deephaven.chunk.attributes.Any;

Expand Down Expand Up @@ -77,6 +78,10 @@ public final long get(int index) {
return data[offset + index];
}

public final boolean isNullAt(int index) {
return data[offset + index] == QueryConstants.NULL_LONG;
}

@Override
public LongChunk<ATTR> slice(int offset, int capacity) {
ChunkHelpers.checkSliceArgs(size, offset, capacity);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
// @formatter:off
package io.deephaven.chunk;

import io.deephaven.util.QueryConstants;
import io.deephaven.util.type.ArrayTypeUtils;
import io.deephaven.chunk.attributes.Any;

Expand Down Expand Up @@ -77,6 +78,10 @@ public final T get(int index) {
return data[offset + index];
}

public final boolean isNullAt(int index) {
return data[offset + index] == null;
}

@Override
public ObjectChunk<T, ATTR> slice(int offset, int capacity) {
ChunkHelpers.checkSliceArgs(size, offset, capacity);
Expand Down
5 changes: 5 additions & 0 deletions engine/chunk/src/main/java/io/deephaven/chunk/ShortChunk.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
// @formatter:off
package io.deephaven.chunk;

import io.deephaven.util.QueryConstants;
import io.deephaven.util.type.ArrayTypeUtils;
import io.deephaven.chunk.attributes.Any;

Expand Down Expand Up @@ -77,6 +78,10 @@ public final short get(int index) {
return data[offset + index];
}

public final boolean isNullAt(int index) {
return data[offset + index] == QueryConstants.NULL_SHORT;
}

@Override
public ShortChunk<ATTR> slice(int offset, int capacity) {
ChunkHelpers.checkSliceArgs(size, offset, capacity);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2613,6 +2613,7 @@ private Table snapshotIncrementalInternal(final Table base, final boolean doInit
new ListenerRecorder("snapshotIncremental (triggerTable)", this, resultTable);
addUpdateListener(triggerListenerRecorder);

dropColumns(getColumnSourceMap().keySet());
final SnapshotIncrementalListener listener =
new SnapshotIncrementalListener(this, resultTable, resultColumns,
baseListenerRecorder, triggerListenerRecorder, baseTable, triggerColumns);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
//
package io.deephaven.engine.table.impl.preview;

import io.deephaven.util.type.TypeUtils;
import io.deephaven.vector.Vector;
import io.deephaven.vector.VectorFactory;
import org.jetbrains.annotations.NotNull;
Expand Down Expand Up @@ -34,7 +35,9 @@ public static ArrayPreview fromArray(final Object array) {
if (componentType == boolean.class) {
return new ArrayPreview(convertToString((boolean[]) array));
}
return new ArrayPreview(VectorFactory.forElementType(componentType)
// Boxed primitives need the Object wrapper.
final Class<?> elementType = TypeUtils.isBoxedType(componentType) ? Object.class : componentType;
return new ArrayPreview(VectorFactory.forElementType(elementType)
.vectorWrap(array)
.toString(ARRAY_SIZE_CUTOFF));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ public static Class<?> maybeConvertToPrimitiveDataType(@NotNull final Class<?> d
return byte.class;
}
if (dataType == Instant.class || dataType == ZonedDateTime.class) {
// Note: not all ZonedDateTime sources are convertible to long, so this doesn't match column source behavior
return long.class;
}
return dataType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@
package io.deephaven.extensions.barrage;

import com.google.flatbuffers.FlatBufferBuilder;
import io.deephaven.chunk.Chunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.table.impl.util.BarrageMessage;
import io.deephaven.extensions.barrage.chunk.ChunkWriter;
import io.deephaven.extensions.barrage.util.DefensiveDrainable;
import io.deephaven.util.SafeCloseable;
import org.jetbrains.annotations.NotNull;
Expand All @@ -17,10 +20,10 @@
import java.util.function.ToIntFunction;

/**
* A StreamGenerator takes a BarrageMessage and re-uses portions of the serialized payload across different subscribers
* that may subscribe to different viewports and columns.
* A {@code BarrageMessageWriter} takes a {@link BarrageMessage} and re-uses portions of the serialized payload across
* different subscribers that may subscribe to different viewports and columns.
*/
public interface BarrageStreamGenerator extends SafeCloseable {
public interface BarrageMessageWriter extends SafeCloseable {

/**
* Represents a single update, which might be sent as multiple distinct payloads as necessary based in the
Expand All @@ -32,16 +35,18 @@ interface MessageView {

interface Factory {
/**
* Create a StreamGenerator that now owns the BarrageMessage.
* Create a {@code BarrageMessageWriter} that now owns the {@link BarrageMessage}.
*
* @param message the message that contains the update that we would like to propagate
* @param metricsConsumer a method that can be used to record write metrics
*/
BarrageStreamGenerator newGenerator(
BarrageMessage message, BarragePerformanceLog.WriteMetricsConsumer metricsConsumer);
BarrageMessageWriter newMessageWriter(
@NotNull BarrageMessage message,
@NotNull ChunkWriter<Chunk<Values>>[] chunkWriters,
@NotNull BarragePerformanceLog.WriteMetricsConsumer metricsConsumer);

/**
* Create a MessageView of the Schema to send as the initial message to a new subscriber.
* Create a {@link MessageView} of the Schema to send as the initial message to a new subscriber.
*
* @param schemaPayloadWriter a function that writes schema data to a {@link FlatBufferBuilder} and returns the
* schema offset
Expand All @@ -51,21 +56,22 @@ BarrageStreamGenerator newGenerator(
}

/**
* @return the BarrageMessage that this generator is operating on
* @return the {@link BarrageMessage} that this writer is operating on
*/
BarrageMessage getMessage();

/**
* Obtain a Full-Subscription View of this StreamGenerator that can be sent to a single subscriber.
* Obtain a Full-Subscription {@link MessageView} of this {@code BarrageMessageWriter} that can be sent to a single
* subscriber.
*
* @param options serialization options for this specific view
* @param isInitialSnapshot indicates whether or not this is the first snapshot for the listener
* @param isInitialSnapshot indicates whether this is the first snapshot for the listener
* @return a MessageView filtered by the subscription properties that can be sent to that subscriber
*/
MessageView getSubView(BarrageSubscriptionOptions options, boolean isInitialSnapshot);

/**
* Obtain a View of this StreamGenerator that can be sent to a single subscriber.
* Obtain a {@link MessageView} of this {@code BarrageMessageWriter} that can be sent to a single subscriber.
*
* @param options serialization options for this specific view
* @param isInitialSnapshot indicates whether or not this is the first snapshot for the listener
Expand All @@ -79,15 +85,16 @@ MessageView getSubView(BarrageSubscriptionOptions options, boolean isInitialSnap
boolean reverseViewport, @Nullable RowSet keyspaceViewport, BitSet subscribedColumns);

/**
* Obtain a Full-Snapshot View of this StreamGenerator that can be sent to a single requestor.
* Obtain a Full-Snapshot {@link MessageView} of this {@code BarrageMessageWriter} that can be sent to a single
* requestor.
*
* @param options serialization options for this specific view
* @return a MessageView filtered by the snapshot properties that can be sent to that requestor
*/
MessageView getSnapshotView(BarrageSnapshotOptions options);

/**
* Obtain a View of this StreamGenerator that can be sent to a single requestor.
* Obtain a {@link MessageView} of this {@code BarrageMessageWriter} that can be sent to a single requestor.
*
* @param options serialization options for this specific view
* @param viewport is the position-space viewport
Expand Down
Loading

0 comments on commit c4e970d

Please sign in to comment.