
Commit

Spelling and grammatical fixes.
kylebunting committed Nov 6, 2017
1 parent 27a2912 commit 87d4bb8
Showing 2 changed files with 140 additions and 54 deletions.
97 changes: 70 additions & 27 deletions Labs/Lab04/Lab04-complete.ipynb
@@ -35,13 +35,15 @@
"metadata": {},
"source": [
"### Load required packages\n",
"To use Spark Structured Streaming with Kafka, you must load the appropriate packages. The version must match the version of both Kafka and Spark that you are using, so for our setup we need to load packages that works with Kafka on HDInsight 3.6, and Spark 2.1 on HDInsight 3.6."
"To use Spark Structured Streaming with Kafka, you must load the appropriate packages. The version must match the version of both Kafka and Spark that you are using, so for our setup we need to load packages that work with Kafka on HDInsight 3.6, and Spark 2.1 on HDInsight 3.6."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"%%configure -f\n",
@@ -87,7 +89,7 @@
"metadata": {},
"source": [
"### Get Zookeeper hosts\n",
"Topics are registered in ZooKeeper, which means you must provide the **Zookeeper host** information for your Kafka cluster. To find the Zookeeper host information for your Kafka HDInsight cluster, you can use the Ambari REST API. The following cell retrieves this information using the the `curl` and `jq` utilities using a `%%bash` shell magic command.\n",
"Topics are registered in ZooKeeper, which means you must provide the **Zookeeper host** information for your Kafka cluster. To find the Zookeeper host information for your Kafka HDInsight cluster, you can use the Ambari REST API. The following cell retrieves this information using the `curl` and `jq` utilities using a `%%bash` shell magic command.\n",
"\n",
"> While there may be more than two Zookeeper hosts for your cluster, you do not need to provide a full list of all hosts to clients. One or two is enough. In this case, we return two.\n",
"\n",
@@ -104,7 +106,9 @@
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"%%bash\n",
@@ -128,7 +132,9 @@
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"%%bash\n",
@@ -155,7 +161,7 @@
"From a **new** bash shell prompt, you will upload the compiled `jar` file to the local storage of your Spark HDInsight cluster head node using an `scp` command. As done earlier, replace SPARKCLUSTERNAME with the name you provided earlier for your Spark cluster. When prompted, enter the password for the SSH user. Replace the \"/path/to/Kafka-Producer-Consumer/kafka-producer-consumer.jar\" with the path to this file in the Lab04 folder.\n",
"\n",
"```bash\n",
"scp ./kafka-producer-consumer.jar sshuser@SPARKCLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar\n",
"scp /path/to/Kafka-Producer-Consumer/kafka-producer-consumer.jar sshuser@SPARKCLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar\n",
"```"
]
},
@@ -177,7 +183,7 @@
"metadata": {},
"source": [
"### Get you Kafka brokers\n",
"Before attempting to run the Kafka producer, you need to retrieve your **Kafka brokers**. These brokers provide the connection information needed for the kafka-producer-consumer command-line app to write and read records to and from your Kafka cluster. To find the Kafka broker information for your Kafka HDInsight cluster, you can use the Ambari REST API. The following cell retrieve this information using the the `curl` and `jq` utilities using a `%%bash` shell magic command.\n",
"Before attempting to run the Kafka producer, you need to retrieve your **Kafka brokers**. These brokers provide the connection information needed for the kafka-producer-consumer command-line app to write and read records to and from your Kafka cluster. To find the Kafka broker information for your Kafka HDInsight cluster, you can use the Ambari REST API. The following cell retrieve this information using the `curl` and `jq` utilities using a `%%bash` shell magic command.\n",
"\n",
"> While there may be more than two broker hosts for your cluster, you do not need to provide a full list of all hosts to clients. One or two is enough. In this case, we return two.\n",
"\n",
@@ -195,6 +201,7 @@
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true,
"scrolled": true
},
"outputs": [],
@@ -276,7 +283,9 @@
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"// Use Spark SQL to retrieve all the records from the \"weblogs\" Hive table.\n",
@@ -291,13 +300,15 @@
"### Create sample dataset\n",
"From the weblogs data, create a small sample of data you can send to Kafka.\n",
"\n",
"> The weblogs data contains approximately 90 million records, as well as mulitple fields that are not need for this exercise, so the sampel dataset will make things more manageable."
"> The weblogs data contains approximately 90 million records, as well as mulitple fields that are not need for this exercise, so the sample dataset will make things more manageable."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"// Use the select and limit methods of a Spark Dataframe to create a new Dataframe\n",
@@ -329,7 +340,9 @@
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"// Import required libraries\n",
@@ -356,15 +369,17 @@
"metadata": {},
"source": [
"### Start the producer stream\n",
"In the next cell, you will start streaming weblog data into Kafka. For accommodate this, you will generate a KafkaProducer, and use its send method to pass each row of weblog data into Kafka.\n",
"In the next cell, you will start streaming weblog data into Kafka. To accomplish this, you will generate a KafkaProducer, and use its send method to pass each row of weblog data into Kafka.\n",
"\n",
"Once you start running the cell, return to your bash shell with the open consumer session, and you should see the new records streaming into the topic in Kafka."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"// Create an accumulator to track the number of weblog entries emitted to Kafka\n",
@@ -412,7 +427,9 @@
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"// Construct a streaming DataFrame that reads from weblogs\n",
@@ -439,7 +456,9 @@
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"// Convert the value column to a string\n",
@@ -468,7 +487,9 @@
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"// Import libraries used for declaring schemas and working with JSON data\n",
@@ -494,7 +515,9 @@
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"val weblog_data = kafka_value.select(from_json(col(\"value\").cast(\"string\"), schema).alias(\"weblog\"))\n",
@@ -514,7 +537,9 @@
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"weblogs_stream.isStreaming"
@@ -530,7 +555,9 @@
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"weblogs.isStreaming"
@@ -555,7 +582,9 @@
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"val query = { weblogs_stream\n",
@@ -575,7 +604,9 @@
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"val user_product_mapping = spark.sql(\"select distinct ProductId, UserId from streamingLogs\")\n",
@@ -607,7 +638,9 @@
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"%%sh\n",
@@ -624,7 +657,9 @@
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"// Import mllib recommendation data types\n",
@@ -645,7 +680,9 @@
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"val predictions = model.transform(user_product_mapping)\n",
@@ -662,7 +699,9 @@
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"val recommended_products = predictions.where(\"not isnan(prediction)\").orderBy(\"UserId\", \"prediction\")\n",
@@ -681,7 +720,9 @@
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"val products_schema = { (new StructType)\n",
@@ -717,7 +758,9 @@
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"recommended_products.join(products, \"ProductId\").where(\"UserId = 807\").orderBy(col(\"prediction\").desc).show(10)"