From b621c8f8885dbcc67dc34c53fbef79346900cb9c Mon Sep 17 00:00:00 2001
From: scwf
Date: Wed, 7 Jan 2015 16:43:41 +0800
Subject: [PATCH] minor refactory

---
 .../apache/spark/sql/sources/commands.scala  | 61 +++++++++++++++++++
 .../org/apache/spark/sql/sources/ddl.scala   | 41 -------------
 2 files changed, 61 insertions(+), 41 deletions(-)
 create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
new file mode 100644
index 0000000000000..b8c10c1eb0d23
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources
+
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.catalyst.types.{StructType, StructField}
+import org.apache.spark.sql.execution.RunnableCommand
+import org.apache.spark.util.Utils
+
+private[sql] case class CreateTableUsing(
+    tableName: String,
+    tableCols: Seq[StructField],
+    provider: String,
+    options: Map[String, String]) extends RunnableCommand {
+
+  def run(sqlContext: SQLContext) = {
+    val loader = Utils.getContextOrSparkClassLoader
+    val clazz: Class[_] = try loader.loadClass(provider) catch {
+      case cnf: java.lang.ClassNotFoundException =>
+        try loader.loadClass(provider + ".DefaultSource") catch {
+          case cnf: java.lang.ClassNotFoundException =>
+            sys.error(s"Failed to load class for data source: $provider")
+        }
+    }
+    val relation = clazz.newInstance match {
+      case dataSource: org.apache.spark.sql.sources.RelationProvider =>
+        dataSource
+          .asInstanceOf[org.apache.spark.sql.sources.RelationProvider]
+          .createRelation(sqlContext, new CaseInsensitiveMap(options))
+      case dataSource: org.apache.spark.sql.sources.SchemaRelationProvider =>
+        if(tableCols.isEmpty) {
+          dataSource
+            .asInstanceOf[org.apache.spark.sql.sources.SchemaRelationProvider]
+            .createRelation(sqlContext, new CaseInsensitiveMap(options))
+        } else {
+          dataSource
+            .asInstanceOf[org.apache.spark.sql.sources.SchemaRelationProvider]
+            .createRelation(
+              sqlContext, new CaseInsensitiveMap(options), Some(StructType(tableCols)))
+        }
+    }
+
+    sqlContext.baseRelationToSchemaRDD(relation).registerTempTable(tableName)
+    Seq.empty
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
index f5b72f3c4ca52..1d1af2795958b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
@@ -22,10 +22,7 @@ import scala.util.parsing.combinator.syntactical.StandardTokenParsers
 import scala.util.parsing.combinator.PackratParsers
 
 import org.apache.spark.Logging
-import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.types._
-import org.apache.spark.sql.execution.RunnableCommand
-import org.apache.spark.util.Utils
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.SqlLexical
 
@@ -182,44 +179,6 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers wi
   }
 }
 
-private[sql] case class CreateTableUsing(
-    tableName: String,
-    tableCols: Seq[StructField],
-    provider: String,
-    options: Map[String, String]) extends RunnableCommand {
-
-  def run(sqlContext: SQLContext) = {
-    val loader = Utils.getContextOrSparkClassLoader
-    val clazz: Class[_] = try loader.loadClass(provider) catch {
-      case cnf: java.lang.ClassNotFoundException =>
-        try loader.loadClass(provider + ".DefaultSource") catch {
-          case cnf: java.lang.ClassNotFoundException =>
-            sys.error(s"Failed to load class for data source: $provider")
-        }
-    }
-    val relation = clazz.newInstance match {
-      case dataSource: org.apache.spark.sql.sources.RelationProvider =>
-        dataSource
-          .asInstanceOf[org.apache.spark.sql.sources.RelationProvider]
-          .createRelation(sqlContext, new CaseInsensitiveMap(options))
-      case dataSource: org.apache.spark.sql.sources.SchemaRelationProvider =>
-        if(tableCols.isEmpty) {
-          dataSource
-            .asInstanceOf[org.apache.spark.sql.sources.SchemaRelationProvider]
-            .createRelation(sqlContext, new CaseInsensitiveMap(options))
-        } else {
-          dataSource
-            .asInstanceOf[org.apache.spark.sql.sources.SchemaRelationProvider]
-            .createRelation(
-              sqlContext, new CaseInsensitiveMap(options), Some(StructType(tableCols)))
-        }
-    }
-
-    sqlContext.baseRelationToSchemaRDD(relation).registerTempTable(tableName)
-    Seq.empty
-  }
-}
-
 /**
  * Builds a map in which keys are case insensitive
  */
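
For context, here is a minimal sketch (not part of the patch) of a data source that the relocated CreateTableUsing command could resolve. The package com.example.ones and the relation class OnesRelation are hypothetical names chosen for illustration; the sketch assumes the sources API as of this commit, where TableScan is an abstract class extending BaseRelation and the catalyst types still live under org.apache.spark.sql.catalyst.types.

// Hypothetical data source. Only the class name DefaultSource is significant:
// CreateTableUsing first tries to load the provider string verbatim and, on
// ClassNotFoundException, retries with ".DefaultSource" appended, so
// "USING com.example.ones" resolves to com.example.ones.DefaultSource.
package com.example.ones

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.catalyst.types.{IntegerType, StructField, StructType}
import org.apache.spark.sql.sources.{BaseRelation, RelationProvider, TableScan}

class DefaultSource extends RelationProvider {
  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String]): BaseRelation = {
    // CreateTableUsing wraps the OPTIONS clause in a CaseInsensitiveMap before
    // calling createRelation, so OPTIONS (To '10') and OPTIONS (to '10') match.
    OnesRelation(parameters("to").toInt)(sqlContext)
  }
}

case class OnesRelation(to: Int)(@transient val sqlContext: SQLContext)
  extends TableScan {

  // A single non-nullable integer column named "i".
  override def schema =
    StructType(StructField("i", IntegerType, nullable = false) :: Nil)

  // Produce the rows 1..to as an RDD of single-column Rows.
  override def buildScan(): RDD[Row] =
    sqlContext.sparkContext.parallelize(1 to to).map(Row(_))
}

With such a provider on the classpath, the DDL path that ends in this command would be exercised roughly as follows (again a sketch, using the hypothetical names above):

sqlContext.sql(
  """CREATE TEMPORARY TABLE ones
    |USING com.example.ones
    |OPTIONS (To '10')""".stripMargin)
sqlContext.sql("SELECT i FROM ones").collect()  // Row(1) ... Row(10)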