-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathMicroBlogSearch.py
150 lines (111 loc) · 4.48 KB
/
MicroBlogSearch.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
#!/usr/bin/env python
import sys
import time
import urllib2
from pprint import pprint
from xml.sax.saxutils import unescape
try:
import json
except ImportError:
import simplejson as json
class MicroSearch(object):
    """Poll a JSON search URL and report only messages not seen before.

    The highest message id seen so far is kept in a dict (normally
    ``bot_fact.memory``) under a key built from the object type, the channel
    and the URL, so later calls to ``read_data`` only yield newer messages.
    """

    # type of search object (I, M, T)
    obj_type = "M"
    # max no of tweets to report on the 1st run
    max_tweets = 5

    def __init__(self, url, channel, bot_fact=None):
        """Create a search poller.

        url      -- search URL that returns JSON with a 'results' list
        channel  -- channel name; only used to build the storage key
        bot_fact -- object exposing a dict-like ``memory`` attribute; when
                    None, ids are remembered on this instance only
        """
        self.url = url
        self.bot_fact = bot_fact
        self.first_run = True
        # Fallback store so the object is usable with the default bot_fact=None.
        self._local_memory = {}
        # Key for storing IDs of last msgs retrieved
        self.key = self.obj_type + "|" + channel + "|" + self.url

    def _memory(self):
        """Return the dict-like store holding the last seen ids."""
        if self.bot_fact is None:
            return self._local_memory
        return self.bot_fact.memory

    def get_last_id(self):
        """Return the stored id of the newest message seen (0 if none yet)."""
        return self._memory().get(self.key, 0)

    def set_last_id(self, value):
        """Store the id of the newest message seen."""
        self._memory()[self.key] = value

    last_id = property(get_last_id, set_last_id)

    def find_last_id(self, data):
        """Return the largest id in data['results'], or 0 if it is empty."""
        ids = [int(x['id']) for x in data['results']]
        # max() raises ValueError on an empty sequence; a search with no hits
        # must not abort the poll.
        return max(ids) if ids else 0

    def read_data(self, msg=None):
        """Yield result entries whose id is greater than the stored last id.

        Entries are yielded in the reverse of the order found in
        data['results']; entries whose text starts with "RT @" are skipped.
        On the first call only the last ``max_tweets`` entries (after the
        reversal) are considered.

        msg -- raw JSON text; when None the content of ``self.url`` is
               fetched with urllib2. An HTTPError during the fetch is
               reported on stdout and nothing is yielded.
        """
        # msg content can be passed in. This is useful when a custom method for retrieving URL content
        # is preferable. Example: deferred calls in the Twisted framework.
        if msg is None:
            try:
                response = urllib2.urlopen(self.url)
                try:
                    msg = response.read()
                finally:
                    # Always release the connection, even if read() fails.
                    response.close()
            except urllib2.HTTPError:
                print("got urllib2.HTTPError")
                return
        data = json.loads(msg)
        data['results'].reverse()
        old_last_id = self.last_id
        self.last_id = max(self.find_last_id(data), old_last_id)
        # Print only max_tweets on the 1st run. Next calls will print new messages since previous polling.
        if self.first_run:
            data['results'] = data['results'][-self.max_tweets:]
            self.first_run = False
        print("%s %s" % (old_last_id, self.last_id))
        for x in data['results']:
            if int(x['id']) > old_last_id:
                # ignore retweets (or we can get too much "spam")
                if not x['text'].startswith("RT @"):
                    yield x
class TwitterSearch(MicroSearch):
    """MicroSearch variant that formats Twitter search results."""

    # type of search object (I, M, T)
    obj_type = "T"

    def __init__(self, url, channel, bot_fact=None):
        super(TwitterSearch, self).__init__(url, channel, bot_fact)

    def format_output(self, msg):
        """Return a one-line summary of a result entry.

        msg -- dict with at least the 'from_user', 'id' and 'text' keys.

        The result has the form "Tw [<user>] <text> <<< <status url>".
        """
        # Twitter does not supply the URL of the entry in its JSON results :(
        msg_url = "http://twitter.com/%s/status/%s" % (msg['from_user'], msg['id'])
        # unescape() only handles &amp;, &lt; and &gt; by default, so the
        # &quot; entity has to be mapped explicitly.
        txt = unescape(msg['text'], {"&quot;": '"'})
        tmpl = "Tw [%-12s] %s <<< %s"
        return tmpl % (msg['from_user'], txt, msg_url)
class IdentiSearch(MicroSearch):
    """MicroSearch variant that formats identi.ca search results."""

    # type of search object (I, M, T)
    obj_type = "I"

    def __init__(self, url, channel, bot_fact=None):
        super(IdentiSearch, self).__init__(url, channel, bot_fact)

    def format_output(self, msg):
        """Return a one-line summary of a result entry.

        msg -- dict with the 'id', 'created_at', 'from_user' and 'text' keys.

        The result has the form "Id [<user>] <text> <<< <notice url>".
        """
        utc_suffix = " +0000"
        # The timestamp is read and stripped of a trailing UTC offset, but it
        # is not part of the formatted line.
        stamp = msg['created_at']
        if stamp.endswith(utc_suffix):
            stamp = stamp[:-len(utc_suffix)]
        # IdentiCa does not supply the URL of the entry in its JSON results :(
        notice_url = "http://identi.ca/notice/%s" % (msg['id'],)
        fields = (msg['from_user'], msg['text'], notice_url)
        return "Id [%-12s] %s <<< %s" % fields
def main():
    """Command-line demo: poll the chosen service every 60 seconds.

    The first command-line argument selects the service ("twitter" or
    "identica"); anything else prints a usage line and exits with -1.
    New messages are printed to stdout on every run. Never returns normally.
    """

    class MockFactory(object):
        """Stand-in for the bot factory: provides only an in-memory dict."""

        def __init__(self, cfg=None):
            self.memory = {}

    # Service name -> (search class, search URL)
    backends = {
        "twitter": (
            TwitterSearch,
            "http://search.twitter.com/search.json?q=+SIOC+OR+FOAF+OR+%23deri",
        ),
        "identica": (
            IdentiSearch,
            "http://identi.ca/api/search.json?q=SIOC",
        ),
    }

    choice = sys.argv[1] if len(sys.argv) > 1 else None
    if choice not in backends:
        print("Usage: %s [twitter|identica]" % (sys.argv[0],))
        sys.exit(-1)

    search_cls, url = backends[choice]
    searcher = search_cls(url, "", MockFactory())

    run_no = 0
    while True:
        run_no += 1
        print("Run #%s" % (run_no,))
        for entry in searcher.read_data():
            print(searcher.format_output(entry))
        time.sleep(60)


if __name__ == "__main__":
    main()