diff --git a/warehouse/oso_dagster/factories/goldsky/assets.py b/warehouse/oso_dagster/factories/goldsky/assets.py index 5d637291..6c1e0c72 100644 --- a/warehouse/oso_dagster/factories/goldsky/assets.py +++ b/warehouse/oso_dagster/factories/goldsky/assets.py @@ -8,23 +8,38 @@ import threading import time from dataclasses import dataclass -from typing import (Any, Callable, Dict, List, Mapping, Optional, Tuple, - Unpack, cast) +from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple, Unpack, cast import arrow import polars -from dagster import (AssetExecutionContext, DagsterLogManager, - DefaultSensorStatus, EventLogEntry, MetadataValue, - OpExecutionContext, ResourceParam, RunConfig, RunRequest, - SensorEvaluationContext, TableColumn, TableRecord, - TableSchema, asset, asset_sensor, job, op) +from dagster import ( + AssetExecutionContext, + DagsterLogManager, + DefaultSensorStatus, + EventLogEntry, + MetadataValue, + OpExecutionContext, + ResourceParam, + RunConfig, + RunRequest, + SensorEvaluationContext, + TableColumn, + TableRecord, + TableSchema, + asset, + asset_sensor, + job, + op, +) from dagster_gcp import BigQueryResource, GCSResource -from google.api_core.exceptions import (ClientError, InternalServerError, - NotFound) +from google.api_core.exceptions import ClientError, InternalServerError, NotFound from google.cloud.bigquery import Client as BQClient from google.cloud.bigquery import LoadJobConfig, SourceFormat, TableReference from google.cloud.bigquery.schema import SchemaField -from oso_dagster.utils.bq import compare_schemas, get_table_schema +from oso_dagster.utils.bq import ( + compare_schemas_and_ignore_safe_changes, + get_table_schema, +) from polars.type_aliases import PolarsDataType from ...cbt import CBTResource, TimePartitioning, UpdateStrategy @@ -677,7 +692,7 @@ def ensure_schema_or_fail(self, log: logging.Logger, source_table: str, destinat source_schema = self.load_schema_for_bq_table(source_table) destination_schema = 
self.load_schema_for_bq_table(destination_table) - source_only, destination_only, modified = compare_schemas(source_schema, destination_schema) + source_only, destination_only, modified = compare_schemas_and_ignore_safe_changes(source_schema, destination_schema) if len(modified) > 0: log.error(dict( msg=f"cannot merge automatically into {destination_table} schema has been altered:", diff --git a/warehouse/oso_dagster/factories/goldsky/network.py b/warehouse/oso_dagster/factories/goldsky/network.py index 9f1cd44d..221d93a7 100644 --- a/warehouse/oso_dagster/factories/goldsky/network.py +++ b/warehouse/oso_dagster/factories/goldsky/network.py @@ -3,15 +3,12 @@ to simplify the management of these things into a single place. """ -from oso_dagster.utils import unpack_config, gcs_to_bucket_name -from .config import GoldskyNetworkConfig, NetworkAssetSourceConfig +from oso_dagster.utils import gcs_to_bucket_name, unpack_config + +from ..common import AssetFactoryResponse, early_resources_asset_factory +from .additional import blocks_extensions, traces_extensions, transactions_extensions from .assets import goldsky_asset -from ..common import early_resources_asset_factory, AssetFactoryResponse -from .additional import ( - blocks_extensions, - transactions_extensions, - traces_extensions, -) +from .config import GoldskyNetworkConfig, NetworkAssetSourceConfig @unpack_config(GoldskyNetworkConfig) @@ -47,6 +44,7 @@ def _factory(project_id: str, staging_bucket: str): partition_column_transform=blocks_asset_config.partition_column_transform, source_bucket_name=staging_bucket_name, destination_bucket_name=staging_bucket_name, + schema_overrides=blocks_asset_config.schema_overrides, # uncomment the following value to test # max_objects_to_load=1, additional_factories=[blocks_extensions()], @@ -130,6 +128,7 @@ def _factory(project_id: str, staging_bucket: str): partition_column_transform=traces_asset_config.partition_column_transform, source_bucket_name=staging_bucket_name, 
# Groups of column type names that are treated as interchangeable when
# comparing schemas: a field whose type moves from one member of a group to
# another member of the same group is not reported as a modification.
SAFE_SCHEMA_MODIFICATIONS = [
    {"NUMERIC", "FLOAT", "DOUBLE"},
]

# Reverse index built from the groups above: type name -> the group (set) that
# contains it. Types that belong to no group are simply absent from the map.
SAFE_SCHEMA_MODIFICATIONS_MAP: t.Dict[str, t.Set[str]] = {}
for group in SAFE_SCHEMA_MODIFICATIONS:
    for schema_type in group:
        SAFE_SCHEMA_MODIFICATIONS_MAP[schema_type] = group


def compare_schemas_and_ignore_safe_changes(
    schema1: t.List[SchemaField], schema2: t.List[SchemaField]
) -> t.Tuple[
    t.Dict[str, SchemaField],
    t.Dict[str, SchemaField],
    t.Dict[str, t.Dict[str, SchemaField]],
]:
    """Compare two schemas, dropping modifications considered safe.

    Delegates to ``compare_schemas`` and then filters its ``modified`` result:
    an entry is removed when the field types on both sides belong to the same
    group in ``SAFE_SCHEMA_MODIFICATIONS``.

    Args:
        schema1: The first schema to compare.
        schema2: The second schema to compare.

    Returns:
        A tuple ``(schema1_only, schema2_only, modified)`` in the same shape
        that ``compare_schemas`` returns, where ``modified`` only contains the
        entries that were not classified as safe.
    """
    schema1_only, schema2_only, modified = compare_schemas(schema1, schema2)

    # Build a filtered copy instead of deleting from ``modified`` while
    # iterating over it; deleting during iteration raises
    # "RuntimeError: dictionary changed size during iteration".
    unsafe_modified: t.Dict[str, t.Dict[str, SchemaField]] = {}
    for field_name, modifications in modified.items():
        schema1_field = modifications["schema1"]
        schema2_field = modifications["schema2"]

        # ``.get`` rather than indexing: a type that is in no safe group has
        # no entry in the map, and such a change must be reported to the
        # caller rather than crash with a KeyError.
        safe_group = SAFE_SCHEMA_MODIFICATIONS_MAP.get(schema1_field.field_type)
        if safe_group is not None and schema2_field.field_type in safe_group:
            # NOTE(review): only ``field_type`` is inspected here. If
            # ``compare_schemas`` also flags differences in other attributes
            # (e.g. mode), those are dropped too -- confirm this is intended.
            continue

        unsafe_modified[field_name] = modifications

    return schema1_only, schema2_only, unsafe_modified