Skip to content

Commit

Permalink
query first attempt
Browse files Browse the repository at this point in the history
  • Loading branch information
kindly committed Apr 12, 2023
1 parent 6912d0a commit f63a064
Show file tree
Hide file tree
Showing 11 changed files with 279 additions and 2 deletions.
2 changes: 1 addition & 1 deletion changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
### New

- Use duckdb for parquet conversion. More datetime formats allowed.
- `pipe` option to get data from stdin or named pipe.
- `pipe` option to get data from stdin or named pipe.

## [0.7.13] - 2023-01-22

Expand Down
2 changes: 1 addition & 1 deletion fixtures/add_resource/csv/games.csv
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
id,title
1,game1_add_resource
2,game2_add_resource
2,game2_add_resource
1 change: 1 addition & 0 deletions src/converters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2088,6 +2088,7 @@ fn get_column_changes(
(add_columns, alter_columns)
}


#[cfg(test)]
mod tests {
use super::*;
Expand Down
5 changes: 5 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ mod converters;
mod describe;
mod describe_csv;
mod describer;
mod query;

#[cfg(not(target_family = "wasm"))]
mod zip_dir;
Expand All @@ -43,3 +44,7 @@ pub use converters::{
merge_datapackage, merge_datapackage_jsons, merge_datapackage_with_options,
Error, Options
};
#[cfg(not(target_family = "wasm"))]
pub use query::{
query, Error as QueryError
};
196 changes: 196 additions & 0 deletions src/query.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
use duckdb::Connection as DuckdbConnection;
use snafu::prelude::*;
use typed_builder::TypedBuilder;

/// Errors returned by [`query`].
///
/// Marked `#[non_exhaustive]`, so downstream `match` expressions need a
/// wildcard arm and new variants can be added later.
#[non_exhaustive]
#[derive(Debug, Snafu)]
pub enum Error {
/// A DuckDB call failed; the display output is the underlying DuckDB
/// error's own message.
#[snafu(display("{}", source))]
DuckDbError { source: duckdb::Error },
}

/// Output options for [`query`].
///
/// Every field has a builder default, so `Options::builder().build()` yields
/// empty strings and `parquet == false`.
#[derive(Default, Debug, TypedBuilder)]
pub struct Options {
/// Field delimiter for CSV output. Left out of the generated `COPY`
/// options when empty; not used when the output is parquet.
#[builder(default)]
pub delimiter: String,
/// Quote character for CSV output. Left out of the generated `COPY`
/// options when empty; not used when the output is parquet.
#[builder(default)]
pub quote: String,
/// Write parquet even if the output path does not end in `.parquet`.
#[builder(default)]
pub parquet: bool,
}


/// Runs `sql` on a fresh in-memory DuckDB connection and writes the result
/// set to `output` using a `COPY (...) TO ...` statement.
///
/// * `sql` - a single SQL query. Surrounding whitespace and one trailing `;`
///   are removed before it is wrapped in `COPY (...)`. The query text itself
///   is executed as given, so it must come from a trusted source.
/// * `output` - destination path. `-` is mapped to `/dev/stdout`. A path
///   ending in `.parquet` selects parquet output.
/// * `options` - `parquet` forces parquet output regardless of the path;
///   otherwise CSV with a header row is written, using `delimiter` and
///   `quote` when they are non-empty.
///
/// The `parquet` and `httpfs` extensions are installed and loaded before the
/// query runs.
///
/// # Errors
///
/// Returns [`Error::DuckDbError`] if the connection cannot be opened, the
/// extensions cannot be installed/loaded, or the `COPY` statement fails.
pub fn query(sql: String, output: String, options: Options) -> Result<(), Error> {
    // Doubles single quotes so `value` can sit inside a single-quoted SQL
    // string literal without terminating it early.
    fn escape_literal(value: &str) -> String {
        value.replace('\'', "''")
    }

    let conn = DuckdbConnection::open_in_memory().context(DuckDbSnafu {})?;

    // A trailing `;` would be a syntax error inside `COPY (...)`.
    let sql = sql.trim();
    let sql = sql.strip_suffix(';').unwrap_or(sql);

    let mut copy_options: Vec<String> = Vec::new();

    if options.parquet || output.ends_with(".parquet") {
        copy_options.push("FORMAT 'parquet'".to_owned());
    } else {
        copy_options.push("HEADER 1".to_owned());
        if !options.delimiter.is_empty() {
            copy_options.push(format!(
                "DELIMITER '{}'",
                escape_literal(&options.delimiter)
            ));
        }
        if !options.quote.is_empty() {
            copy_options.push(format!("QUOTE '{}'", escape_literal(&options.quote)));
        }
    }

    let with_part = copy_options.join(", ");

    let target = if output == "-" {
        "/dev/stdout".to_owned()
    } else {
        escape_literal(&output)
    };

    // The newline before `)` keeps a trailing `-- comment` in the user's
    // query from swallowing the closing parenthesis.
    let copy_sql = format!("copy ({sql}\n) TO '{target}' WITH ({with_part}) ");

    conn.execute_batch("INSTALL parquet; LOAD parquet; INSTALL httpfs; LOAD httpfs;")
        .context(DuckDbSnafu {})?;
    conn.execute_batch(&copy_sql).context(DuckDbSnafu {})?;

    Ok(())
}


#[cfg(test)]
mod tests {
use std::io::BufRead;

use super::*;
use insta::assert_debug_snapshot;
use tempfile::TempDir;

// Loads every row of `file` through a fresh in-memory DuckDB connection and
// returns the cells as DuckDB values, one inner vector per row.
fn get_results(file: String) -> Vec<Vec<duckdb::types::Value>> {
    let conn = DuckdbConnection::open_in_memory().unwrap();
    conn.execute_batch("INSTALL parquet; LOAD parquet").unwrap();

    let select = format!("select * from '{file}';");
    let mut stmt = conn.prepare(&select).unwrap();
    let mut rows = stmt.query([]).unwrap();

    let mut table = Vec::new();
    loop {
        let row = match rows.next().unwrap() {
            Some(row) => row,
            None => break,
        };
        // Walk the column indexes upwards and stop at the first one that
        // cannot be fetched.
        let cells: Vec<duckdb::types::Value> = (0usize..)
            .map_while(|column| row.get::<usize, duckdb::types::Value>(column).ok())
            .collect();
        table.push(cells);
    }
    table
}

// Writes query results to parquet three ways (format inferred from the file
// extension, format forced by option, remote parquet source) and snapshots
// what is read back each time.
#[test]
fn test_query_to_parquet() {
    let tmp_dir = TempDir::new().unwrap();
    // All three cases reuse the same output path.
    let parquet_path: String = tmp_dir
        .path()
        .join("output.parquet")
        .to_string_lossy()
        .into_owned();

    let run = |statement: &str, options: Options| {
        query(statement.to_owned(), parquet_path.clone(), options).unwrap()
    };

    // The snapshot assertions stay in the test body, in order, so the
    // generated snapshot names do not change.
    run(
        "select * from 'fixtures/add_resource/csv/games.csv'",
        Options::builder().build(),
    );
    let data = get_results(parquet_path.clone());
    assert_debug_snapshot!(data);

    run(
        "select * from 'fixtures/add_resource/csv/games.csv' where id=1",
        Options::builder().parquet(true).build(),
    );
    let data = get_results(parquet_path.clone());
    assert_debug_snapshot!(data);

    run(
        "select * from 'https://csvs-convert-test.s3.eu-west-1.amazonaws.com/games.parquet' where id=1",
        Options::builder().parquet(true).build(),
    );
    let data = get_results(parquet_path.clone());
    assert_debug_snapshot!(data);
}

// Writes query results to CSV (default options, `;` delimiter, remote CSV
// source) and snapshots the lines of the produced file each time.
#[test]
fn test_query_to_csvs() {
    let tmp_dir = TempDir::new().unwrap();
    // All three cases reuse the same output path.
    let csv_path: String = tmp_dir
        .path()
        .join("output.csv")
        .to_string_lossy()
        .into_owned();

    let run = |statement: &str, options: Options| {
        query(statement.to_owned(), csv_path.clone(), options).unwrap()
    };

    let read_lines = |path: &str| -> Vec<String> {
        let reader = std::io::BufReader::new(std::fs::File::open(path).unwrap());
        reader.lines().map(|line| line.unwrap()).collect()
    };

    // The snapshot assertions stay in the test body, in order, so the
    // generated snapshot names do not change.
    run(
        "select * from 'fixtures/add_resource/csv/games.csv' where id=1",
        Options::builder().build(),
    );
    let lines = read_lines(&csv_path);
    assert_debug_snapshot!(lines);

    run(
        "select * from 'fixtures/add_resource/csv/games.csv' where id=1",
        Options::builder().delimiter(";".into()).build(),
    );
    let lines = read_lines(&csv_path);
    assert_debug_snapshot!(lines);

    run(
        "select * from 'https://gist.githubusercontent.com/kindly/7937e7d707a8bc3c2c812b4c6c314dc1/raw/f5767b3a878fb87ebdbe0071f308f2a3e132c3b9/test.csv'",
        Options::builder().build(),
    );
    let lines = read_lines(&csv_path);
    assert_debug_snapshot!(lines);
}

// #[test]
// fn test_s3() {
// query(
// "select * from 's3://csvs-convert-test/output.parquet'".into(),
// "s3://csvs-convert-test/output2.parquet".into(),
// Options::builder().build()
// ).unwrap();
// }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
source: src/query.rs
expression: lines
---
[
"id;title",
"1;game1_add_resource",
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
source: src/query.rs
expression: lines
---
[
"id,title",
"1,game1_add_resource",
"2,game2_add_resource",
]
8 changes: 8 additions & 0 deletions src/snapshots/csvs_convert__query__tests__query_to_csvs.snap
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
source: src/query.rs
expression: lines
---
[
"id,title",
"1,game1_add_resource",
]
14 changes: 14 additions & 0 deletions src/snapshots/csvs_convert__query__tests__query_to_parquet-2.snap
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
---
source: src/query.rs
expression: data
---
[
[
BigInt(
1,
),
Text(
"game1_add_resource",
),
],
]
14 changes: 14 additions & 0 deletions src/snapshots/csvs_convert__query__tests__query_to_parquet-3.snap
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
---
source: src/query.rs
expression: data
---
[
[
Int(
1,
),
Text(
"game1_add_resource",
),
],
]
22 changes: 22 additions & 0 deletions src/snapshots/csvs_convert__query__tests__query_to_parquet.snap
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
---
source: src/query.rs
expression: data
---
[
[
BigInt(
1,
),
Text(
"game1_add_resource",
),
],
[
BigInt(
2,
),
Text(
"game2_add_resource",
),
],
]

0 comments on commit f63a064

Please sign in to comment.