-
Notifications
You must be signed in to change notification settings - Fork 0
/
handler.py
271 lines (246 loc) · 12.9 KB
/
handler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
import os
import subprocess
import shlex
import psutil
from psutil import NoSuchProcess
import signal
import pickle
import ndjson
import json
import calendar
import time
import requests
import threading
import gzip
from stem import Signal
from stem.control import Controller
from file_read_backwards import FileReadBackwards
# ---------------------------------------------------------------------------
# Crawl configuration.
# ---------------------------------------------------------------------------
# Previous configuration, kept for reference (US 2020 election crawl):
# SINCE = 1601510400
#QUERY = 'min_replies:20 trump OR biden OR @realDonaldTrump OR @JoeBiden OR usa2020elections OR usa2020 OR Election2020 OR Debates2020 OR VoteRedToSaveAmerica OR VoteRed OR VoteBlue OR VoteBlueToSaveAmerica OR trump2020 OR donaldtrump2020 OR DonaldTrump OR BidenHarris2020 OR joebiden2020 OR NeverTrump OR NeverBiden OR JoeBiden OR MAGA OR KAG OR WakeUpAmerica OR VoteEarly OR Ivoted OR VoteBidenHarrisToSaveAmerica OR VoteDonaldTrumpToSaveAmerica'
# Search query written into every tweeper config under the 'query' key.
QUERY = '"Amber Heard" OR #AmberHeardIsAnAbuser OR #JusticeForJohnnyDepp OR #AmberHeardIsALiar'
# QUERY = 'mammamia'
# Previous time window: 2020-10-01 .. 2020-11-01 (UTC epoch seconds).
# SINCE = 1601510400
# UNTIL = 1604188800
# Crawl window as UTC epoch seconds: 2020-01-01 00:00:00 .. 2021-01-01 00:00:00.
# SINCE is the start of the first slice when no tweeper log exists yet;
# a new one-day slice is only launched while its end is <= UNTIL.
SINCE = 1577836800
UNTIL = 1609459200
# Directories holding each tool's main.py (the launch command is
# '<PYTHON> <DIR>main.py config -f <config file>').
TWEEPER_DIR = 'tweeper/'
HYDRATOR_DIR = 'hydrator/'
# Where the per-slice JSON config files are written.
TWEEPER_CONFIG = 'configs/tweeper/'
HYDRATOR_CONFIG = 'configs/hydrator/'
# Where each subprocess's stdout/stderr is appended, one 'log_<since>_<until>'
# file per slice; the last line of these files decides whether a slice is done.
TWEEPER_LOG = 'tweeper_logs/'
HYDRATOR_LOG = 'hydrator_logs/'
# Interpreter used to launch the subprocesses.
PYTHON = 'python3'
# Prefix of the 'name' entry in each config ('<DATA><since>_<until>');
# presumably the tools' output location -- confirm against tweeper/hydrator.
# NOTE: both tools currently share the same directory.
TWEEPER_DATA = 'result/'
HYDRATOR_DATA = 'result/'
# Value of the 'parallel' entry written into the tweeper config.
TWEEPER_PARALLEL = 24
# Only consumed by the hydrator scheduling code, which is currently
# commented out in main().
HYDRATOR_PARALLEL = 24
def change_ip():
    """Request a fresh Tor identity every 30 seconds, forever.

    Each round:

    1. connects to the local Tor control port (9051), authenticates and
       sends ``NEWNYM`` to ask for a new circuit;
    2. fetches ``http://httpbin.org/ip`` through the Tor SOCKS proxy on
       ``localhost:9050`` and prints the response body, so the current
       exit address shows up in the handler's output.

    The control-port password is taken from the ``TOR_CONTROL_PASSWORD``
    environment variable and falls back to the historical hard-coded value
    ``'password'`` when the variable is unset.

    The function never returns; it is meant to be the target of a daemon
    thread.  A failed round (Tor not reachable, HTTP error, timeout) is
    reported on stdout and retried on the next round instead of killing the
    thread, because a dead thread would silently stop IP rotation.
    """
    # NOTE(security): prefer setting TOR_CONTROL_PASSWORD over relying on the
    # default, which is a well-known placeholder.
    password = os.environ.get('TOR_CONTROL_PASSWORD', 'password')
    proxies = {
        'http': 'socks5h://localhost:9050',
        'https': 'socks5h://localhost:9050',
    }
    while True:
        try:
            with Controller.from_port(port=9051) as controller:
                controller.authenticate(password=password)
                controller.signal(Signal.NEWNYM)
            # The session is closed at the end of every round so that pooled
            # connections do not pile up over a long-running crawl.
            with requests.session() as session:
                session.proxies = dict(proxies)
                # A timeout keeps a stalled circuit from blocking rotation.
                r = session.get('http://httpbin.org/ip', timeout=30)
            print(r.text)
        except Exception as exc:  # thread boundary: report and keep rotating
            print('change_ip: rotation failed: {0!r}'.format(exc))
        time.sleep(30)
def running(pid):
    """Tell whether ``pid`` refers to a live, non-zombie process.

    Returns ``False`` when ``pid`` is ``None``, when the process does not
    exist, when it is a zombie, or when probing it with signal 0 raises an
    ``OSError``; returns ``True`` otherwise.
    """
    if pid is None:
        return False
    try:
        is_zombie = psutil.Process(pid).status() == psutil.STATUS_ZOMBIE
        if is_zombie:
            return False
        # Signal 0 performs the existence/permission check without
        # delivering anything to the process.
        os.kill(pid, 0)
    except (OSError, NoSuchProcess):
        return False
    return True
# Seconds to wait between two passes of the supervisor loop.  Without a pause
# the loop spins at full CPU while a tweeper is running.
_POLL_SECONDS = 5

# File in the working directory where the child pids are persisted.
_PIDS_FILE = 'pids'


def _load_pids():
    """Return the pid table stored in the ``pids`` file."""
    # NOTE(security): pickle can execute arbitrary code while loading.  This
    # is only acceptable because the file is written by this script itself;
    # never point it at a file from an untrusted source.
    with open(_PIDS_FILE, 'rb') as f:
        return pickle.load(f)


def _save_pids(pids):
    """Persist the pid table to the ``pids`` file."""
    with open(_PIDS_FILE, 'wb') as f:
        pickle.dump(pids, f)


def _parse_log_name(name):
    """Return ``(since, until)`` for a ``log_<since>_<until>`` name, else None."""
    parts = name.split('_')
    if len(parts) != 3 or parts[0] != 'log':
        return None
    try:
        return int(parts[1]), int(parts[2])
    except ValueError:
        return None


def _last_log_message(log_path):
    """Return the ``message`` of the last NDJSON line of a log, or 'ERROR'.

    'ERROR' is returned when the last line is empty, is not valid JSON, or
    does not carry a ``message`` entry.
    """
    with FileReadBackwards(log_path, encoding='utf-8') as frb:
        try:
            return ndjson.loads(frb.readline())[0]['message']
        except (json.decoder.JSONDecodeError, IndexError, KeyError, TypeError):
            return 'ERROR'


def _next_tweeper_slice():
    """Pick the next ``(since, until)`` slice for the tweeper, or None.

    An existing slice whose log does not end with a 'DONE' message is retried
    first (oldest first).  Otherwise the next one-day slice after the most
    recent log is returned, starting at ``SINCE`` when there is no log yet,
    as long as it ends no later than ``UNTIL``.
    """
    # Ignore anything in the log directory that is not one of our logs, and
    # order numerically so the result does not depend on digit counts.
    slices = sorted(
        parsed
        for parsed in (_parse_log_name(name) for name in os.listdir(TWEEPER_LOG))
        if parsed is not None
    )
    for since, until in slices:
        log_path = '{0}log_{1}_{2}'.format(TWEEPER_LOG, since, until)
        if _last_log_message(log_path) != 'DONE':
            return since, until
    since = slices[-1][1] if slices else SINCE
    # DAY BY DAY
    until = since + 60 * 60 * 24
    if until <= UNTIL:
        return since, until
    return None


def _launch_tweeper(since, until):
    """Write the config for one slice, start the tweeper and return its pid."""
    config = {
        'name': '{0}{1}_{2}'.format(TWEEPER_DATA, since, until),
        'query': QUERY,
        'since': since,
        'until': until,
        'interval': 1,
        'parallel': TWEEPER_PARALLEL,
    }
    config_path = '{0}config_{1}_{2}.json'.format(TWEEPER_CONFIG, since, until)
    with open(config_path, 'w') as f:
        json.dump(config, f, ensure_ascii=False)
    args = shlex.split(PYTHON) + [
        '{0}main.py'.format(TWEEPER_DIR), 'config', '-f', config_path,
    ]
    log_path = '{0}log_{1}_{2}'.format(TWEEPER_LOG, since, until)
    # The child gets its own copy of the descriptor, so the parent's handle
    # can be closed right after the spawn instead of leaking one per launch.
    with open(log_path, 'a') as log_file:
        process = subprocess.Popen(
            args, stdin=None, stdout=log_file, stderr=subprocess.STDOUT)
    return process.pid


def main():
    """Supervise the tweeper crawl, one time slice at a time.

    Start-up:
      * create the data, config and log directories if missing;
      * terminate (SIGTERM) any tweeper/hydrator recorded in the ``pids``
        file by a previous run and reset the table;
      * start ``change_ip`` in a daemon thread.

    Then loop forever: whenever no tweeper is running, launch the next slice
    (a failed slice is retried before a new one is started) and record its
    pid in the ``pids`` file.  The loop pauses ``_POLL_SECONDS`` between
    passes.  The function never returns.
    """
    for directory in (TWEEPER_DATA, HYDRATOR_DATA, TWEEPER_CONFIG,
                      HYDRATOR_CONFIG, TWEEPER_LOG, HYDRATOR_LOG):
        os.makedirs(directory, exist_ok=True)

    if os.path.exists(_PIDS_FILE):
        pids = _load_pids()
    else:
        pids = {
            'tweeper': None,
            'hydrator': None
        }
    for name in ('tweeper', 'hydrator'):
        if pids[name] is not None:
            if running(pids[name]):
                try:
                    os.kill(pids[name], signal.SIGTERM)
                except ProcessLookupError:
                    # The process exited between the check and the signal.
                    pass
            pids[name] = None
    _save_pids(pids)

    thread = threading.Thread(target=change_ip, daemon=True)
    thread.start()

    while True:
        pids = _load_pids()
        if not running(pids['tweeper']):
            next_slice = _next_tweeper_slice()
            if next_slice is not None:
                pids['tweeper'] = _launch_tweeper(*next_slice)
                _save_pids(pids)

        # ------------------------------------------------------------------
        # Hydrator scheduling is currently disabled; the original code is
        # kept below for reference.  It relies on two locals that are dead
        # while it is disabled and are therefore no longer maintained above:
        #   * ``counter`` (initialised to 0 before the loop), and
        #   * a per-pass ``HYDRATOR_PARALLEL`` (48, lowered to 24 whenever a
        #     tweeper was launched in the same pass).
        # Restore both when re-enabling it.
        # ------------------------------------------------------------------
        # else:
        #     HYDRATOR_PARALLEL = 24
        #     time.sleep(5)
        # folders = sorted(os.listdir('{0}'.format(HYDRATOR_DATA)))
        # hydrated_conversations = []
        # for folder in folders:
        #     if os.path.exists('{0}{1}/conversations/'.format(HYDRATOR_DATA, folder)):
        #         conversations = os.listdir('{0}{1}/conversations'.format(HYDRATOR_DATA, folder))
        #         for conversation in conversations:
        #             hydrated_conversations.append(conversation.split('_')[1].split('.')[0])
        # if not running(pids['hydrator']):
        #     launched = False
        #     for folder in folders:
        #         if not os.path.exists('{0}{1}/conversations/'.format(HYDRATOR_DATA, folder)):
        #             # tweets_files = sorted(os.listdir('{0}{1}/tweets'.format(HYDRATOR_DATA, folder)))
        #             tweets_files = sorted(t for t in os.listdir('{0}{1}/tweets'.format(HYDRATOR_DATA, folder)) if t.endswith('.gz'))
        #             incomplete_tweets_files = sorted(t for t in os.listdir('{0}{1}/tweets'.format(HYDRATOR_DATA, folder)) if t.endswith('.ndjson'))
        #             if len(tweets_files) > 0 and len(incomplete_tweets_files) == 0:
        #                 conversations = []
        #                 for tweet_file in tweets_files:
        #                     with gzip.open('{0}{1}/tweets/{2}'.format(HYDRATOR_DATA, folder, tweet_file), 'rt') as f:
        #                         # tweets = ndjson.reader(f)
        #                         for f_tweet in f:
        #                             tweet = json.loads(f_tweet)
        #                             # if tweet['id_str'] not in hydrated_conversations and ( tweet['in_reply_to_status_id_str'] is not None or tweet['reply_count'] > 0 ):
        #                             if tweet['conversation_id_str'] not in hydrated_conversations:
        #                                 conversations.append(tweet['conversation_id_str'])
        #                                 hydrated_conversations.append(tweet['conversation_id_str'])
        #                 since, until = folder.split('_')
        #                 config = {
        #                     'name': '{0}{1}_{2}'.format(HYDRATOR_DATA, since, until),
        #                     'conversations': conversations,
        #                     'parallel': HYDRATOR_PARALLEL
        #                 }
        #                 with open('{0}config_{1}_{2}.json'.format(HYDRATOR_CONFIG, since, until), 'w') as f:
        #                     json.dump(config, f, ensure_ascii=False)
        #                 command = '{0} {1}main.py config -f {2}config_{3}_{4}.json'.format(PYTHON, HYDRATOR_DIR,
        #                                                                                   HYDRATOR_CONFIG, since, until)
        #                 args = shlex.split(command)
        #                 hydrator_process = subprocess.Popen(args, stdin=None, stdout=open(
        #                     '{0}log_{1}_{2}'.format(HYDRATOR_LOG, since, until), 'a'), stderr=open(
        #                     '{0}log_{1}_{2}'.format(HYDRATOR_LOG, since, until), 'a'))
        #                 pids['hydrator'] = hydrator_process.pid
        #                 with open('pids', 'wb') as f:
        #                     pickle.dump(pids, f)
        #                 launched = True
        #                 break
        #     if not launched and len(folders) > 0:
        #         folder = folders[counter]
        #         if counter == 0:
        #             hydrated_conversations = []
        #         counter = (counter + 1) % len(folders)
        #         # tweets_files = sorted(os.listdir('{0}{1}/tweets'.format(HYDRATOR_DATA, folder)))
        #         tweets_files = sorted(
        #             t for t in os.listdir('{0}{1}/tweets'.format(HYDRATOR_DATA, folder)) if t.endswith('.gz'))
        #         incomplete_tweets_files = sorted(t for t in os.listdir('{0}{1}/tweets'.format(HYDRATOR_DATA, folder)) if t.endswith('.ndjson'))
        #         if len(tweets_files) > 0 and len(incomplete_tweets_files) == 0:
        #             conversations = []
        #             for tweet_file in tweets_files:
        #                 with gzip.open('{0}{1}/tweets/{2}'.format(HYDRATOR_DATA, folder, tweet_file), 'rt') as f:
        #                     # tweets = ndjson.reader(f)
        #                     for f_tweet in f:
        #                         tweet = json.loads(f_tweet)
        #                         # if tweet['conversation_id_str'] not in hydrated_conversations and (tweet['in_reply_to_status_id_str'] is not None or tweet['reply_count'] > 0):
        #                         if tweet['conversation_id_str'] not in hydrated_conversations:
        #                             conversations.append(tweet['conversation_id_str'])
        #                             hydrated_conversations.append(tweet['conversation_id_str'])
        #             since, until = folder.split('_')
        #             config = {
        #                 'name': '{0}{1}_{2}'.format(HYDRATOR_DATA, since, until),
        #                 'conversations': conversations,
        #                 'parallel': HYDRATOR_PARALLEL
        #             }
        #             with open('{0}config_{1}_{2}.json'.format(HYDRATOR_CONFIG, since, until), 'w') as f:
        #                 json.dump(config, f, ensure_ascii=False)
        #             command = '{0} {1}main.py config -f {2}config_{3}_{4}.json'.format(PYTHON, HYDRATOR_DIR,
        #                                                                               HYDRATOR_CONFIG, since, until)
        #             args = shlex.split(command)
        #             hydrator_process = subprocess.Popen(args, stdin=None, stdout=open(
        #                 '{0}log_{1}_{2}'.format(HYDRATOR_LOG, since, until), 'a'), stderr=open(
        #                 '{0}log_{1}_{2}'.format(HYDRATOR_LOG, since, until), 'a'))
        #             pids['hydrator'] = hydrator_process.pid
        #             with open('pids', 'wb') as f:
        #                 pickle.dump(pids, f)
        #             launched = True

        time.sleep(_POLL_SECONDS)


if __name__ == '__main__':
    main()