-
Notifications
You must be signed in to change notification settings - Fork 1
/
BaseUtil.py
150 lines (128 loc) · 5.24 KB
/
BaseUtil.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
import re
from urllib.parse import urlparse
import json
import requests
from bs4 import BeautifulSoup
from datetime import date, timedelta, datetime
from Util import Util
from cookie_test import fetch_chrome_cookie
class BaseUtil(Util):
def __init__(self, username, passwd, adminid='15870', factoryid='1', baseurl='https://crm.konka.com',
bjdomain='http://north.bangjia.me'):
parsed_uri = urlparse(baseurl)
self.host = parsed_uri.netloc
self.username = username
self.passwd = passwd
self.baseurl = baseurl
self.adminid = adminid
self.factoryid = factoryid
self.bjdomain = bjdomain
self.mainurl = self.baseurl + '/admin/page!main.action'
self.searchurl = self.baseurl + '/afterservice/afterservice!api.action'
self.session = requests.Session()
self.agent = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.116 Safari/537.36'
self.datasuccess = {'code': 1, 'msg': '抓单成功', 'element': ''}
self.datafail = {'code': 0, 'msg': '抓单失败,请确认账号密码是否正确'}
self.dataverify = {'code': 2, 'msg': '登录过期,请重新登录', 'element': ''}
self.headers = {'Content-Type': 'application/json;charset=UTF-8',
'User-Agent': self.agent, 'Referer': self.baseurl,
'Upgrade-Insecure-Requests': '1', 'Host': self.host, 'Origin': self.baseurl,
'Accept-Encoding': 'gzip, deflate, br', 'Connection': 'keep-alive',
'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2',
'Accept': 'application/json, text/plain, */*'}
self.initCookie()
def getsoup(self, response):
response.encoding = 'utf-8'
return BeautifulSoup(response.text, features="lxml")
def parseHtml(self, htmlstr):
bsObj = BeautifulSoup(htmlstr, features="lxml")
if not bsObj:
return ""
return bsObj.text.strip()
def getjson(self, response):
response.encoding = 'utf-8'
try:
result = json.loads(response.text)
except Exception as e:
print("getjson failed:{}".format(str(e)))
result = None
return result
@staticmethod
def merge(lst1, lst2, keys, isCover=False):
def generate_key(item):
if type(keys) == list:
return "_".join(str(v) for k, v in item.items() if k in keys)
else:
return "_".join(str(v) for k, v in item.items() if k == keys)
hash_map = {}
for item in lst1 + lst2:
if isCover:
hash_map[generate_key(item)] = item
else:
hash_map.setdefault(generate_key(item), item)
result = list(hash_map.values())
return result if result else []
def initCookie(self, cookies=None):
pass
def login(self, param=None):
pass
def loadOrders(self, param=None):
pass
@staticmethod
def getCookie(domains=[], isExact=False):
return fetch_chrome_cookie(domains, isExact=isExact)
@staticmethod
def getCookies(cookie):
cookies = dict([l.split("=", 1) for l in cookie.split("; ")])
return cookies
@staticmethod
def getDateBefore(day):
return (date.today() - timedelta(days=day)).strftime("%Y-%m-%d")
@staticmethod
def clearKey(data, datakey, destkey='address'):
if datakey in data and data[destkey] and data[destkey].strip().startswith(data[datakey].strip()):
data[destkey] = data[destkey].replace(data[datakey], '', 1).strip()
return data
@staticmethod
def clearAddress(orderinfo, destkey='address'):
if destkey not in orderinfo:
return orderinfo
orderinfo = BaseUtil.clearKey(orderinfo, "province", destkey)
orderinfo = BaseUtil.clearKey(orderinfo, "city", destkey)
orderinfo = BaseUtil.clearKey(orderinfo, "county", destkey)
orderinfo = BaseUtil.clearKey(orderinfo, "town", destkey)
return orderinfo
@staticmethod
def getTimeStr(string, isDefault=True):
defaultValue = '00:00:00' if isDefault else ''
try:
time_str = re.compile(r"\d{2}:\d{1,2}").findall(string)[0]
result = time_str if BaseUtil.isTime(time_str) else defaultValue
return result
except IndexError:
return defaultValue
@staticmethod
def isTime(time_str):
return BaseUtil.isTimesecondstr(time_str) or BaseUtil.isTimestr(time_str)
@staticmethod
def isTimesecondstr(time_str):
try:
datetime.strptime(time_str, '%H:%M:%S')
return True
except ValueError:
return False
@staticmethod
def isTimestr(time_str):
try:
datetime.strptime(time_str, '%H:%M')
return True
except ValueError:
return False
@staticmethod
def isDatetimestr(datetime_str):
try:
datetime.strptime(datetime_str, '%Y-%m-%d %H:%M:%S')
return True
except ValueError:
return False
# print("getDateBefore(0)={}".format(BaseUtil.getDateBefore(0)))