
Commit

follow comment
AngersZhuuuu committed Jul 14, 2020
1 parent a693722 commit 5bfa669
Showing 3 changed files with 6 additions and 26 deletions.

BaseScriptTransformationExec.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.execution
 
 import java.io.OutputStream
 import java.nio.charset.StandardCharsets
-import java.time.ZoneId
 import java.util.concurrent.TimeUnit
 
 import scala.util.control.NonFatal
@@ -32,9 +31,8 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{AttributeSet, UnsafeProjection}
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
-import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TimestampFormatter}
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{DataType, DateType, TimestampType}
+import org.apache.spark.sql.types.DataType
 import org.apache.spark.util.{CircularBuffer, SerializableConfiguration, Utils}
 
 trait BaseScriptTransformationExec extends UnaryExecNode {
@@ -129,19 +127,7 @@ abstract class BaseScriptTransformationWriterThread extends Thread with Logging
         var i = 1
         while (i < len) {
           sb.append(ioSchema.inputRowFormatMap("TOK_TABLEROWFORMATFIELD"))
-          val columnType = inputSchema(i)
-          val fieldValue = row.get(i, columnType)
-          val fieldStringValue = columnType match {
-            case _: DateType =>
-              val dateFormatter = DateFormatter(ZoneId.systemDefault())
-              dateFormatter.format(fieldValue.asInstanceOf[Int])
-            case _: TimestampType =>
-              TimestampFormatter.getFractionFormatter(ZoneId.systemDefault())
-                .format(fieldValue.asInstanceOf[Long])
-            case _ =>
-              fieldValue.toString
-          }
-          sb.append(fieldStringValue)
+          sb.append(row.get(i, inputSchema(i)))
           i += 1
         }
         sb.append(ioSchema.inputRowFormatMap("TOK_TABLEROWFORMATLINES"))
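
Context for the change above (not part of the commit): the removed branch existed because Spark stores DateType internally as an Int of days since the epoch and TimestampType as a Long of microseconds since the epoch, so appending the raw internal value would print integers instead of date/timestamp text. A minimal sketch of what the writer thread used to do, using the same formatter APIs as the removed code; the sample values and object name are hypothetical:

    import java.time.ZoneId

    import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}

    object WriterSideFormattingSketch {
      def main(args: Array[String]): Unit = {
        val zone = ZoneId.systemDefault()
        // DateType: days since 1970-01-01; 18457 days = 2020-07-14.
        val days = 18457 // hypothetical internal DateType value
        println(DateFormatter(zone).format(days))
        // TimestampType: microseconds since the epoch.
        val micros = 1594684800000000L // hypothetical internal TimestampType value
        println(TimestampFormatter.getFractionFormatter(zone).format(micros))
      }
    }

After this commit the per-row type dispatch is gone from the write loop; the planner change in SparkStrategies.scala below casts every input column to a string up front instead.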

SparkSqlParser.scala
@@ -714,14 +714,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
        }
        (Seq.empty, Option(name), props.toSeq, recordHandler)
 
-      case null if conf.getConf(CATALOG_IMPLEMENTATION).equals("hive") =>
-        // Use default (serde) format.
-        val name = conf.getConfString("hive.script.serde",
-          "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")
-        val props = Seq("field.delim" -> "\t")
-        val recordHandler = Option(conf.getConfString(configKey, defaultConfigValue))
-        (Nil, Option(name), props, recordHandler)
-
       case null =>
         (Nil, None, Seq.empty, None)
     }
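
A hedged illustration of the behavior change (the session, table, and query are assumptions, not from the commit): a TRANSFORM query with no ROW FORMAT clause now always takes the remaining case null branch, i.e. no default serde, instead of defaulting to LazySimpleSerDe whenever the catalog implementation is hive:

    // Assumes an existing SparkSession `spark` and a table `src(key, value)`.
    // With no ROW FORMAT clause the parser now returns (Nil, None, Seq.empty, None)
    // for the serde info, regardless of the configured catalog implementation.
    spark.sql(
      """SELECT TRANSFORM(key, value)
        |USING 'cat' AS (k STRING, v STRING)
        |FROM src
        |""".stripMargin)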

SparkStrategies.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution
 
+import java.time.ZoneId
+
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{execution, AnalysisException, Strategy}
 import org.apache.spark.sql.catalyst.InternalRow
@@ -38,7 +40,7 @@ import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.sources.MemoryPlan
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{StringType, StructType}
 
 /**
  * Converts a logical plan into zero or more SparkPlans. This API is exposed for experimenting
@@ -537,7 +539,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
      case logical.ScriptTransformation(input, script, output, child, ioschema)
          if ioschema.inputSerdeClass.isEmpty && ioschema.outputSerdeClass.isEmpty =>
        SparkScriptTransformationExec(
-          input,
+          input.map(Cast(_, StringType).withTimeZone(conf.sessionLocalTimeZone)),
          script,
          output,
          planLater(child),
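
A minimal sketch of what the new argument does (the literal value and time zone are illustrative): wrapping each TRANSFORM input in a Cast to StringType, pinned to the session-local time zone, makes Cast render dates and timestamps as text during evaluation, which is what let the writer thread above drop its hand-rolled formatting:

    import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
    import org.apache.spark.sql.types.{StringType, TimestampType}

    // Internal TimestampType value: microseconds since the epoch.
    val ts = Literal(1594684800000000L, TimestampType) // 2020-07-14 00:00:00 UTC
    // Cast is time-zone aware; withTimeZone fixes the zone used for rendering.
    val rendered = Cast(ts, StringType).withTimeZone("UTC").eval()
    println(rendered) // UTF8String, e.g. "2020-07-14 00:00:00"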
