Feature request

Which Delta project/connector is this regarding?

Spark Connect support for the Python API

Overview

Currently, the Python Delta Table API does not fully support Spark Connect. This is because lower-level APIs such as the SparkContext are not available when using Spark Connect. The SparkContext object is, however, used by the `DeltaTable.forPath` and `DeltaTable.forName` methods, which means that these two methods cannot be used with Spark Connect.
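For illustration, here is a minimal sketch of the limitation; the connection URL, table path, and table name are placeholders:

```python
from delta.tables import DeltaTable
from pyspark.sql import SparkSession

# A Spark Connect session (note the sc:// URL; this address is a placeholder).
spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()

# Both of these calls currently fail under Spark Connect, because the
# underlying implementation relies on the SparkContext, which Connect
# sessions do not expose:
dt_by_path = DeltaTable.forPath(spark, "/tmp/my_delta_table")  # hypothetical path
dt_by_name = DeltaTable.forName(spark, "my_delta_table")       # hypothetical table name
```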
Motivation
This feature will benefit all PySpark users who want to use the DeltaTable Python API in their code when using Spark Connect.
Further details
The Delta Table SQL API is supported by Spark Connect. For the moment, I have been converting PySpark DataFrames to temporary views and then using Spark SQL to perform merges.
For others who might need this in the meantime, the following code provides a function to convert a PySpark DataFrame into a temp view.
```python
import logging
import random
import string
from contextlib import contextmanager
from typing import Generator

from pyspark.errors import TempTableAlreadyExistsException
from pyspark.sql import DataFrame

logger = logging.getLogger(__name__)


class MultipleFailuresException(Exception):
    """Custom exception raised when an operation has failed after several retries."""
    pass


def generate_random_temp_view_name(length: int) -> str:
    """
    Generates a random temp view name consisting of lowercase letters of the specified
    length, using 'string.ascii_lowercase' and 'random.choice'.

    :param length: Length of the random temp view name to be generated.
    :return: Random temp view name as a string.
    """
    # Use string.ascii_lowercase to get all lowercase letters
    lowercase_letters = string.ascii_lowercase
    # Use random.choice to randomly select characters from the set of lowercase letters
    random_temp_view_name = ''.join(random.choice(lowercase_letters) for _ in range(length))
    return random_temp_view_name


@contextmanager
def create_temp_view(df: DataFrame, max_attempts: int = 5) -> Generator[str, None, None]:
    """
    Create a temporary view for a given DataFrame.

    :param df: The DataFrame to be used for creating the temporary view.
    :param max_attempts: The maximum number of attempts to create the temporary view
        if it already exists. Default is 5.
    :return: The name of the created temporary view.
    :raises MultipleFailuresException: If the temporary view creation fails after the
        maximum number of attempts.
    """
    temp_view_name = None
    left_attempts = max_attempts
    try:
        while temp_view_name is None and left_attempts > 0:
            try:
                temp_view_name = generate_random_temp_view_name(length=12)
                df.createTempView(temp_view_name)
                logger.info(f"Created temp view named '{temp_view_name}'!")
            except TempTableAlreadyExistsException as e:
                logger.warning(
                    msg=f"Temp view named '{temp_view_name}' already exists! Attempts left: {left_attempts}.",
                    exc_info=e,
                )
                temp_view_name = None  # reset so the loop retries with a fresh name
                left_attempts -= 1
        if temp_view_name is None:
            raise MultipleFailuresException(f"Temp view creation failed after {max_attempts} attempts!")
        yield temp_view_name
    finally:
        if temp_view_name is None:
            logger.warning("No temp view created so nothing to drop!")
        else:
            if df.sparkSession.catalog.dropTempView(temp_view_name):
                logger.info(f"Dropped temp view named '{temp_view_name}'!")
            else:
                logger.info(f"Failed to drop temp view named '{temp_view_name}'!")
```
Now, say `to_upsert` is the PySpark DataFrame you want to upsert into the DeltaTable located at `path`. You can use the provided function like so:
```python
with create_temp_view(to_upsert) as temp_view:
    merge_condition = f"target.some_column = {temp_view}.some_column"
    logger.info(f"Merge condition is: {merge_condition}.")
    merge_query = f"""
        MERGE INTO delta.`{path}` AS target
        USING {temp_view}
        ON {merge_condition}
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """
    logger.info(f"Merge query is: {merge_query}.")
    spark.sql(merge_query)
```
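For reference, here is a sketch of the `DeltaTable` API equivalent that this feature request would enable under Spark Connect; it already works with a classic (non-Connect) SparkSession, and `path`, `to_upsert`, and `some_column` are the same placeholders as above:

```python
from delta.tables import DeltaTable

# Works with a classic SparkSession today, but fails under Spark Connect
# because DeltaTable.forPath relies on the SparkContext.
(
    DeltaTable.forPath(spark, path)
    .alias("target")
    .merge(to_upsert.alias("source"), "target.some_column = source.some_column")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)
```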
Willingness to contribute
The Delta Lake Community encourages new feature contributions. Would you or another member of your organization be willing to contribute an implementation of this feature?
Yes. I can contribute this feature independently.
Yes. I would be willing to contribute this feature with guidance from the Delta Lake community.
No. I cannot contribute this feature at this time.
Hey @sebastiandaberdaku, thanks for opening this! I just created a parent issue #3240 which addresses this for both the Scala and Python DeltaTable APIs.
This is definitely on our roadmap for future releases (and the Delta 4.0.0 Preview release will include some partial support!)