Commit
Merge branch 'master' into to-file-max-fields
MaxGekk authored Dec 20, 2018
2 parents e0aa626 + aa0d4ca commit 9acdf9d
Showing 350 changed files with 7,769 additions and 4,498 deletions.
3 changes: 2 additions & 1 deletion R/pkg/NAMESPACE
@@ -67,7 +67,8 @@ exportMethods("glm",
"spark.fpGrowth",
"spark.freqItemsets",
"spark.associationRules",
"spark.findFrequentSequentialPatterns")
"spark.findFrequentSequentialPatterns",
"spark.assignClusters")

# Job group lifecycle management methods
export("setJobGroup",
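The NAMESPACE addition above is what makes the new generic visible to users. A tiny hedged check, assuming a SparkR build that contains this export:

```r
# Sketch: after the export above, the generic is on the search path once SparkR is attached.
library(SparkR)
exists("spark.assignClusters")  # TRUE with a build that includes this export
```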
2 changes: 1 addition & 1 deletion R/pkg/R/context.R
@@ -87,7 +87,7 @@ objectFile <- function(sc, path, minPartitions = NULL) {
#' in the list are split into \code{numSlices} slices and distributed to nodes
#' in the cluster.
#'
#' If size of serialized slices is larger than spark.r.maxAllocationLimit or (200MB), the function
#' If size of serialized slices is larger than spark.r.maxAllocationLimit or (200MiB), the function
#' will write it to disk and send the file name to the JVM. Also, to make sure each slice is not
#' larger than that limit, the number of slices may be increased.
#'
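The limit described in the doc fix above comes from the `spark.r.maxAllocationLimit` configuration, which defaults to roughly 200 MiB. A hedged sketch of raising it at session start, assuming the value is read as a plain byte count; the numbers are illustrative only:

```r
# Sketch (assumption: the limit is read as a byte count): raise it to ~512 MiB before
# turning a large local object into a SparkDataFrame. In practice this only matters for
# local data whose serialized slices approach the limit.
sparkR.session(sparkConfig = list(spark.r.maxAllocationLimit = "536870912"))
df <- createDataFrame(data.frame(x = runif(1e6)))
```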
4 changes: 4 additions & 0 deletions R/pkg/R/generics.R
@@ -1479,6 +1479,10 @@ setGeneric("spark.associationRules", function(object) { standardGeneric("spark.a
setGeneric("spark.findFrequentSequentialPatterns",
function(data, ...) { standardGeneric("spark.findFrequentSequentialPatterns") })

#' @rdname spark.powerIterationClustering
setGeneric("spark.assignClusters",
function(data, ...) { standardGeneric("spark.assignClusters") })

#' @param object a fitted ML model object.
#' @param path the directory where the model is saved.
#' @param ... additional argument(s) passed to the method.
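The new generic follows SparkR's usual S4 dispatch pattern: declare the generic with `setGeneric`/`standardGeneric`, then implement it later with `setMethod` for `SparkDataFrame`. A minimal standalone sketch of that pattern in plain R; the toy class and function names are hypothetical, not Spark code:

```r
# Minimal standalone sketch of the setGeneric/setMethod pattern (plain R, not Spark code).
setClass("ExampleFrame", representation(rows = "numeric"))

# Declare the generic: callers dispatch on the class of `data`, extra args pass through `...`.
setGeneric("spark.exampleOp", function(data, ...) { standardGeneric("spark.exampleOp") })

# Provide the method for the toy class; SparkR does the same for "SparkDataFrame".
setMethod("spark.exampleOp", signature(data = "ExampleFrame"),
          function(data, k = 2L) {
            sprintf("dispatched on ExampleFrame with k = %d", k)
          })

ex <- new("ExampleFrame", rows = 10)
spark.exampleOp(ex, k = 3L)   # "dispatched on ExampleFrame with k = 3"
```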
59 changes: 59 additions & 0 deletions R/pkg/R/mllib_clustering.R
@@ -41,6 +41,12 @@ setClass("KMeansModel", representation(jobj = "jobj"))
#' @note LDAModel since 2.1.0
setClass("LDAModel", representation(jobj = "jobj"))

#' S4 class that represents a PowerIterationClustering
#'
#' @param jobj a Java object reference to the backing Scala PowerIterationClustering
#' @note PowerIterationClustering since 3.0.0
setClass("PowerIterationClustering", slots = list(jobj = "jobj"))

#' Bisecting K-Means Clustering Model
#'
#' Fits a bisecting k-means clustering model against a SparkDataFrame.
@@ -610,3 +616,56 @@ setMethod("write.ml", signature(object = "LDAModel", path = "character"),
function(object, path, overwrite = FALSE) {
write_internal(object, path, overwrite)
})

#' PowerIterationClustering
#'
#' A scalable graph clustering algorithm. Users can call \code{spark.assignClusters} to
#' run the PIC algorithm and return a cluster assignment for each input vertex.
#'
#' @param data a SparkDataFrame.
#' @param k the number of clusters to create.
#' @param initMode the initialization algorithm, either "random" or "degree".
#' @param maxIter the maximum number of iterations.
#' @param sourceCol the name of the input column for source vertex IDs.
#' @param destinationCol the name of the input column for destination vertex IDs.
#' @param weightCol weight column name. If this is not set or \code{NULL},
#' we treat all instance weights as 1.0.
#' @param ... additional argument(s) passed to the method.
#' @return A SparkDataFrame containing a column of vertex ids and a column with the corresponding cluster for each id.
#' Its schema is \code{id: integer}, \code{cluster: integer}.
#' @rdname spark.powerIterationClustering
#' @aliases spark.assignClusters,SparkDataFrame-method
#' @examples
#' \dontrun{
#' df <- createDataFrame(list(list(0L, 1L, 1.0), list(0L, 2L, 1.0),
#' list(1L, 2L, 1.0), list(3L, 4L, 1.0),
#' list(4L, 0L, 0.1)),
#' schema = c("src", "dst", "weight"))
#' clusters <- spark.assignClusters(df, initMode = "degree", weightCol = "weight")
#' showDF(clusters)
#' }
#' @note spark.assignClusters(SparkDataFrame) since 3.0.0
setMethod("spark.assignClusters",
signature(data = "SparkDataFrame"),
function(data, k = 2L, initMode = c("random", "degree"), maxIter = 20L,
sourceCol = "src", destinationCol = "dst", weightCol = NULL) {
if (!is.integer(k) || k < 1) {
stop("k should be a number with value >= 1.")
}
if (!is.integer(maxIter) || maxIter <= 0) {
stop("maxIter should be a number with value > 0.")
}
initMode <- match.arg(initMode)
if (!is.null(weightCol) && weightCol == "") {
weightCol <- NULL
} else if (!is.null(weightCol)) {
weightCol <- as.character(weightCol)
}
jobj <- callJStatic("org.apache.spark.ml.r.PowerIterationClusteringWrapper",
"getPowerIterationClustering",
as.integer(k), initMode,
as.integer(maxIter), as.character(sourceCol),
as.character(destinationCol), weightCol)
object <- new("PowerIterationClustering", jobj = jobj)
dataFrame(callJMethod(object@jobj, "assignClusters", data@sdf))
})
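Per the `weightCol` documentation above, passing `NULL` (or an empty string, which the method normalizes to `NULL`) treats every edge weight as 1.0. A hedged usage sketch, assuming an active SparkR session and the example graph from the docs:

```r
# Sketch (assumes an active SparkR session): with weightCol omitted, all edge weights default to 1.0.
df <- createDataFrame(list(list(0L, 1L, 1.0), list(0L, 2L, 1.0),
                           list(1L, 2L, 1.0), list(3L, 4L, 1.0),
                           list(4L, 0L, 0.1)),
                      schema = c("src", "dst", "weight"))
clusters <- spark.assignClusters(df, k = 2L, maxIter = 20L)  # weightCol = NULL
head(clusters)  # columns: id, cluster
```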
11 changes: 6 additions & 5 deletions R/pkg/R/mllib_fpm.R
@@ -183,16 +183,17 @@ setMethod("write.ml", signature(object = "FPGrowthModel", path = "character"),
#' @return A complete set of frequent sequential patterns in the input sequences of itemsets.
#' The returned \code{SparkDataFrame} contains columns of sequence and corresponding
#' frequency. The schema of it will be:
#' \code{sequence: ArrayType(ArrayType(T))} (T is the item type)
#' \code{freq: Long}
#' \code{sequence: ArrayType(ArrayType(T))}, \code{freq: integer}
#' where T is the item type
#' @rdname spark.prefixSpan
#' @aliases findFrequentSequentialPatterns,PrefixSpan,SparkDataFrame-method
#' @examples
#' \dontrun{
#' df <- createDataFrame(list(list(list(list(1L, 2L), list(3L))),
#' list(list(list(1L), list(3L, 2L), list(1L, 2L))),
#' list(list(list(1L, 2L), list(5L))),
#' list(list(list(6L)))), schema = c("sequence"))
#' list(list(list(1L), list(3L, 2L), list(1L, 2L))),
#' list(list(list(1L, 2L), list(5L))),
#' list(list(list(6L)))),
#' schema = c("sequence"))
#' frequency <- spark.findFrequentSequentialPatterns(df, minSupport = 0.5, maxPatternLength = 5L,
#' maxLocalProjDBSize = 32000000L)
#' showDF(frequency)
6 changes: 3 additions & 3 deletions R/pkg/R/mllib_tree.R
@@ -157,7 +157,7 @@ print.summary.decisionTree <- function(x) {
#' @param checkpointInterval Param for set checkpoint interval (>= 1) or disable checkpoint (-1).
#' Note: this setting will be ignored if the checkpoint directory is not
#' set.
#' @param maxMemoryInMB Maximum memory in MB allocated to histogram aggregation.
#' @param maxMemoryInMB Maximum memory in MiB allocated to histogram aggregation.
#' @param cacheNodeIds If FALSE, the algorithm will pass trees to executors to match instances with
#' nodes. If TRUE, the algorithm will cache node IDs for each instance. Caching
#' can speed up training of deeper trees. Users can set how often should the
@@ -382,7 +382,7 @@ setMethod("write.ml", signature(object = "GBTClassificationModel", path = "chara
#' @param checkpointInterval Param for set checkpoint interval (>= 1) or disable checkpoint (-1).
#' Note: this setting will be ignored if the checkpoint directory is not
#' set.
#' @param maxMemoryInMB Maximum memory in MB allocated to histogram aggregation.
#' @param maxMemoryInMB Maximum memory in MiB allocated to histogram aggregation.
#' @param cacheNodeIds If FALSE, the algorithm will pass trees to executors to match instances with
#' nodes. If TRUE, the algorithm will cache node IDs for each instance. Caching
#' can speed up training of deeper trees. Users can set how often should the
@@ -588,7 +588,7 @@ setMethod("write.ml", signature(object = "RandomForestClassificationModel", path
#' @param checkpointInterval Param for set checkpoint interval (>= 1) or disable checkpoint (-1).
#' Note: this setting will be ignored if the checkpoint directory is not
#' set.
#' @param maxMemoryInMB Maximum memory in MB allocated to histogram aggregation.
#' @param maxMemoryInMB Maximum memory in MiB allocated to histogram aggregation.
#' @param cacheNodeIds If FALSE, the algorithm will pass trees to executors to match instances with
#' nodes. If TRUE, the algorithm will cache node IDs for each instance. Caching
#' can speed up training of deeper trees. Users can set how often should the
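`maxMemoryInMB` (and the related `checkpointInterval`) documented above are ordinary arguments on the tree-model fitting functions such as `spark.randomForest`. A hedged sketch, assuming an active SparkR session; the dataset and parameter values are illustrative only:

```r
# Sketch (assumes an active SparkR session); parameter values are illustrative only.
df <- createDataFrame(iris)
model <- spark.randomForest(df, Species ~ ., type = "classification",
                            maxDepth = 5, numTrees = 20,
                            maxMemoryInMB = 512,      # histogram-aggregation memory cap
                            checkpointInterval = 10)  # ignored unless a checkpoint dir is set
summary(model)
```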
16 changes: 8 additions & 8 deletions R/pkg/inst/profile/shell.R
@@ -33,19 +33,19 @@
sc <- SparkR:::callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", spark)
assign("sc", sc, envir = .GlobalEnv)
sparkVer <- SparkR:::callJMethod(sc, "version")
cat("\n Welcome to")
cat("\nWelcome to")
cat("\n")
cat(" ____ __", "\n")
cat(" / __/__ ___ _____/ /__", "\n")
cat(" _\\ \\/ _ \\/ _ `/ __/ '_/", "\n")
cat(" /___/ .__/\\_,_/_/ /_/\\_\\")
cat(" ____ __", "\n")
cat(" / __/__ ___ _____/ /__", "\n")
cat(" _\\ \\/ _ \\/ _ `/ __/ '_/", "\n")
cat(" /___/ .__/\\_,_/_/ /_/\\_\\")
if (nchar(sparkVer) == 0) {
cat("\n")
} else {
cat(" version ", sparkVer, "\n")
cat(" version", sparkVer, "\n")
}
cat(" /_/", "\n")
cat(" /_/", "\n")
cat("\n")

cat("\n SparkSession available as 'spark'.\n")
cat("\nSparkSession available as 'spark'.\n")
}
13 changes: 13 additions & 0 deletions R/pkg/tests/fulltests/test_mllib_clustering.R
@@ -319,4 +319,17 @@ test_that("spark.posterior and spark.perplexity", {
expect_equal(length(local.posterior), sum(unlist(local.posterior)))
})

test_that("spark.assignClusters", {
df <- createDataFrame(list(list(0L, 1L, 1.0), list(0L, 2L, 1.0),
list(1L, 2L, 1.0), list(3L, 4L, 1.0),
list(4L, 0L, 0.1)),
schema = c("src", "dst", "weight"))
clusters <- spark.assignClusters(df, initMode = "degree", weightCol = "weight")
expected_result <- createDataFrame(list(list(4L, 1L), list(0L, 0L),
list(1L, 0L), list(3L, 1L),
list(2L, 0L)),
schema = c("id", "cluster"))
expect_equivalent(expected_result, clusters)
})

sparkR.session.stop()
29 changes: 15 additions & 14 deletions R/pkg/tests/fulltests/test_mllib_fpm.R
@@ -84,19 +84,20 @@ test_that("spark.fpGrowth", {
})

test_that("spark.prefixSpan", {
df <- createDataFrame(list(list(list(list(1L, 2L), list(3L))),
list(list(list(1L), list(3L, 2L), list(1L, 2L))),
list(list(list(1L, 2L), list(5L))),
list(list(list(6L)))), schema = c("sequence"))
result1 <- spark.findFrequentSequentialPatterns(df, minSupport = 0.5, maxPatternLength = 5L,
maxLocalProjDBSize = 32000000L)

expected_result <- createDataFrame(list(list(list(list(1L)), 3L),
list(list(list(3L)), 2L),
list(list(list(2L)), 3L),
list(list(list(1L, 2L)), 3L),
list(list(list(1L), list(3L)), 2L)),
schema = c("sequence", "freq"))
})
df <- createDataFrame(list(list(list(list(1L, 2L), list(3L))),
list(list(list(1L), list(3L, 2L), list(1L, 2L))),
list(list(list(1L, 2L), list(5L))),
list(list(list(6L)))),
schema = c("sequence"))
result <- spark.findFrequentSequentialPatterns(df, minSupport = 0.5, maxPatternLength = 5L,
maxLocalProjDBSize = 32000000L)

expected_result <- createDataFrame(list(list(list(list(1L)), 3L), list(list(list(3L)), 2L),
list(list(list(2L)), 3L), list(list(list(1L, 2L)), 3L),
list(list(list(1L), list(3L)), 2L)),
schema = c("sequence", "freq"))

expect_equivalent(expected_result, result)
})

sparkR.session.stop()
21 changes: 18 additions & 3 deletions R/pkg/vignettes/sparkr-vignettes.Rmd
@@ -549,6 +549,8 @@ SparkR supports the following machine learning models and algorithms.

* Latent Dirichlet Allocation (LDA)

* Power Iteration Clustering (PIC)

#### Collaborative Filtering

* Alternating Least Squares (ALS)
@@ -982,6 +984,18 @@ predicted <- predict(model, df)
head(predicted)
```

#### Power Iteration Clustering

Power Iteration Clustering (PIC) is a scalable graph clustering algorithm. The `spark.assignClusters` method runs the PIC algorithm and returns a cluster assignment for each input vertex.

```{r}
df <- createDataFrame(list(list(0L, 1L, 1.0), list(0L, 2L, 1.0),
list(1L, 2L, 1.0), list(3L, 4L, 1.0),
list(4L, 0L, 0.1)),
schema = c("src", "dst", "weight"))
head(spark.assignClusters(df, initMode = "degree", weightCol = "weight"))
```

#### FP-growth

`spark.fpGrowth` executes the FP-growth algorithm to mine frequent itemsets on a `SparkDataFrame`. `itemsCol` should be an array of values.
Expand Down Expand Up @@ -1019,9 +1033,10 @@ head(predict(fpm, df))

```{r}
df <- createDataFrame(list(list(list(list(1L, 2L), list(3L))),
list(list(list(1L), list(3L, 2L), list(1L, 2L))),
list(list(list(1L, 2L), list(5L))),
list(list(list(6L)))), schema = c("sequence"))
list(list(list(1L), list(3L, 2L), list(1L, 2L))),
list(list(list(1L, 2L), list(5L))),
list(list(list(6L)))),
schema = c("sequence"))
head(spark.findFrequentSequentialPatterns(df, minSupport = 0.5, maxPatternLength = 5L))
```

10 changes: 0 additions & 10 deletions common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
@@ -174,11 +174,6 @@ public static float getFloat(Object object, long offset) {
}

public static void putFloat(Object object, long offset, float value) {
if (Float.isNaN(value)) {
value = Float.NaN;
} else if (value == -0.0f) {
value = 0.0f;
}
_UNSAFE.putFloat(object, offset, value);
}

@@ -187,11 +182,6 @@ public static double getDouble(Object object, long offset) {
}

public static void putDouble(Object object, long offset, double value) {
if (Double.isNaN(value)) {
value = Double.NaN;
} else if (value == -0.0d) {
value = 0.0d;
}
_UNSAFE.putDouble(object, offset, value);
}

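For context on what the removed branches were normalizing: `0.0` and `-0.0` compare as equal but carry different sign bits, so code that operates on raw bytes can observe two encodings of the same numeric value. A quick illustration in R (not Spark code):

```r
# 0.0 and -0.0 compare equal, but the sign bit survives, so their byte-level encodings differ.
0 == -0                           # TRUE: they compare equal as values
1 / 0;  1 / -0                    # Inf and -Inf: the sign bit of -0 is preserved
identical(0, -0, num.eq = FALSE)  # FALSE: bitwise comparison distinguishes them
```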
@@ -157,22 +157,4 @@ public void heapMemoryReuse() {
Assert.assertEquals(onheap4.size(), 1024 * 1024 + 7);
Assert.assertEquals(obj3, onheap4.getBaseObject());
}

@Test
// SPARK-26021
public void writeMinusZeroIsReplacedWithZero() {
byte[] doubleBytes = new byte[Double.BYTES];
byte[] floatBytes = new byte[Float.BYTES];
Platform.putDouble(doubleBytes, Platform.BYTE_ARRAY_OFFSET, -0.0d);
Platform.putFloat(floatBytes, Platform.BYTE_ARRAY_OFFSET, -0.0f);

byte[] doubleBytes2 = new byte[Double.BYTES];
byte[] floatBytes2 = new byte[Float.BYTES];
Platform.putDouble(doubleBytes, Platform.BYTE_ARRAY_OFFSET, 0.0d);
Platform.putFloat(floatBytes, Platform.BYTE_ARRAY_OFFSET, 0.0f);

// Make sure the bytes we write from 0.0 and -0.0 are same.
Assert.assertArrayEquals(doubleBytes, doubleBytes2);
Assert.assertArrayEquals(floatBytes, floatBytes2);
}
}
@@ -58,9 +58,8 @@
* simultaneously opens separate serializers and file streams for all partitions. As a result,
* {@link SortShuffleManager} only selects this write path when
* <ul>
* <li>no Ordering is specified,</li>
* <li>no Aggregator is specified, and</li>
* <li>the number of partitions is less than
* <li>no map-side combine is specified, and</li>
* <li>the number of partitions is less than or equal to
* <code>spark.shuffle.sort.bypassMergeThreshold</code>.</li>
* </ul>
*
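The threshold referenced in the updated javadoc, `spark.shuffle.sort.bypassMergeThreshold`, is a regular Spark configuration (default 200). A hedged sketch of overriding it when starting a SparkR session; the value is illustrative only:

```r
# Sketch: raising spark.shuffle.sort.bypassMergeThreshold (default 200) at session start.
sparkR.session(sparkConfig = list(spark.shuffle.sort.bypassMergeThreshold = "400"))
```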
@@ -145,6 +145,15 @@ final class ShuffleExternalSorter extends MemoryConsumer {
*/
private void writeSortedFile(boolean isLastFile) {

// This call performs the actual sort.
final ShuffleInMemorySorter.ShuffleSorterIterator sortedRecords =
inMemSorter.getSortedIterator();

// If there are no sorted records, we don't need to create an empty spill file.
if (!sortedRecords.hasNext()) {
return;
}

final ShuffleWriteMetricsReporter writeMetricsToUse;

if (isLastFile) {
@@ -157,10 +166,6 @@ private void writeSortedFile(boolean isLastFile) {
writeMetricsToUse = new ShuffleWriteMetrics();
}

// This call performs the actual sort.
final ShuffleInMemorySorter.ShuffleSorterIterator sortedRecords =
inMemSorter.getSortedIterator();

// Small writes to DiskBlockObjectWriter will be fairly inefficient. Since there doesn't seem to
// be an API to directly transfer bytes from managed memory to the disk writer, we buffer
// data through a byte array. This array does not need to be large enough to hold a single