diff --git a/Cargo.toml b/Cargo.toml index f2aaff8..205be7b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pg-embed" -version = "0.7.1" +version = "0.7.2" license = "MIT/Apache-2.0" readme = "README.md" repository = "https://github.com/faokunega/pg-embed" @@ -17,23 +17,23 @@ default = ["rt_tokio_migrate"] rt_tokio = ["tokio", "reqwest"] rt_tokio_migrate = ["tokio", "reqwest", "sqlx_tokio"] # please don't select the following features -rt_async_std = ["async-std", "surf"] +rt_async_std = ["async-std"] rt_actix = ["tokio", "reqwest"] -rt_async_std_migrate = ["async-std", "surf", "sqlx_async_std"] +rt_async_std_migrate = ["async-std", "sqlx_async_std"] rt_actix_migrate = ["tokio", "reqwest", "sqlx_actix"] [dependencies] reqwest = { version = "0.11", optional = true } -surf = { version = "2.2", optional = true } tokio = { version = "1", features = ["full"], optional = true } async-std = { version = "1.9.0", features = ["attributes"], optional = true } futures = "0.3" thiserror = "1.0" -archiver-rs = "0.5" +# Waiting for https://github.com/JoyMoe/archiver-rs/pull/6 +archiver-rs = { git = "https://github.com/gz/archiver-rs.git", branch = "patch-1" } sqlx_tokio = { version = "0.6", features = ["runtime-tokio-rustls", "postgres", "migrate"], package = "sqlx", optional = true } sqlx_async_std = { version = "0.6", features = ["runtime-async-std-rustls", "postgres", "migrate"], package = "sqlx", optional = true } sqlx_actix = { version = "0.6", features = ["runtime-actix-rustls", "postgres", "migrate"], package = "sqlx", optional = true } -zip = "0.5.11" +zip = "0.6" log = "0.4" dirs = "3.0" bytes = "1.0" @@ -42,7 +42,7 @@ async-trait = "0.1" [dev-dependencies] serial_test = "0.5" -env_logger = "0.8" +env_logger = "0.10" [[test]] name = "migration_tokio" diff --git a/src/command_executor.rs b/src/command_executor.rs index f7ad548..0fde0be 100644 --- a/src/command_executor.rs +++ b/src/command_executor.rs @@ -86,7 +86,7 @@ where Self: Send, { /// Process command - command: tokio::process::Command, + _command: tokio::process::Command, /// Process child process: Child, /// Process type @@ -205,10 +205,10 @@ where A: IntoIterator, B: AsRef, { - let mut command = Self::generate_command(executable_path, args); - let process = Self::init(&mut command, &process_type)?; + let mut _command = Self::generate_command(executable_path, args); + let process = Self::init(&mut _command, &process_type)?; Ok(AsyncCommandExecutor { - command, + _command, process, process_type, _marker_s: Default::default(), diff --git a/src/pg_access.rs b/src/pg_access.rs index c452932..972da2b 100644 --- a/src/pg_access.rs +++ b/src/pg_access.rs @@ -10,12 +10,13 @@ use std::sync::Arc; use futures::TryFutureExt; use tokio::io::AsyncWriteExt; use tokio::sync::Mutex; -use tokio::time::{interval, Duration}; use crate::pg_enums::{OperationSystem, PgAcquisitionStatus}; use crate::pg_errors::{PgEmbedError, PgEmbedErrorType}; -use crate::pg_fetch::PgFetchSettings; +use crate::pg_fetch::{PgFetchSettings}; use crate::pg_types::{PgCommandSync, PgResult}; +use crate::pg_unpack; + lazy_static! { /// @@ -50,6 +51,8 @@ pub struct PgAccess { /// Postgresql database version file /// used for internal checks pg_version_file: PathBuf, + /// Fetch settings + fetch_settings: PgFetchSettings, } impl PgAccess { @@ -91,6 +94,7 @@ impl PgAccess { pw_file_path: pw_file, zip_file_path, pg_version_file, + fetch_settings: fetch_settings.clone(), }) } @@ -142,6 +146,29 @@ impl PgAccess { .await } + /// + /// Download and unpack postgres binaries + /// + pub async fn maybe_acquire_postgres(&self) -> PgResult<()> { + let mut lock = ACQUIRED_PG_BINS.lock().await; + + if self.pg_executables_cached().await? { + return Ok(()); + } + + lock.insert(self.cache_dir.clone(), PgAcquisitionStatus::InProgress); + let pg_bin_data = self.fetch_settings.fetch_postgres().await?; + self.write_pg_zip(&pg_bin_data).await?; + log::debug!("Unpacking postgres binaries {} {}", self.zip_file_path.display(), self.cache_dir.display()); + pg_unpack::unpack_postgres(&self.zip_file_path, &self.cache_dir) + .await?; + + lock.insert(self.cache_dir.clone(), PgAcquisitionStatus::Finished); + Ok(()) + + } + + /// /// Check if postgresql executables are already cached /// @@ -153,7 +180,7 @@ impl PgAccess { /// Check if database files exist /// pub async fn db_files_exist(&self) -> PgResult { - Self::path_exists(self.pg_version_file.as_path()).await + Ok(self.pg_executables_cached().await? && Self::path_exists(self.pg_version_file.as_path()).await?) } /// @@ -181,30 +208,6 @@ impl PgAccess { } } - /// - /// Mark postgresql binaries acquisition in progress - /// - /// Used while acquiring postgresql binaries, so that no two instances - /// of PgEmbed try to acquire the same resources - /// - pub async fn mark_acquisition_in_progress(&self) -> PgResult<()> { - let mut lock = ACQUIRED_PG_BINS.lock().await; - lock.insert(self.cache_dir.clone(), PgAcquisitionStatus::InProgress); - Ok(()) - } - - /// - /// Mark postgresql binaries acquisition finished - /// - /// Used when acquiring postgresql has finished, so that other instances - /// of PgEmbed don't try to reacquire resources - /// - pub async fn mark_acquisition_finished(&self) -> PgResult<()> { - let mut lock = ACQUIRED_PG_BINS.lock().await; - lock.insert(self.cache_dir.clone(), PgAcquisitionStatus::Finished); - Ok(()) - } - /// /// Check postgresql acquisition status /// @@ -217,31 +220,10 @@ impl PgAccess { } } - /// - /// Determine if postgresql binaries acquisition is needed - /// - pub async fn acquisition_needed(&self) -> PgResult { - if !self.pg_executables_cached().await? { - match self.acquisition_status().await { - PgAcquisitionStatus::InProgress => { - let mut interval = interval(Duration::from_millis(100)); - while self.acquisition_status().await == PgAcquisitionStatus::InProgress { - interval.tick().await; - } - Ok(false) - } - PgAcquisitionStatus::Finished => Ok(false), - PgAcquisitionStatus::Undefined => Ok(true), - } - } else { - Ok(false) - } - } - /// /// Write pg binaries zip to postgresql cache directory /// - pub async fn write_pg_zip(&self, bytes: &[u8]) -> PgResult<()> { + async fn write_pg_zip(&self, bytes: &[u8]) -> PgResult<()> { let mut file: tokio::fs::File = tokio::fs::File::create(&self.zip_file_path.as_path()) .map_err(|e| PgEmbedError { error_type: PgEmbedErrorType::WriteFileError, @@ -256,6 +238,12 @@ impl PgAccess { message: None, }) .await?; + file.sync_data() + .map_err(|e| PgEmbedError { + error_type: PgEmbedErrorType::WriteFileError, + source: Some(Box::new(e)), + message: None, + }).await?; Ok(()) } diff --git a/src/pg_commands.rs b/src/pg_commands.rs index 6158ac3..e3c0243 100644 --- a/src/pg_commands.rs +++ b/src/pg_commands.rs @@ -38,6 +38,13 @@ impl PgCommand { auth_host, "-U", user, + // The postgres-tokio driver uses utf8 encoding, however on windows + // if -E is not specified WIN1252 encoding is chosen by default + // which can lead to encoding errors like this: + // + // ERROR: character with byte sequence 0xe0 0xab 0x87 in encoding + // "UTF8" has no equivalent in encoding "WIN1252" + "-E=UTF8", "-D", database_dir.to_str().unwrap(), &password_file_arg, diff --git a/src/pg_enums.rs b/src/pg_enums.rs index bee16b1..a480d62 100644 --- a/src/pg_enums.rs +++ b/src/pg_enums.rs @@ -120,7 +120,7 @@ impl ToString for PgProcessType { } /// The operation systems enum -#[derive(Debug, PartialEq)] +#[derive(Debug, PartialEq, Copy, Clone)] pub enum OperationSystem { Darwin, Windows, @@ -159,7 +159,7 @@ impl Default for OperationSystem { } /// The cpu architectures enum -#[derive(Debug, PartialEq)] +#[derive(Debug, PartialEq, Copy, Clone)] pub enum Architecture { Amd64, I386, diff --git a/src/pg_fetch.rs b/src/pg_fetch.rs index ec95301..2de515c 100644 --- a/src/pg_fetch.rs +++ b/src/pg_fetch.rs @@ -13,9 +13,10 @@ use crate::pg_errors::{PgEmbedError, PgEmbedErrorType}; use crate::pg_types::PgResult; /// Postgresql version struct (simple version wrapper) +#[derive(Debug, Copy, Clone)] pub struct PostgresVersion(pub &'static str); /// Latest postgres version 15 -pub const PG_V15: PostgresVersion = PostgresVersion("15.1.0"); +pub const PG_V15: PostgresVersion = PostgresVersion("15.2.0"); /// Latest postgres version 14 pub const PG_V14: PostgresVersion = PostgresVersion("14.6.0"); /// Latest postgres version 13 @@ -30,6 +31,7 @@ pub const PG_V10: PostgresVersion = PostgresVersion("10.23.0"); pub const PG_V9: PostgresVersion = PostgresVersion("9.6.24"); /// Settings that determine the postgres binary to be fetched +#[derive(Debug, Clone)] pub struct PgFetchSettings { /// The repository host pub host: String, @@ -69,7 +71,7 @@ impl PgFetchSettings { /// /// Returns the data of the downloaded binary in an `Ok([u8])` on success, otherwise returns an error. /// - pub async fn fetch_postgres(&self) -> PgResult> { + pub async fn fetch_postgres(&self) -> PgResult { let platform = &self.platform(); let version = self.version.0; let download_url = format!( @@ -96,6 +98,9 @@ impl PgFetchSettings { }) .await?; - Ok(Box::new(content)) + log::debug!("Downloaded {} bytes", content.len()); + log::trace!("First 1024 bytes: {:?}", &String::from_utf8_lossy(&content[..1024])); + + Ok(content) } } diff --git a/src/pg_unpack.rs b/src/pg_unpack.rs index 600ecac..218cf39 100644 --- a/src/pg_unpack.rs +++ b/src/pg_unpack.rs @@ -18,7 +18,7 @@ fn unzip_txz(zip_file_path: &PathBuf, cache_dir: &PathBuf) -> Result PgResult<()> { - if self.pg_access.acquisition_needed().await? { - self.acquire_postgres().await?; - } + self.pg_access.maybe_acquire_postgres().await?; self.pg_access .create_password_file(self.pg_settings.password.as_bytes()) .await?; @@ -127,24 +125,11 @@ impl PgEmbed { let mut server_status = self.server_status.lock().await; *server_status = PgServerStatus::Initialized; } else { - &self.init_db().await?; + let _r = &self.init_db().await?; } Ok(()) } - /// - /// Download and unpack postgres binaries - /// - pub async fn acquire_postgres(&self) -> PgResult<()> { - self.pg_access.mark_acquisition_in_progress().await?; - let pg_bin_data = &self.fetch_settings.fetch_postgres().await?; - self.pg_access.write_pg_zip(&pg_bin_data).await?; - pg_unpack::unpack_postgres(&self.pg_access.zip_file_path, &self.pg_access.cache_dir) - .await?; - self.pg_access.mark_acquisition_finished().await?; - Ok(()) - } - /// /// Initialize postgresql database /// diff --git a/tests/common.rs b/tests/common.rs index 9579237..fc34bad 100644 --- a/tests/common.rs +++ b/tests/common.rs @@ -5,7 +5,7 @@ use env_logger::Env; use pg_embed::pg_enums::PgAuthMethod; use pg_embed::pg_errors::PgEmbedError; -use pg_embed::pg_fetch::{PgFetchSettings, PG_V13}; +use pg_embed::pg_fetch::{PgFetchSettings, PG_V15}; use pg_embed::postgres::{PgEmbed, PgSettings}; pub async fn setup( @@ -28,7 +28,7 @@ pub async fn setup( migration_dir, }; let fetch_settings = PgFetchSettings { - version: PG_V13, + version: PG_V15, ..Default::default() }; let mut pg = PgEmbed::new(pg_settings, fetch_settings).await?; diff --git a/tests/postgres_tokio.rs b/tests/postgres_tokio.rs index b9cca56..fa0b02a 100644 --- a/tests/postgres_tokio.rs +++ b/tests/postgres_tokio.rs @@ -7,8 +7,8 @@ use tokio::sync::Mutex; use env_logger::Env; use pg_embed::pg_access::PgAccess; use pg_embed::pg_enums::{PgAuthMethod, PgServerStatus}; -use pg_embed::pg_errors::{PgEmbedError, PgEmbedErrorType}; -use pg_embed::pg_fetch::{PgFetchSettings, PG_V13}; +use pg_embed::pg_errors::{PgEmbedError}; +use pg_embed::pg_fetch::{PgFetchSettings, PG_V15}; use pg_embed::postgres::{PgEmbed, PgSettings}; use std::time::Duration; @@ -144,7 +144,7 @@ async fn postgres_server_timeout() -> Result<(), PgEmbedError> { let _ = env_logger::Builder::from_env(Env::default().default_filter_or("info")) .is_test(true) .try_init(); - let mut pg_settings = PgSettings { + let pg_settings = PgSettings { database_dir, port: 5432, user: "postgres".to_string(), @@ -155,7 +155,7 @@ async fn postgres_server_timeout() -> Result<(), PgEmbedError> { migration_dir: None, }; let fetch_settings = PgFetchSettings { - version: PG_V13, + version: PG_V15, ..Default::default() }; let mut pg = PgEmbed::new(pg_settings, fetch_settings).await?;