diff --git a/format/FlightSql.proto b/format/FlightSql.proto index b563b5fb7a92d..9227e355b88a7 100644 --- a/format/FlightSql.proto +++ b/format/FlightSql.proto @@ -45,12 +45,12 @@ message CommandGetSqlInfo { * More information types can be added in future releases. * E.g. more SQL syntax support types, scalar functions support, type conversion support etc. * - * // TODO: Flesh out the available set of metadata below. + * Note that the set of metadata may expand. * * Initially, Flight SQL will support the following information types: * - Server Information - Range [0-500) * - Syntax Information - Range [500-1000) - * Range [0-100000) is reserved for defaults. Custom options should start at 100000. + * Range [0-10,000) is reserved for defaults. Custom options should start at 10,000. * * 1. Server Information [0-500): Provides basic information about the Flight SQL Server. * diff --git a/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlClient.java b/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlClient.java index 18adf40e19fc1..262fae58dd9d9 100644 --- a/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlClient.java +++ b/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlClient.java @@ -40,6 +40,7 @@ import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; +import org.apache.arrow.flatbuf.Message; import org.apache.arrow.flight.Action; import org.apache.arrow.flight.CallOption; import org.apache.arrow.flight.CallStatus; @@ -58,6 +59,7 @@ import org.apache.arrow.memory.ArrowBuf; import org.apache.arrow.util.Preconditions; import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.ipc.message.MessageSerializer; import org.apache.arrow.vector.types.pojo.Schema; import com.google.protobuf.Any; @@ -94,7 +96,7 @@ public FlightInfo execute(final String query, final CallOption... options) { * * @param query The query to execute. * @param options RPC-layer hints for this call. - * @return a FlightInfo object representing the stream(s) to fetch. + * @return the number of rows affected. */ public long executeUpdate(final String query, final CallOption... options) { final CommandStatementUpdate.Builder builder = CommandStatementUpdate.newBuilder(); @@ -425,10 +427,9 @@ public void clearParameters() { public Schema getResultSetSchema() { if (resultSetSchema == null) { final ByteString bytes = preparedStatementResult.getDatasetSchema(); - if (bytes.isEmpty()) { - return new Schema(Collections.emptyList()); - } - resultSetSchema = Schema.deserialize(bytes.asReadOnlyByteBuffer()); + resultSetSchema = bytes.isEmpty() ? + new Schema(Collections.emptyList()) : + MessageSerializer.deserializeSchema(Message.getRootAsMessage(bytes.asReadOnlyByteBuffer())); } return resultSetSchema; } @@ -441,10 +442,9 @@ public Schema getResultSetSchema() { public Schema getParameterSchema() { if (parameterSchema == null) { final ByteString bytes = preparedStatementResult.getParameterSchema(); - if (bytes.isEmpty()) { - return new Schema(Collections.emptyList()); - } - parameterSchema = Schema.deserialize(bytes.asReadOnlyByteBuffer()); + parameterSchema = bytes.isEmpty() ? + new Schema(Collections.emptyList()) : + MessageSerializer.deserializeSchema(Message.getRootAsMessage(bytes.asReadOnlyByteBuffer())); } return parameterSchema; } diff --git a/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlProducer.java b/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlProducer.java index 2c737769f3c2e..941f989b58411 100644 --- a/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlProducer.java +++ b/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlProducer.java @@ -23,7 +23,6 @@ import java.util.Arrays; import java.util.Collections; -import java.util.List; import org.apache.arrow.flight.Action; import org.apache.arrow.flight.ActionType; @@ -128,21 +127,19 @@ default SchemaResult getSchema(CallContext context, FlightDescriptor descriptor) return getSchemaStatement( FlightSqlUtils.unpackOrThrow(command, CommandStatementQuery.class), context, descriptor); } else if (command.is(CommandGetCatalogs.class)) { - return getSchemaCatalogs(); + return new SchemaResult(Schemas.GET_CATALOGS_SCHEMA); } else if (command.is(CommandGetSchemas.class)) { - return getSchemaSchemas(); + return new SchemaResult(Schemas.GET_SCHEMAS_SCHEMA); } else if (command.is(CommandGetTables.class)) { - return getSchemaTables(); + return new SchemaResult(Schemas.GET_TABLES_SCHEMA); } else if (command.is(CommandGetTableTypes.class)) { - return getSchemaTableTypes(); + return new SchemaResult(Schemas.GET_TABLE_TYPES_SCHEMA); } else if (command.is(CommandGetSqlInfo.class)) { - return getSchemaSqlInfo(); + return new SchemaResult(Schemas.GET_SQL_INFO_SCHEMA); } else if (command.is(CommandGetPrimaryKeys.class)) { - return getSchemaPrimaryKeys(); - } else if (command.is(CommandGetExportedKeys.class)) { - return getSchemaForImportedAndExportedKeys(); - } else if (command.is(CommandGetImportedKeys.class)) { - return getSchemaForImportedAndExportedKeys(); + return new SchemaResult(Schemas.GET_PRIMARY_KEYS_SCHEMA); + } else if (command.is(CommandGetImportedKeys.class) || command.is(CommandGetExportedKeys.class)) { + return new SchemaResult(Schemas.GET_IMPORTED_AND_EXPORTED_KEYS_SCHEMA); } throw CallStatus.INVALID_ARGUMENT.withDescription("Invalid command provided.").toRuntimeException(); @@ -394,15 +391,6 @@ Runnable acceptPutPreparedStatementQuery(CommandPreparedStatementQuery command, FlightInfo getFlightInfoSqlInfo(CommandGetSqlInfo request, CallContext context, FlightDescriptor descriptor); - /** - * Gets schema about the get SQL info data stream. - * - * @return Schema for the stream. - */ - default SchemaResult getSchemaSqlInfo() { - return new SchemaResult(Schemas.GET_SQL_INFO_SCHEMA); - } - /** * Returns data for SQL info based data stream. * @@ -426,15 +414,6 @@ void getStreamSqlInfo(CommandGetSqlInfo command, CallContext context, Ticket tic FlightInfo getFlightInfoCatalogs(CommandGetCatalogs request, CallContext context, FlightDescriptor descriptor); - /** - * Gets schema about the get catalogs data stream. - * - * @return Schema for the stream. - */ - default SchemaResult getSchemaCatalogs() { - return new SchemaResult(Schemas.GET_CATALOGS_SCHEMA); - } - /** * Returns data for catalogs based data stream. * @@ -457,15 +436,6 @@ void getStreamCatalogs(CallContext context, Ticket ticket, FlightInfo getFlightInfoSchemas(CommandGetSchemas request, CallContext context, FlightDescriptor descriptor); - /** - * Gets schema about the get schemas data stream. - * - * @return Schema for the stream. - */ - default SchemaResult getSchemaSchemas() { - return new SchemaResult(Schemas.GET_SCHEMAS_SCHEMA); - } - /** * Returns data for schemas based data stream. * @@ -489,15 +459,6 @@ void getStreamSchemas(CommandGetSchemas command, CallContext context, Ticket tic FlightInfo getFlightInfoTables(CommandGetTables request, CallContext context, FlightDescriptor descriptor); - /** - * Gets schema about the get tables data stream. - * - * @return Schema for the stream. - */ - default SchemaResult getSchemaTables() { - return new SchemaResult(Schemas.GET_TABLES_SCHEMA); - } - /** * Returns data for tables based data stream. * @@ -520,15 +481,6 @@ void getStreamTables(CommandGetTables command, CallContext context, Ticket ticke FlightInfo getFlightInfoTableTypes(CommandGetTableTypes request, CallContext context, FlightDescriptor descriptor); - /** - * Gets schema about the get table types data stream. - * - * @return Schema for the stream. - */ - default SchemaResult getSchemaTableTypes() { - return new SchemaResult(Schemas.GET_TABLE_TYPES_SCHEMA); - } - /** * Returns data for table types based data stream. * @@ -550,23 +502,6 @@ default SchemaResult getSchemaTableTypes() { FlightInfo getFlightInfoPrimaryKeys(CommandGetPrimaryKeys request, CallContext context, FlightDescriptor descriptor); - /** - * Gets schema about the get primary keys data stream. - * - * @return Schema for the stream. - */ - default SchemaResult getSchemaPrimaryKeys() { - final List fields = Arrays.asList( - Field.nullable("catalog_name", MinorType.VARCHAR.getType()), - Field.nullable("schema_name", MinorType.VARCHAR.getType()), - Field.nullable("table_name", MinorType.VARCHAR.getType()), - Field.nullable("column_name", MinorType.VARCHAR.getType()), - Field.nullable("key_sequence", MinorType.INT.getType()), - Field.nullable("key_name", MinorType.VARCHAR.getType())); - - return new SchemaResult(new Schema(fields)); - } - /** * Returns data for primary keys based data stream. * @@ -602,15 +537,6 @@ FlightInfo getFlightInfoExportedKeys(CommandGetExportedKeys request, CallContext FlightInfo getFlightInfoImportedKeys(CommandGetImportedKeys request, CallContext context, FlightDescriptor descriptor); - /** - * Gets schema about the get imported and exported keys data stream. - * - * @return Schema for the stream. - */ - default SchemaResult getSchemaForImportedAndExportedKeys() { - return new SchemaResult(Schemas.GET_IMPORTED_AND_EXPORTED_KEYS_SCHEMA); - } - /** * Returns data for foreign keys based data stream. * @@ -680,6 +606,13 @@ final class Schemas { Field.nullable("int_value", MinorType.INT.getType()), Field.nullable("bigint_value", MinorType.BIGINT.getType()), Field.nullable("int32_bitmask", MinorType.INT.getType()))))); + public static final Schema GET_PRIMARY_KEYS_SCHEMA = new Schema(Arrays.asList( + Field.nullable("catalog_name", MinorType.VARCHAR.getType()), + Field.nullable("schema_name", MinorType.VARCHAR.getType()), + Field.nullable("table_name", MinorType.VARCHAR.getType()), + Field.nullable("column_name", MinorType.VARCHAR.getType()), + Field.nullable("key_sequence", MinorType.INT.getType()), + Field.nullable("key_name", MinorType.VARCHAR.getType()))); private Schemas() { // Prevent instantiation. diff --git a/java/flight/flight-sql/src/test/java/org/apache/arrow/flight/TestFlightSql.java b/java/flight/flight-sql/src/test/java/org/apache/arrow/flight/TestFlightSql.java index ccdd3cdb94b22..2f85d4fbed873 100644 --- a/java/flight/flight-sql/src/test/java/org/apache/arrow/flight/TestFlightSql.java +++ b/java/flight/flight-sql/src/test/java/org/apache/arrow/flight/TestFlightSql.java @@ -40,6 +40,7 @@ import java.util.Properties; import java.util.stream.IntStream; +import org.apache.arrow.flatbuf.Message; import org.apache.arrow.flight.sql.FlightSqlClient; import org.apache.arrow.flight.sql.FlightSqlClient.PreparedStatement; import org.apache.arrow.flight.sql.FlightSqlProducer; @@ -54,6 +55,7 @@ import org.apache.arrow.vector.VarCharVector; import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.complex.DenseUnionVector; +import org.apache.arrow.vector.ipc.message.MessageSerializer; import org.apache.arrow.vector.types.Types.MinorType; import org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.types.pojo.FieldType; @@ -614,7 +616,9 @@ List> getResults(FlightStream stream) { for (int rowIndex = 0; rowIndex < rowCount; rowIndex++) { final byte[] data = varbinaryVector.getObject(rowIndex); final String output = - isNull(data) ? null : Schema.deserialize(ByteBuffer.wrap(data)).toJson(); + isNull(data) ? + null : + MessageSerializer.deserializeSchema(Message.getRootAsMessage(ByteBuffer.wrap(data))).toJson(); results.get(rowIndex).add(output); } } else if (fieldVector instanceof DenseUnionVector) { diff --git a/java/flight/flight-sql/src/test/java/org/apache/arrow/flight/sql/example/FlightSqlExample.java b/java/flight/flight-sql/src/test/java/org/apache/arrow/flight/sql/example/FlightSqlExample.java index 76490974bff1a..f0df2a42b9030 100644 --- a/java/flight/flight-sql/src/test/java/org/apache/arrow/flight/sql/example/FlightSqlExample.java +++ b/java/flight/flight-sql/src/test/java/org/apache/arrow/flight/sql/example/FlightSqlExample.java @@ -136,6 +136,8 @@ import org.apache.arrow.vector.complex.DenseUnionVector; import org.apache.arrow.vector.holders.NullableIntHolder; import org.apache.arrow.vector.holders.NullableVarCharHolder; +import org.apache.arrow.vector.ipc.message.IpcOption; +import org.apache.arrow.vector.ipc.message.MessageSerializer; import org.apache.arrow.vector.types.Types.MinorType; import org.apache.arrow.vector.types.pojo.ArrowType; import org.apache.arrow.vector.types.pojo.Field; @@ -175,6 +177,7 @@ * with {@link #getFlightInfo} and {@link #getStream}. */ public class FlightSqlExample implements FlightSqlProducer, AutoCloseable { + private static final IpcOption DEFAULT_OPTION = IpcOption.DEFAULT; private static final String DATABASE_URI = "jdbc:derby:target/derbyDB"; private static final Logger LOGGER = getLogger(FlightSqlExample.class); private static final Calendar DEFAULT_CALENDAR = JdbcToArrowUtils.getUtcCalendar(); @@ -523,7 +526,9 @@ private static VectorSchemaRoot getTablesRoot(final DatabaseMetaData databaseMet for (int index = 0; index < rows; index++) { final String tableName = tableNameVector.getObject(index).toString(); final Schema schema = new Schema(tableToFields.get(tableName)); - saveToVector(schema.toByteArray(), tableSchemaVector, index); + saveToVector( + copyFrom(MessageSerializer.serializeMetadata(schema, DEFAULT_OPTION)).toByteArray(), + tableSchemaVector, index); } } @@ -737,17 +742,15 @@ public void createPreparedStatement(final ActionCreatePreparedStatementRequest r jdbcToArrowSchema(preparedStatement.getParameterMetaData(), DEFAULT_CALENDAR); final ResultSetMetaData metaData = preparedStatement.getMetaData(); - - ByteString bytes; - if (isNull(metaData)) { - bytes = ByteString.EMPTY; - } else { - bytes = ByteString.copyFrom( - jdbcToArrowSchema(metaData, DEFAULT_CALENDAR).toByteArray()); - } + final ByteString bytes = isNull(metaData) ? + ByteString.EMPTY : + ByteString.copyFrom( + MessageSerializer.serializeMetadata( + jdbcToArrowSchema(metaData, DEFAULT_CALENDAR), + DEFAULT_OPTION)); final ActionCreatePreparedStatementResult result = ActionCreatePreparedStatementResult.newBuilder() .setDatasetSchema(bytes) - .setParameterSchema(copyFrom(parameterSchema.toByteArray())) + .setParameterSchema(copyFrom(MessageSerializer.serializeMetadata(parameterSchema, DEFAULT_OPTION))) .setPreparedStatementHandle(preparedStatementHandle) .build(); listener.onNext(new Result(pack(result).toByteArray())); @@ -1346,7 +1349,7 @@ public Runnable acceptPutPreparedStatementQuery(CommandPreparedStatementQuery co @Override public FlightInfo getFlightInfoSqlInfo(final CommandGetSqlInfo request, final CallContext context, final FlightDescriptor descriptor) { - return getFlightInfoForSchema(request, descriptor, getSchemaSqlInfo().getSchema()); + return getFlightInfoForSchema(request, descriptor, Schemas.GET_SQL_INFO_SCHEMA); } @Override @@ -1377,7 +1380,7 @@ public void getStreamSqlInfo(final CommandGetSqlInfo command, final CallContext @Override public FlightInfo getFlightInfoCatalogs(final CommandGetCatalogs request, final CallContext context, final FlightDescriptor descriptor) { - return getFlightInfoForSchema(request, descriptor, getSchemaCatalogs().getSchema()); + return getFlightInfoForSchema(request, descriptor, Schemas.GET_CATALOGS_SCHEMA); } @Override @@ -1398,7 +1401,7 @@ public void getStreamCatalogs(final CallContext context, final Ticket ticket, fi @Override public FlightInfo getFlightInfoSchemas(final CommandGetSchemas request, final CallContext context, final FlightDescriptor descriptor) { - return getFlightInfoForSchema(request, descriptor, getSchemaSchemas().getSchema()); + return getFlightInfoForSchema(request, descriptor, Schemas.GET_SCHEMAS_SCHEMA); } @Override @@ -1423,8 +1426,7 @@ public void getStreamSchemas(final CommandGetSchemas command, final CallContext @Override public FlightInfo getFlightInfoTables(final CommandGetTables request, final CallContext context, final FlightDescriptor descriptor) { - final Schema schema = getSchemaTables().getSchema(); - return getFlightInfoForSchema(request, descriptor, schema); + return getFlightInfoForSchema(request, descriptor, Schemas.GET_TABLES_SCHEMA); } @Override @@ -1460,7 +1462,7 @@ public void getStreamTables(final CommandGetTables command, final CallContext co @Override public FlightInfo getFlightInfoTableTypes(final CommandGetTableTypes request, final CallContext context, final FlightDescriptor descriptor) { - return getFlightInfoForSchema(request, descriptor, getSchemaTableTypes().getSchema()); + return getFlightInfoForSchema(request, descriptor, Schemas.GET_TABLE_TYPES_SCHEMA); } @Override @@ -1481,7 +1483,7 @@ public void getStreamTableTypes(final CallContext context, final Ticket ticket, @Override public FlightInfo getFlightInfoPrimaryKeys(final CommandGetPrimaryKeys request, final CallContext context, final FlightDescriptor descriptor) { - return getFlightInfoForSchema(request, descriptor, getSchemaPrimaryKeys().getSchema()); + return getFlightInfoForSchema(request, descriptor, Schemas.GET_PRIMARY_KEYS_SCHEMA); } @Override @@ -1536,8 +1538,7 @@ public void getStreamPrimaryKeys(final CommandGetPrimaryKeys command, final Call @Override public FlightInfo getFlightInfoExportedKeys(final FlightSql.CommandGetExportedKeys request, final CallContext context, final FlightDescriptor descriptor) { - final Schema schema = getSchemaForImportedAndExportedKeys().getSchema(); - return getFlightInfoForSchema(request, descriptor, schema); + return getFlightInfoForSchema(request, descriptor, Schemas.GET_IMPORTED_AND_EXPORTED_KEYS_SCHEMA); } @Override @@ -1563,8 +1564,7 @@ public void getStreamExportedKeys(final FlightSql.CommandGetExportedKeys command @Override public FlightInfo getFlightInfoImportedKeys(final FlightSql.CommandGetImportedKeys request, final CallContext context, final FlightDescriptor descriptor) { - final Schema schema = getSchemaForImportedAndExportedKeys().getSchema(); - return getFlightInfoForSchema(request, descriptor, schema); + return getFlightInfoForSchema(request, descriptor, Schemas.GET_IMPORTED_AND_EXPORTED_KEYS_SCHEMA); } @Override