Update flight definitions including backwards-incompatible change to GetSchema #2586

Merged
merged 7 commits into from
Sep 3, 2022
Changes from 2 commits
129 changes: 78 additions & 51 deletions arrow-flight/src/arrow.flight.protocol.rs
@@ -1,31 +1,31 @@
// This file was automatically generated through the build.rs script, and should not be edited.

///
/// The request that a client provides to a server on handshake.
/// The request that a client provides to a server on handshake.
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct HandshakeRequest {
///
/// A defined protocol version
/// A defined protocol version
Contributor Author

Why is there a formatting change here?

Contributor

prost had a bug that got fixed. I don't think this is a huge problem. tokio-rs/prost#694

Contributor Author

It's not a huge problem, but it will introduce useless changes in the git diff on every build.

#[prost(uint64, tag="1")]
pub protocol_version: u64,
///
/// Arbitrary auth/handshake info.
/// Arbitrary auth/handshake info.
#[prost(bytes="vec", tag="2")]
pub payload: ::prost::alloc::vec::Vec<u8>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct HandshakeResponse {
///
/// A defined protocol version
/// A defined protocol version
#[prost(uint64, tag="1")]
pub protocol_version: u64,
///
/// Arbitrary auth/handshake info.
/// Arbitrary auth/handshake info.
#[prost(bytes="vec", tag="2")]
pub payload: ::prost::alloc::vec::Vec<u8>,
}
///
/// A message for doing simple auth.
/// A message for doing simple auth.
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct BasicAuth {
#[prost(string, tag="2")]
@@ -37,8 +37,8 @@ pub struct BasicAuth {
pub struct Empty {
}
///
/// Describes an available action, including both the name used for execution
/// along with a short description of the purpose of the action.
/// Describes an available action, including both the name used for execution
/// along with a short description of the purpose of the action.
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ActionType {
#[prost(string, tag="1")]
@@ -47,15 +47,15 @@ pub struct ActionType {
pub description: ::prost::alloc::string::String,
}
///
/// A service specific expression that can be used to return a limited set
/// of available Arrow Flight streams.
/// A service specific expression that can be used to return a limited set
/// of available Arrow Flight streams.
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Criteria {
#[prost(bytes="vec", tag="1")]
pub expression: ::prost::alloc::vec::Vec<u8>,
}
///
/// An opaque action specific for the service.
/// An opaque action specific for the service.
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Action {
#[prost(string, tag="1")]
@@ -64,54 +64,57 @@ pub struct Action {
pub body: ::prost::alloc::vec::Vec<u8>,
}
///
/// An opaque result returned after executing an action.
/// An opaque result returned after executing an action.
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Result {
#[prost(bytes="vec", tag="1")]
pub body: ::prost::alloc::vec::Vec<u8>,
}
///
/// Wrap the result of a getSchema call
/// Wrap the result of a getSchema call
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SchemaResult {
/// schema of the dataset as described in Schema.fbs::Schema.
/// The schema of the dataset in its IPC form:
/// 4 bytes - an optional IPC_CONTINUATION_TOKEN prefix
/// 4 bytes - the byte length of the payload
/// a flatbuffer Message whose header is the Schema
#[prost(bytes="vec", tag="1")]
pub schema: ::prost::alloc::vec::Vec<u8>,
}
///
/// The name or tag for a Flight. May be used as a way to retrieve or generate
/// a flight or be used to expose a set of previously defined flights.
/// The name or tag for a Flight. May be used as a way to retrieve or generate
/// a flight or be used to expose a set of previously defined flights.
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FlightDescriptor {
#[prost(enumeration="flight_descriptor::DescriptorType", tag="1")]
pub r#type: i32,
///
/// Opaque value used to express a command. Should only be defined when
/// type = CMD.
/// Opaque value used to express a command. Should only be defined when
/// type = CMD.
#[prost(bytes="vec", tag="2")]
pub cmd: ::prost::alloc::vec::Vec<u8>,
///
/// List of strings identifying a particular dataset. Should only be defined
/// when type = PATH.
/// List of strings identifying a particular dataset. Should only be defined
/// when type = PATH.
#[prost(string, repeated, tag="3")]
pub path: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
}
/// Nested message and enum types in `FlightDescriptor`.
pub mod flight_descriptor {
///
/// Describes what type of descriptor is defined.
/// Describes what type of descriptor is defined.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
#[repr(i32)]
pub enum DescriptorType {
/// Protobuf pattern, not used.
/// Protobuf pattern, not used.
Unknown = 0,
///
/// A named path that identifies a dataset. A path is composed of a string
/// or list of strings describing a particular dataset. This is conceptually
/// A named path that identifies a dataset. A path is composed of a string
/// or list of strings describing a particular dataset. This is conceptually
/// similar to a path inside a filesystem.
Path = 1,
///
/// An opaque command to generate a dataset.
/// An opaque command to generate a dataset.
Cmd = 2,
}
impl DescriptorType {
@@ -129,86 +132,110 @@ pub mod flight_descriptor {
}
}
///
/// The access coordinates for retrieval of a dataset. With a FlightInfo, a
/// consumer is able to determine how to retrieve a dataset.
/// The access coordinates for retrieval of a dataset. With a FlightInfo, a
/// consumer is able to determine how to retrieve a dataset.
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FlightInfo {
/// schema of the dataset as described in Schema.fbs::Schema.
/// The schema of the dataset in its IPC form:
/// 4 bytes - an optional IPC_CONTINUATION_TOKEN prefix
/// 4 bytes - the byte length of the payload
/// a flatbuffer Message whose header is the Schema
#[prost(bytes="vec", tag="1")]
pub schema: ::prost::alloc::vec::Vec<u8>,
///
/// The descriptor associated with this info.
/// The descriptor associated with this info.
#[prost(message, optional, tag="2")]
pub flight_descriptor: ::core::option::Option<FlightDescriptor>,
///
/// A list of endpoints associated with the flight. To consume the whole
/// flight, all endpoints must be consumed.
/// A list of endpoints associated with the flight. To consume the
/// whole flight, all endpoints (and hence all Tickets) must be
/// consumed. Endpoints can be consumed in any order.
///
/// In other words, an application can use multiple endpoints to
/// represent partitioned data.
///
/// There is no ordering defined on endpoints. Hence, if the returned
/// data has an ordering, it should be returned in a single endpoint.
#[prost(message, repeated, tag="3")]
pub endpoint: ::prost::alloc::vec::Vec<FlightEndpoint>,
/// Set these to -1 if unknown.
/// Set these to -1 if unknown.
#[prost(int64, tag="4")]
pub total_records: i64,
#[prost(int64, tag="5")]
pub total_bytes: i64,
}
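
The updated `endpoint` comment says every endpoint must be consumed and that no order is defined between them. A minimal sketch of a client loop under that rule follows. The structs are simplified stand-ins for the generated prost types, and `read_whole_flight` and its `do_get` callback are hypothetical names standing in for the real DoGet RPC.

```rust
// Simplified stand-ins for the generated prost types (illustration only).
struct Ticket {
    ticket: Vec<u8>,
}
struct FlightEndpoint {
    ticket: Option<Ticket>,
}
struct FlightInfo {
    endpoint: Vec<FlightEndpoint>,
}

/// Hypothetical helper: reads the whole flight by redeeming every ticket.
/// `do_get` stands in for the DoGet RPC and returns one partition's rows.
fn read_whole_flight<F>(info: &FlightInfo, mut do_get: F) -> Vec<u64>
where
    F: FnMut(&[u8]) -> Vec<u64>,
{
    let mut rows = Vec::new();
    for endpoint in &info.endpoint {
        // Skip endpoints without a ticket; there is nothing to redeem.
        if let Some(ticket) = &endpoint.ticket {
            rows.extend(do_get(&ticket.ticket));
        }
    }
    // No order is defined across endpoints, so callers must not rely on
    // the order in which partitions were appended here.
    rows
}
```

Because endpoints are unordered, a server that needs to return sorted data would put it behind a single endpoint rather than rely on the client visiting endpoints in list order.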
///
/// A particular stream or split associated with a flight.
/// A particular stream or split associated with a flight.
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FlightEndpoint {
///
/// Token used to retrieve this stream.
/// Token used to retrieve this stream.
#[prost(message, optional, tag="1")]
pub ticket: ::core::option::Option<Ticket>,
///
/// A list of URIs where this ticket can be redeemed. If the list is
/// empty, the expectation is that the ticket can only be redeemed on the
/// current service where the ticket was generated.
/// A list of URIs where this ticket can be redeemed via DoGet().
///
/// If the list is empty, the expectation is that the ticket can only
/// be redeemed on the current service where the ticket was
/// generated.
///
/// If the list is not empty, the expectation is that the ticket can
/// be redeemed at any of the locations, and that the data returned
/// will be equivalent. In this case, the ticket may only be redeemed
/// at one of the given locations, and not (necessarily) on the
/// current service.
///
/// In other words, an application can use multiple locations to
/// represent redundant and/or load balanced services.
#[prost(message, repeated, tag="2")]
pub location: ::prost::alloc::vec::Vec<Location>,
}
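
The new `location` comment distinguishes two cases: an empty list means "redeem on the service that issued the ticket", and a non-empty list means "redeem at any one of these, not necessarily the current service". One way a client might model that, as a sketch only (`RedeemAt` and `redeem_targets` are hypothetical names, and URIs are plain strings here rather than the generated `Location` type):

```rust
/// Where a ticket may be redeemed, following the `location` doc comment.
#[derive(Debug, PartialEq)]
enum RedeemAt<'a> {
    /// Empty location list: use the service that issued the ticket.
    CurrentService,
    /// Non-empty list: any one of these URIs; the data is equivalent.
    AnyOf(&'a [String]),
}

/// Hypothetical helper that classifies a location list.
fn redeem_targets(locations: &[String]) -> RedeemAt<'_> {
    if locations.is_empty() {
        RedeemAt::CurrentService
    } else {
        RedeemAt::AnyOf(locations)
    }
}
```

Making the two cases separate variants keeps a client from silently falling back to the current service when a non-empty list was supplied.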
///
/// A location where a Flight service will accept retrieval of a particular
/// stream given a ticket.
/// A location where a Flight service will accept retrieval of a particular
/// stream given a ticket.
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Location {
#[prost(string, tag="1")]
pub uri: ::prost::alloc::string::String,
}
///
/// An opaque identifier that the service can use to retrieve a particular
/// portion of a stream.
/// An opaque identifier that the service can use to retrieve a particular
/// portion of a stream.
///
/// Tickets are meant to be single use. It is an error/application-defined
/// behavior to reuse a ticket.
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Ticket {
#[prost(bytes="vec", tag="1")]
pub ticket: ::prost::alloc::vec::Vec<u8>,
}
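
The added `Ticket` comment leaves ticket reuse as an error or application-defined behavior. A server that chooses to reject reuse could track outstanding tickets roughly like this. This is one possible policy, not something the protocol mandates, and `TicketRegistry` is a hypothetical type.

```rust
use std::collections::HashSet;

/// Hypothetical server-side bookkeeping for single-use tickets.
#[derive(Default)]
struct TicketRegistry {
    outstanding: HashSet<Vec<u8>>,
}

impl TicketRegistry {
    /// Records a ticket that was handed out in a FlightEndpoint.
    fn issue(&mut self, ticket: Vec<u8>) {
        self.outstanding.insert(ticket);
    }

    /// Returns true the first time an issued ticket is presented, and
    /// false for a reused or unknown ticket.
    fn redeem(&mut self, ticket: &[u8]) -> bool {
        self.outstanding.remove(ticket)
    }
}
```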
///
/// A batch of Arrow data as part of a stream of batches.
/// A batch of Arrow data as part of a stream of batches.
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FlightData {
///
/// The descriptor of the data. This is only relevant when a client is
/// starting a new DoPut stream.
/// The descriptor of the data. This is only relevant when a client is
/// starting a new DoPut stream.
#[prost(message, optional, tag="1")]
pub flight_descriptor: ::core::option::Option<FlightDescriptor>,
///
/// Header for message data as described in Message.fbs::Message.
/// Header for message data as described in Message.fbs::Message.
#[prost(bytes="vec", tag="2")]
pub data_header: ::prost::alloc::vec::Vec<u8>,
///
/// Application-defined metadata.
/// Application-defined metadata.
#[prost(bytes="vec", tag="3")]
pub app_metadata: ::prost::alloc::vec::Vec<u8>,
///
/// The actual batch of Arrow data. Preferably handled with minimal-copies
/// coming last in the definition to help with sidecar patterns (it is
/// expected that some implementations will fetch this field off the wire
/// with specialized code to avoid extra memory copies).
/// The actual batch of Arrow data. Preferably handled with minimal-copies
/// coming last in the definition to help with sidecar patterns (it is
/// expected that some implementations will fetch this field off the wire
/// with specialized code to avoid extra memory copies).
#[prost(bytes="vec", tag="1000")]
pub data_body: ::prost::alloc::vec::Vec<u8>,
}
/// *
/// The response message associated with the submission of a DoPut.
/// The response message associated with the submission of a DoPut.
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct PutResult {
#[prost(bytes="vec", tag="1")]
31 changes: 21 additions & 10 deletions arrow-flight/src/lib.rs
@@ -254,10 +254,17 @@ impl From<SchemaAsIpc<'_>> for FlightData {
}
}

impl From<SchemaAsIpc<'_>> for SchemaResult {
fn from(schema_ipc: SchemaAsIpc) -> Self {
let IpcMessage(vals) = flight_schema_as_flatbuffer(schema_ipc.0, schema_ipc.1);
Contributor Author (@liukun4515, Aug 31, 2022)

@tustvold
The original Rust implementation differs from the Java implementation.
The Java version writes the length as a prefix for this buffer, but the original Rust implementation does not write the length prefix.

  /**
   * Write the serialized Message metadata, prefixed by the length, to the output Channel. This
   * ensures that it aligns to an 8 byte boundary and will adjust the message length to include
   * any padding used for alignment.
   *
   * @param out Output Channel
   * @param messageLength Number of bytes in the message buffer, written as little Endian prefix
   * @param messageBuffer Message metadata buffer to be written, this does not include any
   *                      message body data which should be subsequently written to the Channel
   * @param option IPC write options
   * @return Number of bytes written
   * @throws IOException on error
   */
  public static int writeMessageBuffer(WriteChannel out, int messageLength, ByteBuffer messageBuffer, IpcOption option)
      throws IOException {

    // if write the pre-0.15.0 encapsulated IPC message format consisting of a 4-byte prefix instead of 8 byte
    int prefixSize = option.write_legacy_ipc_format ? 4 : 8;

    // ensure that message aligns to 8 byte padding - prefix_size bytes, then message body
    if ((messageLength + prefixSize ) % 8 != 0) {
      messageLength += 8 - (messageLength + prefixSize) % 8;
    }
    if (!option.write_legacy_ipc_format) {
      out.writeIntLittleEndian(IPC_CONTINUATION_TOKEN);
    }
    out.writeIntLittleEndian(messageLength);
    out.write(messageBuffer);
    out.align();

    // any bytes written are already captured by our size modification above
    return messageLength + prefixSize;
  }

Contributor Author

I think the original Rust implementation is not right, because it omits the buffer size for the schema flatbuffer.

Contributor

I would agree the original Rust implementation is different, but I'm not sure I would go so far as to say it is incorrect. There is no particular need to send the length, given that protobuf already provides the message framing.

Contributor Author

Agreed. The payload of SchemaResult is just a byte array, which is flexible, so the layout is easy to change.
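
For readers comparing the two implementations, the prefix and padding arithmetic of the Java `writeMessageBuffer` quoted above can be mirrored in Rust roughly as follows. This is a std-only sketch, not the arrow-rs code path: `write_message_buffer`, `CONTINUATION_TOKEN`, and the `legacy` flag are hypothetical names, and the token value 0xFFFFFFFF is assumed from the Arrow IPC format.

```rust
/// Continuation marker of the Arrow IPC encapsulated message format.
const CONTINUATION_TOKEN: u32 = 0xFFFF_FFFF;

/// Hypothetical helper mirroring the Java logic: appends an optional
/// continuation token, a little-endian length, the message bytes, and
/// zero padding so that prefix + message is a multiple of 8 bytes.
/// Returns the number of bytes appended to `out`.
fn write_message_buffer(out: &mut Vec<u8>, message: &[u8], legacy: bool) -> usize {
    // Legacy (pre-0.15.0) framing has a 4-byte prefix; otherwise 8 bytes.
    let prefix_size = if legacy { 4 } else { 8 };
    // Round (prefix + message) up to the next multiple of 8.
    let total = (prefix_size + message.len() + 7) / 8 * 8;
    // The written length covers the message plus its padding.
    let padded_len = total - prefix_size;

    if !legacy {
        out.extend_from_slice(&CONTINUATION_TOKEN.to_le_bytes());
    }
    out.extend_from_slice(&(padded_len as u32).to_le_bytes());
    out.extend_from_slice(message);
    out.resize(out.len() + (padded_len - message.len()), 0);
    total
}
```

Note that the length field counts the padding bytes, so a reader slicing the payload by that length gets the flatbuffer plus trailing zeros.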

SchemaResult { schema: vals }
impl TryFrom<SchemaAsIpc<'_>> for SchemaResult {
type Error = ArrowError;

fn try_from(schema_ipc: SchemaAsIpc) -> ArrowResult<Self> {
Contributor (@tustvold, Aug 25, 2022)

I'm surprised we can change this without also needing to change the flight client?

In particular I would expect changes to

impl TryFrom<&SchemaResult> for Schema

Contributor Author

The Flight client just gets the byte array from the GetSchema RPC.

The server/client gRPC code can be generated automatically.

Contributor

But we provide logic to convert the SchemaResult to a Schema, which will now fail?

Contributor Author (@liukun4515, Aug 25, 2022)

I think it is better to stay consistent with the original implementation.

Contributor (@tustvold, Aug 25, 2022)

My point is twofold:

  • We clearly are missing a test as the change you have made to the protocol should be failing a test somewhere
  • We need to update the logic to interpret the SchemaResult which does exist and following this PR will be incorrect

As an aside I do really wonder why arrow flight is using opaque byte objects, the whole point of using an API DSL is to avoid this but hey ho, this protocol is wild

Contributor

> helps tooling and people find the generated code

I think the tooling has issues when it is generated into target/ which I think is best practice. I'd advocate for leaving it where it is, but .gitignoring it, which should solve formatting without breaking IDEs.

Contributor

Will that work when published to crates.io or used via a git revision?

Contributor

If I understand the thread, it sounds like this has been resolved:

  1. use a flag to determine how to write
  2. check for the continuation token to differentiate between versions of the protocol and avoid a breaking change
  3. update the protobuf <-> native conversion code
  4. add 4 round-trip tests: old -> new, new -> old, old -> old, new -> new

?
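
Step 2 in the list above (checking for the continuation token to tell the two layouts apart) could look roughly like this on the reading side. It is a std-only sketch under stated assumptions, not the code that landed in the PR: `schema_message_bytes` and `TOKEN_BYTES` are hypothetical names, and it assumes a bare flatbuffer message never begins with the bytes 0xFFFFFFFF.

```rust
/// Little-endian bytes of the Arrow IPC continuation marker (0xFFFFFFFF).
const TOKEN_BYTES: [u8; 4] = [0xFF; 4];

/// Hypothetical helper: returns the flatbuffer `Message` bytes inside a
/// `SchemaResult.schema` buffer, accepting both layouts from the thread.
fn schema_message_bytes(buf: &[u8]) -> Result<&[u8], String> {
    if buf.len() >= 8 && buf[..4] == TOKEN_BYTES {
        // New layout: token, little-endian length, then the message
        // (the length may include trailing alignment padding).
        let len = u32::from_le_bytes([buf[4], buf[5], buf[6], buf[7]]) as usize;
        8usize
            .checked_add(len)
            .and_then(|end| buf.get(8..end))
            .ok_or_else(|| format!("declared length {} exceeds buffer", len))
    } else {
        // Old layout: the buffer is the bare flatbuffer message.
        Ok(buf)
    }
}
```

The four round-trip cases in step 4 then reduce to feeding both kinds of buffer through this one function and checking that the same message bytes come out.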

Contributor

> Will that work when published to crates.io or used via a git revision?

I think it will work for cargo publish but I think anyone depending on a git revision would require protoc to be installed. I have not verified these claims though.

Contributor

https://doc.rust-lang.org/cargo/commands/cargo-publish.html

> This command will create a distributable, compressed .crate file with the source code of the package in the current directory

// According to the definition from `Flight.proto`
// The schema of the dataset in its IPC form:
// 4 bytes - an optional IPC_CONTINUATION_TOKEN prefix
// 4 bytes - the byte length of the payload
// a flatbuffer Message whose header is the Schema
let IpcMessage(vals) = schema_to_ipc_format(schema_ipc)?;
Ok(SchemaResult { schema: vals })
}
}

@@ -275,15 +282,19 @@ impl TryFrom<SchemaAsIpc<'_>> for IpcMessage {
type Error = ArrowError;

fn try_from(schema_ipc: SchemaAsIpc) -> ArrowResult<Self> {
let pair = *schema_ipc;
let encoded_data = flight_schema_as_encoded_data(pair.0, pair.1);

let mut schema = vec![];
arrow::ipc::writer::write_message(&mut schema, encoded_data, pair.1)?;
Ok(IpcMessage(schema))
schema_to_ipc_format(schema_ipc)
}
}

fn schema_to_ipc_format(schema_ipc: SchemaAsIpc) -> ArrowResult<IpcMessage> {
let pair = *schema_ipc;
let encoded_data = flight_schema_as_encoded_data(pair.0, pair.1);

let mut schema = vec![];
arrow::ipc::writer::write_message(&mut schema, encoded_data, pair.1)?;
Contributor

I've double-checked this will write the continuation token 👍

Ok(IpcMessage(schema))
}

impl TryFrom<&FlightData> for Schema {
type Error = ArrowError;
fn try_from(data: &FlightData) -> ArrowResult<Self> {