Skip to content

Commit

Permalink
test(python): make sure integration tests wait for services to start (#…
Browse files Browse the repository at this point in the history
…979)

# Description

This is intended to fix the issue where the Python CI job is frequently
failing. I believe it's because the Docker services haven't fully
started up before the tests start running. To address this, I added a
function to wait for the services to be responsive.

# Related Issue(s)
<!---
For example:

- closes #106
--->

# Documentation

<!---
Share links to useful documentation
--->
  • Loading branch information
wjones127 authored Dec 21, 2022
1 parent 28d4aa3 commit ee19039
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 14 deletions.
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
version: "3.9"
services:
localstack:
image: localstack/localstack:0.14.4
image: localstack/localstack:0.14
ports:
- 4566:4566
- 8080:8080
Expand Down
3 changes: 2 additions & 1 deletion python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ devel = [
]
pyspark = [
"pyspark",
"delta-spark"
"delta-spark",
"numpy==1.22.2" # pyspark is not compatible with the latest numpy
]

[project.urls]
Expand Down
19 changes: 18 additions & 1 deletion python/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,28 @@
import subprocess
from datetime import date, datetime, timedelta
from decimal import Decimal
from time import sleep

import pyarrow as pa
import pytest

from deltalake import DeltaTable, write_deltalake


def wait_till_host_is_available(host: str, timeout_sec: int = 30):
    """Block until ``curl host`` succeeds, or give up after roughly ``timeout_sec``.

    The host is probed with ``curl`` once every ``spacing`` seconds. A probe
    counts as a failure when curl exits non-zero or does not finish in time.

    Args:
        host: URL handed to ``curl`` as its only argument.
        timeout_sec: Approximate total time budget, in seconds, used to derive
            the number of probes (``timeout_sec / spacing``, at least one).

    Raises:
        TimeoutError: If every probe failed.
    """
    spacing = 2
    # Always probe at least once, even for a timeout shorter than the spacing.
    attempts = max(1, int(timeout_sec / spacing))

    for attempt in range(attempts):
        try:
            # Cap a single probe by the overall budget so that one hung curl
            # process cannot stall the test session for minutes.
            subprocess.run(["curl", host], timeout=timeout_sec, check=True)
        except (subprocess.CalledProcessError, subprocess.TimeoutExpired):
            # Only "service not ready yet" failures are retried; anything else
            # (e.g. curl missing, Ctrl-C) propagates to the caller immediately.
            if attempt + 1 < attempts:
                sleep(spacing)
        else:
            return

    raise TimeoutError(
        f"Host {host} was not available after {attempts} attempts "
        f"(~{timeout_sec} seconds)"
    )


@pytest.fixture(scope="session")
def s3_localstack_creds():
endpoint_url = "http://localhost:4566"
Expand Down Expand Up @@ -46,6 +61,8 @@ def s3_localstack_creds():
],
]

wait_till_host_is_available(endpoint_url)

try:
for args in setup_commands:
subprocess.run(args, env=env)
Expand Down Expand Up @@ -109,7 +126,7 @@ def azurite_creds():
f"AccountKey={config['AZURE_STORAGE_ACCOUNT_KEY']};"
f"BlobEndpoint={endpoint_url};"
)

wait_till_host_is_available(endpoint_url)
try:
subprocess.run(
[
Expand Down
16 changes: 8 additions & 8 deletions python/tests/test_fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

@pytest.mark.s3
@pytest.mark.integration
@pytest.mark.timeout(timeout=5, method="thread")
@pytest.mark.timeout(timeout=15, method="thread")
def test_read_files(s3_localstack):
table_path = "s3://deltars/simple"
handler = DeltaStorageHandler(table_path)
Expand All @@ -29,7 +29,7 @@ def test_read_files(s3_localstack):

@pytest.mark.s3
@pytest.mark.integration
@pytest.mark.timeout(timeout=4, method="thread")
@pytest.mark.timeout(timeout=15, method="thread")
def test_s3_authenticated_read_write(s3_localstack_creds):
# Create unauthenticated handler
storage_handler = DeltaStorageHandler(
Expand All @@ -54,7 +54,7 @@ def test_s3_authenticated_read_write(s3_localstack_creds):

@pytest.mark.s3
@pytest.mark.integration
@pytest.mark.timeout(timeout=5, method="thread")
@pytest.mark.timeout(timeout=15, method="thread")
def test_read_simple_table_from_remote(s3_localstack):
table_path = "s3://deltars/simple"
dt = DeltaTable(table_path)
Expand All @@ -63,7 +63,7 @@ def test_read_simple_table_from_remote(s3_localstack):

@pytest.mark.s3
@pytest.mark.integration
@pytest.mark.timeout(timeout=5, method="thread")
@pytest.mark.timeout(timeout=15, method="thread")
def test_roundtrip_s3_env(s3_localstack, sample_data: pa.Table, monkeypatch):
table_path = "s3://deltars/roundtrip"

Expand Down Expand Up @@ -91,7 +91,7 @@ def test_roundtrip_s3_env(s3_localstack, sample_data: pa.Table, monkeypatch):

@pytest.mark.s3
@pytest.mark.integration
@pytest.mark.timeout(timeout=5, method="thread")
@pytest.mark.timeout(timeout=15, method="thread")
def test_roundtrip_s3_direct(s3_localstack_creds, sample_data: pa.Table):
table_path = "s3://deltars/roundtrip2"

Expand Down Expand Up @@ -146,7 +146,7 @@ def test_roundtrip_s3_direct(s3_localstack_creds, sample_data: pa.Table):

@pytest.mark.azure
@pytest.mark.integration
@pytest.mark.timeout(timeout=5, method="thread")
@pytest.mark.timeout(timeout=15, method="thread")
def test_roundtrip_azure_env(azurite_env_vars, sample_data: pa.Table):
table_path = "az://deltars/roundtrip"

Expand All @@ -168,7 +168,7 @@ def test_roundtrip_azure_env(azurite_env_vars, sample_data: pa.Table):

@pytest.mark.azure
@pytest.mark.integration
@pytest.mark.timeout(timeout=5, method="thread")
@pytest.mark.timeout(timeout=15, method="thread")
def test_roundtrip_azure_direct(azurite_creds, sample_data: pa.Table):
table_path = "az://deltars/roundtrip2"

Expand Down Expand Up @@ -197,7 +197,7 @@ def test_roundtrip_azure_direct(azurite_creds, sample_data: pa.Table):

@pytest.mark.azure
@pytest.mark.integration
@pytest.mark.timeout(timeout=5, method="thread")
@pytest.mark.timeout(timeout=15, method="thread")
def test_roundtrip_azure_sas(azurite_sas_creds, sample_data: pa.Table):
table_path = "az://deltars/roundtrip3"

Expand Down
3 changes: 3 additions & 0 deletions rust/src/storage/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@ pub async fn copy_table(
from_options: Option<HashMap<String, String>>,
to: impl AsRef<str>,
to_options: Option<HashMap<String, String>>,
allow_http: bool,
) -> Result<(), DeltaTableError> {
let from_store = DeltaTableBuilder::from_uri(from)
.with_storage_options(from_options.unwrap_or_default())
.with_allow_http(allow_http)
.build_storage()?;
let to_store = DeltaTableBuilder::from_uri(to)
.with_storage_options(to_options.unwrap_or_default())
.with_allow_http(allow_http)
.build_storage()?;
sync_stores(from_store, to_store).await
}
Expand Down
6 changes: 3 additions & 3 deletions rust/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ impl IntegrationContext {
account_path.as_path().to_str().unwrap(),
);
}
integration.crate_bucket(&bucket)?;
integration.create_bucket(&bucket)?;
let store_uri = match integration {
StorageIntegration::Amazon => format!("s3://{}", &bucket),
StorageIntegration::Microsoft => format!("az://{}", &bucket),
Expand Down Expand Up @@ -114,7 +114,7 @@ impl IntegrationContext {
_ => {
let from = table.as_path().as_str().to_owned();
let to = format!("{}/{}", self.root_uri(), name.as_ref());
copy_table(from, None, to, None).await?;
copy_table(from, None, to, None, true).await?;
}
};
Ok(())
Expand Down Expand Up @@ -157,7 +157,7 @@ impl StorageIntegration {
}
}

fn crate_bucket(&self, name: impl AsRef<str>) -> std::io::Result<()> {
fn create_bucket(&self, name: impl AsRef<str>) -> std::io::Result<()> {
match self {
Self::Microsoft => {
az_cli::create_container(name)?;
Expand Down

0 comments on commit ee19039

Please sign in to comment.