-
Notifications
You must be signed in to change notification settings - Fork 13
/
Copy pathmain.py
106 lines (91 loc) · 3.21 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
import requests
import json
import schedule
import time
import hmac
import hashlib
import base64
import urllib.parse
from environs import Env
from loguru import logger
def send_telegram(msg, timeout=10):
    """Send ``msg`` to a Telegram chat through the Bot API ``sendMessage`` call.

    The bot token and target chat are read from the module-level globals
    ``bot_token`` and ``chat_id``, which the ``__main__`` section populates.

    Args:
        msg: Text of the message to deliver.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        The ``ok`` field of the JSON response, or ``False`` when the request
        fails, the response is not valid JSON, or the field is absent.
    """
    url = "https://api.telegram.org/bot" + bot_token + "/sendMessage"
    payload = {
        "chat_id": chat_id,
        "text": msg,
    }
    try:
        # A one-shot requests.post() releases its connection when it returns,
        # unlike a Session that is created here and never closed.
        response = requests.post(url, data=payload, timeout=timeout)
        return response.json().get("ok", False)
    except (requests.RequestException, ValueError) as exc:
        # Log only the exception type: the request URL embeds the bot token
        # and requests includes the URL in its exception messages.
        logger.error("Telegram notification failed: {}", type(exc).__name__)
        return False
def send_dingdingmsg(msg, timeout=10):
    """Send ``msg`` as a markdown message to a signed DingTalk robot webhook.

    The signing secret and webhook URL are read from the module-level globals
    ``ding_secret`` and ``dingding_base_url``, which the ``__main__`` section
    populates. ``dingding_base_url`` must already contain a query string,
    because the timestamp and signature are appended with ``&``.

    Args:
        msg: Markdown text of the message body.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        None. Network failures are logged rather than raised.
    """
    # The signature is HMAC-SHA256 over "<timestamp>\n<secret>", keyed with
    # the secret, then base64-encoded and URL-quoted.
    timestamp = str(round(time.time() * 1000))
    string_to_sign = '{}\n{}'.format(timestamp, ding_secret)
    digest = hmac.new(
        ding_secret.encode('utf-8'),
        string_to_sign.encode('utf-8'),
        digestmod=hashlib.sha256,
    ).digest()
    sign = urllib.parse.quote_plus(base64.b64encode(digest))

    headers = {'Content-Type': 'application/json;charset=utf-8'}
    body = {
        "msgtype": "markdown",
        "markdown": {
            "title": "提醒",
            "text": msg
        },
        "at": {
            # NOTE(review): sent as the string "true", not a JSON boolean;
            # confirm which form the DingTalk API expects.
            "isAtAll": "true"
        }
    }
    # The parameter name must be "&timestamp="; the previous "×tamp=" was an
    # HTML-entity mangling that dropped the timestamp from the request.
    dingding_url = dingding_base_url + "&timestamp=" + timestamp + "&sign=" + sign
    try:
        requests.post(dingding_url, json.dumps(body), headers=headers, timeout=timeout)
    except requests.RequestException as exc:
        # Log only the exception type: the webhook URL carries credentials
        # and requests includes the URL in its exception messages.
        logger.error("DingTalk notification failed: {}", type(exc).__name__)
def aliyundrive_sign(token, timeout=10):
    """Run the Aliyun Drive daily sign-in for one refresh token.

    The refresh token is first exchanged for an access token, which is then
    used as a Bearer credential on the sign-in endpoint. The outcome is
    logged and pushed through the module-level ``send_msg`` callable, which
    the ``__main__`` section sets up.

    Args:
        token: Aliyun Drive refresh token.
        timeout: Seconds to wait for each HTTP request before giving up.

    Returns:
        None.
    """
    update_token_url = "https://auth.aliyundrive.com/v2/account/token"
    signin_url = "https://member.aliyundrive.com/v1/activity/sign_in_list"
    headers = {
        'Content-Type': 'application/json'
    }
    payload = json.dumps({
        'grant_type': 'refresh_token',
        'refresh_token': token
    })

    token_valid = False
    signed_in = False
    try:
        # The context manager closes the session's pooled connections; this
        # job runs every day in a long-lived process.
        with requests.Session() as session:
            token_info = session.post(
                update_token_url, data=payload, headers=headers, timeout=timeout
            ).json()
            if "access_token" in token_info:
                token_valid = True
                headers['Authorization'] = 'Bearer ' + token_info['access_token']
                # NOTE(review): the sign-in call reuses the refresh-token
                # payload as its body, as before; confirm the endpoint
                # actually needs it.
                signin_info = session.post(
                    signin_url, data=payload, headers=headers, timeout=timeout
                ).json()
                # .get() treats a response without "success" as a failed
                # sign-in instead of raising KeyError.
                signed_in = bool(signin_info.get('success'))
    except (requests.RequestException, ValueError) as exc:
        # A network error or non-JSON reply must not escape the job, or it
        # would stop the scheduler loop for every remaining token.
        logger.error("阿里云盘签到失败: {}", exc)
        send_msg("阿里云盘签到失败")
        return

    if not token_valid:
        logger.error('阿里云盘token 已失效请更新环境变量重新启动容器')
        send_msg("阿里云盘token 已失效请更新环境变量重新启动容器")
    elif signed_in:
        send_msg("阿里云盘签到成功")
        logger.info("阿里云盘签到成功")
    else:
        # Log the failure too, so every outcome appears in the log.
        logger.error("阿里云盘签到失败")
        send_msg("阿里云盘签到失败")
def msg_channel_handle(msg_channel):
    """Pick the notification sender for a channel name.

    Args:
        msg_channel: ``"dingding"`` or ``"telegram"``.

    Returns:
        ``send_dingdingmsg`` or ``send_telegram`` for the matching name, and
        ``None`` for any other value.
    """
    sender = None
    if msg_channel == "telegram":
        sender = send_telegram
    elif msg_channel == "dingding":
        sender = send_dingdingmsg
    return sender
if __name__ == '__main__':
    # These settings stay module-level names on purpose: the sender and
    # sign-in functions look them up as globals when they are called.
    env = Env()
    bot_token = env.str('BOT_TOKEN')
    chat_id = env.str('CHAT_ID')
    ding_secret = env.str('DING_SECRET')
    dingding_base_url = env.str('DINGDING_BASE_URL')
    refresh_token_list = env.list('REFRESH_TOKEN_LIST')
    schedule_time = env.str('TIME')
    msg_channel = env.str('MSG_CHANNEL')

    send_msg = msg_channel_handle(msg_channel)
    if send_msg is None:
        # Fail at startup; otherwise the bad value only surfaces as a
        # TypeError inside the first scheduled sign-in.
        raise ValueError(
            "Unsupported MSG_CHANNEL {!r}; expected 'dingding' or 'telegram'".format(msg_channel)
        )

    # One daily job per refresh token, all at the same configured time.
    for refresh_token in refresh_token_list:
        schedule.every().day.at(schedule_time).do(aliyundrive_sign, refresh_token)
        # For local testing, a short interval can be used instead:
        # schedule.every(3).seconds.do(aliyundrive_sign, refresh_token)
    logger.info("应用启动成功")

    # Poll the scheduler once per second for due jobs.
    while True:
        schedule.run_pending()
        time.sleep(1)