Generic DeltaTable error: External error: Arrow error: Invalid argument error: arguments need to have the same data type - while merge data in to delta table #2423

Closed
murughu opened this issue Apr 15, 2024 · 12 comments
Labels: bug

murughu commented Apr 15, 2024

Environment

Delta-rs version: v0.16.0 up to main
Binding:

Environment:

  • Cloud provider: Azure
  • OS: Linux (Debian)
  • Other:

Bug

When merging data into an existing Delta table (created using Spark), I get this error: "Generic DeltaTable error: External error: Arrow error: Invalid argument error: arguments need to have the same data type"

What happened:

  1. Created a Delta table using Spark.
  2. Got the error above when merging data into that Delta table using delta-rs/pyarrow.

What you expected to happen:
Should be able to merge data into the existing Delta table using the delta-rs library.

How to reproduce it:
Spark code:


from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import current_timestamp

# SparkSession (pre-defined in notebooks; created here so the snippet runs standalone)
spark = SparkSession.builder.getOrCreate()

schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])

data = [("Alice", 25), ("Bob", 30), ("Charlie", 35)]
df = spark.createDataFrame(data, schema)

spark.conf.set("spark.sql.session.timeZone", "UTC")
df1 = df.withColumn("datalake_updated_timestampUtc", current_timestamp())

final_columns = ["datalake_updated_timestampUtc", *df.columns]
print(f"final delta table schema: {final_columns}")
df4 = df1.select(final_columns)
delta_path = "*********************"

df4.write.format("delta").mode("overwrite").save(delta_path)

delta-rs/pyarrow code

from datetime import datetime

import pyarrow as pa
from deltalake import DeltaTable


def generate_values(value, range_val):
    # yield the same value range_val times
    for i in range(range_val):
        yield value


def write_delta():
    schema = pa.schema([
        pa.field("name", pa.string()),
        pa.field("age", pa.int32())
    ])
    data = [
        pa.array(["Bala", "rama", "Charlie"]),
        pa.array([25, 30, 35])
    ]

    batch = pa.record_batch(data, schema=schema)
    final_table = pa.Table.from_batches([batch])

    # timestamp column added in front, mirroring the Spark-written table
    datalake_updated_timestampUtc = pa.array(generate_values(datetime.now(), 3), type=pa.timestamp('ns'))
    fld_timestmp = pa.field("datalake_updated_timestampUtc", pa.timestamp('ns'), True)

    new_table_1 = final_table.add_column(0, fld_timestmp, datalake_updated_timestampUtc)

    delta_table_path = "************"
    account_name = "***********"
    container_name = "*********"
    tenant_id = "*****************"
    client_id = "*******************"
    client_secret = "****************"
    storage_options = {
        'account_name': account_name,
        'container_name': container_name,
        'tenant_id': tenant_id,
        'client_id': client_id,
        'client_secret': client_secret,
    }
    join_cond = "target.name = source.name"
    dt = DeltaTable(delta_table_path, storage_options=storage_options)
    dt.merge(
        source=new_table_1,
        predicate=join_cond,
        error_on_type_mismatch=False,
        source_alias="source",
        target_alias="target",
    ).when_matched_update_all().when_not_matched_insert_all().execute()

More detail:
We only get this error when we add the timestamp column.

murughu added the bug label on Apr 15, 2024
ion-elgreco (Collaborator) commented:

Spark uses deprecated parquet dtypes; you should set this to resolve it:

SparkSession.config("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
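(For reference, that setting goes on the SparkSession builder; a minimal sketch, assuming a standalone session rather than a notebook:)

from pyspark.sql import SparkSession

# Sketch: build the session with the suggested config so Spark writes
# INT64 microsecond timestamps instead of the legacy INT96 parquet type.
spark = (
    SparkSession.builder
    .appName("timestamp-micros-example")  # hypothetical app name
    .config("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
    .getOrCreate()
)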

ion-elgreco closed this as not planned on Apr 15, 2024

murughu commented Apr 16, 2024

@ion-elgreco thanks.
I tried the above, but it doesn't work; I'm still getting the same error.

ion-elgreco (Collaborator) commented:

@murughu you should rewrite the table with that spark config setting
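(A minimal sketch of the rewrite, reusing df4 and delta_path from the repro above and assuming the config can also be set on an already-running session:)

# Sketch: set the SQL config at runtime, then overwrite the existing table so
# its parquet files are re-encoded with microsecond timestamps.
spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
df4.write.format("delta").mode("overwrite").save(delta_path)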


murughu commented Apr 16, 2024

@ion-elgreco yes, I did the same. I created a new Delta table with this config and then tried to merge into that newly created table using delta-rs.

ion-elgreco (Collaborator) commented:

@murughu
This code below should use 'us', not 'ns', to start with; can you try that?

datalake_updated_timestampUtc = pa.array(generate_values(datetime.now(),3), type=pa.timestamp('ns'))
fld_timestmp = pa.field("datalake_updated_timestampUtc",pa.timestamp('ns'), True)


murughu commented Apr 16, 2024

@ion-elgreco, yes, I tried that; I'm still getting the same error:

    datalake_updated_timestampUtc = pa.array(generate_values(datetime.now(),3), type=pa.timestamp('us'))
    fld_timestmp = pa.field("datalake_updated_timestampUtc",pa.timestamp('us'), True)

ion-elgreco reopened this on Apr 16, 2024
ion-elgreco (Collaborator) commented:

@murughu then I'm not sure what the issue is.

sherlockbeard (Contributor) commented:

Just tested the code with @ion-elgreco's suggestion SparkSession.config("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS") and it's working fine.

PySpark code:


from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import current_timestamp
from delta import *
import pyspark

builder = pyspark.sql.SparkSession.builder.appName("MyApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])


data = [("Alice", 25), ("Bob", 30), ("Charlie", 35)]
df = spark.createDataFrame(data, schema)

spark.conf.set("spark.sql.session.timeZone", "UTC")
df1 = df.withColumn("datalake_updated_timestampUtc", current_timestamp())

final_columns = ["datalake_updated_timestampUtc",*df.columns]
print(f"final delta table schema: {final_columns}")
df4 = df1.select(final_columns)
delta_path = "***" # local file path 

df4.write.format("delta").mode("overwrite").save(delta_path)

Also, you can check the data type of your parquet column; I use parquet-tools for it.

Column schema without config:
[screenshot: parquet column schema]

With config:
[screenshot: parquet column schema]
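(If parquet-tools isn't available, the same information can be read with pyarrow, which is already used here; a minimal sketch with a hypothetical data-file name:)

import pyarrow.parquet as pq

# Sketch: print the parquet-level schema of one data file from the Delta table.
# For timestamp columns, the logical type includes isAdjustedToUTC and timeUnit,
# and converted_type shows the legacy annotation.
pf = pq.ParquetFile("part-00000-example.snappy.parquet")  # hypothetical file name
print(pf.schema)

col = pf.schema.column(0)
print(col.name, col.physical_type, col.logical_type, col.converted_type)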

ion-elgreco (Collaborator) commented:

@sherlockbeard thanks for confirming! Will close it then :)


murughu commented Apr 22, 2024

@ion-elgreco @sherlockbeard Thanks

I'm still facing the same issue; below are my observations.
With .config("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS"):
[screenshot: parquet column schema]

isAdjustedToUTC=true but delta-rs is expecting isAdjustedToUTC=false

To make isAdjustedToUTC=false, I converted the timestamp data type to TIMESTAMP_NTZ in my Spark code:
df1 = df.withColumn("datalake_updated_timestampUtc", current_timestamp().cast('TIMESTAMP_NTZ'))
[screenshot: parquet column schema]

But this time isAdjustedToUTC=false, yet converted_type (legacy) is None.

When I try to merge this table using delta-rs, I get the error below (I think the error is raised while creating the DeltaTable object):

dt = DeltaTable(delta_table_path, storage_options=storage_options)
File "/home/airflow/.local/lib/python3.8/site-packages/deltalake/table.py", line 401, in __init__
    self._table = RawDeltaTable(
_internal.DeltaProtocolError: Invalid JSON in file stats: data did not match any variant of untagged enum DataType at line 1 column 89

sherlockbeard (Contributor) commented:

> isAdjustedToUTC=true but delta-rs is expecting isAdjustedToUTC=false

Weird, I tried with isAdjustedToUTC=true and delta-rs was able to merge it.

With df1 = df.withColumn("datalake_updated_timestampUtc", current_timestamp().cast('TIMESTAMP_NTZ'))
I got the same column schema as you, but I am still able to run the Python delta-rs code you provided and complete the merge.

delta-rs 0.16.4
spark 3.5.1
delta-spark 3.1.0


murughu commented May 5, 2024

@sherlockbeard @ion-elgreco thanks,
its working fine with latest delta-rs version.
