-
Notifications
You must be signed in to change notification settings - Fork 0
/
pyflinkjob.py
49 lines (38 loc) · 1.38 KB
/
pyflinkjob.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
from org.apache.flink.streaming.api.functions.source import SourceFunction
from org.apache.flink.api.common.functions import FlatMapFunction, ReduceFunction
from org.apache.flink.api.java.functions import KeySelector
from org.apache.flink.streaming.api.windowing.time.Time import milliseconds
import sys
class Generator(SourceFunction):
    """Bounded source that repeatedly emits the string 'Hello World'.

    ``run`` stops after ``num_iters`` records have been collected, or sooner
    if ``cancel`` has cleared the running flag.
    """

    def __init__(self, num_iters):
        # Upper bound on the number of records run() will emit.
        self._num_iters = num_iters
        # Cleared by cancel(); checked before every emission in run().
        self._running = True

    def run(self, ctx):
        """Emit 'Hello World' via ``ctx.collect`` until the limit or cancellation."""
        emitted = 0
        while True:
            # Cancellation is checked first, so a cancelled source stops
            # without re-examining the counter.
            if not self._running:
                break
            if not (emitted < self._num_iters):
                break
            ctx.collect('Hello World')
            emitted += 1

    def cancel(self):
        """Clear the running flag so ``run`` exits before its next emission."""
        # NOTE(review): the Flink runtime presumably calls this from a
        # different thread than run() -- confirm flag visibility under Jython.
        self._running = False
class Tokenizer(FlatMapFunction):
    """Split each incoming line into lower-cased words, each tagged with 1."""

    def flatMap(self, value, collector):
        """Collect one ``(1, word)`` tuple per whitespace-separated word of ``value``."""
        words = value.lower().split()
        for w in words:
            # Count first, word second: the key selector reads index 1 and
            # the reducer adds up index 0.
            collector.collect((1, w))
class Selector(KeySelector):
    """Key selector that keys each ``(count, word)`` record by its word."""

    def getKey(self, input):
        """Return element 1 of ``input`` (the word of a ``(count, word)`` tuple)."""
        word = input[1]
        return word
class Sum(ReduceFunction):
    """Combine two ``(count, word)`` records by adding their counts."""

    def reduce(self, input1, input2):
        """Return ``(count of input1 + count of input2, word of input1)``.

        Each argument must unpack into exactly two items. The word carried by
        ``input2`` is ignored; records reaching this reducer are presumably
        equal-keyed because the stream is keyed by word upstream.
        """
        (left_count, word), (right_count, _) = input1, input2
        return (left_count + right_count, word)
def main(factory, num_iters=20000):
    """Build and run a windowed word-count job over a generated stream.

    Args:
        factory: object exposing ``get_execution_environment()``, presumably
            supplied by the Flink Python streaming launcher -- confirm.
        num_iters: number of 'Hello World' lines the Generator source emits.

    Command line:
        If ``sys.argv[1]`` is present it is parsed with ``int`` and used as
        the job parallelism; otherwise the environment's default is kept.
        (Presumably argv[1] is the first user argument -- confirm how the
        launcher populates sys.argv.)

    Raises:
        ValueError: if ``sys.argv[1]`` is present but not an integer.
    """
    env = factory.get_execution_environment()

    # The parallelism argument is optional so the job still runs without it,
    # instead of failing with IndexError on a missing sys.argv[1].
    if len(sys.argv) > 1:
        env.set_parallelism(int(sys.argv[1]))

    # Parenthesised chaining avoids backslash continuations, where a stray
    # trailing backslash would splice the next statement onto this
    # expression. The final .output() sink emits each 50 ms windowed
    # (count, word) result (presumably to stdout); without a sink the
    # reduced values would be dropped.
    (env.create_python_source(Generator(num_iters=num_iters))
        .flat_map(Tokenizer())
        .key_by(Selector())
        .time_window(milliseconds(50))
        .reduce(Sum())
        .output())

    env.execute()