forked from yjqiang/bili2.0
-
Notifications
You must be signed in to change notification settings - Fork 0
/
bili_sched.py
133 lines (105 loc) · 4.62 KB
/
bili_sched.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
import asyncio
from datetime import datetime
from typing import Optional
import schedule
import notifier
import bili_statistics
from printer import info as print
class BiliSched:
__slots__ = (
'_loop', '_sched_running', '_force_sleeping', '_sched_daily_jobs',
'_sched_shedule', '_monitors', '_switch_lock',
)
def __init__(self, loop: Optional[asyncio.AbstractEventLoop] = None):
if loop is None:
self._loop = asyncio.get_event_loop()
else:
self._loop = loop
self._sched_running = True
self._force_sleeping = False # force_sleep 的使用,用于保证unique即不会重复处理
self._sched_daily_jobs = schedule.Scheduler()
self._sched_shedule = schedule.Scheduler()
self._monitors = []
self._switch_lock = asyncio.Lock()
def init(self, monitors: list, sleep_ranges: list):
self._monitors = monitors
for sleep_time, wake_time in sleep_ranges:
self._sched_shedule.every().day.at(sleep_time.strftime("%H:%M:%S")).do(self.sleeping)
self._sched_shedule.every().day.at(wake_time.strftime("%H:%M:%S")).do(self.waking_up)
# 如果在休眠期间,就关闭self._sched_running
cur_time = datetime.now().time()
for sleep_time, wake_time in sleep_ranges:
if sleep_time <= cur_time <= wake_time:
self._sched_running = False
return
# 这是日常任务装载
def add_daily_jobs(self, task, every_hours: float, *args, **kwargs):
self._sched_daily_jobs.every(every_hours).hours.do(
notifier.exec_task_no_wait, task, *args, **kwargs)
@staticmethod
def start_new_day():
bili_statistics.start_new_day()
def sleeping(self):
print('🌇去睡吧')
self._sched_running = False
def waking_up(self):
print('🌅起床啦')
self._sched_running = True
async def resume(self, forced: bool = False):
async with self._switch_lock:
if self._sched_running or forced: # 仅在确认running后,真正执行resume;这里forced其实没用过
for i in self._monitors:
i.resume()
await notifier.resume()
async def force_sleep(self, sleep_time: int):
if self._sched_running and not self._force_sleeping:
self._force_sleeping = True
await self.pause(forced=True)
await asyncio.sleep(sleep_time)
await self.resume()
self._force_sleeping = False
async def pause(self, forced: bool = False):
async with self._switch_lock:
if not self._sched_running or forced: # 正常情况下,仅在确认not running后,真正执行pause;403时强制
for i in self._monitors:
i.pause()
await notifier.pause()
def do_nothing(self):
return
@staticmethod
def out_of_jail():
for user in notifier.get_users(-2):
user.out_of_jail()
async def run(self):
# 如果不装载任务,会挂在idle_seconds处
# self._sched_shedule.every().day.do(self.do_nothing)
# self._sched_daily_jobs.every().day.do(self.do_nothing)
self._sched_shedule.every().day.at('00:00:00').do(self.start_new_day)
self._sched_daily_jobs.every(4).hours.do(self.out_of_jail)
while True:
self._sched_shedule.run_pending()
if self._sched_running:
await self.resume()
self._sched_daily_jobs.run_all()
while True:
# print(self._sched_daily_jobs.jobs)
self._sched_daily_jobs.run_pending()
self._sched_shedule.run_pending()
if not self._sched_running:
break
idle_seconds = min(self._sched_daily_jobs.idle_seconds, self._sched_shedule.idle_seconds) + 1
print(f'Will sleep {idle_seconds}s,等待任务装载')
await asyncio.sleep(idle_seconds)
await self.pause()
idle_seconds = self._sched_shedule.idle_seconds + 1
print(f'Will sleep {idle_seconds}s, 等待唤醒')
await asyncio.sleep(idle_seconds)
var_bili_sched = BiliSched()
def init(*args, **kwargs):
var_bili_sched.init(*args, **kwargs)
def add_daily_jobs(task, every_hours: float, *args, **kwargs):
var_bili_sched.add_daily_jobs(task, every_hours, *args, **kwargs)
async def run():
await var_bili_sched.run()
async def force_sleep(sleep_time: int):
await var_bili_sched.force_sleep(sleep_time)