[SPARK-32106][SQL] Implement SparkScriptTransformationExec in sql/core #29085
Changes to BaseScriptTransformationExec.scala (org.apache.spark.sql.execution):

@@ -17,25 +17,34 @@
package org.apache.spark.sql.execution

-import java.io.OutputStream
+import java.io.{BufferedReader, InputStream, OutputStream}
import java.nio.charset.StandardCharsets
import java.util.concurrent.TimeUnit

import scala.collection.JavaConverters._
import scala.util.control.NonFatal

import org.apache.hadoop.conf.Configuration

import org.apache.spark.{SparkException, TaskContext}
import org.apache.spark.internal.Logging
+import org.apache.spark.network.util.JavaUtils
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{AttributeSet, UnsafeProjection}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expression, GenericInternalRow, UnsafeProjection}
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
+import org.apache.spark.sql.catalyst.util.{DateTimeUtils, IntervalUtils}
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.DataType
-import org.apache.spark.util.{CircularBuffer, SerializableConfiguration, Utils}
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.{CircularBuffer, RedirectThread, SerializableConfiguration, Utils}

trait BaseScriptTransformationExec extends UnaryExecNode {
  def input: Seq[Expression]
  def script: String
  def output: Seq[Attribute]
  def child: SparkPlan
  def ioschema: BaseScriptTransformIOSchema

  override def producedAttributes: AttributeSet = outputSet -- inputSet
@@ -56,10 +65,45 @@ trait BaseScriptTransformationExec extends UnaryExecNode {
    }
  }
  def initProc(name: String): (OutputStream, Process, InputStream, CircularBuffer) = {
    val cmd = List("/bin/bash", "-c", script)
    val builder = new ProcessBuilder(cmd.asJava)

    val proc = builder.start()
    val inputStream = proc.getInputStream
    val outputStream = proc.getOutputStream
    val errorStream = proc.getErrorStream

    // In order to avoid deadlocks, we need to consume the error output of the child process.
    // To avoid issues caused by large error output, we use a circular buffer to limit the amount
    // of error output that we retain. See SPARK-7862 for more discussion of the deadlock / hang
    // that motivates this.
    val stderrBuffer = new CircularBuffer(2048)
    new RedirectThread(
      errorStream,
      stderrBuffer,
      s"Thread-$name-STDERR-Consumer").start()

    (outputStream, proc, inputStream, stderrBuffer)
  }
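The stderr drain above is the standard guard against the pipe-buffer deadlock SPARK-7862 describes: if nobody reads the child's stderr, the child can block on a full pipe while the parent blocks reading stdout. A minimal JDK-only sketch of the same pattern (illustrative names; the PR itself uses Spark's RedirectThread and CircularBuffer; Scala 2.12+ for the Runnable lambda):

```scala
import java.io.ByteArrayOutputStream

object StderrDrainSketch {
  def main(args: Array[String]): Unit = {
    val proc = new ProcessBuilder("/bin/bash", "-c", "echo out; echo err 1>&2").start()

    // In-memory sink standing in for Spark's bounded CircularBuffer.
    val stderrSink = new ByteArrayOutputStream()

    // Drain stderr on a daemon thread so the child never stalls on a full pipe.
    val drainer = new Thread(() => {
      val buf = new Array[Byte](1024)
      var n = proc.getErrorStream.read(buf)
      while (n != -1) {
        stderrSink.write(buf, 0, n)
        n = proc.getErrorStream.read(buf)
      }
    }, "stderr-drainer")
    drainer.setDaemon(true)
    drainer.start()

    // Meanwhile the parent is free to consume stdout.
    println(scala.io.Source.fromInputStream(proc.getInputStream).mkString.trim) // "out"
    proc.waitFor()
  }
}
```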

  def processIterator(
      inputIterator: Iterator[InternalRow],
      hadoopConf: Configuration): Iterator[InternalRow]
  def processOutputWithoutSerde(prevLine: String, reader: BufferedReader): InternalRow = {
    if (!ioschema.schemaLess) {
      new GenericInternalRow(
        prevLine.split(ioschema.outputRowFormatMap("TOK_TABLEROWFORMATFIELD"))
          .zip(fieldWriters)
          .map { case (data, writer) => writer(data) })
    } else {
      new GenericInternalRow(
        prevLine.split(ioschema.outputRowFormatMap("TOK_TABLEROWFORMATFIELD"), 2)
          .zip(fieldWriters)
          .map { case (data, writer) => writer(data) })
    }
  }

[Review] protected
[Reply] Done
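The two branches differ only in the split limit: with a schema, every delimiter occurrence produces a field, while schema-less output is cut into at most two fields, with the remainder left unsplit. A tiny standalone illustration (a comma delimiter is assumed here purely for legibility; the default TRANSFORM delimiter is a tab):

```scala
object SplitLimitSketch {
  def main(args: Array[String]): Unit = {
    val line = "k1,v1,v2"
    // Schema-aware branch: one field per delimiter.
    println(line.split(",").mkString("|"))    // k1|v1|v2
    // Schema-less branch: limit 2 keeps everything after the first delimiter together.
    println(line.split(",", 2).mkString("|")) // k1|v1,v2
  }
}
```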

  protected def checkFailureAndPropagate(
      writerThread: BaseScriptTransformationWriterThread,
      cause: Throwable = null,

@@ -87,17 +131,55 @@ trait BaseScriptTransformationExec extends UnaryExecNode {
      }
    }
  }

  private lazy val fieldWriters: Seq[String => Any] = output.map { attr =>

[Review] On second thought,
[Reply] More accurate, done.

    val converter = CatalystTypeConverters.createToCatalystConverter(attr.dataType)
    attr.dataType match {
      case StringType => (data: String) => converter(data)
      case ByteType => (data: String) => converter(JavaUtils.stringToBytes(data))

[Review] Why did you convert a string into a byte array for byte types?
[Reply] Oh, sorry, my mistake; changed, and added a check for this type in the UT.

      case IntegerType => (data: String) => converter(data.toInt)
      case ShortType => (data: String) => converter(data.toShort)
      case LongType => (data: String) => converter(data.toLong)
      case FloatType => (data: String) => converter(data.toFloat)
      case DoubleType => (data: String) => converter(data.toDouble)
      case dt: DecimalType => (data: String) => converter(BigDecimal(data))
      case DateType if conf.datetimeJava8ApiEnabled => (data: String) =>

[Review] I checked the existing implementation; it seems to support only basic data types.
[Reply] @maropu Yes, the current script transform with the default Hive serde can only support basic data types; it can't support array/map/struct/timestamp etc. Here we support a simple default serde that can replace LazySimpleSerDe.
[Review] Could you update the PR description to describe this explicitly?
[Reply] Yeah, I'll update it later. As for how to decide which serde (or the default serde) to use, I think it's better to decide that in alfozan's PR.

        converter(DateTimeUtils.stringToDate(
          UTF8String.fromString(data),
          DateTimeUtils.getZoneId(conf.sessionLocalTimeZone))
          .map(DateTimeUtils.daysToLocalDate).orNull)
      case DateType => (data: String) =>
        converter(DateTimeUtils.stringToDate(
          UTF8String.fromString(data),
          DateTimeUtils.getZoneId(conf.sessionLocalTimeZone))
          .map(DateTimeUtils.toJavaDate).orNull)
      case TimestampType if conf.datetimeJava8ApiEnabled => (data: String) =>
        converter(DateTimeUtils.stringToTimestamp(
          UTF8String.fromString(data),
          DateTimeUtils.getZoneId(conf.sessionLocalTimeZone))
          .map(DateTimeUtils.microsToInstant).orNull)
      case TimestampType => (data: String) =>
        converter(DateTimeUtils.stringToTimestamp(
          UTF8String.fromString(data),
          DateTimeUtils.getZoneId(conf.sessionLocalTimeZone))
          .map(DateTimeUtils.toJavaTimestamp).orNull)
      case CalendarIntervalType => (data: String) =>
        converter(IntervalUtils.stringToInterval(UTF8String.fromString(data)))
      case dataType: DataType => (data: String) => converter(data)
    }
  }
}
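For intuition about what these writers produce: CatalystTypeConverters turns external JVM values into Catalyst's internal representation (a String becomes a UTF8String, for example). A small sketch along the same lines, assuming spark-catalyst is on the classpath (CatalystTypeConverters is an internal API, shown here only for illustration):

```scala
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.types.{IntegerType, StringType}

object FieldWriterSketch {
  def main(args: Array[String]): Unit = {
    // Build String => Any writers the way fieldWriters does: parse the raw
    // script output, then convert to the Catalyst-internal form.
    val intWriter: String => Any = data =>
      CatalystTypeConverters.createToCatalystConverter(IntegerType)(data.toInt)
    val strWriter: String => Any = data =>
      CatalystTypeConverters.createToCatalystConverter(StringType)(data)

    println(intWriter("42"))             // 42
    println(strWriter("hello").getClass) // class org.apache.spark.unsafe.types.UTF8String
  }
}
```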

-abstract class BaseScriptTransformationWriterThread(
-    iter: Iterator[InternalRow],
-    inputSchema: Seq[DataType],
-    ioSchema: BaseScriptTransformIOSchema,
-    outputStream: OutputStream,
-    proc: Process,
-    stderrBuffer: CircularBuffer,
-    taskContext: TaskContext,
-    conf: Configuration) extends Thread with Logging {
+abstract class BaseScriptTransformationWriterThread extends Thread with Logging {
+
+  def iter: Iterator[InternalRow]
+  def inputSchema: Seq[DataType]
+  def ioSchema: BaseScriptTransformIOSchema
+  def outputStream: OutputStream
+  def proc: Process
+  def stderrBuffer: CircularBuffer
+  def taskContext: TaskContext
+  def conf: Configuration

  setDaemon(true)
@@ -173,21 +255,13 @@ abstract class BaseScriptTransformIOSchema extends Serializable {
  import ScriptIOSchema._

  def inputRowFormat: Seq[(String, String)]
  def outputRowFormat: Seq[(String, String)]
  def inputSerdeClass: Option[String]
  def outputSerdeClass: Option[String]
  def inputSerdeProps: Seq[(String, String)]
  def outputSerdeProps: Seq[(String, String)]
  def recordReaderClass: Option[String]
  def recordWriterClass: Option[String]
  def schemaLess: Boolean

  val inputRowFormatMap = inputRowFormat.toMap.withDefault((k) => defaultFormat(k))
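The `withDefault` at the end is what lets a query override only some of the row-format tokens; lookups for unset keys fall through to `defaultFormat`. A standalone illustration with hypothetical defaults standing in for ScriptIOSchema.defaultFormat:

```scala
object RowFormatMapSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical defaults; the real ones come from ScriptIOSchema.defaultFormat.
    val defaultFormat = Map(
      "TOK_TABLEROWFORMATFIELD" -> "\t",
      "TOK_TABLEROWFORMATLINES" -> "\n")

    // The query only overrides the field delimiter.
    val inputRowFormat = Seq("TOK_TABLEROWFORMATFIELD" -> ",")
    val inputRowFormatMap = inputRowFormat.toMap.withDefault(k => defaultFormat(k))

    println(inputRowFormatMap("TOK_TABLEROWFORMATFIELD"))         // "," from the query
    println(inputRowFormatMap("TOK_TABLEROWFORMATLINES") == "\n") // true, from the defaults
  }
}
```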
New file: SparkScriptTransformationExec.scala (org.apache.spark.sql.execution):

@@ -0,0 +1,158 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution

import java.io._
import java.nio.charset.StandardCharsets

import scala.collection.JavaConverters._
import scala.util.control.NonFatal

import org.apache.hadoop.conf.Configuration

import org.apache.spark.TaskContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.ScriptInputOutputSchema
import org.apache.spark.sql.types._
import org.apache.spark.util.{CircularBuffer, RedirectThread}
/**
 * Transforms the input by forking and running the specified script.
 *
 * @param input the set of expressions that should be passed to the script.
 * @param script the command that should be executed.
 * @param output the attributes that are produced by the script.
 */
case class SparkScriptTransformationExec(
    input: Seq[Expression],
    script: String,
    output: Seq[Attribute],
    child: SparkPlan,
    ioschema: SparkScriptIOSchema)
  extends BaseScriptTransformationExec {

  override def processIterator(
      inputIterator: Iterator[InternalRow],
      hadoopConf: Configuration): Iterator[InternalRow] = {

    val (outputStream, proc, inputStream, stderrBuffer) = initProc(this.getClass.getSimpleName)

    val finalInput = input.map(Cast(_, StringType).withTimeZone(conf.sessionLocalTimeZone))

    val outputProjection = new InterpretedProjection(finalInput, child.output)

    // This new thread will consume the ScriptTransformation's input rows and write them to the
    // external process. That process's output will be read by this current thread.
    val writerThread = SparkScriptTransformationWriterThread(
      inputIterator.map(outputProjection),
      finalInput.map(_.dataType),
      ioschema,
      outputStream,
      proc,
      stderrBuffer,
      TaskContext.get(),
      hadoopConf
    )

    val reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))
    val outputIterator: Iterator[InternalRow] = new Iterator[InternalRow] {
      var curLine: String = null

      override def hasNext: Boolean = {
        try {
          if (curLine == null) {
            curLine = reader.readLine()
            if (curLine == null) {
              checkFailureAndPropagate(writerThread, null, proc, stderrBuffer)
              return false
            }
          }
          true
        } catch {
          case NonFatal(e) =>
            // If this exception is due to abrupt / unclean termination of `proc`,
            // then detect it and propagate a better exception message for end users
            checkFailureAndPropagate(writerThread, e, proc, stderrBuffer)

            throw e
        }
      }

      override def next(): InternalRow = {
        if (!hasNext) {
          throw new NoSuchElementException
        }
        val prevLine = curLine
        curLine = reader.readLine()
        processOutputWithoutSerde(prevLine, reader)
      }
    }

    writerThread.start()

    outputIterator
  }
}
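End to end, this exec is what a TRANSFORM query without any ROW FORMAT or SERDE clause now plans to. A usage sketch, assuming a Unix-like environment with /bin/bash available and hypothetical view/column names:

```scala
import org.apache.spark.sql.SparkSession

object TransformUsageSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("transform").getOrCreate()
    spark.range(3).createOrReplaceTempView("t")

    // No format/serde definition, so the parser returns the empty io-schema and
    // planning falls through to SparkScriptTransformationExec; 'cat' echoes rows back.
    spark.sql("SELECT TRANSFORM(id) USING 'cat' AS (echoed) FROM t").show()

    spark.stop()
  }
}
```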

case class SparkScriptTransformationWriterThread(
    iter: Iterator[InternalRow],
    inputSchema: Seq[DataType],
    ioSchema: SparkScriptIOSchema,
    outputStream: OutputStream,
    proc: Process,
    stderrBuffer: CircularBuffer,
    taskContext: TaskContext,
    conf: Configuration)
  extends BaseScriptTransformationWriterThread {

  setDaemon(true)

[Review] We can remove this?
[Reply] Done

  override def processRows(): Unit = {
    processRowsWithoutSerde()
  }
}

object SparkScriptIOSchema {
  def apply(input: ScriptInputOutputSchema): SparkScriptIOSchema = {
    SparkScriptIOSchema(
      input.inputRowFormat,
      input.outputRowFormat,
      input.inputSerdeClass,
      input.outputSerdeClass,
      input.inputSerdeProps,
      input.outputSerdeProps,
      input.recordReaderClass,
      input.recordWriterClass,
      input.schemaLess)
  }
}

/**
 * The wrapper class of Spark script transformation input and output schema properties
 */
case class SparkScriptIOSchema(
    inputRowFormat: Seq[(String, String)],
    outputRowFormat: Seq[(String, String)],
    inputSerdeClass: Option[String],
    outputSerdeClass: Option[String],
    inputSerdeProps: Seq[(String, String)],
    outputSerdeProps: Seq[(String, String)],
    recordReaderClass: Option[String],
    recordWriterClass: Option[String],
    schemaLess: Boolean) extends BaseScriptTransformIOSchema

[Review] Why is this class so big while it doesn't support hive serde?
[Reply] For this, I think we should change it after we decide whether script transform in sql/core needs to implement a serde.
[Reply] The implementation here offers very limited support; a complete implementation (a SerDe class for ROW FORMAT DELIMITED) can be added later and will live in the same folder.
Changes to SparkSqlAstBuilder:

@@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf, VariableSubstitution}
+import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
import org.apache.spark.sql.types.StructType

/**
@@ -712,14 +713,10 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
          None
        }
        (Seq.empty, Option(name), props.toSeq, recordHandler)

+      // SPARK-32106: When there is no definition about format, we return empty result
+      // then we finally execute with SparkScriptTransformationExec

[Review] How about rephrasing it like this?
[Reply] Done, and I kept it like this. The way we decide whether to use SparkScriptTransformationExec or HiveScriptTransformationExec still needs to be refactored once Spark's own serde is added after @alfozan's PR.

      case null =>
-        // Use default (serde) format.
-        val name = conf.getConfString("hive.script.serde",
-          "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")
-        val props = Seq("field.delim" -> "\t")
-        val recordHandler = Option(conf.getConfString(configKey, defaultConfigValue))
-        (Nil, Option(name), props, recordHandler)
+        (Nil, None, Seq.empty, None)

[Reply] @maropu Regarding your confusion in #29085 (comment): CalendarIntervalType/ArrayType/MapType/StructType as input to the Hive default serde will throw an error, but won't throw an error in the Spark default path.
[Review] If so, why do the tests pass in
[Reply] Since it builds the SparkPlan directly, it doesn't use the SQL parser.
[Review] By the way, do we already have end-to-end tests for the unsupported cases on the Hive side?
[Reply] Added

    }

    val (inFormat, inSerdeClass, inSerdeProps, reader) =
Changes to SparkStrategies:

@@ -532,6 +532,21 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
    }
  }

  object SparkScripts extends Strategy {
    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
      case logical.ScriptTransformation(input, script, output, child, ioschema)
          if ioschema.inputSerdeClass.isEmpty && ioschema.outputSerdeClass.isEmpty =>

[Review] Do we need to check this here? It seems this has already been checked in https://github.com/apache/spark/pull/29085/files#diff-9847f5cef7cf7fbc5830fbc6b779ee10R783-R784?
[Reply] Yeah, it isn't needed now.

        SparkScriptTransformationExec(
          input,
          script,
          output,
          planLater(child),
          SparkScriptIOSchema(ioschema)
        ) :: Nil
      case _ => Nil
    }
  }
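A planner Strategy is just a partial translation from logical to physical plans: return a physical plan for the nodes you claim and Nil otherwise, so the planner moves on to the next strategy. A minimal sketch of that contract using the public experimental hook (illustrative names; not part of this PR):

```scala
import org.apache.spark.sql.{SparkSession, Strategy}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan

// Claims nothing; a real strategy would match specific logical nodes,
// exactly as SparkScripts matches logical.ScriptTransformation above.
object PassThroughStrategy extends Strategy {
  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case _ => Nil
  }
}

object RegisterStrategy {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").getOrCreate()
    // User-supplied strategies are consulted before the built-in ones.
    spark.experimental.extraStrategies = Seq(PassThroughStrategy)
    spark.stop()
  }
}
```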

  /**
   * This strategy is just for explaining `Dataset/DataFrame` created by `spark.readStream`.
   * It won't affect the execution, because `StreamingRelation` will be replaced with
Changes to HiveStrategies:

@@ -243,7 +243,8 @@ private[hive] trait HiveStrategies {

  object HiveScripts extends Strategy {
    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case ScriptTransformation(input, script, output, child, ioschema) =>
+      case ScriptTransformation(input, script, output, child, ioschema)
+          if ioschema.inputSerdeClass.nonEmpty || ioschema.outputSerdeClass.nonEmpty =>

[Review] Why do we need this condition?
[Reply] Removed; see #29085 (comment).

        val hiveIoSchema = HiveScriptIOSchema(ioschema)
        HiveScriptTransformationExec(input, script, output, planLater(child), hiveIoSchema) :: Nil
      case _ => Nil