Skip to content

Commit

Permalink
Allow coercible schema changes (#2167)
Browse files Browse the repository at this point in the history
  • Loading branch information
ravenac95 committed Sep 18, 2024
1 parent 0c6b588 commit b9c1c06
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 19 deletions.
37 changes: 26 additions & 11 deletions warehouse/oso_dagster/factories/goldsky/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,38 @@
import threading
import time
from dataclasses import dataclass
from typing import (Any, Callable, Dict, List, Mapping, Optional, Tuple,
Unpack, cast)
from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple, Unpack, cast

import arrow
import polars
from dagster import (AssetExecutionContext, DagsterLogManager,
DefaultSensorStatus, EventLogEntry, MetadataValue,
OpExecutionContext, ResourceParam, RunConfig, RunRequest,
SensorEvaluationContext, TableColumn, TableRecord,
TableSchema, asset, asset_sensor, job, op)
from dagster import (
AssetExecutionContext,
DagsterLogManager,
DefaultSensorStatus,
EventLogEntry,
MetadataValue,
OpExecutionContext,
ResourceParam,
RunConfig,
RunRequest,
SensorEvaluationContext,
TableColumn,
TableRecord,
TableSchema,
asset,
asset_sensor,
job,
op,
)
from dagster_gcp import BigQueryResource, GCSResource
from google.api_core.exceptions import (ClientError, InternalServerError,
NotFound)
from google.api_core.exceptions import ClientError, InternalServerError, NotFound
from google.cloud.bigquery import Client as BQClient
from google.cloud.bigquery import LoadJobConfig, SourceFormat, TableReference
from google.cloud.bigquery.schema import SchemaField
from oso_dagster.utils.bq import compare_schemas, get_table_schema
from oso_dagster.utils.bq import (
compare_schemas_and_ignore_safe_changes,
get_table_schema,
)
from polars.type_aliases import PolarsDataType

from ...cbt import CBTResource, TimePartitioning, UpdateStrategy
Expand Down Expand Up @@ -677,7 +692,7 @@ def ensure_schema_or_fail(self, log: logging.Logger, source_table: str, destinat
source_schema = self.load_schema_for_bq_table(source_table)
destination_schema = self.load_schema_for_bq_table(destination_table)

source_only, destination_only, modified = compare_schemas(source_schema, destination_schema)
source_only, destination_only, modified = compare_schemas_and_ignore_safe_changes(source_schema, destination_schema)
if len(modified) > 0:
log.error(dict(
msg=f"cannot merge automatically into {destination_table} schema has been altered:",
Expand Down
15 changes: 7 additions & 8 deletions warehouse/oso_dagster/factories/goldsky/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,12 @@
to simplify the management of these things into a single place.
"""

from oso_dagster.utils import unpack_config, gcs_to_bucket_name
from .config import GoldskyNetworkConfig, NetworkAssetSourceConfig
from oso_dagster.utils import gcs_to_bucket_name, unpack_config

from ..common import AssetFactoryResponse, early_resources_asset_factory
from .additional import blocks_extensions, traces_extensions, transactions_extensions
from .assets import goldsky_asset
from ..common import early_resources_asset_factory, AssetFactoryResponse
from .additional import (
blocks_extensions,
transactions_extensions,
traces_extensions,
)
from .config import GoldskyNetworkConfig, NetworkAssetSourceConfig


@unpack_config(GoldskyNetworkConfig)
Expand Down Expand Up @@ -47,6 +44,7 @@ def _factory(project_id: str, staging_bucket: str):
partition_column_transform=blocks_asset_config.partition_column_transform,
source_bucket_name=staging_bucket_name,
destination_bucket_name=staging_bucket_name,
schema_overrides=blocks_asset_config.schema_overrides,
# uncomment the following value to test
# max_objects_to_load=1,
additional_factories=[blocks_extensions()],
Expand Down Expand Up @@ -130,6 +128,7 @@ def _factory(project_id: str, staging_bucket: str):
partition_column_transform=traces_asset_config.partition_column_transform,
source_bucket_name=staging_bucket_name,
destination_bucket_name=staging_bucket_name,
schema_overrides=traces_asset_config.schema_overrides,
# uncomment the following value to test
# max_objects_to_load=1,
deps=transactions.filter_assets_by_name("transactions"),
Expand Down
29 changes: 29 additions & 0 deletions warehouse/oso_dagster/utils/bq.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,16 @@ def get_table_schema(
return table.schema


# Groups of BigQuery field types that may be substituted for one another
# without being treated as a breaking schema change.
SAFE_SCHEMA_MODIFICATIONS = [
    {"NUMERIC", "FLOAT", "DOUBLE"},
]

# Lookup table: field type -> the full set of types it can safely become.
SAFE_SCHEMA_MODIFICATIONS_MAP: t.Dict[str, t.Set[str]] = {
    schema_type: group
    for group in SAFE_SCHEMA_MODIFICATIONS
    for schema_type in group
}


def compare_schemas(
schema1: t.List[SchemaField], schema2: t.List[SchemaField]
) -> t.Tuple[
Expand Down Expand Up @@ -174,6 +184,25 @@ def compare_schemas(
return schema1_only, schema2_only, modified_fields


def compare_schemas_and_ignore_safe_changes(
    schema1: t.List[SchemaField], schema2: t.List[SchemaField]
) -> t.Tuple[
    t.Dict[str, SchemaField],
    t.Dict[str, SchemaField],
    t.Dict[str, t.Dict[str, SchemaField]],
]:
    """Compare two BigQuery schemas, ignoring "safe" type modifications.

    Delegates to ``compare_schemas`` and then drops any modified field whose
    type change stays within one of the coercible groups in
    ``SAFE_SCHEMA_MODIFICATIONS`` (e.g. NUMERIC <-> FLOAT <-> DOUBLE).

    Args:
        schema1: Fields of the first (source) schema.
        schema2: Fields of the second (destination) schema.

    Returns:
        A tuple of (fields only in schema1, fields only in schema2,
        remaining unsafe modifications keyed by field name, each mapping
        "schema1"/"schema2" to the respective field).
    """
    schema1_only, schema2_only, modified = compare_schemas(schema1, schema2)

    # Build a new dict instead of deleting from `modified` while iterating
    # it, which raises RuntimeError (dict changed size during iteration).
    unsafe_modified: t.Dict[str, t.Dict[str, SchemaField]] = {}
    for field_name, modifications in modified.items():
        schema1_field = modifications["schema1"]
        schema2_field = modifications["schema2"]
        # Use .get() with an empty default: field types with no safe group
        # (e.g. STRING) must be reported, not raise KeyError.
        safe_group = SAFE_SCHEMA_MODIFICATIONS_MAP.get(
            schema1_field.field_type, frozenset()
        )
        if schema2_field.field_type not in safe_group:
            unsafe_modified[field_name] = modifications

    return schema1_only, schema2_only, unsafe_modified


def print_schema_diff(
schema1_only: t.Dict[str, SchemaField],
schema2_only: t.Dict[str, SchemaField],
Expand Down

0 comments on commit b9c1c06

Please sign in to comment.