-
Notifications
You must be signed in to change notification settings - Fork 0
/
min-weather-on-year-1800.py
54 lines (42 loc) · 1.54 KB
/
min-weather-on-year-1800.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import logging
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
# Raise root-logger verbosity so the INFO messages emitted by the
# print steps below actually show up on the console.
logging.getLogger().setLevel(logging.INFO)

# Construct the pipeline with default (empty) options; steps are
# attached to `p` further down and executed by p.run().
p = beam.Pipeline(options=PipelineOptions())
class ExtractWeatherData(beam.DoFn):
    """Trim a split weather row down to (station-id, entry-type, celsius)."""

    def process(self, element, *args, **kwargs):
        station = element[0]
        entry_type = element[2]
        # Raw readings are stored in tenths of a degree; convert to degrees C.
        celsius = float(element[3]) / 10
        yield (station, entry_type, celsius)
# Input row layout after the split: station-id, date, entryType, celsius,,,,
#
# FIX: the original used Python 2 tuple-parameter lambdas
# (`lambda (k, v): ...`), which are a SyntaxError in Python 3 (PEP 3113).
# Current Apache Beam SDKs only support Python 3, so those lambdas are
# rewritten to take a single argument and index/unpack inside the body.
lines = (p
| "Read Text File : Weather"
>> beam.io.ReadFromText("datasets/weather/year-1800.txt")
| "Split Data"
>> beam.Map(lambda line: line.split(","))
| "Reduce Columns"
# beam.Map and beam.ParDo(ExtractWeatherData()) are interchangeable here.
>> beam.Map(lambda weather: (weather[0], weather[2], float(weather[3])/10))
# >> beam.ParDo(ExtractWeatherData())
| "Filter TMIN"
>> beam.Filter(lambda weather: "TMIN" in weather[1])
| "Reduce K,V"
# Keep only (station-id, celsius) so GroupByKey groups by station.
>> beam.Map(lambda rec: (rec[0], rec[2]))
| "Group By Key "
>> beam.GroupByKey() # make them iterable to use min()
| "Get Min Value"
>> beam.Map(lambda kv: (kv[0], min(kv[1])))
# | "Write output File"
# >> beam.io.WriteToText("datasets/words/book_output.txt")
)
(lines
| "Print Weather Data"
# beam.Map (not ParDo) because logging.info returns None; ParDo-wrapped
# callables must return an iterable of output elements.
>> beam.Map(lambda c: logging.info("Lines %s ", c))
)
(lines
| "Print Line Count"
>> beam.CombineGlobally(beam.combiners.CountCombineFn())
| "Print line count for %s"
>> beam.Map(lambda c: logging.info("\nTotal line count = %s \n", c))
)
# Execute the pipeline and block until it completes.
p.run().wait_until_finish()