-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathlocal.py
110 lines (94 loc) · 2.92 KB
/
local.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
import os
import logging
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.io.textio import ReadFromText, WriteToText
# Build the option container first; settings are applied through typed views.
options = PipelineOptions()

# Execute the pipeline with the local (direct) runner.
options.view_as(StandardOptions).runner = 'direct'
# Alternative: execute on the Dataflow runner.
# options.view_as(StandardOptions).runner = 'dataflow'

# Google Cloud view of the same options; only the job name is set here.
gcloud_options = options.view_as(GoogleCloudOptions)
gcloud_options.job_name = 'test-job'
# Additional Google Cloud settings, currently disabled:
# gcloud_options.project = 'hnacino-sandbox'
# gcloud_options.temp_location = 'gs://hn-dataflow-intro/temp'
# gcloud_options.staging_location = "gs://hn-dataflow-intro/stage"

# Local files read and written by the pipeline.
input_filename = 'input-377k.txt'
output_filename = 'output.txt'
# Cloud Storage equivalents, currently disabled:
# input_filename = 'gs://hn-dataflow-intro/input.txt'
# output_filename = 'gs://hn-dataflow-intro/output.txt'
class Split(beam.DoFn):
    """Parses one ``country,duration,user`` text line into a dictionary."""

    def process(self, element):
        """
        Splits a row on commas and returns a dictionary representing the
        row.

        Args:
            element: one line of input text, expected to contain exactly
                three comma-separated fields: country, duration, user.

        Returns:
            A one-element list holding a dict with the keys 'country',
            'duration' (converted to float) and 'user'. An empty list is
            returned for a blank line or a row that cannot be parsed, so a
            single bad row is dropped instead of failing the whole pipeline.
        """
        # Blank lines carry no data; drop them without logging noise.
        if not element.strip():
            return []

        try:
            # Both a wrong field count (unpacking) and a non-numeric
            # duration (float) surface as ValueError.
            country, duration, user = element.split(",")
            parsed_duration = float(duration)
        except ValueError:
            logging.warning("Skipping malformed row: %r", element)
            return []

        return [{
            'country': country,
            'duration': parsed_duration,
            'user': user,
        }]
class CollectTimings(beam.DoFn):
    """Reduces a parsed row to its (country, duration) pair."""

    def process(self, element):
        """
        Returns a one-element list holding the (country, duration) tuple
        read from the 'country' and 'duration' keys of the row dictionary
        """
        country = element['country']
        duration = element['duration']
        return [(country, duration)]
class CollectUsers(beam.DoFn):
    """Reduces a parsed row to its (country, user) pair."""

    def process(self, element):
        """
        Returns a one-element list holding the (country, user) tuple read
        from the 'country' and 'user' keys of the row dictionary
        """
        country_and_user = (element['country'], element['user'])
        return [country_and_user]
class WriteToCSV(beam.DoFn):
    """Formats one joined (key, grouped values) element as a CSV line."""

    def process(self, element):
        """
        Builds the csv text for one row

        ``element`` is indexed as ``(key, grouped)`` where ``grouped`` maps
        'users' and 'timings' to sequences; only the first item of each
        sequence is written. Returns a one-element list holding the string
        ``key,first_users_item,first_timings_item``.
        """
        key = element[0]
        grouped = element[1]
        first_users_item = grouped['users'][0]
        first_timings_item = grouped['timings'][0]
        line = "{},{},{}".format(key, first_users_item, first_timings_item)
        return [line]
# Build and run the pipeline; leaving the ``with`` block executes it.
with beam.Pipeline(options=options) as p:
    # Read the input text file and parse every line with Split.
    rows = (
        p
        | ReadFromText(input_filename)
        | beam.ParDo(Split())
    )

    # Mean duration per country. CombinePerKey yields the same per-key
    # result as GroupByKey followed by CombineValues, but lets the runner
    # pre-combine values instead of first materializing each key's full
    # value list.
    timings = (
        rows
        | beam.ParDo(CollectTimings())
        | "Calculating average" >> beam.CombinePerKey(
            beam.combiners.MeanCombineFn())
    )

    # Number of (country, user) entries per country. This counts rows, not
    # distinct users.
    users = (
        rows
        | beam.ParDo(CollectUsers())
        | "Counting users" >> beam.CombinePerKey(
            beam.combiners.CountCombineFn())
    )

    # Join both aggregates on country, format each joined element as a CSV
    # line and write the lines out. The name is kept for compatibility even
    # though it holds the result of the write step.
    to_be_joined = (
        {
            'timings': timings,
            'users': users,
        }
        | beam.CoGroupByKey()
        | beam.ParDo(WriteToCSV())
        | WriteToText(output_filename)
    )