A SQL config for bypassing the parser in the case of an empty schema
MaxGekk committed Aug 3, 2018
1 parent 359c4fc commit 168eb99
Showing 3 changed files with 29 additions and 13 deletions.
SQLConf.scala
@@ -1449,6 +1449,14 @@ object SQLConf {
    .intConf
    .checkValues((1 to 9).toSet + Deflater.DEFAULT_COMPRESSION)
    .createWithDefault(Deflater.DEFAULT_COMPRESSION)
+
+  val BYPASS_PARSER_FOR_EMPTY_SCHEMA = buildConf("spark.sql.bypassParserForEmptySchema")
+    .doc("If the required schema passed to a text datasource is empty, this parameter " +
+      "controls whether the underlying parser is invoked. For example, if it is set to " +
+      "false, the uniVocity parser is invoked by the CSV datasource, or the Jackson " +
+      "parser by the JSON datasource. By default, it is set to true, which means the " +
+      "parser is not invoked when the required schema is empty.")
+    .booleanConf
+    .createWithDefault(true)
}

/**
@@ -1839,6 +1847,8 @@ class SQLConf extends Serializable with Logging {

  def avroDeflateLevel: Int = getConf(SQLConf.AVRO_DEFLATE_LEVEL)

+  def bypassParserForEmptySchema: Boolean = getConf(SQLConf.BYPASS_PARSER_FOR_EMPTY_SCHEMA)
+
  /** ********************** SQLConf functionality methods ************ */

  /** Set Spark SQL configuration properties. */
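The new entry and its getter make the flag reachable both through the public configuration key and through `SQLConf.get` inside Spark SQL code paths. A minimal usage sketch (assuming a running `SparkSession` named `spark`; the input path is hypothetical and not part of the commit):

```scala
// Turn the bypass off for this session, e.g. when malformed records must
// surface even for queries that require no columns.
spark.conf.set("spark.sql.bypassParserForEmptySchema", "false")

// Datasource code reads the same value back via the session's SQLConf:
//   SQLConf.get.bypassParserForEmptySchema  // => false

// With the default (true), a no-column query such as count() can skip
// record parsing entirely and simply count raw records.
val total = spark.read
  .option("mode", "FAILFAST")
  .json("/tmp/events.json") // hypothetical path
  .count()
```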
FailureSafeParser.scala
@@ -18,9 +18,11 @@
package org.apache.spark.sql.execution.datasources

import org.apache.spark.SparkException

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
import org.apache.spark.unsafe.types.UTF8String

@@ -57,7 +59,9 @@ class FailureSafeParser[IN](
    }
  }

-  private val skipParsing = optimizeEmptySchema && schema.isEmpty
+  private val skipParsing = {
+    SQLConf.get.bypassParserForEmptySchema && optimizeEmptySchema && schema.isEmpty
+  }
  def parse(input: IN): Iterator[InternalRow] = {
    try {
      if (skipParsing) {
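For context, `skipParsing` guards the fast path in `parse`: when it is true, each input record maps to a single empty row and the underlying uniVocity or Jackson parser is never invoked. A simplified sketch of that method follows; `rawParser`, `toResultRow`, and `mode` are members of `FailureSafeParser` assumed from context, and the error-handling shape is an assumption based on the FAILFAST message asserted in the test below:

```scala
def parse(input: IN): Iterator[InternalRow] = {
  try {
    if (skipParsing) {
      // No columns are required, so emit one empty row per record
      // without invoking the underlying parser at all.
      Iterator.single(InternalRow.empty)
    } else {
      rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null))
    }
  } catch {
    case e: BadRecordException => mode match {
      case PermissiveMode =>
        Iterator(toResultRow(e.partialResult(), e.record))
      case DropMalformedMode =>
        Iterator.empty
      case FailFastMode =>
        throw new SparkException("Malformed records are detected in record parsing. " +
          s"Parse Mode: ${FailFastMode.name}.", e)
    }
  }
}
```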
JsonSuite.scala
@@ -2225,19 +2225,21 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {


  test("SPARK-23723: specified encoding is not matched to actual encoding") {
-    val fileName = "test-data/utf16LE.json"
-    val schema = new StructType().add("firstName", StringType).add("lastName", StringType)
-    val exception = intercept[SparkException] {
-      spark.read.schema(schema)
-        .option("mode", "FAILFAST")
-        .option("multiline", "true")
-        .options(Map("encoding" -> "UTF-16BE"))
-        .json(testFile(fileName))
-        .collect()
-    }
-    val errMsg = exception.getMessage
-
-    assert(errMsg.contains("Malformed records are detected in record parsing"))
+    withSQLConf(SQLConf.BYPASS_PARSER_FOR_EMPTY_SCHEMA.key -> "false") {
+      val fileName = "test-data/utf16LE.json"
+      val schema = new StructType().add("firstName", StringType).add("lastName", StringType)
+      val exception = intercept[SparkException] {
+        spark.read.schema(schema)
+          .option("mode", "FAILFAST")
+          .option("multiline", "true")
+          .options(Map("encoding" -> "UTF-16BE"))
+          .json(testFile(fileName))
+          .count()
+      }
+      val errMsg = exception.getMessage
+
+      assert(errMsg.contains("Malformed records are detected in record parsing"))
+    }
  }

  def checkEncoding(expectedEncoding: String, pathToJsonFiles: String,
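The switch from collect() to count() is what makes the new config matter in this test: count() requires no columns, so with the flag at its default value the parser would be bypassed and the encoding mismatch would never be detected. A hedged sketch of the contrasting default behavior (illustrative only, not part of the commit):

```scala
// With the default (true), the same read is expected to succeed: no record is
// decoded for an empty required schema, so the UTF-16LE/UTF-16BE mismatch
// goes unnoticed and count() just tallies raw records.
withSQLConf(SQLConf.BYPASS_PARSER_FOR_EMPTY_SCHEMA.key -> "true") {
  spark.read
    .option("mode", "FAILFAST")
    .option("multiline", "true")
    .option("encoding", "UTF-16BE")
    .json(testFile("test-data/utf16LE.json"))
    .count() // no exception, unlike the FAILFAST failure asserted above
}
```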
