Test databricks-sdk 0.10.0
XiaohanZhangCMU committed Oct 10, 2023
1 parent b200267 commit bde4641
Showing 3 changed files with 16 additions and 133 deletions.
setup.py (2 changes: 1 addition & 1 deletion)

@@ -98,7 +98,7 @@
 ]

 extra_deps['databricks'] = [
-    'databricks-sdk==0.8.0',
+    'databricks-sdk==0.10.0',
 ]

 extra_deps['all'] = sorted({dep for deps in extra_deps.values() for dep in deps})
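For context only (this snippet is not part of the commit): a minimal smoke test of the newly pinned SDK might look like the sketch below. It assumes Databricks credentials are already configured, for example via DATABRICKS_HOST and DATABRICKS_TOKEN or a profile in ~/.databrickscfg; WorkspaceClient and current_user.me() come from the public databricks-sdk Python API.

# Hedged sketch, not part of this commit: verify that the pinned databricks-sdk
# imports and can authenticate. Assumes DATABRICKS_HOST / DATABRICKS_TOKEN
# (or a ~/.databrickscfg profile) are configured in the environment.
from databricks.sdk import WorkspaceClient

w = WorkspaceClient()        # picks up credentials from env vars or the config file
me = w.current_user.me()     # simple round-trip call against the workspace
print(f'databricks-sdk authenticated as {me.user_name}')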
tests/base/converters/test_integratoin_dataframe_to_mds.py (23 changes: 11 additions & 12 deletions)

@@ -4,6 +4,7 @@
 import json
 import os
 import shutil
+import time
 from decimal import Decimal
 from tempfile import mkdtemp
 from typing import Any, Tuple
@@ -21,7 +22,7 @@
     'gs://': 'mosaicml-composer-tests',
     's3://': 'mosaicml-internal-temporary-composer-testing',
     'oci://': 'mosaicml-internal-checkpoints',
-    'dbfs:/Volumes/': 'main/mosaic_hackathon/managed-volume',
+    'dbfs:/Volumes': 'main/mosaic_hackathon/managed-volume',
 }
 MANUAL_INTEGRATION_TEST = True
 os.environ[
@@ -59,8 +60,8 @@ def _method(cloud_prefix: str = 'gs://') -> Tuple[str, str]:
         blobs = bucket.list_blobs(prefix=MY_PREFIX)
         for blob in blobs:
             blob.delete()
-    except ImportError:
-        raise ImportError('google.cloud.storage is not imported correctly.')
+    except:
+        print('tear down gcs test folder failed, continue...')

     try:
         import boto3
@@ -70,8 +71,8 @@ def _method(cloud_prefix: str = 'gs://') -> Tuple[str, str]:
         if objects_to_delete:
             s3.delete_objects(Bucket=MY_BUCKET['s3://'],
                               Delete={'Objects': objects_to_delete})
-    except ImportError:
-        raise ImportError('boto3 is not imported correctly.')
+    except:
+        print('tear down s3 test folder failed, continue....')

     try:
         import oci
@@ -92,8 +93,8 @@ def _method(cloud_prefix: str = 'gs://') -> Tuple[str, str]:
         )
         print(f'Deleted {len(response.data.objects)} objects with prefix: {MY_PREFIX}')

-    except ImportError:
-        raise ImportError('boto3 is not imported correctly.')
+    except :
+        print('tear down oci test folder failed, continue...')


 class TestDataFrameToMDS:
@@ -263,7 +264,7 @@ def test_end_to_end_conversion_local(self, dataframe: Any, keep_local: bool, mer
         assert not os.path.exists(os.path.join(
             out, 'index.json')), 'merged index is created when merge_index=False'

-    @pytest.mark.parametrize('scheme', ['oci://', 'gs://', 's3://'])
+    @pytest.mark.parametrize('scheme', ['oci://', 'gs://', 's3://', 'dbfs:/Volumes'])
     @pytest.mark.parametrize('keep_local', [True]) # , False])
     @pytest.mark.parametrize('merge_index', [True]) # , False])
     @pytest.mark.usefixtures('manual_integration_dir')
@@ -309,11 +310,10 @@ def test_patch_conversion_local_and_remote(self, dataframe: Any, scheme: str,
         assert not (os.path.exists(os.path.join(
             mds_path[0], 'index.json'))), 'merged index is created when merge_index=False'

-    @pytest.mark.parametrize('scheme', ['oci://', 'gs://', 's3://'])
+    @pytest.mark.parametrize('scheme', ['oci://', 'gs://', 's3://', 'dbfs:/Volumes'])
     @pytest.mark.parametrize('keep_local', [True, False])
     @pytest.mark.parametrize('merge_index', [True, False])
     @pytest.mark.usefixtures('manual_integration_dir')
-    @pytest.mark.remote
     def test_integration_conversion_local_and_remote(self, dataframe: Any,
                                                      manual_integration_dir: Any,
                                                      merge_index: bool, keep_local: bool,
@@ -353,9 +353,8 @@ def test_integration_conversion_local_and_remote(self, dataframe: Any,
             f'merged index is created at {mds_path[0]} when merge_index={merge_index} and ' +
             f'keep_local={keep_local}')

-    @pytest.mark.parametrize('scheme', ['oci://', 'gs://', 's3://'])
+    @pytest.mark.parametrize('scheme', ['oci://', 'gs://', 's3://', 'dbfs:/Volumes'])
     @pytest.mark.usefixtures('manual_integration_dir')
-    @pytest.mark.remote
     def test_integration_conversion_remote_only(self, dataframe: Any, manual_integration_dir: Any,
                                                 scheme: str):
         _, remote = manual_integration_dir('s3://')
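The MY_BUCKET map and the parametrized scheme keys above determine where these integration tests write their MDS output. As a hedged illustration (this code is not in the repository), the sketch below shows one way a scheme key and its bucket entry can combine into a remote test root; the prefix value and the join logic are assumptions, since the actual composition lives in the manual_integration_dir fixture, which this diff does not show.

# Illustration only (not from this repo): how a parametrized scheme key and its
# MY_BUCKET entry can be combined into a remote root for the integration tests.
# MY_PREFIX's value and the join logic are assumptions; the real composition
# happens inside the manual_integration_dir fixture, which this diff omits.
import os

MY_PREFIX = 'test_prefix'  # hypothetical value
MY_BUCKET = {
    'gs://': 'mosaicml-composer-tests',
    's3://': 'mosaicml-internal-temporary-composer-testing',
    'oci://': 'mosaicml-internal-checkpoints',
    'dbfs:/Volumes': 'main/mosaic_hackathon/managed-volume',
}


def remote_root(cloud_prefix: str) -> str:
    # os.path.join tolerates both 'gs://' (trailing slash) and 'dbfs:/Volumes'
    # (no trailing slash) as the first component on POSIX systems.
    return os.path.join(cloud_prefix, MY_BUCKET[cloud_prefix], MY_PREFIX)


print(remote_root('dbfs:/Volumes'))  # dbfs:/Volumes/main/mosaic_hackathon/managed-volume/test_prefix
print(remote_root('s3://'))          # s3://mosaicml-internal-temporary-composer-testing/test_prefix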
tests/test_integration_util.py (124 changes: 4 additions & 120 deletions)

@@ -26,7 +26,7 @@
     'gs://': 'mosaicml-composer-tests',
     's3://': 'mosaicml-internal-temporary-composer-testing',
     'oci://': 'mosaicml-internal-checkpoints',
-    'dbfs:/Volumes/': 'main/mosaic_hackathon/managed-volume',
+    'dbfs:/Volumes': 'main/mosaic_hackathon/managed-volume',
 }
 MANUAL_INTEGRATION_TEST = True
 os.environ[
@@ -101,102 +101,8 @@ def _method(cloud_prefix: str = 'gs://') -> Tuple[str, str]:
         print('tear down oci test folder failed, continue...')


-@pytest.mark.parametrize(('text', 'expected_output'), [('hello,world', ['hello', 'world']),
-                                                       ('hello', ['hello']), ('', [])])
-def test_get_list_arg(text: str, expected_output: List[Optional[str]]):
-    output = get_list_arg(text)
-    assert output == expected_output
-
-
-def test_bytes_to_int():
-    input_to_expected = [
-        ('1234', 1234),
-        ('1b', 1),
-        ('50b', 50),
-        ('50B', 50),
-        ('100kb', 102400),
-        (' 100 kb', 102400),
-        ('75mb', 78643200),
-        ('75MB', 78643200),
-        ('75 mb ', 78643200),
-        ('1.39gb', 1492501135),
-        ('1.39Gb', 1492501135),
-        ('2tb', 2199023255552),
-        ('3pb', 3377699720527872),
-        ('1.11eb', 1279742870113600256),
-        ('1.09zb', 1286844866581978415104),
-        ('2.0yb', 2417851639229258349412352),
-        (1234, 1234),
-        (1, 1),
-        (0.5 * 1024, 512),
-        (100 * 1024, 102400),
-        (75 * 1024**2, 78643200),
-        (75 * 1024 * 1024, 78643200),
-        (35.78, 35),
-        (325388903.203984, 325388903),
-    ]
-    for size_pair in input_to_expected:
-        output = bytes_to_int(size_pair[0])
-        assert output == size_pair[1]
-
-
-def test_bytes_to_int_Exception():
-    input_data = ['', '12kbb', '27mxb', '79kkb']
-    for value in input_data:
-        with pytest.raises(ValueError, match=f'Unsupported value/suffix.*'):
-            _ = bytes_to_int(value)
-
-
-def test_number_abbrev_to_int():
-    input_to_expected = [
-        ('1234', 1234),
-        ('1k', 1000),
-        ('50k', 50000),
-        ('50K', 50000),
-        ('100k', 100000),
-        (' 100 k', 100000),
-        ('75m', 75000000),
-        ('75M', 75000000),
-        ('75 m ', 75000000),
-        ('1.39b', 1390000000),
-        ('1.39B', 1390000000),
-        ('2t', 2000000000000),
-        ('3 T', 3000000000000),
-        (1234, 1234),
-        (1, 1),
-        (0.5 * 1000, 500),
-        (100 * 1000, 100000),
-        (75 * 1000**2, 75000000),
-        (75 * 1000 * 1000, 75000000),
-        (35.78, 35),
-        (325388903.203984, 325388903),
-    ]
-    for size_pair in input_to_expected:
-        output = number_abbrev_to_int(size_pair[0])
-        assert output == size_pair[1]
-
-
-def test_number_abbrev_to_int_Exception():
-    input_data = ['', '12kbb', '27mxb', '79bk', '79bb', '79 b m', 'p 64', '64p']
-    for value in input_data:
-        with pytest.raises(ValueError, match=f'Unsupported value/suffix.*'):
-            _ = number_abbrev_to_int(value)
-
-
-def test_clean_stale_shared_memory():
-    # Create a leaked shared memory
-    name = _get_path(0, RESUME)
-    _ = BuiltinSharedMemory(name, True, 64)
-
-    # Clean up the stale shared memory
-    clean_stale_shared_memory()
-
-    # If clean up is successful, it should raise FileNotFoundError Exception
-    with pytest.raises(FileNotFoundError):
-        _ = BuiltinSharedMemory(name, False, 64)
-
-
-@pytest.mark.parametrize('scheme', ['oci://', 'gs://', 's3://'])
+@pytest.mark.parametrize('scheme', ['oci://', 'gs://', 's3://', 'dbfs:/Volumes'])
 @pytest.mark.parametrize('index_file_urls_pattern', [4, 5])
 @pytest.mark.parametrize('out_format', ['remote', 'local', 'tuple'])
 @pytest.mark.usefixtures('manual_integration_dir')
@@ -384,7 +290,7 @@ def test_merge_index_from_root_local(manual_integration_dir: Any, n_partitions:
     integrity_check(mds_path, keep_local=keep_local)


-@pytest.mark.parametrize('scheme', ['oci://', 'gs://', 's3://'])
+@pytest.mark.parametrize('scheme', ['oci://', 'gs://', 's3://', 'dbfs:/Volumes'])
 @pytest.mark.parametrize('out_format', ['remote', 'tuple'])
 @pytest.mark.parametrize('n_partitions', [1, 2, 3, 4])
 @pytest.mark.parametrize('keep_local', [False, True])
@@ -422,7 +328,7 @@ def test_merge_index_from_root_remote(manual_integration_dir: Any, out_format: s
     integrity_check(mds_path, keep_local=keep_local)


-@pytest.mark.parametrize('scheme', ['dbfs:/Volumes/'])
+@pytest.mark.parametrize('scheme', ['dbfs:/Volumes'])
 @pytest.mark.parametrize('out_format', ['remote']) # , 'tuple'])
 @pytest.mark.parametrize('n_partitions', [3]) # , 2, 3, 4])
 @pytest.mark.parametrize('keep_local', [False]) # , True])
@@ -459,25 +365,3 @@ def test_uc_volume(manual_integration_dir: Any, out_format: str,
     with pytest.raises(NotImplementedError, match=f'DatabricksUnityCatalogUploader.list_objects is not implemented.*'):
         merge_index(mds_path, keep_local=keep_local)

-@pytest.mark.parametrize('with_args', [True, False])
-def test_retry(with_args: bool):
-    num_tries = 0
-    return_after = 2
-
-    if with_args:
-        decorator = retry(RuntimeError, num_attempts=3, initial_backoff=0.01, max_jitter=0.01)
-        return_after = 2
-    else:
-        decorator = retry
-        # Need to return immediately to avoid timeouts
-        return_after = 0
-
-    @decorator
-    def flaky_function():
-        nonlocal num_tries
-        if num_tries < return_after:
-            num_tries += 1
-            raise RuntimeError('Called too soon!')
-        return "Third time's a charm"
-
-    assert flaky_function() == "Third time's a charm"
