diff --git a/java/driver/flight-sql-validation/src/test/java/org/apache/arrow/adbc/driver/flightsql/FlightSqlConnectionMetadataTest.java b/java/driver/flight-sql-validation/src/test/java/org/apache/arrow/adbc/driver/flightsql/FlightSqlConnectionMetadataTest.java index a2ad20ac21..75f01f5460 100644 --- a/java/driver/flight-sql-validation/src/test/java/org/apache/arrow/adbc/driver/flightsql/FlightSqlConnectionMetadataTest.java +++ b/java/driver/flight-sql-validation/src/test/java/org/apache/arrow/adbc/driver/flightsql/FlightSqlConnectionMetadataTest.java @@ -31,30 +31,10 @@ public static void beforeAll() { @Disabled("Not yet implemented") public void getObjectsConstraints() throws Exception {} - @Override - @Disabled("Not yet implemented") - public void getObjectsColumns() throws Exception {} - - @Override - @Disabled("Not yet implemented") - public void getObjectsCatalogs() throws Exception {} - @Override @Disabled("Not yet implemented") public void getObjectsCatalogsPattern() throws Exception {} - @Override - @Disabled("Not yet implemented") - public void getObjectsDbSchemas() throws Exception { - super.getObjectsDbSchemas(); - } - - @Override - @Disabled("Not yet implemented") - public void getObjectsTables() throws Exception { - super.getObjectsTables(); - } - @Override @Disabled("Not yet implemented") public void getTableSchema() throws Exception { diff --git a/java/driver/flight-sql-validation/src/test/java/org/apache/arrow/adbc/driver/flightsql/FlightSqlTransactionTest.java b/java/driver/flight-sql-validation/src/test/java/org/apache/arrow/adbc/driver/flightsql/FlightSqlTransactionTest.java new file mode 100644 index 0000000000..ae29651c7a --- /dev/null +++ b/java/driver/flight-sql-validation/src/test/java/org/apache/arrow/adbc/driver/flightsql/FlightSqlTransactionTest.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.adbc.driver.flightsql; + +import org.apache.arrow.adbc.driver.testsuite.AbstractTransactionTest; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Disabled; + +public class FlightSqlTransactionTest extends AbstractTransactionTest { + + @BeforeAll + public static void beforeAll() { + quirks = new FlightSqlQuirks(); + } + + @Override + @Disabled("Not yet implemented") + public void enableAutoCommitAlsoCommits() throws Exception {} + + @Override + @Disabled("Not yet implemented") + public void commit() throws Exception {} + + @Override + @Disabled("Not yet implemented") + public void rollback() throws Exception {} + + @Override + @Disabled("Not yet implemented") + public void autoCommitByDefault() throws Exception {} + + @Override + @Disabled("Not yet implemented") + public void toggleAutoCommit() throws Exception {} +} diff --git a/java/driver/flight-sql/src/main/java/org/apache/arrow/adbc/driver/flightsql/FlightSqlConnection.java b/java/driver/flight-sql/src/main/java/org/apache/arrow/adbc/driver/flightsql/FlightSqlConnection.java index f583f2b866..43ccd151ea 100644 --- a/java/driver/flight-sql/src/main/java/org/apache/arrow/adbc/driver/flightsql/FlightSqlConnection.java +++ b/java/driver/flight-sql/src/main/java/org/apache/arrow/adbc/driver/flightsql/FlightSqlConnection.java @@ -107,6 +107,30 @@ public AdbcStatement bulkIngest(String targetTableName, BulkIngestMode mode) allocator, client, clientCache, quirks, targetTableName, mode); } + @Override + public ArrowReader getObjects( + GetObjectsDepth depth, + String catalogPattern, + String dbSchemaPattern, + String tableNamePattern, + String[] tableTypes, + String columnNamePattern) + throws AdbcException { + try (final VectorSchemaRoot root = + new ObjectMetadataBuilder( + allocator, + client, + depth, + catalogPattern, + dbSchemaPattern, + tableNamePattern, + tableTypes, + columnNamePattern) + .build()) { + return RootArrowReader.fromRoot(allocator, root); + } + } + @Override public ArrowReader getInfo(int[] infoCodes) throws AdbcException { try (InfoMetadataBuilder builder = new InfoMetadataBuilder(allocator, client, infoCodes)) { diff --git a/java/driver/flight-sql/src/main/java/org/apache/arrow/adbc/driver/flightsql/ObjectMetadataBuilder.java b/java/driver/flight-sql/src/main/java/org/apache/arrow/adbc/driver/flightsql/ObjectMetadataBuilder.java new file mode 100644 index 0000000000..d792b328a3 --- /dev/null +++ b/java/driver/flight-sql/src/main/java/org/apache/arrow/adbc/driver/flightsql/ObjectMetadataBuilder.java @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.arrow.adbc.driver.flightsql; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.nio.channels.Channels; +import java.nio.charset.StandardCharsets; +import java.util.*; +import java.util.regex.Pattern; + +import org.apache.arrow.adbc.core.AdbcConnection; +import org.apache.arrow.adbc.core.AdbcException; +import org.apache.arrow.adbc.core.StandardSchemas; +import org.apache.arrow.flight.FlightEndpoint; +import org.apache.arrow.flight.FlightInfo; +import org.apache.arrow.flight.FlightStream; +import org.apache.arrow.flight.sql.FlightSqlClient; +import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.VarBinaryVector; +import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.complex.ListVector; +import org.apache.arrow.vector.complex.impl.UnionListWriter; +import org.apache.arrow.vector.complex.writer.BaseWriter; +import org.apache.arrow.vector.complex.writer.VarCharWriter; +import org.apache.arrow.vector.ipc.ReadChannel; +import org.apache.arrow.vector.ipc.message.MessageSerializer; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.Schema; + +final class ObjectMetadataBuilder { + + private final FlightSqlClient client; + private final VectorSchemaRoot root; + private final VarCharVector adbcCatalogNames; + private final UnionListWriter adbcCatalogDbSchemasWriter; + private final BaseWriter.StructWriter adbcCatalogDbSchemasStructWriter; + private final BaseWriter.ListWriter adbcCatalogDbSchemaTablesWriter; + private final VarCharWriter adbcCatalogDbSchemaNameWriter; + private final BaseWriter.StructWriter adbcTablesStructWriter; + private final VarCharWriter adbcTableNameWriter; + private final VarCharWriter adbcTableTypeWriter; + private final BaseWriter.ListWriter adbcTableColumnsWriter; + private final BufferAllocator allocator; + private final AdbcConnection.GetObjectsDepth depth; + private final String catalogPattern; + private final String dbSchemaPattern; + private final String tableNamePattern; + private final String[] tableTypes; + private final Pattern precompiledColumnNamePattern; + + ObjectMetadataBuilder( + BufferAllocator allocator, + FlightSqlClient client, + final AdbcConnection.GetObjectsDepth depth, + final String catalogPattern, + final String dbSchemaPattern, + final String tableNamePattern, + final String[] tableTypes, + final String columnNamePattern) { + this.allocator = allocator; + this.client = client; + this.depth = depth; + this.catalogPattern = catalogPattern; + this.dbSchemaPattern = dbSchemaPattern; + this.tableNamePattern = tableNamePattern; + this.precompiledColumnNamePattern = columnNamePattern != null ? Pattern.compile( + Pattern.quote(columnNamePattern).replace("_", ".").replace("%", ".*") + ) : null; + this.tableTypes = tableTypes; + this.root = VectorSchemaRoot.create(StandardSchemas.GET_OBJECTS_SCHEMA, allocator); + this.adbcCatalogNames = (VarCharVector) root.getVector(0); + this.adbcCatalogDbSchemasWriter = ((ListVector) root.getVector(1)).getWriter(); + this.adbcCatalogDbSchemasStructWriter = adbcCatalogDbSchemasWriter.struct(); + this.adbcCatalogDbSchemaTablesWriter = + adbcCatalogDbSchemasStructWriter.list("db_schema_tables"); + this.adbcCatalogDbSchemaNameWriter = adbcCatalogDbSchemasStructWriter.varChar("db_schema_name"); + this.adbcTablesStructWriter = adbcCatalogDbSchemaTablesWriter.struct(); + this.adbcTableNameWriter = adbcTablesStructWriter.varChar("table_name"); + this.adbcTableTypeWriter = adbcTablesStructWriter.varChar("table_type"); + this.adbcTableColumnsWriter = adbcTablesStructWriter.list("table_columns"); + } + + private void writeVarChar(VarCharWriter writer, String value) { + byte[] bytes = value.getBytes(StandardCharsets.UTF_8); + try (ArrowBuf tempBuf = allocator.buffer(bytes.length)) { + tempBuf.setBytes(0, bytes, 0, bytes.length); + writer.writeVarChar(0, bytes.length, tempBuf); + } + } + + VectorSchemaRoot build() throws AdbcException { + // TODO Catalogs and schemas that don't contain tables are being left out + FlightInfo info; + if (depth == AdbcConnection.GetObjectsDepth.CATALOGS) { + info = client.getCatalogs(); + } else if (depth == AdbcConnection.GetObjectsDepth.DB_SCHEMAS) { + info = client.getSchemas(null, dbSchemaPattern); + } else { + info = + client.getTables( + null, // TODO pattern match later during processing + dbSchemaPattern, + tableNamePattern, + tableTypes == null ? null : Arrays.asList(tableTypes), + depth == AdbcConnection.GetObjectsDepth.ALL); + } + + byte[] lastCatalogAdded = null; + byte[] lastDbSchemaAdded = null; + int catalogIndex = 0; + + for (FlightEndpoint endpoint : info.getEndpoints()) { + FlightStream stream = client.getStream(endpoint.getTicket()); + while (stream.next()) { + try (VectorSchemaRoot res = stream.getRoot()) { + VarCharVector catalogVector = (VarCharVector) res.getVector(0); + + for (int i = 0; i < res.getRowCount(); i++) { + byte[] catalog = catalogVector.get(i); + + if (i == 0 || lastCatalogAdded != catalog) { + if (catalog == null) { + adbcCatalogNames.setNull(catalogIndex); + } else { + adbcCatalogNames.setSafe(catalogIndex, catalog); + } + if (depth == AdbcConnection.GetObjectsDepth.CATALOGS) { + adbcCatalogDbSchemasWriter.writeNull(); + } else { + if (catalogIndex != 0) { + adbcCatalogDbSchemasWriter.endList(); + } + adbcCatalogDbSchemasWriter.startList(); + lastDbSchemaAdded = null; + } + catalogIndex++; + lastCatalogAdded = catalog; + } + + if (depth != AdbcConnection.GetObjectsDepth.CATALOGS) { + VarCharVector dbSchemaVector = (VarCharVector) res.getVector(1); + byte[] dbSchema = dbSchemaVector.get(i); + + if (!Arrays.equals(lastDbSchemaAdded, dbSchema)) { + if (i != 0) { + adbcCatalogDbSchemaTablesWriter.endList(); + adbcCatalogDbSchemasStructWriter.end(); + } + adbcCatalogDbSchemasStructWriter.start(); + writeVarChar( + adbcCatalogDbSchemaNameWriter, new String(dbSchema, StandardCharsets.UTF_8)); + if (depth == AdbcConnection.GetObjectsDepth.DB_SCHEMAS) { + adbcCatalogDbSchemaTablesWriter.writeNull(); + } else { + adbcCatalogDbSchemaTablesWriter.startList(); + } + + lastDbSchemaAdded = dbSchema; + } + } + + if (depth != AdbcConnection.GetObjectsDepth.CATALOGS + && depth != AdbcConnection.GetObjectsDepth.DB_SCHEMAS) { + VarCharVector tableNameVector = (VarCharVector) res.getVector(2); + VarCharVector tableTypeVector = (VarCharVector) res.getVector(3); + + adbcTablesStructWriter.start(); + writeVarChar( + adbcTableNameWriter, new String(tableNameVector.get(i), StandardCharsets.UTF_8)); + writeVarChar( + adbcTableTypeWriter, new String(tableTypeVector.get(i), StandardCharsets.UTF_8)); + + if (depth == AdbcConnection.GetObjectsDepth.ALL) { + VarBinaryVector tableSchemaVector = (VarBinaryVector) res.getVector(4); + Schema schema; + + try { + schema = + MessageSerializer.deserializeSchema( + new ReadChannel( + Channels.newChannel( + new ByteArrayInputStream(tableSchemaVector.get(i))))); + } catch (IOException e) { + throw new RuntimeException(e); + } + + adbcTableColumnsWriter.startList(); + + for (int y = 0; y < schema.getFields().size(); y++) { + Field field = schema.getFields().get(y); + if (precompiledColumnNamePattern == null || precompiledColumnNamePattern.matcher(field.getName()).matches()) { + adbcTableColumnsWriter.struct().start(); + writeVarChar( + adbcTableColumnsWriter.struct().varChar("column_name"), field.getName()); + adbcTableColumnsWriter.struct().integer("ordinal_position").writeInt(y + 1); + adbcTableColumnsWriter.struct().end(); + } + } + adbcTableColumnsWriter.endList(); + } + + adbcTablesStructWriter.end(); + } + } + + if (depth != AdbcConnection.GetObjectsDepth.CATALOGS) { + adbcCatalogDbSchemaTablesWriter.endList(); + adbcCatalogDbSchemasStructWriter.end(); + adbcCatalogDbSchemasWriter.endList(); + } + } + } + } + + root.setRowCount(catalogIndex); + return root; + } +} diff --git a/java/driver/validation/src/main/java/org/apache/arrow/adbc/driver/testsuite/AbstractTransactionTest.java b/java/driver/validation/src/main/java/org/apache/arrow/adbc/driver/testsuite/AbstractTransactionTest.java index 29265ba7e3..e866879dba 100644 --- a/java/driver/validation/src/main/java/org/apache/arrow/adbc/driver/testsuite/AbstractTransactionTest.java +++ b/java/driver/validation/src/main/java/org/apache/arrow/adbc/driver/testsuite/AbstractTransactionTest.java @@ -63,7 +63,7 @@ public void afterEach() throws Exception { } @Test - void autoCommitByDefault() throws Exception { + public void autoCommitByDefault() throws Exception { assertAdbcException(assertThrows(AdbcException.class, () -> connection.commit())) .isStatus(AdbcStatusCode.INVALID_STATE); assertAdbcException(assertThrows(AdbcException.class, () -> connection.rollback())) @@ -72,7 +72,7 @@ void autoCommitByDefault() throws Exception { } @Test - void toggleAutoCommit() throws Exception { + public void toggleAutoCommit() throws Exception { assertThat(connection.getAutoCommit()).isTrue(); connection.setAutoCommit(true); assertThat(connection.getAutoCommit()).isTrue(); @@ -83,7 +83,7 @@ void toggleAutoCommit() throws Exception { } @Test - void rollback() throws Exception { + public void rollback() throws Exception { final Schema schema = new Schema( Collections.singletonList( @@ -114,7 +114,7 @@ void rollback() throws Exception { } @Test - void commit() throws Exception { + public void commit() throws Exception { final Schema schema = new Schema( Collections.singletonList( @@ -147,7 +147,7 @@ void commit() throws Exception { } @Test - void enableAutoCommitAlsoCommits() throws Exception { + public void enableAutoCommitAlsoCommits() throws Exception { final Schema schema = new Schema( Collections.singletonList(