
[SPARK-47818][CONNECT] Introduce plan cache in SparkConnectPlanner to improve performance of Analyze requests #46012

Closed

Conversation

xi-db
Contributor

@xi-db xi-db commented Apr 11, 2024

What changes were proposed in this pull request?

While building a DataFrame step by step, each transformation produces a new DataFrame whose schema is lazily computed on access. However, if user code frequently accesses the schemas of these new DataFrames using methods such as df.columns, it results in a large number of Analyze requests to the server. Each request reanalyzes the entire plan, leading to poor performance, especially when constructing highly complex plans.

Now, by introducing a plan cache in SparkConnectPlanner, we aim to reduce the overhead of repeated analysis during this process. Caching the resolved logical plan of a subtree saves significant computation whenever that subtree reappears in a later plan.

A minimal example of the problem:

import pyspark.sql.functions as F
df = spark.range(10)
for i in range(200):
  if str(i) not in df.columns: # <-- The df.columns call causes a new Analyze request in every iteration
    df = df.withColumn(str(i), F.col("id") + i)
df.show() 

With this patch, the performance of the above code improved from ~110s to ~5s.
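To see why caching helps so much here, consider a plain-Python sketch of memoizing analysis results by plan id. This is only an illustration of the idea, not the server's actual Scala implementation; `Plan`, `analyze`, and `analyze_count` are hypothetical stand-ins:

```python
class Plan:
    """A toy relation tree; each node gets a unique plan id, like Spark Connect relations."""
    _next_id = 0

    def __init__(self, child=None):
        Plan._next_id += 1
        self.plan_id = Plan._next_id
        self.child = child


analyze_count = 0


def analyze(plan, cache):
    """Resolve a plan bottom-up, reusing cached results for already-analyzed subtrees."""
    global analyze_count
    if plan.plan_id in cache:
        return cache[plan.plan_id]
    child_schema = analyze(plan.child, cache) if plan.child else []
    analyze_count += 1                      # one unit of real analysis work
    schema = child_schema + [f"col_{plan.plan_id}"]
    cache[plan.plan_id] = schema
    return schema


# Build a 200-deep chain, asking for the schema at every step,
# mirroring the df.columns loop above.
cache = {}
df = Plan()
for _ in range(200):
    analyze(df, cache)                      # e.g. df.columns
    df = Plan(child=df)                     # e.g. df.withColumn(...)
```

With the cache, each new node is analyzed exactly once (about 200 units of work for this loop); without it, every schema access would reanalyze the whole chain, roughly 20,000 units.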

Why are the changes needed?

The performance improvement is substantial in cases like the one above.

Does this PR introduce any user-facing change?

Yes, a static conf spark.connect.session.planCache.maxSize and a dynamic conf spark.connect.session.planCache.enabled are added.

  • spark.connect.session.planCache.maxSize: Sets the maximum number of cached resolved logical plans in the Spark Connect session. Setting it to a value less than or equal to zero disables the plan cache.
  • spark.connect.session.planCache.enabled: When true, the cache of resolved logical plans is enabled if spark.connect.session.planCache.maxSize is greater than zero. When false, the cache is disabled even if spark.connect.session.planCache.maxSize is greater than zero. The caching is best-effort and not guaranteed.
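The interaction between the two confs can be summarized with a tiny predicate. This is a hypothetical helper for illustration, not Spark's actual configuration machinery:

```python
def plan_cache_active(enabled: bool, max_size: int) -> bool:
    """Whether the plan cache is effectively in use for a session.

    enabled  -- dynamic conf spark.connect.session.planCache.enabled
    max_size -- static conf spark.connect.session.planCache.maxSize
    """
    # Both knobs must agree: the dynamic flag must be true AND the static
    # size must be positive; either one alone is enough to disable caching.
    return enabled and max_size > 0
```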

How was this patch tested?

Some new tests are added in SparkConnectSessionHolderSuite.scala.

Was this patch authored or co-authored using generative AI tooling?

No.

@beliefer
Contributor

I submitted #43473 half a year ago.
It seems it did not receive enough attention.

@xi-db xi-db changed the title [WIP][SPARK-47818] Introduce plan cache in SparkConnectPlanner to improve performance of Analyze requests [SPARK-47818] Introduce plan cache in SparkConnectPlanner to improve performance of Analyze requests Apr 12, 2024
@xi-db xi-db changed the title [SPARK-47818] Introduce plan cache in SparkConnectPlanner to improve performance of Analyze requests [SPARK-47818][CONNECT] Introduce plan cache in SparkConnectPlanner to improve performance of Analyze requests Apr 12, 2024
@zhengruifeng
Copy link
Contributor

cc @HyukjinKwon and @ueshin

@xi-db xi-db changed the title [SPARK-47818][CONNECT] Introduce plan cache in SparkConnectPlanner to improve performance of Analyze requests [WIP][SPARK-47818][CONNECT] Introduce plan cache in SparkConnectPlanner to improve performance of Analyze requests Apr 12, 2024
* @param transform Function to transform the relation into a logical plan.
* @return The logical plan.
*/
private[connect] def usePlanCache(rel: proto.Relation, cachePlan: Boolean)(
Contributor

@zhengruifeng zhengruifeng Apr 12, 2024


I think we may want to exclude some kinds of plans:
1, LOCAL_RELATION, its size might be large;
2, CACHED_REMOTE_RELATION, it is already cached in dataFrameCache in the SessionHolder;
3, CATALOG, it may be a command like CREATE_TABLE/DROP_TEMP_VIEW , should not be skipped.

Contributor


@zhengruifeng Does point 3 extend to all DDL/DML commands?

Contributor


1, LOCAL_RELATION, its size might be large;
2, CACHED_REMOTE_RELATION, it is already cached in dataFrameCache in the SessionHolder;

We use a low default value for CONNECT_SESSION_PLAN_CACHE_SIZE, 5 entries to be specific, to prevent the cache from becoming too large. We haven't been too surgical about which kinds of plans we cache; we keep it a simple wrapper that helps common scenarios which generate repeated Analyze/Execute calls.
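The bounded, best-effort behavior described above can be sketched as a small LRU cache in Python. This is illustrative only; the server-side cache is implemented in Scala, and `BoundedPlanCache` is a hypothetical name:

```python
from collections import OrderedDict


class BoundedPlanCache:
    """Minimal LRU cache: at most max_size resolved plans are kept."""

    def __init__(self, max_size=5):       # 5 mirrors the default mentioned above
        self.max_size = max_size
        self._entries = OrderedDict()

    def get(self, plan_id):
        if plan_id in self._entries:
            self._entries.move_to_end(plan_id)   # mark as most recently used
            return self._entries[plan_id]
        return None                              # best-effort: a miss is fine

    def put(self, plan_id, resolved_plan):
        if self.max_size <= 0:
            return                               # caching disabled
        self._entries[plan_id] = resolved_plan
        self._entries.move_to_end(plan_id)
        if len(self._entries) > self.max_size:
            self._entries.popitem(last=False)    # evict least recently used
```

With a capacity of 5, inserting a sixth plan silently evicts the least recently used one, so memory stays bounded regardless of how many plans a session analyzes.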

Contributor Author


3, CATALOG, it may be a command like CREATE_TABLE/DROP_TEMP_VIEW, should not be skipped.

Commands should be fine. Each command will get a new plan_id, so a new command won't hit the cache. Besides, we only cache plans with plan_id set.
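The plan_id argument can be illustrated with a small sketch (the `relation` helper is hypothetical; in reality the Spark Connect client assigns plan ids to proto relations): because a repeated command gets a fresh id, it can never hit a stale cache entry.

```python
import itertools

_ids = itertools.count(1)


def relation(op):
    # The client assigns every new relation (including commands) a fresh
    # plan_id, so two identical commands never share a cache key.
    return {"plan_id": next(_ids), "op": op}


cache = {}
first = relation("CREATE TABLE t AS SELECT 1")
cache[first["plan_id"]] = "executed"

second = relation("CREATE TABLE t AS SELECT 1")
assert second["plan_id"] not in cache   # fresh id, so no stale cache hit
```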

Contributor


Yeah, I forgot about the new plan_id. Then it's fine.

@github-actions github-actions bot added the CORE label Apr 15, 2024
@xi-db xi-db changed the title [WIP][SPARK-47818][CONNECT] Introduce plan cache in SparkConnectPlanner to improve performance of Analyze requests [SPARK-47818][CONNECT] Introduce plan cache in SparkConnectPlanner to improve performance of Analyze requests Apr 16, 2024
@HyukjinKwon
Member

Merged to master.

hvanhovell pushed a commit that referenced this pull request Apr 26, 2024
…tPlanner to improve performance of Analyze requests

### What changes were proposed in this pull request?

In [the previous PR](#46012), we cache plans of AnalyzePlan requests. We're also enabling it for ExecutePlan in this PR.

### Why are the changes needed?

Some operations like spark.sql() issue ExecutePlan requests. By caching them, we can also improve performance if subsequent plans to be analyzed include the plan returned by ExecutePlan as a subtree.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #46098 from xi-db/SPARK-47818-plan-cache-followup.

Authored-by: Xi Lyu <xi.lyu@databricks.com>
Signed-off-by: Herman van Hovell <herman@databricks.com>
JacobZheng0927 pushed a commit to JacobZheng0927/spark that referenced this pull request May 11, 2024
hvanhovell pushed a commit that referenced this pull request May 17, 2024
…tPlanner to improve performance of Analyze requests

### What changes were proposed in this pull request?

In [this previous PR](#46012), we introduced two new confs for the plan cache: a static conf `spark.connect.session.planCache.maxSize` and a dynamic conf `spark.connect.session.planCache.enabled`. The plan cache is enabled by default with size 5. In this PR, we are marking both confs as internal because we don't expect users to deal with them.

### Why are the changes needed?

These two confs are not expected to be used under normal circumstances, and we don't need to document them on the Spark Configuration reference page.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #46638 from xi-db/SPARK-47818-plan-cache-followup2.

Authored-by: Xi Lyu <xi.lyu@databricks.com>
Signed-off-by: Herman van Hovell <herman@databricks.com>