diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index 528e6608c3c82..3fb92be0940b7 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -45,8 +45,6 @@ exportMethods("cache", "showDF", "sortDF", "take", - "toJSON", - "toRDD", "unionAll", "unpersist", "where", @@ -95,14 +93,12 @@ export("cacheTable", "createExternalTable", "dropTempTable", "jsonFile", - "jsonRDD", "loadDF", "parquetFile", "sql", "table", "tableNames", "tables", - "toDF", "uncacheTable") export("sparkRSQL.init", diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index 56c305d912587..47d92f141cc7d 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -272,7 +272,7 @@ setMethod("names", setMethod("registerTempTable", signature(x = "DataFrame", tableName = "character"), function(x, tableName) { - callJMethod(x@sdf, "registerTempTable", tableName) + invisible(callJMethod(x@sdf, "registerTempTable", tableName)) }) #' insertInto diff --git a/examples/src/main/r/dataframe.R b/examples/src/main/r/dataframe.R new file mode 100644 index 0000000000000..53b817144f6ac --- /dev/null +++ b/examples/src/main/r/dataframe.R @@ -0,0 +1,54 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +library(SparkR) + +# Initialize SparkContext and SQLContext +sc <- sparkR.init(appName="SparkR-DataFrame-example") +sqlContext <- sparkRSQL.init(sc) + +# Create a simple local data.frame +localDF <- data.frame(name=c("John", "Smith", "Sarah"), age=c(19, 23, 18)) + +# Convert local data frame to a SparkR DataFrame +df <- createDataFrame(sqlContext, localDF) + +# Print its schema +printSchema(df) +# root +# |-- name: string (nullable = true) +# |-- age: double (nullable = true) + +# Create a DataFrame from a JSON file +path <- file.path(Sys.getenv("SPARK_HOME"), "examples/src/main/resources/people.json") +peopleDF <- jsonFile(sqlContext, path) +printSchema(peopleDF) + +# Register this DataFrame as a table. +registerTempTable(peopleDF, "people") + +# SQL statements can be run by using the sql methods provided by sqlContext +teenagers <- sql(sqlContext, "SELECT name FROM people WHERE age >= 13 AND age <= 19") + +# Call collect to get a local data.frame +teenagersLocalDF <- collect(teenagers) + +# Print the teenagers in our dataset +print(teenagersLocalDF) + +# Stop the SparkContext now +sparkR.stop() diff --git a/examples/src/main/r/kmeans.R b/examples/src/main/r/kmeans.R deleted file mode 100644 index 6e6b5cb93789c..0000000000000 --- a/examples/src/main/r/kmeans.R +++ /dev/null @@ -1,93 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -library(SparkR) - -# Logistic regression in Spark. -# Note: unlike the example in Scala, a point here is represented as a vector of -# doubles. - -parseVectors <- function(lines) { - lines <- strsplit(as.character(lines) , " ", fixed = TRUE) - list(matrix(as.numeric(unlist(lines)), ncol = length(lines[[1]]))) -} - -dist.fun <- function(P, C) { - apply( - C, - 1, - function(x) { - colSums((t(P) - x)^2) - } - ) -} - -closestPoint <- function(P, C) { - max.col(-dist.fun(P, C)) -} -# Main program - -args <- commandArgs(trailing = TRUE) - -if (length(args) != 3) { - print("Usage: kmeans ") - q("no") -} - -sc <- sparkR.init(appName = "RKMeans") -K <- as.integer(args[[2]]) -convergeDist <- as.double(args[[3]]) - -lines <- textFile(sc, args[[1]]) -points <- cache(lapplyPartition(lines, parseVectors)) -# kPoints <- take(points, K) -kPoints <- do.call(rbind, takeSample(points, FALSE, K, 16189L)) -tempDist <- 1.0 - -while (tempDist > convergeDist) { - closest <- lapplyPartition( - lapply(points, - function(p) { - cp <- closestPoint(p, kPoints); - mapply(list, unique(cp), split.data.frame(cbind(1, p), cp), SIMPLIFY=FALSE) - }), - function(x) {do.call(c, x) - }) - - pointStats <- reduceByKey(closest, - function(p1, p2) { - t(colSums(rbind(p1, p2))) - }, - 2L) - - newPoints <- do.call( - rbind, - collect(lapply(pointStats, - function(tup) { - point.sum <- tup[[2]][, -1] - point.count <- tup[[2]][, 1] - point.sum/point.count - }))) - - D <- dist.fun(kPoints, newPoints) - tempDist <- sum(D[cbind(1:3, max.col(-D))]) - kPoints <- newPoints - cat("Finished iteration (delta = ", tempDist, ")\n") -} - -cat("Final centers:\n") -writeLines(unlist(lapply(kPoints, paste, collapse = " "))) diff --git a/examples/src/main/r/linear_solver_mnist.R b/examples/src/main/r/linear_solver_mnist.R deleted file mode 100644 index c864a4232d010..0000000000000 --- a/examples/src/main/r/linear_solver_mnist.R +++ /dev/null @@ -1,107 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -# Instructions: https://github.com/amplab-extras/SparkR-pkg/wiki/SparkR-Example:-Digit-Recognition-on-EC2 - -library(SparkR) -library(Matrix) - -args <- commandArgs(trailing = TRUE) - -# number of random features; default to 1100 -D <- ifelse(length(args) > 0, as.integer(args[[1]]), 1100) -# number of partitions for training dataset -trainParts <- 12 -# dimension of digits -d <- 784 -# number of test examples -NTrain <- 60000 -# number of training examples -NTest <- 10000 -# scale of features -gamma <- 4e-4 - -sc <- sparkR.init(appName = "SparkR-LinearSolver") - -# You can also use HDFS path to speed things up: -# hdfs:///train-mnist-dense-with-labels.data -file <- textFile(sc, "/data/train-mnist-dense-with-labels.data", trainParts) - -W <- gamma * matrix(nrow=D, ncol=d, data=rnorm(D*d)) -b <- 2 * pi * matrix(nrow=D, ncol=1, data=runif(D)) -broadcastW <- broadcast(sc, W) -broadcastB <- broadcast(sc, b) - -includePackage(sc, Matrix) -numericLines <- lapplyPartitionsWithIndex(file, - function(split, part) { - matList <- sapply(part, function(line) { - as.numeric(strsplit(line, ",", fixed=TRUE)[[1]]) - }, simplify=FALSE) - mat <- Matrix(ncol=d+1, data=unlist(matList, F, F), - sparse=T, byrow=T) - mat - }) - -featureLabels <- cache(lapplyPartition( - numericLines, - function(part) { - label <- part[,1] - mat <- part[,-1] - ones <- rep(1, nrow(mat)) - features <- cos( - mat %*% t(value(broadcastW)) + (matrix(ncol=1, data=ones) %*% t(value(broadcastB)))) - onesMat <- Matrix(ones) - featuresPlus <- cBind(features, onesMat) - labels <- matrix(nrow=nrow(mat), ncol=10, data=-1) - for (i in 1:nrow(mat)) { - labels[i, label[i]] <- 1 - } - list(label=labels, features=featuresPlus) - })) - -FTF <- Reduce("+", collect(lapplyPartition(featureLabels, - function(part) { - t(part$features) %*% part$features - }), flatten=F)) - -FTY <- Reduce("+", collect(lapplyPartition(featureLabels, - function(part) { - t(part$features) %*% part$label - }), flatten=F)) - -# solve for the coefficient matrix -C <- solve(FTF, FTY) - -test <- Matrix(as.matrix(read.csv("/data/test-mnist-dense-with-labels.data", - header=F), sparse=T)) -testData <- test[,-1] -testLabels <- matrix(ncol=1, test[,1]) - -err <- 0 - -# contstruct the feature maps for all examples from this digit -featuresTest <- cos(testData %*% t(value(broadcastW)) + - (matrix(ncol=1, data=rep(1, NTest)) %*% t(value(broadcastB)))) -featuresTest <- cBind(featuresTest, Matrix(rep(1, NTest))) - -# extract the one vs. all assignment -results <- featuresTest %*% C -labelsGot <- apply(results, 1, which.max) -err <- sum(testLabels != labelsGot) / nrow(testLabels) - -cat("\nFinished running. The error rate is: ", err, ".\n") diff --git a/examples/src/main/r/logistic_regression.R b/examples/src/main/r/logistic_regression.R deleted file mode 100644 index 2a86aa98160d3..0000000000000 --- a/examples/src/main/r/logistic_regression.R +++ /dev/null @@ -1,62 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -library(SparkR) - -args <- commandArgs(trailing = TRUE) - -if (length(args) != 3) { - print("Usage: logistic_regression ") - q("no") -} - -# Initialize Spark context -sc <- sparkR.init(appName = "LogisticRegressionR") -iterations <- as.integer(args[[2]]) -D <- as.integer(args[[3]]) - -readPartition <- function(part){ - part = strsplit(part, " ", fixed = T) - list(matrix(as.numeric(unlist(part)), ncol = length(part[[1]]))) -} - -# Read data points and convert each partition to a matrix -points <- cache(lapplyPartition(textFile(sc, args[[1]]), readPartition)) - -# Initialize w to a random value -w <- runif(n=D, min = -1, max = 1) -cat("Initial w: ", w, "\n") - -# Compute logistic regression gradient for a matrix of data points -gradient <- function(partition) { - partition = partition[[1]] - Y <- partition[, 1] # point labels (first column of input file) - X <- partition[, -1] # point coordinates - - # For each point (x, y), compute gradient function - dot <- X %*% w - logit <- 1 / (1 + exp(-Y * dot)) - grad <- t(X) %*% ((logit - 1) * Y) - list(grad) -} - -for (i in 1:iterations) { - cat("On iteration ", i, "\n") - w <- w - reduce(lapplyPartition(points, gradient), "+") -} - -cat("Final w: ", w, "\n") diff --git a/examples/src/main/r/pi.R b/examples/src/main/r/pi.R deleted file mode 100644 index aa7a833e147a0..0000000000000 --- a/examples/src/main/r/pi.R +++ /dev/null @@ -1,46 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -library(SparkR) - -args <- commandArgs(trailing = TRUE) - -sc <- sparkR.init(appName = "PiR") - -slices <- ifelse(length(args) > 1, as.integer(args[[2]]), 2) - -n <- 100000 * slices - -piFunc <- function(elem) { - rands <- runif(n = 2, min = -1, max = 1) - val <- ifelse((rands[1]^2 + rands[2]^2) < 1, 1.0, 0.0) - val -} - - -piFuncVec <- function(elems) { - message(length(elems)) - rands1 <- runif(n = length(elems), min = -1, max = 1) - rands2 <- runif(n = length(elems), min = -1, max = 1) - val <- ifelse((rands1^2 + rands2^2) < 1, 1.0, 0.0) - sum(val) -} - -rdd <- parallelize(sc, 1:n, slices) -count <- reduce(lapplyPartition(rdd, piFuncVec), sum) -cat("Pi is roughly", 4.0 * count / n, "\n") -cat("Num elements in RDD ", count(rdd), "\n") diff --git a/examples/src/main/r/wordcount.R b/examples/src/main/r/wordcount.R deleted file mode 100644 index b734cb0ecf55b..0000000000000 --- a/examples/src/main/r/wordcount.R +++ /dev/null @@ -1,42 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -library(SparkR) - -args <- commandArgs(trailing = TRUE) - -if (length(args) != 1) { - print("Usage: wordcount ") - q("no") -} - -# Initialize Spark context -sc <- sparkR.init(appName = "RwordCount") -lines <- textFile(sc, args[[1]]) - -words <- flatMap(lines, - function(line) { - strsplit(line, " ")[[1]] - }) -wordCount <- lapply(words, function(word) { list(word, 1L) }) - -counts <- reduceByKey(wordCount, "+", 2L) -output <- collect(counts) - -for (wordcount in output) { - cat(wordcount[[1]], ": ", wordcount[[2]], "\n") -}