[ISSUE #28893] fix scenario tests

maxi297 committed Aug 7, 2023
1 parent a394666 commit 4f9d162

Showing 3 changed files with 33 additions and 20 deletions.
@@ -121,7 +121,7 @@ async def infer_schema(
logger: logging.Logger,
) -> Dict[str, Any]:
if config.input_schema:
-# FIXME: what happens if it's a string
+# FIXME change type of method to Mapping
return config.input_schema

# todo: the existing InMemoryFilesSource.open_file() test source doesn't currently require an encoding, but actual
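The two FIXME comments above flag that config.input_schema may arrive as a JSON string even though infer_schema is typed to return Dict[str, Any]. A minimal sketch of one way to normalize it, assuming a string schema is JSON-encoded (an illustrative helper, not the CDK's actual code):

import json
from typing import Any, Dict, Union

def coerce_input_schema(input_schema: Union[str, Dict[str, Any]]) -> Dict[str, Any]:
    # Illustrative helper: accept either a JSON-encoded schema string or an
    # already-parsed mapping, so callers always get a Dict back.
    if isinstance(input_schema, str):
        return json.loads(input_schema)  # assumption: string schemas are JSON documents
    return input_schema
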
@@ -8,7 +8,7 @@

single_csv_scenario = (
TestScenarioBuilder()
.set_name("single_csv_stream")
.set_name("single_csv_scenario")
.set_config(
{
"streams": [
@@ -249,7 +249,19 @@
"type": "string"
},
"uniqueItems": True
-}
+},
+"infer_datatypes": {
+"default": False,
+"description": "Whether to autogenerate the schema based on the file content.",
+"title": "Infer Datatypes",
+"type": "boolean"
+},
+"infer_datatypes_legacy": {
+"default": False,
+"description": "Whether to autogenerate the schema based on the file content. This inference does not support lists and objects.",
+"title": "Infer Datatypes (legacy)",
+"type": "boolean"
+},
}
},
{
@@ -1999,6 +2011,7 @@
"message": "Error parsing record. This could be due to a mismatch between the config's file type and the actual file type, or because the file or record is not parseable. stream=stream1 file=a.csv line_no=2 n_skipped=0",
}
]})
+.set_expected_discover_error(SchemaInferenceError, FileBasedSourceError.SCHEMA_INFERENCE_ERROR.value)
).build()
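
For context, the two spec properties added above would surface in a stream's configuration roughly as follows. Only infer_datatypes and infer_datatypes_legacy come from the spec diff; the remaining keys and values are illustrative assumptions:

# Hypothetical stream entry; only the two infer_datatypes flags are taken
# from the spec diff above, everything else is illustrative.
example_stream_config = {
    "name": "stream1",
    "file_type": "csv",
    "globs": ["*.csv"],
    "validation_policy": "skip_record",  # policy name taken from the scenario logs below
    "infer_datatypes": True,             # infer the schema from file content
    "infer_datatypes_legacy": False,     # legacy inference: no lists or objects
}

Per the spec, both flags default to False, so existing configs keep their current behavior.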

csv_escape_char_is_set_scenario = (
@@ -12,8 +12,8 @@
"a.csv": { # The records in this file do not conform to the schema
"contents": [
("col1", "col2"),
("val_a_11", "val_a_21"),
("val_a_12", "val_a_22"),
("val_a_11", "1"),
("val_a_12", "2"),
],
"last_modified": "2023-06-05T03:54:07.000Z",
},
@@ -29,7 +29,7 @@
"contents": [
("col1",),
("val_c_11",),
("val_c_12","val_c_22"), # This record doesn't conform to the schema
("val_c_12","val_c_22"), # This record is not parsable
("val_c_13",),
],
"last_modified": "2023-06-05T03:54:07.000Z",
@@ -55,9 +55,9 @@
"col1": {
"type": "string",
},
# "col2": { # remove this so the record does not conform to the schema
# "type": "string",
# },
"col2": {
"type": "number",
},
"_ab_source_file_last_modified": {
"type": "string"
},
@@ -100,7 +100,7 @@
"contents": [
("col1",),
("val_aa3_11",),
("val_aa3_12", "val_aa3_22"), # This record does not conform to the schema
("val_aa3_12", "val_aa3_22"), # This record is not parsable
("val_aa3_13",),
],
"last_modified": "2023-06-05T03:54:07.000Z",
@@ -219,7 +219,7 @@
{"data": {"col1": "val_b_12", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "b.csv"}, "stream": "stream1"},
{"data": {"col1": "val_c_11", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "c.csv"}, "stream": "stream1"},
# {"data": {"col1": "val_c_12", None: "val_c_22", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "c.csv"}, "stream": "stream1"}, # This record is malformed so should not be emitted
{"data": {"col1": "val_c_13", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "c.csv"}, "stream": "stream1"},
# {"data": {"col1": "val_c_13", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "c.csv"}, "stream": "stream1"}, # Skipped since previous record is malformed
{"data": {"col1": "val_d_11", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "d.csv"}, "stream": "stream1"},
]
)
@@ -230,8 +230,8 @@
"message": "Records in file did not pass validation policy. stream=stream1 file=a.csv n_skipped=2 validation_policy=skip_record",
},
{
"level": "WARN",
"message": "Records in file did not pass validation policy. stream=stream1 file=c.csv n_skipped=1 validation_policy=skip_record",
"level": "ERROR",
"message": "Error parsing record. This could be due to a mismatch between the config's file type and the actual file type, or because the file or record is not parseable. stream=stream1 file=c.csv line_no=2 n_skipped=0",
},
]
})
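
The updated expectations above encode the behavior under test: with the skip_record validation policy, a row that cannot be parsed is now logged at ERROR level (previously a WARN about validation), and the remainder of that file is abandoned while later files continue to sync. A rough sketch of that control flow with an illustrative parser (assumptions, not the CDK's actual reader):

import logging
from typing import Any, Dict, Iterator, List, Tuple

logger = logging.getLogger(__name__)

def read_file_skip_record(contents: List[Tuple[str, ...]], file_name: str) -> Iterator[Dict[str, Any]]:
    # contents mirrors the test fixtures above: a header row followed by data rows.
    header, *rows = contents
    for line_no, row in enumerate(rows, start=1):
        if len(row) > len(header):
            # Assumption: a row with more fields than the header is malformed,
            # like ("val_c_12", "val_c_22") against the one-column header ("col1",).
            logger.error(
                "Error parsing record. ... stream=stream1 file=%s line_no=%s n_skipped=0",
                file_name, line_no,
            )
            return  # abandon the rest of this file; later files still sync
        yield dict(zip(header, row))

Applied to the c.csv fixture, this yields only the val_c_11 record and stops at line_no=2, matching the expected records and the ERROR log above.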
@@ -268,7 +268,7 @@
{"data": {"col1": "val_aa2_12", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "a/a2.csv"}, "stream": "stream1"},
{"data": {"col1": "val_aa3_11", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "a/a3.csv"}, "stream": "stream1"},
# {"data": {"col1": "val_aa3_12", None: "val_aa3_22", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "a/a3.csv"}, "stream": "stream1"}, # This record is malformed so should not be emitted
{"data": {"col1": "val_aa3_13", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "a/a3.csv"}, "stream": "stream1"},
# {"data": {"col1": "val_aa3_13", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "a/a3.csv"}, "stream": "stream1"}, # Skipped since previous record is malformed
{"data": {"col1": "val_aa4_11", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "a/a4.csv"}, "stream": "stream1"},
{"data": {"col1": "val_bb1_11", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "b/b1.csv"}, "stream": "stream2"}, # This record is skipped because it does not conform
{"data": {"col1": "val_bb1_12", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "b/b1.csv"}, "stream": "stream2"}, # This record is skipped because it does not conform
@@ -285,8 +285,8 @@
"message": "Records in file did not pass validation policy. stream=stream1 file=a/a1.csv n_skipped=2 validation_policy=skip_record",
},
{
"level": "WARN",
"message": "Records in file did not pass validation policy. stream=stream1 file=a/a3.csv n_skipped=1 validation_policy=skip_record",
"level": "ERROR",
"message": "Error parsing record. This could be due to a mismatch between the config's file type and the actual file type, or because the file or record is not parseable. stream=stream1 file=a/a3.csv line_no=2 n_skipped=0",
},
{
"level": "WARN",
@@ -314,14 +314,14 @@
)
.set_expected_records(
[
{"data": {"col1": "val_a_11", "col2": "val_a_21", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "a.csv"}, "stream": "stream1"},
{"data": {"col1": "val_a_12", "col2": "val_a_22", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "a.csv"}, "stream": "stream1"},
{"data": {"col1": "val_a_11", "col2": "1", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "a.csv"}, "stream": "stream1"},
{"data": {"col1": "val_a_12", "col2": "2", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "a.csv"}, "stream": "stream1"},
{"data": {"col1": "val_b_11", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "b.csv"}, "stream": "stream1"},
{"data": {"col1": "val_b_12", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "b.csv"}, "stream": "stream1"},
{"data": {"col1": "val_c_11", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "c.csv"}, "stream": "stream1"},
# {"data": {"col1": "val_c_12", None: "val_c_22", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "c.csv"}, "stream": "stream1"}, # This record is malformed so should not be emitted
# {"data": {"col1": "val_c_13", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "c.csv"}, "stream": "stream1"}, # No more records from this stream are emitted after we hit a parse error
# {"data": {"col1": "val_d_11", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "d.csv"}, "stream": "stream1"},
{"data": {"col1": "val_d_11", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "d.csv"}, "stream": "stream1"},
]
)
.set_expected_logs({
@@ -366,7 +366,7 @@
{"data": {"col1": "val_aa3_11", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "a/a3.csv"}, "stream": "stream1"},
# {"data": {"col1": "val_aa3_12", None: "val_aa3_22", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "a/a3.csv"}, "stream": "stream1"}, # This record is malformed so should not be emitted
# {"data": {"col1": "val_aa3_13", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "a/a3.csv"}, "stream": "stream1"}, # No more records from this stream are emitted after we hit a parse error
# {"data": {"col1": "val_aa4_11", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "a/a4.csv"}, "stream": "stream1"},
{"data": {"col1": "val_aa4_11", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "a/a4.csv"}, "stream": "stream1"},
{"data": {"col1": "val_bb1_11", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "b/b1.csv"}, "stream": "stream2"},
{"data": {"col1": "val_bb1_12", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "b/b1.csv"}, "stream": "stream2"},
{"data": {"col1": "val_bb2_11", "col2": "val_bb2_21", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "b/b2.csv"}, "stream": "stream2"},