[SPARK-20015][SPARKR][SS][DOC][EXAMPLE] Document R Structured Streaming (experimental) in R vignettes and R & SS programming guide, R example

## What changes were proposed in this pull request?

Add
- R vignettes
- R programming guide
- SS programming guide
- R example

Also disable spark.als in vignettes for now since it's failing (SPARK-20402)

## How was this patch tested?

manually

Author: Felix Cheung <felixcheung_m@hotmail.com>

Closes #17814 from felixcheung/rdocss.
felixcheung authored and Felix Cheung committed May 4, 2017
1 parent fc472bd commit b8302cc
Showing 4 changed files with 381 additions and 44 deletions.
79 changes: 73 additions & 6 deletions R/pkg/vignettes/sparkr-vignettes.Rmd
@@ -182,7 +182,7 @@ head(df)
```

### Data Sources
SparkR supports operating on a variety of data sources through the `SparkDataFrame` interface. You can check the Spark SQL Programming Guide for more [specific options](https://spark.apache.org/docs/latest/sql-programming-guide.html#manually-specifying-options) that are available for the built-in data sources.

The general method for creating a `SparkDataFrame` from data sources is `read.df`. This method takes the path of the file to load and the type of data source, and the currently active SparkSession is used automatically. SparkR supports reading CSV, JSON and Parquet files natively, and through Spark Packages you can find data source connectors for popular file formats like Avro. These packages can be added with the `sparkPackages` parameter when initializing SparkSession using `sparkR.session`.

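For instance, here is a minimal sketch of loading a JSON file with `read.df`; the file path and the `people` variable are only an illustration:

```{r, eval=FALSE}
# Load a JSON file into a SparkDataFrame (the path below is a placeholder)
people <- read.df("examples/src/main/resources/people.json", "json")
head(people)
```
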
@@ -232,7 +232,7 @@ write.df(people, path = "people.parquet", source = "parquet", mode = "overwrite"
```

### Hive Tables
You can also create SparkDataFrames from Hive tables. To do this we will need to create a SparkSession with Hive support which can access tables in the Hive MetaStore. Note that Spark should have been built with Hive support and more details can be found in the [SQL Programming Guide](https://spark.apache.org/docs/latest/sql-programming-guide.html). In SparkR, by default it will attempt to create a SparkSession with Hive support enabled (`enableHiveSupport = TRUE`).

```{r, eval=FALSE}
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
@@ -314,7 +314,7 @@ Use `cube` or `rollup` to compute subtotals across multiple dimensions.
mean(cube(carsDF, "cyl", "gear", "am"), "mpg")
```

generates groupings for {(`cyl`, `gear`, `am`), (`cyl`, `gear`), (`cyl`), ()}, while

```{r}
mean(rollup(carsDF, "cyl", "gear", "am"), "mpg")
@@ -672,6 +672,7 @@ head(select(naiveBayesPrediction, "Class", "Sex", "Age", "Survived", "prediction
Survival analysis studies the expected duration of time until an event happens, and often the relationship with risk factors or treatment taken on the subject. In contrast to standard regression analysis, survival modeling has to deal with special characteristics in the data including non-negative survival time and censoring.

Accelerated Failure Time (AFT) model is a parametric survival model for censored data that assumes the effect of a covariate is to accelerate or decelerate the life course of an event by some constant. For more information, refer to the Wikipedia page [AFT Model](https://en.wikipedia.org/wiki/Accelerated_failure_time_model) and the references there. Different from a [Proportional Hazards Model](https://en.wikipedia.org/wiki/Proportional_hazards_model) designed for the same purpose, the AFT model is easier to parallelize because each instance contributes to the objective function independently.

```{r, warning=FALSE}
library(survival)
ovarianDF <- createDataFrame(ovarian)
@@ -902,15 +903,15 @@ perplexity

There are multiple options that can be configured in `spark.als`, including `rank`, `reg`, and `nonnegative`. For a complete list, refer to the help file.

```{r, eval=FALSE}
ratings <- list(list(0, 0, 4.0), list(0, 1, 2.0), list(1, 1, 3.0), list(1, 2, 4.0),
list(2, 1, 1.0), list(2, 2, 5.0))
df <- createDataFrame(ratings, c("user", "item", "rating"))
model <- spark.als(df, "rating", "user", "item", rank = 10, reg = 0.1, nonnegative = TRUE)
```

Extract latent factors.
```{r, eval=FALSE}
stats <- summary(model)
userFactors <- stats$userFactors
itemFactors <- stats$itemFactors
@@ -920,7 +921,7 @@ head(itemFactors)

Make predictions.

```{r, eval=FALSE}
predicted <- predict(model, df)
head(predicted)
```
@@ -1002,6 +1003,72 @@ unlink(modelPath)
```


## Structured Streaming

SparkR supports the Structured Streaming API (experimental).

You can check the Structured Streaming Programming Guide for [an introduction](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#programming-model) to its programming model and basic concepts.

### Simple Source and Sink

Spark has a few built-in input sources. As an example, to test with a socket source that reads lines of text, splits them into words, and displays the running word counts:

```{r, eval=FALSE}
# Create DataFrame representing the stream of input lines from connection
lines <- read.stream("socket", host = hostname, port = port)
# Split the lines into words
words <- selectExpr(lines, "explode(split(value, ' ')) as word")
# Generate running word count
wordCounts <- count(groupBy(words, "word"))
# Start running the query that prints the running counts to the console
query <- write.stream(wordCounts, "console", outputMode = "complete")
```
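
The call above returns a `StreamingQuery` handle that runs in the background. As a minimal sketch, assuming the `query` object from the previous chunk, the query can be inspected and stopped as follows:

```{r, eval=FALSE}
# Check whether the streaming query is still running
isActive(query)
# Wait for the query to finish, up to 10 seconds (the timeout is in milliseconds)
awaitTermination(query, 10000)
# Stop the query and release its resources
stopQuery(query)
```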

### Kafka Source

It is simple to read data from Kafka. For more information, see [Input Sources](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#input-sources) supported by Structured Streaming.

```{r, eval=FALSE}
topic <- read.stream("kafka",
                     kafka.bootstrap.servers = "host1:port1,host2:port2",
                     subscribe = "topic1")
keyvalue <- selectExpr(topic, "CAST(key AS STRING)", "CAST(value AS STRING)")
```
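
As a usage sketch, the parsed key and value columns can then be started against any supported sink, like other streaming `SparkDataFrame`s; the console sink and the `query` variable below are only an illustration:

```{r, eval=FALSE}
# Stream the cast key and value columns to the console for inspection
query <- write.stream(keyvalue, "console", outputMode = "append")
```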

### Operations and Sinks

Most of the common operations on `SparkDataFrame` are supported for streaming, including selection, projection, and aggregation. Once you have defined the final result, you start the streaming computation by calling the `write.stream` method with a sink and an `outputMode`.

For debugging, a streaming `SparkDataFrame` can be written to the console or to a temporary in-memory table; for further processing, it can be written in a fault-tolerant manner to a File Sink in different formats.

```{r, eval=FALSE}
noAggDF <- select(where(deviceDataStreamingDf, "signal > 10"), "device")
# Print new data to console
write.stream(noAggDF, "console")
# Write new data to Parquet files
write.stream(noAggDF,
             "parquet",
             path = "path/to/destination/dir",
             checkpointLocation = "path/to/checkpoint/dir")
# Aggregate
aggDF <- count(groupBy(noAggDF, "device"))
# Print updated aggregations to console
write.stream(aggDF, "console", outputMode = "complete")
# Have all the aggregates in an in-memory table. The query name will be the table name
write.stream(aggDF, "memory", queryName = "aggregates", outputMode = "complete")
head(sql("select * from aggregates"))
```


## Advanced Topics

### SparkR Object Classes
4 changes: 4 additions & 0 deletions docs/sparkr.md
@@ -593,6 +593,10 @@ The following example shows how to save/load a MLlib model by SparkR.
</tr>
</table>

# Structured Streaming

SparkR supports the Structured Streaming API (experimental). Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. For more information, see the R API on the [Structured Streaming Programming Guide](structured-streaming-programming-guide.html).

# R Function Name Conflicts

When loading and attaching a new package in R, it is possible to have a name [conflict](https://stat.ethz.ch/R-manual/R-devel/library/base/html/library.html), where a
Expand Down
