From 1cea623eaab989c599318b360d37340bc5948398 Mon Sep 17 00:00:00 2001 From: Brent Gardner Date: Thu, 4 Aug 2022 14:23:05 -0600 Subject: [PATCH 1/4] S&T outstanding work Try to publish Attach workspace Don't rebuild every time Only need to deploy jdbc-driver Only need to deploy jdbc-driver Might work this time? Keep trying Try to tag Add docker Build from correct directory typo --- .circleci/config.yml | 59 ++++++ .gitignore | 3 + java/flight/flight-jdbc-driver/pom.xml | 3 +- .../flight-jdbc-driver/sqlline.Dockerfile | 9 + .../driver/jdbc/ArrowFlightJdbcDriver.java | 7 +- .../client/ArrowFlightSqlClientHandler.java | 168 ++++++++++++------ .../jdbc/ArrowFlightJdbcDriverTest.java | 109 +++++++++++- 7 files changed, 292 insertions(+), 66 deletions(-) create mode 100644 .circleci/config.yml create mode 100644 java/flight/flight-jdbc-driver/sqlline.Dockerfile diff --git a/.circleci/config.yml b/.circleci/config.yml new file mode 100644 index 0000000000000..a649e86d03891 --- /dev/null +++ b/.circleci/config.yml @@ -0,0 +1,59 @@ +# CircleCI configuration file +version: 2.1 + +orbs: + maven: circleci/maven@1.3.0 + +jobs: + build: + docker: + - image: maven:3.8.6-jdk-8 + steps: + - checkout + - run: | + git tag circle-$CIRCLE_BUILD_NUM + git push origin --tags + cd /root/project/java + mvn install -DskipTests -Dcheckstyle.skip -Drat.skip=true -pl :flight-jdbc-driver -am + mkdir -p /root/project/artifacts + - persist_to_workspace: + root: /root + paths: + - project/ + - .m2/ + + maven_deploy: + docker: + - image: maven:3.8.6-jdk-8 + steps: + - attach_workspace: + at: /root + - run: | + mkdir -p /root/.m2 + echo '${repo.id}${repo.login}${repo.pwd}' > /root/.m2/settings.xml + cd /root/project/java/flight/flight-jdbc-driver + mvn deploy -DskipTests -Drat.skip=true -Dcheckstyle.skip -DaltDeploymentRepository="snapshots::default::$ARTIFACTORY_URL" -e -Drepo.id=snapshots -Drepo.login=$ARTIFACTORY_USER -Drepo.pwd="$ARTIFACTORY_API_KEY" -Dmaven.install.skip=true + + docker_deploy: + docker: + - image: docker:17.05.0-ce-git + steps: + - setup_remote_docker + - attach_workspace: + at: /root + - run: | + cd /root/project/java/flight/flight-jdbc-driver + docker build -f sqlline.Dockerfile . -t spaceandtime.jfrog.io/sxt-docker-docker/ballista-sqlline:$CIRCLE_BUILD_NUM + docker login -u$ARTIFACTORY_USER -p$ARTIFACTORY_API_KEY spaceandtime.jfrog.io + docker push spaceandtime.jfrog.io/sxt-docker-docker/ballista-sqlline:$CIRCLE_BUILD_NUM + +workflows: + main_workflow: + jobs: + - build + - maven_deploy: + requires: + - build + - docker_deploy: + requires: + - build diff --git a/.gitignore b/.gitignore index 1406c30689f8c..a0d4fb8275f65 100644 --- a/.gitignore +++ b/.gitignore @@ -87,3 +87,6 @@ cpp/Brewfile.lock.json java-dist/ java-native-c/ java-native-cpp/ + +# files altered by build +java/flight/flight-jdbc-driver/src/main/resources/properties/flight.properties diff --git a/java/flight/flight-jdbc-driver/pom.xml b/java/flight/flight-jdbc-driver/pom.xml index b127998cc8b74..014da57168bbc 100644 --- a/java/flight/flight-jdbc-driver/pom.xml +++ b/java/flight/flight-jdbc-driver/pom.xml @@ -195,8 +195,9 @@ org. cfjd.org. - org.apache.arrow.driver.jdbc.** + org.apache.arrow.** org.slf4j.** + org.apache.calcite.** org.apache.arrow.flight.name org.apache.arrow.flight.version diff --git a/java/flight/flight-jdbc-driver/sqlline.Dockerfile b/java/flight/flight-jdbc-driver/sqlline.Dockerfile new file mode 100644 index 0000000000000..02d4f2cd362d9 --- /dev/null +++ b/java/flight/flight-jdbc-driver/sqlline.Dockerfile @@ -0,0 +1,9 @@ +FROM dcm4che/sqlline + +COPY target/flight-jdbc-driver-9.0.0-SNAPSHOT.jar /usr/share/java + +ENV JAVA_CLASSPATH=/usr/share/java/flight-jdbc-driver-9.0.0-SNAPSHOT.jar + +RUN echo 'sqlline -d org.apache.arrow.driver.jdbc.ArrowFlightJdbcDriver --verbose=true -n admin -p password -u jdbc:arrow-flight://$1:50050?useEncryption=false' > main.sh + +ENTRYPOINT ["/bin/bash", "main.sh"] diff --git a/java/flight/flight-jdbc-driver/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcDriver.java b/java/flight/flight-jdbc-driver/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcDriver.java index 216c37fb5d32d..f521db9a194e6 100644 --- a/java/flight/flight-jdbc-driver/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcDriver.java +++ b/java/flight/flight-jdbc-driver/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcDriver.java @@ -245,9 +245,10 @@ private Map getUrlsArgs(String url) resultMap.put(ArrowFlightConnectionProperty.PORT.camelName(), uri.getPort()); // port final String extraParams = uri.getRawQuery(); // optional params - - final Map keyValuePairs = UrlParser.parse(extraParams, "&"); - resultMap.putAll(keyValuePairs); + if(extraParams != null) { + final Map keyValuePairs = UrlParser.parse(extraParams, "&"); + resultMap.putAll(keyValuePairs); + } return resultMap; } diff --git a/java/flight/flight-jdbc-driver/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java b/java/flight/flight-jdbc-driver/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java index afac6c164708b..19a54038a21c7 100644 --- a/java/flight/flight-jdbc-driver/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java +++ b/java/flight/flight-jdbc-driver/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java @@ -25,7 +25,8 @@ import java.util.HashSet; import java.util.List; import java.util.Set; -import java.util.stream.Collectors; +import java.util.ArrayList; +import java.net.URI; import org.apache.arrow.driver.jdbc.client.utils.ClientAuthenticationUtils; import org.apache.arrow.flight.CallOption; @@ -50,30 +51,39 @@ import org.apache.arrow.vector.types.pojo.Schema; import org.apache.calcite.avatica.Meta.StatementType; +import static org.apache.arrow.flight.LocationSchemes.GRPC_INSECURE; +import static org.apache.arrow.flight.LocationSchemes.GRPC_TLS; + /** * A {@link FlightSqlClient} handler. */ public final class ArrowFlightSqlClientHandler implements AutoCloseable { + private final ConnectionFactory factory; private final FlightSqlClient sqlClient; private final Set options = new HashSet<>(); - ArrowFlightSqlClientHandler(final FlightSqlClient sqlClient, - final Collection options) { + ArrowFlightSqlClientHandler( + final ConnectionFactory factory, + final Collection options + ) throws SQLException { + this.factory = Preconditions.checkNotNull(factory); + this.sqlClient = factory.createConnection(null); this.options.addAll(options); - this.sqlClient = Preconditions.checkNotNull(sqlClient); } /** * Creates a new {@link ArrowFlightSqlClientHandler} from the provided {@code client} and {@code options}. * - * @param client the {@link FlightClient} to manage under a {@link FlightSqlClient} wrapper. + * @param factory the {@link ConnectionFactory} to create {@link FlightSqlClient}s. * @param options the {@link CallOption}s to persist in between subsequent client calls. * @return a new {@link ArrowFlightSqlClientHandler}. */ - public static ArrowFlightSqlClientHandler createNewHandler(final FlightClient client, - final Collection options) { - return new ArrowFlightSqlClientHandler(new FlightSqlClient(client), options); + public static ArrowFlightSqlClientHandler createNewHandler( + final ConnectionFactory factory, + final Collection options + ) throws SQLException { + return new ArrowFlightSqlClientHandler(factory, options); } /** @@ -92,11 +102,16 @@ private CallOption[] getOptions() { * @param flightInfo The {@link FlightInfo} instance from which to fetch results. * @return a {@code FlightStream} of results. */ - public List getStreams(final FlightInfo flightInfo) { - return flightInfo.getEndpoints().stream() - .map(FlightEndpoint::getTicket) - .map(ticket -> sqlClient.getStream(ticket, getOptions())) - .collect(Collectors.toList()); + public List getStreams(final FlightInfo flightInfo) throws SQLException { + ArrayList streams = new java.util.ArrayList<>(); + for (FlightEndpoint ep : flightInfo.getEndpoints()) { + URI uri = ep.getLocations().isEmpty() ? null : ep.getLocations().get(0).getUri(); + System.out.println(String.format("Getting stream %s", uri)); // TODO: remove + FlightSqlClient sqlClient = this.factory.createConnection(uri); + FlightStream stream = sqlClient.getStream(ep.getTicket(), getOptions()); // TODO: wrap and close client + streams.add(stream); + } + return streams; } /** @@ -332,23 +347,31 @@ public FlightInfo getCrossReference(String pkCatalog, String pkSchema, String pk getOptions()); } + /** + * Config options shared by both the {@link ArrowFlightSqlClientHandler.Builder} + * and the {@link ConnectionFactory}. + */ + static class Config { + protected final Set middlewareFactories = new HashSet<>(); + protected final Set options = new HashSet<>(); + protected String host; + protected int port; + protected String username; + protected String password; + protected String trustStorePath; + protected String trustStorePassword; + protected String token; + protected boolean useEncryption; + protected boolean disableCertificateVerification; + protected boolean useSystemTrustStore; + protected BufferAllocator allocator; + } + /** * Builder for {@link ArrowFlightSqlClientHandler}. */ public static final class Builder { - private final Set middlewareFactories = new HashSet<>(); - private final Set options = new HashSet<>(); - private String host; - private int port; - private String username; - private String password; - private String trustStorePath; - private String trustStorePassword; - private String token; - private boolean useEncryption; - private boolean disableCertificateVerification; - private boolean useSystemTrustStore; - private BufferAllocator allocator; + private final Config config = new Config(); /** * Sets the host for this handler. @@ -357,7 +380,7 @@ public static final class Builder { * @return this instance. */ public Builder withHost(final String host) { - this.host = host; + this.config.host = host; return this; } @@ -368,7 +391,7 @@ public Builder withHost(final String host) { * @return this instance. */ public Builder withPort(final int port) { - this.port = port; + this.config.port = port; return this; } @@ -379,7 +402,7 @@ public Builder withPort(final int port) { * @return this instance. */ public Builder withUsername(final String username) { - this.username = username; + this.config.username = username; return this; } @@ -390,7 +413,7 @@ public Builder withUsername(final String username) { * @return this instance. */ public Builder withPassword(final String password) { - this.password = password; + this.config.password = password; return this; } @@ -401,7 +424,7 @@ public Builder withPassword(final String password) { * @return this instance. */ public Builder withTrustStorePath(final String trustStorePath) { - this.trustStorePath = trustStorePath; + this.config.trustStorePath = trustStorePath; return this; } @@ -412,7 +435,7 @@ public Builder withTrustStorePath(final String trustStorePath) { * @return this instance. */ public Builder withTrustStorePassword(final String trustStorePassword) { - this.trustStorePassword = trustStorePassword; + this.config.trustStorePassword = trustStorePassword; return this; } @@ -423,7 +446,7 @@ public Builder withTrustStorePassword(final String trustStorePassword) { * @return this instance. */ public Builder withEncryption(final boolean useEncryption) { - this.useEncryption = useEncryption; + this.config.useEncryption = useEncryption; return this; } @@ -434,7 +457,7 @@ public Builder withEncryption(final boolean useEncryption) { * @return this instance. */ public Builder withDisableCertificateVerification(final boolean disableCertificateVerification) { - this.disableCertificateVerification = disableCertificateVerification; + this.config.disableCertificateVerification = disableCertificateVerification; return this; } @@ -445,7 +468,7 @@ public Builder withDisableCertificateVerification(final boolean disableCertifica * @return this instance. */ public Builder withSystemTrustStore(final boolean useSystemTrustStore) { - this.useSystemTrustStore = useSystemTrustStore; + this.config.useSystemTrustStore = useSystemTrustStore; return this; } @@ -455,7 +478,7 @@ public Builder withSystemTrustStore(final boolean useSystemTrustStore) { * @return this builder instance. */ public Builder withToken(final String token) { - this.token = token; + this.config.token = token; return this; } @@ -466,13 +489,13 @@ public Builder withToken(final String token) { * @return this instance. */ public Builder withBufferAllocator(final BufferAllocator allocator) { - this.allocator = allocator + this.config.allocator = allocator .newChildAllocator("ArrowFlightSqlClientHandler", 0, allocator.getLimit()); return this; } /** - * Adds the provided {@code factories} to the list of {@link #middlewareFactories} of this handler. + * Adds the provided {@code factories} to the list of middlewareFactories of this handler. * * @param factories the factories to add. * @return this instance. @@ -482,14 +505,14 @@ public Builder withMiddlewareFactories(final FlightClientMiddleware.Factory... f } /** - * Adds the provided {@code factories} to the list of {@link #middlewareFactories} of this handler. + * Adds the provided {@code factories} to the list of middlewareFactories of this handler. * * @param factories the factories to add. * @return this instance. */ public Builder withMiddlewareFactories( final Collection factories) { - this.middlewareFactories.addAll(factories); + this.config.middlewareFactories.addAll(factories); return this; } @@ -510,7 +533,7 @@ public Builder withCallOptions(final CallOption... options) { * @return this instance. */ public Builder withCallOptions(final Collection options) { - this.options.addAll(options); + this.config.options.addAll(options); return this; } @@ -521,19 +544,50 @@ public Builder withCallOptions(final Collection options) { * @throws SQLException on error. */ public ArrowFlightSqlClientHandler build() throws SQLException { + ConnectionFactory factory = new ConnectionFactory(config); + return ArrowFlightSqlClientHandler.createNewHandler(factory, config.options); + } + } + + static class ConnectionFactory { + final Config config; + + public ConnectionFactory(Config config) { + this.config = config; + } + + public FlightSqlClient createConnection(URI uri) throws SQLException { + final Set middlewareFactories = new HashSet<>(); + middlewareFactories.addAll(config.middlewareFactories); FlightClient client = null; + String username = config.username; + String password = config.password; try { + // Use default parameters if no URI provided + if (uri == null) { + String scheme = config.useEncryption ? GRPC_TLS : GRPC_INSECURE; + uri = new URI(String.format("%s://%s:%d", scheme, config.host, config.port)); + } else { + // TODO: use same auth for both executors and scheduler + username = null; + password = null; + } + + // Create a new connection and add it to the map + String host = uri.getHost(); + int port = uri.getPort(); + boolean useEncryption = GRPC_TLS.equals(uri.getScheme()); ClientIncomingAuthHeaderMiddleware.Factory authFactory = null; if (username != null) { authFactory = new ClientIncomingAuthHeaderMiddleware.Factory(new ClientBearerHeaderHandler()); - withMiddlewareFactories(authFactory); + middlewareFactories.add(authFactory); } - final FlightClient.Builder clientBuilder = FlightClient.builder().allocator(allocator); - withMiddlewareFactories(new ClientCookieMiddleware.Factory()); + final FlightClient.Builder clientBuilder = FlightClient.builder().allocator(config.allocator); + middlewareFactories.add(new ClientCookieMiddleware.Factory()); middlewareFactories.forEach(clientBuilder::intercept); Location location; - if (useEncryption) { + if (config.useEncryption) { location = Location.forGrpcTls(host, port); clientBuilder.useTls(); } else { @@ -542,31 +596,33 @@ public ArrowFlightSqlClientHandler build() throws SQLException { clientBuilder.location(location); if (useEncryption) { - if (disableCertificateVerification) { + if (config.disableCertificateVerification) { clientBuilder.verifyServer(false); } else { - if (useSystemTrustStore) { + if (config.useSystemTrustStore) { clientBuilder.trustedCertificates( - ClientAuthenticationUtils.getCertificateInputStreamFromSystem(trustStorePassword)); - } else if (trustStorePath != null) { + ClientAuthenticationUtils.getCertificateInputStreamFromSystem(config.trustStorePassword)); + } else if (config.trustStorePath != null) { clientBuilder.trustedCertificates( - ClientAuthenticationUtils.getCertificateStream(trustStorePath, trustStorePassword)); + ClientAuthenticationUtils.getCertificateStream(config.trustStorePath, config.trustStorePassword)); } } } client = clientBuilder.build(); if (authFactory != null) { - options.add( + config.options.add( ClientAuthenticationUtils.getAuthenticate(client, username, password, authFactory)); - } else if (token != null) { - options.add( + } else if (config.token != null) { + config.options.add( ClientAuthenticationUtils.getAuthenticate( - client, new CredentialCallOption(new BearerCredentialWriter(token)))); + client, new CredentialCallOption(new BearerCredentialWriter(config.token)))); } - return ArrowFlightSqlClientHandler.createNewHandler(client, options); + FlightSqlClient sqlClient = new FlightSqlClient(client); + return sqlClient; - } catch (final IllegalArgumentException | GeneralSecurityException | IOException | FlightRuntimeException e) { + } catch (final IllegalArgumentException | GeneralSecurityException | IOException | FlightRuntimeException | + java.net.URISyntaxException e) { final SQLException originalException = new SQLException(e); if (client != null) { try { diff --git a/java/flight/flight-jdbc-driver/src/test/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcDriverTest.java b/java/flight/flight-jdbc-driver/src/test/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcDriverTest.java index d39ec61f3099e..f17668ed001dd 100644 --- a/java/flight/flight-jdbc-driver/src/test/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcDriverTest.java +++ b/java/flight/flight-jdbc-driver/src/test/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcDriverTest.java @@ -17,14 +17,9 @@ package org.apache.arrow.driver.jdbc; -import static org.junit.Assert.assertEquals; - import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; -import java.sql.Connection; -import java.sql.Driver; -import java.sql.DriverManager; -import java.sql.SQLException; +import java.sql.*; import java.util.Collection; import java.util.Map; @@ -40,6 +35,8 @@ import org.junit.Ignore; import org.junit.Test; +import static org.junit.Assert.*; + /** * Tests for {@link ArrowFlightJdbcDriver}. */ @@ -74,6 +71,106 @@ public void tearDown() throws Exception { AutoCloseables.close(dataSource, allocator); } + @Test + public void testBallista() throws Exception { + String url = "jdbc:arrow-flight://localhost:50050"; + java.util.Properties props = new java.util.Properties(); + props.setProperty("useEncryption", "false"); + props.setProperty("user", "admin"); + props.setProperty("password", "password"); + Connection con = DriverManager.getConnection(url, props); + + // Do exact same sequence as DataGrip does + assertFalse(con.isClosed()); + DatabaseMetaData dbmd = con.getMetaData(); + assertEquals("Arrow Flight JDBC Driver", dbmd.getDriverName()); + assertEquals("9.0.0-SNAPSHOT", dbmd.getDriverVersion()); + //assertEquals("", md.getDatabaseProductName()); // UNIMPLEMENTED: Implement CommandGetSqlInfo + assertEquals(9, dbmd.getDatabaseMajorVersion()); + assertEquals(0, dbmd.getDatabaseMinorVersion()); + assertNull(con.getClientInfo("ApplicationName")); +// con.setClientInfo("ApplicationName", "DataGrip 2022.2.1"); + assertEquals(4, dbmd.getJDBCMajorVersion()); + assertNotNull(dbmd.getConnection()); +// assertEquals("", md.getIdentifierQuoteString()); // Implement CommandGetSqlInfo +// assertEquals("", md.getExtraNameCharacters()); // Implement CommandGetSqlInfo + assertFalse(dbmd.supportsMixedCaseIdentifiers()); + assertTrue(dbmd.storesUpperCaseIdentifiers()); + assertFalse(dbmd.storesLowerCaseIdentifiers()); + assertFalse(dbmd.storesMixedCaseIdentifiers()); + assertTrue(dbmd.supportsMixedCaseQuotedIdentifiers()); + assertFalse(dbmd.storesUpperCaseQuotedIdentifiers()); + assertFalse(dbmd.storesLowerCaseQuotedIdentifiers()); + assertFalse(dbmd.storesMixedCaseQuotedIdentifiers()); + assertTrue(con.isValid(20)); +// assertFalse(md.supportsSavepoints()); // Implement CommandGetSqlInfo + assertTrue(con.getAutoCommit()); + con.setReadOnly(false); + assertNull(con.getCatalog()); + assertNull(con.getSchema()); + + // statement + java.sql.Statement stmt = con.createStatement(); +// stmt.setEscapeProcessing(false); // Avatica not supported + assertFalse(stmt.isClosed()); + stmt.setFetchSize(100); + String sql = "create external table customer STORED AS CSV WITH HEADER ROW\n" + + " LOCATION '/home/bgardner/workspace/ballista/arrow-datafusion/datafusion/core/tests/tpch-csv/customer.csv';\n"; + assertFalse(stmt.execute(sql)); + + // resultset + ResultSet rs = stmt.getResultSet(); + assertFalse(rs.isClosed()); // NPE + ResultSetMetaData md = rs.getMetaData(); + assertNotNull(md); + assertEquals(1, md.getColumnCount()); + assertEquals("", md.getColumnTypeName(1)); + assertEquals("", md.getColumnType(1)); + assertEquals("", md.getColumnClassName(1)); + assertEquals(1, rs.getType()); + assertFalse(rs.next()); + assertFalse(rs.isClosed()); + assertFalse(stmt.getMoreResults()); + assertFalse(stmt.isClosed()); + assertNull(con.getWarnings()); + con.clearWarnings();; + stmt.close(); + + int updateCount = stmt.getUpdateCount(); + assertEquals(updateCount, 0); + + sql = "select c_name from customer order by c_name limit 1"; + assertTrue(stmt.execute(sql)); + rs = stmt.getResultSet(); + int count = 0; + while(rs.next()) { + String key = rs.getString(1); + assertEquals(key, "Customer#000000002"); + count++; + } + assertEquals(count, 1); + } + + @Test + public void testDremio() throws Exception { + String sql = "select 'Hello, Dremio';"; + String url = "jdbc:arrow-flight://localhost:32010"; + java.util.Properties props = new java.util.Properties(); + props.setProperty("useEncryption", "false"); + props.setProperty("user", "dremio"); + props.setProperty("password", "dremio123"); + Connection con = DriverManager.getConnection(url, props); + java.sql.Statement stmt = con.createStatement(); + boolean result = stmt.execute(sql); + assertEquals(result, true); + java.sql.ResultSet rs = stmt.getResultSet(); + int count = 0; + while(rs.next()) { + count++; + } + assertEquals(count, 1); + } + /** * Tests whether the {@link ArrowFlightJdbcDriver} is registered in the * {@link DriverManager}. From 5e4c3854703250b39491fef0b12ecdd990e63f3b Mon Sep 17 00:00:00 2001 From: Brent Gardner Date: Tue, 16 Aug 2022 12:24:14 -0600 Subject: [PATCH 2/4] Fix auth TODO --- .../arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/java/flight/flight-jdbc-driver/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java b/java/flight/flight-jdbc-driver/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java index 19a54038a21c7..53d62cf70084a 100644 --- a/java/flight/flight-jdbc-driver/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java +++ b/java/flight/flight-jdbc-driver/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java @@ -567,10 +567,6 @@ public FlightSqlClient createConnection(URI uri) throws SQLException { if (uri == null) { String scheme = config.useEncryption ? GRPC_TLS : GRPC_INSECURE; uri = new URI(String.format("%s://%s:%d", scheme, config.host, config.port)); - } else { - // TODO: use same auth for both executors and scheduler - username = null; - password = null; } // Create a new connection and add it to the map From 9f23ae9682eed1cdc940795f3e532ec141327fd1 Mon Sep 17 00:00:00 2001 From: Brent Gardner Date: Tue, 16 Aug 2022 13:09:58 -0600 Subject: [PATCH 3/4] Close clients --- .../org/apache/arrow/flight/ArrowMessage.java | 2 +- .../org/apache/arrow/flight/FlightClient.java | 8 +- .../apache/arrow/flight/FlightService.java | 4 +- .../org/apache/arrow/flight/FlightStream.java | 519 +----------------- .../apache/arrow/flight/FlightStreamImpl.java | 495 +++++++++++++++++ .../arrow/flight/TestMetadataVersion.java | 10 +- .../tests/IntegrationProducer.java | 2 +- java/flight/flight-jdbc-driver/pom.xml | 5 + .../arrow/driver/jdbc/FlightClientCloser.java | 108 ++++ .../client/ArrowFlightSqlClientHandler.java | 7 +- .../jdbc/utils/FlightStreamQueueTest.java | 3 +- 11 files changed, 657 insertions(+), 506 deletions(-) create mode 100644 java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightStreamImpl.java create mode 100644 java/flight/flight-jdbc-driver/src/main/java/org/apache/arrow/driver/jdbc/FlightClientCloser.java diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java index b4ee835dee4a0..08f2a133d6550 100644 --- a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java +++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java @@ -65,7 +65,7 @@ /** * The in-memory representation of FlightData used to manage a stream of Arrow messages. */ -class ArrowMessage implements AutoCloseable { +public class ArrowMessage implements AutoCloseable { // If true, deserialize Arrow data by giving Arrow a reference to the underlying gRPC buffer // instead of copying the data. Defaults to true. diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightClient.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightClient.java index 762b37859b948..9204e62fa7707 100644 --- a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightClient.java +++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightClient.java @@ -303,7 +303,7 @@ public SchemaResult getSchema(FlightDescriptor descriptor, CallOption... options public FlightStream getStream(Ticket ticket, CallOption... options) { final io.grpc.CallOptions callOptions = CallOptions.wrapStub(asyncStub, options).getCallOptions(); ClientCall call = interceptedChannel.newCall(doGetDescriptor, callOptions); - FlightStream stream = new FlightStream( + FlightStream stream = new FlightStreamImpl( allocator, PENDING_REQUESTS, (String message, Throwable cause) -> call.cancel(message, cause), @@ -352,14 +352,14 @@ public ExchangeReaderWriter doExchange(FlightDescriptor descriptor, CallOption.. try { final ClientCall call = interceptedChannel.newCall(doExchangeDescriptor, callOptions); - final FlightStream stream = new FlightStream(allocator, PENDING_REQUESTS, call::cancel, call::request); + final FlightStream stream = new FlightStreamImpl(allocator, PENDING_REQUESTS, call::cancel, call::request); final ClientCallStreamObserver observer = (ClientCallStreamObserver) ClientCalls.asyncBidiStreamingCall(call, stream.asObserver()); final ClientStreamListener writer = new PutObserver( - descriptor, observer, stream.cancelled::isDone, + descriptor, observer, stream.cancelled()::isDone, () -> { try { - stream.completed.get(); + stream.completed().get(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw CallStatus.INTERNAL diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightService.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightService.java index 4fb0dea2cba26..8913331a75719 100644 --- a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightService.java +++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightService.java @@ -220,7 +220,7 @@ public StreamObserver doPutCustom(final StreamObserver ackStream = StreamPipe .wrap(responseObserver, PutResult::toProtocol, this::handleExceptionWithMiddleware); - final FlightStream fs = new FlightStream( + final FlightStream fs = new FlightStreamImpl( allocator, PENDING_REQUESTS, /* server-upload streams are not cancellable */null, @@ -351,7 +351,7 @@ public StreamObserver doExchangeCustom(StreamObserver { - }; - private final AutoCloseable DONE_EX = () -> { - }; - - private final BufferAllocator allocator; - private final Cancellable cancellable; - private final LinkedBlockingQueue queue = new LinkedBlockingQueue<>(); - private final SettableFuture root = SettableFuture.create(); - private final SettableFuture descriptor = SettableFuture.create(); - private final int pendingTarget; - private final Requestor requestor; - // The completion flags. - // This flag is only updated as the user iterates through the data, i.e. it tracks whether the user has read all the - // data and closed the stream - final CompletableFuture completed; - // This flag is immediately updated when gRPC signals that the server has ended the call. This is used to make sure - // we don't block forever trying to write to a server that has rejected a call. - final CompletableFuture cancelled; - - private volatile int pending = 1; - private volatile VectorSchemaRoot fulfilledRoot; - private DictionaryProvider.MapDictionaryProvider dictionaries; - private volatile VectorLoader loader; - private volatile Throwable ex; - private volatile ArrowBuf applicationMetadata = null; - @VisibleForTesting - volatile MetadataVersion metadataVersion = null; - - /** - * Constructs a new instance. - * - * @param allocator The allocator to use for creating/reallocating buffers for Vectors. - * @param pendingTarget Target number of messages to receive. - * @param cancellable Used to cancel mid-stream requests. - * @param requestor A callback to determine how many pending items there are. - */ - public FlightStream(BufferAllocator allocator, int pendingTarget, Cancellable cancellable, Requestor requestor) { - Objects.requireNonNull(allocator); - Objects.requireNonNull(requestor); - this.allocator = allocator; - this.pendingTarget = pendingTarget; - this.cancellable = cancellable; - this.requestor = requestor; - this.dictionaries = new DictionaryProvider.MapDictionaryProvider(); - this.completed = new CompletableFuture<>(); - this.cancelled = new CompletableFuture<>(); - } - - /** - * Get the schema for this stream. Blocks until the schema is available. - */ - public Schema getSchema() { - return getRoot().getSchema(); - } - - /** - * Get the provider for dictionaries in this stream. - * - *

Does NOT retain a reference to the underlying dictionaries. Dictionaries may be updated as the stream is read. - * This method is intended for stream processing, where the application code will not retain references to values - * after the stream is closed. - * - * @throws IllegalStateException if {@link #takeDictionaryOwnership()} was called - * @see #takeDictionaryOwnership() - */ - public DictionaryProvider getDictionaryProvider() { - if (dictionaries == null) { - throw new IllegalStateException("Dictionary ownership was claimed by the application."); - } - return dictionaries; - } - /** - * Get an owned reference to the dictionaries in this stream. Should be called after finishing reading the stream, - * but before closing. - * - *

If called, the client is responsible for closing the dictionaries in this provider. Can only be called once. - * - * @return The dictionary provider for the stream. - * @throws IllegalStateException if called more than once. - */ - public DictionaryProvider takeDictionaryOwnership() { - if (dictionaries == null) { - throw new IllegalStateException("Dictionary ownership was claimed by the application."); - } - // Swap out the provider so it is not closed - final DictionaryProvider provider = dictionaries; - dictionaries = null; - return provider; - } - - /** - * Get the descriptor for this stream. Only applicable on the server side of a DoPut operation. Will block until the - * client sends the descriptor. - */ - public FlightDescriptor getDescriptor() { - // This blocks until the first message from the client is received. - try { - return descriptor.get(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw CallStatus.INTERNAL.withCause(e).withDescription("Interrupted").toRuntimeException(); - } catch (ExecutionException e) { - throw CallStatus.INTERNAL.withCause(e).withDescription("Error getting descriptor").toRuntimeException(); - } - } - - /** - * Closes the stream (freeing any existing resources). - * - *

If the stream isn't complete and is cancellable, this method will cancel and drain the stream first. - */ - public void close() throws Exception { - final List closeables = new ArrayList<>(); - Throwable suppressor = null; - if (cancellable != null) { - // Client-side stream. Cancel the call, to help ensure gRPC doesn't deliver a message after close() ends. - // On the server side, we can't rely on draining the stream , because this gRPC bug means the completion callback - // may never run https://github.com/grpc/grpc-java/issues/5882 - try { - synchronized (cancellable) { - if (!cancelled.isDone()) { - // Only cancel if the call is not done on the gRPC side - cancellable.cancel("Stream closed before end", /* no exception to report */null); - } - } - // Drain the stream without the lock (as next() implicitly needs the lock) - while (next()) { } - } catch (FlightRuntimeException e) { - suppressor = e; - } - } - // Perform these operations under a lock. This way the observer can't enqueue new messages while we're in the - // middle of cleanup. This should only be a concern for server-side streams since client-side streams are drained - // by the lambda above. - synchronized (completed) { - try { - if (fulfilledRoot != null) { - closeables.add(fulfilledRoot); - } - closeables.add(applicationMetadata); - closeables.addAll(queue); - if (dictionaries != null) { - dictionaries.getDictionaryIds().forEach(id -> closeables.add(dictionaries.lookup(id).getVector())); - } - if (suppressor != null) { - AutoCloseables.close(suppressor, closeables); - } else { - AutoCloseables.close(closeables); - } - } finally { - // The value of this CompletableFuture is meaningless, only whether it's completed (or has an exception) - // No-op if already complete - completed.complete(null); - } - } - } - - /** - * Blocking request to load next item into list. - * @return Whether or not more data was found. - */ - public boolean next() { - try { - if (completed.isDone() && queue.isEmpty()) { - return false; - } - - pending--; - requestOutstanding(); - - Object data = queue.take(); - if (DONE == data) { - queue.put(DONE); - // Other code ignores the value of this CompletableFuture, only whether it's completed (or has an exception) - completed.complete(null); - return false; - } else if (DONE_EX == data) { - queue.put(DONE_EX); - if (ex instanceof Exception) { - throw (Exception) ex; - } else { - throw new Exception(ex); - } - } else { - try (ArrowMessage msg = ((ArrowMessage) data)) { - if (msg.getMessageType() == HeaderType.NONE) { - updateMetadata(msg); - // We received a message without data, so erase any leftover data - if (fulfilledRoot != null) { - fulfilledRoot.clear(); - } - } else if (msg.getMessageType() == HeaderType.RECORD_BATCH) { - checkMetadataVersion(msg); - // Ensure we have the root - root.get().clear(); - try (ArrowRecordBatch arb = msg.asRecordBatch()) { - loader.load(arb); - } - updateMetadata(msg); - } else if (msg.getMessageType() == HeaderType.DICTIONARY_BATCH) { - checkMetadataVersion(msg); - // Ensure we have the root - root.get().clear(); - try (ArrowDictionaryBatch arb = msg.asDictionaryBatch()) { - final long id = arb.getDictionaryId(); - if (dictionaries == null) { - throw new IllegalStateException("Dictionary ownership was claimed by the application."); - } - final Dictionary dictionary = dictionaries.lookup(id); - if (dictionary == null) { - throw new IllegalArgumentException("Dictionary not defined in schema: ID " + id); - } - - final FieldVector vector = dictionary.getVector(); - final VectorSchemaRoot dictionaryRoot = new VectorSchemaRoot(Collections.singletonList(vector.getField()), - Collections.singletonList(vector), 0); - final VectorLoader dictionaryLoader = new VectorLoader(dictionaryRoot); - dictionaryLoader.load(arb.getDictionary()); - } - return next(); - } else { - throw new UnsupportedOperationException("Message type is unsupported: " + msg.getMessageType()); - } - return true; - } - } - } catch (RuntimeException e) { - throw e; - } catch (ExecutionException e) { - throw StatusUtils.fromThrowable(e.getCause()); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - /** Update our metadata reference with a new one from this message. */ - private void updateMetadata(ArrowMessage msg) { - if (this.applicationMetadata != null) { - this.applicationMetadata.close(); - } - this.applicationMetadata = msg.getApplicationMetadata(); - if (this.applicationMetadata != null) { - this.applicationMetadata.getReferenceManager().retain(); - } - } - - /** Ensure the Arrow metadata version doesn't change mid-stream. */ - private void checkMetadataVersion(ArrowMessage msg) { - if (msg.asSchemaMessage() == null) { - return; - } - MetadataVersion receivedVersion = MetadataVersion.fromFlatbufID(msg.asSchemaMessage().getMessage().version()); - if (this.metadataVersion != receivedVersion) { - throw new IllegalStateException("Metadata version mismatch: stream started as " + - this.metadataVersion + " but got message with version " + receivedVersion); - } - } - - /** - * Get the current vector data from the stream. - * - *

The data in the root may change at any time. Clients should NOT modify the root, but instead unload the data - * into their own root. - * - * @throws FlightRuntimeException if there was an error reading the schema from the stream. - */ - public VectorSchemaRoot getRoot() { - try { - return root.get(); - } catch (InterruptedException e) { - throw CallStatus.INTERNAL.withCause(e).toRuntimeException(); - } catch (ExecutionException e) { - throw StatusUtils.fromThrowable(e.getCause()); - } - } - - /** - * Check if there is a root (i.e. whether the other end has started sending data). - * - * Updated by calls to {@link #next()}. - * - * @return true if and only if the other end has started sending data. - */ - public boolean hasRoot() { - return root.isDone(); - } - - /** - * Get the most recent metadata sent from the server. This may be cleared by calls to {@link #next()} if the server - * sends a message without metadata. This does NOT take ownership of the buffer - call retain() to create a reference - * if you need the buffer after a call to {@link #next()}. - * - * @return the application metadata. May be null. - */ - public ArrowBuf getLatestMetadata() { - return applicationMetadata; - } - - private synchronized void requestOutstanding() { - if (pending < pendingTarget) { - requestor.request(pendingTarget - pending); - pending = pendingTarget; - } - } - - private class Observer implements StreamObserver { - - Observer() { - super(); - } - - /** Helper to add an item to the queue under the appropriate lock. */ - private void enqueue(AutoCloseable message) { - synchronized (completed) { - if (completed.isDone()) { - // The stream is already closed (RPC ended), discard the message - AutoCloseables.closeNoChecked(message); - } else { - queue.add(message); - } - } - } - - @Override - public void onNext(ArrowMessage msg) { - // Operations here have to be under a lock so that we don't add a message to the queue while in the middle of - // close(). - requestOutstanding(); - switch (msg.getMessageType()) { - case NONE: { - // No IPC message - pure metadata or descriptor - if (msg.getDescriptor() != null) { - descriptor.set(new FlightDescriptor(msg.getDescriptor())); - } - if (msg.getApplicationMetadata() != null) { - enqueue(msg); - } - break; - } - case SCHEMA: { - Schema schema = msg.asSchema(); - - // if there is app metadata in the schema message, make sure - // that we don't leak it. - ArrowBuf meta = msg.getApplicationMetadata(); - if (meta != null) { - meta.close(); - } - - final List fields = new ArrayList<>(); - final Map dictionaryMap = new HashMap<>(); - for (final Field originalField : schema.getFields()) { - final Field updatedField = DictionaryUtility.toMemoryFormat(originalField, allocator, dictionaryMap); - fields.add(updatedField); - } - for (final Map.Entry entry : dictionaryMap.entrySet()) { - dictionaries.put(entry.getValue()); - } - schema = new Schema(fields, schema.getCustomMetadata()); - metadataVersion = MetadataVersion.fromFlatbufID(msg.asSchemaMessage().getMessage().version()); - try { - MetadataV4UnionChecker.checkRead(schema, metadataVersion); - } catch (IOException e) { - ex = e; - enqueue(DONE_EX); - break; - } +import java.util.concurrent.CompletableFuture; - synchronized (completed) { - if (!completed.isDone()) { - fulfilledRoot = VectorSchemaRoot.create(schema, allocator); - loader = new VectorLoader(fulfilledRoot); - if (msg.getDescriptor() != null) { - descriptor.set(new FlightDescriptor(msg.getDescriptor())); - } - root.set(fulfilledRoot); - } - } - break; - } - case RECORD_BATCH: - case DICTIONARY_BATCH: - enqueue(msg); - break; - case TENSOR: - default: - ex = new UnsupportedOperationException("Unable to handle message of type: " + msg.getMessageType()); - enqueue(DONE_EX); - } - } +public interface FlightStream extends AutoCloseable { + Schema getSchema(); + DictionaryProvider getDictionaryProvider(); + DictionaryProvider takeDictionaryOwnership(); + FlightDescriptor getDescriptor(); + void close() throws Exception; + boolean next(); + VectorSchemaRoot getRoot(); + boolean hasRoot(); + ArrowBuf getLatestMetadata(); + void cancel(String message, Throwable exception); - @Override - public void onError(Throwable t) { - ex = StatusUtils.fromThrowable(t); - queue.add(DONE_EX); - cancelled.complete(null); - root.setException(ex); - } + CompletableFuture cancelled(); + CompletableFuture completed(); - @Override - public void onCompleted() { - // Depends on gRPC calling onNext and onCompleted non-concurrently - cancelled.complete(null); - queue.add(DONE); - } - } + StreamObserver asObserver(); - /** - * Cancels sending the stream to a client. - * - *

Callers should drain the stream (with {@link #next()}) to ensure all messages sent before cancellation are - * received and to wait for the underlying transport to acknowledge cancellation. - */ - public void cancel(String message, Throwable exception) { - if (cancellable == null) { - throw new UnsupportedOperationException("Streams cannot be cancelled that are produced by client. " + - "Instead, server should reject incoming messages."); + /** + * Provides a callback to cancel a process that is in progress. + */ + @FunctionalInterface + interface Cancellable { + void cancel(String message, Throwable exception); } - cancellable.cancel(message, exception); - // Do not mark the stream as completed, as gRPC may still be delivering messages. - } - - StreamObserver asObserver() { - return new Observer(); - } - /** - * Provides a callback to cancel a process that is in progress. - */ - @FunctionalInterface - public interface Cancellable { - void cancel(String message, Throwable exception); - } - - /** - * Provides a interface to request more items from a stream producer. - */ - @FunctionalInterface - public interface Requestor { /** - * Requests count more messages from the instance of this object. + * Provides a interface to request more items from a stream producer. */ - void request(int count); - } + @FunctionalInterface + interface Requestor { + /** + * Requests count more messages from the instance of this object. + */ + void request(int count); + } } diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightStreamImpl.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightStreamImpl.java new file mode 100644 index 0000000000000..57550faa56d15 --- /dev/null +++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightStreamImpl.java @@ -0,0 +1,495 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.flight; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.LinkedBlockingQueue; + +import org.apache.arrow.flight.ArrowMessage.HeaderType; +import org.apache.arrow.flight.grpc.StatusUtils; +import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.util.AutoCloseables; +import org.apache.arrow.util.VisibleForTesting; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.VectorLoader; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.dictionary.Dictionary; +import org.apache.arrow.vector.dictionary.DictionaryProvider; +import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch; +import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; +import org.apache.arrow.vector.types.MetadataVersion; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.arrow.vector.util.DictionaryUtility; +import org.apache.arrow.vector.validate.MetadataV4UnionChecker; + +import com.google.common.util.concurrent.SettableFuture; + +import io.grpc.stub.StreamObserver; + +/** + * An adaptor between protobuf streams and flight data streams. + */ +public class FlightStreamImpl implements FlightStream, AutoCloseable { + // Use AutoCloseable sentinel objects to simplify logic in #close + private final AutoCloseable DONE = () -> { + }; + private final AutoCloseable DONE_EX = () -> { + }; + + private final BufferAllocator allocator; + private final Cancellable cancellable; + private final LinkedBlockingQueue queue = new LinkedBlockingQueue<>(); + private final SettableFuture root = SettableFuture.create(); + private final SettableFuture descriptor = SettableFuture.create(); + private final int pendingTarget; + private final Requestor requestor; + // The completion flags. + // This flag is only updated as the user iterates through the data, i.e. it tracks whether the user has read all the + // data and closed the stream + private final CompletableFuture completed; + // This flag is immediately updated when gRPC signals that the server has ended the call. This is used to make sure + // we don't block forever trying to write to a server that has rejected a call. + private final CompletableFuture cancelled; + + private volatile int pending = 1; + private volatile VectorSchemaRoot fulfilledRoot; + private DictionaryProvider.MapDictionaryProvider dictionaries; + private volatile VectorLoader loader; + private volatile Throwable ex; + private volatile ArrowBuf applicationMetadata = null; + @VisibleForTesting + volatile MetadataVersion metadataVersion = null; + + /** + * Constructs a new instance. + * + * @param allocator The allocator to use for creating/reallocating buffers for Vectors. + * @param pendingTarget Target number of messages to receive. + * @param cancellable Used to cancel mid-stream requests. + * @param requestor A callback to determine how many pending items there are. + */ + public FlightStreamImpl(BufferAllocator allocator, int pendingTarget, Cancellable cancellable, Requestor requestor) { + Objects.requireNonNull(allocator); + Objects.requireNonNull(requestor); + this.allocator = allocator; + this.pendingTarget = pendingTarget; + this.cancellable = cancellable; + this.requestor = requestor; + this.dictionaries = new DictionaryProvider.MapDictionaryProvider(); + this.completed = new CompletableFuture<>(); + this.cancelled = new CompletableFuture<>(); + } + + public CompletableFuture cancelled() { + return cancelled; + } + + public CompletableFuture completed() { + return completed; + } + + /** + * Get the schema for this stream. Blocks until the schema is available. + */ + public Schema getSchema() { + return getRoot().getSchema(); + } + + /** + * Get the provider for dictionaries in this stream. + * + *

Does NOT retain a reference to the underlying dictionaries. Dictionaries may be updated as the stream is read. + * This method is intended for stream processing, where the application code will not retain references to values + * after the stream is closed. + * + * @throws IllegalStateException if {@link #takeDictionaryOwnership()} was called + * @see #takeDictionaryOwnership() + */ + public DictionaryProvider getDictionaryProvider() { + if (dictionaries == null) { + throw new IllegalStateException("Dictionary ownership was claimed by the application."); + } + return dictionaries; + } + + /** + * Get an owned reference to the dictionaries in this stream. Should be called after finishing reading the stream, + * but before closing. + * + *

If called, the client is responsible for closing the dictionaries in this provider. Can only be called once. + * + * @return The dictionary provider for the stream. + * @throws IllegalStateException if called more than once. + */ + public DictionaryProvider takeDictionaryOwnership() { + if (dictionaries == null) { + throw new IllegalStateException("Dictionary ownership was claimed by the application."); + } + // Swap out the provider so it is not closed + final DictionaryProvider provider = dictionaries; + dictionaries = null; + return provider; + } + + /** + * Get the descriptor for this stream. Only applicable on the server side of a DoPut operation. Will block until the + * client sends the descriptor. + */ + public FlightDescriptor getDescriptor() { + // This blocks until the first message from the client is received. + try { + return descriptor.get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw CallStatus.INTERNAL.withCause(e).withDescription("Interrupted").toRuntimeException(); + } catch (ExecutionException e) { + throw CallStatus.INTERNAL.withCause(e).withDescription("Error getting descriptor").toRuntimeException(); + } + } + + /** + * Closes the stream (freeing any existing resources). + * + *

If the stream isn't complete and is cancellable, this method will cancel and drain the stream first. + */ + public void close() throws Exception { + final List closeables = new ArrayList<>(); + Throwable suppressor = null; + if (cancellable != null) { + // Client-side stream. Cancel the call, to help ensure gRPC doesn't deliver a message after close() ends. + // On the server side, we can't rely on draining the stream , because this gRPC bug means the completion callback + // may never run https://github.com/grpc/grpc-java/issues/5882 + try { + synchronized (cancellable) { + if (!cancelled.isDone()) { + // Only cancel if the call is not done on the gRPC side + cancellable.cancel("Stream closed before end", /* no exception to report */null); + } + } + // Drain the stream without the lock (as next() implicitly needs the lock) + while (next()) { } + } catch (FlightRuntimeException e) { + suppressor = e; + } + } + // Perform these operations under a lock. This way the observer can't enqueue new messages while we're in the + // middle of cleanup. This should only be a concern for server-side streams since client-side streams are drained + // by the lambda above. + synchronized (completed) { + try { + if (fulfilledRoot != null) { + closeables.add(fulfilledRoot); + } + closeables.add(applicationMetadata); + closeables.addAll(queue); + if (dictionaries != null) { + dictionaries.getDictionaryIds().forEach(id -> closeables.add(dictionaries.lookup(id).getVector())); + } + if (suppressor != null) { + AutoCloseables.close(suppressor, closeables); + } else { + AutoCloseables.close(closeables); + } + } finally { + // The value of this CompletableFuture is meaningless, only whether it's completed (or has an exception) + // No-op if already complete + completed.complete(null); + } + } + } + + /** + * Blocking request to load next item into list. + * @return Whether or not more data was found. + */ + public boolean next() { + try { + if (completed.isDone() && queue.isEmpty()) { + return false; + } + + pending--; + requestOutstanding(); + + Object data = queue.take(); + if (DONE == data) { + queue.put(DONE); + // Other code ignores the value of this CompletableFuture, only whether it's completed (or has an exception) + completed.complete(null); + return false; + } else if (DONE_EX == data) { + queue.put(DONE_EX); + if (ex instanceof Exception) { + throw (Exception) ex; + } else { + throw new Exception(ex); + } + } else { + try (ArrowMessage msg = ((ArrowMessage) data)) { + if (msg.getMessageType() == HeaderType.NONE) { + updateMetadata(msg); + // We received a message without data, so erase any leftover data + if (fulfilledRoot != null) { + fulfilledRoot.clear(); + } + } else if (msg.getMessageType() == HeaderType.RECORD_BATCH) { + checkMetadataVersion(msg); + // Ensure we have the root + root.get().clear(); + try (ArrowRecordBatch arb = msg.asRecordBatch()) { + loader.load(arb); + } + updateMetadata(msg); + } else if (msg.getMessageType() == HeaderType.DICTIONARY_BATCH) { + checkMetadataVersion(msg); + // Ensure we have the root + root.get().clear(); + try (ArrowDictionaryBatch arb = msg.asDictionaryBatch()) { + final long id = arb.getDictionaryId(); + if (dictionaries == null) { + throw new IllegalStateException("Dictionary ownership was claimed by the application."); + } + final Dictionary dictionary = dictionaries.lookup(id); + if (dictionary == null) { + throw new IllegalArgumentException("Dictionary not defined in schema: ID " + id); + } + + final FieldVector vector = dictionary.getVector(); + final VectorSchemaRoot dictionaryRoot = new VectorSchemaRoot(Collections.singletonList(vector.getField()), + Collections.singletonList(vector), 0); + final VectorLoader dictionaryLoader = new VectorLoader(dictionaryRoot); + dictionaryLoader.load(arb.getDictionary()); + } + return next(); + } else { + throw new UnsupportedOperationException("Message type is unsupported: " + msg.getMessageType()); + } + return true; + } + } + } catch (RuntimeException e) { + throw e; + } catch (ExecutionException e) { + throw StatusUtils.fromThrowable(e.getCause()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** Update our metadata reference with a new one from this message. */ + private void updateMetadata(ArrowMessage msg) { + if (this.applicationMetadata != null) { + this.applicationMetadata.close(); + } + this.applicationMetadata = msg.getApplicationMetadata(); + if (this.applicationMetadata != null) { + this.applicationMetadata.getReferenceManager().retain(); + } + } + + /** Ensure the Arrow metadata version doesn't change mid-stream. */ + private void checkMetadataVersion(ArrowMessage msg) { + if (msg.asSchemaMessage() == null) { + return; + } + MetadataVersion receivedVersion = MetadataVersion.fromFlatbufID(msg.asSchemaMessage().getMessage().version()); + if (this.metadataVersion != receivedVersion) { + throw new IllegalStateException("Metadata version mismatch: stream started as " + + this.metadataVersion + " but got message with version " + receivedVersion); + } + } + + /** + * Get the current vector data from the stream. + * + *

The data in the root may change at any time. Clients should NOT modify the root, but instead unload the data + * into their own root. + * + * @throws FlightRuntimeException if there was an error reading the schema from the stream. + */ + public VectorSchemaRoot getRoot() { + try { + return root.get(); + } catch (InterruptedException e) { + throw CallStatus.INTERNAL.withCause(e).toRuntimeException(); + } catch (ExecutionException e) { + throw StatusUtils.fromThrowable(e.getCause()); + } + } + + /** + * Check if there is a root (i.e. whether the other end has started sending data). + * + * Updated by calls to {@link #next()}. + * + * @return true if and only if the other end has started sending data. + */ + public boolean hasRoot() { + return root.isDone(); + } + + /** + * Get the most recent metadata sent from the server. This may be cleared by calls to {@link #next()} if the server + * sends a message without metadata. This does NOT take ownership of the buffer - call retain() to create a reference + * if you need the buffer after a call to {@link #next()}. + * + * @return the application metadata. May be null. + */ + public ArrowBuf getLatestMetadata() { + return applicationMetadata; + } + + private synchronized void requestOutstanding() { + if (pending < pendingTarget) { + requestor.request(pendingTarget - pending); + pending = pendingTarget; + } + } + + private class Observer implements StreamObserver { + + Observer() { + super(); + } + + /** Helper to add an item to the queue under the appropriate lock. */ + private void enqueue(AutoCloseable message) { + synchronized (completed) { + if (completed.isDone()) { + // The stream is already closed (RPC ended), discard the message + AutoCloseables.closeNoChecked(message); + } else { + queue.add(message); + } + } + } + + @Override + public void onNext(ArrowMessage msg) { + // Operations here have to be under a lock so that we don't add a message to the queue while in the middle of + // close(). + requestOutstanding(); + switch (msg.getMessageType()) { + case NONE: { + // No IPC message - pure metadata or descriptor + if (msg.getDescriptor() != null) { + descriptor.set(new FlightDescriptor(msg.getDescriptor())); + } + if (msg.getApplicationMetadata() != null) { + enqueue(msg); + } + break; + } + case SCHEMA: { + Schema schema = msg.asSchema(); + + // if there is app metadata in the schema message, make sure + // that we don't leak it. + ArrowBuf meta = msg.getApplicationMetadata(); + if (meta != null) { + meta.close(); + } + + final List fields = new ArrayList<>(); + final Map dictionaryMap = new HashMap<>(); + for (final Field originalField : schema.getFields()) { + final Field updatedField = DictionaryUtility.toMemoryFormat(originalField, allocator, dictionaryMap); + fields.add(updatedField); + } + for (final Map.Entry entry : dictionaryMap.entrySet()) { + dictionaries.put(entry.getValue()); + } + schema = new Schema(fields, schema.getCustomMetadata()); + metadataVersion = MetadataVersion.fromFlatbufID(msg.asSchemaMessage().getMessage().version()); + try { + MetadataV4UnionChecker.checkRead(schema, metadataVersion); + } catch (IOException e) { + ex = e; + enqueue(DONE_EX); + break; + } + + synchronized (completed) { + if (!completed.isDone()) { + fulfilledRoot = VectorSchemaRoot.create(schema, allocator); + loader = new VectorLoader(fulfilledRoot); + if (msg.getDescriptor() != null) { + descriptor.set(new FlightDescriptor(msg.getDescriptor())); + } + root.set(fulfilledRoot); + } + } + break; + } + case RECORD_BATCH: + case DICTIONARY_BATCH: + enqueue(msg); + break; + case TENSOR: + default: + ex = new UnsupportedOperationException("Unable to handle message of type: " + msg.getMessageType()); + enqueue(DONE_EX); + } + } + + @Override + public void onError(Throwable t) { + ex = StatusUtils.fromThrowable(t); + queue.add(DONE_EX); + cancelled.complete(null); + root.setException(ex); + } + + @Override + public void onCompleted() { + // Depends on gRPC calling onNext and onCompleted non-concurrently + cancelled.complete(null); + queue.add(DONE); + } + } + + /** + * Cancels sending the stream to a client. + * + *

Callers should drain the stream (with {@link #next()}) to ensure all messages sent before cancellation are + * received and to wait for the underlying transport to acknowledge cancellation. + */ + public void cancel(String message, Throwable exception) { + if (cancellable == null) { + throw new UnsupportedOperationException("Streams cannot be cancelled that are produced by client. " + + "Instead, server should reject incoming messages."); + } + cancellable.cancel(message, exception); + // Do not mark the stream as completed, as gRPC may still be delivering messages. + } + + public StreamObserver asObserver() { + return new Observer(); + } + +} diff --git a/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestMetadataVersion.java b/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestMetadataVersion.java index 83a694bf34e51..63cfb537928c2 100644 --- a/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestMetadataVersion.java +++ b/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestMetadataVersion.java @@ -129,7 +129,7 @@ public void testPutV4() throws Exception { public void testGetV4() throws Exception { try (final FlightServer server = startServer(optionV4); final FlightClient client = connect(server); - final FlightStream stream = client.getStream(new Ticket(new byte[0]))) { + final FlightStreamImpl stream = (FlightStreamImpl)client.getStream(new Ticket(new byte[0]))) { assertTrue(stream.next()); assertEquals(optionV4.metadataVersion, stream.metadataVersion); validateRoot(stream.getRoot()); @@ -148,7 +148,7 @@ public void testExchangeV4ToV5() throws Exception { stream.getWriter().putNext(); stream.getWriter().completed(); assertTrue(stream.getReader().next()); - assertEquals(optionV5.metadataVersion, stream.getReader().metadataVersion); + assertEquals(optionV5.metadataVersion, ((FlightStreamImpl)stream.getReader()).metadataVersion); validateRoot(stream.getReader().getRoot()); assertFalse(stream.getReader().next()); } @@ -165,7 +165,7 @@ public void testExchangeV5ToV4() throws Exception { stream.getWriter().putNext(); stream.getWriter().completed(); assertTrue(stream.getReader().next()); - assertEquals(optionV4.metadataVersion, stream.getReader().metadataVersion); + assertEquals(optionV4.metadataVersion, ((FlightStreamImpl)stream.getReader()).metadataVersion); validateRoot(stream.getReader().getRoot()); assertFalse(stream.getReader().next()); } @@ -182,7 +182,7 @@ public void testExchangeV4ToV4() throws Exception { stream.getWriter().putNext(); stream.getWriter().completed(); assertTrue(stream.getReader().next()); - assertEquals(optionV4.metadataVersion, stream.getReader().metadataVersion); + assertEquals(optionV4.metadataVersion, ((FlightStreamImpl)stream.getReader()).metadataVersion); validateRoot(stream.getReader().getRoot()); assertFalse(stream.getReader().next()); } @@ -263,7 +263,7 @@ public Runnable acceptPut(CallContext context, FlightStream flightStream, Stream return () -> { try { assertTrue(flightStream.next()); - assertEquals(option.metadataVersion, flightStream.metadataVersion); + assertEquals(option.metadataVersion, ((FlightStreamImpl)flightStream).metadataVersion); validateRoot(flightStream.getRoot()); } catch (AssertionError err) { // gRPC doesn't propagate stack traces across the wire. diff --git a/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/IntegrationProducer.java b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/IntegrationProducer.java index aed3045accb2f..ef2b6f415adb7 100644 --- a/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/IntegrationProducer.java +++ b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/IntegrationProducer.java @@ -104,7 +104,7 @@ public FlightInfo getFlightInfo(CallContext context, FlightDescriptor descriptor @Override public Runnable acceptPut(CallContext context, - final FlightStream flightStream, final StreamListener ackStream) { + final FlightStream flightStream, final StreamListener ackStream) { return () -> { List batches = new ArrayList<>(); try { diff --git a/java/flight/flight-jdbc-driver/pom.xml b/java/flight/flight-jdbc-driver/pom.xml index 014da57168bbc..7023c8ac059f4 100644 --- a/java/flight/flight-jdbc-driver/pom.xml +++ b/java/flight/flight-jdbc-driver/pom.xml @@ -84,6 +84,11 @@ guava + + io.grpc + grpc-stub + + org.slf4j slf4j-api diff --git a/java/flight/flight-jdbc-driver/src/main/java/org/apache/arrow/driver/jdbc/FlightClientCloser.java b/java/flight/flight-jdbc-driver/src/main/java/org/apache/arrow/driver/jdbc/FlightClientCloser.java new file mode 100644 index 0000000000000..6dcce249499ee --- /dev/null +++ b/java/flight/flight-jdbc-driver/src/main/java/org/apache/arrow/driver/jdbc/FlightClientCloser.java @@ -0,0 +1,108 @@ +package org.apache.arrow.driver.jdbc; + +import org.apache.arrow.driver.jdbc.utils.FlightStreamQueue; +import org.apache.arrow.flight.ArrowMessage; +import io.grpc.stub.StreamObserver; +import org.apache.arrow.flight.FlightDescriptor; +import org.apache.arrow.flight.FlightStream; +import org.apache.arrow.flight.sql.FlightSqlClient; +import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.dictionary.DictionaryProvider; +import org.apache.arrow.vector.types.pojo.Schema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.CompletableFuture; + +public class FlightClientCloser implements FlightStream { + private static final Logger LOGGER = LoggerFactory.getLogger(FlightClientCloser.class); + + private final FlightSqlClient client; + private final FlightStream stream; + + public FlightClientCloser(FlightSqlClient client, FlightStream stream) { + this.client = client; + this.stream = stream; + } + + @Override + public Schema getSchema() { + return stream.getSchema(); + } + + @Override + public DictionaryProvider getDictionaryProvider() { + return stream.getDictionaryProvider(); + } + + @Override + public DictionaryProvider takeDictionaryOwnership() { + return stream.takeDictionaryOwnership(); + } + + @Override + public FlightDescriptor getDescriptor() { + return stream.getDescriptor(); + } + + @Override + public void close() throws Exception { + Exception e = null; + try { + stream.close(); + LOGGER.debug("Closed stream"); + } catch (Exception ex) { + e = ex; + } + try { + client.close(); + LOGGER.debug("Closed client"); + } catch (Exception ex) { + e = ex; + } + if(e != null) { + throw e; + } + } + + @Override + public boolean next() { + return stream.next(); + } + + @Override + public VectorSchemaRoot getRoot() { + return stream.getRoot(); + } + + @Override + public boolean hasRoot() { + return stream.hasRoot(); + } + + @Override + public ArrowBuf getLatestMetadata() { + return stream.getLatestMetadata(); + } + + @Override + public void cancel(String message, Throwable exception) { + stream.cancel(message, exception); + } + + @Override + public CompletableFuture cancelled() { + return stream.cancelled(); + } + + @Override + public CompletableFuture completed() { + return stream.completed(); + } + + @Override + public StreamObserver asObserver() { + return stream.asObserver(); + } +} diff --git a/java/flight/flight-jdbc-driver/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java b/java/flight/flight-jdbc-driver/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java index 53d62cf70084a..1be536df4be16 100644 --- a/java/flight/flight-jdbc-driver/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java +++ b/java/flight/flight-jdbc-driver/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java @@ -28,6 +28,7 @@ import java.util.ArrayList; import java.net.URI; +import org.apache.arrow.driver.jdbc.FlightClientCloser; import org.apache.arrow.driver.jdbc.client.utils.ClientAuthenticationUtils; import org.apache.arrow.flight.CallOption; import org.apache.arrow.flight.FlightClient; @@ -106,10 +107,10 @@ public List getStreams(final FlightInfo flightInfo) throws SQLExce ArrayList streams = new java.util.ArrayList<>(); for (FlightEndpoint ep : flightInfo.getEndpoints()) { URI uri = ep.getLocations().isEmpty() ? null : ep.getLocations().get(0).getUri(); - System.out.println(String.format("Getting stream %s", uri)); // TODO: remove FlightSqlClient sqlClient = this.factory.createConnection(uri); - FlightStream stream = sqlClient.getStream(ep.getTicket(), getOptions()); // TODO: wrap and close client - streams.add(stream); + FlightStream stream = sqlClient.getStream(ep.getTicket(), getOptions()); + FlightClientCloser closer = new FlightClientCloser(sqlClient, stream); + streams.add(closer); } return streams; } diff --git a/java/flight/flight-jdbc-driver/src/test/java/org/apache/arrow/driver/jdbc/utils/FlightStreamQueueTest.java b/java/flight/flight-jdbc-driver/src/test/java/org/apache/arrow/driver/jdbc/utils/FlightStreamQueueTest.java index b474da55a7f1f..edb3bce678de5 100644 --- a/java/flight/flight-jdbc-driver/src/test/java/org/apache/arrow/driver/jdbc/utils/FlightStreamQueueTest.java +++ b/java/flight/flight-jdbc-driver/src/test/java/org/apache/arrow/driver/jdbc/utils/FlightStreamQueueTest.java @@ -24,6 +24,7 @@ import java.util.concurrent.CompletionService; import org.apache.arrow.flight.FlightStream; +import org.apache.arrow.flight.FlightStreamImpl; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -64,7 +65,7 @@ public void testNextShouldThrowExceptionUponClose() throws Exception { public void testEnqueueShouldThrowExceptionUponClose() throws Exception { queue.close(); ThrowableAssertionUtils.simpleAssertThrowableClass(IllegalStateException.class, - () -> queue.enqueue(mock(FlightStream.class))); + () -> queue.enqueue(mock(FlightStreamImpl.class))); } @Test From bb0fbd530d2293781b969fa462942830ed61384e Mon Sep 17 00:00:00 2001 From: Brent Gardner Date: Tue, 16 Aug 2022 13:17:52 -0600 Subject: [PATCH 4/4] Cleanup for PR --- .circleci/config.yml | 59 ---------- .../tests/IntegrationProducer.java | 2 +- .../flight-jdbc-driver/sqlline.Dockerfile | 9 -- .../jdbc/ArrowFlightJdbcDriverTest.java | 109 +----------------- 4 files changed, 7 insertions(+), 172 deletions(-) delete mode 100644 .circleci/config.yml delete mode 100644 java/flight/flight-jdbc-driver/sqlline.Dockerfile diff --git a/.circleci/config.yml b/.circleci/config.yml deleted file mode 100644 index a649e86d03891..0000000000000 --- a/.circleci/config.yml +++ /dev/null @@ -1,59 +0,0 @@ -# CircleCI configuration file -version: 2.1 - -orbs: - maven: circleci/maven@1.3.0 - -jobs: - build: - docker: - - image: maven:3.8.6-jdk-8 - steps: - - checkout - - run: | - git tag circle-$CIRCLE_BUILD_NUM - git push origin --tags - cd /root/project/java - mvn install -DskipTests -Dcheckstyle.skip -Drat.skip=true -pl :flight-jdbc-driver -am - mkdir -p /root/project/artifacts - - persist_to_workspace: - root: /root - paths: - - project/ - - .m2/ - - maven_deploy: - docker: - - image: maven:3.8.6-jdk-8 - steps: - - attach_workspace: - at: /root - - run: | - mkdir -p /root/.m2 - echo '${repo.id}${repo.login}${repo.pwd}' > /root/.m2/settings.xml - cd /root/project/java/flight/flight-jdbc-driver - mvn deploy -DskipTests -Drat.skip=true -Dcheckstyle.skip -DaltDeploymentRepository="snapshots::default::$ARTIFACTORY_URL" -e -Drepo.id=snapshots -Drepo.login=$ARTIFACTORY_USER -Drepo.pwd="$ARTIFACTORY_API_KEY" -Dmaven.install.skip=true - - docker_deploy: - docker: - - image: docker:17.05.0-ce-git - steps: - - setup_remote_docker - - attach_workspace: - at: /root - - run: | - cd /root/project/java/flight/flight-jdbc-driver - docker build -f sqlline.Dockerfile . -t spaceandtime.jfrog.io/sxt-docker-docker/ballista-sqlline:$CIRCLE_BUILD_NUM - docker login -u$ARTIFACTORY_USER -p$ARTIFACTORY_API_KEY spaceandtime.jfrog.io - docker push spaceandtime.jfrog.io/sxt-docker-docker/ballista-sqlline:$CIRCLE_BUILD_NUM - -workflows: - main_workflow: - jobs: - - build - - maven_deploy: - requires: - - build - - docker_deploy: - requires: - - build diff --git a/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/IntegrationProducer.java b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/IntegrationProducer.java index ef2b6f415adb7..aed3045accb2f 100644 --- a/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/IntegrationProducer.java +++ b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/IntegrationProducer.java @@ -104,7 +104,7 @@ public FlightInfo getFlightInfo(CallContext context, FlightDescriptor descriptor @Override public Runnable acceptPut(CallContext context, - final FlightStream flightStream, final StreamListener ackStream) { + final FlightStream flightStream, final StreamListener ackStream) { return () -> { List batches = new ArrayList<>(); try { diff --git a/java/flight/flight-jdbc-driver/sqlline.Dockerfile b/java/flight/flight-jdbc-driver/sqlline.Dockerfile deleted file mode 100644 index 02d4f2cd362d9..0000000000000 --- a/java/flight/flight-jdbc-driver/sqlline.Dockerfile +++ /dev/null @@ -1,9 +0,0 @@ -FROM dcm4che/sqlline - -COPY target/flight-jdbc-driver-9.0.0-SNAPSHOT.jar /usr/share/java - -ENV JAVA_CLASSPATH=/usr/share/java/flight-jdbc-driver-9.0.0-SNAPSHOT.jar - -RUN echo 'sqlline -d org.apache.arrow.driver.jdbc.ArrowFlightJdbcDriver --verbose=true -n admin -p password -u jdbc:arrow-flight://$1:50050?useEncryption=false' > main.sh - -ENTRYPOINT ["/bin/bash", "main.sh"] diff --git a/java/flight/flight-jdbc-driver/src/test/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcDriverTest.java b/java/flight/flight-jdbc-driver/src/test/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcDriverTest.java index f17668ed001dd..d39ec61f3099e 100644 --- a/java/flight/flight-jdbc-driver/src/test/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcDriverTest.java +++ b/java/flight/flight-jdbc-driver/src/test/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcDriverTest.java @@ -17,9 +17,14 @@ package org.apache.arrow.driver.jdbc; +import static org.junit.Assert.assertEquals; + import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; -import java.sql.*; +import java.sql.Connection; +import java.sql.Driver; +import java.sql.DriverManager; +import java.sql.SQLException; import java.util.Collection; import java.util.Map; @@ -35,8 +40,6 @@ import org.junit.Ignore; import org.junit.Test; -import static org.junit.Assert.*; - /** * Tests for {@link ArrowFlightJdbcDriver}. */ @@ -71,106 +74,6 @@ public void tearDown() throws Exception { AutoCloseables.close(dataSource, allocator); } - @Test - public void testBallista() throws Exception { - String url = "jdbc:arrow-flight://localhost:50050"; - java.util.Properties props = new java.util.Properties(); - props.setProperty("useEncryption", "false"); - props.setProperty("user", "admin"); - props.setProperty("password", "password"); - Connection con = DriverManager.getConnection(url, props); - - // Do exact same sequence as DataGrip does - assertFalse(con.isClosed()); - DatabaseMetaData dbmd = con.getMetaData(); - assertEquals("Arrow Flight JDBC Driver", dbmd.getDriverName()); - assertEquals("9.0.0-SNAPSHOT", dbmd.getDriverVersion()); - //assertEquals("", md.getDatabaseProductName()); // UNIMPLEMENTED: Implement CommandGetSqlInfo - assertEquals(9, dbmd.getDatabaseMajorVersion()); - assertEquals(0, dbmd.getDatabaseMinorVersion()); - assertNull(con.getClientInfo("ApplicationName")); -// con.setClientInfo("ApplicationName", "DataGrip 2022.2.1"); - assertEquals(4, dbmd.getJDBCMajorVersion()); - assertNotNull(dbmd.getConnection()); -// assertEquals("", md.getIdentifierQuoteString()); // Implement CommandGetSqlInfo -// assertEquals("", md.getExtraNameCharacters()); // Implement CommandGetSqlInfo - assertFalse(dbmd.supportsMixedCaseIdentifiers()); - assertTrue(dbmd.storesUpperCaseIdentifiers()); - assertFalse(dbmd.storesLowerCaseIdentifiers()); - assertFalse(dbmd.storesMixedCaseIdentifiers()); - assertTrue(dbmd.supportsMixedCaseQuotedIdentifiers()); - assertFalse(dbmd.storesUpperCaseQuotedIdentifiers()); - assertFalse(dbmd.storesLowerCaseQuotedIdentifiers()); - assertFalse(dbmd.storesMixedCaseQuotedIdentifiers()); - assertTrue(con.isValid(20)); -// assertFalse(md.supportsSavepoints()); // Implement CommandGetSqlInfo - assertTrue(con.getAutoCommit()); - con.setReadOnly(false); - assertNull(con.getCatalog()); - assertNull(con.getSchema()); - - // statement - java.sql.Statement stmt = con.createStatement(); -// stmt.setEscapeProcessing(false); // Avatica not supported - assertFalse(stmt.isClosed()); - stmt.setFetchSize(100); - String sql = "create external table customer STORED AS CSV WITH HEADER ROW\n" + - " LOCATION '/home/bgardner/workspace/ballista/arrow-datafusion/datafusion/core/tests/tpch-csv/customer.csv';\n"; - assertFalse(stmt.execute(sql)); - - // resultset - ResultSet rs = stmt.getResultSet(); - assertFalse(rs.isClosed()); // NPE - ResultSetMetaData md = rs.getMetaData(); - assertNotNull(md); - assertEquals(1, md.getColumnCount()); - assertEquals("", md.getColumnTypeName(1)); - assertEquals("", md.getColumnType(1)); - assertEquals("", md.getColumnClassName(1)); - assertEquals(1, rs.getType()); - assertFalse(rs.next()); - assertFalse(rs.isClosed()); - assertFalse(stmt.getMoreResults()); - assertFalse(stmt.isClosed()); - assertNull(con.getWarnings()); - con.clearWarnings();; - stmt.close(); - - int updateCount = stmt.getUpdateCount(); - assertEquals(updateCount, 0); - - sql = "select c_name from customer order by c_name limit 1"; - assertTrue(stmt.execute(sql)); - rs = stmt.getResultSet(); - int count = 0; - while(rs.next()) { - String key = rs.getString(1); - assertEquals(key, "Customer#000000002"); - count++; - } - assertEquals(count, 1); - } - - @Test - public void testDremio() throws Exception { - String sql = "select 'Hello, Dremio';"; - String url = "jdbc:arrow-flight://localhost:32010"; - java.util.Properties props = new java.util.Properties(); - props.setProperty("useEncryption", "false"); - props.setProperty("user", "dremio"); - props.setProperty("password", "dremio123"); - Connection con = DriverManager.getConnection(url, props); - java.sql.Statement stmt = con.createStatement(); - boolean result = stmt.execute(sql); - assertEquals(result, true); - java.sql.ResultSet rs = stmt.getResultSet(); - int count = 0; - while(rs.next()) { - count++; - } - assertEquals(count, 1); - } - /** * Tests whether the {@link ArrowFlightJdbcDriver} is registered in the * {@link DriverManager}.