Skip to content

Commit

Permalink
GH-34532: [Java] Change JDBC to handle multi-endpoints
Browse files Browse the repository at this point in the history
- Create new clients to connect to new locations in endpoints.
- If no location is reported using the current connection.
- Rework ArrowFlightSqlHandler.Builder to avoid changing its state
during build() so that it can be re-used.
- Add a copy constructor to ArrowFlightSqlHandler.Builder to make it
easy to replicate connection settings when connecting to different
locations.
- Add unit test infrastructure for building a distributed FlightSql
server
  • Loading branch information
jduo committed Nov 3, 2023
1 parent cd6e635 commit 9a0bd79
Show file tree
Hide file tree
Showing 8 changed files with 439 additions and 109 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.arrow.driver.jdbc;

import static org.apache.arrow.driver.jdbc.utils.FlightStreamQueue.createNewQueue;
import static org.apache.arrow.driver.jdbc.utils.FlightEndpointDataQueue.createNewQueue;

import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
Expand All @@ -26,7 +26,8 @@
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;

import org.apache.arrow.driver.jdbc.utils.FlightStreamQueue;
import org.apache.arrow.driver.jdbc.client.CloseableEndpointStreamPair;
import org.apache.arrow.driver.jdbc.utils.FlightEndpointDataQueue;
import org.apache.arrow.driver.jdbc.utils.VectorSchemaRootTransformer;
import org.apache.arrow.flight.FlightInfo;
import org.apache.arrow.flight.FlightStream;
Expand All @@ -47,8 +48,8 @@ public final class ArrowFlightJdbcFlightStreamResultSet
extends ArrowFlightJdbcVectorSchemaRootResultSet {

private final ArrowFlightConnection connection;
private FlightStream currentFlightStream;
private FlightStreamQueue flightStreamQueue;
private CloseableEndpointStreamPair currentEndpointData;
private FlightEndpointDataQueue flightEndpointDataQueue;

private VectorSchemaRootTransformer transformer;
private VectorSchemaRoot currentVectorSchemaRoot;
Expand Down Expand Up @@ -102,20 +103,20 @@ static ArrowFlightJdbcFlightStreamResultSet fromFlightInfo(

resultSet.transformer = transformer;

resultSet.execute(flightInfo);
resultSet.populateData(flightInfo);
return resultSet;
}

private void loadNewQueue() {
Optional.ofNullable(flightStreamQueue).ifPresent(AutoCloseables::closeNoChecked);
flightStreamQueue = createNewQueue(connection.getExecutorService());
Optional.ofNullable(flightEndpointDataQueue).ifPresent(AutoCloseables::closeNoChecked);
flightEndpointDataQueue = createNewQueue(connection.getExecutorService());
}

private void loadNewFlightStream() throws SQLException {
if (currentFlightStream != null) {
AutoCloseables.closeNoChecked(currentFlightStream);
if (currentEndpointData != null) {
AutoCloseables.closeNoChecked(currentEndpointData);
}
this.currentFlightStream = getNextFlightStream(true);
this.currentEndpointData = getNextEndpointStream(true);
}

@Override
Expand All @@ -124,24 +125,24 @@ protected AvaticaResultSet execute() throws SQLException {

if (flightInfo != null) {
schema = flightInfo.getSchemaOptional().orElse(null);
execute(flightInfo);
populateData(flightInfo);
}
return this;
}

private void execute(final FlightInfo flightInfo) throws SQLException {
private void populateData(final FlightInfo flightInfo) throws SQLException {
loadNewQueue();
flightStreamQueue.enqueue(connection.getClientHandler().getStreams(flightInfo));
flightEndpointDataQueue.enqueue(connection.getClientHandler().getStreams(flightInfo));
loadNewFlightStream();

// Ownership of the root will be passed onto the cursor.
if (currentFlightStream != null) {
executeForCurrentFlightStream();
if (currentEndpointData != null) {
populateDataForCurrentFlightStream();
}
}

private void executeForCurrentFlightStream() throws SQLException {
final VectorSchemaRoot originalRoot = currentFlightStream.getRoot();
private void populateDataForCurrentFlightStream() throws SQLException {
final VectorSchemaRoot originalRoot = currentEndpointData.getStream().getRoot();

if (transformer != null) {
try {
Expand All @@ -154,9 +155,9 @@ private void executeForCurrentFlightStream() throws SQLException {
}

if (schema != null) {
execute(currentVectorSchemaRoot, schema);
populateData(currentVectorSchemaRoot, schema);
} else {
execute(currentVectorSchemaRoot);
populateData(currentVectorSchemaRoot);
}
}

Expand All @@ -179,20 +180,20 @@ public boolean next() throws SQLException {
return true;
}

if (currentFlightStream != null) {
currentFlightStream.getRoot().clear();
if (currentFlightStream.next()) {
executeForCurrentFlightStream();
if (currentEndpointData != null) {
currentEndpointData.getStream().getRoot().clear();
if (currentEndpointData.getStream().next()) {
populateDataForCurrentFlightStream();
continue;
}

flightStreamQueue.enqueue(currentFlightStream);
flightEndpointDataQueue.enqueue(currentEndpointData);
}

currentFlightStream = getNextFlightStream(false);
currentEndpointData = getNextEndpointStream(false);

if (currentFlightStream != null) {
executeForCurrentFlightStream();
if (currentEndpointData != null) {
populateDataForCurrentFlightStream();
continue;
}

Expand All @@ -207,14 +208,14 @@ public boolean next() throws SQLException {
@Override
protected void cancel() {
super.cancel();
final FlightStream currentFlightStream = this.currentFlightStream;
if (currentFlightStream != null) {
currentFlightStream.cancel("Cancel", null);
final CloseableEndpointStreamPair currentEndpoint = this.currentEndpointData;
if (currentEndpoint != null) {
currentEndpoint.getStream().cancel("Cancel", null);
}

if (flightStreamQueue != null) {
if (flightEndpointDataQueue != null) {
try {
flightStreamQueue.close();
flightEndpointDataQueue.close();
} catch (final Exception e) {
throw new RuntimeException(e);
}
Expand All @@ -224,27 +225,28 @@ protected void cancel() {
@Override
public synchronized void close() {
try {
if (flightStreamQueue != null) {
if (flightEndpointDataQueue != null) {
// flightStreamQueue should close currentFlightStream internally
flightStreamQueue.close();
} else if (currentFlightStream != null) {
flightEndpointDataQueue.close();
} else if (currentEndpointData != null) {
// close is only called for currentFlightStream if there's no queue
currentFlightStream.close();
currentEndpointData.close();
}

} catch (final Exception e) {
throw new RuntimeException(e);
} finally {
super.close();
}
}

private FlightStream getNextFlightStream(final boolean isExecution) throws SQLException {
if (isExecution) {
private CloseableEndpointStreamPair getNextEndpointStream(final boolean canTimeout) throws SQLException {
if (canTimeout) {
final int statementTimeout = statement != null ? statement.getQueryTimeout() : 0;
return statementTimeout != 0 ?
flightStreamQueue.next(statementTimeout, TimeUnit.SECONDS) : flightStreamQueue.next();
flightEndpointDataQueue.next(statementTimeout, TimeUnit.SECONDS) : flightEndpointDataQueue.next();
} else {
return flightStreamQueue.next();
return flightEndpointDataQueue.next();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public static ArrowFlightJdbcVectorSchemaRootResultSet fromVectorSchemaRoot(
new ArrowFlightJdbcVectorSchemaRootResultSet(null, state, signature, resultSetMetaData,
timeZone, null);

resultSet.execute(vectorSchemaRoot);
resultSet.populateData(vectorSchemaRoot);
return resultSet;
}

Expand All @@ -92,7 +92,7 @@ protected AvaticaResultSet execute() throws SQLException {
throw new RuntimeException("Can only execute with execute(VectorSchemaRoot)");
}

void execute(final VectorSchemaRoot vectorSchemaRoot) {
void populateData(final VectorSchemaRoot vectorSchemaRoot) {
final List<Field> fields = vectorSchemaRoot.getSchema().getFields();
final List<ColumnMetaData> columns = ConvertUtils.convertArrowFieldsToColumnMetaDataList(fields);
signature.columns.clear();
Expand All @@ -102,7 +102,7 @@ void execute(final VectorSchemaRoot vectorSchemaRoot) {
execute2(new ArrowFlightJdbcCursor(vectorSchemaRoot), this.signature.columns);
}

void execute(final VectorSchemaRoot vectorSchemaRoot, final Schema schema) {
void populateData(final VectorSchemaRoot vectorSchemaRoot, final Schema schema) {
final List<ColumnMetaData> columns = ConvertUtils.convertArrowFieldsToColumnMetaDataList(schema.getFields());
signature.columns.clear();
signature.columns.addAll(columns);
Expand Down
Loading

0 comments on commit 9a0bd79

Please sign in to comment.