-
Notifications
You must be signed in to change notification settings - Fork 1
/
alisto92-sparkhistory.txt
executable file
·29 lines (29 loc) · 1.48 KB
/
alisto92-sparkhistory.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# PySpark shell session: pull the "assessment" topic out of Kafka, land the
# raw and parsed messages as parquet under /tmp, and run a few SQL queries.
# `spark` is the SparkSession the pyspark prompt provides.
import json

# Batch-read the whole topic, from the earliest to the latest offset.
raw_assessments = (
    spark.read.format("kafka")
    .option("kafka.bootstrap.servers", "kafka:29092")
    .option("subscribe", "assessment")
    .option("startingOffsets", "earliest")
    .option("endingOffsets", "latest")
    .load()
)

# see schema
raw_assessments.printSchema()
# see messages
raw_assessments.show()
# cache to reduce warnings
raw_assessments.cache()

# Cast the message payload (the `value` column) to string.
assessments = raw_assessments.select(raw_assessments.value.cast('string'))

# take a look
assessments.show()
assessments.printSchema()
assessments.count()
assessments.write.parquet("/tmp/assessments")

# Parse each message's JSON payload and build a DataFrame from the result.
# `json` has to be imported before this lambda runs (hence the import at the
# top of the session).
extracted_assessments = assessments.rdd.map(lambda x: json.loads(x.value)).toDF()
extracted_assessments.show()
extracted_assessments.write.parquet("/tmp/extracted_assessments")

# Expose the parsed data to Spark SQL under the name "assessments".
# createOrReplaceTempView supersedes the deprecated registerTempTable.
extracted_assessments.createOrReplaceTempView('assessments')

# Count of keen_id values: build the query once, display it, then persist it.
assessment_counts = spark.sql("select count(keen_id) from assessments")
assessment_counts.show()
assessment_counts.write.parquet("/tmp/assessment_counts")

# A ten-row sample of two columns, persisted as parquet.
some_assessment_info = spark.sql("select keen_id, exam_name from assessments limit 10")
some_assessment_info.write.parquet("/tmp/some_assessment_info")

# Leave the pyspark prompt.
quit()