Skip to content
This repository has been archived by the owner on Mar 13, 2022. It is now read-only.

Commit

Permalink
Refactor stream package to enable common method helpers for other str…
Browse files Browse the repository at this point in the history
…eaming api classes.
  • Loading branch information
iciclespider committed Aug 27, 2020
1 parent 54d188f commit fd62214
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 66 deletions.
19 changes: 6 additions & 13 deletions stream/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,19 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import types

from . import ws_client


def stream(func, *args, **kwargs):
"""Stream given API call using websocket.
Extra kwarg: capture-all=True - captures all stdout+stderr for use with WSClient.read_all()"""

def _intercept_request_call(*args, **kwargs):
# old generated code's api client has config. new ones has
# configuration
try:
config = func.__self__.api_client.configuration
except AttributeError:
config = func.__self__.api_client.config

return ws_client.websocket_call(config, *args, **kwargs)

prev_request = func.__self__.api_client.request
api_client = func.__self__.api_client
prev_request = api_client.request
try:
func.__self__.api_client.request = _intercept_request_call
api_client.request = types.MethodType(ws_client.websocket_call, api_client)
return func(*args, **kwargs)
finally:
func.__self__.api_client.request = prev_request
api_client.request = prev_request
117 changes: 64 additions & 53 deletions stream/ws_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import six
import yaml

from six.moves.urllib.parse import urlencode, quote_plus, urlparse, urlunparse
from six.moves.urllib.parse import urlencode, urlparse, urlunparse
from six import StringIO

from websocket import WebSocket, ABNF, enableTrace
Expand Down Expand Up @@ -51,47 +51,13 @@ def __init__(self, configuration, url, headers, capture_all):
like port forwarding can forward different pods' streams to different
channels.
"""
enableTrace(False)
header = []
self._connected = False
self._channels = {}
if capture_all:
self._all = StringIO()
else:
self._all = _IgnoredIO()

# We just need to pass the Authorization, ignore all the other
# http headers we get from the generated code
if headers and 'authorization' in headers:
header.append("authorization: %s" % headers['authorization'])

if headers and 'sec-websocket-protocol' in headers:
header.append("sec-websocket-protocol: %s" %
headers['sec-websocket-protocol'])
else:
header.append("sec-websocket-protocol: v4.channel.k8s.io")

if url.startswith('wss://') and configuration.verify_ssl:
ssl_opts = {
'cert_reqs': ssl.CERT_REQUIRED,
'ca_certs': configuration.ssl_ca_cert or certifi.where(),
}
if configuration.assert_hostname is not None:
ssl_opts['check_hostname'] = configuration.assert_hostname
else:
ssl_opts = {'cert_reqs': ssl.CERT_NONE}

if configuration.cert_file:
ssl_opts['certfile'] = configuration.cert_file
if configuration.key_file:
ssl_opts['keyfile'] = configuration.key_file

self.sock = WebSocket(sslopt=ssl_opts, skip_utf8_validation=False)
if configuration.proxy:
proxy_url = urlparse(configuration.proxy)
self.sock.connect(url, header=header, http_proxy_host=proxy_url.hostname, http_proxy_port=proxy_url.port)
else:
self.sock.connect(url, header=header)
self.sock = create_websocket(configuration, url, headers)
self._connected = True

def peek_channel(self, channel, timeout=0):
Expand Down Expand Up @@ -259,41 +225,86 @@ def close(self, **kwargs):
WSResponse = collections.namedtuple('WSResponse', ['data'])


def get_websocket_url(url):
def get_websocket_url(url, query_params=None):
parsed_url = urlparse(url)
parts = list(parsed_url)
if parsed_url.scheme == 'http':
parts[0] = 'ws'
elif parsed_url.scheme == 'https':
parts[0] = 'wss'
if query_params:
query = []
for key, value in query_params:
if key == 'command' and isinstance(value, list):
for command in value:
query.append((key, command))
else:
query.append((key, value))
if query:
parts[4] = urlencode(query)
return urlunparse(parts)


def websocket_call(configuration, *args, **kwargs):
def create_websocket(configuration, url, headers=None):
enableTrace(False)

# We just need to pass the Authorization, ignore all the other
# http headers we get from the generated code
header = []
if headers and 'authorization' in headers:
header.append("authorization: %s" % headers['authorization'])
if headers and 'sec-websocket-protocol' in headers:
header.append("sec-websocket-protocol: %s" %
headers['sec-websocket-protocol'])
else:
header.append("sec-websocket-protocol: v4.channel.k8s.io")

if url.startswith('wss://') and configuration.verify_ssl:
ssl_opts = {
'cert_reqs': ssl.CERT_REQUIRED,
'ca_certs': configuration.ssl_ca_cert or certifi.where(),
}
if configuration.assert_hostname is not None:
ssl_opts['check_hostname'] = configuration.assert_hostname
else:
ssl_opts = {'cert_reqs': ssl.CERT_NONE}

if configuration.cert_file:
ssl_opts['certfile'] = configuration.cert_file
if configuration.key_file:
ssl_opts['keyfile'] = configuration.key_file

websocket = WebSocket(sslopt=ssl_opts, skip_utf8_validation=False)
if configuration.proxy:
proxy_url = urlparse(configuration.proxy)
websocket.connect(url, header=header, http_proxy_host=proxy_url.hostname, http_proxy_port=proxy_url.port)
else:
websocket.connect(url, header=header)
return websocket


def _configuration(api_client):
# old generated code's api client has config. new ones has
# configuration
try:
return api_client.configuration
except AttributeError:
return api_client.config


def websocket_call(api_client, _method, url, **kwargs):
"""An internal function to be called in api-client when a websocket
connection is required. args and kwargs are the parameters of
apiClient.request method."""

url = args[1]
url = get_websocket_url(url, kwargs.get("query_params"))
headers = kwargs.get("headers")
_request_timeout = kwargs.get("_request_timeout", 60)
_preload_content = kwargs.get("_preload_content", True)
capture_all = kwargs.get("capture_all", True)
headers = kwargs.get("headers")

# Expand command parameter list to indivitual command params
query_params = []
for key, value in kwargs.get("query_params", {}):
if key == 'command' and isinstance(value, list):
for command in value:
query_params.append((key, command))
else:
query_params.append((key, value))

if query_params:
url += '?' + urlencode(query_params)

try:
client = WSClient(configuration, get_websocket_url(url), headers, capture_all)
client = WSClient(_configuration(api_client), url, headers, capture_all)
if not _preload_content:
return client
client.run_forever(timeout=_request_timeout)
Expand Down

0 comments on commit fd62214

Please sign in to comment.