Commit

address comment
AngersZhuuuu committed Jul 23, 2020
1 parent d93f7fa commit be80c27
Showing 9 changed files with 129 additions and 133 deletions.
@@ -744,10 +744,11 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
selectClause.hints.asScala.foldRight(withWindow)(withHints)
}

// Decode and input/output format.
type Format = (Seq[(String, String)], Option[String], Seq[(String, String)], Option[String])
// Script Transform's input/output format.
type ScriptIOFormat =
(Seq[(String, String)], Option[String], Seq[(String, String)], Option[String])

protected def getRowFormatDelimited(ctx: RowFormatDelimitedContext): Format = {
protected def getRowFormatDelimited(ctx: RowFormatDelimitedContext): ScriptIOFormat = {
// TODO we should use the visitRowFormatDelimited function here. However HiveScriptIOSchema
// expects a seq of pairs in which the old parsers' token names are used as keys.
// Transforming the result of visitRowFormatDelimited would be quite a bit messier than
@@ -776,7 +777,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
recordReader: Token,
schemaLess: Boolean): ScriptInputOutputSchema = {

def format(fmt: RowFormatContext): Format = fmt match {
def format(fmt: RowFormatContext): ScriptIOFormat = fmt match {
case c: RowFormatDelimitedContext =>
getRowFormatDelimited(c)

@@ -1032,7 +1032,7 @@ class PlanParserSuite extends AnalysisTest {
assertEqual("select a, b from db.c; ;; ;", table("db", "c").select('a, 'b))
}

test("SPARK-32106: TRANSFORM without serde") {
test("SPARK-32106: TRANSFORM plan") {
// verify schema less
assertEqual(
"""
@@ -1122,5 +1122,24 @@
("TOK_TABLEROWFORMATLINES", "'\\n'"),
("TOK_TABLEROWFORMATNULL", "'NULL'")), None, None,
List.empty, List.empty, None, None, false)))

// verify ROW FORMAT SERDE
intercept(
"""
|SELECT TRANSFORM(a, b, c)
|ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
|WITH SERDEPROPERTIES(
| "separatorChar" = "\t",
| "quoteChar" = "'",
| "escapeChar" = "\\")
|USING 'cat' AS (a, b, c)
|ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
|WITH SERDEPROPERTIES(
| "separatorChar" = "\t",
| "quoteChar" = "'",
| "escapeChar" = "\\")
|FROM testData
""".stripMargin,
"TRANSFORM with serde is only supported in hive mode")
}
}
@@ -692,7 +692,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
def format(
fmt: RowFormatContext,
configKey: String,
defaultConfigValue: String): Format = fmt match {
defaultConfigValue: String): ScriptIOFormat = fmt match {
case c: RowFormatDelimitedContext =>
getRowFormatDelimited(c)

@@ -534,8 +534,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {

object SparkScripts extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case logical.ScriptTransformation(input, script, output, child, ioschema)
if ioschema.inputSerdeClass.isEmpty && ioschema.outputSerdeClass.isEmpty =>
case logical.ScriptTransformation(input, script, output, child, ioschema) =>
SparkScriptTransformationExec(
input,
script,
32 changes: 0 additions & 32 deletions sql/core/src/test/resources/sql-tests/inputs/transform.sql
@@ -101,7 +101,6 @@ LINES TERMINATED BY '\n'
NULL DEFINED AS 'NULL'
FROM t;


SELECT TRANSFORM(a, b, c, null)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
@@ -113,34 +112,3 @@ FIELDS TERMINATED BY '||'
LINES TERMINATED BY '\n'
NULL DEFINED AS 'NULL'
FROM t;

-- SPARK-31937 transform with defined row format delimit
--SELECT TRANSFORM(a, b, c, d, e, null)
--ROW FORMAT DELIMITED
--FIELDS TERMINATED BY '|'
--COLLECTION ITEMS TERMINATED BY '&'
--MAP KEYS TERMINATED BY '*'
--LINES TERMINATED BY '\n'
--NULL DEFINED AS 'NULL'
--USING 'cat' AS (a, b, c, d, e, f)
--ROW FORMAT DELIMITED
--FIELDS TERMINATED BY '|'
--COLLECTION ITEMS TERMINATED BY '&'
--MAP KEYS TERMINATED BY '*'
--LINES TERMINATED BY '\n'
--NULL DEFINED AS 'NULL'
--FROM VALUEW (1, 1.23, array(1,, 2, 3), map(1, '1'), struct(1, '1')) t(a, b, c, d, e);
--
--SELECT TRANSFORM(a, b, c, d, e, null)
--ROW FORMAT DELIMITED
--FIELDS TERMINATED BY '|'
--COLLECTION ITEMS TERMINATED BY '&'
--MAP KEYS TERMINATED BY '*'
--LINES TERMINATED BY '\n'
--NULL DEFINED AS 'NULL'
--USING 'cat' AS (a)
--ROW FORMAT DELIMITED
--FIELDS TERMINATED BY '||'
--LINES TERMINATED BY '\n'
--NULL DEFINED AS 'NULL'
--FROM VALUEW (1, 1.23, array(1,, 2, 3), map(1, '1'), struct(1, '1')) t(a, b, c, d, e);
@@ -62,7 +62,8 @@ class SparkScriptTransformationSuite extends BaseScriptTransformationSuite with
}
}

test("TRANSFORM doesn't support ArrayType/MapType/StructType as output data type (no serde)") {
test("SPARK-32106: TRANSFORM doesn't support ArrayType/MapType/StructType " +
"as output data type (no serde)") {
assume(TestUtils.testCommandAvailable("/bin/bash"))
// check for ArrayType
val e1 = intercept[SparkException] {
@@ -1064,8 +1064,8 @@ private[hive] trait HiveInspectors {
case TimestampType => timestampTypeInfo
case NullType => voidTypeInfo
case dt =>
throw new AnalysisException("TRANSFORM with hive serde does not support " +
s"${dt.getClass.getSimpleName.replace("$", "")} as input data type")
throw new AnalysisException("HiveInspectors does not support convert " +
s"${dt.getClass.getSimpleName.replace("$", "")} to Hive TypeInfo")
}
}
}
@@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution._
import org.apache.spark.sql.hive.HiveInspectors
import org.apache.spark.sql.hive.HiveShim._
import org.apache.spark.sql.types.{DataType, StringType}
import org.apache.spark.sql.types.DataType
import org.apache.spark.util.{CircularBuffer, Utils}

/**
@@ -53,84 +53,8 @@ case class HiveScriptTransformationExec(
output: Seq[Attribute],
child: SparkPlan,
ioschema: ScriptTransformationIOSchema)
extends BaseScriptTransformationExec with HiveInspectors {

private def initInputSerDe(
input: Seq[Expression]): Option[(AbstractSerDe, StructObjectInspector)] = {
ioschema.inputSerdeClass.map { serdeClass =>
val (columns, columnTypes) = parseAttrs(input)
val serde = initSerDe(serdeClass, columns, columnTypes, ioschema.inputSerdeProps)
val fieldObjectInspectors = columnTypes.map(toInspector)
val objectInspector = ObjectInspectorFactory
.getStandardStructObjectInspector(columns.asJava, fieldObjectInspectors.asJava)
(serde, objectInspector)
}
}

private def initOutputSerDe(
output: Seq[Attribute]): Option[(AbstractSerDe, StructObjectInspector)] = {
ioschema.outputSerdeClass.map { serdeClass =>
val (columns, columnTypes) = parseAttrs(output)
val serde = initSerDe(serdeClass, columns, columnTypes, ioschema.outputSerdeProps)
val structObjectInspector = serde.getObjectInspector().asInstanceOf[StructObjectInspector]
(serde, structObjectInspector)
}
}

private def parseAttrs(attrs: Seq[Expression]): (Seq[String], Seq[DataType]) = {
val columns = attrs.zipWithIndex.map(e => s"${e._1.prettyName}_${e._2}")
val columnTypes = attrs.map(_.dataType)
(columns, columnTypes)
}

private def initSerDe(
serdeClassName: String,
columns: Seq[String],
columnTypes: Seq[DataType],
serdeProps: Seq[(String, String)]): AbstractSerDe = {

val serde = Utils.classForName[AbstractSerDe](serdeClassName).getConstructor().
newInstance()

val columnTypesNames = columnTypes.map(_.toTypeInfo.getTypeName()).mkString(",")

var propsMap = serdeProps.toMap + (serdeConstants.LIST_COLUMNS -> columns.mkString(","))
propsMap = propsMap + (serdeConstants.LIST_COLUMN_TYPES -> columnTypesNames)

val properties = new Properties()
// Can not use properties.putAll(propsMap.asJava) in scala-2.12
// See https://github.com/scala/bug/issues/10418
propsMap.foreach { case (k, v) => properties.put(k, v) }
serde.initialize(null, properties)

serde
}

private def recordReader(
inputStream: InputStream,
conf: Configuration): Option[RecordReader] = {
ioschema.recordReaderClass.map { klass =>
val instance = Utils.classForName[RecordReader](klass).getConstructor().
newInstance()
val props = new Properties()
// Can not use props.putAll(outputSerdeProps.toMap.asJava) in scala-2.12
// See https://github.com/scala/bug/issues/10418
ioschema.outputSerdeProps.toMap.foreach { case (k, v) => props.put(k, v) }
instance.initialize(inputStream, conf, props)
instance
}
}

private def recordWriter(
outputStream: OutputStream,
conf: Configuration): Option[RecordWriter] = {
ioschema.recordWriterClass.map { klass =>
val instance = Utils.classForName[RecordWriter](klass).getConstructor().
newInstance()
instance.initialize(outputStream, conf)
instance
}
}
extends BaseScriptTransformationExec {
import HiveScriptIOSchema._

private def createOutputIteratorWithSerde(
writerThread: BaseScriptTransformationWriterThread,
@@ -144,7 +68,8 @@ case class HiveScriptTransformationExec(
var curLine: String = null
val scriptOutputStream = new DataInputStream(inputStream)

@Nullable val scriptOutputReader = recordReader(scriptOutputStream, hadoopConf).orNull
@Nullable val scriptOutputReader =
recordReader(ioschema, scriptOutputStream, hadoopConf).orNull

var scriptOutputWritable: Writable = null
val reusedWritableObject = outputSerde.getSerializedClass.getConstructor().newInstance()
@@ -218,7 +143,7 @@

// This nullability is a performance optimization in order to avoid an Option.foreach() call
// inside of a loop
@Nullable val (inputSerde, inputSoi) = initInputSerDe(input).getOrElse((null, null))
@Nullable val (inputSerde, inputSoi) = initInputSerDe(ioschema, input).getOrElse((null, null))

// For HiveScriptTransformationExec, if inputSerde == null, but outputSerde != null
// We will use StringBuffer to pass data, in this case, we should cast data as string too.
@@ -239,7 +164,6 @@
inputSoi,
ioschema,
outputStream,
recordWriter,
proc,
stderrBuffer,
TaskContext.get(),
@@ -249,7 +173,7 @@
// This nullability is a performance optimization in order to avoid an Option.foreach() call
// inside of a loop
@Nullable val (outputSerde, outputSoi) = {
initOutputSerDe(output).getOrElse((null, null))
initOutputSerDe(ioschema, output).getOrElse((null, null))
}

val outputIterator = if (outputSerde == null) {
@@ -272,16 +196,16 @@ case class HiveScriptTransformationWriterThread(
@Nullable inputSoi: StructObjectInspector,
ioSchema: ScriptTransformationIOSchema,
outputStream: OutputStream,
recordWriter: (OutputStream, Configuration) => Option[RecordWriter],
proc: Process,
stderrBuffer: CircularBuffer,
taskContext: TaskContext,
conf: Configuration)
extends BaseScriptTransformationWriterThread with HiveInspectors {
import HiveScriptIOSchema._

override def processRows(): Unit = {
val dataOutputStream = new DataOutputStream(outputStream)
@Nullable val scriptInputWriter = recordWriter(dataOutputStream, conf).orNull
@Nullable val scriptInputWriter = recordWriter(ioSchema, dataOutputStream, conf).orNull

if (inputSerde == null) {
processRowsWithoutSerde()
@@ -308,3 +232,87 @@
}
}
}

object HiveScriptIOSchema extends HiveInspectors {

def initInputSerDe(
ioschema: ScriptTransformationIOSchema,
input: Seq[Expression]): Option[(AbstractSerDe, StructObjectInspector)] = {
ioschema.inputSerdeClass.map { serdeClass =>
val (columns, columnTypes) = parseAttrs(input)
val serde = initSerDe(serdeClass, columns, columnTypes, ioschema.inputSerdeProps)
val fieldObjectInspectors = columnTypes.map(toInspector)
val objectInspector = ObjectInspectorFactory
.getStandardStructObjectInspector(columns.asJava, fieldObjectInspectors.asJava)
(serde, objectInspector)
}
}

def initOutputSerDe(
ioschema: ScriptTransformationIOSchema,
output: Seq[Attribute]): Option[(AbstractSerDe, StructObjectInspector)] = {
ioschema.outputSerdeClass.map { serdeClass =>
val (columns, columnTypes) = parseAttrs(output)
val serde = initSerDe(serdeClass, columns, columnTypes, ioschema.outputSerdeProps)
val structObjectInspector = serde.getObjectInspector().asInstanceOf[StructObjectInspector]
(serde, structObjectInspector)
}
}

private def parseAttrs(attrs: Seq[Expression]): (Seq[String], Seq[DataType]) = {
val columns = attrs.zipWithIndex.map(e => s"${e._1.prettyName}_${e._2}")
val columnTypes = attrs.map(_.dataType)
(columns, columnTypes)
}

def initSerDe(
serdeClassName: String,
columns: Seq[String],
columnTypes: Seq[DataType],
serdeProps: Seq[(String, String)]): AbstractSerDe = {

val serde = Utils.classForName[AbstractSerDe](serdeClassName).getConstructor().
newInstance()

val columnTypesNames = columnTypes.map(_.toTypeInfo.getTypeName()).mkString(",")

var propsMap = serdeProps.toMap + (serdeConstants.LIST_COLUMNS -> columns.mkString(","))
propsMap = propsMap + (serdeConstants.LIST_COLUMN_TYPES -> columnTypesNames)

val properties = new Properties()
// Can not use properties.putAll(propsMap.asJava) in scala-2.12
// See https://github.com/scala/bug/issues/10418
propsMap.foreach { case (k, v) => properties.put(k, v) }
serde.initialize(null, properties)

serde
}

def recordReader(
ioschema: ScriptTransformationIOSchema,
inputStream: InputStream,
conf: Configuration): Option[RecordReader] = {
ioschema.recordReaderClass.map { klass =>
val instance = Utils.classForName[RecordReader](klass).getConstructor().
newInstance()
val props = new Properties()
// Can not use props.putAll(outputSerdeProps.toMap.asJava) in scala-2.12
// See https://github.com/scala/bug/issues/10418
ioschema.outputSerdeProps.toMap.foreach { case (k, v) => props.put(k, v) }
instance.initialize(inputStream, conf, props)
instance
}
}

def recordWriter(
ioschema: ScriptTransformationIOSchema,
outputStream: OutputStream,
conf: Configuration): Option[RecordWriter] = {
ioschema.recordWriterClass.map { klass =>
val instance = Utils.classForName[RecordWriter](klass).getConstructor().
newInstance()
instance.initialize(outputStream, conf)
instance
}
}
}
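
For reference, a minimal usage sketch (not part of this commit) of the helpers that the change moves from private methods on HiveScriptTransformationExec onto the shared HiveScriptIOSchema object. The initInputSerDe / initOutputSerDe signatures match the diff above; the wrapper object and the import paths below are assumptions added purely for illustration.

import org.apache.hadoop.hive.serde2.AbstractSerDe
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
// Assumed import paths for illustration; the actual packages are defined elsewhere in this PR.
import org.apache.spark.sql.execution.ScriptTransformationIOSchema
import org.apache.spark.sql.hive.execution.HiveScriptIOSchema

// Hypothetical wrapper mirroring what HiveScriptTransformationExec.processIterator now does:
// both serde pairs come from the shared HiveScriptIOSchema helpers, and each is None when
// the corresponding SERDE clause is absent (the "no serde" path handled by the base exec).
object ScriptIOSchemaUsageSketch {

  type SerdePair = (AbstractSerDe, StructObjectInspector)

  def buildSerdes(
      ioschema: ScriptTransformationIOSchema,
      input: Seq[Expression],
      output: Seq[Attribute]): (Option[SerdePair], Option[SerdePair]) = {
    val inputSerde = HiveScriptIOSchema.initInputSerDe(ioschema, input)
    val outputSerde = HiveScriptIOSchema.initOutputSerDe(ioschema, output)
    (inputSerde, outputSerde)
  }
}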
