From ee1903922481b0a720ecef9de9b1ba91f7bd2bd8 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Wed, 21 Dec 2022 07:49:49 -0800 Subject: [PATCH] test(python): make sure integration tests wait for services to start (#979) # Description This is intended to fix the issue where the Python CI job is frequently failing. I believe it's because the Docker services haven't fully started up before the tests start running. To address this, I added a function to wait for the services to be responsive. # Related Issue(s) # Documentation --- docker-compose.yml | 2 +- python/pyproject.toml | 3 ++- python/tests/conftest.py | 19 ++++++++++++++++++- python/tests/test_fs.py | 16 ++++++++-------- rust/src/storage/utils.rs | 3 +++ rust/src/test_utils.rs | 6 +++--- 6 files changed, 35 insertions(+), 14 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 6f78fa444c..444f0edc15 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,7 +1,7 @@ version: "3.9" services: localstack: - image: localstack/localstack:0.14.4 + image: localstack/localstack:0.14 ports: - 4566:4566 - 8080:8080 diff --git a/python/pyproject.toml b/python/pyproject.toml index 5d4be70441..662cfeddc4 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -38,7 +38,8 @@ devel = [ ] pyspark = [ "pyspark", - "delta-spark" + "delta-spark", + "numpy==1.22.2" # pyspark is no compatible with latest numpy ] [project.urls] diff --git a/python/tests/conftest.py b/python/tests/conftest.py index 92d4c9b43f..b5dd65c5cf 100644 --- a/python/tests/conftest.py +++ b/python/tests/conftest.py @@ -3,6 +3,7 @@ import subprocess from datetime import date, datetime, timedelta from decimal import Decimal +from time import sleep import pyarrow as pa import pytest @@ -10,6 +11,20 @@ from deltalake import DeltaTable, write_deltalake +def wait_till_host_is_available(host: str, timeout_sec: int = 30): + spacing = 2 + attempts = timeout_sec / spacing + while True: + try: + subprocess.run(["curl", host], timeout=500, check=True) + except: + pass + else: + break + + sleep(spacing) + + @pytest.fixture(scope="session") def s3_localstack_creds(): endpoint_url = "http://localhost:4566" @@ -46,6 +61,8 @@ def s3_localstack_creds(): ], ] + wait_till_host_is_available(endpoint_url) + try: for args in setup_commands: subprocess.run(args, env=env) @@ -109,7 +126,7 @@ def azurite_creds(): f"AccountKey={config['AZURE_STORAGE_ACCOUNT_KEY']};" f"BlobEndpoint={endpoint_url};" ) - + wait_till_host_is_available(endpoint_url) try: subprocess.run( [ diff --git a/python/tests/test_fs.py b/python/tests/test_fs.py index 62cdcb8b01..17e6125da4 100644 --- a/python/tests/test_fs.py +++ b/python/tests/test_fs.py @@ -12,7 +12,7 @@ @pytest.mark.s3 @pytest.mark.integration -@pytest.mark.timeout(timeout=5, method="thread") +@pytest.mark.timeout(timeout=15, method="thread") def test_read_files(s3_localstack): table_path = "s3://deltars/simple" handler = DeltaStorageHandler(table_path) @@ -29,7 +29,7 @@ def test_read_files(s3_localstack): @pytest.mark.s3 @pytest.mark.integration -@pytest.mark.timeout(timeout=4, method="thread") +@pytest.mark.timeout(timeout=15, method="thread") def test_s3_authenticated_read_write(s3_localstack_creds): # Create unauthenticated handler storage_handler = DeltaStorageHandler( @@ -54,7 +54,7 @@ def test_s3_authenticated_read_write(s3_localstack_creds): @pytest.mark.s3 @pytest.mark.integration -@pytest.mark.timeout(timeout=5, method="thread") +@pytest.mark.timeout(timeout=15, method="thread") def test_read_simple_table_from_remote(s3_localstack): table_path = "s3://deltars/simple" dt = DeltaTable(table_path) @@ -63,7 +63,7 @@ def test_read_simple_table_from_remote(s3_localstack): @pytest.mark.s3 @pytest.mark.integration -@pytest.mark.timeout(timeout=5, method="thread") +@pytest.mark.timeout(timeout=15, method="thread") def test_roundtrip_s3_env(s3_localstack, sample_data: pa.Table, monkeypatch): table_path = "s3://deltars/roundtrip" @@ -91,7 +91,7 @@ def test_roundtrip_s3_env(s3_localstack, sample_data: pa.Table, monkeypatch): @pytest.mark.s3 @pytest.mark.integration -@pytest.mark.timeout(timeout=5, method="thread") +@pytest.mark.timeout(timeout=15, method="thread") def test_roundtrip_s3_direct(s3_localstack_creds, sample_data: pa.Table): table_path = "s3://deltars/roundtrip2" @@ -146,7 +146,7 @@ def test_roundtrip_s3_direct(s3_localstack_creds, sample_data: pa.Table): @pytest.mark.azure @pytest.mark.integration -@pytest.mark.timeout(timeout=5, method="thread") +@pytest.mark.timeout(timeout=15, method="thread") def test_roundtrip_azure_env(azurite_env_vars, sample_data: pa.Table): table_path = "az://deltars/roundtrip" @@ -168,7 +168,7 @@ def test_roundtrip_azure_env(azurite_env_vars, sample_data: pa.Table): @pytest.mark.azure @pytest.mark.integration -@pytest.mark.timeout(timeout=5, method="thread") +@pytest.mark.timeout(timeout=15, method="thread") def test_roundtrip_azure_direct(azurite_creds, sample_data: pa.Table): table_path = "az://deltars/roundtrip2" @@ -197,7 +197,7 @@ def test_roundtrip_azure_direct(azurite_creds, sample_data: pa.Table): @pytest.mark.azure @pytest.mark.integration -@pytest.mark.timeout(timeout=5, method="thread") +@pytest.mark.timeout(timeout=15, method="thread") def test_roundtrip_azure_sas(azurite_sas_creds, sample_data: pa.Table): table_path = "az://deltars/roundtrip3" diff --git a/rust/src/storage/utils.rs b/rust/src/storage/utils.rs index b352149608..c80a114087 100644 --- a/rust/src/storage/utils.rs +++ b/rust/src/storage/utils.rs @@ -15,12 +15,15 @@ pub async fn copy_table( from_options: Option>, to: impl AsRef, to_options: Option>, + allow_http: bool, ) -> Result<(), DeltaTableError> { let from_store = DeltaTableBuilder::from_uri(from) .with_storage_options(from_options.unwrap_or_default()) + .with_allow_http(allow_http) .build_storage()?; let to_store = DeltaTableBuilder::from_uri(to) .with_storage_options(to_options.unwrap_or_default()) + .with_allow_http(allow_http) .build_storage()?; sync_stores(from_store, to_store).await } diff --git a/rust/src/test_utils.rs b/rust/src/test_utils.rs index 74a2399e2b..c2624baf0c 100644 --- a/rust/src/test_utils.rs +++ b/rust/src/test_utils.rs @@ -46,7 +46,7 @@ impl IntegrationContext { account_path.as_path().to_str().unwrap(), ); } - integration.crate_bucket(&bucket)?; + integration.create_bucket(&bucket)?; let store_uri = match integration { StorageIntegration::Amazon => format!("s3://{}", &bucket), StorageIntegration::Microsoft => format!("az://{}", &bucket), @@ -114,7 +114,7 @@ impl IntegrationContext { _ => { let from = table.as_path().as_str().to_owned(); let to = format!("{}/{}", self.root_uri(), name.as_ref()); - copy_table(from, None, to, None).await?; + copy_table(from, None, to, None, true).await?; } }; Ok(()) @@ -157,7 +157,7 @@ impl StorageIntegration { } } - fn crate_bucket(&self, name: impl AsRef) -> std::io::Result<()> { + fn create_bucket(&self, name: impl AsRef) -> std::io::Result<()> { match self { Self::Microsoft => { az_cli::create_container(name)?;