Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup to BarrageStreamGenerator and its related classes in antipation of porting to JS #5552

Merged
merged 14 commits into from
Jun 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,38 @@
import com.google.flatbuffers.FlatBufferBuilder;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.table.impl.util.BarrageMessage;
import io.deephaven.extensions.barrage.util.DefensiveDrainable;
import io.deephaven.util.SafeCloseable;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.io.IOException;
import java.util.BitSet;
import java.util.function.Consumer;
import java.util.function.ToIntFunction;

/**
* A StreamGenerator takes a BarrageMessage and re-uses portions of the serialized payload across different subscribers
* that may subscribe to different viewports and columns.
*
* @param <MessageView> The sub-view type that the listener expects to receive.
*/
public interface BarrageStreamGenerator<MessageView> extends SafeCloseable {
public interface BarrageStreamGenerator extends SafeCloseable {

interface Factory<MessageView> {
/**
* Represents a single update, which might be sent as multiple distinct payloads as necessary based in the
* implementation.
*/
interface MessageView {
void forEachStream(Consumer<DefensiveDrainable> visitor) throws IOException;
}

interface Factory {
/**
* Create a StreamGenerator that now owns the BarrageMessage.
*
* @param message the message that contains the update that we would like to propagate
* @param metricsConsumer a method that can be used to record write metrics
*/
BarrageStreamGenerator<MessageView> newGenerator(
BarrageStreamGenerator newGenerator(
BarrageMessage message, BarragePerformanceLog.WriteMetricsConsumer metricsConsumer);

/**
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.extensions.barrage;

import io.deephaven.chunk.Chunk;
import io.deephaven.chunk.ChunkType;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.extensions.barrage.chunk.ChunkInputStreamGenerator;
import io.deephaven.extensions.barrage.util.StreamReaderOptions;
import io.deephaven.util.SafeCloseable;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

public class ChunkListInputStreamGenerator implements SafeCloseable {
private final List<ChunkInputStreamGenerator> generators;
private final ChunkInputStreamGenerator emptyGenerator;

public ChunkListInputStreamGenerator(Class<?> type, Class<?> componentType, List<Chunk<Values>> data,
ChunkType chunkType) {
// create an input stream generator for each chunk
ChunkInputStreamGenerator[] generators = new ChunkInputStreamGenerator[data.size()];

long rowOffset = 0;
for (int i = 0; i < data.size(); ++i) {
final Chunk<Values> valuesChunk = data.get(i);
generators[i] = ChunkInputStreamGenerator.makeInputStreamGenerator(chunkType, type, componentType,
valuesChunk, rowOffset);
rowOffset += valuesChunk.size();
}
this.generators = Arrays.asList(generators);
emptyGenerator = ChunkInputStreamGenerator.makeInputStreamGenerator(
chunkType, type, componentType, chunkType.getEmptyChunk(), 0);
}

public List<ChunkInputStreamGenerator> generators() {
return generators;
}

public ChunkInputStreamGenerator.DrainableColumn empty(StreamReaderOptions options, RowSet rowSet)
throws IOException {
return emptyGenerator.getInputStream(options, rowSet);
}

@Override
public void close() {
for (ChunkInputStreamGenerator generator : generators) {
generator.close();
}
emptyGenerator.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.extensions.barrage;

import io.deephaven.extensions.barrage.util.DefensiveDrainable;
import io.deephaven.util.SafeCloseable;
import io.deephaven.util.datastructures.SizeException;
import org.jetbrains.annotations.NotNull;

import java.io.IOException;
import java.io.OutputStream;

public class ConsecutiveDrainableStreams extends DefensiveDrainable {
final DefensiveDrainable[] streams;

public ConsecutiveDrainableStreams(final @NotNull DefensiveDrainable... streams) {
this.streams = streams;
}

@Override
public int drainTo(final OutputStream outputStream) throws IOException {
int total = 0;
for (final DefensiveDrainable stream : streams) {
final int expected = total + stream.available();
total += stream.drainTo(outputStream);
if (expected != total) {
throw new IllegalStateException("drained message drained wrong number of bytes");
}
if (total < 0) {
throw new IllegalStateException("drained message is too large; exceeds Integer.MAX_VALUE");
}
}
return total;
}

@Override
public int available() throws SizeException, IOException {
int total = 0;
for (final DefensiveDrainable stream : streams) {
total += stream.available();
if (total < 0) {
throw new SizeException("drained message is too large; exceeds Integer.MAX_VALUE", total);
}
}
return total;
}

@Override
public void close() throws IOException {
SafeCloseable.closeAll(streams);
super.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.extensions.barrage;

import io.deephaven.extensions.barrage.util.DefensiveDrainable;

import java.io.IOException;
import java.io.OutputStream;
import java.util.Objects;

public class DrainableByteArrayInputStream extends DefensiveDrainable {

private byte[] buf;
private final int offset;
private final int length;

public DrainableByteArrayInputStream(final byte[] buf, final int offset, final int length) {
this.buf = Objects.requireNonNull(buf);
this.offset = offset;
this.length = length;
}

@Override
public int available() {
if (buf == null) {
return 0;
}
return length;
}

@Override
public int drainTo(final OutputStream outputStream) throws IOException {
if (buf != null) {
try {
outputStream.write(buf, offset, length);
} finally {
buf = null;
}
return length;
}
return 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import io.deephaven.extensions.barrage.BarragePerformanceLog;
import io.deephaven.extensions.barrage.BarrageSnapshotOptions;
import io.deephaven.extensions.barrage.BarrageStreamGenerator;
import io.deephaven.extensions.barrage.BarrageStreamGeneratorImpl;
import io.deephaven.extensions.barrage.chunk.vector.VectorExpansionKernel;
import io.deephaven.internal.log.LoggerFactory;
import io.deephaven.io.logger.Logger;
Expand Down Expand Up @@ -702,13 +701,13 @@ private static Field arrowFieldForVectorType(
}

public static void createAndSendStaticSnapshot(
BarrageStreamGenerator.Factory<BarrageStreamGeneratorImpl.View> streamGeneratorFactory,
BarrageStreamGenerator.Factory streamGeneratorFactory,
BaseTable<?> table,
BitSet columns,
RowSet viewport,
boolean reverseViewport,
BarrageSnapshotOptions snapshotRequestOptions,
StreamObserver<BarrageStreamGeneratorImpl.View> listener,
StreamObserver<BarrageStreamGenerator.MessageView> listener,
BarragePerformanceLog.SnapshotMetricsHelper metrics) {
// start with small value and grow
long snapshotTargetCellCount = MIN_SNAPSHOT_CELL_COUNT;
Expand Down Expand Up @@ -755,8 +754,7 @@ public static void createAndSendStaticSnapshot(
// send out the data. Note that although a `BarrageUpdateMetaData` object will
// be provided with each unique snapshot, vanilla Flight clients will ignore
// these and see only an incoming stream of batches
try (final BarrageStreamGenerator<BarrageStreamGeneratorImpl.View> bsg =
streamGeneratorFactory.newGenerator(msg, metrics)) {
try (final BarrageStreamGenerator bsg = streamGeneratorFactory.newGenerator(msg, metrics)) {
if (rsIt.hasMore()) {
listener.onNext(bsg.getSnapshotView(snapshotRequestOptions,
snapshotViewport, false,
Expand Down Expand Up @@ -797,11 +795,11 @@ public static void createAndSendStaticSnapshot(
}

public static void createAndSendSnapshot(
BarrageStreamGenerator.Factory<BarrageStreamGeneratorImpl.View> streamGeneratorFactory,
BarrageStreamGenerator.Factory streamGeneratorFactory,
BaseTable<?> table,
BitSet columns, RowSet viewport, boolean reverseViewport,
BarrageSnapshotOptions snapshotRequestOptions,
StreamObserver<BarrageStreamGeneratorImpl.View> listener,
StreamObserver<BarrageStreamGenerator.MessageView> listener,
BarragePerformanceLog.SnapshotMetricsHelper metrics) {

// if the table is static and a full snapshot is requested, we can make and send multiple
Expand All @@ -828,8 +826,7 @@ public static void createAndSendSnapshot(
msg.modColumnData = BarrageMessage.ZERO_MOD_COLUMNS; // no mod column data

// translate the viewport to keyspace and make the call
try (final BarrageStreamGenerator<BarrageStreamGeneratorImpl.View> bsg =
streamGeneratorFactory.newGenerator(msg, metrics);
try (final BarrageStreamGenerator bsg = streamGeneratorFactory.newGenerator(msg, metrics);
final RowSet keySpaceViewport = viewport != null
? msg.rowsAdded.subSetForPositions(viewport, reverseViewport)
: null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import io.deephaven.engine.table.impl.BaseTable;
import io.deephaven.extensions.barrage.BarragePerformanceLog;
import io.deephaven.extensions.barrage.BarrageStreamGenerator;
import io.deephaven.extensions.barrage.BarrageStreamGeneratorImpl;
import io.grpc.Drainable;
import io.grpc.stub.StreamObserver;
Expand Down Expand Up @@ -58,11 +59,11 @@ public byte[] next() {
return listener.batchMessages.pop();
}

private static class ArrowBuilderObserver implements StreamObserver<BarrageStreamGeneratorImpl.View> {
private static class ArrowBuilderObserver implements StreamObserver<BarrageStreamGenerator.MessageView> {
final Deque<byte[]> batchMessages = new ArrayDeque<>();

@Override
public void onNext(final BarrageStreamGeneratorImpl.View messageView) {
public void onNext(final BarrageStreamGenerator.MessageView messageView) {
try {
messageView.forEachStream(inputStream -> {
try (final BarrageProtoUtil.ExposedByteArrayOutputStream baos =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ public class BarrageStreamGeneratorTest {
@Test
public void testDrainableStreamIsEmptied() throws IOException {
final int length = 512;
final BarrageStreamGeneratorImpl.DrainableByteArrayInputStream inputStream =
new BarrageStreamGeneratorImpl.DrainableByteArrayInputStream(new byte[length * 2], length / 2, length);
final DrainableByteArrayInputStream inputStream =
new DrainableByteArrayInputStream(new byte[length * 2], length / 2, length);

int bytesRead = inputStream.drainTo(new NullOutputStream());

Expand All @@ -26,12 +26,11 @@ public void testDrainableStreamIsEmptied() throws IOException {
@Test
public void testConsecutiveDrainableStreamIsEmptied() throws IOException {
final int length = 512;
final BarrageStreamGeneratorImpl.DrainableByteArrayInputStream in1 =
new BarrageStreamGeneratorImpl.DrainableByteArrayInputStream(new byte[length * 2], length / 2, length);
final BarrageStreamGeneratorImpl.DrainableByteArrayInputStream in2 =
new BarrageStreamGeneratorImpl.DrainableByteArrayInputStream(new byte[length * 2], length / 2, length);
final BarrageStreamGeneratorImpl.ConsecutiveDrainableStreams inputStream =
new BarrageStreamGeneratorImpl.ConsecutiveDrainableStreams(in1, in2);
final DrainableByteArrayInputStream in1 =
new DrainableByteArrayInputStream(new byte[length * 2], length / 2, length);
final DrainableByteArrayInputStream in2 =
new DrainableByteArrayInputStream(new byte[length * 2], length / 2, length);
final ConsecutiveDrainableStreams inputStream = new ConsecutiveDrainableStreams(in1, in2);

int bytesRead = inputStream.drainTo(new NullOutputStream());

Expand Down
Loading
Loading