Skip to content

Commit

Permalink
feat(n): improved file processing. improved file byte matching. added…
Browse files Browse the repository at this point in the history
… non geosptatial route. [2024-12-16]
  • Loading branch information
CHRISCARLON committed Dec 16, 2024
1 parent 97a05bc commit 0d926a5
Show file tree
Hide file tree
Showing 2 changed files with 149 additions and 46 deletions.
189 changes: 146 additions & 43 deletions src/duckdb_load/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use duckdb::arrow::datatypes::Schema;
use duckdb::Connection;
use std::error::Error;
use std::fs::File;
use std::io::{self, Read};
use std::io::{self, Read, Seek};
use std::sync::Arc;

// Enum that represents potential FileTypes
Expand Down Expand Up @@ -61,63 +61,113 @@ impl DuckDBFileProcessor {
}

fn process_new_file(&self) -> Result<(), Box<dyn Error>> {
// Call all the required methods
// Call initial methods
self.create_data_table()?;
self.query_and_print_schema()?;

// Transform geometry columns and store the result
let geom_columns = self.transform_geom_columns()?;

// Pass the geometry columns to load_data_postgis
self.load_data_postgis(&geom_columns)?;


// First, check if we have any geometry columns
let query = "SELECT column_name FROM information_schema.columns
WHERE table_name = 'data' AND data_type = 'GEOMETRY'";
let mut stmt = self.conn.prepare(query)?;
let mut rows = stmt.query([])?;

// If we find any geometry columns, process them specially
if rows.next()?.is_some() {
// Transform geometry columns and store the result
let geom_columns = self.transform_geom_columns()?;
// Pass the geometry columns to load_data_postgis
self.load_data_postgis(&geom_columns)?;
} else {
// No geometry columns - do a simple table copy
self.load_non_geo_data()?;
}

Ok(())
}

fn determine_file_type(file_path: &str) -> Result<FileType, Box<dyn Error>> {
// Open file and read into buffer
// Open file and read first 100 bytes for magic number detection
let mut file = File::open(file_path)?;
let mut header_buffer = [0u8; 100];
let bytes_read = file.read(&mut header_buffer)?;
let header = &header_buffer[..bytes_read];

// First try magic number detection
if let Some(file_type) = Self::match_magic_numbers(header) {
return Ok(file_type);
}

// If magic numbers don't match, perform content-based detection
let mut buffer = Vec::new();
file.seek(std::io::SeekFrom::Start(0))?;
file.read_to_end(&mut buffer)?;
Self::detect_content_based_type(&buffer)
}

// Read in header of file
let header = &buffer[0..16.min(buffer.len())];

// Check for FileType
fn match_magic_numbers(header: &[u8]) -> Option<FileType> {
match header {
b"PK\x03\x04" => Ok(FileType::Excel),
b"SQLite format 3\0" => Ok(FileType::Geopackage),
[0, 0, 39, 10, ..] => Ok(FileType::Shapefile),
b"PAR1" => Ok(FileType::Parquet),
_ if header.starts_with(b"{") => {
let json_start = std::str::from_utf8(&buffer)?;
if json_start.contains("\"type\":")
&& (json_start.contains("\"FeatureCollection\"")
|| json_start.contains("\"Feature\""))
{
Ok(FileType::Geojson)
} else {
Err("Not a valid GeoJSON file".into())
}
// Excel (XLSX) - PKZip signature
[0x50, 0x4B, 0x03, 0x04, ..] => Some(FileType::Excel),

// Excel (XLS)
[0xD0, 0xCF, 0x11, 0xE0, 0xA1, 0xB1, 0x1A, 0xE1, ..] => Some(FileType::Excel),

// Parquet
[0x50, 0x41, 0x52, 0x31, ..] => Some(FileType::Parquet),

// Geopackage (SQLite)
[0x53, 0x51, 0x4C, 0x69, 0x74, 0x65, 0x20, 0x66, 0x6F, 0x72, 0x6D, 0x61, 0x74, 0x20, 0x33, 0x00, ..] => {
Some(FileType::Geopackage)
}
_ => {
let file_text = std::str::from_utf8(&buffer)?;
let lines: Vec<&str> = file_text.lines().collect();
if lines.len() >= 2
&& lines[0].split(',').count() > 1
&& lines[1].split(',').count() == lines[0].split(',').count()
&& file_text.is_ascii()
{
Ok(FileType::Csv)
} else {
Err("Unknown file type".into())
}

// Shapefile
[0x00, 0x00, 0x27, 0x0A, ..] => Some(FileType::Shapefile),

_ => None,
}
}

fn detect_content_based_type(buffer: &[u8]) -> Result<FileType, Box<dyn Error>> {
// Try GeoJSON first
if let Ok(text) = std::str::from_utf8(buffer) {
let text_lower = text.trim_start().to_lowercase();

if text_lower.starts_with("{")
&& text_lower.contains("\"type\"")
&& (text_lower.contains("\"featurecollection\"")
|| text_lower.contains("\"feature\"")
|| text_lower.contains("\"geometry\"")) {
return Ok(FileType::Geojson);
}

// Check for CSV last
if Self::is_valid_csv(text) {
return Ok(FileType::Csv);
}
}

Err("Unknown or unsupported file type".into())
}

fn is_valid_csv(content: &str) -> bool {
let lines: Vec<&str> = content.lines().take(5).collect();

if lines.len() < 2 {
return false;
}

let first_line_fields = lines[0].split(',').count();
// Require at least 2 columns and check for consistency
first_line_fields >= 2
&& lines[1..].iter().all(|line| {
let fields = line.split(',').count();
fields == first_line_fields
&& line.chars().all(|c| c.is_ascii() || c.is_whitespace())
})
}

fn create_data_table(&self) -> Result<(), Box<dyn Error>> {
// Create initial 'data' table
// Create initial data table
let query = match self.file_type {
FileType::Geopackage | FileType::Shapefile | FileType::Geojson => {
format!(
Expand Down Expand Up @@ -182,6 +232,9 @@ impl DuckDBFileProcessor {
}

fn transform_geom_columns(&self) -> Result<Vec<String>, Box<dyn Error>> {

// Identify and select all geom columns
// Put in vector and loop through it to process each
let query = "SELECT column_name FROM information_schema.columns WHERE table_name = 'data' AND data_type = 'GEOMETRY'";
let mut stmt = self.conn.prepare(query)?;
let mut rows = stmt.query([])?;
Expand All @@ -192,9 +245,8 @@ impl DuckDBFileProcessor {
geom_columns.push(column_name);
}

println!("Geometry columns: {:?}", &geom_columns);

// Call transform_crs for each geometry column
println!("Geometry columns: {:?}", &geom_columns);
let target_crs = "4326";
for column in &geom_columns {
self.transform_crs(column, target_crs)?;
Expand All @@ -204,9 +256,11 @@ impl DuckDBFileProcessor {
}

fn transform_crs(&self, geom_column: &str, target_crs: &str) -> Result<String, Box<dyn Error>> {
// Get current CRS
let current_crs = self.get_crs_number()?;
println!("Current CRS for column {}: {}", geom_column, current_crs);

// Transform CRS if no match on target crs
let create_table_query = if current_crs == target_crs {
format!(
"CREATE TABLE transformed_data AS SELECT *,
Expand Down Expand Up @@ -244,6 +298,7 @@ impl DuckDBFileProcessor {

fn load_data_postgis(&self, geom_columns: &[String]) -> Result<(), Box<dyn Error>> {
// Attach Postgres DB instance
println!("LOADING GEOSPATIAL DATA");
self.conn.execute(
&format!(
"ATTACH '{}' AS gridwalk_db (TYPE POSTGRES)",
Expand Down Expand Up @@ -312,6 +367,54 @@ impl DuckDBFileProcessor {
);
Ok(())
}

fn load_non_geo_data(&self) -> Result<(), Box<dyn Error>> {
// Attach Postgres DB instance
println!("LOADING NON GEOSPATIAL DATA");
self.conn.execute(
&format!(
"ATTACH '{}' AS gridwalk_db (TYPE POSTGRES)",
self.postgis_uri
),
[],
)?;

// Create schema if it doesn't exist
let create_schema_sql = format!("CREATE SCHEMA IF NOT EXISTS \"{}\";", self.schema_name);
self.conn.execute(
&format!(
"CALL postgres_execute('gridwalk_db', '{}');",
create_schema_sql.replace('\'', "''")
),
[],
)?;

// Schema qualified table name
let schema_qualified_table = format!("\"{}\".\"{}\"", self.schema_name, self.table_name);

// Drop existing table if it exists
let drop_table_sql = format!("DROP TABLE IF EXISTS {};", schema_qualified_table);
self.conn.execute(
&format!(
"CALL postgres_execute('gridwalk_db', '{}');",
drop_table_sql.replace('\'', "''")
),
[],
)?;

// Create data in table directly from 'data' table (no transformation needed)
let create_table_query = &format!(
"CREATE TABLE gridwalk_db.{} AS SELECT * FROM data;",
schema_qualified_table
);
self.conn.execute(create_table_query, [])?;

println!(
"Table {} created and data inserted successfully (no geometry columns)",
self.table_name
);
Ok(())
}
}

pub fn launch_process_file(
Expand Down
6 changes: 3 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ use duckdb_load::launch_process_file;

fn main() -> Result<(), Box<dyn std::error::Error>> {
launch_process_file(
"test_files/hotosm_twn_populated_places_points_geojson.geojson",
"test-table",
"test_files/December HMO public register.xlsx",
"test-table-hmo",
"postgresql://admin:password@localhost:5432/gridwalk",
"test-schema",
"test-schema-hmo",
)?;
Ok(())
}

0 comments on commit 0d926a5

Please sign in to comment.