Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add java client MultiJoin #5886

Merged
merged 4 commits into from
Aug 5, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@
import io.deephaven.qst.TableCreator;
import io.deephaven.qst.table.EmptyTable;
import io.deephaven.qst.table.InputTable;
import io.deephaven.qst.table.MultiJoinInput;
import io.deephaven.qst.table.NewTable;
import io.deephaven.qst.table.TicketTable;
import io.deephaven.qst.table.TimeTable;

import java.util.List;
import java.util.Map;
import java.util.Objects;

Expand Down Expand Up @@ -52,6 +54,11 @@ public Table of(InputTable inputTable) {
return delegate.of(inputTable);
}

@Override
public Table multiJoin(List<MultiJoinInput<Table>> multiJoinInputs) {
return delegate.multiJoin(multiJoinInputs);
}

@Override
public Table merge(Iterable<Table> tables) {
return delegate.merge(tables);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

import com.google.auto.service.AutoService;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.table.MultiJoinFactory;
import io.deephaven.engine.table.MultiJoinInput;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.engine.table.TableFactory;
Expand All @@ -13,6 +15,8 @@
import io.deephaven.engine.util.TableTools;
import io.deephaven.qst.TableCreator;
import io.deephaven.qst.table.BlinkInputTable;
import io.deephaven.qst.table.Clock;
import io.deephaven.qst.table.ClockSystem;
import io.deephaven.qst.table.EmptyTable;
import io.deephaven.qst.table.InMemoryAppendOnlyInputTable;
import io.deephaven.qst.table.InMemoryKeyBackedInputTable;
Expand All @@ -22,12 +26,11 @@
import io.deephaven.qst.table.TableSchema;
import io.deephaven.qst.table.TableSpec;
import io.deephaven.qst.table.TicketTable;
import io.deephaven.qst.table.Clock;
import io.deephaven.qst.table.ClockSystem;
import io.deephaven.qst.table.TimeTable;
import io.deephaven.stream.TablePublisher;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
Expand Down Expand Up @@ -85,6 +88,14 @@ public final Table of(InputTable inputTable) {
return InputTableAdapter.of(inputTable);
}

@Override
public final Table multiJoin(List<io.deephaven.qst.table.MultiJoinInput<Table>> multiJoinInputs) {
return MultiJoinFactory.of(multiJoinInputs.stream().map(this::adapt).toArray(MultiJoinInput[]::new)).table();
}

private MultiJoinInput adapt(io.deephaven.qst.table.MultiJoinInput<Table> input) {
devinrsmith marked this conversation as resolved.
Show resolved Hide resolved
return MultiJoinInput.of(input.table(), input.matches(), input.additions());
}

@Override
public final Table merge(Iterable<Table> tables) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
import io.deephaven.extensions.barrage.util.*;
import io.deephaven.internal.log.LoggerFactory;
import io.deephaven.io.logger.Logger;
import io.deephaven.qst.table.TableLabelVisitor;
import io.deephaven.qst.table.TicketTable;
import io.grpc.CallOptions;
import io.grpc.ClientCall;
import io.grpc.Context;
Expand Down Expand Up @@ -161,12 +159,7 @@ public void onError(final Throwable t) {
.append(": Error detected in snapshot: ")
.append(t).endl();

final String label = tableHandle.export().table().walk(new TableLabelVisitor() {
@Override
public String visit(TicketTable ticketTable) {
return BarrageSubscriptionImpl.nameForTableTicket(ticketTable);
}
});
final String label = TableSpecLabeler.of(tableHandle.export().table());
// this error will always be propagated to our CheckForCompletion#onError callback
resultTable.handleBarrageError(new TableDataException(
String.format("Barrage snapshot error for %s (%s)", logName, label), t));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,6 @@
import io.deephaven.extensions.barrage.util.*;
import io.deephaven.internal.log.LoggerFactory;
import io.deephaven.io.logger.Logger;
import io.deephaven.proto.util.ApplicationTicketHelper;
import io.deephaven.proto.util.ExportTicketHelper;
import io.deephaven.proto.util.ScopeTicketHelper;
import io.deephaven.proto.util.SharedTicketHelper;
import io.deephaven.qst.table.TableLabelVisitor;
import io.deephaven.qst.table.TicketTable;
import io.deephaven.util.annotations.FinalDefault;
import io.deephaven.util.annotations.VisibleForTesting;
import io.grpc.CallOptions;
Expand All @@ -45,7 +39,6 @@
import io.grpc.stub.ClientResponseObserver;
import org.apache.arrow.flight.impl.Flight.FlightData;
import org.apache.arrow.flight.impl.FlightServiceGrpc;
import org.apache.commons.codec.binary.Hex;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

Expand Down Expand Up @@ -162,12 +155,7 @@ public void onError(final Throwable t) {
.append(": Error detected in subscription: ")
.append(t).endl();

final String label = tableHandle.export().table().walk(new TableLabelVisitor() {
@Override
public String visit(TicketTable ticketTable) {
return nameForTableTicket(ticketTable);
}
});
final String label = TableSpecLabeler.of(tableHandle.export().table());
resultTable.handleBarrageError(new TableDataException(
String.format("Barrage subscription error for %s (%s)", logName, label), t));
cleanup();
Expand All @@ -185,33 +173,6 @@ public void onCompleted() {
}
}

static String nameForTableTicket(TicketTable table) {
byte[] ticket = table.ticket();
if (ticket.length == 0) {
return "ticketTable(EMPTY)";
}

// We'll try our best to decode the ticket, but it's not guaranteed to be a well-known ticket route.
try {
switch (ticket[0]) {
case 'a':
return ApplicationTicketHelper.toReadableString(ticket);
case 's':
return ScopeTicketHelper.toReadableString(ticket);
case 'e':
return ExportTicketHelper.toReadableString(ByteBuffer.wrap(ticket), "TicketTable");
case 'h':
return SharedTicketHelper.toReadableString(ticket);
default:
break;
}
} catch (Exception err) {
// ignore - let's just return the hex representation
}

return "ticketTable(0x" + Hex.encodeHexString(ticket) + ")";
}

@Override
public Future<Table> entireTable() {
return partialTable(null, null, false);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.client.impl;

import io.deephaven.proto.util.ApplicationTicketHelper;
import io.deephaven.proto.util.ExportTicketHelper;
import io.deephaven.proto.util.ScopeTicketHelper;
import io.deephaven.proto.util.SharedTicketHelper;
import io.deephaven.qst.table.TableLabelVisitor;
import io.deephaven.qst.table.TableSpec;
import io.deephaven.qst.table.TicketTable;
import org.apache.commons.codec.binary.Hex;

import java.nio.ByteBuffer;

final class TableSpecLabeler extends TableLabelVisitor {
private static final TableSpecLabeler INSTANCE = new TableSpecLabeler();

public static String of(TableSpec tableSpec) {
return tableSpec.walk(INSTANCE);
}

private TableSpecLabeler() {}

static String nameForTableTicket(TicketTable table) {
byte[] ticket = table.ticket();
if (ticket.length == 0) {
return "ticketTable(EMPTY)";
}

// We'll try our best to decode the ticket, but it's not guaranteed to be a well-known ticket route.
try {
switch (ticket[0]) {
case 'a':
return ApplicationTicketHelper.toReadableString(ticket);
case 's':
return ScopeTicketHelper.toReadableString(ticket);
case 'e':
return ExportTicketHelper.toReadableString(ByteBuffer.wrap(ticket), "TicketTable");
case 'h':
return SharedTicketHelper.toReadableString(ticket);
default:
break;
}
} catch (Exception err) {
// ignore - let's just return the hex representation
}

return "ticketTable(0x" + Hex.encodeHexString(ticket) + ")";
}

@Override
public String visit(TicketTable ticketTable) {
return nameForTableTicket(ticketTable);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,12 @@
import io.deephaven.base.verify.Require;
import io.deephaven.client.impl.Session;
import io.deephaven.client.impl.SessionImpl;
import io.deephaven.engine.table.Table;
import io.deephaven.qst.table.TableSpec;
import io.deephaven.qst.table.TicketTable;
import io.deephaven.server.runner.DeephavenApiServerTestBase;
import io.deephaven.server.session.SessionState;
import io.deephaven.server.session.SessionState.ExportObject;
import io.grpc.ManagedChannel;
import org.junit.After;
import org.junit.Before;
Expand Down Expand Up @@ -50,4 +54,9 @@ public void tearDown() throws Exception {
}
super.tearDown();
}

public TicketTable ref(Table table) {
final ExportObject<Table> export = serverSessionState.newServerSideExport(table);
return TableSpec.ticket(export.getExportId().getTicket().toByteArray());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.client;

import io.deephaven.api.ColumnName;
import io.deephaven.api.JoinMatch;
import io.deephaven.client.impl.TableHandle;
import io.deephaven.client.impl.TableHandle.TableHandleException;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.util.TableTools;
import io.deephaven.qst.table.MultiJoinInput;
import io.deephaven.qst.table.MultiJoinTable;
import io.deephaven.qst.table.TableSpec;
import org.junit.Test;

import static org.assertj.core.api.Assertions.assertThat;

public class MultiJoinTest extends DeephavenSessionTestBase {

@Test
public void muliJoinTableExecute() throws TableHandleException, InterruptedException {
devinrsmith marked this conversation as resolved.
Show resolved Hide resolved
try (final TableHandle handle = session.batch().execute(prototype())) {
assertThat(handle.isSuccessful()).isTrue();
}
}

private MultiJoinTable prototype() {
final Table t1 = TableTools.newTable(
TableTools.longCol("Key", 0L),
TableTools.longCol("First", 0L));
final Table t2 = TableTools.newTable(
TableTools.longCol("Key", 0L),
TableTools.longCol("Second", 1L));
final Table t3 = TableTools.newTable(
TableTools.longCol("Key", 0L),
TableTools.longCol("Third", 2L));
return MultiJoinTable.builder()
.addInputs(MultiJoinInput.<TableSpec>builder()
.table(ref(t1))
.addMatches(JoinMatch.parse("OutputKey=Key"))
.addAdditions(ColumnName.of("First"))
.build())
.addInputs(MultiJoinInput.<TableSpec>builder()
.table(ref(t2))
.addMatches(JoinMatch.parse("OutputKey=Key"))
.addAdditions(ColumnName.of("Second"))
.build())
.addInputs(MultiJoinInput.<TableSpec>builder()
.table(ref(t3))
.addMatches(JoinMatch.parse("OutputKey=Key"))
.addAdditions(ColumnName.of("Third"))
.build())
.build();

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import io.deephaven.proto.backplane.grpc.InCondition;
import io.deephaven.proto.backplane.grpc.IsNullCondition;
import io.deephaven.proto.backplane.grpc.MergeTablesRequest;
import io.deephaven.proto.backplane.grpc.MultiJoinTablesRequest;
import io.deephaven.proto.backplane.grpc.NaturalJoinTablesRequest;
import io.deephaven.proto.backplane.grpc.NotCondition;
import io.deephaven.proto.backplane.grpc.OrCondition;
Expand Down Expand Up @@ -90,6 +91,8 @@
import io.deephaven.qst.table.JoinTable;
import io.deephaven.qst.table.LazyUpdateTable;
import io.deephaven.qst.table.MergeTable;
import io.deephaven.qst.table.MultiJoinInput;
import io.deephaven.qst.table.MultiJoinTable;
import io.deephaven.qst.table.NaturalJoinTable;
import io.deephaven.qst.table.NewTable;
import io.deephaven.qst.table.RangeJoinTable;
Expand Down Expand Up @@ -583,6 +586,29 @@ public Operation visit(DropColumnsTable dropColumnsTable) {
return op(Builder::setDropColumns, request);
}

@Override
public Operation visit(MultiJoinTable multiJoinTable) {
final MultiJoinTablesRequest.Builder request = MultiJoinTablesRequest.newBuilder()
.setResultId(ticket);
for (MultiJoinInput<TableSpec> input : multiJoinTable.inputs()) {
request.addMultiJoinInputs(adapt(input));
}
return op(Builder::setMultiJoin, request);
}

private io.deephaven.proto.backplane.grpc.MultiJoinInput adapt(MultiJoinInput<TableSpec> input) {
io.deephaven.proto.backplane.grpc.MultiJoinInput.Builder builder =
io.deephaven.proto.backplane.grpc.MultiJoinInput.newBuilder()
.setSourceId(ref(input.table()));
for (JoinMatch match : input.matches()) {
builder.addColumnsToMatch(Strings.of(match));
}
for (JoinAddition addition : input.additions()) {
builder.addColumnsToAdd(Strings.of(addition));
}
return builder.build();
}

private SelectOrUpdateRequest selectOrUpdate(SingleParentTable x,
Collection<Selectable> columns) {
SelectOrUpdateRequest.Builder builder =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,15 @@
import io.deephaven.qst.table.InputTable;
import io.deephaven.qst.table.LabeledTables;
import io.deephaven.qst.table.MergeTable;
import io.deephaven.qst.table.MultiJoinInput;
import io.deephaven.qst.table.MultiJoinTable;
import io.deephaven.qst.table.NewTable;
import io.deephaven.qst.table.TableSpec;
import io.deephaven.qst.table.TicketTable;
import io.deephaven.qst.table.TimeTable;

import java.util.List;

abstract class TableHandleManagerBase implements TableHandleManager {

protected abstract TableHandle handle(TableSpec table);
Expand Down Expand Up @@ -49,6 +53,19 @@ public final TableHandle of(InputTable inputTable) {
return handle(inputTable);
}

@Override
public final TableHandle multiJoin(List<MultiJoinInput<TableHandle>> multiJoinInputs) {
MultiJoinTable.Builder builder = MultiJoinTable.builder();
for (MultiJoinInput<TableHandle> input : multiJoinInputs) {
builder.addInputs(MultiJoinInput.<TableSpec>builder()
devinrsmith marked this conversation as resolved.
Show resolved Hide resolved
.table(input.table().table())
.addAllMatches(input.matches())
.addAllAdditions(input.additions())
.build());
}
return handle(builder.build());
}

@Override
public final TableHandle merge(Iterable<TableHandle> tableProxies) {
MergeTable.Builder builder = MergeTable.builder();
Expand Down
Loading
Loading