-
Notifications
You must be signed in to change notification settings - Fork 1
/
analyze_stream.py
73 lines (56 loc) · 1.66 KB
/
analyze_stream.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import logging
import time
from secrets import *
import json
import multiprocessing
from utils import stream, TWITTER_HANDLE
#from utils import get_conn
logger = logging.getLogger(__name__)
PROCESS_NUM = 5
class SingTweetProcess(multiprocessing.Process):
def __init__(self, queue, pid):
multiprocessing.Process.__init__(self)
self.pid2 = pid
self.queue = queue
def sing(self, text):
#mock singing
#upload to soundcloud
pass
def run(self):
items = 0
while True:
items += 1
tweet = self.queue.get()
try:
self.sing(tweet)
except Exception, e:
logger.error("singing failed %s %s: \n\t%s", time.ctime(), self.pid2, e)
self.queue.task_done()
def check_stream():
queue = multiprocessing.JoinableQueue(10000)
processes = []
for i in range(PROCESS_NUM):
process = FixMetadataNullsProcess(queue, i, do_it)
process.start()
processes.append(process)
for i in stream():
try:
tweet = json.loads(i)
if tweet.get('user', {}).get('screen_name', None) == TWITTER_HANDLE:
#skip my own shit
continue
tweet_text = tweet.get('text', None)
if tweet_text:
queue.put(tweet_text)
except Exception, e:
logger.error("unable to decode %r", i)
logger.error("exception %r", e)
queue.join()
def chill_brah():
time.sleep(1)
def main():
check_stream()
chill_brah()
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
main()