Commit

Example rename.
rxin committed Jan 27, 2015
1 parent e8aa3d3 commit a728bf2
Showing 9 changed files with 51 additions and 44 deletions.
@@ -48,7 +48,7 @@ public static void main(String[] args) {

// Prepare training data.
// We use LabeledPoint, which is a JavaBean. Spark SQL can convert RDDs of JavaBeans
- // into SchemaRDDs, where it uses the bean metadata to infer the schema.
+ // into DataFrames, where it uses the bean metadata to infer the schema.
List<LabeledPoint> localTraining = Lists.newArrayList(
new LabeledPoint(1.0, Vectors.dense(0.0, 1.1, 0.1)),
new LabeledPoint(0.0, Vectors.dense(2.0, 1.0, -1.0)),
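For orientation alongside the rename, here is a minimal Scala sketch of the conversion the comment above describes. It assumes a SparkContext named sc and a SQLContext named sqlContext are in scope (illustrative names), and that the implicit RDD-to-DataFrame conversion this diff relies on in DatasetExample.scala (origData.toDF) is available, e.g. via import sqlContext._.

    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.regression.LabeledPoint
    import org.apache.spark.sql.DataFrame

    // An RDD of case-class instances; Spark SQL infers the schema from the fields.
    val training = sc.parallelize(Seq(
      LabeledPoint(1.0, Vectors.dense(0.0, 1.1, 0.1)),
      LabeledPoint(0.0, Vectors.dense(2.0, 1.0, -1.0))))

    val trainingDF: DataFrame = training.toDF  // formerly a SchemaRDD
    trainingDF.printSchema()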
@@ -80,7 +80,7 @@ public Person call(String line) {
// SQL can be run over RDDs that have been registered as tables.
DataFrame teenagers = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");

- // The results of SQL queries are SchemaRDDs and support all the normal RDD operations.
+ // The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
List<String> teenagerNames = teenagers.toJavaRDD().map(new Function<Row, String>() {
@Override
@@ -93,12 +93,12 @@ public String call(Row row) {
}

System.out.println("=== Data source: Parquet File ===");
- // JavaSchemaRDDs can be saved as parquet files, maintaining the schema information.
+ // DataFrames can be saved as parquet files, maintaining the schema information.
schemaPeople.saveAsParquetFile("people.parquet");

// Read in the parquet file created above.
// Parquet files are self-describing so the schema is preserved.
- // The result of loading a parquet file is also a JavaSchemaRDD.
+ // The result of loading a parquet file is also a DataFrame.
DataFrame parquetFile = sqlCtx.parquetFile("people.parquet");

//Parquet files can also be registered as tables and then used in SQL statements.
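A hedged Scala sketch of that last comment, registering a Parquet-backed DataFrame as a temporary table and querying it; sqlContext and the people.parquet path are assumptions carried over from the Java example above, not part of the commit.

    val parquetPeople = sqlContext.parquetFile("people.parquet")
    parquetPeople.registerTempTable("parquetPeople")
    val parquetTeens = sqlContext.sql(
      "SELECT name FROM parquetPeople WHERE age >= 13 AND age <= 19")
    parquetTeens.collect().foreach(println)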
@@ -119,7 +119,7 @@ public String call(Row row) {
// A JSON dataset is pointed by path.
// The path can be either a single text file or a directory storing text files.
String path = "examples/src/main/resources/people.json";
- // Create a JavaSchemaRDD from the file(s) pointed by path
+ // Create a DataFrame from the file(s) pointed by path
DataFrame peopleFromJsonFile = sqlCtx.jsonFile(path);

// Because the schema of a JSON dataset is automatically inferred, to write queries,
@@ -130,13 +130,13 @@ public String call(Row row) {
// |-- age: IntegerType
// |-- name: StringType

- // Register this JavaSchemaRDD as a table.
+ // Register this DataFrame as a table.
peopleFromJsonFile.registerTempTable("people");

// SQL statements can be run by using the sql methods provided by sqlCtx.
DataFrame teenagers3 = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");

- // The results of SQL queries are JavaSchemaRDDs and support all the normal RDD operations.
+ // The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
teenagerNames = teenagers3.toJavaRDD().map(new Function<Row, String>() {
@Override
@@ -146,14 +146,14 @@ public String call(Row row) {
System.out.println(name);
}

- // Alternatively, a JavaSchemaRDD can be created for a JSON dataset represented by
+ // Alternatively, a DataFrame can be created for a JSON dataset represented by
// a RDD[String] storing one JSON object per string.
List<String> jsonData = Arrays.asList(
"{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
JavaRDD<String> anotherPeopleRDD = ctx.parallelize(jsonData);
DataFrame peopleFromJsonRDD = sqlCtx.jsonRDD(anotherPeopleRDD.rdd());

- // Take a look at the schema of this new JavaSchemaRDD.
+ // Take a look at the schema of this new DataFrame.
peopleFromJsonRDD.printSchema();
// The schema of anotherPeople is ...
// root
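The same two JSON entry points are available from Scala; a sketch assuming the sqlContext/sc names used above and the people.json file shipped with the examples.

    // From files on disk: the schema is inferred from the JSON records.
    val peopleFromFile = sqlContext.jsonFile("examples/src/main/resources/people.json")
    peopleFromFile.printSchema()

    // From an RDD[String] holding one JSON object per element.
    val jsonData = sc.parallelize(Seq(
      """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}"""))
    val peopleFromRDD = sqlContext.jsonRDD(jsonData)
    peopleFromRDD.printSchema()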
2 changes: 1 addition & 1 deletion examples/src/main/python/mllib/dataset_example.py
@@ -16,7 +16,7 @@
#

"""
- An example of how to use SchemaRDD as a dataset for ML. Run with::
+ An example of how to use DataFrame as a dataset for ML. Run with::
bin/spark-submit examples/src/main/python/mllib/dataset_example.py
"""

16 changes: 8 additions & 8 deletions examples/src/main/python/sql.py
@@ -30,26 +30,26 @@
some_rdd = sc.parallelize([Row(name="John", age=19),
Row(name="Smith", age=23),
Row(name="Sarah", age=18)])
- # Infer schema from the first row, create a SchemaRDD and print the schema
- some_schemardd = sqlContext.inferSchema(some_rdd)
- some_schemardd.printSchema()
+ # Infer schema from the first row, create a DataFrame and print the schema
+ some_df = sqlContext.inferSchema(some_rdd)
+ some_df.printSchema()

# Another RDD is created from a list of tuples
another_rdd = sc.parallelize([("John", 19), ("Smith", 23), ("Sarah", 18)])
# Schema with two fields - person_name and person_age
schema = StructType([StructField("person_name", StringType(), False),
StructField("person_age", IntegerType(), False)])
- # Create a SchemaRDD by applying the schema to the RDD and print the schema
- another_schemardd = sqlContext.applySchema(another_rdd, schema)
- another_schemardd.printSchema()
+ # Create a DataFrame by applying the schema to the RDD and print the schema
+ another_df = sqlContext.applySchema(another_rdd, schema)
+ another_df.printSchema()
# root
# |-- age: integer (nullable = true)
# |-- name: string (nullable = true)

# A JSON dataset is pointed to by path.
# The path can be either a single text file or a directory storing text files.
path = os.path.join(os.environ['SPARK_HOME'], "examples/src/main/resources/people.json")
- # Create a SchemaRDD from the file(s) pointed to by path
+ # Create a DataFrame from the file(s) pointed to by path
people = sqlContext.jsonFile(path)
# root
# |-- person_name: string (nullable = false)
@@ -61,7 +61,7 @@
# |-- age: IntegerType
# |-- name: StringType

- # Register this SchemaRDD as a table.
+ # Register this DataFrame as a table.
people.registerAsTable("people")

# SQL statements can be run by using the sql methods provided by sqlContext
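The Python example is cut off just before the query itself runs; a short Scala sketch of that final step, assuming the people DataFrame has been registered as a table named "people" as above.

    import org.apache.spark.sql.Row

    val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
    val names = teenagers.map { case Row(name: String) => name }
    names.collect().foreach(println)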
@@ -41,7 +41,7 @@ object SimpleParamsExample {

// Prepare training data.
// We use LabeledPoint, which is a case class. Spark SQL can convert RDDs of Java Beans
- // into SchemaRDDs, where it uses the bean metadata to infer the schema.
+ // into DataFrames, where it uses the bean metadata to infer the schema.
val training = sparkContext.parallelize(Seq(
LabeledPoint(1.0, Vectors.dense(0.0, 1.1, 0.1)),
LabeledPoint(0.0, Vectors.dense(2.0, 1.0, -1.0)),
@@ -47,7 +47,7 @@ object DatasetExample {
val defaultParams = Params()

val parser = new OptionParser[Params]("DatasetExample") {
head("Dataset: an example app using SchemaRDD as a Dataset for ML.")
head("Dataset: an example app using DataFrame as a Dataset for ML.")
opt[String]("input")
.text(s"input path to dataset")
.action((x, c) => c.copy(input = x))
@@ -80,20 +80,20 @@ object DatasetExample {
}
println(s"Loaded ${origData.count()} instances from file: ${params.input}")

- // Convert input data to SchemaRDD explicitly.
- val schemaRDD: DataFrame = origData
- println(s"Inferred schema:\n${schemaRDD.schema.prettyJson}")
- println(s"Converted to SchemaRDD with ${schemaRDD.count()} records")
+ // Convert input data to DataFrame explicitly.
+ val df: DataFrame = origData.toDF
+ println(s"Inferred schema:\n${df.schema.prettyJson}")
+ println(s"Converted to DataFrame with ${df.count()} records")

- // Select columns, using implicit conversion to SchemaRDD.
- val labelsSchemaRDD: DataFrame = origData.select("label")
- val labels: RDD[Double] = labelsSchemaRDD.map { case Row(v: Double) => v }
+ // Select columns, using implicit conversion to DataFrames.
+ val labelsDf: DataFrame = origData.select("label")
+ val labels: RDD[Double] = labelsDf.map { case Row(v: Double) => v }
val numLabels = labels.count()
val meanLabel = labels.fold(0.0)(_ + _) / numLabels
println(s"Selected label column with average value $meanLabel")

- val featuresSchemaRDD: DataFrame = origData.select("features")
- val features: RDD[Vector] = featuresSchemaRDD.map { case Row(v: Vector) => v }
+ val featuresDf: DataFrame = origData.select("features")
+ val features: RDD[Vector] = featuresDf.map { case Row(v: Vector) => v }
val featureSummary = features.aggregate(new MultivariateOnlineSummarizer())(
(summary, feat) => summary.add(feat),
(sum1, sum2) => sum1.merge(sum2))
@@ -103,7 +103,7 @@
tmpDir.deleteOnExit()
val outputDir = new File(tmpDir, "dataset").toString
println(s"Saving to $outputDir as Parquet file.")
- schemaRDD.saveAsParquetFile(outputDir)
+ df.saveAsParquetFile(outputDir)

println(s"Loading Parquet file with UDT from $outputDir.")
val newDataset = sqlContext.parquetFile(outputDir)
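The pattern this example leans on, pulling a typed RDD back out of a DataFrame column via Row pattern matching, in isolation; a sketch that assumes a DataFrame df with a Double column named label, as in the surrounding code.

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.Row

    val labels: RDD[Double] = df.select("label").map { case Row(v: Double) => v }
    println(s"mean label = ${labels.fold(0.0)(_ + _) / labels.count()}")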
5 changes: 5 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -458,6 +458,11 @@ class DataFrame protected[sql](
*/
override def head(): Row = head(1).head

+ /**
+  * Return the first row. Alias for head().
+  */
+ override def first(): Row = head()

override def map[R: ClassTag](f: Row => R): RDD[R] = {
rdd.map(f)
}
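Usage-wise the new method is just the familiar RDD-style spelling of head(); a trivial sketch assuming some DataFrame df.

    val firstRow = df.first()  // identical to df.head(): returns the first Row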
2 changes: 2 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/api.scala
@@ -54,6 +54,8 @@ trait RDDApi[T] {

def count(): Long

+ def first(): T

def repartition(numPartitions: Int): DataFrame
}

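Adding first() to the internal RDDApi trait lets code written against the trait take a single element without knowing the concrete type. A purely illustrative sketch (RDDApi is internal to the sql package, so this is not public-API usage):

    def summarize[T](data: RDDApi[T]): String =
      s"first of ${data.count()} elements: ${data.first()}"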
28 changes: 14 additions & 14 deletions sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala
@@ -34,11 +34,11 @@ class DslQuerySuite extends QueryTest {
testData.collect().toSeq)
}

// test("repartition") {
// checkAnswer(
// testData.select('key).repartition(10).select('key),
// testData.select('key).collect().toSeq)
// }
test("repartition") {
checkAnswer(
testData.select('key).repartition(10).select('key),
testData.select('key).collect().toSeq)
}

test("agg") {
checkAnswer(
@@ -266,15 +266,15 @@ class DslQuerySuite extends QueryTest {
checkAnswer(lowerCaseData.intersect(upperCaseData), Nil)
}

// test("udf") {
// val foo = (a: Int, b: String) => a.toString + b
//
// checkAnswer(
// // SELECT *, foo(key, value) FROM testData
// testData.select(Star(None), foo.call('key, 'value)).limit(3),
// Row(1, "1", "11") :: Row(2, "2", "22") :: Row(3, "3", "33") :: Nil
// )
// }
test("udf") {
val foo = (a: Int, b: String) => a.toString + b

checkAnswer(
// SELECT *, foo(key, value) FROM testData
testData.select($"*", callUDF(foo, 'key, 'value)).limit(3),
Row(1, "1", "11") :: Row(2, "2", "22") :: Row(3, "3", "33") :: Nil
)
}
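Outside the test harness, the two re-enabled behaviours look roughly like this; testData with an Int key and String value column, plus the symbol/$ column syntax and callUDF imports the suite uses, are assumptions.

    // Repartitioning preserves the rows while changing the number of partitions.
    val repartitioned = testData.select('key).repartition(10)
    println(repartitioned.count() == testData.count())  // true

    // A plain Scala function applied as a UDF over two columns.
    val foo = (a: Int, b: String) => a.toString + b
    testData.select($"*", callUDF(foo, 'key, 'value)).limit(3).collect().foreach(println)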

test("sqrt") {
checkAnswer(
