Skip to content

Commit

Permalink
format code
Browse files Browse the repository at this point in the history
  • Loading branch information
AngersZhuuuu committed Jul 17, 2020
1 parent ce8a0a5 commit d37ef86
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,17 +91,11 @@ trait BaseScriptTransformationExec extends UnaryExecNode {
hadoopConf: Configuration): Iterator[InternalRow]

protected def processOutputWithoutSerde(prevLine: String, reader: BufferedReader): InternalRow = {
if (!ioschema.schemaLess) {
new GenericInternalRow(
prevLine.split(ioschema.outputRowFormatMap("TOK_TABLEROWFORMATFIELD"))
.zip(fieldWriters)
.map { case (data, writer) => writer(data) })
} else {
new GenericInternalRow(
prevLine.split(ioschema.outputRowFormatMap("TOK_TABLEROWFORMATFIELD"), 2)
.zip(fieldWriters)
.map { case (data, writer) => writer(data) })
}
val limit = if (ioschema.schemaLess) 2 else 0
new GenericInternalRow(
prevLine.split(ioschema.outputRowFormatMap("TOK_TABLEROWFORMATFIELD"), limit)
.zip(fieldWriters)
.map { case (data, writer) => writer(data) })
}

protected def checkFailureAndPropagate(
Expand Down Expand Up @@ -273,6 +267,18 @@ object ScriptTransformationIOSchema {
("TOK_TABLEROWFORMATLINES", "\n")
)

val defaultIOSchema = ScriptTransformationIOSchema(
inputRowFormat = Seq.empty,
outputRowFormat = Seq.empty,
inputSerdeClass = None,
outputSerdeClass = None,
inputSerdeProps = Seq.empty,
outputSerdeProps = Seq.empty,
recordReaderClass = None,
recordWriterClass = None,
schemaLess = false
)

def apply(input: ScriptInputOutputSchema): ScriptTransformationIOSchema = {
ScriptTransformationIOSchema(
input.inputRowFormat,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ abstract class BaseScriptTransformationSuite extends SparkPlanTest with SQLTestU

import spark.implicits._

var noSerdeIOSchema: ScriptTransformationIOSchema = _
var noSerdeIOSchema: ScriptTransformationIOSchema = ScriptTransformationIOSchema.defaultIOSchema

private var defaultUncaughtExceptionHandler: Thread.UncaughtExceptionHandler = _

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,6 @@ class HiveScriptTransformationSuite extends BaseScriptTransformationSuite {

import spark.implicits._

noSerdeIOSchema = ScriptTransformationIOSchema(
inputRowFormat = Seq.empty,
outputRowFormat = Seq.empty,
inputSerdeClass = None,
outputSerdeClass = None,
inputSerdeProps = Seq.empty,
outputSerdeProps = Seq.empty,
recordReaderClass = None,
recordWriterClass = None,
schemaLess = false
)

private val serdeIOSchema: ScriptTransformationIOSchema = {
noSerdeIOSchema.copy(
inputSerdeClass = Some(classOf[LazySimpleSerDe].getCanonicalName),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import java.sql.{Date, Timestamp}

import org.apache.spark.TestUtils
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.execution.{ScriptTransformationIOSchema, SparkPlan}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.functions.struct
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.CalendarInterval
Expand All @@ -32,18 +32,6 @@ class SparkScriptTransformationSuite extends BaseScriptTransformationSuite {

override def scriptType: String = "SPARK"

noSerdeIOSchema = ScriptTransformationIOSchema(
inputRowFormat = Seq.empty,
outputRowFormat = Seq.empty,
inputSerdeClass = None,
outputSerdeClass = None,
inputSerdeProps = Seq.empty,
outputSerdeProps = Seq.empty,
recordReaderClass = None,
recordWriterClass = None,
schemaLess = false
)

test("SPARK-32106: SparkScriptTransformExec should handle different data types correctly") {
assume(TestUtils.testCommandAvailable("python"))
case class Struct(d: Int, str: String)
Expand Down

0 comments on commit d37ef86

Please sign in to comment.