Skip to content

Commit

Permalink
Flight SQL Ratification Based On Community Feedback #5 (apache#91)
Browse files Browse the repository at this point in the history
* Delegate GetSchemaImportedKeys

* Remove schema retrieval methods for catalog functions and delegate to constants

* Add IPC encapsulation to Schema serialization

* Fix checkstyle violations

* Update javadoc for FlightSqlClient

* Update documentation for FlightSql.proto

Co-authored-by: Abner Eduardo Ferreira <abenaru@protonmail.ch>
  • Loading branch information
rafael-telles and Abner Eduardo Ferreira committed Oct 12, 2021
1 parent 66977fa commit 39b8eea
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 115 deletions.
4 changes: 2 additions & 2 deletions format/FlightSql.proto
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,12 @@ message CommandGetSqlInfo {
* More information types can be added in future releases.
* E.g. more SQL syntax support types, scalar functions support, type conversion support etc.
*
* // TODO: Flesh out the available set of metadata below.
* Note that the set of metadata may expand.
*
* Initially, Flight SQL will support the following information types:
* - Server Information - Range [0-500)
* - Syntax Information - Range [500-1000)
* Range [0-100000) is reserved for defaults. Custom options should start at 100000.
* Range [0-10,000) is reserved for defaults. Custom options should start at 10,000.
*
* 1. Server Information [0-500): Provides basic information about the Flight SQL Server.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

import org.apache.arrow.flatbuf.Message;
import org.apache.arrow.flight.Action;
import org.apache.arrow.flight.CallOption;
import org.apache.arrow.flight.CallStatus;
Expand All @@ -58,6 +59,7 @@
import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.message.MessageSerializer;
import org.apache.arrow.vector.types.pojo.Schema;

import com.google.protobuf.Any;
Expand Down Expand Up @@ -94,7 +96,7 @@ public FlightInfo execute(final String query, final CallOption... options) {
*
* @param query The query to execute.
* @param options RPC-layer hints for this call.
* @return a FlightInfo object representing the stream(s) to fetch.
* @return the number of rows affected.
*/
public long executeUpdate(final String query, final CallOption... options) {
final CommandStatementUpdate.Builder builder = CommandStatementUpdate.newBuilder();
Expand Down Expand Up @@ -425,10 +427,9 @@ public void clearParameters() {
public Schema getResultSetSchema() {
if (resultSetSchema == null) {
final ByteString bytes = preparedStatementResult.getDatasetSchema();
if (bytes.isEmpty()) {
return new Schema(Collections.emptyList());
}
resultSetSchema = Schema.deserialize(bytes.asReadOnlyByteBuffer());
resultSetSchema = bytes.isEmpty() ?
new Schema(Collections.emptyList()) :
MessageSerializer.deserializeSchema(Message.getRootAsMessage(bytes.asReadOnlyByteBuffer()));
}
return resultSetSchema;
}
Expand All @@ -441,10 +442,9 @@ public Schema getResultSetSchema() {
public Schema getParameterSchema() {
if (parameterSchema == null) {
final ByteString bytes = preparedStatementResult.getParameterSchema();
if (bytes.isEmpty()) {
return new Schema(Collections.emptyList());
}
parameterSchema = Schema.deserialize(bytes.asReadOnlyByteBuffer());
parameterSchema = bytes.isEmpty() ?
new Schema(Collections.emptyList()) :
MessageSerializer.deserializeSchema(Message.getRootAsMessage(bytes.asReadOnlyByteBuffer()));
}
return parameterSchema;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.apache.arrow.flight.Action;
import org.apache.arrow.flight.ActionType;
Expand Down Expand Up @@ -128,21 +127,19 @@ default SchemaResult getSchema(CallContext context, FlightDescriptor descriptor)
return getSchemaStatement(
FlightSqlUtils.unpackOrThrow(command, CommandStatementQuery.class), context, descriptor);
} else if (command.is(CommandGetCatalogs.class)) {
return getSchemaCatalogs();
return new SchemaResult(Schemas.GET_CATALOGS_SCHEMA);
} else if (command.is(CommandGetSchemas.class)) {
return getSchemaSchemas();
return new SchemaResult(Schemas.GET_SCHEMAS_SCHEMA);
} else if (command.is(CommandGetTables.class)) {
return getSchemaTables();
return new SchemaResult(Schemas.GET_TABLES_SCHEMA);
} else if (command.is(CommandGetTableTypes.class)) {
return getSchemaTableTypes();
return new SchemaResult(Schemas.GET_TABLE_TYPES_SCHEMA);
} else if (command.is(CommandGetSqlInfo.class)) {
return getSchemaSqlInfo();
return new SchemaResult(Schemas.GET_SQL_INFO_SCHEMA);
} else if (command.is(CommandGetPrimaryKeys.class)) {
return getSchemaPrimaryKeys();
} else if (command.is(CommandGetExportedKeys.class)) {
return getSchemaForImportedAndExportedKeys();
} else if (command.is(CommandGetImportedKeys.class)) {
return getSchemaForImportedAndExportedKeys();
return new SchemaResult(Schemas.GET_PRIMARY_KEYS_SCHEMA);
} else if (command.is(CommandGetImportedKeys.class) || command.is(CommandGetExportedKeys.class)) {
return new SchemaResult(Schemas.GET_IMPORTED_AND_EXPORTED_KEYS_SCHEMA);
}

throw CallStatus.INVALID_ARGUMENT.withDescription("Invalid command provided.").toRuntimeException();
Expand Down Expand Up @@ -394,15 +391,6 @@ Runnable acceptPutPreparedStatementQuery(CommandPreparedStatementQuery command,
FlightInfo getFlightInfoSqlInfo(CommandGetSqlInfo request, CallContext context,
FlightDescriptor descriptor);

/**
* Gets schema about the get SQL info data stream.
*
* @return Schema for the stream.
*/
default SchemaResult getSchemaSqlInfo() {
return new SchemaResult(Schemas.GET_SQL_INFO_SCHEMA);
}

/**
* Returns data for SQL info based data stream.
*
Expand All @@ -426,15 +414,6 @@ void getStreamSqlInfo(CommandGetSqlInfo command, CallContext context, Ticket tic
FlightInfo getFlightInfoCatalogs(CommandGetCatalogs request, CallContext context,
FlightDescriptor descriptor);

/**
* Gets schema about the get catalogs data stream.
*
* @return Schema for the stream.
*/
default SchemaResult getSchemaCatalogs() {
return new SchemaResult(Schemas.GET_CATALOGS_SCHEMA);
}

/**
* Returns data for catalogs based data stream.
*
Expand All @@ -457,15 +436,6 @@ void getStreamCatalogs(CallContext context, Ticket ticket,
FlightInfo getFlightInfoSchemas(CommandGetSchemas request, CallContext context,
FlightDescriptor descriptor);

/**
* Gets schema about the get schemas data stream.
*
* @return Schema for the stream.
*/
default SchemaResult getSchemaSchemas() {
return new SchemaResult(Schemas.GET_SCHEMAS_SCHEMA);
}

/**
* Returns data for schemas based data stream.
*
Expand All @@ -489,15 +459,6 @@ void getStreamSchemas(CommandGetSchemas command, CallContext context, Ticket tic
FlightInfo getFlightInfoTables(CommandGetTables request, CallContext context,
FlightDescriptor descriptor);

/**
* Gets schema about the get tables data stream.
*
* @return Schema for the stream.
*/
default SchemaResult getSchemaTables() {
return new SchemaResult(Schemas.GET_TABLES_SCHEMA);
}

/**
* Returns data for tables based data stream.
*
Expand All @@ -520,15 +481,6 @@ void getStreamTables(CommandGetTables command, CallContext context, Ticket ticke
FlightInfo getFlightInfoTableTypes(CommandGetTableTypes request, CallContext context,
FlightDescriptor descriptor);

/**
* Gets schema about the get table types data stream.
*
* @return Schema for the stream.
*/
default SchemaResult getSchemaTableTypes() {
return new SchemaResult(Schemas.GET_TABLE_TYPES_SCHEMA);
}

/**
* Returns data for table types based data stream.
*
Expand All @@ -550,23 +502,6 @@ default SchemaResult getSchemaTableTypes() {
FlightInfo getFlightInfoPrimaryKeys(CommandGetPrimaryKeys request, CallContext context,
FlightDescriptor descriptor);

/**
* Gets schema about the get primary keys data stream.
*
* @return Schema for the stream.
*/
default SchemaResult getSchemaPrimaryKeys() {
final List<Field> fields = Arrays.asList(
Field.nullable("catalog_name", MinorType.VARCHAR.getType()),
Field.nullable("schema_name", MinorType.VARCHAR.getType()),
Field.nullable("table_name", MinorType.VARCHAR.getType()),
Field.nullable("column_name", MinorType.VARCHAR.getType()),
Field.nullable("key_sequence", MinorType.INT.getType()),
Field.nullable("key_name", MinorType.VARCHAR.getType()));

return new SchemaResult(new Schema(fields));
}

/**
* Returns data for primary keys based data stream.
*
Expand Down Expand Up @@ -602,15 +537,6 @@ FlightInfo getFlightInfoExportedKeys(CommandGetExportedKeys request, CallContext
FlightInfo getFlightInfoImportedKeys(CommandGetImportedKeys request, CallContext context,
FlightDescriptor descriptor);

/**
* Gets schema about the get imported and exported keys data stream.
*
* @return Schema for the stream.
*/
default SchemaResult getSchemaForImportedAndExportedKeys() {
return new SchemaResult(Schemas.GET_IMPORTED_AND_EXPORTED_KEYS_SCHEMA);
}

/**
* Returns data for foreign keys based data stream.
*
Expand Down Expand Up @@ -680,6 +606,13 @@ final class Schemas {
Field.nullable("int_value", MinorType.INT.getType()),
Field.nullable("bigint_value", MinorType.BIGINT.getType()),
Field.nullable("int32_bitmask", MinorType.INT.getType())))));
public static final Schema GET_PRIMARY_KEYS_SCHEMA = new Schema(Arrays.asList(
Field.nullable("catalog_name", MinorType.VARCHAR.getType()),
Field.nullable("schema_name", MinorType.VARCHAR.getType()),
Field.nullable("table_name", MinorType.VARCHAR.getType()),
Field.nullable("column_name", MinorType.VARCHAR.getType()),
Field.nullable("key_sequence", MinorType.INT.getType()),
Field.nullable("key_name", MinorType.VARCHAR.getType())));

private Schemas() {
// Prevent instantiation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.Properties;
import java.util.stream.IntStream;

import org.apache.arrow.flatbuf.Message;
import org.apache.arrow.flight.sql.FlightSqlClient;
import org.apache.arrow.flight.sql.FlightSqlClient.PreparedStatement;
import org.apache.arrow.flight.sql.FlightSqlProducer;
Expand All @@ -54,6 +55,7 @@
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.complex.DenseUnionVector;
import org.apache.arrow.vector.ipc.message.MessageSerializer;
import org.apache.arrow.vector.types.Types.MinorType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
Expand Down Expand Up @@ -614,7 +616,9 @@ List<List<String>> getResults(FlightStream stream) {
for (int rowIndex = 0; rowIndex < rowCount; rowIndex++) {
final byte[] data = varbinaryVector.getObject(rowIndex);
final String output =
isNull(data) ? null : Schema.deserialize(ByteBuffer.wrap(data)).toJson();
isNull(data) ?
null :
MessageSerializer.deserializeSchema(Message.getRootAsMessage(ByteBuffer.wrap(data))).toJson();
results.get(rowIndex).add(output);
}
} else if (fieldVector instanceof DenseUnionVector) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@
import org.apache.arrow.vector.complex.DenseUnionVector;
import org.apache.arrow.vector.holders.NullableIntHolder;
import org.apache.arrow.vector.holders.NullableVarCharHolder;
import org.apache.arrow.vector.ipc.message.IpcOption;
import org.apache.arrow.vector.ipc.message.MessageSerializer;
import org.apache.arrow.vector.types.Types.MinorType;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
Expand Down Expand Up @@ -175,6 +177,7 @@
* with {@link #getFlightInfo} and {@link #getStream}.
*/
public class FlightSqlExample implements FlightSqlProducer, AutoCloseable {
private static final IpcOption DEFAULT_OPTION = IpcOption.DEFAULT;
private static final String DATABASE_URI = "jdbc:derby:target/derbyDB";
private static final Logger LOGGER = getLogger(FlightSqlExample.class);
private static final Calendar DEFAULT_CALENDAR = JdbcToArrowUtils.getUtcCalendar();
Expand Down Expand Up @@ -523,7 +526,9 @@ private static VectorSchemaRoot getTablesRoot(final DatabaseMetaData databaseMet
for (int index = 0; index < rows; index++) {
final String tableName = tableNameVector.getObject(index).toString();
final Schema schema = new Schema(tableToFields.get(tableName));
saveToVector(schema.toByteArray(), tableSchemaVector, index);
saveToVector(
copyFrom(MessageSerializer.serializeMetadata(schema, DEFAULT_OPTION)).toByteArray(),
tableSchemaVector, index);
}
}

Expand Down Expand Up @@ -737,17 +742,15 @@ public void createPreparedStatement(final ActionCreatePreparedStatementRequest r
jdbcToArrowSchema(preparedStatement.getParameterMetaData(), DEFAULT_CALENDAR);

final ResultSetMetaData metaData = preparedStatement.getMetaData();

ByteString bytes;
if (isNull(metaData)) {
bytes = ByteString.EMPTY;
} else {
bytes = ByteString.copyFrom(
jdbcToArrowSchema(metaData, DEFAULT_CALENDAR).toByteArray());
}
final ByteString bytes = isNull(metaData) ?
ByteString.EMPTY :
ByteString.copyFrom(
MessageSerializer.serializeMetadata(
jdbcToArrowSchema(metaData, DEFAULT_CALENDAR),
DEFAULT_OPTION));
final ActionCreatePreparedStatementResult result = ActionCreatePreparedStatementResult.newBuilder()
.setDatasetSchema(bytes)
.setParameterSchema(copyFrom(parameterSchema.toByteArray()))
.setParameterSchema(copyFrom(MessageSerializer.serializeMetadata(parameterSchema, DEFAULT_OPTION)))
.setPreparedStatementHandle(preparedStatementHandle)
.build();
listener.onNext(new Result(pack(result).toByteArray()));
Expand Down Expand Up @@ -1346,7 +1349,7 @@ public Runnable acceptPutPreparedStatementQuery(CommandPreparedStatementQuery co
@Override
public FlightInfo getFlightInfoSqlInfo(final CommandGetSqlInfo request, final CallContext context,
final FlightDescriptor descriptor) {
return getFlightInfoForSchema(request, descriptor, getSchemaSqlInfo().getSchema());
return getFlightInfoForSchema(request, descriptor, Schemas.GET_SQL_INFO_SCHEMA);
}

@Override
Expand Down Expand Up @@ -1377,7 +1380,7 @@ public void getStreamSqlInfo(final CommandGetSqlInfo command, final CallContext
@Override
public FlightInfo getFlightInfoCatalogs(final CommandGetCatalogs request, final CallContext context,
final FlightDescriptor descriptor) {
return getFlightInfoForSchema(request, descriptor, getSchemaCatalogs().getSchema());
return getFlightInfoForSchema(request, descriptor, Schemas.GET_CATALOGS_SCHEMA);
}

@Override
Expand All @@ -1398,7 +1401,7 @@ public void getStreamCatalogs(final CallContext context, final Ticket ticket, fi
@Override
public FlightInfo getFlightInfoSchemas(final CommandGetSchemas request, final CallContext context,
final FlightDescriptor descriptor) {
return getFlightInfoForSchema(request, descriptor, getSchemaSchemas().getSchema());
return getFlightInfoForSchema(request, descriptor, Schemas.GET_SCHEMAS_SCHEMA);
}

@Override
Expand All @@ -1423,8 +1426,7 @@ public void getStreamSchemas(final CommandGetSchemas command, final CallContext
@Override
public FlightInfo getFlightInfoTables(final CommandGetTables request, final CallContext context,
final FlightDescriptor descriptor) {
final Schema schema = getSchemaTables().getSchema();
return getFlightInfoForSchema(request, descriptor, schema);
return getFlightInfoForSchema(request, descriptor, Schemas.GET_TABLES_SCHEMA);
}

@Override
Expand Down Expand Up @@ -1460,7 +1462,7 @@ public void getStreamTables(final CommandGetTables command, final CallContext co
@Override
public FlightInfo getFlightInfoTableTypes(final CommandGetTableTypes request, final CallContext context,
final FlightDescriptor descriptor) {
return getFlightInfoForSchema(request, descriptor, getSchemaTableTypes().getSchema());
return getFlightInfoForSchema(request, descriptor, Schemas.GET_TABLE_TYPES_SCHEMA);
}

@Override
Expand All @@ -1481,7 +1483,7 @@ public void getStreamTableTypes(final CallContext context, final Ticket ticket,
@Override
public FlightInfo getFlightInfoPrimaryKeys(final CommandGetPrimaryKeys request, final CallContext context,
final FlightDescriptor descriptor) {
return getFlightInfoForSchema(request, descriptor, getSchemaPrimaryKeys().getSchema());
return getFlightInfoForSchema(request, descriptor, Schemas.GET_PRIMARY_KEYS_SCHEMA);
}

@Override
Expand Down Expand Up @@ -1536,8 +1538,7 @@ public void getStreamPrimaryKeys(final CommandGetPrimaryKeys command, final Call
@Override
public FlightInfo getFlightInfoExportedKeys(final FlightSql.CommandGetExportedKeys request, final CallContext context,
final FlightDescriptor descriptor) {
final Schema schema = getSchemaForImportedAndExportedKeys().getSchema();
return getFlightInfoForSchema(request, descriptor, schema);
return getFlightInfoForSchema(request, descriptor, Schemas.GET_IMPORTED_AND_EXPORTED_KEYS_SCHEMA);
}

@Override
Expand All @@ -1563,8 +1564,7 @@ public void getStreamExportedKeys(final FlightSql.CommandGetExportedKeys command
@Override
public FlightInfo getFlightInfoImportedKeys(final FlightSql.CommandGetImportedKeys request, final CallContext context,
final FlightDescriptor descriptor) {
final Schema schema = getSchemaForImportedAndExportedKeys().getSchema();
return getFlightInfoForSchema(request, descriptor, schema);
return getFlightInfoForSchema(request, descriptor, Schemas.GET_IMPORTED_AND_EXPORTED_KEYS_SCHEMA);
}

@Override
Expand Down

0 comments on commit 39b8eea

Please sign in to comment.