Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ingestion: Ignore dates for non-UUID sources #2031

Merged
merged 2 commits into from
Jul 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions ingestion/functions/retrieval/retrieval.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ def get_source_details(env, source_id, upload_id, api_headers, cookies):
"automation", {}).get(
"parser", {}).get(
"awsLambdaArn", ""), api_json.get(
'dateFilter', {})
'dateFilter', {}), api_json.get(
'hasStableIdentifiers', False)
upload_error = (
common_lib.UploadError.SOURCE_CONFIGURATION_NOT_FOUND
if r.status_code == 404 else common_lib.UploadError.INTERNAL_ERROR)
Expand Down Expand Up @@ -287,8 +288,13 @@ def run_retrieval(tempdir=TEMP_PATH):
auth_headers = common_lib.obtain_api_credentials(s3_client)
upload_id = common_lib.create_upload_record(
env, source_id, auth_headers, cookies)
url, source_format, parser, date_filter = get_source_details(
url, source_format, parser, date_filter, stable_identifiers = get_source_details(
env, source_id, upload_id, auth_headers, cookies)
if not stable_identifiers:
print(f"Source {source_id} does not have stable identifiers\n"
"Ingesting entire dataset and ignoring date filter and date ranges")
date_filter = {}
parsing_date_range = {}
url = format_source_url(url)
file_names_s3_object_keys = retrieve_content(
env, source_id, upload_id, url, source_format, auth_headers, cookies, tempdir=tempdir)
Expand Down
107 changes: 67 additions & 40 deletions ingestion/functions/retrieval/retrieval_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,25 @@

# Base URL the mocked sources API is served from in these tests.
_SOURCE_API_URL = "http://foo.bar"

# Shared fixture data used across the e2e test parametrizations.
date_filter = {"numDaysBeforeToday": 2, "op": "EQ"}  # example dateFilter payload
origin_url = "http://bar.baz/"  # where the mocked source content is fetched from
upload_id = "012345678901234567890123"  # 24-char id, shaped like a Mongo ObjectId
# Source WITHOUT "hasStableIdentifiers": retrieval should ignore date filtering.
example_source = {
    "format": "JSON",
    "origin": {"url": origin_url, "license": "MIT"},
    "automation": {"parser": {"awsLambdaArn": "example.example"}},
    "dateFilter": date_filter
}
# Same source but WITH stable identifiers: date filtering should be honored.
example_source_stable_ids = {
    **example_source,
    "hasStableIdentifiers": True
}


def create_upload_url(source_id):
    """Return the sources-API endpoint used to create an upload for *source_id*."""
    return "{}/sources/{}/uploads".format(_SOURCE_API_URL, source_id)


@pytest.fixture()
def mock_source_api_url_fixture():
"""
Expand All @@ -42,6 +61,36 @@ def mock_source_api_url_fixture():
yield common_lib


@pytest.fixture()
def setup_e2e(mock_source_api_url_fixture, valid_event, requests_mock):
    """Shared end-to-end scaffolding for the retrieval tests.

    Sets the ingestion environment variables, stubs credential retrieval,
    and registers the HTTP mocks for upload creation and source download.
    Yields the ``(requests_mock, common_lib)`` pair the tests use.
    """
    sid = valid_event['sourceId']
    date_range = valid_event['parsingDateRange']

    # Environment the retrieval code reads its configuration from.
    # TODO: Complete removal of URL env var.
    os.environ["EPID_INGESTION_ENV"] = valid_event['env']
    os.environ["EPID_INGESTION_SOURCE_ID"] = sid
    os.environ["EPID_INGESTION_PARSING_DATE_RANGE"] = ",".join(
        [date_range['start'], date_range['end']])
    os.environ["SOURCE_API_URL"] = _SOURCE_API_URL

    # Stub credential retrieval so no real call is attempted.
    common_lib = mock_source_api_url_fixture
    common_lib.obtain_api_credentials = MagicMock(
        name="obtain_api_credentials", return_value={})

    # HTTP mocks: creating the upload record, then fetching source content.
    requests_mock.post(
        create_upload_url(sid), json={"_id": upload_id}, status_code=201)
    requests_mock.get(origin_url, json={"data": "yes"})

    yield requests_mock, common_lib


@pytest.fixture()
def valid_event():
"""Loads valid CloudWatch ScheduledEvent from file."""
Expand All @@ -65,49 +114,21 @@ def test_format_url(mock_today):
assert retrieval.format_source_url(
url) == "http://foo.bar/2020-06-08/6/8.json"


@pytest.mark.skipif(not os.environ.get("DOCKERIZED", False),
reason="Running integration tests outside of mock environment disabled")
def test_e2e(valid_event, requests_mock, mock_source_api_url_fixture, tempdir="/tmp"):
from retrieval import retrieval # Import locally to avoid superseding mock
@pytest.mark.parametrize("source", [example_source_stable_ids, example_source])
def test_e2e(source, valid_event, mock_source_api_url_fixture, setup_e2e, tempdir="/tmp"):
from retrieval import retrieval
requests_mock, common_lib = setup_e2e
print(valid_event)

# Mock/stub retrieving credentials, invoking the parser, and S3.
common_lib = mock_source_api_url_fixture
common_lib.obtain_api_credentials = MagicMock(
name="obtain_api_credentials", return_value={})
retrieval.invoke_parser = MagicMock(name="invoke_parser")

# Set up mock request values used in multiple requests.
# TODO: Complete removal of URL env var.
os.environ["EPID_INGESTION_ENV"] = valid_event['env']
os.environ["EPID_INGESTION_SOURCE_ID"] = valid_event['sourceId']
os.environ["EPID_INGESTION_PARSING_DATE_RANGE"] = (
valid_event['parsingDateRange']['start']
+ ","
+ valid_event['parsingDateRange']['end']
)
os.environ["SOURCE_API_URL"] = _SOURCE_API_URL
source_id = valid_event['sourceId']
upload_id = "012345678901234567890123"
origin_url = "http://bar.baz/"

# Mock the request to create the upload.
create_upload_url = f"{_SOURCE_API_URL}/sources/{source_id}/uploads"
requests_mock.post(
create_upload_url, json={"_id": upload_id},
status_code=201)
retrieval.invoke_parser = MagicMock(name="invoke_parser")

# Mock the request to retrieval source details (e.g. format).
date_filter = {"numDaysBeforeToday": 2, "op": "EQ"}
full_source_url = f"{_SOURCE_API_URL}/sources/{source_id}"
requests_mock.get(
full_source_url,
json={"origin": {"url": origin_url, "license": "MIT"}, "format": "JSON",
"automation": {"parser": {"awsLambdaArn": "example.example"}},
"dateFilter": date_filter})

# Mock the request to retrieve source content.
requests_mock.get(origin_url, json={"data": "yes"})
requests_mock.get(full_source_url, json=source)
has_stable_ids = source.get("hasStableIdentifiers", False)

response = retrieval.run_retrieval(tempdir=tempdir)

Expand All @@ -117,8 +138,10 @@ def test_e2e(valid_event, requests_mock, mock_source_api_url_fixture, tempdir="/
"parsing.example.example",
source_id, upload_id, {}, None,
response["key"],
origin_url, date_filter, valid_event["parsingDateRange"])
assert requests_mock.request_history[0].url == create_upload_url
origin_url,
date_filter if has_stable_ids else {},
valid_event["parsingDateRange"] if has_stable_ids else {})
assert requests_mock.request_history[0].url == create_upload_url(source_id)
assert requests_mock.request_history[1].url == full_source_url
assert requests_mock.request_history[2].url == origin_url
assert response["bucket"] == retrieval.OUTPUT_BUCKET
Expand Down Expand Up @@ -146,19 +169,23 @@ def test_extract_event_fields_raises_error_if_event_lacks_source_id():
retrieval.extract_event_fields({"env": "env"})


def test_get_source_details_returns_url_and_format(
def test_get_source_details_returns_url_format_stable_identifiers(
requests_mock, mock_source_api_url_fixture):
from retrieval import retrieval # Import locally to avoid superseding mock
source_id = "id"
content_url = "http://bar.baz"
requests_mock.get(f"{_SOURCE_API_URL}/sources/{source_id}",
json={"format": "CSV",
"origin": {"url": content_url, "license": "MIT"}})
"origin": {"url": content_url, "license": "MIT"},
"hasStableIdentifiers": True
})
result = retrieval.get_source_details(
"env", source_id, "upload_id", {}, {})
assert result[0] == content_url
assert result[1] == "CSV"
assert result[2] == ""
assert result[3] == {}
assert result[4] is True


def test_get_source_details_returns_parser_arn_if_present(
Expand Down