ARROW-7684: [Rust] Example Flight client and server for DataFusion #6308

Closed · wants to merge 18 commits
8 changes: 6 additions & 2 deletions dev/release/00-prepare-test.rb
@@ -276,7 +276,9 @@ def test_version_pre_tag
["-arrow = { path = \"../arrow\", version = \"#{@snapshot_version}\" }",
"-parquet = { path = \"../parquet\", version = \"#{@snapshot_version}\" }",
"+arrow = { path = \"../arrow\", version = \"#{@release_version}\" }",
"+parquet = { path = \"../parquet\", version = \"#{@release_version}\" }"]
"+parquet = { path = \"../parquet\", version = \"#{@release_version}\" }"],
["-arrow-flight = { path = \"../arrow-flight\", version = \"#{@snapshot_version}\" }",
"+arrow-flight = { path = \"../arrow-flight\", version = \"#{@release_version}\" }"]
],
},
{
@@ -458,7 +460,9 @@ def test_version_post_tag
["-arrow = { path = \"../arrow\", version = \"#{@release_version}\" }",
"-parquet = { path = \"../parquet\", version = \"#{@release_version}\" }",
"+arrow = { path = \"../arrow\", version = \"#{@next_snapshot_version}\" }",
"+parquet = { path = \"../parquet\", version = \"#{@next_snapshot_version}\" }"]
"+parquet = { path = \"../parquet\", version = \"#{@next_snapshot_version}\" }"],
["-arrow-flight = { path = \"../arrow-flight\", version = \"#{@release_version}\" }",
"+arrow-flight = { path = \"../arrow-flight\", version = \"#{@next_snapshot_version}\" }"]
],
},
{
4 changes: 3 additions & 1 deletion rust/arrow/Cargo.toml
@@ -50,10 +50,12 @@ packed_simd = { version = "0.3.1", optional = true }
chrono = "0.4"
flatbuffers = "0.6.0"
hex = "0.4"
arrow-flight = { path = "../arrow-flight", optional = true }

[features]
simd = ["packed_simd"]
default = ["simd"]
flight = ["arrow-flight"]
default = ["simd", "flight"]

[dev-dependencies]
criterion = "0.2"
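Since the new `flight` feature is added to the default set, downstream crates that do not want the `arrow-flight` dependency have to opt out explicitly. A minimal sketch of how such a crate's `Cargo.toml` might look (the version number is illustrative):

```toml
[dependencies]
# Disable default features, then re-enable only what is needed (here: simd)
arrow = { version = "0.16.0-SNAPSHOT", default-features = false, features = ["simd"] }
```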
83 changes: 83 additions & 0 deletions rust/arrow/src/flight/mod.rs
@@ -0,0 +1,83 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Utilities to assist with reading and writing Arrow data as Flight messages

use std::convert::TryFrom;
use std::sync::Arc;

use flight::FlightData;

use crate::datatypes::Schema;
use crate::error::{ArrowError, Result};
use crate::ipc::{convert, reader, writer};
use crate::record_batch::RecordBatch;

/// Convert a `RecordBatch` to `FlightData` by getting the header and body as bytes
impl From<&RecordBatch> for FlightData {
fn from(batch: &RecordBatch) -> Self {
let (header, body) = writer::record_batch_to_bytes(batch);
Self {
flight_descriptor: None,
app_metadata: vec![],
data_header: header,
data_body: body,
}
}
}

/// Convert a `Schema` to `FlightData` by converting to an IPC message
impl From<&Schema> for FlightData {
fn from(schema: &Schema) -> Self {
let schema = writer::schema_to_bytes(schema);
Self {
flight_descriptor: None,
app_metadata: vec![],
data_header: schema,
data_body: vec![],
}
}
}

/// Try to convert `FlightData` into an Arrow `Schema`
///
/// Returns an error if the `FlightData` header is not a valid IPC schema
impl TryFrom<&FlightData> for Schema {
type Error = ArrowError;
fn try_from(data: &FlightData) -> Result<Self> {
convert::schema_from_bytes(&data.data_header[..]).ok_or(ArrowError::ParseError(
"Unable to convert flight data to Arrow schema".to_string(),
))
}
}

/// Convert a `FlightData` message to a `RecordBatch`
pub fn flight_data_to_batch(
data: &FlightData,
schema: Arc<Schema>,
) -> Result<Option<RecordBatch>> {
// check that the data_header is a record batch message
let message = crate::ipc::get_root_as_message(&data.data_header[..]);
let batch_header = message
.header_as_record_batch()
.ok_or(ArrowError::ParseError(
"Unable to convert flight data header to a record batch".to_string(),
))?;
reader::read_record_batch(&data.data_body, batch_header, schema)
}

// TODO: add more explicit conversions that expose flight descriptor and metadata options
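Taken together, these conversions support a full round trip between Arrow data and Flight messages. A minimal sketch of how a caller might chain them (the `round_trip` helper is hypothetical, not part of this module):

```rust
use std::convert::TryFrom;
use std::sync::Arc;

use flight::FlightData;

use arrow::datatypes::Schema;
use arrow::error::Result;
use arrow::flight::flight_data_to_batch;
use arrow::record_batch::RecordBatch;

// Hypothetical helper: encode a batch as Flight messages, then decode it again.
fn round_trip(batch: &RecordBatch) -> Result<Option<RecordBatch>> {
    // Schema -> FlightData: an IPC schema message in data_header
    let schema_message = FlightData::from(batch.schema().as_ref());
    // FlightData -> Schema: errors if the header is not a valid IPC schema
    let schema = Arc::new(Schema::try_from(&schema_message)?);
    // RecordBatch -> FlightData: header and body bytes
    let batch_message = FlightData::from(batch);
    // FlightData -> RecordBatch, read against the schema
    flight_data_to_batch(&batch_message, schema)
}
```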
6 changes: 6 additions & 0 deletions rust/arrow/src/ipc/convert.rs
@@ -152,6 +152,12 @@ pub(crate) fn fb_to_schema(fb: ipc::Schema) -> Schema {
Schema::new_with_metadata(fields, metadata)
}

/// Deserialize an IPC message into a schema
pub(crate) fn schema_from_bytes(bytes: &[u8]) -> Option<Schema> {
let ipc = ipc::get_root_as_message(bytes);
ipc.header_as_schema().map(|schema| fb_to_schema(schema))
}

/// Get the Arrow data type from the flatbuffer Field table
pub(crate) fn get_data_type(field: ipc::Field) -> DataType {
match field.type_type() {
2 changes: 1 addition & 1 deletion rust/arrow/src/ipc/reader.rs
@@ -348,7 +348,7 @@ fn create_list_array(
}

/// Creates a record batch from binary data using the `ipc::RecordBatch` indexes and the `Schema`
fn read_record_batch(
pub(crate) fn read_record_batch(
buf: &Vec<u8>,
batch: ipc::RecordBatch,
schema: Arc<Schema>,
37 changes: 23 additions & 14 deletions rust/arrow/src/ipc/writer.rs
@@ -209,8 +209,7 @@ impl<W: Write> Drop for StreamWriter<W> {
}
}

/// Convert the schema to its IPC representation, and write it to the `writer`
fn write_schema<R: Write>(writer: &mut BufWriter<R>, schema: &Schema) -> Result<usize> {
pub(crate) fn schema_to_bytes(schema: &Schema) -> Vec<u8> {
let mut fbb = FlatBufferBuilder::new();
let schema = {
let fb = ipc::convert::schema_to_fb_offset(&mut fbb, schema);
@@ -227,9 +226,13 @@ fn write_schema<R: Write>(writer: &mut BufWriter<R>, schema: &Schema) -> Result<
fbb.finish(data, None);

let data = fbb.finished_data();
let written = write_padded_data(writer, data, WriteDataType::Header);
data.to_vec()
}

written
/// Convert the schema to its IPC representation, and write it to the `writer`
fn write_schema<R: Write>(writer: &mut BufWriter<R>, schema: &Schema) -> Result<usize> {
let data = schema_to_bytes(schema);
write_padded_data(writer, &data[..], WriteDataType::Header)
}

/// The message type being written. This determines whether to write the data length or not.
@@ -266,13 +269,8 @@ fn write_padded_data<R: Write>(
Ok(total_len as usize)
}

/// Write a record batch to the writer, writing the message size before the message
/// if the record batch is being written to a stream
fn write_record_batch<R: Write>(
writer: &mut BufWriter<R>,
batch: &RecordBatch,
is_stream: bool,
) -> Result<(usize, usize)> {
/// Convert a `RecordBatch` into a tuple of bytes, one for the header (`ipc::Message`) and the other for the batch's data
pub(crate) fn record_batch_to_bytes(batch: &RecordBatch) -> (Vec<u8>, Vec<u8>) {
let mut fbb = FlatBufferBuilder::new();

let mut nodes: Vec<ipc::FieldNode> = vec![];
@@ -313,13 +311,24 @@ fn write_record_batch<R: Write>(
let root = message.finish();
fbb.finish(root, None);
let finished_data = fbb.finished_data();

(finished_data.to_vec(), arrow_data)
}

/// Write a record batch to the writer, writing the message size before the message
/// if the record batch is being written to a stream
fn write_record_batch<R: Write>(
writer: &mut BufWriter<R>,
batch: &RecordBatch,
is_stream: bool,
) -> Result<(usize, usize)> {
let (meta_data, arrow_data) = record_batch_to_bytes(batch);
// write the length of data if writing to stream
if is_stream {
let total_len: u32 = finished_data.len() as u32;
let total_len: u32 = meta_data.len() as u32;
writer.write(&total_len.to_le_bytes()[..])?;
}
let meta_written =
write_padded_data(writer, fbb.finished_data(), WriteDataType::Body)?;
let meta_written = write_padded_data(writer, &meta_data[..], WriteDataType::Body)?;
let arrow_data_written =
write_padded_data(writer, &arrow_data[..], WriteDataType::Body)?;
Ok((meta_written, arrow_data_written))
2 changes: 2 additions & 0 deletions rust/arrow/src/lib.rs
@@ -33,6 +33,8 @@ pub mod compute;
pub mod csv;
pub mod datatypes;
pub mod error;
#[cfg(feature = "flight")]
pub mod flight;
pub mod ipc;
pub mod json;
pub mod memory;
6 changes: 6 additions & 0 deletions rust/datafusion/Cargo.toml
@@ -56,6 +56,12 @@ crossbeam = "0.7.1"
[dev-dependencies]
criterion = "0.2.0"
tempdir = "0.3.7"
futures = "0.3"
prost = "0.6"
tokio = { version = "0.2", features = ["macros"] }
tonic = "0.1"
flatbuffers = "0.6.0"
arrow-flight = { path = "../arrow-flight", version = "0.16.0-SNAPSHOT" }

[[bench]]
name = "aggregate_query_sql"
28 changes: 28 additions & 0 deletions rust/datafusion/examples/README.md
@@ -0,0 +1,28 @@
<!---
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# DataFusion Examples

## Single Process

The examples `csv_sql.rs` and `parquet_sql.rs` demonstrate building a query plan from a SQL statement and then executing the query plan against local CSV and Parquet files, respectively.

## Distributed

The `flight-client.rs` and `flight-server.rs` examples demonstrate how to run DataFusion as a standalone process and execute SQL queries from a client using the Flight protocol.
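A typical way to try these out (assuming the Cargo example names match the file stems, and that the server can find the `alltypes_plain` test data it registers) is to start the server in one terminal and run the client in another:

```bash
# terminal 1: start the DataFusion Flight server on localhost:50051
cargo run --example flight-server

# terminal 2: send a SQL query to the server as a Flight ticket
cargo run --example flight-client
```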
62 changes: 62 additions & 0 deletions rust/datafusion/examples/flight-client.rs
@@ -0,0 +1,62 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::convert::TryFrom;
use std::sync::Arc;

use arrow::array::Int32Array;
use arrow::datatypes::Schema;
use arrow::flight::flight_data_to_batch;
use flight::flight_service_client::FlightServiceClient;
use flight::Ticket;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut client = FlightServiceClient::connect("http://localhost:50051").await?;

let request = tonic::Request::new(Ticket {
ticket: "SELECT id FROM alltypes_plain".into(),
});

let mut stream = client.do_get(request).await?.into_inner();

// the schema should be the first message returned, otherwise the client errors
let flight_data = stream.message().await?.unwrap();
// convert the FlightData message to a Schema
let schema = Arc::new(Schema::try_from(&flight_data)?);
println!("Schema: {:?}", schema);

// all the remaining stream messages should be dictionary and record batches
while let Some(flight_data) = stream.message().await? {
// the unwrap is infallible and thus safe
let record_batch = flight_data_to_batch(&flight_data, schema.clone())?.unwrap();

println!(
"record_batch has {} columns and {} rows",
record_batch.num_columns(),
record_batch.num_rows()
);
let column = record_batch.column(0);
let column = column
.as_any()
.downcast_ref::<Int32Array>()
.expect("Unable to get column");
println!("Column 1: {:?}", column);
}

Ok(())
}