Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SPARK-1102: Create a saveAsNewAPIHadoopDataset method #12

Closed
wants to merge 5 commits into from

Conversation

CodingCat
Copy link
Contributor

https://spark-project.atlassian.net/browse/SPARK-1102

Create a saveAsNewAPIHadoopDataset method

By @mateiz: "Right now RDDs can only be saved as files using the new Hadoop API, not as "datasets" with no filename and just a JobConf. See http://codeforhire.com/2014/02/18/using-spark-with-mongodb/ for an example of how you have to give a bogus filename. For the old Hadoop API, we have saveAsHadoopDataset."

@AmplabJenkins
Copy link

Can one of the admins verify this patch?

1 similar comment
@AmplabJenkins
Copy link

Can one of the admins verify this patch?

@CodingCat
Copy link
Contributor Author

this is a re-opened PR, in the old PR, https://github.com/apache/incubator-spark/pull/636, all test cases have passed

Can anyone verify that and make further review?

@CodingCat
Copy link
Contributor Author

I changed back the parameter type of the new method to Configuration for keeping consistent with other APIs, and whether Job should be parameter type is still under discussion

https://spark-project.atlassian.net/browse/SPARK-1139

@CodingCat
Copy link
Contributor Author

I rebased the code after #11 was merged, and tested in my local side, I think it is ready for further review/testing

@mateiz
Copy link
Contributor

mateiz commented Mar 2, 2014

Jenkins, test this please

@AmplabJenkins
Copy link

Merged build triggered.

@AmplabJenkins
Copy link

Merged build started.

@AmplabJenkins
Copy link

Merged build finished.

@AmplabJenkins
Copy link

One or more automated tests failed
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/12954/

@CodingCat
Copy link
Contributor Author

exceed with 5 chars....sorry.....fixed....

@mateiz
Copy link
Contributor

mateiz commented Mar 3, 2014

Jenkins, this is ok to test

@AmplabJenkins
Copy link

Merged build triggered.

@AmplabJenkins
Copy link

Merged build started.

@AmplabJenkins
Copy link

Merged build finished.

@AmplabJenkins
Copy link

All automated tests passed.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/12962/

@CodingCat
Copy link
Contributor Author

This is ready to merge?

@CodingCat
Copy link
Contributor Author

any further comments?

@CodingCat
Copy link
Contributor Author

ping

@AmplabJenkins
Copy link

Merged build triggered.

@AmplabJenkins
Copy link

Merged build started.

@AmplabJenkins
Copy link

Merged build finished.

@AmplabJenkins
Copy link

All automated tests passed.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/13073/

@CodingCat
Copy link
Contributor Author

@mateiz I have rebased the code, any further comments?

@CodingCat
Copy link
Contributor Author

ping

@mateiz
Copy link
Contributor

mateiz commented Mar 10, 2014

Sorry, haven't had time to look at this lately, but will do soon.

@CodingCat
Copy link
Contributor Author

No problem, thanks

@mateiz
Copy link
Contributor

mateiz commented Mar 17, 2014

Hey, sorry for the super late reply. This looks good but it would be good to add a test for saveAsNewAPIHadoopDataset directly (passing it the file output format and such through a Conf). Once you do that I think this is ready to merge.

@CodingCat
Copy link
Contributor Author

Hi, @mateiz, thank you for reviewing this,

just added the test cases for both old&new API-based saveAsHadoopDataset

JasonMWhite pushed a commit to JasonMWhite/spark that referenced this pull request Dec 2, 2015
JasonMWhite pushed a commit to JasonMWhite/spark that referenced this pull request Dec 2, 2015
…askTracker to reduce the chance of the communicating problem

Using AkkaUtils.askWithReply in MapOutputTracker.askTracker to reduce the chance of the communicating problem

Author: YanTangZhai <hakeemzhai@tencent.com>
Author: yantangzhai <tyz0303@163.com>

Closes apache#3785 from YanTangZhai/SPARK-4946 and squashes the following commits:

9ca6541 [yantangzhai] [SPARK-4946] [CORE] Using AkkaUtils.askWithReply in MapOutputTracker.askTracker to reduce the chance of the communicating problem
e4c2c0a [YanTangZhai] Merge pull request apache#15 from apache/master
718afeb [YanTangZhai] Merge pull request apache#12 from apache/master
6e643f8 [YanTangZhai] Merge pull request apache#11 from apache/master
e249846 [YanTangZhai] Merge pull request apache#10 from apache/master
d26d982 [YanTangZhai] Merge pull request apache#9 from apache/master
76d4027 [YanTangZhai] Merge pull request apache#8 from apache/master
03b62b0 [YanTangZhai] Merge pull request apache#7 from apache/master
8a00106 [YanTangZhai] Merge pull request apache#6 from apache/master
cbcba66 [YanTangZhai] Merge pull request apache#3 from apache/master
cdef539 [YanTangZhai] Merge pull request #1 from apache/master
JasonMWhite pushed a commit to JasonMWhite/spark that referenced this pull request Dec 2, 2015
Support ! boolean logic operator like NOT in sql as follows
select * from for_test where !(col1 > col2)

Author: YanTangZhai <hakeemzhai@tencent.com>
Author: Michael Armbrust <michael@databricks.com>

Closes apache#3555 from YanTangZhai/SPARK-4692 and squashes the following commits:

1a9f605 [YanTangZhai] Update HiveQuerySuite.scala
7c03c68 [YanTangZhai] Merge pull request apache#23 from apache/master
992046e [YanTangZhai] Update HiveQuerySuite.scala
ea618f4 [YanTangZhai] Update HiveQuerySuite.scala
192411d [YanTangZhai] Merge pull request apache#17 from YanTangZhai/master
e4c2c0a [YanTangZhai] Merge pull request apache#15 from apache/master
1e1ebb4 [YanTangZhai] Update HiveQuerySuite.scala
efc4210 [YanTangZhai] Update HiveQuerySuite.scala
bd2c444 [YanTangZhai] Update HiveQuerySuite.scala
1893956 [YanTangZhai] Merge pull request apache#14 from marmbrus/pr/3555
59e4de9 [Michael Armbrust] make hive test
718afeb [YanTangZhai] Merge pull request apache#12 from apache/master
950b21e [YanTangZhai] Update HiveQuerySuite.scala
74175b4 [YanTangZhai] Update HiveQuerySuite.scala
92242c7 [YanTangZhai] Update HiveQl.scala
6e643f8 [YanTangZhai] Merge pull request apache#11 from apache/master
e249846 [YanTangZhai] Merge pull request apache#10 from apache/master
d26d982 [YanTangZhai] Merge pull request apache#9 from apache/master
76d4027 [YanTangZhai] Merge pull request apache#8 from apache/master
03b62b0 [YanTangZhai] Merge pull request apache#7 from apache/master
8a00106 [YanTangZhai] Merge pull request apache#6 from apache/master
cbcba66 [YanTangZhai] Merge pull request apache#3 from apache/master
cdef539 [YanTangZhai] Merge pull request #1 from apache/master
WenboZhao added a commit to WenboZhao/spark that referenced this pull request May 17, 2017
* Support simple rsync schema in the executor uri

* Support simple rsync schema in the executor uri
erikerlandson pushed a commit to erikerlandson/spark that referenced this pull request Jul 28, 2017
curtishoward pushed a commit to curtishoward/spark that referenced this pull request Jan 29, 2018
* Support simple rsync schema in the executor uri

* Support simple rsync schema in the executor uri

(cherry picked from commit b043fdb)
curtishoward pushed a commit to curtishoward/spark that referenced this pull request Feb 15, 2018
* Support simple rsync schema in the executor uri

* Support simple rsync schema in the executor uri

(cherry picked from commit b043fdb)
(cherry picked from commit 6d59ab1)
curtishoward pushed a commit to curtishoward/spark that referenced this pull request Mar 23, 2018
* Support simple rsync schema in the executor uri

* Support simple rsync schema in the executor uri

(cherry picked from commit b043fdb)
(cherry picked from commit 6d59ab1)
(cherry picked from commit 8a7a336)
(cherry picked from commit e978875)
Igosuki pushed a commit to Adikteev/spark that referenced this pull request Jul 31, 2018
curtishoward pushed a commit to curtishoward/spark that referenced this pull request Oct 8, 2018
* Support simple rsync schema in the executor uri

* Support simple rsync schema in the executor uri

(cherry picked from commit b043fdb)
(cherry picked from commit 6d59ab1)
(cherry picked from commit 8a7a336)
(cherry picked from commit e978875)
(cherry picked from commit cdbef9b)
HyukjinKwon referenced this pull request in HyukjinKwon/spark Oct 16, 2018
yifeih pushed a commit to yifeih/spark that referenced this pull request Jan 23, 2019
SirOibaf added a commit to SirOibaf/spark that referenced this pull request Dec 11, 2019
ringtail added a commit to ringtail/spark that referenced this pull request May 9, 2020
…r-permission

remove script vars in entrypoint
HyukjinKwon pushed a commit that referenced this pull request Jun 10, 2020
### What changes were proposed in this pull request?

This PR proposes to make `PythonFunction` holds `Seq[Byte]` instead of `Array[Byte]` to be able to compare if the byte array has the same values for the cache manager.

### Why are the changes needed?

Currently the cache manager doesn't use the cache for `udf` if the `udf` is created again even if the functions is the same.

```py
>>> func = lambda x: x

>>> df = spark.range(1)
>>> df.select(udf(func)("id")).cache()
```
```py
>>> df.select(udf(func)("id")).explain()
== Physical Plan ==
*(2) Project [pythonUDF0#14 AS <lambda>(id)#12]
+- BatchEvalPython [<lambda>(id#0L)], [pythonUDF0#14]
 +- *(1) Range (0, 1, step=1, splits=12)
```

This is because `PythonFunction` holds `Array[Byte]`, and `equals` method of array equals only when the both array is the same instance.

### Does this PR introduce _any_ user-facing change?

Yes, if the user reuse the Python function for the UDF, the cache manager will detect the same function and use the cache for it.

### How was this patch tested?

I added a test case and manually.

```py
>>> df.select(udf(func)("id")).explain()
== Physical Plan ==
InMemoryTableScan [<lambda>(id)#12]
   +- InMemoryRelation [<lambda>(id)#12], StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *(2) Project [pythonUDF0#5 AS <lambda>(id)#3]
            +- BatchEvalPython [<lambda>(id#0L)], [pythonUDF0#5]
               +- *(1) Range (0, 1, step=1, splits=12)
```

Closes #28774 from ueshin/issues/SPARK-31945/udf_cache.

Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
redsanket pushed a commit to redsanket/spark that referenced this pull request Feb 16, 2021
Align the dependency versions with current grid
dongjoon-hyun pushed a commit that referenced this pull request Feb 23, 2024
…n properly

### What changes were proposed in this pull request?
Make `ResolveRelations` handle plan id properly

### Why are the changes needed?
bug fix for Spark Connect, it won't affect classic Spark SQL

before this PR:
```
from pyspark.sql import functions as sf

spark.range(10).withColumn("value_1", sf.lit(1)).write.saveAsTable("test_table_1")
spark.range(10).withColumnRenamed("id", "index").withColumn("value_2", sf.lit(2)).write.saveAsTable("test_table_2")

df1 = spark.read.table("test_table_1")
df2 = spark.read.table("test_table_2")
df3 = spark.read.table("test_table_1")

join1 = df1.join(df2, on=df1.id==df2.index).select(df2.index, df2.value_2)
join2 = df3.join(join1, how="left", on=join1.index==df3.id)

join2.schema
```

fails with
```
AnalysisException: [CANNOT_RESOLVE_DATAFRAME_COLUMN] Cannot resolve dataframe column "id". It's probably because of illegal references like `df1.select(df2.col("a"))`. SQLSTATE: 42704
```

That is due to existing plan caching in `ResolveRelations` doesn't work with Spark Connect

```
=== Applying Rule org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations ===
 '[#12]Join LeftOuter, '`==`('index, 'id)                     '[#12]Join LeftOuter, '`==`('index, 'id)
!:- '[#9]UnresolvedRelation [test_table_1], [], false         :- '[#9]SubqueryAlias spark_catalog.default.test_table_1
!+- '[#11]Project ['index, 'value_2]                          :  +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false
!   +- '[#10]Join Inner, '`==`('id, 'index)                   +- '[#11]Project ['index, 'value_2]
!      :- '[#7]UnresolvedRelation [test_table_1], [], false      +- '[#10]Join Inner, '`==`('id, 'index)
!      +- '[#8]UnresolvedRelation [test_table_2], [], false         :- '[#9]SubqueryAlias spark_catalog.default.test_table_1
!                                                                   :  +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false
!                                                                   +- '[#8]SubqueryAlias spark_catalog.default.test_table_2
!                                                                      +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_2`, [], false

Can not resolve 'id with plan 7
```

`[#7]UnresolvedRelation [test_table_1], [], false` was wrongly resolved to the cached one
```
:- '[#9]SubqueryAlias spark_catalog.default.test_table_1
   +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false
```

### Does this PR introduce _any_ user-facing change?
yes, bug fix

### How was this patch tested?
added ut

### Was this patch authored or co-authored using generative AI tooling?
ci

Closes #45214 from zhengruifeng/connect_fix_read_join.

Authored-by: Ruifeng Zheng <ruifengz@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
ericm-db referenced this pull request in ericm-db/spark Mar 5, 2024
…n properly

### What changes were proposed in this pull request?
Make `ResolveRelations` handle plan id properly

### Why are the changes needed?
bug fix for Spark Connect, it won't affect classic Spark SQL

before this PR:
```
from pyspark.sql import functions as sf

spark.range(10).withColumn("value_1", sf.lit(1)).write.saveAsTable("test_table_1")
spark.range(10).withColumnRenamed("id", "index").withColumn("value_2", sf.lit(2)).write.saveAsTable("test_table_2")

df1 = spark.read.table("test_table_1")
df2 = spark.read.table("test_table_2")
df3 = spark.read.table("test_table_1")

join1 = df1.join(df2, on=df1.id==df2.index).select(df2.index, df2.value_2)
join2 = df3.join(join1, how="left", on=join1.index==df3.id)

join2.schema
```

fails with
```
AnalysisException: [CANNOT_RESOLVE_DATAFRAME_COLUMN] Cannot resolve dataframe column "id". It's probably because of illegal references like `df1.select(df2.col("a"))`. SQLSTATE: 42704
```

That is due to existing plan caching in `ResolveRelations` doesn't work with Spark Connect

```
=== Applying Rule org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations ===
 '[#12]Join LeftOuter, '`==`('index, 'id)                     '[#12]Join LeftOuter, '`==`('index, 'id)
!:- '[#9]UnresolvedRelation [test_table_1], [], false         :- '[#9]SubqueryAlias spark_catalog.default.test_table_1
!+- '[#11]Project ['index, 'value_2]                          :  +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false
!   +- '[#10]Join Inner, '`==`('id, 'index)                   +- '[#11]Project ['index, 'value_2]
!      :- '[#7]UnresolvedRelation [test_table_1], [], false      +- '[#10]Join Inner, '`==`('id, 'index)
!      +- '[#8]UnresolvedRelation [test_table_2], [], false         :- '[#9]SubqueryAlias spark_catalog.default.test_table_1
!                                                                   :  +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false
!                                                                   +- '[#8]SubqueryAlias spark_catalog.default.test_table_2
!                                                                      +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_2`, [], false

Can not resolve 'id with plan 7
```

`[#7]UnresolvedRelation [test_table_1], [], false` was wrongly resolved to the cached one
```
:- '[#9]SubqueryAlias spark_catalog.default.test_table_1
   +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false
```

### Does this PR introduce _any_ user-facing change?
yes, bug fix

### How was this patch tested?
added ut

### Was this patch authored or co-authored using generative AI tooling?
ci

Closes apache#45214 from zhengruifeng/connect_fix_read_join.

Authored-by: Ruifeng Zheng <ruifengz@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
zhengruifeng added a commit that referenced this pull request Apr 30, 2024
…plan properly

### What changes were proposed in this pull request?
Make `ResolveRelations` handle plan id properly

cherry-pick bugfix #45214 to 3.5

### Why are the changes needed?
bug fix for Spark Connect, it won't affect classic Spark SQL

before this PR:
```
from pyspark.sql import functions as sf

spark.range(10).withColumn("value_1", sf.lit(1)).write.saveAsTable("test_table_1")
spark.range(10).withColumnRenamed("id", "index").withColumn("value_2", sf.lit(2)).write.saveAsTable("test_table_2")

df1 = spark.read.table("test_table_1")
df2 = spark.read.table("test_table_2")
df3 = spark.read.table("test_table_1")

join1 = df1.join(df2, on=df1.id==df2.index).select(df2.index, df2.value_2)
join2 = df3.join(join1, how="left", on=join1.index==df3.id)

join2.schema
```

fails with
```
AnalysisException: [CANNOT_RESOLVE_DATAFRAME_COLUMN] Cannot resolve dataframe column "id". It's probably because of illegal references like `df1.select(df2.col("a"))`. SQLSTATE: 42704
```

That is due to existing plan caching in `ResolveRelations` doesn't work with Spark Connect

```
=== Applying Rule org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations ===
 '[#12]Join LeftOuter, '`==`('index, 'id)                     '[#12]Join LeftOuter, '`==`('index, 'id)
!:- '[#9]UnresolvedRelation [test_table_1], [], false         :- '[#9]SubqueryAlias spark_catalog.default.test_table_1
!+- '[#11]Project ['index, 'value_2]                          :  +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false
!   +- '[#10]Join Inner, '`==`('id, 'index)                   +- '[#11]Project ['index, 'value_2]
!      :- '[#7]UnresolvedRelation [test_table_1], [], false      +- '[#10]Join Inner, '`==`('id, 'index)
!      +- '[#8]UnresolvedRelation [test_table_2], [], false         :- '[#9]SubqueryAlias spark_catalog.default.test_table_1
!                                                                   :  +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false
!                                                                   +- '[#8]SubqueryAlias spark_catalog.default.test_table_2
!                                                                      +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_2`, [], false

Can not resolve 'id with plan 7
```

`[#7]UnresolvedRelation [test_table_1], [], false` was wrongly resolved to the cached one
```
:- '[#9]SubqueryAlias spark_catalog.default.test_table_1
   +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false
```

### Does this PR introduce _any_ user-facing change?
yes, bug fix

### How was this patch tested?
added ut

### Was this patch authored or co-authored using generative AI tooling?
ci

Closes #46291 from zhengruifeng/connect_fix_read_join_35.

Authored-by: Ruifeng Zheng <ruifengz@apache.org>
Signed-off-by: Ruifeng Zheng <ruifengz@apache.org>
zhengruifeng added a commit that referenced this pull request Apr 30, 2024
…plan properly

### What changes were proposed in this pull request?
Make `ResolveRelations` handle plan id properly

cherry-pick bugfix #45214 to 3.4

### Why are the changes needed?
bug fix for Spark Connect, it won't affect classic Spark SQL

before this PR:
```
from pyspark.sql import functions as sf

spark.range(10).withColumn("value_1", sf.lit(1)).write.saveAsTable("test_table_1")
spark.range(10).withColumnRenamed("id", "index").withColumn("value_2", sf.lit(2)).write.saveAsTable("test_table_2")

df1 = spark.read.table("test_table_1")
df2 = spark.read.table("test_table_2")
df3 = spark.read.table("test_table_1")

join1 = df1.join(df2, on=df1.id==df2.index).select(df2.index, df2.value_2)
join2 = df3.join(join1, how="left", on=join1.index==df3.id)

join2.schema
```

fails with
```
AnalysisException: [CANNOT_RESOLVE_DATAFRAME_COLUMN] Cannot resolve dataframe column "id". It's probably because of illegal references like `df1.select(df2.col("a"))`. SQLSTATE: 42704
```

That is due to existing plan caching in `ResolveRelations` doesn't work with Spark Connect

```
=== Applying Rule org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations ===
 '[#12]Join LeftOuter, '`==`('index, 'id)                     '[#12]Join LeftOuter, '`==`('index, 'id)
!:- '[#9]UnresolvedRelation [test_table_1], [], false         :- '[#9]SubqueryAlias spark_catalog.default.test_table_1
!+- '[#11]Project ['index, 'value_2]                          :  +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false
!   +- '[#10]Join Inner, '`==`('id, 'index)                   +- '[#11]Project ['index, 'value_2]
!      :- '[#7]UnresolvedRelation [test_table_1], [], false      +- '[#10]Join Inner, '`==`('id, 'index)
!      +- '[#8]UnresolvedRelation [test_table_2], [], false         :- '[#9]SubqueryAlias spark_catalog.default.test_table_1
!                                                                   :  +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false
!                                                                   +- '[#8]SubqueryAlias spark_catalog.default.test_table_2
!                                                                      +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_2`, [], false

Can not resolve 'id with plan 7
```

`[#7]UnresolvedRelation [test_table_1], [], false` was wrongly resolved to the cached one
```
:- '[#9]SubqueryAlias spark_catalog.default.test_table_1
   +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false
```

### Does this PR introduce _any_ user-facing change?
yes, bug fix

### How was this patch tested?
added ut

### Was this patch authored or co-authored using generative AI tooling?
ci

Closes #46290 from zhengruifeng/connect_fix_read_join_34.

Authored-by: Ruifeng Zheng <ruifengz@apache.org>
Signed-off-by: Ruifeng Zheng <ruifengz@apache.org>
szehon-ho pushed a commit to szehon-ho/spark that referenced this pull request Aug 7, 2024
…plan properly

### What changes were proposed in this pull request?
Make `ResolveRelations` handle plan id properly

cherry-pick bugfix apache#45214 to 3.4

### Why are the changes needed?
bug fix for Spark Connect, it won't affect classic Spark SQL

before this PR:
```
from pyspark.sql import functions as sf

spark.range(10).withColumn("value_1", sf.lit(1)).write.saveAsTable("test_table_1")
spark.range(10).withColumnRenamed("id", "index").withColumn("value_2", sf.lit(2)).write.saveAsTable("test_table_2")

df1 = spark.read.table("test_table_1")
df2 = spark.read.table("test_table_2")
df3 = spark.read.table("test_table_1")

join1 = df1.join(df2, on=df1.id==df2.index).select(df2.index, df2.value_2)
join2 = df3.join(join1, how="left", on=join1.index==df3.id)

join2.schema
```

fails with
```
AnalysisException: [CANNOT_RESOLVE_DATAFRAME_COLUMN] Cannot resolve dataframe column "id". It's probably because of illegal references like `df1.select(df2.col("a"))`. SQLSTATE: 42704
```

That is due to existing plan caching in `ResolveRelations` doesn't work with Spark Connect

```
=== Applying Rule org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations ===
 '[apache#12]Join LeftOuter, '`==`('index, 'id)                     '[apache#12]Join LeftOuter, '`==`('index, 'id)
!:- '[apache#9]UnresolvedRelation [test_table_1], [], false         :- '[apache#9]SubqueryAlias spark_catalog.default.test_table_1
!+- '[apache#11]Project ['index, 'value_2]                          :  +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false
!   +- '[apache#10]Join Inner, '`==`('id, 'index)                   +- '[apache#11]Project ['index, 'value_2]
!      :- '[apache#7]UnresolvedRelation [test_table_1], [], false      +- '[apache#10]Join Inner, '`==`('id, 'index)
!      +- '[apache#8]UnresolvedRelation [test_table_2], [], false         :- '[apache#9]SubqueryAlias spark_catalog.default.test_table_1
!                                                                   :  +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false
!                                                                   +- '[apache#8]SubqueryAlias spark_catalog.default.test_table_2
!                                                                      +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_2`, [], false

Can not resolve 'id with plan 7
```

`[apache#7]UnresolvedRelation [test_table_1], [], false` was wrongly resolved to the cached one
```
:- '[apache#9]SubqueryAlias spark_catalog.default.test_table_1
   +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false
```

### Does this PR introduce _any_ user-facing change?
yes, bug fix

### How was this patch tested?
added ut

### Was this patch authored or co-authored using generative AI tooling?
ci

Closes apache#46290 from zhengruifeng/connect_fix_read_join_34.

Authored-by: Ruifeng Zheng <ruifengz@apache.org>
Signed-off-by: Ruifeng Zheng <ruifengz@apache.org>
(cherry picked from commit 5f58fa7)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants