Merge pull request #1561 from apache/master
GulajavaMinistudio authored Sep 21, 2023
2 parents bb9cdba + a2bab5e commit f093827
Showing 99 changed files with 2,650 additions and 1,594 deletions.
20 changes: 10 additions & 10 deletions .github/workflows/build_and_test.yml
@@ -249,7 +249,7 @@ jobs:
       - name: Install Java ${{ matrix.java }}
         uses: actions/setup-java@v3
         with:
-          distribution: temurin
+          distribution: zulu
           java-version: ${{ matrix.java }}
       - name: Install Python 3.8
         uses: actions/setup-python@v4
@@ -435,7 +435,7 @@ jobs:
       - name: Install Java ${{ matrix.java }}
         uses: actions/setup-java@v3
         with:
-          distribution: temurin
+          distribution: zulu
           java-version: ${{ matrix.java }}
       - name: List Python packages (Python 3.9, PyPy3)
         run: |
@@ -539,7 +539,7 @@ jobs:
       - name: Install Java ${{ inputs.java }}
         uses: actions/setup-java@v3
         with:
-          distribution: temurin
+          distribution: zulu
           java-version: ${{ inputs.java }}
       - name: Run tests
         env: ${{ fromJSON(inputs.envs) }}
@@ -653,7 +653,7 @@ jobs:
       - name: Install Java 8
         uses: actions/setup-java@v3
         with:
-          distribution: temurin
+          distribution: zulu
           java-version: 8
       - name: License test
         run: ./dev/check-license
@@ -780,7 +780,7 @@ jobs:
         java:
          - 11
          - 17
-         - 21-ea
+         - 21
     runs-on: ubuntu-22.04
     timeout-minutes: 300
     steps:
@@ -817,7 +817,7 @@ jobs:
       - name: Install Java ${{ matrix.java }}
         uses: actions/setup-java@v3
         with:
-          distribution: temurin
+          distribution: zulu
           java-version: ${{ matrix.java }}
       - name: Build with Maven
         run: |
@@ -868,7 +868,7 @@ jobs:
       - name: Install Java 8
         uses: actions/setup-java@v3
         with:
-          distribution: temurin
+          distribution: zulu
           java-version: 8
       - name: Build with SBT
         run: |
@@ -919,7 +919,7 @@ jobs:
       - name: Install Java 8
         uses: actions/setup-java@v3
         with:
-          distribution: temurin
+          distribution: zulu
           java-version: 8
       - name: Cache TPC-DS generated data
         id: cache-tpcds-sf-1
@@ -1025,7 +1025,7 @@ jobs:
       - name: Install Java 8
         uses: actions/setup-java@v3
         with:
-          distribution: temurin
+          distribution: zulu
           java-version: 8
       - name: Run tests
         run: |
@@ -1084,7 +1084,7 @@ jobs:
       - name: Install Java ${{ inputs.java }}
         uses: actions/setup-java@v3
         with:
-          distribution: temurin
+          distribution: zulu
           java-version: ${{ inputs.java }}
       - name: start minikube
         run: |
4 changes: 2 additions & 2 deletions .github/workflows/build_java21.yml
@@ -17,7 +17,7 @@
 # under the License.
 #
 
-name: "Build (master, Scala 2.12, Hadoop 3, JDK 21-ea)"
+name: "Build (master, Scala 2.12, Hadoop 3, JDK 21)"
 
 on:
   schedule:
@@ -31,7 +31,7 @@ jobs:
     uses: ./.github/workflows/build_and_test.yml
     if: github.repository == 'apache/spark'
     with:
-      java: 21-ea
+      java: 21
       branch: master
       hadoop: hadoop3
       envs: >-
2 changes: 2 additions & 0 deletions R/pkg/R/functions.R
@@ -2894,6 +2894,8 @@ setMethod("from_json", signature(x = "Column", schema = "characterOrstructTypeOr
           # treated as struct or element type of array in order to make it more
           # R-friendly.
           if (class(schema) == "Column") {
+            df <- createDataFrame(list(list(0)))
+            jschema <- collect(select(df, schema))[[1]][[1]]
             jschema <- callJStatic("org.apache.spark.sql.api.r.SQLUtils",
                                    "createArrayType",
                                    jschema)
12 changes: 12 additions & 0 deletions common/utils/src/main/resources/error/error-classes.json
@@ -314,6 +314,18 @@
       "<details>"
     ]
   },
+  "CANNOT_WRITE_STATE_STORE" : {
+    "message" : [
+      "Error writing state store files for provider <providerClass>."
+    ],
+    "subClass" : {
+      "CANNOT_COMMIT" : {
+        "message" : [
+          "Cannot perform commit during state checkpoint."
+        ]
+      }
+    }
+  },
   "CAST_INVALID_INPUT" : {
     "message" : [
       "The value <expression> of the type <sourceType> cannot be cast to <targetType> because it is malformed. Correct the value as per the syntax, or change its target type. Use `try_cast` to tolerate malformed input and return NULL instead. If necessary set <ansiConfig> to \"false\" to bypass this error."
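
For illustration, the new error class could be raised through SparkException's error-class constructor roughly as below. This is a hedged sketch, not code from this commit: the helper name, the provider string, and the cause are placeholders.

  import org.apache.spark.SparkException

  // Hypothetical sketch: report a commit failure during state checkpointing
  // using the new error class and its CANNOT_COMMIT sub-class.
  def throwCannotCommit(providerClass: String, cause: Throwable): Nothing = {
    throw new SparkException(
      errorClass = "CANNOT_WRITE_STATE_STORE.CANNOT_COMMIT",
      messageParameters = Map("providerClass" -> providerClass),
      cause = cause)
  }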
@@ -30,6 +30,7 @@ import org.apache.spark.sql.errors.DataTypeErrors
 import org.apache.spark.sql.expressions.{ScalarUserDefinedFunction, UserDefinedFunction}
 import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.sql.types.DataType.parseTypeWithFallback
+import org.apache.spark.util.SparkClassUtils
 
 /**
  * Commonly used functions available for DataFrame operations. Using functions defined here
@@ -1831,7 +1832,7 @@ object functions {
    * @group normal_funcs
    * @since 3.4.0
    */
-  def rand(): Column = Column.fn("rand")
+  def rand(): Column = Column.fn("rand", lit(SparkClassUtils.random.nextLong))
 
   /**
    * Generate a column with independent and identically distributed (i.i.d.) samples from the
@@ -1855,7 +1856,7 @@ object functions {
    * @group normal_funcs
    * @since 3.4.0
    */
-  def randn(): Column = Column.fn("randn")
+  def randn(): Column = Column.fn("randn", lit(SparkClassUtils.random.nextLong))
 
   /**
    * Partition ID.
@@ -3392,7 +3393,7 @@ object functions {
    * @group misc_funcs
    * @since 3.5.0
    */
-  def uuid(): Column = Column.fn("uuid")
+  def uuid(): Column = Column.fn("uuid", lit(SparkClassUtils.random.nextLong))
 
   /**
    * Returns an encrypted value of `input` using AES in given `mode` with the specified `padding`.
@@ -3711,7 +3712,7 @@ object functions {
    * @group misc_funcs
    * @since 3.5.0
    */
-  def random(): Column = Column.fn("random")
+  def random(): Column = Column.fn("random", lit(SparkClassUtils.random.nextLong))
 
   /**
    * Returns the bit position for the given input column.
@@ -7069,7 +7070,7 @@ object functions {
    * @group collection_funcs
    * @since 3.4.0
    */
-  def shuffle(e: Column): Column = Column.fn("shuffle", e)
+  def shuffle(e: Column): Column = Column.fn("shuffle", e, lit(SparkClassUtils.random.nextLong))
 
   /**
    * Returns a reversed string or an array with reverse order of elements.
@@ -7102,7 +7103,8 @@ object functions {
    * @group collection_funcs
    * @since 3.4.0
    */
-  def sequence(start: Column, stop: Column): Column = sequence(start, stop, lit(1L))
+  def sequence(start: Column, stop: Column): Column =
+    Column.fn("sequence", start, stop)
 
   /**
    * Creates an array containing the left argument repeated the number of times given by the right
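
The effect of the client-drawn seed can be sketched as follows (illustrative only; `df` stands for any DataFrame). Reusing one Column instance repeats the same seed, while each new call draws a fresh one:

  val r = rand()              // a seed is drawn once, when the Column is built
  df.select(r, r.as("copy"))  // both output columns agree on every row
  df.select(rand(), rand())   // two calls, two seeds: the columns are independent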
@@ -1296,6 +1296,24 @@ class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper with PrivateM
       assert(rc == 100)
     }
   }
+
+  test("SPARK-45216: Non-deterministic functions with seed") {
+    val session: SparkSession = spark
+    import session.implicits._
+
+    val df = Seq(Array.range(0, 10)).toDF("a")
+
+    val r = rand()
+    val r2 = randn()
+    val r3 = random()
+    val r4 = uuid()
+    val r5 = shuffle(col("a"))
+    df.select(r, r.as("r"), r2, r2.as("r2"), r3, r3.as("r3"), r4, r4.as("r4"), r5, r5.as("r5"))
+      .collect
+      .foreach { row =>
+        (0 until 5).foreach(i => assert(row.get(i * 2) === row.get(i * 2 + 1)))
+      }
+  }
 }
 
 private[sql] case class ClassData(a: String, b: Int)
@@ -218,7 +218,6 @@ class FunctionTestSuite extends ConnectFunSuite {
     to_json(a, Collections.emptyMap[String, String]),
     to_json(a, Map.empty[String, String]))
   testEquals("sort_array", sort_array(a), sort_array(a, asc = true))
-  testEquals("sequence", sequence(lit(1), lit(10)), sequence(lit(1), lit(10), lit(1L)))
   testEquals(
     "from_csv",
     from_csv(a, lit(schema.toDDL), Collections.emptyMap[String, String]),
@@ -279,14 +278,14 @@ class FunctionTestSuite extends ConnectFunSuite {
     assert(e.hasUnresolvedFunction)
     val fn = e.getUnresolvedFunction
     assert(fn.getFunctionName == "rand")
-    assert(fn.getArgumentsCount == 0)
+    assert(fn.getArgumentsCount == 1)
   }
 
   test("randn no seed") {
     val e = randn().expr
     assert(e.hasUnresolvedFunction)
     val fn = e.getUnresolvedFunction
     assert(fn.getFunctionName == "randn")
-    assert(fn.getArgumentsCount == 0)
+    assert(fn.getArgumentsCount == 1)
   }
 }
@@ -778,6 +778,60 @@ message ReleaseExecuteResponse {
   optional string operation_id = 2;
 }
 
+message FetchErrorDetailsRequest {
+
+  // (Required)
+  // The session_id specifies a Spark session for a user identified by user_context.user_id.
+  // The id should be a UUID string of the format `00112233-4455-6677-8899-aabbccddeeff`.
+  string session_id = 1;
+
+  // User context
+  UserContext user_context = 2;
+
+  // (Required)
+  // The id of the error.
+  string error_id = 3;
+}
+
+message FetchErrorDetailsResponse {
+
+  message StackTraceElement {
+    // The fully qualified name of the class containing the execution point.
+    string declaring_class = 1;
+
+    // The name of the method containing the execution point.
+    string method_name = 2;
+
+    // The name of the file containing the execution point.
+    string file_name = 3;
+
+    // The line number of the source line containing the execution point.
+    int32 line_number = 4;
+  }
+
+  // Error defines the schema for representing an exception.
+  message Error {
+    // The fully qualified names of the exception class and its parent classes.
+    repeated string error_type_hierarchy = 1;
+
+    // The detailed message of the exception.
+    string message = 2;
+
+    // The stack trace of the exception. It will be set
+    // if the SQLConf spark.sql.connect.serverStacktrace.enabled is true.
+    repeated StackTraceElement stack_trace = 3;
+
+    // The index of the cause error in errors.
+    optional int32 cause_idx = 4;
+  }
+
+  // The index of the root error in errors. The field will not be set if the error is not found.
+  optional int32 root_error_idx = 1;
+
+  // A list of errors.
+  repeated Error errors = 2;
+}
+
 // Main interface for the SparkConnect service.
 service SparkConnectService {
 
@@ -813,5 +867,8 @@ service SparkConnectService {
   // Non reattachable executions are released automatically and immediately after the ExecutePlan
   // RPC and ReleaseExecute may not be used.
   rpc ReleaseExecute(ReleaseExecuteRequest) returns (ReleaseExecuteResponse) {}
+
+  // FetchErrorDetails retrieves the matched exception with details based on a provided error id.
+  rpc FetchErrorDetails(FetchErrorDetailsRequest) returns (FetchErrorDetailsResponse) {}
 }

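As a sketch of how a client might consume this response, the cause chain can be walked from root_error_idx through cause_idx. This is an assumption, not code from this commit; it presumes the standard protobuf-java accessors generated from the messages above.

  // Hypothetical sketch: print the exception chain in a FetchErrorDetailsResponse.
  def printErrorChain(resp: FetchErrorDetailsResponse): Unit = {
    if (resp.hasRootErrorIdx) { // unset means the error was not found on the server
      var idx = resp.getRootErrorIdx
      var more = true
      while (more) {
        val err = resp.getErrors(idx)
        val topClass =
          if (err.getErrorTypeHierarchyCount > 0) err.getErrorTypeHierarchy(0) else "<unknown>"
        println(s"$topClass: ${err.getMessage}")
        (0 until err.getStackTraceCount).foreach { i =>
          val el = err.getStackTrace(i)
          println(s"  at ${el.getDeclaringClass}.${el.getMethodName}" +
            s"(${el.getFileName}:${el.getLineNumber})")
        }
        more = err.hasCauseIdx // follow the cause chain until it ends
        if (more) idx = err.getCauseIdx
      }
    }
  }
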
@@ -1,2 +1,2 @@
-Project [arrays_zip(e#0, sequence(cast(1 as bigint), cast(20 as bigint), Some(cast(1 as bigint)), Some(America/Los_Angeles)), e, 1) AS arrays_zip(e, sequence(1, 20, 1))#0]
+Project [arrays_zip(e#0, sequence(1, 20, None, Some(America/Los_Angeles)), e, 1) AS arrays_zip(e, sequence(1, 20))#0]
 +- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
@@ -1,2 +1,2 @@
-Project [concat(cast(e#0 as array<bigint>), cast(array(1, 2) as array<bigint>), sequence(cast(33 as bigint), cast(40 as bigint), Some(cast(1 as bigint)), Some(America/Los_Angeles))) AS concat(e, array(1, 2), sequence(33, 40, 1))#0]
+Project [concat(e#0, array(1, 2), sequence(33, 40, None, Some(America/Los_Angeles))) AS concat(e, array(1, 2), sequence(33, 40))#0]
 +- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
@@ -1,2 +1,2 @@
-Project [flatten(array(cast(e#0 as array<bigint>), sequence(cast(1 as bigint), cast(10 as bigint), Some(cast(1 as bigint)), Some(America/Los_Angeles)))) AS flatten(array(e, sequence(1, 10, 1)))#0]
+Project [flatten(array(e#0, sequence(1, 10, None, Some(America/Los_Angeles)))) AS flatten(array(e, sequence(1, 10)))#0]
 +- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
@@ -1,2 +1,2 @@
-Project [sequence(cast(1 as bigint), cast(10 as bigint), Some(cast(1 as bigint)), Some(America/Los_Angeles)) AS sequence(1, 10, 1)#0]
+Project [sequence(1, 10, None, Some(America/Los_Angeles)) AS sequence(1, 10)#0]
 +- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
@@ -29,10 +29,6 @@
         "literal": {
           "integer": 20
         }
-      }, {
-        "literal": {
-          "long": "1"
-        }
       }]
     }
   }]
Binary file not shown.
@@ -42,10 +42,6 @@
         "literal": {
           "integer": 40
         }
-      }, {
-        "literal": {
-          "long": "1"
-        }
       }]
     }
   }]
Binary file not shown.
@@ -32,10 +32,6 @@
         "literal": {
           "integer": 10
         }
-      }, {
-        "literal": {
-          "long": "1"
-        }
       }]
     }
   }]
Binary file not shown.
@@ -22,10 +22,6 @@
         "literal": {
           "integer": 10
         }
-      }, {
-        "literal": {
-          "long": "1"
-        }
       }]
     }
   }]
Binary file not shown.