🎉Source File: increase unit test coverage at least 90% (#15933)
* Added initial test setup for client.py, added WIP tests for source.py

* Added tests for the storage scheme and started tests for the _open() method

* Unit test coverage increased to at least 90% (see the example coverage invocation after the changed-files summary below)

* Small refactoring

Co-authored-by: Serhii Lazebnyi <53845333+lazebnyi@users.noreply.github.com>
Co-authored-by: Serhii Lazebnyi <serhii.lazebnyi@globallogic.com>
3 people authored Aug 26, 2022
1 parent 07bfe05 commit 7d6dfac
Showing 3 changed files with 239 additions and 14 deletions.
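For context on the coverage figure claimed above, here is a minimal sketch of how it might be reproduced locally. The invocation is illustrative, not part of this commit; it assumes pytest and pytest-cov are installed and that the working directory is airbyte-integrations/connectors/source-file:

import pytest

# Run the connector's unit tests and report line coverage for the source_file package
pytest.main(["unit_tests", "--cov=source_file", "--cov-report=term-missing"])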
35 changes: 35 additions & 0 deletions airbyte-integrations/connectors/source-file/unit_tests/conftest.py
@@ -2,9 +2,44 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from pathlib import Path
import pytest
from source_file.client import Client


@pytest.fixture
def read_file():
    def _read_file(file_name):
        parent_location = Path(__file__).absolute().parent
        with open(parent_location / file_name) as f:
            return f.read()

    return _read_file


@pytest.fixture
def config():
    return {"dataset_name": "test", "format": "json", "url": "https://airbyte.com", "provider": {"storage": "HTTPS"}}


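# Deliberately broken configuration (nonexistent "encoding" codec in reader_options), used to drive failure paths such as test_check_invalid_config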
@pytest.fixture
def invalid_config(read_file):
    return {"dataset_name": "test", "format": "jsonl", "url": "https://airbyte.com", "reader_options": '{"encoding": "encoding"}', "provider": {"storage": "HTTPS"}}


@pytest.fixture
def client():
    return Client(
        dataset_name="test_dataset",
        url="scp://test_dataset",
        provider={"provider": {"storage": "HTTPS", "reader_impl": "gcsfs", "user_agent": True}},
    )


@pytest.fixture
def absolute_path():
    return Path(__file__).parent.absolute()


@pytest.fixture
def test_files():
    return "../integration_tests/sample_files"
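As an aside, a hypothetical test (not part of this commit) illustrating how pytest resolves these conftest.py fixtures purely by argument name:

def test_uses_conftest_fixtures(config, absolute_path, test_files):
    # pytest injects the fixtures defined above based on the parameter names
    assert config["provider"]["storage"] == "HTTPS"
    assert (absolute_path / test_files).name == "sample_files"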
129 changes: 129 additions & 0 deletions airbyte-integrations/connectors/source-file/unit_tests/test_client.py
@@ -0,0 +1,129 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import pytest
from pandas import read_csv
from source_file.client import Client, URLFile, ConfigurationError


@pytest.fixture
def client():
    return Client(
        dataset_name="test_dataset",
        url="scp://test_dataset",
        provider={"provider": {"storage": "HTTPS", "reader_impl": "gcsfs", "user_agent": False}},
    )


@pytest.fixture
def wrong_format_client():
    return Client(
        dataset_name="test_dataset",
        url="scp://test_dataset",
        provider={"provider": {"storage": "HTTPS", "reader_impl": "gcsfs", "user_agent": False}},
        format="wrong",
    )


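# URLFile.storage_scheme should map each supported storage provider to the URL scheme used to open it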
@pytest.mark.parametrize(
    "storage, expected_scheme",
    [
        ("GCS", "gs://"),
        ("S3", "s3://"),
        ("AZBLOB", "azure://"),
        ("HTTPS", "https://"),
        ("SSH", "scp://"),
        ("SCP", "scp://"),
        ("SFTP", "sftp://"),
        ("WEBHDFS", "webhdfs://"),
        ("LOCAL", "file://"),
    ],
)
def test_storage_scheme(storage, expected_scheme):
    urlfile = URLFile(provider={"storage": storage}, url="http://localhost")
    assert urlfile.storage_scheme == expected_scheme


def test_load_dataframes(client, wrong_format_client, absolute_path, test_files):
    f = f"{absolute_path}/{test_files}/test.csv"
    read_file = next(client.load_dataframes(fp=f))
    expected = read_csv(f)
    assert read_file.equals(expected)

    with pytest.raises(ConfigurationError):
        next(wrong_format_client.load_dataframes(fp=f))

    with pytest.raises(StopIteration):
        next(client.load_dataframes(fp=f, skip_data=True))


def test_load_nested_json(client, absolute_path, test_files):
    f = f"{absolute_path}/{test_files}/formats/json/demo.json"
    with open(f, mode='rb') as file:
        assert client.load_nested_json(fp=file)


@pytest.mark.parametrize(
    "current_type, dtype, expected",
    [
        ("string", "string", "string"),
        ("", object, "string"),
        ("", "int64", "number"),
        ("boolean", "bool", "boolean"),
        ("integer", "int64", "string"),
    ],
)
def test_dtype_to_json_type(client, current_type, dtype, expected):
    assert client.dtype_to_json_type(current_type, dtype) == expected


def test_cache_stream(client, absolute_path, test_files):
    f = f"{absolute_path}/{test_files}/test.csv"
    with open(f, mode='rb') as file:
        assert client._cache_stream(file)


def test_open_aws_url():
    url = "s3://my_bucket/my_key"
    provider = {"storage": "S3"}
    with pytest.raises(OSError):
        assert URLFile(url=url, provider=provider)._open_aws_url()

    provider.update({"aws_access_key_id": "aws_access_key_id", "aws_secret_access_key": "aws_secret_access_key"})
    with pytest.raises(OSError):
        assert URLFile(url=url, provider=provider)._open_aws_url()


def test_open_azblob_url():
    provider = {"storage": "AZBLOB"}
    with pytest.raises(ValueError):
        assert URLFile(url="", provider=provider)._open_azblob_url()

    provider.update({"storage_account": "storage_account", "sas_token": "sas_token", "shared_key": "shared_key"})
    with pytest.raises(ValueError):
        assert URLFile(url="", provider=provider)._open_azblob_url()


def test_open_gcs_url():
    provider = {"storage": "GCS"}
    with pytest.raises(IndexError):
        assert URLFile(url="", provider=provider)._open_gcs_url()

    provider.update({"service_account_json": '{"service_account_json": "service_account_json"}'})
    with pytest.raises(ValueError):
        assert URLFile(url="", provider=provider)._open_gcs_url()

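    # A service_account_json value that is not valid JSON is expected to surface as a ConfigurationError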
    provider.update({"service_account_json": '{service_account_json": "service_account_json"}'})
    with pytest.raises(ConfigurationError):
        assert URLFile(url="", provider=provider)._open_gcs_url()


def test_client_wrong_reader_options():
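    # reader_options below is deliberately malformed JSON (missing an opening quote), so constructing the Client should fail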
    with pytest.raises(ConfigurationError):
        Client(
            dataset_name="test_dataset",
            url="scp://test_dataset",
            provider={"provider": {"storage": "HTTPS", "reader_impl": "gcsfs", "user_agent": False}},
            reader_options='{encoding":"utf_16"}',
        )
89 changes: 75 additions & 14 deletions airbyte-integrations/connectors/source-file/unit_tests/test_source.py
@@ -4,21 +4,46 @@

import json
import logging
from pathlib import Path
from unittest.mock import PropertyMock

import jsonschema
import pytest
from airbyte_cdk.models import (
    AirbyteConnectionStatus,
    AirbyteMessage,
    AirbyteStream,
    ConfiguredAirbyteCatalog,
    ConfiguredAirbyteStream,
    ConnectorSpecification,
    DestinationSyncMode,
    Status,
    SyncMode,
    Type,
)

from airbyte_cdk.models import AirbyteStream, ConfiguredAirbyteCatalog, ConfiguredAirbyteStream, DestinationSyncMode, SyncMode
from source_file.source import SourceFile

HERE = Path(__file__).parent.absolute()
logger = logging.getLogger("airbyte")


@pytest.fixture
def source():
    return SourceFile()


def test_csv_with_utf16_encoding():
@pytest.fixture
def config():
    config_path: str = "integration_tests/config.json"
    with open(config_path, "r") as f:
        return json.loads(f.read())


def test_csv_with_utf16_encoding(absolute_path, test_files):
    config_local_csv_utf16 = {
        "dataset_name": "AAA",
        "format": "csv",
        "reader_options": '{"encoding":"utf_16"}',
        "url": f"{HERE}/../integration_tests/sample_files/test_utf16.csv",
        "url": f"{absolute_path}/{test_files}/test_utf16.csv",
        "provider": {"storage": "local"},
    }
    expected_schema = {
@@ -32,7 +57,7 @@ def test_csv_with_utf16_encoding():
        "type": "object",
    }

    catalog = SourceFile().discover(logger=logging.getLogger("airbyte"), config=config_local_csv_utf16)
    catalog = SourceFile().discover(logger=logger, config=config_local_csv_utf16)
    stream = next(iter(catalog.streams))
    assert stream.json_schema == expected_schema

@@ -52,30 +77,66 @@ def get_catalog(properties):
    )


def test_nan_to_null():
def test_nan_to_null(absolute_path, test_files):
    """make sure numpy.nan is converted to None"""
    config = {
        "dataset_name": "test",
        "format": "csv",
        "reader_options": json.dumps({"sep": ";"}),
        "url": f"{HERE}/../integration_tests/sample_files/test_nan.csv",
        "url": f"{absolute_path}/{test_files}/test_nan.csv",
        "provider": {"storage": "local"},
    }

    catalog = get_catalog(
        {
            "col1": {"type": ["string", "null"]},
            "col2": {"type": ["number", "null"]},
            "col3": {"type": ["number", "null"]},
        }
        {"col1": {"type": ["string", "null"]}, "col2": {"type": ["number", "null"]}, "col3": {"type": ["number", "null"]}}
    )

    source = SourceFile()
    records = source.read(logger=logging.getLogger("airbyte"), config=config, catalog=catalog)
    records = source.read(logger=logger, config=config, catalog=catalog)
    records = [r.record.data for r in records]
    assert records == [
        {"col1": "key1", "col2": 1.11, "col3": None},
        {"col1": "key2", "col2": None, "col3": 2.22},
        {"col1": "key3", "col2": None, "col3": None},
        {"col1": "key4", "col2": 3.33, "col3": None},
    ]

    config.update({"format": "yaml", "url": f"{absolute_path}/{test_files}/formats/yaml/demo.yaml"})
    records = source.read(logger=logger, config=config, catalog=catalog)
    records = [r.record.data for r in records]
    assert records == []

    config.update({"provider": {"storage": "SSH", "user": "user", "host": "host"}})

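    # Reading with a dummy SSH provider (no reachable host or credentials) is expected to raise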
    with pytest.raises(Exception):
        next(source.read(logger=logger, config=config, catalog=catalog))


def test_spec(source):
    spec = source.spec(None)
    assert isinstance(spec, ConnectorSpecification)


def test_check(source, config):
    expected = AirbyteConnectionStatus(status=Status.SUCCEEDED)
    actual = source.check(logger=logger, config=config)
    assert actual == expected


def test_check_invalid_config(source, invalid_config):
    expected = AirbyteConnectionStatus(status=Status.FAILED)
    actual = source.check(logger=logger, config=invalid_config)
    assert actual.status == expected.status


def test_discover(source, config, client):
    catalog = source.discover(logger=logger, config=config)
    catalog = AirbyteMessage(type=Type.CATALOG, catalog=catalog).dict(exclude_unset=True)
    schemas = [stream["json_schema"] for stream in catalog["catalog"]["streams"]]
    for schema in schemas:
        jsonschema.Draft7Validator.check_schema(schema)

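    # Patching streams on the Client class (via type(client)) makes the Client built inside discover() raise as well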
    type(client).streams = PropertyMock(side_effect=Exception)

    with pytest.raises(Exception):
        source.discover(logger=logger, config=config)
