forked from jimlee2048/Actions-WoZaiXiaoYuanPuncher
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathwzxy-dailyreport.py
279 lines (266 loc) · 11.5 KB
/
wzxy-dailyreport.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
# -*- encoding:utf-8 -*-
import requests
import json
import os
import time
import hmac
import hashlib
import base64
import utils
import urllib
import urllib.parse
from urllib.parse import urlencode
from urllib3.util import Retry
class WoZaiXiaoYuanPuncher:
def __init__(self):
# JWSESSION
self.jwsession = None
# 打卡时段
self.seq = None
# 打卡结果
self.status_code = 0
# 登陆接口
self.loginUrl = "https://gw.wozaixiaoyuan.com/basicinfo/mobile/login/username"
# 请求头
self.header = {
"Accept-Encoding": "gzip, deflate, br",
"Connection": "keep-alive",
"User-Agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 15_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 MicroMessenger/8.0.13(0x18000d32) NetType/WIFI Language/zh_CN miniProgram",
"Content-Type": "application/json;charset=UTF-8",
"Content-Length": "2",
"Host": "gw.wozaixiaoyuan.com",
"Accept-Language": "en-us,en",
"Accept": "application/json, text/plain, */*"
}
# 请求体(必须有)
self.body = "{}"
# 登录
def login(self):
username, password = str(os.environ['WZXY_USERNAME']), str(os.environ['WZXY_PASSWORD'])
url = f'{self.loginUrl}?username={username}&password={password}'
self.session = requests.session()
# 登录
response = self.session.post(url=url, data=self.body, headers=self.header)
res = json.loads(response.text)
if res["code"] == 0:
print("使用账号信息登录成功")
jwsession = response.headers['JWSESSION']
self.setJwsession(jwsession)
return True
else:
print(res)
print("登录失败,请检查账号信息")
self.status_code = 5
return False
# 设置JWSESSION
def setJwsession(self, jwsession):
# 如果找不到cache,新建cache储存目录与文件
if not os.path.exists('.cache'):
print("正在创建cache储存目录与文件...")
os.mkdir('.cache')
data = {"jwsession": jwsession}
elif not os.path.exists('.cache/cache.json'):
print("正在创建cache文件...")
data = {"jwsession": jwsession}
# 如果找到cache,读取cache并更新jwsession
else:
print("找到cache文件,正在更新cache中的jwsession...")
data = utils.processJson('.cache/cache.json').read()
data['jwsession'] = jwsession
utils.processJson(".cache/cache.json").write(data)
self.jwsession = data['jwsession']
# 获取JWSESSION
def getJwsession(self):
if not self.jwsession: # 读取cache中的配置文件
data = utils.processJson(".cache/cache.json").read()
self.jwsession = data['jwsession']
return self.jwsession
# 获取打卡列表,判断当前打卡时间段与打卡情况,符合条件则自动进行打卡
def PunchIn(self):
print("获取打卡列表中...")
url = "https://student.wozaixiaoyuan.com/heat/getTodayHeatList.json"
self.header['Host'] = "student.wozaixiaoyuan.com"
self.header['JWSESSION'] = self.getJwsession()
self.session = requests.session()
response = self.session.post(url=url, data=self.body, headers=self.header)
res = json.loads(response.text)
# 如果 jwsession 无效,则重新 登录 + 打卡
if res['code'] == -10:
print(res)
print('jwsession 无效,将尝试使用账号信息重新登录')
self.status_code = 4
loginStatus = self.login()
if loginStatus:
self.PunchIn()
else:
print(res)
print("重新登录失败,请检查账号信息")
elif res['code'] == 0:
# 标志时段是否有效
inSeq = False
# 遍历每个打卡时段(不同学校的打卡时段数量可能不一样)
for i in res['data']:
# 判断时段是否有效
if int(i['state']) == 1:
inSeq = True
# 保存当前学校的打卡时段
self.seq = int(i['seq'])
# 判断是否已经打卡
if int(i['type']) == 0:
self.doPunchIn(str(i['seq']))
elif int(i['type']) == 1:
self.status_code = 2
print("已经打过卡了")
# 如果当前时间不在任何一个打卡时段内
if inSeq == False:
self.status_code = 3
print("打卡失败:不在打卡时间段内")
# 执行打卡
# 参数seq : 当前打卡的序号
def doPunchIn(self, seq):
print("正在进行:" + self.getSeq() + "...")
url = "https://student.wozaixiaoyuan.com/heat/save.json"
self.header['Host'] = "student.wozaixiaoyuan.com"
self.header['Content-Type'] = "application/x-www-form-urlencoded"
self.header['JWSESSION'] = self.getJwsession()
sign_data = {
"answers": '["0"]',
"seq": str(seq),
"temperature": utils.getRandomTemperature(os.environ['WZXY_TEMPERATURE']),
"latitude": os.environ['WZXY_LATITUDE'],
"longitude": os.environ['WZXY_LONGITUDE'],
"country": os.environ['WZXY_COUNTRY'],
"city": os.environ['WZXY_CITY'],
"district": os.environ['WZXY_DISTRICT'],
"province": os.environ['WZXY_PROVINCE'],
"township": os.environ['WZXY_TOWNSHIP'],
"street": os.environ['WZXY_STREET'],
"myArea": "",
"areacode": "",
"userId": ""
}
data = urlencode(sign_data)
self.session = requests.session()
response = self.session.post(url=url, data=data, headers=self.header)
response = json.loads(response.text)
# 打卡情况
if response["code"] == 0:
self.status_code = 1
print("打卡成功")
else:
print(response)
print("打卡失败")
# 获取打卡时段
def getSeq(self):
seq = self.seq
if seq == 1:
return "早打卡"
elif seq == 2:
return "午打卡"
elif seq == 3:
return "晚打卡"
else:
return "非打卡时段"
# 获取打卡结果
def getResult(self):
res = self.status_code
if res == 1:
return "✅ 打卡成功"
elif res == 2:
return "✅ 你已经打过卡了,无需重复打卡"
elif res == 3:
return "❌ 打卡失败,当前不在打卡时间段内"
elif res == 4:
return "❌ 打卡失败,jwsession 无效"
elif res == 5:
return "❌ 打卡失败,登录错误,请检查账号信息"
else:
return "❌ 打卡失败,发生未知错误,请检查日志"
# 推送打卡结果
def sendNotification(self):
notifyTime = utils.getCurrentTime()
notifyResult = self.getResult()
notifySeq = self.getSeq()
if os.environ.get('SCT_KEY'):
# serverchan 推送
notifyToken = os.environ['SCT_KEY']
url = "https://sctapi.ftqq.com/{}.send"
body = {
"title": "⏰ 我在校园打卡结果通知",
"desp": "打卡项目:日检日报\n\n打卡情况:{}\n\n打卡时段:{}\n\n打卡时间:{}".format(notifyResult, notifySeq, notifyTime)
}
requests.post(url.format(notifyToken), data=body)
print("消息经Serverchan-Turbo推送成功")
if os.environ.get('PUSHPLUS_TOKEN'):
# pushplus 推送
url = 'http://www.pushplus.plus/send'
notifyToken = os.environ['PUSHPLUS_TOKEN']
content = json.dumps({
"打卡项目": "日检日报",
"打卡情况": notifyResult,
"打卡时段": notifySeq,
"打卡时间": notifyTime
}, ensure_ascii=False)
msg = {
"token": notifyToken,
"title": "⏰ 我在校园打卡结果通知",
"content": content,
"template": "json"
}
body=json.dumps(msg).encode(encoding='utf-8')
headers = {'Content-Type':'application/json'}
r = requests.post(url, data=body, headers=headers).json()
if r["code"] == 200:
print("消息经pushplus推送成功")
else:
print("消息经pushplus推送失败,请检查token是否配置正确")
if os.environ.get('DD_BOT_ACCESS_TOKEN'):
# 钉钉推送
DD_BOT_ACCESS_TOKEN = os.environ["DD_BOT_ACCESS_TOKEN"]
DD_BOT_SECRET = os.environ["DD_BOT_SECRET"]
timestamp = str(round(time.time() * 1000)) # 时间戳
secret_enc = DD_BOT_SECRET.encode('utf-8')
string_to_sign = '{}\n{}'.format(timestamp, DD_BOT_SECRET)
string_to_sign_enc = string_to_sign.encode('utf-8')
hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
sign = urllib.parse.quote_plus(base64.b64encode(hmac_code)) # 签名
print('开始使用 钉钉机器人 推送消息...', end='')
url = f'https://oapi.dingtalk.com/robot/send?access_token={DD_BOT_ACCESS_TOKEN}×tamp={timestamp}&sign={sign}'
headers = {'Content-Type': 'application/json;charset=utf-8'}
data = {
'msgtype': 'text',
'text': {'content': f'⏰ 我在校园打卡结果通知\n---------\n打卡项目:日检日报\n\n打卡情况:{notifyResult}\n\n打卡时间: {notifyTime}'}
}
response = requests.post(url=url, data=json.dumps(data), headers=headers, timeout=15).json()
if not response['errcode']:
print('消息经钉钉机器人推送成功!')
else:
print('消息经钉钉机器人推送失败!')
if os.environ.get('BARK_TOKEN'):
# bark 推送
notifyToken = os.environ['BARK_TOKEN']
req = "{}/{}/{}".format(notifyToken, "⏰ 我在校园打卡(日检日报)结果通知", notifyResult)
requests.get(req)
print("消息经bark推送成功")
if os.environ.get("MIAO_CODE"):
baseurl = "https://miaotixing.com/trigger"
body = {
"id": os.environ['MIAO_CODE'],
"text": "打卡项目:日检日报\n\n打卡情况:{}\n\n打卡时段:{}\n\n打卡时间:{}".format(notifyResult, notifySeq, notifyTime)
}
requests.post(baseurl, data=body)
print("消息经喵推送推送成功")
if __name__ == '__main__':
# 找不到cache,登录+打卡
wzxy = WoZaiXiaoYuanPuncher()
if not os.path.exists('.cache'):
print("找不到cache文件,正在使用账号信息登录...")
loginStatus = wzxy.login()
if loginStatus:
wzxy.PunchIn()
else:
print("登陆失败,请检查账号信息")
else:
print("找到cache文件,尝试使用jwsession打卡...")
wzxy.PunchIn()
wzxy.sendNotification()