-
Notifications
You must be signed in to change notification settings - Fork 245
/
Wikipedia.py
78 lines (69 loc) · 2.37 KB
/
Wikipedia.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# Databricks notebook source
#
# Wikipedia Clickstream
# An example Delta Live Tables pipeline that ingests Wikipedia clickstream data and builds some simple summary tables.
#
# Source: February 2015 English Wikipedia Clickstream, in JSON format
# More information about the columns can be found at: https://meta.wikimedia.org/wiki/Research:Wikipedia_clickstream
#
from pyspark.sql.functions import *
from pyspark.sql.types import *
import dlt
# Location of the raw February 2015 English Wikipedia clickstream JSON dump;
# read by the bronze table ``clickstream_raw``.
json_path = (
    "/databricks-datasets/wikipedia-datasets/data-001/clickstream/"
    "raw-uncompressed-json/2015_2_clickstream.json"
)
@dlt.table(
    comment="The raw wikipedia click stream dataset, ingested from /databricks-datasets.",
    table_properties={
        "quality": "bronze"
    }
)
def clickstream_raw():
    """Bronze table: the raw clickstream JSON loaded as-is from ``json_path``.

    No ``inferSchema`` option is passed. That option belongs to Spark's CSV
    reader; the JSON reader already infers a schema whenever no explicit
    schema is supplied, so setting it here had no effect and only suggested
    otherwise.

    Returns:
        A batch DataFrame whose columns are whatever the JSON reader infers
        from the file. ``clickstream_clean`` expects ``curr_id``,
        ``curr_title``, ``n``, ``prev_id`` and ``prev_title``.
    """
    # NOTE(review): ``spark`` is not defined in this file; presumably it is the
    # SparkSession injected by the Databricks runtime — confirm.
    return spark.read.json(json_path)
@dlt.table(
    comment="Wikipedia clickstream dataset with cleaned-up datatypes / column names and quality expectations.",
    table_properties={"quality": "silver"},
)
@dlt.expect(
    "valid_current_page",
    "current_page_id IS NOT NULL AND current_page_title IS NOT NULL",
)
@dlt.expect_or_fail("valid_count", "click_count > 0")
def clickstream_clean():
    """Silver table: a typed, renamed projection of ``clickstream_raw``.

    Three raw columns are cast to INT under new names, and the two title
    columns are renamed. Only the five cleaned columns are kept.

    Attached expectations:
      * ``valid_current_page`` (``dlt.expect``): current page id and title
        must be non-null. This is a tracked expectation, not a filter.
      * ``valid_count`` (``dlt.expect_or_fail``): ``click_count`` must be
        positive, otherwise the update fails.
    """
    # Target column -> SQL cast producing it; applied in insertion order.
    int_columns = {
        "current_page_id": "CAST(curr_id AS INT)",
        "click_count": "CAST(n AS INT)",
        "previous_page_id": "CAST(prev_id AS INT)",
    }
    # Raw title column -> cleaned-up name; applied in insertion order.
    title_renames = {
        "curr_title": "current_page_title",
        "prev_title": "previous_page_title",
    }

    cleaned = dlt.read("clickstream_raw")
    for target, cast_sql in int_columns.items():
        cleaned = cleaned.withColumn(target, expr(cast_sql))
    for source_name, target_name in title_renames.items():
        cleaned = cleaned.withColumnRenamed(source_name, target_name)

    return cleaned.select(
        "current_page_id",
        "current_page_title",
        "click_count",
        "previous_page_id",
        "previous_page_title",
    )
@dlt.table(
    comment="A table of the most common pages that link to the Apache Spark page.",
    table_properties={
        "quality": "gold"
    }
)
def top_spark_referrers():
    """Gold table: the 10 referrers sending the most clicks to ``Apache_Spark``.

    Rows are sorted by ``click_count`` descending. ``referrer`` ascending is a
    secondary key: with a single sort key, Spark may keep any of several
    referrers tied at the cut-off count, so the table's contents could change
    between refreshes of the same data.

    Returns:
        DataFrame with columns ``referrer`` (the previous page title) and
        ``click_count``, at most 10 rows.
    """
    return (
        dlt.read("clickstream_clean")
        .filter(col("current_page_title") == "Apache_Spark")
        .withColumnRenamed("previous_page_title", "referrer")
        # Secondary key makes the top-10 cut deterministic on ties.
        .sort(desc("click_count"), asc("referrer"))
        .select("referrer", "click_count")
        .limit(10)
    )
@dlt.table(
    comment="A list of the top 50 pages by number of clicks.",
    table_properties={
        "quality": "gold"
    }
)
def top_pages():
    """Gold table: the 50 pages with the highest total click count.

    Click counts from ``clickstream_clean`` are summed per
    ``current_page_title``. The result is ordered by the total descending,
    with the title ascending as a tie-breaker, so the top-50 cut is
    deterministic.

    Returns:
        DataFrame with columns ``current_page_title`` and ``total_clicks``,
        at most 50 rows.
    """
    # Namespaced import: a bare ``sum`` resolves to the Spark aggregate only
    # because the module's wildcard import shadows Python's builtin ``sum``.
    from pyspark.sql import functions as F

    return (
        dlt.read("clickstream_clean")
        .groupBy("current_page_title")
        .agg(F.sum("click_count").alias("total_clicks"))
        # Secondary key makes the top-50 cut deterministic on ties.
        .sort(F.desc("total_clicks"), F.asc("current_page_title"))
        .limit(50)
    )