forked from yjqiang/bili2.0
-
Notifications
You must be signed in to change notification settings - Fork 0
/
bili_statistics.py
311 lines (231 loc) · 10.4 KB
/
bili_statistics.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
from typing import Optional
from collections import deque
import attr
from utils import curr_time
@attr.s(slots=True)
class CoverChecker:
number = attr.ib(
default=0,
validator=attr.validators.instance_of(int))
min_id = attr.ib(
default=-1,
validator=attr.validators.instance_of(int))
max_id = attr.ib(
default=-1,
validator=attr.validators.instance_of(int))
# 只添加,不管重复性
def add2checker(self, new_id: int):
if self.min_id == -1 and self.max_id == -1:
self.min_id = new_id
self.max_id = new_id
self.number += 1
return
if self.min_id < new_id: # 过滤掉过老的id
self.min_id = min(self.min_id, new_id)
self.max_id = max(self.max_id, new_id)
self.number += 1
def result(self):
if self.min_id == -1 and self.max_id == -1:
num_wanted = 0
num_actual = 0
cover = 0
else:
num_wanted = self.max_id - self.min_id + 1
num_actual = self.number
cover = num_actual / num_wanted
return f'覆盖率为 {num_actual} / {num_wanted} = {cover * 100:.2f}%'
@attr.s(slots=True)
class DuplicateChecker:
LIST_SIZE_LIMITED = 3000
number = attr.ib(default=0, init=False)
ids = attr.ib(init=False)
@ids.default
def _ids(self):
return deque(maxlen=DuplicateChecker.LIST_SIZE_LIMITED)
def add2checker(self, new_id: int, need_check_duplicated=True) -> bool:
if need_check_duplicated and self.is_duplicated(new_id):
return False
self.number += 1
self.ids.append(new_id)
return True
def is_duplicated(self, new_id: int):
return new_id in self.ids
def result(self):
return f'一共 {self.number} 个几乎不重复的 id'
@attr.s(slots=True)
class UniqueTaskChecker: # 准确的说是 task.work 的唯一性, task.check 不受控
# 可以发现,UniqueTaskChecker 初始化是 cancel 状态,即可以成功 restart
start_time = attr.ib(init=False, factory=curr_time)
end_time = attr.ib(init=False, default=-1)
# 已经 done 或 cancel 的 task,不需要删除该实例,下次用的时候 restart 一下就行
# 负责检查是否重复并且如不重复,restart
def restart(self) -> bool:
if self.is_unique():
self.start_time = curr_time()
self.end_time = 0
return True
return False
def cancel(self):
self.end_time = -1
def done(self):
self.end_time = curr_time()
def is_unique(self) -> bool:
return bool(self.end_time) # 非0
@attr.s(slots=True)
class UniqueTaskCheckers:
records = attr.ib(init=False, factory=dict)
def start(self, user_id, task) -> bool:
records_of_user = self.records.setdefault(user_id, {})
if task not in records_of_user:
records_of_user[task] = UniqueTaskChecker()
unique_task_checker = records_of_user[task]
return unique_task_checker.restart()
def cancel(self, user_id, task):
self.records[user_id][task].cancel()
def done(self, user_id, task):
self.records[user_id][task].done()
@attr.s(slots=True)
class MaxTimeTaskChecker: # 准确的说是task.work的唯一性, task.check不受控
num = attr.ib(init=False, default=0)
# 负责检查是否重复并且如不重复,restart
def add(self, max_time) -> bool:
if self.is_addable(max_time):
self.num += 1
return True
return False
def is_addable(self, max_time) -> bool:
if max_time == -1 or self.num < max_time: # -1 特殊处理,表示无限参与
return True
return False
@attr.s(slots=True)
class MaxTimeTaskCheckers:
records = attr.ib(init=False, factory=dict)
def add(self, user_id, task, max_time) -> bool:
records_of_user = self.records.setdefault(user_id, {})
if task not in records_of_user:
records_of_user[task] = MaxTimeTaskChecker()
max_time_task_checker = records_of_user[task]
return max_time_task_checker.add(max_time)
def clear(self):
self.records.clear()
class BiliStatistics:
__slots__ = (
'area_num', 'area_duplicated', 'pushed_raffles',
'joined_raffles', 'raffle_results',
'danmu_raffleid_checker', 'cover_checker0', 'cover_checker1',
'max_time_task_checkers', 'unique_task_checkers'
)
def __init__(self, area_num=0):
self.area_num = area_num
self.area_duplicated = False
# 只有一个(可以认为id为-1的super user)
self.pushed_raffles = {}
# 每个用户一个小dict
self.joined_raffles = {}
self.raffle_results = {}
# 这是用于具体统计
self.danmu_raffleid_checker = DuplicateChecker()
self.cover_checker0 = CoverChecker() # 舰队风暴遗漏统计
self.cover_checker1 = CoverChecker() # 小电视遗漏统计
# 用于限制每天用户最多某个任务的最大参与次数
self.max_time_task_checkers = MaxTimeTaskCheckers() # {use0: {task0: 1, task1: 2}, user1: {task1: 9}}
# 用于限制用户不可同时参加某任务
self.unique_task_checkers = UniqueTaskCheckers()
def init(self, area_num: int, area_duplicated: bool):
self.area_num = area_num
self.area_duplicated = area_duplicated
def print_statistics(self, user_id):
print('本次抽奖推送数据:')
print(f'舰队风暴推送遗漏统计:{self.cover_checker0.result()}')
print(f'小电视的推送遗漏统计:{self.cover_checker1.result()}')
print(f'全部弹幕抽奖推送统计:{self.danmu_raffleid_checker.result()}')
print()
print('本次推送抽奖统计:')
for k, v in self.pushed_raffles.items():
if isinstance(v, int):
print(f'{v:^5} X {k}')
else:
print(f'{v:^5.2f} X {k}')
print()
if user_id == -2:
print('暂时不支持全部打印,考虑到用户可能很多')
else:
print('本次参与抽奖统计:')
joined_of_id = self.joined_raffles.get(user_id, {})
for k, v in joined_of_id.items():
print(f'{v:^5} X {k}')
print()
print('本次抽奖结果统计:')
results_of_id = self.raffle_results.get(user_id, {})
for k, v in results_of_id.items():
print(f'{v:^5} X {k}')
print()
print('当日参与任务统计(null类任务不计入;只是压入计划,不一定已经参与;整点清零):')
print(self.max_time_task_checkers)
print(self.unique_task_checkers)
def add2pushed_raffles(self, raffle_name, broadcast_type, num):
orig_num = self.pushed_raffles.get(raffle_name, 0)
# broadcast_type 0全区 1分区 2本房间
if broadcast_type == 0:
self.pushed_raffles[raffle_name] = orig_num + num / self.area_num
elif broadcast_type == 1 and self.area_duplicated:
self.pushed_raffles[raffle_name] = orig_num + num / 2
else:
self.pushed_raffles[raffle_name] = orig_num + num
def add2joined_raffles(self, raffle_name, user_id, num):
# 活动(合计)
# 小电视(合计)
# 大航海(合计)
if user_id not in self.joined_raffles:
self.joined_raffles[user_id] = {}
raffles_of_id = self.joined_raffles[user_id]
raffles_of_id[raffle_name] = raffles_of_id.get(raffle_name, 0) + num
def add2results(self, gift_name, user_id, num=1):
if user_id not in self.raffle_results:
self.raffle_results[user_id] = {}
results_of_id = self.raffle_results[user_id]
results_of_id[gift_name] = results_of_id.get(gift_name, 0) + num
# raffle_id int
def add2raffle_ids(self, raffle_id: int, raffle_type: Optional[str]):
if raffle_type in ('STORM', 'GUARD'):
self.cover_checker0.add2checker(raffle_id)
elif raffle_type in ('TV',):
self.cover_checker1.add2checker(raffle_id)
self.danmu_raffleid_checker.add2checker(raffle_id, need_check_duplicated=False)
def is_raffleid_duplicate(self, raffle_id: int):
return self.danmu_raffleid_checker.is_duplicated(raffle_id)
def add2max_time_task_checkers(self, user_id, task, max_time: int) -> bool:
return self.max_time_task_checkers.add(user_id, task, max_time)
def start_new_day(self):
self.max_time_task_checkers.clear()
def start_unique_task(self, user_id, task) -> bool:
return self.unique_task_checkers.start(user_id, task)
def cancel_unique_task(self, user_id, task):
return self.unique_task_checkers.cancel(user_id, task)
def done_unique_task(self, user_id, task):
return self.unique_task_checkers.done(user_id, task)
var_bili_statistics = BiliStatistics()
def init(*args, **kwargs):
var_bili_statistics.init(*args, **kwargs)
def add2pushed_raffles(raffle_name, broadcast_type=0, num=1):
var_bili_statistics.add2pushed_raffles(raffle_name, broadcast_type, int(num))
def add2joined_raffles(raffle_name, user_id, num=1):
var_bili_statistics.add2joined_raffles(raffle_name, user_id, int(num))
def add2results(gift_name, user_id, num=1):
var_bili_statistics.add2results(gift_name, user_id, int(num))
def add2raffle_ids(raffle_id, raffle_type: Optional[str] = None):
var_bili_statistics.add2raffle_ids(int(raffle_id), raffle_type)
def is_raffleid_duplicate(raffle_id):
return var_bili_statistics.is_raffleid_duplicate(int(raffle_id))
def print_statistics(user_id=None):
var_bili_statistics.print_statistics(user_id)
def add2max_time_task_checkers(user_id, task, max_time) -> bool:
return var_bili_statistics.add2max_time_task_checkers(user_id, task, max_time)
def start_new_day():
var_bili_statistics.start_new_day()
def start_unique_task(user_id, task) -> bool:
return var_bili_statistics.start_unique_task(user_id, task)
def cancel_unique_task(user_id, task):
var_bili_statistics.cancel_unique_task(user_id, task)
def done_unique_task(user_id, task):
var_bili_statistics.done_unique_task(user_id, task)