diff --git a/changelog.md b/changelog.md
index 9a6f3a1..87bcea2 100644
--- a/changelog.md
+++ b/changelog.md
@@ -5,19 +5,25 @@
 All notable changes to this project will be documented in this file.
 
 The format is based on [Keep a Changelog](http://keepachangelog.com/) and this project adheres to [Semantic Versioning](http://semver.org/).
 
+## [0.7.12] - 2023-01-21
+
+### Added
+
+- `dump_file` option that creates dump files for the `psql` and `sqlite3` CLI tools.
+
-## [0.7.11] - 2022-01-07
+## [0.7.11] - 2023-01-07
 
 ### Fixed
 
-- WASM dependancy loop
+- WASM dependency loop
 
-## [0.7.10] - 2022-01-06
+## [0.7.10] - 2023-01-06
 
 ### Fixed
 
 - Truncate for too large xlsx cell.
 
-## [0.7.9] - 2022-01-01
+## [0.7.9] - 2023-01-01
 
 ### Changed
diff --git a/src/converters.rs b/src/converters.rs
index 636aa95..4690d23 100644
--- a/src/converters.rs
+++ b/src/converters.rs
@@ -4,12 +4,12 @@ use csv::Writer;
 use minijinja::Environment;
 use postgres::{Client, NoTls};
 use rusqlite::Connection;
-use serde_json::Value;
+use serde_json::{Value, json};
 use snafu::prelude::*;
 use snafu::{ensure, Snafu};
 use std::collections::HashMap;
 use std::fmt::Write as fmt_write;
-use std::fs::File;
+use std::fs::{File, canonicalize};
 use std::io::BufReader;
 use std::io::Write;
 use std::path::PathBuf;
@@ -40,6 +40,12 @@ pub enum Error {
         filename: String,
     },
 
+    #[snafu(display("Error writing file {}: {}", filename, source))]
+    WriteError {
+        source: std::io::Error,
+        filename: String,
+    },
+
     #[snafu(display("Error parsing JSON {}: {}", filename, source))]
     JSONError {
         source: serde_json::Error,
@@ -131,7 +137,7 @@ pub struct Options {
     #[builder(default)]
     pub threads: usize,
     #[builder(default)]
-    pub show_queries: bool,
+    pub dump_file: String,
 }
 
 lazy_static::lazy_static! {
@@ -816,9 +822,9 @@ lazy_static::lazy_static! {
 
 fn insert_sql_data(
     csv_reader: csv::Reader<impl std::io::Read>,
-    mut conn: rusqlite::Connection,
+    conn: &mut rusqlite::Connection,
     resource: Value,
-) -> Result<Connection, Error> {
+) -> Result<(), Error> {
     let tx = conn.transaction().context(RusqliteSnafu {
         message: "Error making transaction: ",
     })?;
@@ -869,7 +875,7 @@ fn insert_sql_data(
     tx.commit().context(RusqliteSnafu {
         message: "Error commiting sqlite: ",
     })?;
-    Ok(conn)
+    Ok(())
 }
 
 pub fn csvs_to_sqlite(db_path: String, csvs: Vec<PathBuf>) -> Result<Value, Error> {
@@ -922,60 +928,94 @@ pub fn datapackage_to_sqlite_with_options(
 ) -> Result<(), Error> {
     let (table_to_schema, ordered_tables) = get_table_info(&datapackage, &options)?;
 
-    let mut conn = Connection::open(db_path).context(RusqliteSnafu {
-        message: "Error opening connection: ",
-    })?;
+    let mut conn = if !db_path.is_empty() {
+        Some(Connection::open(&db_path).context(RusqliteSnafu {
+            message: "Error opening connection: ",
+        })?)
+    } else {
+        None
+    };
+
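+    // A dump_file of "-" streams the dump to stdout; any other non-empty value creates that file.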
+    let mut dump_writer: Option<Box<dyn Write>> = if !options.dump_file.is_empty() {
+        if options.dump_file == "-" {
+            Some(Box::new(std::io::stdout()))
+        } else {
+            Some(Box::new(File::create(&options.dump_file).context(WriteSnafu {filename: &options.dump_file})?))
+        }
+    } else {
+        None
+    };
 
-    conn.execute_batch(
-        "PRAGMA journal_mode = OFF;
+    let pragmas = "PRAGMA journal_mode = OFF;
     PRAGMA synchronous = 0;
     PRAGMA locking_mode = EXCLUSIVE;
-    PRAGMA temp_store = MEMORY;",
-    )
-    .context(RusqliteSnafu {
-        message: "Error executing pragmas: ",
-    })?;
+    PRAGMA temp_store = MEMORY;";
+
+    if let Some(conn) = conn.as_mut() {
+        conn.execute_batch(
+            pragmas
+        )
+        .context(RusqliteSnafu {
+            message: "Error executing pragmas: ",
+        })?;
+    };
+
+    if let Some(dump_writer) = dump_writer.as_mut() {
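+        // The dump is meant to be replayed through the sqlite3 CLI (e.g. `sqlite3 my.db < dump.sql`);
+        // `.mode csv` prepares the CLI for the `.import` commands written per table below.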
+        writeln!(dump_writer, "{pragmas}").context(IoSnafu {filename: &options.dump_file})?;
+        writeln!(dump_writer, ".mode csv").context(IoSnafu {filename: &options.dump_file})?;
+    }
 
     for table in ordered_tables {
         let resource = table_to_schema.get(&table).unwrap();
+
         let mut existing_columns: HashMap<String, String> = HashMap::new();
 
-        {
-            let mut fields_query = conn
-                .prepare("select name, type from pragma_table_info(?)")
-                .context(RusqliteSnafu {
-                    message: "Error peparing sql",
-                })?;
-
-            let mut rows = fields_query.query([&table]).context(RusqliteSnafu {
-                message: "Error peparing sql",
-            })?;
-
-            while let Some(row) = rows.next().context(RusqliteSnafu {
-                message: "Error fetching rows",
-            })? {
-                existing_columns.insert(
-                    row.get(0).context(RusqliteSnafu {
-                        message: "Error fetching rows",
-                    })?,
-                    row.get(1).context(RusqliteSnafu {
-                        message: "Error fetching rows",
-                    })?,
-                );
-            }
+        if options.drop || options.evolve {
+            if let Some(conn) = conn.as_mut() {
+                let mut fields_query = conn
+                    .prepare("select name, type from pragma_table_info(?)")
+                    .context(RusqliteSnafu {
+                        message: "Error preparing sql",
+                    })?;
+
+                let mut rows = fields_query.query([&table]).context(RusqliteSnafu {
+                    message: "Error preparing sql",
+                })?;
+
+                while let Some(row) = rows.next().context(RusqliteSnafu {
+                    message: "Error fetching rows",
+                })? {
+                    existing_columns.insert(
+                        row.get(0).context(RusqliteSnafu {
+                            message: "Error fetching rows",
+                        })?,
+                        row.get(1).context(RusqliteSnafu {
+                            message: "Error fetching rows",
+                        })?,
+                    );
+                }
+            };
         }
 
         let mut create = false;
         if existing_columns.is_empty() {
             create = true
-        } else if options.drop {
-            conn.execute(&format!("drop table [{table}];"), [])
-                .context(RusqliteSnafu {
-                    message: "Error making sqlite tables: ",
-                })?;
-            create = true
+        }
+
+        if options.drop {
+            if let Some(conn) = conn.as_mut() {
+                conn.execute(&format!("drop table if exists [{table}];"), [])
+                    .context(RusqliteSnafu {
+                        message: "Error making sqlite tables: ",
+                    })?;
+                create = true
+            }
+            if let Some(dump_writer) = dump_writer.as_mut() {
+                writeln!(dump_writer, "drop table if exists [{table}];").context(IoSnafu {filename: &options.dump_file})?;
+            }
         }
 
         ensure!(
@@ -987,44 +1027,75 @@ pub fn datapackage_to_sqlite_with_options(
         if create {
             let resource_sqlite = render_sqlite_table(resource.clone())?;
-
-            conn.execute(&resource_sqlite, []).context(RusqliteSnafu {
-                message: "Error making sqlite tables: ",
-            })?;
+            if let Some(conn) = conn.as_mut() {
+                conn.execute(&resource_sqlite, []).context(RusqliteSnafu {
+                    message: "Error making sqlite tables: ",
+                })?;
+            }
+            if let Some(dump_writer) = dump_writer.as_mut() {
+                writeln!(dump_writer, "{}", &resource_sqlite).context(IoSnafu {filename: &options.dump_file})?;
+            }
         } else if options.evolve {
             let (add_columns, _alter_columns) = get_column_changes(resource, existing_columns);
             for (name, type_) in add_columns {
-                conn.execute(&format!("ALTER TABLE {table} ADD [{name}] {type_}"), [])
-                    .context(RusqliteSnafu {
-                        message: "Error altering sqlite tables: ",
-                    })?;
+                if let Some(conn) = conn.as_mut() {
+                    conn.execute(&format!("ALTER TABLE {table} ADD [{name}] {type_}"), [])
+                        .context(RusqliteSnafu {
+                            message: "Error altering sqlite tables: ",
+                        })?;
+                }
+                if let Some(dump_writer) = dump_writer.as_mut() {
+                    writeln!(dump_writer, "ALTER TABLE {table} ADD [{name}] {type_}").context(IoSnafu {filename: &options.dump_file})?;
+                }
             }
         }
 
         let resource_path = resource["path"].as_str().unwrap();
-        let csv_readers = get_reader(&datapackage, resource_path, &options)?;
+        if let Some(conn) = conn.as_mut() {
+            let csv_readers = get_reader(&datapackage, resource_path, &options)?;
 
-        match csv_readers {
-            Readers::Zip(mut zip) => {
-                let zipped_file = zip.by_name(resource_path).context(ZipSnafu {
-                    filename: &datapackage,
-                })?;
-                let csv_reader =
-                    get_csv_reader_builder(&options, resource).from_reader(zipped_file);
-                conn = insert_sql_data(csv_reader, conn, resource.clone())?
-            }
-            Readers::File(csv_file) => {
-                let (filename, file) = csv_file;
-                let csv_reader = get_csv_reader_builder(&options, resource).from_reader(file);
-                conn = insert_sql_data(csv_reader, conn, resource.clone())?;
-                if options.delete_input_csv {
-                    std::fs::remove_file(&filename).context(IoSnafu {
-                        filename: filename.to_string_lossy(),
-                    })?;
-                }
-            }
-        }
+            match csv_readers {
+                Readers::Zip(mut zip) => {
+                    let zipped_file = zip.by_name(resource_path).context(ZipSnafu {
+                        filename: &datapackage,
+                    })?;
+                    let csv_reader =
+                        get_csv_reader_builder(&options, resource).from_reader(zipped_file);
+                    insert_sql_data(csv_reader, conn, resource.clone())?
+                }
+                Readers::File(csv_file) => {
+                    let (filename, file) = csv_file;
+                    let csv_reader = get_csv_reader_builder(&options, resource).from_reader(file);
+                    insert_sql_data(csv_reader, conn, resource.clone())?;
+                    if options.delete_input_csv {
+                        std::fs::remove_file(&filename).context(IoSnafu {
+                            filename: filename.to_string_lossy(),
+                        })?;
+                    }
+                }
+            }
+        }
+
+        if let Some(dump_writer) = dump_writer.as_mut() {
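+            // Rather than embedding rows as INSERT statements, the dump tells sqlite3 to
+            // `.import` each CSV directly; `--skip 1` skips the CSV's header row.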
+            let table_value = json!(table);
+            let table_name = resource.get("title").unwrap_or(resource.get("name").unwrap_or(&table_value)).as_str().unwrap_or(&table);
+
+            let mut delimiter_u8 = options.delimiter.unwrap_or(b',');
+
+            if let Some(dialect_delimiter) = resource["dialect"]["delimiter"].as_str() {
+                if dialect_delimiter.as_bytes().len() == 1 {
+                    delimiter_u8 = *dialect_delimiter.as_bytes().first().unwrap()
+                }
+            };
+
+            let delimiter = std::str::from_utf8(&[delimiter_u8])
+                .context(DelimeiterSnafu {})?
+                .to_owned();
+
+            writeln!(dump_writer, ".separator '{delimiter}'").context(IoSnafu {filename: &options.dump_file})?;
+            writeln!(dump_writer, ".import '{resource_path}' {table_name} --skip 1").context(IoSnafu {filename: &options.dump_file})?;
+        }
+    }
 
     Ok(())
@@ -1579,8 +1650,6 @@ pub fn datapackage_to_postgres_with_options(
 ) -> Result<(), Error> {
     let (table_to_schema, ordered_tables) = get_table_info(&datapackage, &options)?;
 
-    let show_queries = options.show_queries || postgres_url.is_empty();
-
     let mut conf = postgres_url.clone();
 
     if postgres_url.trim_start().to_lowercase().starts_with("env") {
@@ -1605,6 +1674,16 @@ pub fn datapackage_to_postgres_with_options(
         None
     };
 
+    let mut dump_writer: Option<Box<dyn Write>> = if !options.dump_file.is_empty() {
+        if options.dump_file == "-" {
+            Some(Box::new(std::io::stdout()))
+        } else {
+            Some(Box::new(File::create(&options.dump_file).context(WriteSnafu {filename: &options.dump_file})?))
+        }
+    } else {
+        None
+    };
+
     for table in ordered_tables {
         let resource = table_to_schema.get(&table).unwrap();
 
@@ -1679,8 +1758,8 @@ set search_path = "{schema}";
         }
         write!(drop_statement, "DROP TABLE IF EXISTS \"{table}\" CASCADE;").unwrap();
         if let Some(client) = client.as_mut() {
-            if show_queries {
-                println!("{drop_statement}")
+            if let Some(dump_writer) = dump_writer.as_mut() {
+                writeln!(dump_writer, "{drop_statement}").context(IoSnafu {filename: &options.dump_file})?;
             }
             client
                 .batch_execute(&drop_statement)
@@ -1689,8 +1768,8 @@ set search_path = "{schema}";
         }
 
         if create {
-            if show_queries {
-                println!("{resource_postgres}")
+            if let Some(dump_writer) = dump_writer.as_mut() {
+                writeln!(dump_writer, "{resource_postgres}").context(IoSnafu {filename: &options.dump_file})?;
             }
             if let Some(client) = client.as_mut() {
                 client
@@ -1717,8 +1796,8 @@ set search_path = "{schema}";
             let (add_columns, alter_columns) = get_column_changes(resource, existing_columns);
             for (name, type_) in add_columns {
                 if let Some(client) = client.as_mut() {
-                    if show_queries {
-                        println!("ALTER TABLE {schema_table} ADD COLUMN \"{name}\" {type_}")
+                    if let Some(dump_writer) = dump_writer.as_mut() {
+                        writeln!(dump_writer, "ALTER TABLE {schema_table} ADD COLUMN \"{name}\" {type_}").context(IoSnafu {filename: &options.dump_file})?;
                    }
                     client
                         .batch_execute(&format!(
@@ -1730,8 +1809,8 @@ set search_path = "{schema}";
 
             for name in alter_columns {
                 if let Some(client) = client.as_mut() {
-                    if show_queries {
-                        println!("ALTER TABLE {schema_table} ALTER COLUMN \"{name}\" TYPE TEXT")
+                    if let Some(dump_writer) = dump_writer.as_mut() {
+                        writeln!(dump_writer, "ALTER TABLE {schema_table} ALTER COLUMN \"{name}\" TYPE TEXT").context(IoSnafu {filename: &options.dump_file})?;
                     }
                     client
                         .batch_execute(&format!(
@@ -1768,12 +1847,14 @@ set search_path = "{schema}";
             .to_owned();
 
         let query = format!("copy {schema_table}({all_columns}) from STDIN WITH CSV HEADER QUOTE '{quote}' DELIMITER '{delimiter}'");
-        if show_queries {
-            println!("copy {schema_table}({all_columns}) from '{resource_path}' WITH CSV HEADER QUOTE '{quote}' DELIMITER '{delimiter}'");
+
+        if let Some(dump_writer) = dump_writer.as_mut() {
+            let full_path = canonicalize(resource_path).context(IoSnafu {filename: resource_path})?;
+            let full_path = full_path.to_string_lossy();
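+            // `\copy` is psql's client-side COPY, so it needs no server file access; the
+            // canonicalized path keeps the dump usable from any working directory.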
TEXT").context(IoSnafu {filename: &options.dump_file})?; } client .batch_execute(&format!( @@ -1768,12 +1847,14 @@ set search_path = "{schema}"; .to_owned(); let query = format!("copy {schema_table}({all_columns}) from STDIN WITH CSV HEADER QUOTE '{quote}' DELIMITER '{delimiter}'"); - if show_queries { - println!("copy {schema_table}({all_columns}) from '{resource_path}' WITH CSV HEADER QUOTE '{quote}' DELIMITER '{delimiter}'"); + + if let Some(dump_writer) = dump_writer.as_mut() { + let full_path = canonicalize(resource_path).context(IoSnafu {filename: resource_path})?; + let full_path = full_path.to_string_lossy(); + writeln!(dump_writer, "\\copy {schema_table}({all_columns}) from '{full_path}' WITH CSV HEADER QUOTE '{quote}' DELIMITER '{delimiter}'").context(IoSnafu {filename: &options.dump_file})?; } match csv_readers { - Readers::Zip(mut zip) => { let mut zipped_file = zip.by_name(resource_path).context(ZipSnafu { filename: &datapackage, @@ -2304,7 +2385,7 @@ mod tests { #[test] fn test_csvs_db() { - let options = Options::builder().drop(true).show_queries(true).schema("test".into()).build(); + let options = Options::builder().drop(true).dump_file("-".into()).schema("test".into()).build(); csvs_to_postgres_with_options( "postgresql://test@localhost/test".into(), @@ -2319,7 +2400,7 @@ mod tests { #[test] fn test_csvs_db_no_conn() { - let options = Options::builder().drop(true).build(); + let options = Options::builder().drop(true).dump_file("/tmp/postgres_dump.sql".into()).build(); csvs_to_postgres_with_options( "".into(), @@ -2330,6 +2411,13 @@ mod tests { options, ) .unwrap(); + + let file = File::open("/tmp/postgres_dump.sql").unwrap(); + let lines: Vec = std::io::BufReader::new(file) + .lines() + .map(|x| x.unwrap()) + .collect(); + insta::assert_yaml_snapshot!(lines); } #[test] @@ -2519,6 +2607,28 @@ mod tests { .unwrap(); } + #[test] + fn test_csvs_sqlite_no_conn() { + let options = Options::builder().drop(true).dump_file("/tmp/sqlite_dump.sql".into()).build(); + + csvs_to_sqlite_with_options( + "".into(), + vec![ + "src/fixtures/all_types.csv".into(), + "src/fixtures/all_types_semi_colon.csv".into(), + ], + options, + ) + .unwrap(); + + let file = File::open("/tmp/sqlite_dump.sql").unwrap(); + let lines: Vec = std::io::BufReader::new(file) + .lines() + .map(|x| x.unwrap()) + .collect(); + insta::assert_yaml_snapshot!(lines); + } + #[test] fn test_evolve_sqlite() { let options = Options::builder().drop(true).build();