-
Notifications
You must be signed in to change notification settings - Fork 6
/
streamer.py
executable file
·191 lines (149 loc) · 5.37 KB
/
streamer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
#!/usr/bin/python
""" This is a utility that allows tweets to be read off in real time
To stop, use a KeyboardInterrupt like CTRL + C"""
import Queue
import json
import sys
import threading
import time
import tweepy
import utils
# Module-level handle on the running stream so the CTRL + C handler can kill
# it. A bare `global stream` at module scope is a no-op and leaves the name
# undefined until main() assigns it; binding None makes it always exist.
stream = None
class ListenerQueue(tweepy.streaming.StreamListener):
    """A StreamListener implementation for accessing the Twitter Streaming API
    that puts accepted statuses on a queue and writes a simplified JSON copy
    of each one to a file.

    Usage: myListener = ListenerQueue(queue, filename, search_terms)
           Stream(authorization, myListener)
    """

    def __init__(self, queue, filename, search_terms):
        """Set up the listener.

        queue        -- object with a put() method; every accepted status
                        is put on it.
        filename     -- path of the output file. It is truncated here, so
                        each run starts from an empty file.
        search_terms -- iterable of substrings that must ALL appear in a
                        tweet's text for it to be accepted; a falsy value
                        disables the filter.
        """
        super(ListenerQueue, self).__init__()
        self.queue = queue
        self.search_terms = search_terms
        # 'w' truncates on open: same net effect as the previous
        # open(..., 'a') + seek(0) + truncate() sequence.
        self.json_file = open(filename, 'w')

    def has_all_search_terms(self, text):
        """Return True if every search term occurs in text.

        Matching is a case-sensitive substring test. An empty term list
        returns True.
        """
        return all(term in text for term in self.search_terms)

    def on_status(self, status):
        """Queue and record one status; always return True to keep streaming.

        When search terms are set, statuses whose text lacks any of them are
        dropped without being queued or written.
        """
        text = status.text
        if self.search_terms and not self.has_all_search_terms(text):
            return True
        self.queue.put(status)
        simplified = utils.get_simplified_tweet(status)
        # NOTE(review): documents are written back-to-back with no separator
        # (json.dumps adds no trailing newline), so the file is a
        # concatenation of JSON values rather than one valid JSON document.
        self.json_file.write(json.dumps(simplified, indent=1))
        return True

    def on_error(self, status):
        """Print the HTTP error code; on 420 (rate limited) stop the stream.

        Error codes: https://dev.twitter.com/overview/api/response-codes
        Any other code falls through and returns None.
        """
        print(status)
        if status == 420:
            print("Too many attempts made to contact the Twitter server")
            print("Wait awhile to use the tool again")
            # Returning False from on_error disconnects the stream.
            return False

    def on_disconnect(self, notice=None):
        """Handle a disconnect notice, then close the output file.

        tweepy passes the disconnect notice as an argument. The old
        zero-argument signature (and the zero-argument super() call) raised
        TypeError on that path. `notice` defaults to None so existing direct
        callers keep working.
        """
        super(ListenerQueue, self).on_disconnect(notice)
        print("stream disconnected")
        self.json_file.close()
        if self.json_file.closed:
            print("json file closed successfully")
# def stream_to_json_file(fn='tweets.json'):
# auth = get_creds()
# L = ListenerJSON(fn)
# stream = Stream(auth, L)
# stream.filter(locations=[-122.75, 36.8, -121.75, 37.8], async=True)
# # can find terms: by adding track=['python']
# print "waiting 15s"
# time.sleep(15)
# print "terminating"
# stream.disconnect()
# L.json_file.close()
def get_tweets_from_q(queue, timeout=5):
    """Print the text of every status taken from queue, forever.

    queue   -- Queue of statuses, e.g. the one filled by ListenerQueue.
    timeout -- seconds each get() waits before retrying (default 5).

    An empty wait is simply retried. Previously the first idle period of
    `timeout` seconds raised Queue.Empty out of the `while True` loop and
    ended the consumer. The bounded wait, rather than a plain blocking
    get(), is presumably kept so CTRL + C can interrupt it -- confirm.
    """
    try:
        from Queue import Empty  # Python 2 module name
    except ImportError:
        from queue import Empty  # Python 3 module name
    while True:
        try:
            status = queue.get(True, timeout)
        except Empty:
            continue
        print(u"Tweet Message : {}\n\n".format(status.text))
        queue.task_done()
def start_stream(q, bounding_box, fn='tweets.json', search_terms=None):
'''Takes in a Queue object, a bounding_box of [lon, lat, lon, lat] for
SW and NE corners, a filename and a search term list. Examples in:
bounding_box = geo_converter.get_bounding_box_from(g)
search_terms = geo_converter.get_search_terms_from(g)
'''
global stream
(__, auth) = utils.get_credentials("consumerkeyandsecret", False)
L = ListenerQueue(q, fn, search_terms)
stream = tweepy.Stream(auth, L)
stream.filter(locations=bounding_box, filter_level='none', async=True)
# if search_terms:
# # OR semantics:
# stream.filter(locations=bounding_box, track=search_terms, async=True)
# else:
# stream.filter(locations=bounding_box, async=True)
return stream
def kill_stream(stream):
    """Disconnect a stream and close its listener's output file.

    A falsy stream only prints "stream not set".
    """
    if not stream:
        print("stream not set")
        return
    print("attempting to disconnect stream from kill_stream")
    stream.disconnect()
    print("closing file in 1 second...")
    # Short pause before closing; presumably lets the streaming thread finish
    # any in-flight write -- confirm.
    time.sleep(1)
    stream.listener.json_file.close()
def main():
    """Print the usage text, stream the hard-coded bounding box, and keep the
    main thread alive while the stream runs.

    Blocking here is what makes CTRL + C work. The KeyboardInterrupt handler
    wraps the call to main(). If main() returned right after starting the
    asynchronous stream, an interrupt would arrive after that try block had
    finished, and the stream would never be killed.
    """
    global stream
    print(__doc__)
    # NOTE(review): nothing in this function consumes q, so every accepted
    # status accumulates in memory for the life of the stream.
    q = Queue.Queue()
    # [lon, lat, lon, lat] of the SW and NE corners.
    bounding_box = [-122.75, 36.8, -121.75, 37.8]
    stream = start_stream(q, bounding_box)
    # Assumes tweepy 3.x, whose Stream clears `running` when its worker exits
    # (disconnect or fatal error) -- confirm against the pinned version.
    while stream.running:
        time.sleep(1)
    # Request parameters:
    # https://dev.twitter.com/streaming/overview/request-parameters
if __name__ == '__main__':
    # CTRL + C is the documented way to stop the tool: report it, tear down
    # the stream if one was ever bound, then exit.
    try:
        main()
    except KeyboardInterrupt:
        print("Main function interrupted")
        if "stream" not in globals():
            sys.exit()
        print("trying to kill stream")
        kill_stream(stream)
        sys.exit()
# class ListenerJSON(StreamListener):
# """A StreamListener implementation for accessing Twitter Streaming API
# that writes to a JSON file
# """
# def __init__(self, filename):
# super(ListenerJSON, self).__init__()
# self.json_file = open(filename, 'a')
# def on_status(self, status):
# # print data
# # print u"Tweet Message : {}\n\n".format(status.text)
# print type(status)
# sj = status._json
# j = json.dumps(sj, indent=1)
# self.json_file.write(j)
# return True
# def on_error(self, status):
# # error codes: https://dev.twitter.com/overview/api/response-codes
# print status
# if status == 420:
# return False # returning False in on_data disconnects the stream
# def on_disconnect(self):
# super(ListenerJSON, self).on_disconnect()
# print "made it to disconnector"
# self.json_file.close()