feat: implement parsing format from hashmap
WenyXu committed Apr 19, 2023
1 parent e4cd08c commit f09f142
Showing 10 changed files with 171 additions and 25 deletions.
36 changes: 35 additions & 1 deletion src/common/datasource/src/error.rs
@@ -13,6 +13,8 @@
// limitations under the License.

use std::any::Any;
use std::num::ParseIntError;
use std::str::ParseBoolError;

use common_error::prelude::*;
use snafu::Location;
@@ -89,6 +91,30 @@ pub enum Error {
location: Location,
source: tokio::task::JoinError,
},

#[snafu(display("Failed to parse delimiter: {}", source))]
ParseDelimiter {
source: ParseIntError,
location: Location,
},

#[snafu(display("Failed to parse shcema infer max record: {}", source))]
ParseSchemaInferMaxRecord {
source: ParseIntError,
location: Location,
},

#[snafu(display("Failed to parse has header: {}", source))]
ParseHasHeader {
source: ParseBoolError,
location: Location,
},

#[snafu(display("Failed to merge schema: {}", source))]
MergeSchema {
source: arrow_schema::ArrowError,
location: Location,
},
}

pub type Result<T> = std::result::Result<T, Error>;
@@ -109,7 +135,11 @@ impl ErrorExt for Error {
| InvalidPath { .. }
| InferSchema { .. }
| ReadParquetSnafu { .. }
| ParquetToSchema { .. } => StatusCode::InvalidArguments,
| ParquetToSchema { .. }
| ParseDelimiter { .. }
| ParseSchemaInferMaxRecord { .. }
| ParseHasHeader { .. }
| MergeSchema { .. } => StatusCode::InvalidArguments,

Decompression { .. } | JoinHandle { .. } => StatusCode::Unexpected,
}
@@ -130,6 +160,10 @@ impl ErrorExt for Error {
ParquetToSchema { location, .. } => Some(*location),
Decompression { location, .. } => Some(*location),
JoinHandle { location, .. } => Some(*location),
ParseDelimiter { location, .. } => Some(*location),
ParseSchemaInferMaxRecord { location, .. } => Some(*location),
ParseHasHeader { location, .. } => Some(*location),
MergeSchema { location, .. } => Some(*location),

UnsupportedBackendProtocol { .. }
| EmptyHostPath { .. }
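
For reference, a minimal sketch (not part of this diff) of how one of the new variants is raised at a call site, mirroring the .context(...) pattern the csv.rs changes below rely on; the helper name and input are hypothetical:

use snafu::ResultExt;

use crate::error::{ParseDelimiterSnafu, Result};

// Parses a delimiter supplied as a decimal byte value, e.g. "44" for ','.
// A non-numeric value fails here and is wrapped into Error::ParseDelimiter,
// which ErrorExt classifies as StatusCode::InvalidArguments.
fn delimiter_from_option(raw: &str) -> Result<u8> {
    raw.parse::<u8>().context(ParseDelimiterSnafu)
}
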
10 changes: 7 additions & 3 deletions src/common/datasource/src/file_format.rs
@@ -24,9 +24,8 @@ use std::result;
use std::sync::Arc;
use std::task::Poll;

use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use arrow_schema::ArrowError;
use arrow_schema::{ArrowError, Schema};
use async_trait::async_trait;
use bytes::{Buf, Bytes};
use datafusion::error::{DataFusionError, Result as DataFusionResult};
@@ -37,9 +36,14 @@ use object_store::ObjectStore;
use crate::compression::CompressionType;
use crate::error::Result;

pub const FORMAT_COMPRESSION_TYPE: &str = "COMPRESSION_TYPE";
pub const FORMAT_DELIMTERL: &str = "DELIMTERL";
pub const FORMAT_SCHEMA_INFER_MAX_RECORD: &str = "SCHEMA_INFER_MAX_RECORD";
pub const FORMAT_HAS_HEADER: &str = "FORMAT_HAS_HEADER";

#[async_trait]
pub trait FileFormat: Send + Sync + std::fmt::Debug {
async fn infer_schema(&self, store: &ObjectStore, path: String) -> Result<SchemaRef>;
async fn infer_schema(&self, store: &ObjectStore, path: String) -> Result<Schema>;
}

pub trait ArrowDecoder: Send + 'static {
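
For reference, a sketch (not part of this diff) of the kind of options map these keys are matched against; the values are hypothetical but follow the string encodings the format parsers below expect (delimiter as a decimal byte value, has-header as "true"/"false", compression as a CompressionType name):

use std::collections::HashMap;

let mut options: HashMap<String, String> = HashMap::new();
options.insert(FORMAT_DELIMTERL.to_string(), b','.to_string()); // "44"
options.insert(FORMAT_HAS_HEADER.to_string(), "true".to_string());
options.insert(FORMAT_SCHEMA_INFER_MAX_RECORD.to_string(), "1000".to_string());
options.insert(FORMAT_COMPRESSION_TYPE.to_string(), "zstd".to_string());
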
73 changes: 67 additions & 6 deletions src/common/datasource/src/file_format/csv.rs
@@ -12,11 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;

use arrow::csv;
use arrow::csv::reader::infer_reader_schema as infer_csv_schema;
use arrow_schema::SchemaRef;
use arrow_schema::{Schema, SchemaRef};
use async_trait::async_trait;
use common_runtime;
use datafusion::error::Result as DataFusionResult;
@@ -30,14 +32,42 @@ use crate::compression::CompressionType;
use crate::error::{self, Result};
use crate::file_format::{self, open_with_decoder, FileFormat};

#[derive(Debug)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CsvFormat {
pub has_header: bool,
pub delimiter: u8,
pub schema_infer_max_record: Option<usize>,
pub compression_type: CompressionType,
}

impl TryFrom<&HashMap<String, String>> for CsvFormat {
type Error = error::Error;

fn try_from(value: &HashMap<String, String>) -> Result<Self> {
let mut format = CsvFormat::default();
if let Some(delimiter) = value.get(file_format::FORMAT_DELIMTERL) {
// TODO(weny): consider supporting escaped forms like "\t" (not only the raw byte value b'\t')
format.delimiter = u8::from_str(delimiter).context(error::ParseDelimiterSnafu)?;
};
if let Some(compression_type) = value.get(file_format::FORMAT_COMPRESSION_TYPE) {
format.compression_type = CompressionType::from_str(compression_type)?;
};
if let Some(schema_infer_max_record) =
value.get(file_format::FORMAT_SCHEMA_INFER_MAX_RECORD)
{
format.schema_infer_max_record = Some(
schema_infer_max_record
.parse::<usize>()
.context(error::ParseSchemaInferMaxRecordSnafu)?,
);
};
if let Some(has_header) = value.get(file_format::FORMAT_HAS_HEADER) {
format.has_header = has_header.parse().context(error::ParseHasHeaderSnafu)?
}
Ok(format)
}
}

impl Default for CsvFormat {
fn default() -> Self {
Self {
@@ -112,7 +142,7 @@ impl FileOpener for CsvOpener {

#[async_trait]
impl FileFormat for CsvFormat {
async fn infer_schema(&self, store: &ObjectStore, path: String) -> Result<SchemaRef> {
async fn infer_schema(&self, store: &ObjectStore, path: String) -> Result<Schema> {
let reader = store
.reader(&path)
.await
@@ -130,8 +160,7 @@ impl FileFormat for CsvFormat {
let (schema, _records_read) =
infer_csv_schema(reader, delimiter, schema_infer_max_record, has_header)
.context(error::InferSchemaSnafu { path: &path })?;

Ok(Arc::new(schema))
Ok(schema)
})
.await
.context(error::JoinHandleSnafu)?
@@ -142,7 +171,10 @@
mod tests {

use super::*;
use crate::file_format::FileFormat;
use crate::file_format::{
FileFormat, FORMAT_COMPRESSION_TYPE, FORMAT_DELIMTERL, FORMAT_HAS_HEADER,
FORMAT_SCHEMA_INFER_MAX_RECORD,
};
use crate::test_util::{self, format_schema, test_store};

fn test_data_root() -> String {
@@ -220,4 +252,33 @@
formatted
);
}

#[test]
fn test_try_from() {
let mut map = HashMap::new();
let format: CsvFormat = CsvFormat::try_from(&map).unwrap();

assert_eq!(format, CsvFormat::default());

map.insert(
FORMAT_SCHEMA_INFER_MAX_RECORD.to_string(),
"2000".to_string(),
);

map.insert(FORMAT_COMPRESSION_TYPE.to_string(), "zstd".to_string());
map.insert(FORMAT_DELIMTERL.to_string(), b'\t'.to_string());
map.insert(FORMAT_HAS_HEADER.to_string(), "false".to_string());

let format = CsvFormat::try_from(&map).unwrap();

assert_eq!(
format,
CsvFormat {
compression_type: CompressionType::ZSTD,
schema_infer_max_record: Some(2000),
delimiter: b'\t',
has_header: false,
}
);
}
}
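
Taken together, a caller can go from an options map to an inferred schema roughly as follows (a sketch, not part of this diff; the store, path, and helper name are hypothetical):

use std::sync::Arc;
use arrow_schema::SchemaRef;

async fn csv_schema_from_options(
    options: &HashMap<String, String>,
    store: &ObjectStore,
) -> Result<SchemaRef> {
    let format = CsvFormat::try_from(options)?;
    // infer_schema now returns an owned Schema; wrap it where a SchemaRef is needed.
    let schema = format
        .infer_schema(store, "data/example.csv".to_string())
        .await?;
    Ok(Arc::new(schema))
}
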
57 changes: 53 additions & 4 deletions src/common/datasource/src/file_format/json.rs
@@ -12,12 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::io::BufReader;
use std::str::FromStr;
use std::sync::Arc;

use arrow::datatypes::SchemaRef;
use arrow::json::reader::{infer_json_schema_from_iterator, ValueIter};
use arrow::json::RawReaderBuilder;
use arrow_schema::Schema;
use async_trait::async_trait;
use common_runtime;
use datafusion::error::{DataFusionError, Result as DataFusionResult};
@@ -30,12 +33,33 @@ use crate::compression::CompressionType;
use crate::error::{self, Result};
use crate::file_format::{self, open_with_decoder, FileFormat};

#[derive(Debug)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct JsonFormat {
pub schema_infer_max_record: Option<usize>,
pub compression_type: CompressionType,
}

impl TryFrom<&HashMap<String, String>> for JsonFormat {
type Error = error::Error;

fn try_from(value: &HashMap<String, String>) -> Result<Self> {
let mut format = JsonFormat::default();
if let Some(compression_type) = value.get(file_format::FORMAT_COMPRESSION_TYPE) {
format.compression_type = CompressionType::from_str(compression_type)?
};
if let Some(schema_infer_max_record) =
value.get(file_format::FORMAT_SCHEMA_INFER_MAX_RECORD)
{
format.schema_infer_max_record = Some(
schema_infer_max_record
.parse::<usize>()
.context(error::ParseSchemaInferMaxRecordSnafu)?,
);
};
Ok(format)
}
}

impl Default for JsonFormat {
fn default() -> Self {
Self {
Expand All @@ -47,7 +71,7 @@ impl Default for JsonFormat {

#[async_trait]
impl FileFormat for JsonFormat {
async fn infer_schema(&self, store: &ObjectStore, path: String) -> Result<SchemaRef> {
async fn infer_schema(&self, store: &ObjectStore, path: String) -> Result<Schema> {
let reader = store
.reader(&path)
.await
@@ -65,7 +89,7 @@ impl FileFormat for JsonFormat {
let schema = infer_json_schema_from_iterator(iter)
.context(error::InferSchemaSnafu { path: &path })?;

Ok(Arc::new(schema))
Ok(schema)
})
.await
.context(error::JoinHandleSnafu)?
@@ -116,7 +140,7 @@ impl FileOpener for JsonOpener {
#[cfg(test)]
mod tests {
use super::*;
use crate::file_format::FileFormat;
use crate::file_format::{FileFormat, FORMAT_COMPRESSION_TYPE, FORMAT_SCHEMA_INFER_MAX_RECORD};
use crate::test_util::{self, format_schema, test_store};

fn test_data_root() -> String {
@@ -162,4 +186,29 @@
formatted
);
}

#[test]
fn test_try_from() {
let mut map = HashMap::new();
let format = JsonFormat::try_from(&map).unwrap();

assert_eq!(format, JsonFormat::default());

map.insert(
FORMAT_SCHEMA_INFER_MAX_RECORD.to_string(),
"2000".to_string(),
);

map.insert(FORMAT_COMPRESSION_TYPE.to_string(), "zstd".to_string());

let format = JsonFormat::try_from(&map).unwrap();

assert_eq!(
format,
JsonFormat {
compression_type: CompressionType::ZSTD,
schema_infer_max_record: Some(2000),
}
);
}
}
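
JsonFormat only honors the compression and schema-infer-max-record keys; anything else in the map is ignored rather than rejected. A small sketch (not part of this diff) of that behavior, reusing the CSV-only FORMAT_HAS_HEADER key as a hypothetical leftover option:

use crate::file_format::FORMAT_HAS_HEADER;

let mut options = HashMap::new();
options.insert(FORMAT_HAS_HEADER.to_string(), "true".to_string()); // CSV-only key; ignored here
let format = JsonFormat::try_from(&options).unwrap();
assert_eq!(format, JsonFormat::default());
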
8 changes: 3 additions & 5 deletions src/common/datasource/src/file_format/parquet.rs
@@ -12,9 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use arrow_schema::SchemaRef;
use arrow_schema::Schema;
use async_trait::async_trait;
use datafusion::parquet::arrow::async_reader::AsyncFileReader;
use datafusion::parquet::arrow::parquet_to_arrow_schema;
@@ -29,7 +27,7 @@ pub struct ParquetFormat {}

#[async_trait]
impl FileFormat for ParquetFormat {
async fn infer_schema(&self, store: &ObjectStore, path: String) -> Result<SchemaRef> {
async fn infer_schema(&self, store: &ObjectStore, path: String) -> Result<Schema> {
let mut reader = store
.reader(&path)
.await
@@ -47,7 +45,7 @@ impl FileFormat for ParquetFormat {
)
.context(error::ParquetToSchemaSnafu)?;

Ok(Arc::new(schema))
Ok(schema)
}
}

4 changes: 2 additions & 2 deletions src/common/datasource/src/object_store.rs
@@ -27,7 +27,7 @@ use crate::error::{self, Result};
pub const FS_SCHEMA: &str = "FS";
pub const S3_SCHEMA: &str = "S3";

/// parse url returns (schema,Option<host>,path)
/// Returns (schema, Option<host>, path)
pub fn parse_url(url: &str) -> Result<(String, Option<String>, String)> {
let parsed_url = Url::parse(url);
match parsed_url {
@@ -43,7 +43,7 @@ pub fn parse_url(url: &str) -> Result<(String, Option<String>, String)> {
}
}

pub fn build_backend(url: &str, connection: HashMap<String, String>) -> Result<ObjectStore> {
pub fn build_backend(url: &str, connection: &HashMap<String, String>) -> Result<ObjectStore> {
let (schema, host, _path) = parse_url(url)?;

match schema.to_uppercase().as_str() {
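
For reference, a sketch (not part of this diff) of how a caller such as copy_table_from.rs now uses these helpers, with the connection options passed by reference; the URL and the contents of connection_options (a HashMap<String, String>) are hypothetical:

let url = "s3://my-bucket/data/input.csv";
// Yields the (schema, Option<host>, path) triple described in the doc comment above.
let (schema, host, path) = parse_url(url)?;
let store = build_backend(url, &connection_options)?;
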
2 changes: 1 addition & 1 deletion src/common/datasource/src/object_store/s3.rs
@@ -30,7 +30,7 @@ const ENABLE_VIRTUAL_HOST_STYLE: &str = "ENABLE_VIRTUAL_HOST_STYLE";
pub fn build_s3_backend(
host: &str,
path: &str,
connection: HashMap<String, String>,
connection: &HashMap<String, String>,
) -> Result<ObjectStore> {
let mut builder = S3::default();

2 changes: 1 addition & 1 deletion src/common/datasource/src/test_util.rs
@@ -28,7 +28,7 @@ pub fn get_data_dir(path: &str) -> PathBuf {
PathBuf::from(dir).join(path)
}

pub fn format_schema(schema: SchemaRef) -> Vec<String> {
pub fn format_schema(schema: Schema) -> Vec<String> {
schema
.fields()
.iter()
2 changes: 1 addition & 1 deletion src/frontend/src/statement/copy_table_from.rs
@@ -46,7 +46,7 @@ impl StatementExecutor {
let (_schema, _host, path) = parse_url(&req.location).context(error::ParseUrlSnafu)?;

let object_store =
build_backend(&req.location, req.connection).context(error::BuildBackendSnafu)?;
build_backend(&req.location, &req.connection).context(error::BuildBackendSnafu)?;

let (dir, filename) = find_dir_and_filename(&path);
let regex = req