diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala
index 8217fa16148bc..aa54d93d94b7d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.execution
 
 import java.io.OutputStream
 import java.nio.charset.StandardCharsets
-import java.time.ZoneId
 import java.util.concurrent.TimeUnit
 
 import scala.util.control.NonFatal
@@ -32,9 +31,8 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{AttributeSet, UnsafeProjection}
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
-import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TimestampFormatter}
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{DataType, DateType, TimestampType}
+import org.apache.spark.sql.types.DataType
 import org.apache.spark.util.{CircularBuffer, SerializableConfiguration, Utils}
 
 trait BaseScriptTransformationExec extends UnaryExecNode {
@@ -129,19 +127,7 @@ abstract class BaseScriptTransformationWriterThread extends Thread with Logging
         var i = 1
         while (i < len) {
           sb.append(ioSchema.inputRowFormatMap("TOK_TABLEROWFORMATFIELD"))
-          val columnType = inputSchema(i)
-          val fieldValue = row.get(i, columnType)
-          val fieldStringValue = columnType match {
-            case _: DateType =>
-              val dateFormatter = DateFormatter(ZoneId.systemDefault())
-              dateFormatter.format(fieldValue.asInstanceOf[Int])
-            case _: TimestampType =>
-              TimestampFormatter.getFractionFormatter(ZoneId.systemDefault())
-                .format(fieldValue.asInstanceOf[Long])
-            case _ =>
-              fieldValue.toString
-          }
-          sb.append(fieldStringValue)
+          sb.append(row.get(i, inputSchema(i)))
           i += 1
         }
         sb.append(ioSchema.inputRowFormatMap("TOK_TABLEROWFORMATLINES"))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 492750c60c7c0..3ef67994fb7c9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -714,14 +714,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
         }
         (Seq.empty, Option(name), props.toSeq, recordHandler)
 
-      case null if conf.getConf(CATALOG_IMPLEMENTATION).equals("hive") =>
-        // Use default (serde) format.
-        val name = conf.getConfString("hive.script.serde",
-          "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")
-        val props = Seq("field.delim" -> "\t")
-        val recordHandler = Option(conf.getConfString(configKey, defaultConfigValue))
-        (Nil, Option(name), props, recordHandler)
-
       case null =>
         (Nil, None, Seq.empty, None)
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index d5366de2ea704..e595927ed0c55 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution
 
+import java.time.ZoneId
+
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{execution, AnalysisException, Strategy}
 import org.apache.spark.sql.catalyst.InternalRow
@@ -38,7 +40,7 @@ import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.sources.MemoryPlan
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{StringType, StructType}
 
 /**
  * Converts a logical plan into zero or more SparkPlans. This API is exposed for experimenting
@@ -537,7 +539,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case logical.ScriptTransformation(input, script, output, child, ioschema)
           if ioschema.inputSerdeClass.isEmpty && ioschema.outputSerdeClass.isEmpty =>
         SparkScriptTransformationExec(
-          input,
+          input.map(Cast(_, StringType).withTimeZone(conf.sessionLocalTimeZone)),
           script,
           output,
           planLater(child),
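
Not part of the patch: the sketch below restates the idea behind the SparkStrategies change, assuming the catalyst Cast expression and its withTimeZone method behave as on this branch; the helper name castInputsToString and its parameters are illustrative only. Casting each input expression to StringType with the session-local time zone means date and timestamp values reach the script's stdin already formatted as strings, which is what lets the per-column DateFormatter/TimestampFormatter handling be dropped from BaseScriptTransformationWriterThread.

  import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Expression}
  import org.apache.spark.sql.types.StringType

  // Illustrative helper (hypothetical name): wrap each input attribute in a
  // Cast to StringType carrying the session time zone, mirroring
  // input.map(Cast(_, StringType).withTimeZone(conf.sessionLocalTimeZone))
  // in the no-serde branch of the ScriptTransformation strategy.
  def castInputsToString(
      input: Seq[Attribute],
      sessionLocalTimeZone: String): Seq[Expression] = {
    input.map(Cast(_, StringType).withTimeZone(sessionLocalTimeZone))
  }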