
Commit

Merge pull request #1518 from apache/master
GulajavaMinistudio authored Jul 7, 2023
2 parents cfaa36e + d4277b8 commit 7abf7eb
Showing 123 changed files with 3,381 additions and 1,139 deletions.
@@ -281,24 +281,21 @@ public void testDuplicateBlocksAreIgnoredWhenPrevStreamIsInProgress() throws IOE
verifyMetrics(4, 0, 0, 0, 0, 0, 4);
}

@Test(expected = RuntimeException.class)
@Test
public void testFailureAfterData() throws IOException {
StreamCallbackWithID stream =
pushResolver.receiveBlockDataAsStream(
new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0, 0));
stream.onData(stream.getID(), ByteBuffer.wrap(new byte[4]));
stream.onFailure(stream.getID(), new RuntimeException("Forced Failure"));
pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0, 0));
try {
pushResolver.getMergedBlockMeta(TEST_APP, 0, 0, 0);
} catch (RuntimeException e) {
assertTrue(e.getMessage().contains("is empty"));
verifyMetrics(4, 0, 0, 0, 0, 0, 4);
throw e;
}
RuntimeException e = assertThrows(RuntimeException.class,
() -> pushResolver.getMergedBlockMeta(TEST_APP, 0, 0, 0));
assertTrue(e.getMessage().contains("is empty"));
verifyMetrics(4, 0, 0, 0, 0, 0, 4);
}

@Test(expected = RuntimeException.class)
@Test
public void testFailureAfterMultipleDataBlocks() throws IOException {
StreamCallbackWithID stream =
pushResolver.receiveBlockDataAsStream(
@@ -308,13 +305,10 @@ public void testFailureAfterMultipleDataBlocks() throws IOException {
stream.onData(stream.getID(), ByteBuffer.wrap(new byte[4]));
stream.onFailure(stream.getID(), new RuntimeException("Forced Failure"));
pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0, 0));
try {
pushResolver.getMergedBlockMeta(TEST_APP, 0, 0, 0);
} catch (RuntimeException e) {
assertTrue(e.getMessage().contains("is empty"));
verifyMetrics(9, 0, 0, 0, 0, 0, 9);
throw e;
}
RuntimeException e = assertThrows(RuntimeException.class,
() -> pushResolver.getMergedBlockMeta(TEST_APP, 0, 0, 0));
assertTrue(e.getMessage().contains("is empty"));
verifyMetrics(9, 0, 0, 0, 0, 0, 9);
}

@Test
common/utils/src/main/resources/error/error-classes.json (103 changes: 57 additions & 46 deletions)
@@ -128,6 +128,11 @@
],
"sqlState" : "22546"
},
"CANNOT_INVOKE_IN_TRANSFORMATIONS" : {
"message" : [
"Dataset transformations and actions can only be invoked by the driver, not inside of other Dataset transformations; for example, dataset1.map(x => dataset2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the dataset1.map transformation. For more information, see SPARK-28702."
]
},
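
For illustration, a minimal Scala sketch of the driver-only rule this new error class describes (not part of this commit; the local session and the `ds1`/`ds2` names are assumptions):

import org.apache.spark.sql.SparkSession

object DriverOnlyActionsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
    import spark.implicits._

    val ds1 = Seq(1L, 2L, 3L).toDS()
    val ds2 = Seq(10L, 20L).toDS()

    // Invalid: count() is an action and cannot run inside another Dataset's
    // transformation; this is the pattern the error message rejects.
    // ds1.map(x => ds2.count() * x)

    // Valid: run the action on the driver first, then use the plain value.
    val factor = ds2.count()
    ds1.map(x => factor * x).show()

    spark.stop()
  }
}
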
"CANNOT_LOAD_FUNCTION_CLASS" : {
"message" : [
"Cannot load class <className> when registering the function <functionName>, please make sure it is on the classpath."
@@ -724,6 +729,11 @@
],
"sqlState" : "42K04"
},
"FIELDS_ALREADY_EXISTS" : {
"message" : [
"Cannot <op> column, because <fieldNames> already exists in <struct>."
]
},
"FIELD_NOT_FOUND" : {
"message" : [
"No such struct field <fieldName> in <fields>."
@@ -1038,6 +1048,12 @@
],
"sqlState" : "22003"
},
"INVALID_BITMAP_POSITION" : {
"message" : [
"The 0-indexed bitmap position <bitPosition> is out of bounds. The bitmap has <bitmapNumBits> bits (<bitmapNumBytes> bytes)."
],
"sqlState" : "22003"
},
"INVALID_BOUNDARY" : {
"message" : [
"The boundary <boundary> is invalid: <invalidValue>."
@@ -1186,14 +1202,19 @@
"The escape character is not allowed to precede <char>."
]
},
"MISMATCH_INPUT" : {
"message" : [
"The input <inputType> '<input>' does not match the format."
]
},
"THOUSANDS_SEPS_MUST_BEFORE_DEC" : {
"message" : [
"Thousands separators (, or G) may not appear after the decimal point in the number format."
]
},
"UNEXPECTED_TOKEN" : {
"message" : [
"Found the unexpected <token> in the format string; the structure of the format string must match: [MI|S] [$] [0|9|G|,]* [.|D] [0|9]* [$] [PR|MI|S]."
"Found the unexpected <token> in the format string; the structure of the format string must match: `[MI|S]` `[$]` `[0|9|G|,]*` `[.|D]` `[0|9]*` `[$]` `[PR|MI|S]`."
]
},
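
For illustration, a hypothetical format string that satisfies the structure quoted above (not part of this commit; the local SparkSession setup is an assumption):

import org.apache.spark.sql.SparkSession

object NumberFormatSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
    // '$9,999.99' follows the quoted structure: [$] [0|9|G|,]* [.|D] [0|9]*
    spark.sql("SELECT to_number('$1,234.56', '$9,999.99') AS n").show()
    spark.stop()
  }
}
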
"WRONG_NUM_DIGIT" : {
@@ -1341,6 +1362,16 @@
"Invalid observed metrics."
],
"subClass" : {
"AGGREGATE_EXPRESSION_WITH_DISTINCT_UNSUPPORTED" : {
"message" : [
"Aggregate expression with distinct are not allowed in observed metrics, but found: <expr>."
]
},
"AGGREGATE_EXPRESSION_WITH_FILTER_UNSUPPORTED" : {
"message" : [
"Aggregate expression with filter predicate are not allowed in observed metrics, but found: <expr>."
]
},
"MISSING_NAME" : {
"message" : [
"The observed metrics should be named: <operator>."
@@ -1351,6 +1382,11 @@
"Nested aggregates are not allowed in observed metrics, but found: <expr>."
]
},
"NON_AGGREGATE_FUNC_ARG_IS_ATTRIBUTE" : {
"message" : [
"Attribute <expr> can only be used as an argument to an aggregate function."
]
},
"NON_AGGREGATE_FUNC_ARG_IS_NON_DETERMINISTIC" : {
"message" : [
"Non-deterministic expression <expr> can only be used as an argument to an aggregate function."
@@ -2577,6 +2613,11 @@
"Drop the namespace <namespace>."
]
},
"HIVE_WITH_ANSI_INTERVALS" : {
"message" : [
"Hive table <tableName> with ANSI intervals."
]
},
"INSERT_PARTITION_SPEC_IF_NOT_EXISTS" : {
"message" : [
"INSERT INTO <tableName> with IF NOT EXISTS in the PARTITION spec."
@@ -2592,6 +2633,11 @@
"Referencing lateral column alias <lca> in the aggregate query both with window expressions and with having clause. Please rewrite the aggregate query by removing the having clause or removing lateral alias reference in the SELECT list."
]
},
"LATERAL_COLUMN_ALIAS_IN_GROUP_BY" : {
"message" : [
"Referencing a lateral column alias via GROUP BY alias/ALL is not supported yet."
]
},
"LATERAL_COLUMN_ALIAS_IN_WINDOW" : {
"message" : [
"Referencing a lateral column alias <lca> in window expression <windowExpr>."
@@ -2657,6 +2703,11 @@
"Remove a comment from the namespace <namespace>."
]
},
"REPLACE_NESTED_COLUMN" : {
"message" : [
"The replace function does not support nested column <colName>."
]
},
"SET_NAMESPACE_PROPERTY" : {
"message" : [
"<property> is a reserved namespace property, <msg>."
@@ -2903,6 +2954,11 @@
"3. set \"spark.sql.legacy.allowUntypedScalaUDF\" to \"true\" and use this API with caution."
]
},
"UPDATE_FIELD_WITH_STRUCT_UNSUPPORTED" : {
"message" : [
"Cannot update <table> field <fieldName> type: update a struct by updating its fields."
]
},
"VIEW_ALREADY_EXISTS" : {
"message" : [
"Cannot create view <relationName> because it already exists.",
@@ -5621,56 +5677,11 @@
"<message>"
]
},
"_LEGACY_ERROR_TEMP_2274" : {
"message" : [
"Nested field <colName> is not supported."
]
},
"_LEGACY_ERROR_TEMP_2275" : {
"message" : [
"Dataset transformations and actions can only be invoked by the driver, not inside of other Dataset transformations; for example, dataset1.map(x => dataset2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the dataset1.map transformation. For more information, see SPARK-28702."
]
},
"_LEGACY_ERROR_TEMP_2276" : {
"message" : [
"Hive table <tableName> with ANSI intervals is not supported."
]
},
"_LEGACY_ERROR_TEMP_2277" : {
"message" : [
"Number of dynamic partitions created is <numWrittenParts>, which is more than <maxDynamicPartitions>. To solve this try to set <maxDynamicPartitionsKey> to at least <numWrittenParts>."
]
},
"_LEGACY_ERROR_TEMP_2278" : {
"message" : [
"The input <valueType> '<input>' does not match the given number format: '<format>'."
]
},
"_LEGACY_ERROR_TEMP_2320" : {
"message" : [
"distinct aggregates are not allowed in observed metrics, but found: <sqlExpr>."
]
},
"_LEGACY_ERROR_TEMP_2321" : {
"message" : [
"aggregates with filter predicate are not allowed in observed metrics, but found: <sqlExpr>."
]
},
"_LEGACY_ERROR_TEMP_2322" : {
"message" : [
"attribute <sqlExpr> can only be used as an argument to an aggregate function."
]
},
"_LEGACY_ERROR_TEMP_2323" : {
"message" : [
"Cannot <op> column, because <fieldNames> already exists in <struct>."
]
},
"_LEGACY_ERROR_TEMP_2324" : {
"message" : [
"Cannot update <table> field <fieldName> type: update a struct by updating its fields."
]
},
"_LEGACY_ERROR_TEMP_2325" : {
"message" : [
"Cannot update <table> field <fieldName> type: update a map by updating <fieldName>.key or <fieldName>.value."
@@ -20,6 +20,7 @@ import java.util.{Collections, Locale}

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.reflect.ClassTag
import scala.util.control.NonFatal

import org.apache.spark.SparkException
@@ -239,17 +240,13 @@ class Dataset[T] private[sql] (
* @since 3.4.0
*/
def schema: StructType = {
if (encoder == UnboundRowEncoder) {
DataTypeProtoConverter
.toCatalystType(
sparkSession
.analyze(plan, proto.AnalyzePlanRequest.AnalyzeCase.SCHEMA)
.getSchema
.getSchema)
.asInstanceOf[StructType]
} else {
encoder.schema
}
DataTypeProtoConverter
.toCatalystType(
sparkSession
.analyze(plan, proto.AnalyzePlanRequest.AnalyzeCase.SCHEMA)
.getSchema
.getSchema)
.asInstanceOf[StructType]
}

/**
@@ -572,7 +569,7 @@ class Dataset[T] private[sql] (
}
}

private def toJoinType(name: String): proto.Join.JoinType = {
private def toJoinType(name: String, skipSemiAnti: Boolean = false): proto.Join.JoinType = {
name.trim.toLowerCase(Locale.ROOT) match {
case "inner" =>
proto.Join.JoinType.JOIN_TYPE_INNER
@@ -584,12 +581,12 @@
proto.Join.JoinType.JOIN_TYPE_LEFT_OUTER
case "right" | "rightouter" | "right_outer" =>
proto.Join.JoinType.JOIN_TYPE_RIGHT_OUTER
case "semi" | "leftsemi" | "left_semi" =>
case "semi" | "leftsemi" | "left_semi" if !skipSemiAnti =>
proto.Join.JoinType.JOIN_TYPE_LEFT_SEMI
case "anti" | "leftanti" | "left_anti" =>
case "anti" | "leftanti" | "left_anti" if !skipSemiAnti =>
proto.Join.JoinType.JOIN_TYPE_LEFT_ANTI
case _ =>
throw new IllegalArgumentException(s"Unsupported join type `joinType`.")
case e =>
throw new IllegalArgumentException(s"Unsupported join type '$e'.")
}
}

@@ -839,6 +836,80 @@ class Dataset[T] private[sql] (
}
}

/**
* Joins this Dataset returning a `Tuple2` for each pair where `condition` evaluates to true.
*
* This is similar to the relation `join` function with one important difference in the result
* schema. Since `joinWith` preserves objects present on either side of the join, the result
* schema is similarly nested into a tuple under the column names `_1` and `_2`.
*
* This type of join can be useful both for preserving type-safety with the original object
* types as well as working with relational data where either side of the join has column names
* in common.
*
* @param other
* Right side of the join.
* @param condition
* Join expression.
* @param joinType
* Type of join to perform. Default `inner`. Must be one of: `inner`, `cross`, `outer`,
* `full`, `fullouter`,`full_outer`, `left`, `leftouter`, `left_outer`, `right`, `rightouter`,
* `right_outer`.
*
* @group typedrel
* @since 3.5.0
*/
def joinWith[U](other: Dataset[U], condition: Column, joinType: String): Dataset[(T, U)] = {
val joinTypeValue = toJoinType(joinType, skipSemiAnti = true)
val (leftNullable, rightNullable) = joinTypeValue match {
case proto.Join.JoinType.JOIN_TYPE_INNER | proto.Join.JoinType.JOIN_TYPE_CROSS =>
(false, false)
case proto.Join.JoinType.JOIN_TYPE_FULL_OUTER =>
(true, true)
case proto.Join.JoinType.JOIN_TYPE_LEFT_OUTER =>
(false, true)
case proto.Join.JoinType.JOIN_TYPE_RIGHT_OUTER =>
(true, false)
case e =>
throw new IllegalArgumentException(s"Unsupported join type '$e'.")
}

val tupleEncoder =
ProductEncoder[(T, U)](
ClassTag(Utils.getContextOrSparkClassLoader.loadClass(s"scala.Tuple2")),
Seq(
EncoderField(s"_1", this.encoder, leftNullable, Metadata.empty),
EncoderField(s"_2", other.encoder, rightNullable, Metadata.empty)))

sparkSession.newDataset(tupleEncoder) { builder =>
val joinBuilder = builder.getJoinBuilder
joinBuilder
.setLeft(plan.getRoot)
.setRight(other.plan.getRoot)
.setJoinType(joinTypeValue)
.setJoinCondition(condition.expr)
.setJoinDataType(joinBuilder.getJoinDataTypeBuilder
.setIsLeftFlattenableToRow(this.encoder.isFlattenable)
.setIsRightFlattenableToRow(other.encoder.isFlattenable))
}
}

/**
* Using inner equi-join to join this Dataset returning a `Tuple2` for each pair where
* `condition` evaluates to true.
*
* @param other
* Right side of the join.
* @param condition
* Join expression.
*
* @group typedrel
* @since 3.5.0
*/
def joinWith[U](other: Dataset[U], condition: Column): Dataset[(T, U)] = {
joinWith(other, condition, "inner")
}
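
A minimal usage sketch for the joinWith overloads added above (not part of this diff). It follows the classic Dataset API shape that this client mirrors; the case classes and the session setup are assumptions. Note that, per the toJoinType(..., skipSemiAnti = true) call, the semi and anti join types are rejected for joinWith.

import org.apache.spark.sql.{Dataset, SparkSession}

case class Person(name: String, deptId: Int)
case class Dept(id: Int, deptName: String)

object JoinWithSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().getOrCreate()
    import spark.implicits._

    val people: Dataset[Person] = Seq(Person("alice", 1), Person("bob", 2)).toDS()
    val depts: Dataset[Dept] = Seq(Dept(1, "eng")).toDS()

    // Each result row keeps both original objects, nested under columns _1 and _2.
    val joined: Dataset[(Person, Dept)] =
      people.joinWith(depts, people("deptId") === depts("id"), "left_outer")
    joined.show()

    // The two-argument overload defaults to an inner join.
    val inner = people.joinWith(depts, people("deptId") === depts("id"))
    inner.show()

    spark.stop()
  }
}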

/**
* Returns a new Dataset with each partition sorted by the given expressions.
*
