-
Notifications
You must be signed in to change notification settings - Fork 14
/
Copy pathengine_communicator.py
34 lines (25 loc) · 1.13 KB
/
engine_communicator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
from websocket import create_connection
import jwt
import ssl
class EngineCommunicator:
def __init__(self, url):
self.url = url
self.ws = create_connection(self.url)
self.session = self.ws.recv() # Holds session object. Required for Qlik Sense Sept. 2017 and later
@staticmethod
def send_call(self, call_msg):
self.ws.send(call_msg)
return self.ws.recv()
@staticmethod
def close_qvengine_connection(self):
self.ws.close()
class SecureEngineCommunicator(EngineCommunicator):
def __init__(self, senseHost, proxyPrefix, userDirectory, userId, privateKeyPath, ignoreCertErrors=False):
self.url = "wss://" + senseHost + "/" + proxyPrefix + "/app/engineData"
sslOpts = {}
if ignoreCertErrors:
sslOpts = {"cert_reqs": ssl.CERT_NONE}
privateKey = open(privateKeyPath).read()
token = jwt.encode({'user': userId, 'directory': userDirectory}, privateKey, algorithm='RS256')
self.ws = create_connection(self.url, sslopt=sslOpts, header=['Authorization: BEARER ' + str(token)])
self.session = self.ws.recv()