Revert all changes since applying a given schema has not been tested.
yhuai committed Jan 9, 2015
1 parent a852b10 commit 65e9c73
Showing 1 changed file with 15 additions and 18 deletions.
```diff
@@ -22,37 +22,37 @@ import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapreduce.{JobContext, InputSplit, Job}
-import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate
 
 import parquet.hadoop.ParquetInputFormat
 import parquet.hadoop.util.ContextUtil
 
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.{Partition => SparkPartition, Logging}
 import org.apache.spark.rdd.{NewHadoopPartition, RDD}
+import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate
+
+import org.apache.spark.sql.{SQLConf, Row, SQLContext}
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.types.{StringType, IntegerType, StructField, StructType}
+import org.apache.spark.sql.catalyst.types.{IntegerType, StructField, StructType}
 import org.apache.spark.sql.sources._
-import org.apache.spark.sql.{SQLConf, SQLContext}
 
 import scala.collection.JavaConversions._
 
 
 /**
  * Allows creation of parquet based tables using the syntax
  * `CREATE TEMPORARY TABLE ... USING org.apache.spark.sql.parquet`. Currently the only option
  * required is `path`, which should be the location of a collection of, optionally partitioned,
  * parquet files.
  */
-class DefaultSource extends SchemaRelationProvider {
+class DefaultSource extends RelationProvider {
   /** Returns a new base relation with the given parameters. */
   override def createRelation(
       sqlContext: SQLContext,
-      parameters: Map[String, String],
-      schema: Option[StructType]): BaseRelation = {
+      parameters: Map[String, String]): BaseRelation = {
     val path =
       parameters.getOrElse("path", sys.error("'path' must be specified for parquet tables."))
 
-    ParquetRelation2(path, schema)(sqlContext)
+    ParquetRelation2(path)(sqlContext)
   }
 }
```
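For context, `DefaultSource` is the class the data source API resolves for the `org.apache.spark.sql.parquet` provider, so after this revert the DDL path can no longer carry a user-defined schema. A minimal usage sketch against the Spark 1.2-era API (the table name and path here are made up for illustration):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc = new SparkContext(new SparkConf().setAppName("parquet-ddl").setMaster("local[2]"))
val sqlContext = new SQLContext(sc)

// Only the `path` option is consulted; omitting it raises the
// "'path' must be specified for parquet tables." error seen above.
sqlContext.sql(
  """CREATE TEMPORARY TABLE events
    |USING org.apache.spark.sql.parquet
    |OPTIONS (path '/data/events')""".stripMargin)

sqlContext.sql("SELECT * FROM events LIMIT 10").collect().foreach(println)
```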

```diff
@@ -82,9 +82,7 @@ private[parquet] case class Partition(partitionValues: Map[String, Any], files:
  * discovery.
  */
 @DeveloperApi
-case class ParquetRelation2(
-    path: String,
-    userSpecifiedSchema: Option[StructType])(@transient val sqlContext: SQLContext)
+case class ParquetRelation2(path: String)(@transient val sqlContext: SQLContext)
   extends CatalystScan with Logging {
 
   def sparkContext = sqlContext.sparkContext
```
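One detail worth noting in the restored signature: `sqlContext` sits in a second, `@transient` parameter list, so it is excluded from the case class's generated `equals`/`hashCode` and from its serialized state. A self-contained sketch of that pattern with stand-in names (`Relation` and `context` are illustrative, not Spark's):

```scala
// Only the first parameter list of a case class participates in
// equals/hashCode/toString; the second list is plain constructor state.
case class Relation(path: String)(@transient val context: String)

val a = Relation("/data/events")("ctx-1")
val b = Relation("/data/events")("ctx-2")
println(a == b) // true: equality considers only `path`
```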
```diff
@@ -135,13 +133,12 @@ case class ParquetRelation2(
 
   override val sizeInBytes = partitions.flatMap(_.files).map(_.getLen).sum
 
-  val dataSchema = userSpecifiedSchema.getOrElse(
-    StructType.fromAttributes( // TODO: Parquet code should not deal with attributes.
-      ParquetTypesConverter.readSchemaFromFile(
-        partitions.head.files.head.getPath,
-        Some(sparkContext.hadoopConfiguration),
-        sqlContext.isParquetBinaryAsString))
-  )
+  val dataSchema = StructType.fromAttributes( // TODO: Parquet code should not deal with attributes.
+    ParquetTypesConverter.readSchemaFromFile(
+      partitions.head.files.head.getPath,
+      Some(sparkContext.hadoopConfiguration),
+      sqlContext.isParquetBinaryAsString))
 
   val dataIncludesKey =
     partitionKeys.headOption.map(dataSchema.fieldNames.contains(_)).getOrElse(true)
```

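The `dataIncludesKey` value above records whether the partition key also appears as a column inside the data files, defaulting to `true` when the table has no partition keys. A self-contained sketch of that check with made-up field names:

```scala
// Test the first partition key against the schema read from the data files.
val dataSchemaFieldNames = Seq("id", "value")

val withKeys = Seq("date").headOption
  .map(dataSchemaFieldNames.contains(_)).getOrElse(true)      // false
val withoutKeys = Seq.empty[String].headOption
  .map(dataSchemaFieldNames.contains(_)).getOrElse(true)      // true (vacuously)

println(s"$withKeys $withoutKeys") // "date" exists only in the directory layout
```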