Skip to content

Commit

Permalink
Cleanup to BarrageStreamGenerator and its related classes in antipati…
Browse files Browse the repository at this point in the history
…on of porting to JS (#5552)

Fixes #5411
  • Loading branch information
niloc132 authored Jun 6, 2024
1 parent 4d58cc6 commit 6711cd1
Show file tree
Hide file tree
Showing 15 changed files with 547 additions and 613 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,38 @@
import com.google.flatbuffers.FlatBufferBuilder;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.table.impl.util.BarrageMessage;
import io.deephaven.extensions.barrage.util.DefensiveDrainable;
import io.deephaven.util.SafeCloseable;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.io.IOException;
import java.util.BitSet;
import java.util.function.Consumer;
import java.util.function.ToIntFunction;

/**
* A StreamGenerator takes a BarrageMessage and re-uses portions of the serialized payload across different subscribers
* that may subscribe to different viewports and columns.
*
* @param <MessageView> The sub-view type that the listener expects to receive.
*/
public interface BarrageStreamGenerator<MessageView> extends SafeCloseable {
public interface BarrageStreamGenerator extends SafeCloseable {

interface Factory<MessageView> {
/**
* Represents a single update, which might be sent as multiple distinct payloads as necessary based in the
* implementation.
*/
interface MessageView {
void forEachStream(Consumer<DefensiveDrainable> visitor) throws IOException;
}

interface Factory {
/**
* Create a StreamGenerator that now owns the BarrageMessage.
*
* @param message the message that contains the update that we would like to propagate
* @param metricsConsumer a method that can be used to record write metrics
*/
BarrageStreamGenerator<MessageView> newGenerator(
BarrageStreamGenerator newGenerator(
BarrageMessage message, BarragePerformanceLog.WriteMetricsConsumer metricsConsumer);

/**
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.extensions.barrage;

import io.deephaven.chunk.Chunk;
import io.deephaven.chunk.ChunkType;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.extensions.barrage.chunk.ChunkInputStreamGenerator;
import io.deephaven.extensions.barrage.util.StreamReaderOptions;
import io.deephaven.util.SafeCloseable;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

public class ChunkListInputStreamGenerator implements SafeCloseable {
private final List<ChunkInputStreamGenerator> generators;
private final ChunkInputStreamGenerator emptyGenerator;

public ChunkListInputStreamGenerator(Class<?> type, Class<?> componentType, List<Chunk<Values>> data,
ChunkType chunkType) {
// create an input stream generator for each chunk
ChunkInputStreamGenerator[] generators = new ChunkInputStreamGenerator[data.size()];

long rowOffset = 0;
for (int i = 0; i < data.size(); ++i) {
final Chunk<Values> valuesChunk = data.get(i);
generators[i] = ChunkInputStreamGenerator.makeInputStreamGenerator(chunkType, type, componentType,
valuesChunk, rowOffset);
rowOffset += valuesChunk.size();
}
this.generators = Arrays.asList(generators);
emptyGenerator = ChunkInputStreamGenerator.makeInputStreamGenerator(
chunkType, type, componentType, chunkType.getEmptyChunk(), 0);
}

public List<ChunkInputStreamGenerator> generators() {
return generators;
}

public ChunkInputStreamGenerator.DrainableColumn empty(StreamReaderOptions options, RowSet rowSet)
throws IOException {
return emptyGenerator.getInputStream(options, rowSet);
}

@Override
public void close() {
for (ChunkInputStreamGenerator generator : generators) {
generator.close();
}
emptyGenerator.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.extensions.barrage;

import io.deephaven.extensions.barrage.util.DefensiveDrainable;
import io.deephaven.util.SafeCloseable;
import io.deephaven.util.datastructures.SizeException;
import org.jetbrains.annotations.NotNull;

import java.io.IOException;
import java.io.OutputStream;

public class ConsecutiveDrainableStreams extends DefensiveDrainable {
final DefensiveDrainable[] streams;

public ConsecutiveDrainableStreams(final @NotNull DefensiveDrainable... streams) {
this.streams = streams;
}

@Override
public int drainTo(final OutputStream outputStream) throws IOException {
int total = 0;
for (final DefensiveDrainable stream : streams) {
final int expected = total + stream.available();
total += stream.drainTo(outputStream);
if (expected != total) {
throw new IllegalStateException("drained message drained wrong number of bytes");
}
if (total < 0) {
throw new IllegalStateException("drained message is too large; exceeds Integer.MAX_VALUE");
}
}
return total;
}

@Override
public int available() throws SizeException, IOException {
int total = 0;
for (final DefensiveDrainable stream : streams) {
total += stream.available();
if (total < 0) {
throw new SizeException("drained message is too large; exceeds Integer.MAX_VALUE", total);
}
}
return total;
}

@Override
public void close() throws IOException {
SafeCloseable.closeAll(streams);
super.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.extensions.barrage;

import io.deephaven.extensions.barrage.util.DefensiveDrainable;

import java.io.IOException;
import java.io.OutputStream;
import java.util.Objects;

public class DrainableByteArrayInputStream extends DefensiveDrainable {

private byte[] buf;
private final int offset;
private final int length;

public DrainableByteArrayInputStream(final byte[] buf, final int offset, final int length) {
this.buf = Objects.requireNonNull(buf);
this.offset = offset;
this.length = length;
}

@Override
public int available() {
if (buf == null) {
return 0;
}
return length;
}

@Override
public int drainTo(final OutputStream outputStream) throws IOException {
if (buf != null) {
try {
outputStream.write(buf, offset, length);
} finally {
buf = null;
}
return length;
}
return 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import io.deephaven.extensions.barrage.BarragePerformanceLog;
import io.deephaven.extensions.barrage.BarrageSnapshotOptions;
import io.deephaven.extensions.barrage.BarrageStreamGenerator;
import io.deephaven.extensions.barrage.BarrageStreamGeneratorImpl;
import io.deephaven.extensions.barrage.chunk.vector.VectorExpansionKernel;
import io.deephaven.internal.log.LoggerFactory;
import io.deephaven.io.logger.Logger;
Expand Down Expand Up @@ -702,13 +701,13 @@ private static Field arrowFieldForVectorType(
}

public static void createAndSendStaticSnapshot(
BarrageStreamGenerator.Factory<BarrageStreamGeneratorImpl.View> streamGeneratorFactory,
BarrageStreamGenerator.Factory streamGeneratorFactory,
BaseTable<?> table,
BitSet columns,
RowSet viewport,
boolean reverseViewport,
BarrageSnapshotOptions snapshotRequestOptions,
StreamObserver<BarrageStreamGeneratorImpl.View> listener,
StreamObserver<BarrageStreamGenerator.MessageView> listener,
BarragePerformanceLog.SnapshotMetricsHelper metrics) {
// start with small value and grow
long snapshotTargetCellCount = MIN_SNAPSHOT_CELL_COUNT;
Expand Down Expand Up @@ -755,8 +754,7 @@ public static void createAndSendStaticSnapshot(
// send out the data. Note that although a `BarrageUpdateMetaData` object will
// be provided with each unique snapshot, vanilla Flight clients will ignore
// these and see only an incoming stream of batches
try (final BarrageStreamGenerator<BarrageStreamGeneratorImpl.View> bsg =
streamGeneratorFactory.newGenerator(msg, metrics)) {
try (final BarrageStreamGenerator bsg = streamGeneratorFactory.newGenerator(msg, metrics)) {
if (rsIt.hasMore()) {
listener.onNext(bsg.getSnapshotView(snapshotRequestOptions,
snapshotViewport, false,
Expand Down Expand Up @@ -797,11 +795,11 @@ public static void createAndSendStaticSnapshot(
}

public static void createAndSendSnapshot(
BarrageStreamGenerator.Factory<BarrageStreamGeneratorImpl.View> streamGeneratorFactory,
BarrageStreamGenerator.Factory streamGeneratorFactory,
BaseTable<?> table,
BitSet columns, RowSet viewport, boolean reverseViewport,
BarrageSnapshotOptions snapshotRequestOptions,
StreamObserver<BarrageStreamGeneratorImpl.View> listener,
StreamObserver<BarrageStreamGenerator.MessageView> listener,
BarragePerformanceLog.SnapshotMetricsHelper metrics) {

// if the table is static and a full snapshot is requested, we can make and send multiple
Expand All @@ -828,8 +826,7 @@ public static void createAndSendSnapshot(
msg.modColumnData = BarrageMessage.ZERO_MOD_COLUMNS; // no mod column data

// translate the viewport to keyspace and make the call
try (final BarrageStreamGenerator<BarrageStreamGeneratorImpl.View> bsg =
streamGeneratorFactory.newGenerator(msg, metrics);
try (final BarrageStreamGenerator bsg = streamGeneratorFactory.newGenerator(msg, metrics);
final RowSet keySpaceViewport = viewport != null
? msg.rowsAdded.subSetForPositions(viewport, reverseViewport)
: null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import io.deephaven.engine.table.impl.BaseTable;
import io.deephaven.extensions.barrage.BarragePerformanceLog;
import io.deephaven.extensions.barrage.BarrageStreamGenerator;
import io.deephaven.extensions.barrage.BarrageStreamGeneratorImpl;
import io.grpc.Drainable;
import io.grpc.stub.StreamObserver;
Expand Down Expand Up @@ -58,11 +59,11 @@ public byte[] next() {
return listener.batchMessages.pop();
}

private static class ArrowBuilderObserver implements StreamObserver<BarrageStreamGeneratorImpl.View> {
private static class ArrowBuilderObserver implements StreamObserver<BarrageStreamGenerator.MessageView> {
final Deque<byte[]> batchMessages = new ArrayDeque<>();

@Override
public void onNext(final BarrageStreamGeneratorImpl.View messageView) {
public void onNext(final BarrageStreamGenerator.MessageView messageView) {
try {
messageView.forEachStream(inputStream -> {
try (final BarrageProtoUtil.ExposedByteArrayOutputStream baos =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ public class BarrageStreamGeneratorTest {
@Test
public void testDrainableStreamIsEmptied() throws IOException {
final int length = 512;
final BarrageStreamGeneratorImpl.DrainableByteArrayInputStream inputStream =
new BarrageStreamGeneratorImpl.DrainableByteArrayInputStream(new byte[length * 2], length / 2, length);
final DrainableByteArrayInputStream inputStream =
new DrainableByteArrayInputStream(new byte[length * 2], length / 2, length);

int bytesRead = inputStream.drainTo(new NullOutputStream());

Expand All @@ -26,12 +26,11 @@ public void testDrainableStreamIsEmptied() throws IOException {
@Test
public void testConsecutiveDrainableStreamIsEmptied() throws IOException {
final int length = 512;
final BarrageStreamGeneratorImpl.DrainableByteArrayInputStream in1 =
new BarrageStreamGeneratorImpl.DrainableByteArrayInputStream(new byte[length * 2], length / 2, length);
final BarrageStreamGeneratorImpl.DrainableByteArrayInputStream in2 =
new BarrageStreamGeneratorImpl.DrainableByteArrayInputStream(new byte[length * 2], length / 2, length);
final BarrageStreamGeneratorImpl.ConsecutiveDrainableStreams inputStream =
new BarrageStreamGeneratorImpl.ConsecutiveDrainableStreams(in1, in2);
final DrainableByteArrayInputStream in1 =
new DrainableByteArrayInputStream(new byte[length * 2], length / 2, length);
final DrainableByteArrayInputStream in2 =
new DrainableByteArrayInputStream(new byte[length * 2], length / 2, length);
final ConsecutiveDrainableStreams inputStream = new ConsecutiveDrainableStreams(in1, in2);

int bytesRead = inputStream.drainTo(new NullOutputStream());

Expand Down
Loading

0 comments on commit 6711cd1

Please sign in to comment.