From 0d926a56811ed2b03e2f088d483dfbe1bb4b4dba Mon Sep 17 00:00:00 2001
From: Chris Carlon
Date: Mon, 16 Dec 2024 15:25:08 +0000
Subject: [PATCH] feat(n): improved file processing. improved file byte
 matching. added non-geospatial route. [2024-12-16]

---
 src/duckdb_load/mod.rs | 189 +++++++++++++++++++++++++++++++----------
 src/main.rs            |   6 +-
 2 files changed, 149 insertions(+), 46 deletions(-)

diff --git a/src/duckdb_load/mod.rs b/src/duckdb_load/mod.rs
index 23d5cbf..ff0a677 100644
--- a/src/duckdb_load/mod.rs
+++ b/src/duckdb_load/mod.rs
@@ -2,7 +2,7 @@ use duckdb::arrow::datatypes::Schema;
 use duckdb::Connection;
 use std::error::Error;
 use std::fs::File;
-use std::io::{self, Read};
+use std::io::{self, Read, Seek};
 use std::sync::Arc;
 
 // Enum that represents potential FileTypes
@@ -61,63 +61,113 @@ impl DuckDBFileProcessor {
     }
 
     fn process_new_file(&self) -> Result<(), Box<dyn Error>> {
-        // Call all the required methods
+        // Call initial methods
         self.create_data_table()?;
         self.query_and_print_schema()?;
-
-        // Transform geometry columns and store the result
-        let geom_columns = self.transform_geom_columns()?;
-
-        // Pass the geometry columns to load_data_postgis
-        self.load_data_postgis(&geom_columns)?;
-
+
+        // First, check if we have any geometry columns
+        let query = "SELECT column_name FROM information_schema.columns
+            WHERE table_name = 'data' AND data_type = 'GEOMETRY'";
+        let mut stmt = self.conn.prepare(query)?;
+        let mut rows = stmt.query([])?;
+
+        // If we find any geometry columns, process them specially
+        if rows.next()?.is_some() {
+            // Transform geometry columns and store the result
+            let geom_columns = self.transform_geom_columns()?;
+            // Pass the geometry columns to load_data_postgis
+            self.load_data_postgis(&geom_columns)?;
+        } else {
+            // No geometry columns - do a simple table copy
+            self.load_non_geo_data()?;
+        }
+
         Ok(())
     }
 
     fn determine_file_type(file_path: &str) -> Result<FileType, Box<dyn Error>> {
-        // Open file and read into buffer
+        // Open file and read first 100 bytes for magic number detection
         let mut file = File::open(file_path)?;
+        let mut header_buffer = [0u8; 100];
+        let bytes_read = file.read(&mut header_buffer)?;
+        let header = &header_buffer[..bytes_read];
+
+        // First try magic number detection
+        if let Some(file_type) = Self::match_magic_numbers(header) {
+            return Ok(file_type);
+        }
+
+        // If magic numbers don't match, perform content-based detection
         let mut buffer = Vec::new();
+        file.seek(std::io::SeekFrom::Start(0))?;
         file.read_to_end(&mut buffer)?;
+        Self::detect_content_based_type(&buffer)
+    }
 
-        // Read in header of file
-        let header = &buffer[0..16.min(buffer.len())];
-
-        // Check for FileType
+    fn match_magic_numbers(header: &[u8]) -> Option<FileType> {
         match header {
-            b"PK\x03\x04" => Ok(FileType::Excel),
-            b"SQLite format 3\0" => Ok(FileType::Geopackage),
-            [0, 0, 39, 10, ..] => Ok(FileType::Shapefile),
-            b"PAR1" => Ok(FileType::Parquet),
-            _ if header.starts_with(b"{") => {
-                let json_start = std::str::from_utf8(&buffer)?;
-                if json_start.contains("\"type\":")
-                    && (json_start.contains("\"FeatureCollection\"")
-                        || json_start.contains("\"Feature\""))
-                {
-                    Ok(FileType::Geojson)
-                } else {
-                    Err("Not a valid GeoJSON file".into())
-                }
+            // Excel (XLSX) - PKZip signature
+            [0x50, 0x4B, 0x03, 0x04, ..] => Some(FileType::Excel),
+
+            // Excel (XLS)
+            [0xD0, 0xCF, 0x11, 0xE0, 0xA1, 0xB1, 0x1A, 0xE1, ..] => Some(FileType::Excel),
+
+            // Parquet
+            [0x50, 0x41, 0x52, 0x31, ..] => Some(FileType::Parquet),
+
+            // Geopackage (SQLite)
+            [0x53, 0x51, 0x4C, 0x69, 0x74, 0x65, 0x20, 0x66, 0x6F, 0x72, 0x6D, 0x61, 0x74, 0x20, 0x33, 0x00, ..] => {
+                Some(FileType::Geopackage)
             }
-            _ => {
-                let file_text = std::str::from_utf8(&buffer)?;
-                let lines: Vec<&str> = file_text.lines().collect();
-                if lines.len() >= 2
-                    && lines[0].split(',').count() > 1
-                    && lines[1].split(',').count() == lines[0].split(',').count()
-                    && file_text.is_ascii()
-                {
-                    Ok(FileType::Csv)
-                } else {
-                    Err("Unknown file type".into())
-                }
+
+            // Shapefile
+            [0x00, 0x00, 0x27, 0x0A, ..] => Some(FileType::Shapefile),
+
+            _ => None,
+        }
+    }
+
+    fn detect_content_based_type(buffer: &[u8]) -> Result<FileType, Box<dyn Error>> {
+        // Try GeoJSON first
+        if let Ok(text) = std::str::from_utf8(buffer) {
+            let text_lower = text.trim_start().to_lowercase();
+
+            if text_lower.starts_with("{")
+                && text_lower.contains("\"type\"")
+                && (text_lower.contains("\"featurecollection\"")
+                    || text_lower.contains("\"feature\"")
+                    || text_lower.contains("\"geometry\"")) {
+                return Ok(FileType::Geojson);
             }
+
+            // Check for CSV last
+            if Self::is_valid_csv(text) {
+                return Ok(FileType::Csv);
+            }
+        }
+
+        Err("Unknown or unsupported file type".into())
+    }
+
+    fn is_valid_csv(content: &str) -> bool {
+        let lines: Vec<&str> = content.lines().take(5).collect();
+
+        if lines.len() < 2 {
+            return false;
         }
+
+        let first_line_fields = lines[0].split(',').count();
+        // Require at least 2 columns and check for consistency
+        first_line_fields >= 2
+            && lines[1..].iter().all(|line| {
+                let fields = line.split(',').count();
+                fields == first_line_fields
+                    && line.chars().all(|c| c.is_ascii() || c.is_whitespace())
+            })
     }
 
     fn create_data_table(&self) -> Result<(), Box<dyn Error>> {
-        // Create initial 'data' table
+        // Create initial data table
         let query = match self.file_type {
             FileType::Geopackage | FileType::Shapefile | FileType::Geojson => {
                 format!(
@@ -182,6 +232,9 @@ impl DuckDBFileProcessor {
     }
 
     fn transform_geom_columns(&self) -> Result<Vec<String>, Box<dyn Error>> {
+
+        // Identify and select all geom columns
+        // Put them in a vector and loop through it to process each one
         let query = "SELECT column_name FROM information_schema.columns WHERE table_name = 'data' AND data_type = 'GEOMETRY'";
         let mut stmt = self.conn.prepare(query)?;
         let mut rows = stmt.query([])?;
@@ -192,9 +245,8 @@ impl DuckDBFileProcessor {
             geom_columns.push(column_name);
         }
 
-        println!("Geometry columns: {:?}", &geom_columns);
-
         // Call transform_crs for each geometry column
+        println!("Geometry columns: {:?}", &geom_columns);
         let target_crs = "4326";
         for column in &geom_columns {
             self.transform_crs(column, target_crs)?;
@@ -204,9 +256,11 @@ impl DuckDBFileProcessor {
     }
 
     fn transform_crs(&self, geom_column: &str, target_crs: &str) -> Result<String, Box<dyn Error>> {
+        // Get current CRS
        let current_crs = self.get_crs_number()?;
        println!("Current CRS for column {}: {}", geom_column, current_crs);
 
+        // Transform CRS if it does not match the target CRS
        let create_table_query = if current_crs == target_crs {
            format!(
                "CREATE TABLE transformed_data AS SELECT *,
@@ -244,6 +298,7 @@ impl DuckDBFileProcessor {
 
     fn load_data_postgis(&self, geom_columns: &[String]) -> Result<(), Box<dyn Error>> {
         // Attach Postgres DB instance
+        println!("LOADING GEOSPATIAL DATA");
         self.conn.execute(
             &format!(
                 "ATTACH '{}' AS gridwalk_db (TYPE POSTGRES)",
@@ -312,6 +367,54 @@ impl DuckDBFileProcessor {
         );
         Ok(())
     }
+
+    fn load_non_geo_data(&self) -> Result<(), Box<dyn Error>> {
+        // Attach Postgres DB instance
+        println!("LOADING NON GEOSPATIAL DATA");
+        self.conn.execute(
+            &format!(
+                "ATTACH '{}' AS gridwalk_db (TYPE POSTGRES)",
POSTGRES)", + self.postgis_uri + ), + [], + )?; + + // Create schema if it doesn't exist + let create_schema_sql = format!("CREATE SCHEMA IF NOT EXISTS \"{}\";", self.schema_name); + self.conn.execute( + &format!( + "CALL postgres_execute('gridwalk_db', '{}');", + create_schema_sql.replace('\'', "''") + ), + [], + )?; + + // Schema qualified table name + let schema_qualified_table = format!("\"{}\".\"{}\"", self.schema_name, self.table_name); + + // Drop existing table if it exists + let drop_table_sql = format!("DROP TABLE IF EXISTS {};", schema_qualified_table); + self.conn.execute( + &format!( + "CALL postgres_execute('gridwalk_db', '{}');", + drop_table_sql.replace('\'', "''") + ), + [], + )?; + + // Create data in table directly from 'data' table (no transformation needed) + let create_table_query = &format!( + "CREATE TABLE gridwalk_db.{} AS SELECT * FROM data;", + schema_qualified_table + ); + self.conn.execute(create_table_query, [])?; + + println!( + "Table {} created and data inserted successfully (no geometry columns)", + self.table_name + ); + Ok(()) + } } pub fn launch_process_file( diff --git a/src/main.rs b/src/main.rs index f64340f..5dec034 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,10 +4,10 @@ use duckdb_load::launch_process_file; fn main() -> Result<(), Box> { launch_process_file( - "test_files/hotosm_twn_populated_places_points_geojson.geojson", - "test-table", + "test_files/December HMO public register.xlsx", + "test-table-hmo", "postgresql://admin:password@localhost:5432/gridwalk", - "test-schema", + "test-schema-hmo", )?; Ok(()) }