Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow coercible schema changes #2167

Merged
merged 1 commit into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 26 additions & 11 deletions warehouse/oso_dagster/factories/goldsky/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,38 @@
import threading
import time
from dataclasses import dataclass
from typing import (Any, Callable, Dict, List, Mapping, Optional, Tuple,
Unpack, cast)
from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple, Unpack, cast

import arrow
import polars
from dagster import (AssetExecutionContext, DagsterLogManager,
DefaultSensorStatus, EventLogEntry, MetadataValue,
OpExecutionContext, ResourceParam, RunConfig, RunRequest,
SensorEvaluationContext, TableColumn, TableRecord,
TableSchema, asset, asset_sensor, job, op)
from dagster import (
AssetExecutionContext,
DagsterLogManager,
DefaultSensorStatus,
EventLogEntry,
MetadataValue,
OpExecutionContext,
ResourceParam,
RunConfig,
RunRequest,
SensorEvaluationContext,
TableColumn,
TableRecord,
TableSchema,
asset,
asset_sensor,
job,
op,
)
from dagster_gcp import BigQueryResource, GCSResource
from google.api_core.exceptions import (ClientError, InternalServerError,
NotFound)
from google.api_core.exceptions import ClientError, InternalServerError, NotFound
from google.cloud.bigquery import Client as BQClient
from google.cloud.bigquery import LoadJobConfig, SourceFormat, TableReference
from google.cloud.bigquery.schema import SchemaField
from oso_dagster.utils.bq import compare_schemas, get_table_schema
from oso_dagster.utils.bq import (
compare_schemas_and_ignore_safe_changes,
get_table_schema,
)
from polars.type_aliases import PolarsDataType

from ...cbt import CBTResource, TimePartitioning, UpdateStrategy
Expand Down Expand Up @@ -677,7 +692,7 @@ def ensure_schema_or_fail(self, log: logging.Logger, source_table: str, destinat
source_schema = self.load_schema_for_bq_table(source_table)
destination_schema = self.load_schema_for_bq_table(destination_table)

source_only, destination_only, modified = compare_schemas(source_schema, destination_schema)
source_only, destination_only, modified = compare_schemas_and_ignore_safe_changes(source_schema, destination_schema)
if len(modified) > 0:
log.error(dict(
msg=f"cannot merge automatically into {destination_table} schema has been altered:",
Expand Down
15 changes: 7 additions & 8 deletions warehouse/oso_dagster/factories/goldsky/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,12 @@
to simplify the management of these things into a single place.
"""

from oso_dagster.utils import unpack_config, gcs_to_bucket_name
from .config import GoldskyNetworkConfig, NetworkAssetSourceConfig
from oso_dagster.utils import gcs_to_bucket_name, unpack_config

from ..common import AssetFactoryResponse, early_resources_asset_factory
from .additional import blocks_extensions, traces_extensions, transactions_extensions
from .assets import goldsky_asset
from ..common import early_resources_asset_factory, AssetFactoryResponse
from .additional import (
blocks_extensions,
transactions_extensions,
traces_extensions,
)
from .config import GoldskyNetworkConfig, NetworkAssetSourceConfig


@unpack_config(GoldskyNetworkConfig)
Expand Down Expand Up @@ -47,6 +44,7 @@ def _factory(project_id: str, staging_bucket: str):
partition_column_transform=blocks_asset_config.partition_column_transform,
source_bucket_name=staging_bucket_name,
destination_bucket_name=staging_bucket_name,
schema_overrides=blocks_asset_config.schema_overrides,
# uncomment the following value to test
# max_objects_to_load=1,
additional_factories=[blocks_extensions()],
Expand Down Expand Up @@ -130,6 +128,7 @@ def _factory(project_id: str, staging_bucket: str):
partition_column_transform=traces_asset_config.partition_column_transform,
source_bucket_name=staging_bucket_name,
destination_bucket_name=staging_bucket_name,
schema_overrides=traces_asset_config.schema_overrides,
# uncomment the following value to test
# max_objects_to_load=1,
deps=transactions.filter_assets_by_name("transactions"),
Expand Down
29 changes: 29 additions & 0 deletions warehouse/oso_dagster/utils/bq.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,16 @@ def get_table_schema(
return table.schema


# Groups of BigQuery field types that may be coerced between each other
# without being treated as a breaking schema change.
SAFE_SCHEMA_MODIFICATIONS = [
    {"NUMERIC", "FLOAT", "DOUBLE"},
]

# Flattened lookup: field type -> the safe-coercion group it belongs to.
# Every member of a group maps to the same (shared) set object.
SAFE_SCHEMA_MODIFICATIONS_MAP: t.Dict[str, t.Set[str]] = {
    member: coercion_group
    for coercion_group in SAFE_SCHEMA_MODIFICATIONS
    for member in coercion_group
}


def compare_schemas(
schema1: t.List[SchemaField], schema2: t.List[SchemaField]
) -> t.Tuple[
Expand Down Expand Up @@ -174,6 +184,25 @@ def compare_schemas(
return schema1_only, schema2_only, modified_fields


def compare_schemas_and_ignore_safe_changes(
    schema1: t.List[SchemaField], schema2: t.List[SchemaField]
) -> t.Tuple[
    t.Dict[str, SchemaField],
    t.Dict[str, SchemaField],
    t.Dict[str, t.Dict[str, SchemaField]],
]:
    """Compare two BigQuery schemas, ignoring type changes that are safely coercible.

    Delegates to ``compare_schemas`` and then filters out of the "modified"
    result any field whose type change stays within a single group of
    ``SAFE_SCHEMA_MODIFICATIONS`` (e.g. NUMERIC -> FLOAT).

    Args:
        schema1: the source schema's fields.
        schema2: the destination schema's fields.

    Returns:
        A tuple of (fields only in schema1, fields only in schema2,
        unsafe modified fields keyed by field name, each mapping
        "schema1"/"schema2" to the corresponding SchemaField).
    """
    schema1_only, schema2_only, modified = compare_schemas(schema1, schema2)

    # Build a new dict rather than deleting from `modified` while iterating
    # it (which raises RuntimeError in Python 3).
    unsafe_modified: t.Dict[str, t.Dict[str, SchemaField]] = {}
    for field_name, modifications in modified.items():
        source_type = modifications["schema1"].field_type
        destination_type = modifications["schema2"].field_type
        # `.get` avoids a KeyError for types that belong to no safe group
        # (e.g. STRING, INTEGER) — those modifications are always unsafe.
        # NOTE(review): a field can appear in `modified` for non-type changes
        # (e.g. mode); such fields are dropped here when their shared type is
        # in a safe group, mirroring the original membership test — confirm.
        safe_group = SAFE_SCHEMA_MODIFICATIONS_MAP.get(source_type, frozenset())
        if destination_type not in safe_group:
            unsafe_modified[field_name] = modifications

    return schema1_only, schema2_only, unsafe_modified


def print_schema_diff(
schema1_only: t.Dict[str, SchemaField],
schema2_only: t.Dict[str, SchemaField],
Expand Down
Loading