
[FEATURE] Spark - update SparkStep to be able to manage the SparkSession more effectively #32

Closed
dannymeijer opened this issue May 30, 2024 · 4 comments · Fixed by #69
Assignees
Labels
enhancement New feature or request

Comments

@dannymeijer
Member

Is your feature request related to a problem? Please describe.

A feature that was previously requested (aligned to this) was:

The ability to clean checkpoints by setting spark.cleaner.referenceTracking.cleanCheckpoints=true, opting in to Spark's internal checkpoint reference tracking and cleaning.

Currently, Koheesio has no specific interaction with or management of the underlying SparkSession (it simply expects the session to just 'be there').

Describe the solution you'd like

I would like us to extend SparkStep so that it can manage the Spark session, and to allow passing a SparkSession to a Koheesio Step as a field. At the moment, spark is only exposed as a property. I propose we keep the property but add a Field with a default; the property would then return whatever value is assigned to self._spark, or fall back to getActiveSession (as it does currently).

By doing this we can introduce functionality like this (inspired by Delta library):

def clean_up_spark_streaming_checkpoint(spark: SparkSession) -> SparkSession:
    # add desired config to the given spark session and return it
    return spark

Suggestion on new SparkStep code:

class SparkStep(Step):
    ...
    _spark: Optional[SparkSession] = Field(default=None, alias="spark")

    # TODO: add a validator that checks the SparkSession and sets it to
    #  SparkSession.getActiveSession() if it is still None
    #  (note: the default is None rather than SparkSession.getActiveSession(),
    #  since a Field default would be evaluated once at class-definition time)

    def spark_conf_clean_up_streaming_checkpoint(self):
        # as mentioned above
        ...
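To make the intent concrete, here is a minimal, framework-free sketch of the fallback behavior described above: a private field that may be set explicitly, and a property that falls back to the active session when the field is still None. FakeSession and get_active_session are stand-ins for pyspark's SparkSession / SparkSession.getActiveSession(), used here only so the sketch runs without a Spark installation; none of this is Koheesio's actual API.

```python
from typing import Optional


class FakeSession:
    """Stand-in for pyspark.sql.SparkSession (illustrative only)."""

    def __init__(self, name: str) -> None:
        self.name = name


_ACTIVE = FakeSession("active")


def get_active_session() -> FakeSession:
    # stand-in for SparkSession.getActiveSession()
    return _ACTIVE


class SparkStep:
    def __init__(self, spark: Optional[FakeSession] = None) -> None:
        # explicitly passed session, or None to defer to the active one
        self._spark = spark

    @property
    def spark(self) -> FakeSession:
        # return the assigned session, falling back to the active session
        return self._spark if self._spark is not None else get_active_session()


explicit = SparkStep(spark=FakeSession("mine"))
implicit = SparkStep()
print(explicit.spark.name)  # -> mine
print(implicit.spark.name)  # -> active
```

The key property here is that existing callers who never pass a session see no behavior change, while callers who do pass one get exactly the session they configured.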

This also opens the door to future optimizations.

Describe alternatives you've considered

Open for suggestions :)

Additional context

For reference, a user would call the functionality like this:

# using an incomplete DeltaStreamReader, purely as an example
reader = DeltaStreamReader(
    spark=SparkSession.builder.getOrCreate(),   # optional!
    ...
)
reader.spark_conf_clean_up_streaming_checkpoint()
@mikita-sakalouski
Contributor

@dannymeijer Can you provide more context on why we need to provide a session that is different from the active one? Based on the current implementation, Koheesio takes the current active session. Building the session is the user's responsibility (they can create a local or remote session).

@dannymeijer
Member Author

I don't intend to change the behavior; I just want users to be able to explicitly pass the SparkSession if they have configured one. This is for compatibility with the Delta API, for example.

@mikita-sakalouski
Contributor

I don't intend to change the behavior; I just want users to be able to explicitly pass the SparkSession if they have configured one. This is for compatibility with the Delta API, for example.

As for me, I would prefer not to have this separately, and to only allow the active Spark session, without the possibility of passing anything.

@mikita-sakalouski
Contributor

If this is a method of a class:

def clean_up_spark_streaming_checkpoint(spark: SparkSession) -> SparkSession:
    # add desired config to the given spark session and return it
    return spark

then it should be:

def clean_up_spark_streaming_checkpoint(self) -> SparkSession:
    # add desired config to the given spark session and return it
    # e.g. self.spark.conf.set(...)
    return self.spark
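As a runnable illustration of that method shape, here is a sketch with stand-in objects: FakeConf / FakeSession mimic pyspark's RuntimeConfig / SparkSession, and all names are illustrative rather than Koheesio's API. One caveat worth noting: in real Spark, spark.cleaner.* is a core setting read at SparkContext startup, so it may need to be supplied when the session is built rather than via conf.set on a running session.

```python
class FakeConf:
    """Stand-in for pyspark's RuntimeConfig (illustrative only)."""

    def __init__(self) -> None:
        self._values: dict[str, str] = {}

    def set(self, key: str, value: str) -> None:
        self._values[key] = value

    def get(self, key: str) -> str:
        return self._values[key]


class FakeSession:
    """Stand-in for pyspark.sql.SparkSession (illustrative only)."""

    def __init__(self) -> None:
        self.conf = FakeConf()


class SparkStep:
    def __init__(self, spark: FakeSession) -> None:
        self.spark = spark

    def clean_up_spark_streaming_checkpoint(self) -> FakeSession:
        # opt in to Spark's checkpoint reference tracking and cleaning;
        # on a real cluster this core setting may only take effect if
        # provided at session/context build time
        self.spark.conf.set(
            "spark.cleaner.referenceTracking.cleanCheckpoints", "true"
        )
        return self.spark


step = SparkStep(FakeSession())
session = step.clean_up_spark_streaming_checkpoint()
print(session.conf.get("spark.cleaner.referenceTracking.cleanCheckpoints"))
```

Returning self.spark keeps the method chainable while still mutating the step's own session in place, matching the signature suggested above.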
