-
Notifications
You must be signed in to change notification settings - Fork 6
/
SessionMangement.py
300 lines (264 loc) · 9.07 KB
/
SessionMangement.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
# coding=utf8
"""
author: wangjiawei
date: 2018-09-03
# 响应党的号召,给schedule减负
# 由slave节点来调用该模块,从而开始sessoin之路
# 09-06:
思路再次更变
cookie的派发是来一个请求发一个
不用等大家
# 09-26 需要引入计数原则
每个id,出现错误次数,不能一出现就ban,计数,当达到阈值,再ban
交给spider去做
"""
import os
import config
import datetime
from config import redis_cli
from LoginHandler import main_theme
from utils import initial_file
from utils import loads_json
from utils import dumps_json
from utils import write_2_file
from utils import make_list
from utils import wait_for_msg_list
from utils import wait_for_msg_long
class SessionMangement():
"""
session管理器
提供2个功能
1. 添加新用户
2. 删除无效的用户
# 08-30
# 添加一个api 用来调用cookie
# 09-03
# 添加redis模块
"""
def __add_cookie(self):
"""
登录模块
:return:
"""
main_theme()
return
def __delete_cookie(self, user_id):
"""
这里是删除思路就是遍历一遍cookie_list
除开删除的用户,其余重新写入文件里
:param user_id:待删除的 user_id
"""
cookie_list = self.load_cookies_list()
new_cookie_list = []
for cookie in cookie_list:
cookie = loads_json(cookie)
if not cookie.get('userid') == user_id:
new_cookie_list.append(cookie)
# 重新写入文件
initial_file(config.user_info_file)
if new_cookie_list != []:
for each in new_cookie_list:
write_2_file(config.user_info_file, dumps_json(each))
return
def __is_cookie_empty(self):
"""检查文件里cookie状态
提交给上层,决定是否追加用户
"""
is_empty = False
path = config.user_info_file
if not os.path.getsize(path):
is_empty = True
return is_empty
def __check_and_add_cookie(self):
"""检查文件
是否添加新用户
"""
is_empty = self.__is_cookie_empty()
if is_empty:
# 需要添加用户
self.__add_cookie()
def __check_and_delete_cookie(self, user_id):
"""检查文件
如果用户为0,这里需要去调 add_cookie
:param user_id:
:return:
"""
is_empty = self.__is_cookie_empty()
if not is_empty:
# 不是空,则可以删除用户
self.__delete_cookie(user_id)
# 删除后,仍然要验证,如果为空了,则添加用户
self.__check_and_add_cookie()
return
def logic_add_cookie(self):
"""外部调用的逻辑模块
添加cookie
"""
self.__check_and_add_cookie()
def logic_delete_cookie(self, user_ids):
"""外部调用的逻辑模块
删除指定的cookie
:param user_ids: 待删除列表
"""
for user_id in user_ids:
deal_userid = self.check_userid_count(user_id)
if deal_userid:
self.__check_and_delete_cookie(user_id)
def load_cookies_list(self):
"""返回cookie列表
交给schedule去调用
seed合并cookie放入队列里
"""
cookie_list = make_list(file_path=config.user_info_file, index='', blank='')
return cookie_list
def put_cookie_2_que(self, cookie):
"""
将cookie放入队列里
"""
que = config.ssn_2_slv
cookie = loads_json(cookie)
redis_cli.lpush(que, dumps_json(cookie))
def deal_feed_back(self, msg_list):
"""
消息传递过来就是种子信息,通过解析种子里的cookie_status
如果cookie_status 为 1 就代表需要删除cookiel
如果出现stop_sign, 则代表slave长时间没有拿到seed后,默认结束
"""
is_deal = False
userid_list = []
# 如果是带cookie字段,只处理,不返回
for each in msg_list:
if each.get('cookie_status') == 1:
is_deal = True
# 这就代表要删除id了
userid = each.get('cookie').get('userid')
userid_list.append(userid)
# 开始处理
if userid_list:
self.logic_delete_cookie(userid_list)
return is_deal
def check_userid_count(self, userid):
"""查询该id的次数
每个id重试次数为20次
"""
count = redis_cli.get(userid)
deal_userid = False
if count:
if int(count) < 50:
count =int(count) + 1
redis_cli.set(userid, count)
else:
redis_cli.set(userid, 0)
deal_userid = True
else:
redis_cli.set(userid, 0)
return deal_userid
def wait_mechanism(self):
"""等待机制
当第一个出现的情况下被触发
"""
count = 10
que = config.slv_2_ssn
msg_list = wait_for_msg_list(que, count)
return msg_list
def choos_a_offset(self, cookie_len):
"""选择一个偏移量来"""
offset = int(redis_cli.get('cookie_offset').decode())
if offset == cookie_len - 1:
offset = 0
else:
offset += 1
return offset
def decide_psuh_cookie_2_que(self, count):
"""告知要推几个cookie到队里去"""
cookie_list = self.load_cookies_list()
cookie_len = len(cookie_list)
# 每一次都要讲offset重置
redis_cli.set('cookie_offset', 0)
if count == 0:
count = cookie_len
num = 1
while num <= count:
offset = self.choos_a_offset(cookie_len)
cookie = cookie_list[offset]
self.put_cookie_2_que(cookie)
num += 1
return
def session_main_logic(self):
"""
由slave调用的部分
实例化后,实现登录
在 删除,导入列表时候通过消息通信来完成
需要加一个结束模块
#09-05 解决bug
等待机制:
当收到第一个请求触发
统计数量
放入队列
"""
print('session管理已启动')
# 实例化我们的种子模块,并开始登录
self.logic_add_cookie()
# # 并把所有的cookie都扔到消息队列里去
# self.decide_psuh_cookie_2_que(0)
# 完成后像队里推送一条已完成启动
que = config.task_que_fb
ctx = dumps_json({'ssnm': 'done'})
redis_cli.lpush(que, ctx)
# 开始监听反馈队列
print('开始监听ssn_req队列')
slv_2_ssn = config.slv_2_ssn
while True:
msg_list = []
msg = wait_for_msg_long(slv_2_ssn)
print(datetime.datetime.today().strftime('%Y-%m-%d %H:%M:%S'), '\t接受反馈')
msg_list.append(msg)
# 只要有消息来了,先处理,再就发一条cookie出去
is_deal = self.deal_feed_back(msg_list)
if not is_deal:
self.decide_psuh_cookie_2_que(1)
print('完成cookie派发\n')
"""
# 当反馈队列里有消息时,激活cookie派发和处理
if msg:
# 是确保msg真实存在
# 接下来获取 msg_list
msg_list = self.wait_mechanism()
msg_list.append(msg)
# 先处理cookie
self.deal_feed_back(msg_list)
# 再是放cookie到队列
msg_len = len(msg_list)
self.decide_psuh_cookie_2_que(msg_len)
"""
"""
# 这里的处理逻辑发生变化
# 每次统计需要的session数量
# 根据统计数据,丢相应的数量的cookie到ssn_rep队列里
while True:
if redis_cli.exists(ssn_req):
msg = redis_cli.rpop(ssn_req)
msg_dict = loads_json(translate_2_json_dict(msg))
task = msg_dict.get('task')
if task == 'delete_id':
# 删除cookie
user_id = msg_dict.get('userid')
if user_id:
# 针对user_id是空集时候的情况
self.logic_delete_cookie(user_id)
# 告知已完成
ctx = 'delete_id_done'
redis_cli.lpush(ssn_rep, ctx)
elif task == 'cookie_list':
# 返回cookie列表
cookie_list = self.load_cookies_list()
# 放入消息队列里
redis_cli.lpush(ssn_rep, dumps_json(cookie_list))
else:
# 中止程序,退出
break
time.sleep(0.1)
"""
if __name__ == '__main__':
sm = SessionMangement()
sm.session_main_logic()