workers.py
import settings
from threading import Thread
import feedparser
import requests
from requests import ConnectionError
from dragnet import extract_content, extract_comments
import time
from langdetect import detect  # slow
from textblob import TextBlob
from datetime import datetime
import dataset
import urllib2

class FeedWorker(Thread):

    def __init__(self, feeds_queue, pages_queue, crawled_urls, mutex):
        Thread.__init__(self)
        self.feeds_queue = feeds_queue
        self.pages_queue = pages_queue
        self.crawled_urls = crawled_urls
        self.mutex = mutex

    def run(self):
        item = self.feeds_queue.get()
        while item is not None:
            d = feedparser.parse(item)
            for post in d.entries:
                try:
                    post.link = self.filter_url(post.link)
                except AttributeError:  # entry has no link
                    pass
                else:
                    self.mutex.acquire()
                    if post.link not in self.crawled_urls and self.is_not_forbidden(post):
                        self.crawled_urls.add(post.link)
                        try:
                            created = post.published  # news
                        except AttributeError:
                            created = post.updated  # reddit
                        self.pages_queue.put({'title': post.title, 'page': post.link, 'date': created})
                    self.mutex.release()
            time.sleep(0.1)
            self.feeds_queue.put(item)  # re-queue the feed so it is polled again later
            item = self.feeds_queue.get()
    def is_not_forbidden(self, post):
        # financemagnates only yields content through extract_comments, mixed with its terms and conditions
        if "financemagnates" in post.link:
            return False
        if "ambcrypto" in post.link:  # extracted badly
            return False
        # if "https://medium.com/@koinmedya" in post.link:  # Russian?
        #     return False
        # if "https://medium.com/@clickchain/" in post.link:
        #     return False
        # if "https://medium.com/@rojisaeroji92" in post.link:
        #     return False
        # if "https://medium.com/@eosys" in post.link:
        #     return False
        # if "ft.com" in post.link:  # requires a subscription, extracts garbage
        #     return False
        if "reddit" in post.link:
            try:
                if post.updated is not None and settings.reddit:
                    return True
            except AttributeError:
                pass
            return False
        if "twitter" in post.link:
            return False
        return True
    def filter_url(self, link):
        if "/#" in link:  # unify links that arrive with a different #comment fragment
            return link.split("/#")[0]
        if "?source" in link:  # Medium appends a different ?source parameter per feed
            return link.split("?source")[0]
        return link

class PageWorker(Thread):

    def __init__(self, pages_queue, crawled_urls, mutex, news_id, news_queue):
        Thread.__init__(self)
        self.pages_queue = pages_queue
        self.crawled_urls = crawled_urls
        self.mutex = mutex
        self.news_id = news_id
        self.news_queue = news_queue
        db = dataset.connect(settings.scraper_db)
        self.table = db[settings.news_table]

    def run(self):
        item = self.pages_queue.get()
        while item is not None:
            try:
                r = requests.get(item['page'])
            except ConnectionError as e:
                # print(e)
                pass
            else:
                if r.status_code != 200:
                    # print(datetime.now(), self.name, "WARNING: removing url with bad response:", r.status_code, item["page"])
                    if r.status_code != 403:  # a 403 stays in crawled_urls so the forbidden page is not retried
                        self.crawled_urls.remove(item["page"])
                else:
                    content = self.scraper(r.content, item["page"])
                    # if not not content.strip():  # old check: just "not empty"
                    if self.content_is_important(content):  # not empty and mentions a tracked term
                        if self.my_string_has_language(content, "en"):
                            self.mutex.acquire()
                            # print(datetime.now(), self.news_id[0], item["page"])
                            self.news_id[0] += 1
                            self.news_queue.put({
                                "id_str": self.news_id[0],
                                "created_at": item["date"],
                                "user_name": item["page"].encode('utf-8'),
                                "content": content.encode('utf-8')
                            })
                            # file = "news/doc" + str(self.news_id[0]) + ".txt"
                            # doc_file = open(file, "w")
                            # doc_file.write(str(item["date"]) + "\n")
                            # doc_file.write(item["page"].encode('utf-8') + "\n")
                            # doc_file.write(content.encode('utf-8'))
                            # doc_file.close()
                            self.table.insert(dict(
                                created_at=item["date"],
                                user_name=item["page"],
                                content=content
                            ))
                            self.mutex.release()
                        else:
                            # print(datetime.now(), self.name, "WARNING: unknown language", item["page"], content)
                            pass
                    else:
                        # print(datetime.now(), self.name, "WARNING: empty or unimportant content extracted from:", item["page"], content)
                        pass
            item = self.pages_queue.get()
        print(self.name, "finish")
    def content_is_important(self, content):
        if not content.strip():
            return False
        for word in settings.track_terms:
            if word in content:
                return True
        return False
    def scraper(self, html, link):
        text = extract_content(html)
        if "cnbc" in link:  # on cnbc part of the article body is only extracted as comments
            text += " " + extract_comments(html)
        text = text.split("disclaimer")[0]  # drop trailing disclaimer boilerplate
        return text
    def my_string_has_language(self, text, language):  # slow!!!!
        # text = self.robust_decode(text)
        try:
            # if TextBlob(text).detect_language() == language:  # online alternative (Google Translate API)
            if detect(text) == language:  # slow!!!!
                return True
        except urllib2.HTTPError:
            # only TextBlob's detect_language() talks to the Google Translate API
            raise RuntimeError("Google Translate API connection is in trouble. "
                               "Use langdetect's detect() instead of TextBlob.detect_language().")
        except:
            pass
        return False
        # return True
    def robust_decode(self, bs):
        '''Take a byte string and convert it into a unicode one.
        Try UTF-8 first and fall back to Latin-1 if that fails.'''
        cr = None
        try:
            cr = bs.decode('utf8')
        except UnicodeDecodeError:
            cr = bs.decode('latin1')
        except Exception:
            pass
        return cr
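

# --- Minimal usage sketch (not part of the original module) ---
# This illustrates one plausible way to wire FeedWorker and PageWorker together with
# the queues, shared url set, lock and mutable counter their constructors expect.
# The feed URL and worker count below are made-up placeholders; the project's real
# entry point (and how it drains news_queue) may differ.
if __name__ == '__main__':
    import threading
    from Queue import Queue  # Python 2, matching the urllib2 import above

    feeds_queue = Queue()
    pages_queue = Queue()
    news_queue = Queue()
    crawled_urls = set()
    mutex = threading.Lock()
    news_id = [0]  # single-element list so every PageWorker shares and mutates the same counter

    feeds_queue.put("https://www.coindesk.com/arc/outboundfeeds/rss/")  # placeholder feed

    workers = [FeedWorker(feeds_queue, pages_queue, crawled_urls, mutex)]
    workers += [PageWorker(pages_queue, crawled_urls, mutex, news_id, news_queue)
                for _ in range(2)]
    for w in workers:
        w.daemon = True  # workers loop forever because feeds are re-queued
        w.start()

    # A real driver would consume news_queue here; this sketch just lets the workers run briefly.
    time.sleep(30)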