Skip to content

Commit

Permalink
Made code refactors and error handling simplification
Browse files Browse the repository at this point in the history
  • Loading branch information
CHRISCARLON committed Oct 5, 2024
1 parent b42539a commit 73bc8ea
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 64 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
/target
/local/postgresql/data
*.DS_Store
test_files/osopenusrn_202410.gpkg
test_files/codepo_gb.gpkg
125 changes: 70 additions & 55 deletions src/duckdb_load/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use duckdb::arrow::datatypes::Schema;
use duckdb::{Connection, Result};
use std::error::Error;
use std::fs::File;
use std::io::{self, Read};
use std::sync::Arc;

// Enum to hold file types to match on
// Enum that represents potential FileTypes
#[derive(Debug, PartialEq)]
pub enum FileType {
enum FileType {
Geopackage,
Shapefile,
Geojson,
Expand All @@ -15,8 +17,11 @@ pub enum FileType {
}

// Determine the file type that is being processed
fn determine_file_type(file_content: &[u8]) -> io::Result<FileType> {
fn determine_file_type(file_content: &[u8]) -> Result<FileType, Box<dyn Error>> {
// Gather file header data
let header = &file_content[0..16.min(file_content.len())];

// Check for file types
if &header[0..4] == b"PK\x03\x04" {
Ok(FileType::Excel)
} else if &header[0..16] == b"SQLite format 3\0" {
Expand All @@ -26,19 +31,16 @@ fn determine_file_type(file_content: &[u8]) -> io::Result<FileType> {
} else if &header[0..4] == b"PAR1" {
Ok(FileType::Parquet)
} else if header.starts_with(b"{") {
let json_start = std::str::from_utf8(file_content).unwrap_or("");
let json_start = std::str::from_utf8(file_content)?;
if json_start.contains("\"type\":")
&& (json_start.contains("\"FeatureCollection\"") || json_start.contains("\"Feature\""))
{
Ok(FileType::Geojson)
} else {
Err(io::Error::new(
io::ErrorKind::InvalidData,
"Not a valid GeoJSON file",
))
Err("Not a valid GeoJSON file".into())
}
} else {
let file_text = std::str::from_utf8(file_content).unwrap_or("");
let file_text = std::str::from_utf8(file_content)?;
let lines: Vec<&str> = file_text.lines().collect();
if lines.len() >= 2
&& lines[0].split(',').count() > 1
Expand All @@ -47,27 +49,29 @@ fn determine_file_type(file_content: &[u8]) -> io::Result<FileType> {
{
Ok(FileType::Csv)
} else {
Err(io::Error::new(
io::ErrorKind::InvalidData,
"Unknown file type",
))
Err("Unknown file type".into())
}
}
}

// Get data schema
fn query_and_print_schema(conn: &Connection) -> Result<()> {
// Get the data schema and return it wrapped in an Arc so it can be shared for future DynamoDB ingestion
fn query_and_print_schema(conn: &Connection) -> Result<Arc<Schema>> {
// Prep query
let query = "SELECT * FROM data LIMIT 10";

// Process query
let mut stmt = conn.prepare(query)?;
let arrow_result = stmt.query_arrow([])?;
// Get the schema

// Print the schema for logging
let schema = arrow_result.get_schema();
println!("Schema: {:?}", schema);
Ok(())

Ok(schema)
}

// Load to postgis
fn load_data_postgis(conn: &Connection, table_name: &str) -> Result<(), Box<dyn Error>> {
fn load_data_postgis(conn: &Connection, table_name: &str) -> Result<()> {
// Attach PostGIS database
conn.execute(
"ATTACH 'dbname=gridwalk user=admin password=password host=localhost port=5432' AS gridwalk_db (TYPE POSTGRES)",
Expand Down Expand Up @@ -111,63 +115,65 @@ fn load_data_postgis(conn: &Connection, table_name: &str) -> Result<(), Box<dyn

conn.execute(&postgis_query, [])?;

// Log if table creation in PostGIS is successful
println!(
"Table {} created and data inserted successfully",
my_table_name
);
Ok(())
}

fn get_crs_number(conn: &Connection, file_path: &str) -> Result<String, duckdb::Error> {
let query = format!(
// Get the current CRS number to compare it to the 4326 target CRS
fn get_crs_number(conn: &Connection, file_path: &str) -> Result<String> {
// Prep query
let query = &format!(
"SELECT layers[1].geometry_fields[1].crs.auth_code AS crs_number FROM st_read_meta('{}');",
file_path
);
let mut stmt = conn.prepare(&query)?;

// Run query
let mut rows = stmt.query([])?;
if let Some(row) = rows.next()? {
let crs_number: String = row.get(0)?;
Ok(crs_number)
} else {
Ok("CRS number not found".to_string())
panic!("CRS not found for the following file: {}", file_path)
}
}

fn transform_crs(
conn: &Connection,
file_path: &str,
target_crs: &str,
) -> Result<String, duckdb::Error> {
// Transform the CRS and create the transformed_data table in DuckDB for later use in PostGIS
fn transform_crs(conn: &Connection, file_path: &str, target_crs: &str) -> Result<String> {
// Get the current CRS
let current_crs = get_crs_number(conn, file_path)?;
println!("Current CRS: {}", current_crs);

// Check if the CRS is already the target CRS
// Check if the current CRS matches the target CRS
if current_crs == target_crs {
// Create the transformed_data table without transformation
// Create the transformed_data table without transformation if current == target
let create_table_query = "
CREATE TABLE transformed_data AS
SELECT
*,
ST_AsText(geom) as geom_wkt
ST_AsText(geometry) as geom_wkt
FROM data;
";
conn.execute(create_table_query, [])?;
} else {
// Create the transformed_data table with transformation
// Create the transformed_data table with transformation if current != target
let create_table_query = format!(
"CREATE TABLE transformed_data AS
SELECT
*,
ST_AsText(ST_Transform(geom, 'EPSG:{}', 'EPSG:{}', always_xy := true)) AS geom_wkt,
ST_AsText(ST_Transform(geometry, 'EPSG:{}', 'EPSG:{}', always_xy := true)) AS geom_wkt,
FROM data;",
current_crs, target_crs
);
conn.execute(&create_table_query, [])?;
}

// Drop the original geometry column
let drop_column_query = "ALTER TABLE transformed_data DROP COLUMN geom;";
let drop_column_query = "ALTER TABLE transformed_data DROP COLUMN geometry;";
conn.execute(drop_column_query, [])?;

if current_crs == target_crs {
Expand All @@ -183,18 +189,18 @@ fn transform_crs(
}
}

fn duckdb_transform(conn: &Connection, file_path: &str) -> Result<String, duckdb::Error> {
transform_crs(conn, file_path, "4326")
}

// DuckDB file loader
// Process file and call all functions
fn process_file(file_path: &str, file_type: &FileType) -> Result<()> {
// Create connection that will be used throughout processing
let conn = Connection::open_in_memory()?;

// Ensure required extensions are installed and loaded
conn.execute("INSTALL spatial;", [])?;
conn.execute("LOAD spatial;", [])?;
conn.execute("INSTALL postgres;", [])?;
conn.execute("LOAD postgres;", [])?;

// Prep table creation queries
let create_table_query = match file_type {
FileType::Geopackage | FileType::Shapefile | FileType::Geojson => {
format!(
Expand Down Expand Up @@ -224,40 +230,49 @@ fn process_file(file_path: &str, file_type: &FileType) -> Result<()> {
}
};

// Create the table in DuckDB
// Create 'data' table in DuckDB
conn.execute(&create_table_query, [])?;

// Call to query and print data schema
// Fetch schema of loaded data
query_and_print_schema(&conn)?;

// Transform
duckdb_transform(&conn, file_path)?;
// Perform Geospatial transformation and create 'transformed_data' table for later use in PostGIS
transform_crs(&conn, file_path, "4326")?;

// Call to load data into postgres and handle the result
match load_data_postgis(&conn, "lllllll") {
Ok(_) => println!("Data successfully loaded into PostgreSQL"),
Err(e) => eprintln!("Error loading data into PostgreSQL: {}", e),
}

load_data_postgis(&conn, "testing_123")?;
Ok(())
}

// Process file
pub fn launch_process_file(file_path: &str) -> io::Result<()> {
// Launch process file function - this is what you'd call in main.rs for example
pub fn launch_process_file(file_path: &str) -> Result<(), io::Error> {
// Open file
let mut file = File::open(file_path)?;

// Read file content into a bytes array
let mut buffer = Vec::new();
file.read_to_end(&mut buffer)?;

let file_type = determine_file_type(&buffer)?;
println!("Detected file type: {:?}", file_type);

process_file(file_path, &file_type).map_err(|e| {
// Check the type of file
let file_type = determine_file_type(&buffer).map_err(|e| {
io::Error::new(
io::ErrorKind::Other,
format!("Error loading {:?} into DuckDB: {}", file_type, e),
format!("Error determining file type: {}", e),
)
})?;

println!("Successfully loaded {:?} into DuckDB", file_type);
Ok(())
// Print file type
println!("Detected file type: {:?}", file_type);

// Process the file
match process_file(file_path, &file_type) {
Ok(_) => {
println!("Successfully loaded {:?}", file_type);
Ok(())
}
Err(e) => Err(io::Error::new(
io::ErrorKind::Other,
format!("Error processing {:?}: {}", file_type, e),
)),
}
}
19 changes: 11 additions & 8 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
// Example usage
use std::error::Error;

mod duckdb_load;

fn main() -> Result<(), Box<dyn std::error::Error>> {
let file_path = "test_files/hotosm_twn_populated_places_points_geojson.geojson";
fn main() -> Result<(), Box<dyn Error>> {
let file_path = "test_files/GLA_High_Street_boundaries.gpkg";
println!("Processing file: {}", file_path);

match duckdb_load::launch_process_file(file_path) {
Ok(_) => Ok(()),
Err(e) => {
eprintln!("Error processing file: {}", e);
Err(Box::new(e))
}
if let Err(e) = duckdb_load::launch_process_file(file_path) {
eprintln!("Error processing file: {}", e);
return Err(e.into());
}

println!("File processed successfully");
Ok(())
}
2 changes: 1 addition & 1 deletion test_files/invalid.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
test file
test file test file test file

0 comments on commit 73bc8ea

Please sign in to comment.