forked from ecmulli/stable-diffusion
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsdhttp.py
100 lines (90 loc) · 4.26 KB
/
sdhttp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
import requests, logging
from requests.models import Response
from requests.packages.urllib3.exceptions import InsecureRequestWarning
from concurrent.futures import ThreadPoolExecutor
from flask import make_response, send_file
# InternalRequests defaults to verify_ssl=False, so silence urllib3's
# InsecureRequestWarning instead of letting it fire for unverified requests.
requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)

# Root logging setup: INFO and above, each record prefixed with timestamp,
# level and the module:line that emitted it.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(module)s:%(lineno)d - %(message)s',
)

# Module-level logger used by the helpers in this file.
logger = logging.getLogger(__name__)
class InternalRequests:
    """HTTP helper that stamps a shared secret on messages and verifies it on replies.

    Outgoing ``get``/``post`` calls carry the secret in a ``Credentials``
    header, and a reply is only handed back to the caller when it echoes the
    same secret. The ``send_*_with_secret`` helpers stamp the header on
    responses produced by this process, and ``match_request_secret`` checks it
    on incoming requests.

    Attributes:
        secret: The shared secret placed in / expected from the header.
        verify_ssl: Passed to ``requests`` as ``verify``.
        wrong_secret: Canned 401 response returned when a reply does not carry
            the expected secret.
        http_error: Canned 404 response returned when the request itself
            fails with a ``requests`` exception.

    Note:
        ``wrong_secret`` and ``http_error`` are single shared objects; every
        failing call returns the same instance.
    """

    # Header that carries the shared secret in both directions.
    _SECRET_HEADER = "Credentials"

    def __init__(self, secret="stablediffusion", verify_ssl=False):
        """Store the secret / TLS setting and build the two canned responses."""
        self.secret = secret
        self.verify_ssl = verify_ssl
        self.wrong_secret = self._canned_response(401, b'secret does not match')
        self.http_error = self._canned_response(404, b'error')

    @staticmethod
    def _canned_response(status_code, content):
        """Build a bare ``requests`` Response with the given status and body."""
        canned = requests.models.Response()
        canned.status_code = status_code
        canned._content = content
        return canned

    def _with_secret(self, headers):
        """Return a copy of *headers* with the secret header added.

        Copying keeps the caller's mapping untouched and avoids writing the
        secret into a dict shared between calls.
        """
        merged = dict(headers) if headers else {}
        merged[self._SECRET_HEADER] = self.secret
        return merged

    def _attach_secret(self, resp, addl_headers):
        """Stamp the secret (then any extra headers) onto *resp* and return it.

        Headers are set on the existing ``resp.headers`` object rather than
        replacing it with a plain dict, so the response keeps its original
        header container (and any repeated headers it already holds).
        Entries in *addl_headers* are applied last and therefore win over the
        secret header if they use the same name.
        """
        resp.headers[self._SECRET_HEADER] = self.secret
        if addl_headers is not None:
            for key, value in addl_headers.items():
                resp.headers[key] = value
        return resp

    def _secret_matches(self, headers):
        """Return True when *headers* carries the expected secret."""
        import hmac

        if self._SECRET_HEADER not in headers:
            return False
        provided = headers[self._SECRET_HEADER]
        expected = self.secret
        if isinstance(provided, str) and isinstance(expected, str):
            # Constant-time comparison: the header value comes from the peer.
            return hmac.compare_digest(
                provided.encode("utf-8"), expected.encode("utf-8")
            )
        return provided == expected

    def get(self, url="", params=None, data=None, headers=None, timeout=None):
        """Send a GET carrying the secret and return the verified reply.

        Args:
            url: Target URL.
            params: Query parameters, forwarded to ``requests.get``.
            data: Request body, forwarded to ``requests.get``.
            headers: Extra request headers; copied, never modified.
            timeout: Optional timeout forwarded to ``requests.get``
                (``None`` keeps the previous no-timeout behaviour).

        Returns:
            The reply when it echoes the secret, ``self.wrong_secret`` when it
            does not, or ``self.http_error`` when the request raised a
            ``requests`` exception.
        """
        try:
            resp = requests.get(
                url,
                params=params,
                data=data,
                headers=self._with_secret(headers),
                verify=self.verify_ssl,
                timeout=timeout,
            )
        except requests.exceptions.RequestException as err:
            # RequestException is the base of HTTPError, ConnectionError and
            # Timeout, so one clause covers all of them.
            logger.error(err)
            return self.http_error
        if self.match_response_secret(resp):
            return resp
        return self.wrong_secret

    # NOTE(review): ``json={}`` is kept as-is. It is never mutated here, and
    # switching the default to None would presumably change what is sent when
    # the caller supplies no body - confirm with callers before changing.
    def post(self, url="", params=None, data=None, headers=None, json={}, files=None, timeout=None):
        """Send a POST carrying the secret and return the verified reply.

        Args:
            url: Target URL.
            params: Query parameters, forwarded to ``requests.post``.
            data: Request body, forwarded to ``requests.post``.
            headers: Extra request headers; copied, never modified.
            json: JSON payload, forwarded to ``requests.post``.
            files: File uploads, forwarded to ``requests.post``.
            timeout: Optional timeout forwarded to ``requests.post``
                (``None`` keeps the previous no-timeout behaviour).

        Returns:
            The reply when it echoes the secret, ``self.wrong_secret`` when it
            does not, or ``self.http_error`` when the request raised a
            ``requests`` exception.
        """
        try:
            resp = requests.post(
                url,
                params=params,
                data=data,
                headers=self._with_secret(headers),
                json=json,
                files=files,
                verify=self.verify_ssl,
                timeout=timeout,
            )
        except requests.exceptions.RequestException as err:
            logger.error(err)
            return self.http_error
        logger.debug("response received")
        if self.match_response_secret(resp):
            return resp
        # Route the rejected body through logging instead of stdout.
        logger.warning("response secret mismatch: %s", resp.text)
        return self.wrong_secret

    # NOTE(review): "img/png" is not the registered PNG media type
    # ("image/png"); left unchanged because callers may rely on the default.
    def send_file_with_secret(self, data, type="img/png", filename="", resp=None, addl_headers=None):
        """Return a file response stamped with the secret header.

        Args:
            data: File path or file object handed to Flask's ``send_file``.
            type: Value passed to ``send_file`` as ``mimetype``.
            filename: Value passed to ``send_file`` as ``download_name``.
            resp: Existing response to stamp; when given, ``data``, ``type``
                and ``filename`` are ignored and no new response is built.
            addl_headers: Optional mapping of extra headers to set.

        Returns:
            The stamped response (``resp`` itself when one was supplied).
        """
        if resp is None:
            resp = send_file(data, mimetype=type, download_name=filename)
        return self._attach_secret(resp, addl_headers)

    def send_response_with_secret(self, msg, status, resp=None, addl_headers=None):
        """Return a response stamped with the secret header.

        Args:
            msg: Body handed to Flask's ``make_response``.
            status: Status handed to Flask's ``make_response``.
            resp: Existing response to stamp; when given, ``msg`` and
                ``status`` are ignored and no new response is built.
            addl_headers: Optional mapping of extra headers to set.

        Returns:
            The stamped response (``resp`` itself when one was supplied).
        """
        if resp is None:
            resp = make_response(msg, status)
        return self._attach_secret(resp, addl_headers)

    def match_request_secret(self, req):
        """Return True when ``req.headers`` carries the expected secret."""
        return self._secret_matches(req.headers)

    def match_response_secret(self, resp):
        """Return True when ``resp.headers`` carries the expected secret."""
        return self._secret_matches(resp.headers)