diff --git a/arrow-cast/Cargo.toml b/arrow-cast/Cargo.toml index 2e0a9fdd4ebd..19b857297d14 100644 --- a/arrow-cast/Cargo.toml +++ b/arrow-cast/Cargo.toml @@ -50,10 +50,12 @@ half = { version = "2.1", default-features = false } num = { version = "0.4", default-features = false, features = ["std"] } lexical-core = { version = "^0.8", default-features = false, features = ["write-integers", "write-floats", "parse-integers", "parse-floats"] } comfy-table = { version = "7.0", optional = true, default-features = false } +base64 = "0.21" [dev-dependencies] criterion = { version = "0.5", default-features = false } half = { version = "2.1", default-features = false } +rand = "0.8" [build-dependencies] diff --git a/arrow-cast/src/base64.rs b/arrow-cast/src/base64.rs new file mode 100644 index 000000000000..e109c8112480 --- /dev/null +++ b/arrow-cast/src/base64.rs @@ -0,0 +1,117 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! 
Functions for Base64 encoding/decoding + +use arrow_array::{Array, GenericBinaryArray, GenericStringArray, OffsetSizeTrait}; +use arrow_buffer::OffsetBuffer; +use arrow_schema::ArrowError; +use base64::encoded_len; +use base64::engine::Config; + +pub use base64::prelude::*; + +/// Base64 encode each element of `array` with the provided `engine` +pub fn b64_encode<E: Engine, O: OffsetSizeTrait>( + engine: &E, + array: &GenericBinaryArray<O>, +) -> GenericStringArray<O> { + let lengths = array.offsets().windows(2).map(|w| { + let len = w[1].as_usize() - w[0].as_usize(); + encoded_len(len, engine.config().encode_padding()).unwrap() + }); + let offsets = OffsetBuffer::<O>::from_lengths(lengths); + let buffer_len = offsets.last().unwrap().as_usize(); + let mut buffer = vec![0_u8; buffer_len]; + let mut offset = 0; + + for i in 0..array.len() { + let len = engine + .encode_slice(array.value(i), &mut buffer[offset..]) + .unwrap(); + offset += len; + } + assert_eq!(offset, buffer_len); + + // Safety: Base64 is valid UTF-8 + unsafe { GenericStringArray::new_unchecked(offsets, buffer.into(), array.nulls().cloned()) } +} + +/// Base64 decode each element of `array` with the provided `engine` +pub fn b64_decode<E: Engine, O: OffsetSizeTrait>( + engine: &E, + array: &GenericBinaryArray<O>, +) -> Result<GenericBinaryArray<O>, ArrowError> { + let estimated_len = array.values().len(); // This is an overestimate + let mut buffer = vec![0; estimated_len]; + + let mut offsets = Vec::with_capacity(array.len() + 1); + offsets.push(O::usize_as(0)); + let mut offset = 0; + + for v in array.iter() { + if let Some(v) = v { + let len = engine.decode_slice(v, &mut buffer[offset..]).unwrap(); + // This cannot overflow as `len` is less than `v.len()` and `v` is valid base64 + offset += len; + } + offsets.push(O::usize_as(offset)); + } + + // Safety: offsets monotonically increasing by construction + let offsets = unsafe { OffsetBuffer::new_unchecked(offsets.into()) }; + + Ok(GenericBinaryArray::new( + offsets, + buffer.into(), + array.nulls().cloned(), + )) +} + +#[cfg(test)] +mod tests { + use 
super::*; + use arrow_array::BinaryArray; + use base64::prelude::{BASE64_STANDARD, BASE64_STANDARD_NO_PAD}; + use rand::{thread_rng, Rng}; + + fn test_engine<E: Engine>(e: &E, a: &BinaryArray) { + let encoded = b64_encode(e, a); + encoded.to_data().validate_full().unwrap(); + + let to_decode = encoded.into(); + let decoded = b64_decode(e, &to_decode).unwrap(); + decoded.to_data().validate_full().unwrap(); + + assert_eq!(&decoded, a); + } + + #[test] + fn test_b64() { + let mut rng = thread_rng(); + let len = rng.gen_range(1024..1050); + let data: BinaryArray = (0..len) + .map(|_| { + let len = rng.gen_range(0..16); + Some((0..len).map(|_| rng.gen()).collect::<Vec<_>>()) + }) + .collect(); + + test_engine(&BASE64_STANDARD, &data); + test_engine(&BASE64_STANDARD_NO_PAD, &data); + } +} diff --git a/arrow-cast/src/lib.rs b/arrow-cast/src/lib.rs index d2677a0e0a53..71ebe6c0ed8b 100644 --- a/arrow-cast/src/lib.rs +++ b/arrow-cast/src/lib.rs @@ -21,6 +21,7 @@ pub mod cast; pub use cast::*; pub mod display; pub mod parse; - #[cfg(feature = "prettyprint")] pub mod pretty; + +pub mod base64; diff --git a/arrow-json/Cargo.toml b/arrow-json/Cargo.toml index df38a52811c2..7e49a57fbd6c 100644 --- a/arrow-json/Cargo.toml +++ b/arrow-json/Cargo.toml @@ -34,11 +34,11 @@ path = "src/lib.rs" bench = false [dependencies] -arrow-array = { workspace = true } -arrow-buffer = { workspace = true } -arrow-cast = { workspace = true } -arrow-data = { workspace = true } -arrow-schema = { workspace = true } +arrow-array = { workspace = true } +arrow-buffer = { workspace = true } +arrow-cast = { workspace = true } +arrow-data = { workspace = true } +arrow-schema = { workspace = true } half = { version = "2.1", default-features = false } indexmap = { version = "2.0", default-features = false, features = ["std"] } num = { version = "0.4", default-features = false, features = ["std"] } diff --git a/arrow-json/src/lib.rs b/arrow-json/src/lib.rs index 88415ff2ecac..e69eaaba3ef8 100644 --- a/arrow-json/src/lib.rs +++ 
b/arrow-json/src/lib.rs @@ -15,9 +15,53 @@ // specific language governing permissions and limitations // under the License. -//! Transfer data between the Arrow memory format and JSON -//! line-delimited records. See the module level documentation for the +//! Transfer data between the Arrow memory format and JSON line-delimited records. +//! +//! See the module level documentation for the //! [`reader`] and [`writer`] for usage examples. +//! +//! # Binary Data +//! +//! As per [RFC7159] JSON cannot encode arbitrary binary data. A common approach to work around +//! this is to use a [binary-to-text encoding] scheme, such as base64, to encode the +//! input data and then decode it on output. +//! +//! ``` +//! # use std::io::Cursor; +//! # use std::sync::Arc; +//! # use arrow_array::{BinaryArray, RecordBatch, StringArray}; +//! # use arrow_array::cast::AsArray; +//! # use arrow_cast::base64::{b64_decode, b64_encode, BASE64_STANDARD}; +//! # use arrow_json::{LineDelimitedWriter, ReaderBuilder}; +//! # +//! // The data we want to write +//! let input = BinaryArray::from(vec![b"\xDE\x00\xFF".as_ref()]); +//! +//! // Base64 encode it to a string +//! let encoded: StringArray = b64_encode(&BASE64_STANDARD, &input); +//! +//! // Write the StringArray to JSON +//! let batch = RecordBatch::try_from_iter([("col", Arc::new(encoded) as _)]).unwrap(); +//! let mut buf = Vec::with_capacity(1024); +//! let mut writer = LineDelimitedWriter::new(&mut buf); +//! writer.write(&batch).unwrap(); +//! writer.finish().unwrap(); +//! +//! // Read the JSON data +//! let cursor = Cursor::new(buf); +//! let mut reader = ReaderBuilder::new(batch.schema()).build(cursor).unwrap(); +//! let batch = reader.next().unwrap().unwrap(); +//! +//! // Reverse the base64 encoding +//! let col: BinaryArray = batch.column(0).as_string::<i32>().clone().into(); +//! let output = b64_decode(&BASE64_STANDARD, &col).unwrap(); +//! +//! assert_eq!(input, output); +//! ``` +//! +//! 
[RFC7159]: https://datatracker.ietf.org/doc/html/rfc7159#section-8.1 +//! [binary-to-text encoding]: https://en.wikipedia.org/wiki/Binary-to-text_encoding +//! #![deny(rustdoc::broken_intra_doc_links)] #![warn(missing_docs)]