file-based CDK: Configurable strings_can_be_null #29298

Merged on Aug 14, 2023 (34 commits)
Showing changes from 21 commits

Commits

eaba483  [ISSUE #28893] infer csv schema (maxi297, Aug 4, 2023)
60757c1  [ISSUE #28893] align with pyarrow (maxi297, Aug 4, 2023)
f49235d  Automated Commit - Formatting Changes (maxi297, Aug 4, 2023)
a394666  [ISSUE #28893] legacy inference and infer only when needed (maxi297, Aug 7, 2023)
4f9d162  [ISSUE #28893] fix scenario tests (maxi297, Aug 7, 2023)
0617c82  [ISSUE #28893] using discovered schema as part of read (maxi297, Aug 7, 2023)
d157aa3  [ISSUE #28893] self-review + cleanup (maxi297, Aug 8, 2023)
57b011f  [ISSUE #28893] fix test (maxi297, Aug 8, 2023)
71cdca9  [ISSUE #28893] code review part #1 (maxi297, Aug 9, 2023)
ef8f5f5  Merge branch 'master' into issue-28893/infer-schema-csv (maxi297, Aug 9, 2023)
f651d03  [ISSUE #28893] code review part #2 (maxi297, Aug 9, 2023)
a573a89  Fix test (maxi297, Aug 9, 2023)
0ce37e5  formatcdk (maxi297, Aug 9, 2023)
ddd7f49  first pass (girarda, Aug 9, 2023)
82db6c3  [ISSUE #28893] code review (maxi297, Aug 9, 2023)
fc6028e  fix mypy issues (girarda, Aug 9, 2023)
c063d9a  comment (girarda, Aug 9, 2023)
37992a2  rename for clarity (girarda, Aug 9, 2023)
46d57e9  Add a scenario test case (girarda, Aug 9, 2023)
af16160  this isn't optional anymore (girarda, Aug 9, 2023)
0c732db  merge (girarda, Aug 9, 2023)
3027a4f  FIX test log level (maxi297, Aug 9, 2023)
ac91730  Re-adding failing tests (maxi297, Aug 9, 2023)
f1a60ba  [ISSUE #28893] improve inferrence to consider multiple types per value (maxi297, Aug 10, 2023)
7113603  Merge branch 'master' into issue-28893/infer-schema-csv (maxi297, Aug 10, 2023)
c9e2004  Automated Commit - Formatting Changes (maxi297, Aug 10, 2023)
9f3479f  [ISSUE #28893] remove InferenceType.PRIMITIVE_AND_COMPLEX_TYPES (maxi297, Aug 11, 2023)
29c6858  Code review (maxi297, Aug 11, 2023)
9f640f9  Merge branch 'master' into issue-28893/infer-schema-csv (maxi297, Aug 11, 2023)
dc2913f  Automated Commit - Formatting Changes (maxi297, Aug 11, 2023)
2348f71  merge (girarda, Aug 14, 2023)
8ace2f1  fix unit tests (girarda, Aug 14, 2023)
31da8d2  Merge branch 'issue-28893/infer-schema-csv' into alex/strings_can_be_… (girarda, Aug 14, 2023)
6d4a612  merge (girarda, Aug 14, 2023)

Files changed

[file path not captured]
@@ -18,6 +18,8 @@
 
 
 class DefaultFileBasedAvailabilityStrategy(AbstractFileBasedAvailabilityStrategy):
+    _WITHOUT_SCHEMA = None
+
     def __init__(self, stream_reader: AbstractFileBasedStreamReader):
         self.stream_reader = stream_reader
 
@@ -82,7 +84,7 @@ def _check_parse_record(self, stream: "AbstractFileBasedStream", file: RemoteFil
         parser = stream.get_parser(stream.config.file_type)
 
         try:
-            record = next(iter(parser.parse_records(stream.config, file, self.stream_reader, logger)))
+            record = next(iter(parser.parse_records(stream.config, file, self.stream_reader, logger, self._WITHOUT_SCHEMA)))
         except StopIteration:
             # The file is empty. We've verified that we can open it, so will
             # consider the connection check successful even though it means
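
The availability check runs before schema discovery, so it has no schema to hand the parser; the new _WITHOUT_SCHEMA class attribute gives that None a name at the call site. Below is a minimal, self-contained sketch of the sentinel pattern, with illustrative names rather than the CDK's real interfaces:

from typing import Any, Dict, Iterable, Mapping, Optional


def parse_records(lines: Iterable[str], schema: Optional[Mapping[str, Any]]) -> Iterable[Dict[str, Any]]:
    # Without a schema, fall back to raw string values; with one, a real
    # parser would cast each column accordingly (elided here).
    for line in lines:
        yield {"raw": line.rstrip("\n"), "schema_used": schema is not None}


_WITHOUT_SCHEMA = None  # named sentinel: clearer at call sites than a bare None

# The check only needs to prove that one record parses, so it passes the
# sentinel instead of running discovery first.
first_record = next(iter(parse_records(["a,b,c"], _WITHOUT_SCHEMA)))
print(first_record)  # {'raw': 'a,b,c', 'schema_used': False}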

[file path not captured]
@@ -17,6 +17,12 @@ class QuotingBehavior(Enum):
     QUOTE_NONE = "Quote None"
 
 
+class InferenceType(Enum):
+    NONE = "None"
+    PRIMITIVE_TYPES_ONLY = "Primitive Types Only"
+    PRIMITIVE_AND_COMPLEX_TYPES = "Primitive and Complex Types"
+
+
 DEFAULT_TRUE_VALUES = ["y", "yes", "t", "true", "on", "1"]
 DEFAULT_FALSE_VALUES = ["n", "no", "f", "false", "off", "0"]
 
@@ -58,6 +64,11 @@ class Config:
         default=[],
         description="A set of case-sensitive strings that should be interpreted as null values. For example, if the value 'NA' should be interpreted as null, enter 'NA' in this field.",
     )
+    strings_can_be_null: bool = Field(
+        title="Strings Can Be Null",
+        default=True,
+        description="Whether strings can be interpreted as null values. If true, strings that match the null_values set will be interpreted as null. If false, strings that match the null_values set will be interpreted as the string itself.",
+    )
     skip_rows_before_header: int = Field(
         title="Skip Rows Before Header",
         default=0,
@@ -81,6 +92,11 @@ class Config:
         default=DEFAULT_FALSE_VALUES,
         description="A set of case-sensitive strings that should be interpreted as false values.",
     )
+    inference_type: InferenceType = Field(
+        title="Inference Type",
+        default=InferenceType.NONE,
+        description="How to infer the types of the columns. If none, inference defaults to strings.",
+    )
 
     @validator("delimiter")
     def validate_delimiter(cls, v: str) -> str:
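
The new strings_can_be_null flag interacts with the existing null_values set: when enabled, a string cell that matches null_values is read as null; when disabled, the literal text is kept. The authoritative casting logic lives in the CSV parser, whose large diff is not rendered here, so the following is only a hedged sketch of the intended semantics:

from typing import Optional, Set


def cast_string_value(value: str, null_values: Set[str], strings_can_be_null: bool) -> Optional[str]:
    # A value matching null_values only becomes null when string columns are
    # allowed to be null; otherwise the literal string is preserved.
    if value in null_values and strings_can_be_null:
        return None
    return value


assert cast_string_value("NA", {"NA"}, strings_can_be_null=True) is None
assert cast_string_value("NA", {"NA"}, strings_can_be_null=False) == "NA"
assert cast_string_value("x", {"NA"}, strings_can_be_null=True) == "x"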

[file path not captured]
@@ -4,14 +4,15 @@
 
 import logging
 import uuid
-from typing import Any, Dict, Iterable, Mapping
+from typing import Any, Dict, Iterable, Mapping, Optional
 
 import fastavro
 from airbyte_cdk.sources.file_based.config.avro_format import AvroFormat
 from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig
 from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader, FileReadMode
 from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser
 from airbyte_cdk.sources.file_based.remote_file import RemoteFile
+from airbyte_cdk.sources.file_based.schema_helpers import SchemaType
 
 AVRO_TYPE_TO_JSON_TYPE = {
     "null": "null",
@@ -45,7 +46,7 @@ async def infer_schema(
         file: RemoteFile,
         stream_reader: AbstractFileBasedStreamReader,
         logger: logging.Logger,
-    ) -> Dict[str, Any]:
+    ) -> SchemaType:
         avro_format = config.format or AvroFormat()
         if not isinstance(avro_format, AvroFormat):
             raise ValueError(f"Expected AvroFormat, got {avro_format}")
@@ -130,6 +131,7 @@ def parse_records(
         file: RemoteFile,
         stream_reader: AbstractFileBasedStreamReader,
         logger: logging.Logger,
+        discovered_schema: Optional[Mapping[str, SchemaType]],
     ) -> Iterable[Dict[str, Any]]:
         avro_format = config.format or AvroFormat()
         if not isinstance(avro_format, AvroFormat):

[file path not captured; large diff not rendered by default]


[file path not captured]
@@ -4,13 +4,13 @@
 
 import logging
 from abc import ABC, abstractmethod
-from typing import Any, Dict, Iterable
+from typing import Any, Dict, Iterable, Mapping, Optional
 
 from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig
 from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader, FileReadMode
 from airbyte_cdk.sources.file_based.remote_file import RemoteFile
+from airbyte_cdk.sources.file_based.schema_helpers import SchemaType
 
-Schema = Dict[str, str]
 Record = Dict[str, Any]
 
@@ -27,7 +27,7 @@ async def infer_schema(
         file: RemoteFile,
         stream_reader: AbstractFileBasedStreamReader,
         logger: logging.Logger,
-    ) -> Schema:
+    ) -> SchemaType:
         """
         Infer the JSON Schema for this file.
         """
@@ -40,6 +40,7 @@ def parse_records(
         file: RemoteFile,
         stream_reader: AbstractFileBasedStreamReader,
         logger: logging.Logger,
+        discovered_schema: Optional[Mapping[str, SchemaType]],
     ) -> Iterable[Record]:
         """
         Parse and emit each record.
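
With this change, every concrete parser receives the schema produced during discovery, even if it chooses to ignore it. A toy implementation against a stand-in ABC (not the CDK's actual FileTypeParser) shows the shape of the new contract:

from abc import ABC, abstractmethod
from typing import Any, Dict, Iterable, Mapping, Optional

SchemaType = Mapping[str, Mapping[str, Any]]


class Parser(ABC):
    @abstractmethod
    def parse_records(
        self, lines: Iterable[str], discovered_schema: Optional[SchemaType]
    ) -> Iterable[Dict[str, Any]]:
        ...


class TsvParser(Parser):
    # Toy parser: it accepts the discovered schema without using it, much like
    # the non-CSV parsers in this diff, which only take the new argument.
    def parse_records(
        self, lines: Iterable[str], discovered_schema: Optional[SchemaType]
    ) -> Iterable[Dict[str, Any]]:
        for line in lines:
            yield {f"col{i}": value for i, value in enumerate(line.rstrip("\n").split("\t"))}


print(list(TsvParser().parse_records(["a\tb"], discovered_schema=None)))
# [{'col0': 'a', 'col1': 'b'}]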

[file path not captured]
@@ -4,13 +4,13 @@
 
 import json
 import logging
-from typing import Any, Dict, Iterable
+from typing import Any, Dict, Iterable, Mapping, Optional
 
 from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig
 from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader, FileReadMode
 from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser
 from airbyte_cdk.sources.file_based.remote_file import RemoteFile
-from airbyte_cdk.sources.file_based.schema_helpers import PYTHON_TYPE_MAPPING, merge_schemas
+from airbyte_cdk.sources.file_based.schema_helpers import PYTHON_TYPE_MAPPING, SchemaType, merge_schemas
 
 
 class JsonlParser(FileTypeParser):
@@ -23,12 +23,12 @@ async def infer_schema(
         file: RemoteFile,
         stream_reader: AbstractFileBasedStreamReader,
         logger: logging.Logger,
-    ) -> Dict[str, Any]:
+    ) -> SchemaType:
         """
         Infers the schema for the file by inferring the schema for each line, and merging
         it with the previously-inferred schema.
         """
-        inferred_schema: Dict[str, Any] = {}
+        inferred_schema: Mapping[str, Any] = {}
         read_bytes = 0
 
         with stream_reader.open_file(file, self.file_read_mode, logger) as fp:
@@ -52,6 +52,7 @@ def parse_records(
         file: RemoteFile,
         stream_reader: AbstractFileBasedStreamReader,
         logger: logging.Logger,
+        discovered_schema: Optional[Mapping[str, SchemaType]],
     ) -> Iterable[Dict[str, Any]]:
         with stream_reader.open_file(file, self.file_read_mode, logger) as fp:
             for line in fp:
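
The docstring above describes the JSONL strategy: infer a schema per line and fold it into the running result. A simplified, self-contained sketch of that loop, assuming a toy type table in place of the CDK's PYTHON_TYPE_MAPPING and merge_schemas:

import json
from typing import Any, Dict, List

_TYPE_MAP = {str: "string", int: "number", float: "number", bool: "boolean"}


def infer_jsonl_schema(lines: List[str]) -> Dict[str, Dict[str, Any]]:
    inferred: Dict[str, Dict[str, Any]] = {}
    for line in lines:
        for key, value in json.loads(line).items():
            new_type = _TYPE_MAP.get(type(value), "object")
            old_type = inferred.get(key, {}).get("type")
            # Widen to "object" on a type conflict, purely for illustration.
            inferred[key] = {"type": new_type if old_type in (None, new_type) else "object"}
    return inferred


print(infer_jsonl_schema(['{"a": 1, "b": "x"}', '{"a": 2.5, "c": true}']))
# {'a': {'type': 'number'}, 'b': {'type': 'string'}, 'c': {'type': 'boolean'}}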

[file path not captured]
@@ -5,7 +5,7 @@
 import json
 import logging
 import os
-from typing import Any, Dict, Iterable, List, Mapping
+from typing import Any, Dict, Iterable, List, Mapping, Optional
 from urllib.parse import unquote
 
 import pyarrow as pa
@@ -14,6 +14,7 @@
 from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader, FileReadMode
 from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser
 from airbyte_cdk.sources.file_based.remote_file import RemoteFile
+from airbyte_cdk.sources.file_based.schema_helpers import SchemaType
 from pyarrow import Scalar
 
 
@@ -24,7 +25,7 @@ async def infer_schema(
         file: RemoteFile,
         stream_reader: AbstractFileBasedStreamReader,
         logger: logging.Logger,
-    ) -> Dict[str, Any]:
+    ) -> SchemaType:
         parquet_format = config.format or ParquetFormat()
         if not isinstance(parquet_format, ParquetFormat):
             raise ValueError(f"Expected ParquetFormat, got {parquet_format}")
@@ -47,6 +48,7 @@ def parse_records(
         file: RemoteFile,
         stream_reader: AbstractFileBasedStreamReader,
         logger: logging.Logger,
+        discovered_schema: Optional[Mapping[str, SchemaType]],
     ) -> Iterable[Dict[str, Any]]:
         parquet_format = config.format or ParquetFormat()
         if not isinstance(parquet_format, ParquetFormat):

[file path not captured]
@@ -11,7 +11,7 @@
 from airbyte_cdk.sources.file_based.exceptions import ConfigValidationError, FileBasedSourceError, SchemaInferenceError
 
 JsonSchemaSupportedType = Union[List[str], Literal["string"], str]
-SchemaType = Dict[str, Dict[str, JsonSchemaSupportedType]]
+SchemaType = Mapping[str, Mapping[str, JsonSchemaSupportedType]]
 
 schemaless_schema = {"type": "object", "properties": {"data": {"type": "object"}}}
 
@@ -99,7 +99,7 @@ def merge_schemas(schema1: SchemaType, schema2: SchemaType) -> SchemaType:
         if not isinstance(t, dict) or "type" not in t or not _is_valid_type(t["type"]):
             raise SchemaInferenceError(FileBasedSourceError.UNRECOGNIZED_TYPE, key=k, type=t)
 
-    merged_schema: Dict[str, Any] = deepcopy(schema1)
+    merged_schema: Dict[str, Any] = deepcopy(schema1)  # type: ignore  # as of 2023-08-08, deepcopy can copy Mapping
     for k2, t2 in schema2.items():
         t1 = merged_schema.get(k2)
         if t1 is None:
@@ -116,7 +116,7 @@ def _is_valid_type(t: JsonSchemaSupportedType) -> bool:
     return t == "array" or get_comparable_type(t) is not None
 
 
-def _choose_wider_type(key: str, t1: Dict[str, Any], t2: Dict[str, Any]) -> Dict[str, Any]:
+def _choose_wider_type(key: str, t1: Mapping[str, Any], t2: Mapping[str, Any]) -> Mapping[str, Any]:
     if (t1["type"] == "array" or t2["type"] == "array") and t1 != t2:
         raise SchemaInferenceError(
             FileBasedSourceError.SCHEMA_INFERENCE_ERROR,
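
Loosening SchemaType from Dict to Mapping lets these helpers accept read-only mappings, while merge_schemas still deep-copies before mutating (hence the type-ignore note above). The widening rule itself can be pictured with a toy lattice; the real _choose_wider_type validates types and raises SchemaInferenceError on irreconcilable conflicts, so treat this only as a sketch:

from copy import deepcopy
from typing import Any, Dict, Mapping

# A tiny widening order, assumed for this sketch only.
_WIDTH = {"null": 0, "boolean": 1, "number": 2, "string": 3, "object": 4}

SchemaType = Mapping[str, Mapping[str, Any]]


def merge_schemas(schema1: SchemaType, schema2: SchemaType) -> Dict[str, Any]:
    merged: Dict[str, Any] = deepcopy(dict(schema1))  # copy so callers' mappings stay untouched
    for key, t2 in schema2.items():
        t1 = merged.get(key)
        if t1 is None or _WIDTH[t2["type"]] > _WIDTH[t1["type"]]:
            merged[key] = dict(t2)  # keep the wider of the two types
    return merged


print(merge_schemas({"a": {"type": "number"}}, {"a": {"type": "string"}, "b": {"type": "boolean"}}))
# {'a': {'type': 'string'}, 'b': {'type': 'boolean'}}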

[file path not captured]
@@ -6,7 +6,7 @@
 import itertools
 import traceback
 from functools import cache
-from typing import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional, Set, Union
+from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Set, Union
 
 from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, Level
 from airbyte_cdk.models import Type as MessageType
@@ -20,7 +20,7 @@
     StopSyncPerValidationPolicy,
 )
 from airbyte_cdk.sources.file_based.remote_file import RemoteFile
-from airbyte_cdk.sources.file_based.schema_helpers import merge_schemas, schemaless_schema
+from airbyte_cdk.sources.file_based.schema_helpers import SchemaType, merge_schemas, schemaless_schema
 from airbyte_cdk.sources.file_based.stream import AbstractFileBasedStream
 from airbyte_cdk.sources.file_based.stream.cursor import AbstractFileBasedCursor
 from airbyte_cdk.sources.file_based.types import StreamSlice
@@ -84,7 +84,7 @@ def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[Mapping
         n_skipped = line_no = 0
 
         try:
-            for record in parser.parse_records(self.config, file, self._stream_reader, self.logger):
+            for record in parser.parse_records(self.config, file, self._stream_reader, self.logger, schema):
                 line_no += 1
                 if self.config.schemaless:
                     record = {"data": record}
@@ -231,8 +231,8 @@ async def _infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
         Each file type has a corresponding `infer_schema` handler.
         Dispatch on file type.
         """
-        base_schema: Dict[str, Any] = {}
-        pending_tasks: Set[asyncio.tasks.Task[Dict[str, Any]]] = set()
+        base_schema: SchemaType = {}
+        pending_tasks: Set[asyncio.tasks.Task[SchemaType]] = set()
 
         n_started, n_files = 0, len(files)
         files_iterator = iter(files)
@@ -251,7 +251,7 @@ async def _infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
 
         return base_schema
 
-    async def _infer_file_schema(self, file: RemoteFile) -> Dict[str, Any]:
+    async def _infer_file_schema(self, file: RemoteFile) -> SchemaType:
         try:
             return await self.get_parser(self.config.file_type).infer_schema(self.config, file, self._stream_reader, self.logger)
         except Exception as exc:
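
_infer_schema keeps a bounded window of per-file inference tasks in flight and merges each result as it completes. A hedged asyncio sketch of that fan-out, with a made-up concurrency limit and dict.update standing in for merge_schemas:

import asyncio
import itertools
from typing import Any, Dict, Iterator, List

MAX_CONCURRENCY = 5  # assumed limit for this sketch; the CDK chooses its own


async def infer_one(file_name: str) -> Dict[str, Any]:
    await asyncio.sleep(0)  # stand-in for I/O-bound per-file schema inference
    return {file_name: {"type": "string"}}


async def infer_schema(files: List[str]) -> Dict[str, Any]:
    base_schema: Dict[str, Any] = {}
    files_iter: Iterator[str] = iter(files)
    # Start at most MAX_CONCURRENCY tasks up front.
    pending = {asyncio.create_task(infer_one(f)) for f in itertools.islice(files_iter, MAX_CONCURRENCY)}
    while pending:
        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            base_schema.update(task.result())  # the real code merges via merge_schemas
            # Refill the window: start one new task per completed one.
            for f in itertools.islice(files_iter, 1):
                pending.add(asyncio.create_task(infer_one(f)))
    return base_schema


print(asyncio.run(infer_schema([f"file_{i}.jsonl" for i in range(12)])))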

[file path not captured]
@@ -15,7 +15,7 @@
         pytest.param(0, False, None, id="test_no_skip_rows_before_header_and_no_autogenerate_column_names"),
     ]
 )
-def test_csv_format(skip_rows_before_header, autogenerate_column_names, expected_error):
+def test_csv_format_skip_rows_and_autogenerate_column_names(skip_rows_before_header, autogenerate_column_names, expected_error) -> None:
     if expected_error:
         with pytest.raises(expected_error):
             CsvFormat(skip_rows_before_header=skip_rows_before_header, autogenerate_column_names=autogenerate_column_names)
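
The rename gives the parametrized test a name that says what it covers; the explicit pytest.param ids already give each case a readable node id. A self-contained sketch of the pattern (the validation rule here is an assumption, not the CDK's actual constraint):

import pytest


def validate(skip_rows: int, autogenerate: bool) -> None:
    # Toy stand-in for CsvFormat validation, assumed for this sketch only.
    if skip_rows > 0 and not autogenerate:
        raise ValueError("skipping rows requires autogenerated column names")


@pytest.mark.parametrize(
    "skip_rows, autogenerate, expected_error",
    [
        pytest.param(1, True, None, id="skip_rows_with_autogenerated_names"),
        pytest.param(1, False, ValueError, id="skip_rows_without_autogenerated_names"),
    ],
)
def test_csv_header_options(skip_rows: int, autogenerate: bool, expected_error) -> None:
    if expected_error:
        with pytest.raises(expected_error):
            validate(skip_rows, autogenerate)
    else:
        validate(skip_rows, autogenerate)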