From 529a72a2bf4901d40e7551c4acaf8219609dcfb9 Mon Sep 17 00:00:00 2001 From: Ben Picolo Date: Mon, 30 Jul 2018 14:23:18 -0400 Subject: [PATCH 01/32] Fix base64 padding for kube config --- config/kube_config.py | 6 ++++-- config/kube_config_test.py | 10 ++++++++-- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/config/kube_config.py b/config/kube_config.py index ddd3d02b..3691a18b 100644 --- a/config/kube_config.py +++ b/config/kube_config.py @@ -257,13 +257,15 @@ def _load_oid_token(self, provider): if len(parts) != 3: # Not a valid JWT return None + padding = (4 - len(parts[1]) % 4) * '=' + if PY3: jwt_attributes = json.loads( - base64.b64decode(parts[1]).decode('utf-8') + base64.b64decode(parts[1] + padding).decode('utf-8') ) else: jwt_attributes = json.loads( - base64.b64decode(parts[1] + "==") + base64.b64decode(parts[1] + padding) ) expire = jwt_attributes.get('exp') diff --git a/config/kube_config_test.py b/config/kube_config_test.py index a79efb9a..12d6916d 100644 --- a/config/kube_config_test.py +++ b/config/kube_config_test.py @@ -43,6 +43,10 @@ def _base64(string): return base64.encodestring(string.encode()).decode() +def _unpadded_base64(string): + return base64.b64encode(string.encode()).decode().rstrip('=') + + def _format_expiry_datetime(dt): return dt.strftime(EXPIRY_DATETIME_FORMAT) @@ -87,11 +91,13 @@ def _raise_exception(st): TEST_OIDC_TOKEN = "test-oidc-token" TEST_OIDC_INFO = "{\"name\": \"test\"}" -TEST_OIDC_BASE = _base64(TEST_OIDC_TOKEN) + "." + _base64(TEST_OIDC_INFO) +TEST_OIDC_BASE = _unpadded_base64( + TEST_OIDC_TOKEN) + "." + _unpadded_base64(TEST_OIDC_INFO) TEST_OIDC_LOGIN = TEST_OIDC_BASE + "." + TEST_CLIENT_CERT_BASE64 TEST_OIDC_TOKEN = "Bearer %s" % TEST_OIDC_LOGIN TEST_OIDC_EXP = "{\"name\": \"test\",\"exp\": 536457600}" -TEST_OIDC_EXP_BASE = _base64(TEST_OIDC_TOKEN) + "." + _base64(TEST_OIDC_EXP) +TEST_OIDC_EXP_BASE = _unpadded_base64( + TEST_OIDC_TOKEN) + "." + _unpadded_base64(TEST_OIDC_EXP) TEST_OIDC_EXPIRED_LOGIN = TEST_OIDC_EXP_BASE + "." + TEST_CLIENT_CERT_BASE64 TEST_OIDC_CA = _base64(TEST_CERTIFICATE_AUTH) From 86ae2de36f56742d70e6caf6c15eda75a168aab6 Mon Sep 17 00:00:00 2001 From: Neha Yadav Date: Wed, 5 Dec 2018 22:22:10 +0530 Subject: [PATCH 02/32] Add verify-boilerplate script --- .travis.yml | 1 + hack/boilerplate/boilerplate.py | 197 ++++++++++++++++++++++++++++ hack/boilerplate/boilerplate.py.txt | 15 +++ hack/boilerplate/boilerplate.sh.txt | 13 ++ hack/verify-boilerplate.sh | 35 +++++ 5 files changed, 261 insertions(+) create mode 100755 hack/boilerplate/boilerplate.py create mode 100644 hack/boilerplate/boilerplate.py.txt create mode 100644 hack/boilerplate/boilerplate.sh.txt create mode 100755 hack/verify-boilerplate.sh diff --git a/.travis.yml b/.travis.yml index c3fefd02..7aa0138b 100644 --- a/.travis.yml +++ b/.travis.yml @@ -31,4 +31,5 @@ install: script: - ./run_tox.sh tox + - ./hack/verify-boilerplate.sh diff --git a/hack/boilerplate/boilerplate.py b/hack/boilerplate/boilerplate.py new file mode 100755 index 00000000..bdc70c31 --- /dev/null +++ b/hack/boilerplate/boilerplate.py @@ -0,0 +1,197 @@ +#!/usr/bin/env python + +# Copyright 2018 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function + +import argparse +import datetime +import difflib +import glob +import os +import re +import sys + +parser = argparse.ArgumentParser() +parser.add_argument( + "filenames", + help="list of files to check, all files if unspecified", + nargs='*') + +rootdir = os.path.dirname(__file__) + "/../../" +rootdir = os.path.abspath(rootdir) +parser.add_argument( + "--rootdir", default=rootdir, help="root directory to examine") + +default_boilerplate_dir = os.path.join(rootdir, "hack/boilerplate") +parser.add_argument( + "--boilerplate-dir", default=default_boilerplate_dir) + +parser.add_argument( + "-v", "--verbose", + help="give verbose output regarding why a file does not pass", + action="store_true") + +args = parser.parse_args() + +verbose_out = sys.stderr if args.verbose else open("/dev/null", "w") + + +def get_refs(): + refs = {} + + for path in glob.glob(os.path.join(args.boilerplate_dir, "boilerplate.*.txt")): + extension = os.path.basename(path).split(".")[1] + + ref_file = open(path, 'r') + ref = ref_file.read().splitlines() + ref_file.close() + refs[extension] = ref + + return refs + + +def file_passes(filename, refs, regexs): + try: + f = open(filename, 'r') + except Exception as exc: + print("Unable to open %s: %s" % (filename, exc), file=verbose_out) + return False + + data = f.read() + f.close() + + basename = os.path.basename(filename) + extension = file_extension(filename) + + if extension != "": + ref = refs[extension] + else: + ref = refs[basename] + + # remove extra content from the top of files + if extension == "sh": + p = regexs["shebang"] + (data, found) = p.subn("", data, 1) + + data = data.splitlines() + + # if our test file is smaller than the reference it surely fails! + if len(ref) > len(data): + print('File %s smaller than reference (%d < %d)' % + (filename, len(data), len(ref)), + file=verbose_out) + return False + + # trim our file to the same number of lines as the reference file + data = data[:len(ref)] + + p = regexs["year"] + for d in data: + if p.search(d): + print('File %s has the YEAR field, but missing the year of date' % + filename, file=verbose_out) + return False + + # Replace all occurrences of the regex "2014|2015|2016|2017|2018" with "YEAR" + p = regexs["date"] + for i, d in enumerate(data): + (data[i], found) = p.subn('YEAR', d) + if found != 0: + break + + # if we don't match the reference at this point, fail + if ref != data: + print("Header in %s does not match reference, diff:" % + filename, file=verbose_out) + if args.verbose: + print(file=verbose_out) + for line in difflib.unified_diff(ref, data, 'reference', filename, lineterm=''): + print(line, file=verbose_out) + print(file=verbose_out) + return False + + return True + + +def file_extension(filename): + return os.path.splitext(filename)[1].split(".")[-1].lower() + + +# list all the files contain 'DO NOT EDIT', but are not generated +skipped_ungenerated_files = ['hack/boilerplate/boilerplate.py'] + + +def normalize_files(files): + newfiles = [] + for pathname in files: + newfiles.append(pathname) + for i, pathname in enumerate(newfiles): + if not os.path.isabs(pathname): + newfiles[i] = os.path.join(args.rootdir, pathname) + return newfiles + + +def get_files(extensions): + files = [] + if len(args.filenames) > 0: + files = args.filenames + else: + for root, dirs, walkfiles in os.walk(args.rootdir): + for name in walkfiles: + pathname = os.path.join(root, name) + files.append(pathname) + + files = normalize_files(files) + outfiles = [] + for pathname in files: + basename = os.path.basename(pathname) + extension = file_extension(pathname) + if extension in extensions or basename in extensions: + outfiles.append(pathname) + return outfiles + + +def get_dates(): + years = datetime.datetime.now().year + return '(%s)' % '|'.join((str(year) for year in range(2014, years+1))) + + +def get_regexs(): + regexs = {} + # Search for "YEAR" which exists in the boilerplate, but shouldn't in the real thing + regexs["year"] = re.compile('YEAR') + # get_dates return 2014, 2015, 2016, 2017, or 2018 until the current year as a regex like: "(2014|2015|2016|2017|2018)"; + # company holder names can be anything + regexs["date"] = re.compile(get_dates()) + # strip #!.* from shell scripts + regexs["shebang"] = re.compile(r"^(#!.*\n)\n*", re.MULTILINE) + return regexs + + +def main(): + regexs = get_regexs() + refs = get_refs() + filenames = get_files(refs.keys()) + + for filename in filenames: + if not file_passes(filename, refs, regexs): + print(filename, file=sys.stdout) + + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/hack/boilerplate/boilerplate.py.txt b/hack/boilerplate/boilerplate.py.txt new file mode 100644 index 00000000..d781daf9 --- /dev/null +++ b/hack/boilerplate/boilerplate.py.txt @@ -0,0 +1,15 @@ +#!/usr/bin/env python + +# Copyright YEAR The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/hack/boilerplate/boilerplate.sh.txt b/hack/boilerplate/boilerplate.sh.txt new file mode 100644 index 00000000..34cb349c --- /dev/null +++ b/hack/boilerplate/boilerplate.sh.txt @@ -0,0 +1,13 @@ +# Copyright YEAR The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/hack/verify-boilerplate.sh b/hack/verify-boilerplate.sh new file mode 100755 index 00000000..2f54c8cc --- /dev/null +++ b/hack/verify-boilerplate.sh @@ -0,0 +1,35 @@ +#!/usr/bin/env bash + +# Copyright 2018 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -o errexit +set -o nounset +set -o pipefail + +KUBE_ROOT=$(dirname "${BASH_SOURCE}")/.. + +boilerDir="${KUBE_ROOT}/hack/boilerplate" +boiler="${boilerDir}/boilerplate.py" + +files_need_boilerplate=($(${boiler} "$@")) + +# Run boilerplate check +if [[ ${#files_need_boilerplate[@]} -gt 0 ]]; then + for file in "${files_need_boilerplate[@]}"; do + echo "Boilerplate header is wrong for: ${file}" >&2 + done + + exit 1 +fi From d56fdbc0cc33a6c8e4782c93b50c56c889fb3fa3 Mon Sep 17 00:00:00 2001 From: Neha Yadav Date: Wed, 5 Dec 2018 22:22:59 +0530 Subject: [PATCH 03/32] Verify Boilerplate fix --- config/__init__.py | 2 ++ config/config_exception.py | 2 ++ config/dateutil.py | 2 ++ config/dateutil_test.py | 2 ++ config/exec_provider.py | 2 ++ config/exec_provider_test.py | 2 ++ config/incluster_config.py | 2 ++ config/incluster_config_test.py | 2 ++ config/kube_config.py | 2 ++ config/kube_config_test.py | 2 ++ run_tox.sh | 3 +-- stream/__init__.py | 2 ++ stream/stream.py | 20 ++++++++++++-------- stream/ws_client.py | 20 ++++++++++++-------- stream/ws_client_test.py | 4 +++- watch/__init__.py | 2 ++ watch/watch.py | 2 ++ watch/watch_test.py | 2 ++ 18 files changed, 56 insertions(+), 19 deletions(-) diff --git a/config/__init__.py b/config/__init__.py index 3476ff71..02a7532d 100644 --- a/config/__init__.py +++ b/config/__init__.py @@ -1,3 +1,5 @@ +#!/usr/bin/env python + # Copyright 2016 The Kubernetes Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/config/config_exception.py b/config/config_exception.py index 23fab022..9bf049c6 100644 --- a/config/config_exception.py +++ b/config/config_exception.py @@ -1,3 +1,5 @@ +#!/usr/bin/env python + # Copyright 2016 The Kubernetes Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/config/dateutil.py b/config/dateutil.py index ed88cba8..402751cd 100644 --- a/config/dateutil.py +++ b/config/dateutil.py @@ -1,3 +1,5 @@ +#!/usr/bin/env python + # Copyright 2017 The Kubernetes Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/config/dateutil_test.py b/config/dateutil_test.py index deb0ea88..7a13fad0 100644 --- a/config/dateutil_test.py +++ b/config/dateutil_test.py @@ -1,3 +1,5 @@ +#!/usr/bin/env python + # Copyright 2016 The Kubernetes Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/config/exec_provider.py b/config/exec_provider.py index 436942f0..a4198353 100644 --- a/config/exec_provider.py +++ b/config/exec_provider.py @@ -1,3 +1,5 @@ +#!/usr/bin/env python + # Copyright 2018 The Kubernetes Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/config/exec_provider_test.py b/config/exec_provider_test.py index 44579beb..8b6517b0 100644 --- a/config/exec_provider_test.py +++ b/config/exec_provider_test.py @@ -1,3 +1,5 @@ +#!/usr/bin/env python + # Copyright 2018 The Kubernetes Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/config/incluster_config.py b/config/incluster_config.py index 60fc0af8..e643f0df 100644 --- a/config/incluster_config.py +++ b/config/incluster_config.py @@ -1,3 +1,5 @@ +#!/usr/bin/env python + # Copyright 2016 The Kubernetes Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/config/incluster_config_test.py b/config/incluster_config_test.py index 622b31b3..3cb0abfc 100644 --- a/config/incluster_config_test.py +++ b/config/incluster_config_test.py @@ -1,3 +1,5 @@ +#!/usr/bin/env python + # Copyright 2016 The Kubernetes Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/config/kube_config.py b/config/kube_config.py index 958959e3..058ae290 100644 --- a/config/kube_config.py +++ b/config/kube_config.py @@ -1,3 +1,5 @@ +#!/usr/bin/env python + # Copyright 2018 The Kubernetes Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/config/kube_config_test.py b/config/kube_config_test.py index ae9dc225..ee4f49d9 100644 --- a/config/kube_config_test.py +++ b/config/kube_config_test.py @@ -1,3 +1,5 @@ +#!/usr/bin/env python + # Copyright 2018 The Kubernetes Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/run_tox.sh b/run_tox.sh index 55733785..4b583924 100755 --- a/run_tox.sh +++ b/run_tox.sh @@ -11,7 +11,7 @@ # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and +# See the License for the specific language governing permissions and # limitations under the License. set -o errexit @@ -51,4 +51,3 @@ git status echo "Running tox from the main repo on $TOXENV environment" # Run the user-provided command. "${@}" - diff --git a/stream/__init__.py b/stream/__init__.py index e72d0583..e9b7d24f 100644 --- a/stream/__init__.py +++ b/stream/__init__.py @@ -1,3 +1,5 @@ +#!/usr/bin/env python + # Copyright 2017 The Kubernetes Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/stream/stream.py b/stream/stream.py index 0412fc33..3eab0b9a 100644 --- a/stream/stream.py +++ b/stream/stream.py @@ -1,14 +1,18 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at +#!/usr/bin/env python + +# Copyright 2018 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. from . import ws_client diff --git a/stream/ws_client.py b/stream/ws_client.py index 1cc56cdd..c6fea7ba 100644 --- a/stream/ws_client.py +++ b/stream/ws_client.py @@ -1,14 +1,18 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at +#!/usr/bin/env python + +# Copyright 2018 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. from kubernetes.client.rest import ApiException diff --git a/stream/ws_client_test.py b/stream/ws_client_test.py index e2eca96c..756d9597 100644 --- a/stream/ws_client_test.py +++ b/stream/ws_client_test.py @@ -1,4 +1,6 @@ -# Copyright 2017 The Kubernetes Authors. +#!/usr/bin/env python + +# Copyright 2018 The Kubernetes Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/watch/__init__.py b/watch/__init__.py index ca9ac069..46a31ced 100644 --- a/watch/__init__.py +++ b/watch/__init__.py @@ -1,3 +1,5 @@ +#!/usr/bin/env python + # Copyright 2016 The Kubernetes Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/watch/watch.py b/watch/watch.py index 21899dd8..fb4c1abf 100644 --- a/watch/watch.py +++ b/watch/watch.py @@ -1,3 +1,5 @@ +#!/usr/bin/env python + # Copyright 2016 The Kubernetes Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/watch/watch_test.py b/watch/watch_test.py index d1ec80a1..f2804f4a 100644 --- a/watch/watch_test.py +++ b/watch/watch_test.py @@ -1,3 +1,5 @@ +#!/usr/bin/env python + # Copyright 2016 The Kubernetes Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); From 375befb15cbbf418468d56554ee4b5de77232f3f Mon Sep 17 00:00:00 2001 From: Neha Yadav Date: Tue, 11 Dec 2018 22:46:45 +0530 Subject: [PATCH 04/32] Make dependancy adal optional --- config/kube_config.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/config/kube_config.py b/config/kube_config.py index 958959e3..e51697bc 100644 --- a/config/kube_config.py +++ b/config/kube_config.py @@ -21,7 +21,6 @@ import tempfile import time -import adal import google.auth import google.auth.transport.requests import oauthlib.oauth2 @@ -36,6 +35,11 @@ from .config_exception import ConfigException from .dateutil import UTC, format_rfc3339, parse_rfc3339 +try: + import adal +except ImportError: + pass + EXPIRY_SKEW_PREVENTION_DELAY = datetime.timedelta(minutes=5) KUBE_CONFIG_DEFAULT_LOCATION = os.environ.get('KUBECONFIG', '~/.kube/config') _temp_files = {} @@ -218,6 +222,9 @@ def _load_azure_token(self, provider): return self.token def _refresh_azure_token(self, config): + if 'adal' not in globals(): + raise ImportError('refresh token error, adal library not imported') + tenant = config['tenant-id'] authority = 'https://login.microsoftonline.com/{}'.format(tenant) context = adal.AuthenticationContext( From 1637d56364e62cff2d2d188e8a046f9ba77bb763 Mon Sep 17 00:00:00 2001 From: axelsteingrimsson Date: Wed, 12 Dec 2018 12:47:12 +0100 Subject: [PATCH 05/32] Add email scope to GCP provided credential refresh --- config/kube_config.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/config/kube_config.py b/config/kube_config.py index 958959e3..b391fb22 100644 --- a/config/kube_config.py +++ b/config/kube_config.py @@ -141,9 +141,10 @@ def __init__(self, config_dict, active_context=None, self._config_persister = config_persister def _refresh_credentials(): - credentials, project_id = google.auth.default( - scopes=['https://www.googleapis.com/auth/cloud-platform'] - ) + credentials, project_id = google.auth.default(scopes=[ + 'https://www.googleapis.com/auth/cloud-platform', + 'https://www.googleapis.com/auth/userinfo.email' + ]) request = google.auth.transport.requests.Request() credentials.refresh(request) return credentials From 989e4b36b57f2b5566bdfc1309e781acf67ca0d7 Mon Sep 17 00:00:00 2001 From: Dmitry Bazhal Date: Mon, 21 Jan 2019 16:43:38 +0300 Subject: [PATCH 06/32] fix stream data decoding Related to https://github.com/kubernetes-client/python-base/issues/88 and https://github.com/kubernetes-client/python-base/pull/104 I suppose decoding complete data is better than decoding data chunks. --- stream/ws_client.py | 29 +++++++++++++++++++++-------- 1 file changed, 21 insertions(+), 8 deletions(-) diff --git a/stream/ws_client.py b/stream/ws_client.py index 1cc56cdd..706c35dc 100644 --- a/stream/ws_client.py +++ b/stream/ws_client.py @@ -41,7 +41,10 @@ def __init__(self, configuration, url, headers): header = [] self._connected = False self._channels = {} - self._all = "" + if six.PY3: + self._all = b"" + else: + self._all = "" # We just need to pass the Authorization, ignore all the other # http headers we get from the generated code @@ -98,8 +101,11 @@ def readline_channel(self, channel, timeout=None): while self.is_open() and time.time() - start < timeout: if channel in self._channels: data = self._channels[channel] - if "\n" in data: - index = data.find("\n") + newline_symbol = "\n" + if six.PY3: + newline_Symbol = b"\n" + if newline_symbol in data: + index = data.find(newline_symbol) ret = data[:index] data = data[index+1:] if data: @@ -147,9 +153,15 @@ def read_all(self): channels mapped for each input. """ out = self._all - self._all = "" + if six.PY3: + self._all = b"" + else: + self._all = "" self._channels = {} - return out + if six.PY3: + return out.decode("utf-8", "replace") + else: + return out def is_open(self): """True if the connection is still alive.""" @@ -175,10 +187,11 @@ def update(self, timeout=0): return elif op_code == ABNF.OPCODE_BINARY or op_code == ABNF.OPCODE_TEXT: data = frame.data - if six.PY3: - data = data.decode("utf-8") if len(data) > 1: - channel = ord(data[0]) + if six.PY3: + channel = data[0] + else: + channel = ord(data[0]) data = data[1:] if data: if channel in [STDOUT_CHANNEL, STDERR_CHANNEL]: From 545690ca3d6789e361e2d2f45d337030d367b2ef Mon Sep 17 00:00:00 2001 From: Dmitry Bazhal Date: Mon, 21 Jan 2019 16:57:00 +0300 Subject: [PATCH 07/32] complete data decoding --- stream/ws_client.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/stream/ws_client.py b/stream/ws_client.py index 706c35dc..d3da3765 100644 --- a/stream/ws_client.py +++ b/stream/ws_client.py @@ -80,7 +80,11 @@ def peek_channel(self, channel, timeout=0): empty string otherwise.""" self.update(timeout=timeout) if channel in self._channels: + if six.PY3: + return self._channels[channel].decode("utf-8", "replace") return self._channels[channel] + if six.PY3: + return b"" return "" def read_channel(self, channel, timeout=0): @@ -91,6 +95,8 @@ def read_channel(self, channel, timeout=0): ret = self._channels[channel] if channel in self._channels: del self._channels[channel] + if six.PY3 and isinstance(ret, bytes): + return ret.decode("utf-8", "replace") return ret def readline_channel(self, channel, timeout=None): @@ -112,7 +118,10 @@ def readline_channel(self, channel, timeout=None): self._channels[channel] = data else: del self._channels[channel] - return ret + if six.PY3 and isinstance(ret, bytes): + return ret.decode("utf-8", "replace") + else: + return ret self.update(timeout=(timeout - time.time() + start)) def write_channel(self, channel, data): From 3c30a3099336a5976074c18ea61814646689b4a8 Mon Sep 17 00:00:00 2001 From: Julian Taylor Date: Sat, 19 Jan 2019 12:38:57 +0100 Subject: [PATCH 08/32] fix watching with a specified resource version The watch code reset the version to the last found in the response. When you first list existing objects and then start watching from that resource version the existing versions are older than the version you wanted and the watch starts from the wrong version after the first restart. This leads to for example already deleted objects ending in the stream again. Fix this by setting the minimum resource version to reset from to the input resource version. As long as k8s returns all objects in order in the watch this should work. We cannot use the integer value of the resource version to order it as one should be treat the value as opaque. Closes https://github.com/kubernetes-client/python/issues/700 --- watch/watch.py | 2 ++ watch/watch_test.py | 73 ++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 74 insertions(+), 1 deletion(-) diff --git a/watch/watch.py b/watch/watch.py index 21899dd8..a9c315cd 100644 --- a/watch/watch.py +++ b/watch/watch.py @@ -122,6 +122,8 @@ def stream(self, func, *args, **kwargs): return_type = self.get_return_type(func) kwargs['watch'] = True kwargs['_preload_content'] = False + if 'resource_version' in kwargs: + self.resource_version = kwargs['resource_version'] timeouts = ('timeout_seconds' in kwargs) while True: diff --git a/watch/watch_test.py b/watch/watch_test.py index d1ec80a1..672c0526 100644 --- a/watch/watch_test.py +++ b/watch/watch_test.py @@ -14,12 +14,15 @@ import unittest -from mock import Mock +from mock import Mock, call from .watch import Watch class WatchTests(unittest.TestCase): + def setUp(self): + # counter for a test that needs test global state + self.callcount = 0 def test_watch_with_decode(self): fake_resp = Mock() @@ -62,6 +65,74 @@ def test_watch_with_decode(self): fake_resp.close.assert_called_once() fake_resp.release_conn.assert_called_once() + def test_watch_resource_version_set(self): + # https://github.com/kubernetes-client/python/issues/700 + # ensure watching from a resource version does reset to resource + # version 0 after k8s resets the watch connection + fake_resp = Mock() + fake_resp.close = Mock() + fake_resp.release_conn = Mock() + values = [ + '{"type": "ADDED", "object": {"metadata": {"name": "test1",' + '"resourceVersion": "1"}, "spec": {}, "status": {}}}\n', + '{"type": "ADDED", "object": {"metadata": {"name": "test2",' + '"resourceVersion": "2"}, "spec": {}, "sta', + 'tus": {}}}\n' + '{"type": "ADDED", "object": {"metadata": {"name": "test3",' + '"resourceVersion": "3"}, "spec": {}, "status": {}}}\n' + ] + # return nothing on the first call and values on the second + # this emulates a watch from a rv that returns nothing in the first k8s + # watch reset and values later + + def get_values(*args, **kwargs): + self.callcount += 1 + if self.callcount == 1: + return [] + else: + return values + + fake_resp.read_chunked = Mock( + side_effect=get_values) + + fake_api = Mock() + fake_api.get_namespaces = Mock(return_value=fake_resp) + fake_api.get_namespaces.__doc__ = ':return: V1NamespaceList' + + w = Watch() + # ensure we keep our requested resource version or the version latest + # returned version when the existing versions are older than the + # requested version + # needed for the list existing objects, then watch from there use case + calls = [] + + iterations = 2 + # first two calls must use the passed rv, the first call is a + # "reset" and does not actually return anything + # the second call must use the same rv but will return values + # (with a wrong rv but a real cluster would behave correctly) + # calls following that will use the rv from those returned values + calls.append(call(_preload_content=False, watch=True, + resource_version="5")) + calls.append(call(_preload_content=False, watch=True, + resource_version="5")) + for i in range(iterations): + # ideally we want 5 here but as rv must be treated as an + # opaque value we cannot interpret it and order it so rely + # on k8s returning the events completely and in order + calls.append(call(_preload_content=False, watch=True, + resource_version="3")) + + for c, e in enumerate(w.stream(fake_api.get_namespaces, + resource_version="5")): + if c == len(values) * iterations: + w.stop() + + # check calls are in the list, gives good error output + fake_api.get_namespaces.assert_has_calls(calls) + # more strict test with worse error message + self.assertEqual(fake_api.get_namespaces.mock_calls, calls) + def test_watch_stream_twice(self): w = Watch(float) for step in ['first', 'second']: From 97ad43bf1bad87b035de9e7b2d5a7afe246e8942 Mon Sep 17 00:00:00 2001 From: Dmitry Bazhal Date: Thu, 31 Jan 2019 23:19:16 +0300 Subject: [PATCH 09/32] fix symbols definitions tx to micw comments --- stream/ws_client.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/stream/ws_client.py b/stream/ws_client.py index d3da3765..3c46ecec 100644 --- a/stream/ws_client.py +++ b/stream/ws_client.py @@ -41,10 +41,9 @@ def __init__(self, configuration, url, headers): header = [] self._connected = False self._channels = {} + self._all = "" if six.PY3: self._all = b"" - else: - self._all = "" # We just need to pass the Authorization, ignore all the other # http headers we get from the generated code @@ -109,7 +108,7 @@ def readline_channel(self, channel, timeout=None): data = self._channels[channel] newline_symbol = "\n" if six.PY3: - newline_Symbol = b"\n" + newline_symbol = b"\n" if newline_symbol in data: index = data.find(newline_symbol) ret = data[:index] From 89c802ca15c1312102e1b03c4c20ad3800348c4a Mon Sep 17 00:00:00 2001 From: Dmitry Bazhal Date: Fri, 1 Feb 2019 13:06:04 +0300 Subject: [PATCH 10/32] fix segment decode errors for watch ``` Traceback (most recent call last): File "controller_unitproject/run.py", line 33, in run() File "controller_unitproject/run.py", line 29, in run unitproject.run() File "/home/bajal/.local/share/virtualenvs/controller_unitproject-1u3cbEWP/lib/python3.7/site-packages/module_k8s_controller_sdk/controller.py", line 215, in run self.resync_and_watch() File "/home/bajal/.local/share/virtualenvs/controller_unitproject-1u3cbEWP/lib/python3.7/site-packages/module_k8s_controller_sdk/controller.py", line 209, in resync_and_watch from_version=last_known_resource_version File "/home/bajal/.local/share/virtualenvs/controller_unitproject-1u3cbEWP/lib/python3.7/site-packages/module_k8s_controller_sdk/watcher.py", line 92, in watch self.__watch(seconds_to_out) File "/home/bajal/.local/share/virtualenvs/controller_unitproject-1u3cbEWP/lib/python3.7/site-packages/module_k8s_controller_sdk/watcher.py", line 51, in __watch for event in stream: File "/home/bajal/.local/share/virtualenvs/controller_unitproject-1u3cbEWP/lib/python3.7/site-packages/kubernetes/watch/watch.py", line 130, in stream for line in iter_resp_lines(resp): File "/home/bajal/.local/share/virtualenvs/controller_unitproject-1u3cbEWP/lib/python3.7/site-packages/kubernetes/watch/watch.py", line 47, in iter_resp_lines seg = seg.decode('utf8') UnicodeDecodeError: 'utf-8' codec can't decode byte 0xd0 in position 2047: unexpected end of data ``` --- watch/watch.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/watch/watch.py b/watch/watch.py index 21899dd8..f26c1ba1 100644 --- a/watch/watch.py +++ b/watch/watch.py @@ -44,7 +44,7 @@ def iter_resp_lines(resp): prev = "" for seg in resp.read_chunked(decode_content=False): if isinstance(seg, bytes): - seg = seg.decode('utf8') + seg = seg.decode('utf8', 'replace') seg = prev + seg lines = seg.split("\n") if not seg.endswith("\n"): From 8aaa9297324382fb21c912e4a631d02cf0f8f7b6 Mon Sep 17 00:00:00 2001 From: Dmitry Bazhal Date: Fri, 1 Feb 2019 13:24:30 +0300 Subject: [PATCH 11/32] correct decoding inside of watch --- watch/watch.py | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/watch/watch.py b/watch/watch.py index f26c1ba1..477b3735 100644 --- a/watch/watch.py +++ b/watch/watch.py @@ -42,19 +42,26 @@ def _find_return_type(func): def iter_resp_lines(resp): prev = "" + newline_symbol = "\n" + if six.PY3: + prev = b"" + newline_symbol = b"\n" for seg in resp.read_chunked(decode_content=False): - if isinstance(seg, bytes): - seg = seg.decode('utf8', 'replace') seg = prev + seg - lines = seg.split("\n") - if not seg.endswith("\n"): + lines = seg.split(newline_symbol) + if not seg.endswith(newline_symbol): prev = lines[-1] lines = lines[:-1] else: prev = "" + if six.PY3: + prev = b"" for line in lines: if line: - yield line + if isinstance(line, bytes): + yield line.decode('utf8', 'replace') + else: + yield line class Watch(object): From 4d387d5879ab280ecf18ffb0b39846b040fd533b Mon Sep 17 00:00:00 2001 From: Roy Lenferink Date: Mon, 4 Feb 2019 19:01:16 +0100 Subject: [PATCH 12/32] Updated OWNERS to include link to docs --- OWNERS | 2 ++ 1 file changed, 2 insertions(+) diff --git a/OWNERS b/OWNERS index 7a860ad2..cfec4b11 100644 --- a/OWNERS +++ b/OWNERS @@ -1,3 +1,5 @@ +# See the OWNERS docs at https://go.k8s.io/owners + approvers: - mbohlool - yliaog From 0fc0d404acd4a6080409e2796b7f6d6002039861 Mon Sep 17 00:00:00 2001 From: Neha Yadav Date: Fri, 8 Feb 2019 02:46:07 +0530 Subject: [PATCH 13/32] Update pycodestyle --- config/exec_provider.py | 7 ++++--- config/incluster_config.py | 3 ++- config/kube_config.py | 6 ++++-- hack/boilerplate/boilerplate.py | 14 +++++++++----- stream/ws_client.py | 7 ++++--- 5 files changed, 23 insertions(+), 14 deletions(-) diff --git a/config/exec_provider.py b/config/exec_provider.py index a4198353..89d81e8c 100644 --- a/config/exec_provider.py +++ b/config/exec_provider.py @@ -23,9 +23,10 @@ class ExecProvider(object): """ - Implementation of the proposal for out-of-tree client authentication providers - as described here -- - https://github.com/kubernetes/community/blob/master/contributors/design-proposals/auth/kubectl-exec-plugins.md + Implementation of the proposal for out-of-tree client + authentication providers as described here -- + https://github.com/kubernetes/community/blob/master/contributors + /design-proposals/auth/kubectl-exec-plugins.md Missing from implementation: diff --git a/config/incluster_config.py b/config/incluster_config.py index e643f0df..c9bdc907 100644 --- a/config/incluster_config.py +++ b/config/incluster_config.py @@ -87,7 +87,8 @@ def _set_config(self): def load_incluster_config(): - """Use the service account kubernetes gives to pods to connect to kubernetes + """ + Use the service account kubernetes gives to pods to connect to kubernetes cluster. It's intended for clients that expect to be running inside a pod running on kubernetes. It will raise an exception if called from a process not running in a kubernetes environment.""" diff --git a/config/kube_config.py b/config/kube_config.py index c0e0e26d..743046db 100644 --- a/config/kube_config.py +++ b/config/kube_config.py @@ -556,9 +556,11 @@ def new_client_from_config( config_file=None, context=None, persist_config=True): - """Loads configuration the same as load_kube_config but returns an ApiClient + """ + Loads configuration the same as load_kube_config but returns an ApiClient to be used with any API object. This will allow the caller to concurrently - talk with multiple clusters.""" + talk with multiple clusters. + """ client_config = type.__call__(Configuration) load_kube_config(config_file=config_file, context=context, client_configuration=client_config, diff --git a/hack/boilerplate/boilerplate.py b/hack/boilerplate/boilerplate.py index bdc70c31..61d4cb94 100755 --- a/hack/boilerplate/boilerplate.py +++ b/hack/boilerplate/boilerplate.py @@ -52,7 +52,8 @@ def get_refs(): refs = {} - for path in glob.glob(os.path.join(args.boilerplate_dir, "boilerplate.*.txt")): + for path in glob.glob(os.path.join( + args.boilerplate_dir, "boilerplate.*.txt")): extension = os.path.basename(path).split(".")[1] ref_file = open(path, 'r') @@ -105,7 +106,7 @@ def file_passes(filename, refs, regexs): filename, file=verbose_out) return False - # Replace all occurrences of the regex "2014|2015|2016|2017|2018" with "YEAR" + # Replace all occurrences of regex "2014|2015|2016|2017|2018" with "YEAR" p = regexs["date"] for i, d in enumerate(data): (data[i], found) = p.subn('YEAR', d) @@ -118,7 +119,8 @@ def file_passes(filename, refs, regexs): filename, file=verbose_out) if args.verbose: print(file=verbose_out) - for line in difflib.unified_diff(ref, data, 'reference', filename, lineterm=''): + for line in difflib.unified_diff( + ref, data, 'reference', filename, lineterm=''): print(line, file=verbose_out) print(file=verbose_out) return False @@ -171,9 +173,11 @@ def get_dates(): def get_regexs(): regexs = {} - # Search for "YEAR" which exists in the boilerplate, but shouldn't in the real thing + # Search for "YEAR" which exists in the boilerplate, + # but shouldn't in the real thing regexs["year"] = re.compile('YEAR') - # get_dates return 2014, 2015, 2016, 2017, or 2018 until the current year as a regex like: "(2014|2015|2016|2017|2018)"; + # get_dates return 2014, 2015, 2016, 2017, or 2018 until the current year + # as a regex like: "(2014|2015|2016|2017|2018)"; # company holder names can be anything regexs["date"] = re.compile(get_dates()) # strip #!.* from shell scripts diff --git a/stream/ws_client.py b/stream/ws_client.py index c6fea7ba..cf8a3fe9 100644 --- a/stream/ws_client.py +++ b/stream/ws_client.py @@ -53,7 +53,8 @@ def __init__(self, configuration, url, headers): header.append("authorization: %s" % headers['authorization']) if headers and 'sec-websocket-protocol' in headers: - header.append("sec-websocket-protocol: %s" % headers['sec-websocket-protocol']) + header.append("sec-websocket-protocol: %s" % + headers['sec-websocket-protocol']) else: header.append("sec-websocket-protocol: v4.channel.k8s.io") @@ -186,8 +187,8 @@ def update(self, timeout=0): data = data[1:] if data: if channel in [STDOUT_CHANNEL, STDERR_CHANNEL]: - # keeping all messages in the order they received for - # non-blocking call. + # keeping all messages in the order they received + # for non-blocking call. self._all += data if channel not in self._channels: self._channels[channel] = data From e413acd8bff1fd01e0614bf5d6f121aa60af8d59 Mon Sep 17 00:00:00 2001 From: Dmitry Bazhal Date: Mon, 11 Feb 2019 20:01:55 +0300 Subject: [PATCH 14/32] make ws client decode data only on OPCODE_TEXT Following comment https://github.com/kubernetes-client/python-base/pull/112#issuecomment-460427209 Tested on running controller. --- stream/ws_client.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/stream/ws_client.py b/stream/ws_client.py index 3c46ecec..124d4825 100644 --- a/stream/ws_client.py +++ b/stream/ws_client.py @@ -41,9 +41,11 @@ def __init__(self, configuration, url, headers): header = [] self._connected = False self._channels = {} + self._channels_opcodes = {} self._all = "" if six.PY3: self._all = b"" + self._all_contains_binary = False # We just need to pass the Authorization, ignore all the other # http headers we get from the generated code @@ -73,13 +75,16 @@ def __init__(self, configuration, url, headers): self.sock = WebSocket(sslopt=ssl_opts, skip_utf8_validation=False) self.sock.connect(url, header=header) self._connected = True + + def __channel_is_text(self, channel): + return self._channels_opcodes.get(channel, ABNF.OPCODE_TEXT) == ABNF.OPCODE_TEXT def peek_channel(self, channel, timeout=0): """Peek a channel and return part of the input, empty string otherwise.""" self.update(timeout=timeout) if channel in self._channels: - if six.PY3: + if six.PY3 and self.__channel_is_text(channel): return self._channels[channel].decode("utf-8", "replace") return self._channels[channel] if six.PY3: @@ -94,7 +99,7 @@ def read_channel(self, channel, timeout=0): ret = self._channels[channel] if channel in self._channels: del self._channels[channel] - if six.PY3 and isinstance(ret, bytes): + if six.PY3 and isinstance(ret, bytes) and self.__channel_is_text(channel): return ret.decode("utf-8", "replace") return ret @@ -166,7 +171,7 @@ def read_all(self): else: self._all = "" self._channels = {} - if six.PY3: + if six.PY3 and not self._all_contains_binary: return out.decode("utf-8", "replace") else: return out @@ -206,7 +211,10 @@ def update(self, timeout=0): # keeping all messages in the order they received for # non-blocking call. self._all += data + if not self._all_contains_binary: + self._all_contains_binary = (op_code == ABNF.OPCODE_BINARY) if channel not in self._channels: + self._channels_opcodes[channel] = op_code self._channels[channel] = data else: self._channels[channel] += data From d928310a24ed6fd15d0c84862f55b7cc11829cc4 Mon Sep 17 00:00:00 2001 From: Dmitry Bazhal Date: Mon, 11 Feb 2019 20:24:23 +0300 Subject: [PATCH 15/32] Update watch.py --- watch/watch.py | 1 + 1 file changed, 1 insertion(+) diff --git a/watch/watch.py b/watch/watch.py index 477b3735..582acf83 100644 --- a/watch/watch.py +++ b/watch/watch.py @@ -14,6 +14,7 @@ import json import pydoc +import six from kubernetes import client From 5acc854bb66e37a6671ba1e2f5036f7e23c5b321 Mon Sep 17 00:00:00 2001 From: Dmitry Bazhal Date: Mon, 11 Feb 2019 20:49:47 +0300 Subject: [PATCH 16/32] Update watch_test.py --- watch/watch_test.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/watch/watch_test.py b/watch/watch_test.py index d1ec80a1..742439b6 100644 --- a/watch/watch_test.py +++ b/watch/watch_test.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import six import unittest from mock import Mock @@ -35,6 +36,17 @@ def test_watch_with_decode(self): '{"type": "ADDED", "object": {"metadata": {"name": "test3",' '"resourceVersion": "3"}, "spec": {}, "status": {}}}\n', 'should_not_happened\n']) + if six.PY3: + fake_resp.read_chunked = Mock( + return_value=[ + b'{"type": "ADDED", "object": {"metadata": {"name": "test1",' + b'"resourceVersion": "1"}, "spec": {}, "status": {}}}\n', + b'{"type": "ADDED", "object": {"metadata": {"name": "test2",' + b'"resourceVersion": "2"}, "spec": {}, "sta', + b'tus": {}}}\n' + b'{"type": "ADDED", "object": {"metadata": {"name": "test3",' + b'"resourceVersion": "3"}, "spec": {}, "status": {}}}\n', + b'should_not_happened\n']) fake_api = Mock() fake_api.get_namespaces = Mock(return_value=fake_resp) @@ -70,6 +82,9 @@ def test_watch_stream_twice(self): fake_resp.release_conn = Mock() fake_resp.read_chunked = Mock( return_value=['{"type": "ADDED", "object": 1}\n'] * 4) + if six.PY3: + fake_resp.read_chunked = Mock( + return_value=[b'{"type": "ADDED", "object": 1}\n'] * 4) fake_api = Mock() fake_api.get_namespaces = Mock(return_value=fake_resp) @@ -97,6 +112,9 @@ def test_watch_stream_loop(self): fake_resp.release_conn = Mock() fake_resp.read_chunked = Mock( return_value=['{"type": "ADDED", "object": 1}\n']) + if six.PY3: + fake_resp.read_chunked = Mock( + return_value=[b'{"type": "ADDED", "object": 1}\n']) fake_api = Mock() fake_api.get_namespaces = Mock(return_value=fake_resp) From 0229f0adb26951e82bd9fb3ef7344951c52e4b75 Mon Sep 17 00:00:00 2001 From: micw523 Date: Mon, 11 Feb 2019 17:11:37 -0600 Subject: [PATCH 17/32] Restore one-line link --- config/exec_provider.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/config/exec_provider.py b/config/exec_provider.py index 89d81e8c..a0348f1e 100644 --- a/config/exec_provider.py +++ b/config/exec_provider.py @@ -25,8 +25,7 @@ class ExecProvider(object): """ Implementation of the proposal for out-of-tree client authentication providers as described here -- - https://github.com/kubernetes/community/blob/master/contributors - /design-proposals/auth/kubectl-exec-plugins.md + https://github.com/kubernetes/community/blob/master/contributors/design-proposals/auth/kubectl-exec-plugins.md Missing from implementation: From 8e6f0435a38e24aac700d9ebac700bdf6138ba8c Mon Sep 17 00:00:00 2001 From: Mitar Date: Mon, 15 Oct 2018 23:57:46 -0700 Subject: [PATCH 18/32] Making watch work with read_namespaced_pod_log. Fixes https://github.com/kubernetes-client/python/issues/199. --- watch/watch.py | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/watch/watch.py b/watch/watch.py index bdf24f1a..79b2358d 100644 --- a/watch/watch.py +++ b/watch/watch.py @@ -20,6 +20,7 @@ from kubernetes import client PYDOC_RETURN_LABEL = ":return:" +PYDOC_FOLLOW_PARAM = ":param bool follow:" # Removing this suffix from return type name should give us event's object # type. e.g., if list_namespaces() returns "NamespaceList" type, @@ -65,7 +66,7 @@ def __init__(self, return_type=None): self._raw_return_type = return_type self._stop = False self._api_client = client.ApiClient() - self.resource_version = 0 + self.resource_version = None def stop(self): self._stop = True @@ -78,8 +79,17 @@ def get_return_type(self, func): return return_type[:-len(TYPE_LIST_SUFFIX)] return return_type + def get_watch_argument_name(self, func): + if PYDOC_FOLLOW_PARAM in pydoc.getdoc(func): + return 'follow' + else: + return 'watch' + def unmarshal_event(self, data, return_type): - js = json.loads(data) + try: + js = json.loads(data) + except ValueError: + return data js['raw_object'] = js['object'] if return_type: obj = SimpleNamespace(data=json.dumps(js['raw_object'])) @@ -122,7 +132,7 @@ def stream(self, func, *args, **kwargs): self._stop = False return_type = self.get_return_type(func) - kwargs['watch'] = True + kwargs[self.get_watch_argument_name(func)] = True kwargs['_preload_content'] = False if 'resource_version' in kwargs: self.resource_version = kwargs['resource_version'] @@ -136,9 +146,12 @@ def stream(self, func, *args, **kwargs): if self._stop: break finally: - kwargs['resource_version'] = self.resource_version resp.close() resp.release_conn() + if self.resource_version is not None: + kwargs['resource_version'] = self.resource_version + else: + break if timeouts or self._stop: break From 4750aa9d3691cd0652654b56f54fb6897001a4a7 Mon Sep 17 00:00:00 2001 From: Ben Picolo Date: Mon, 18 Feb 2019 11:16:07 -0500 Subject: [PATCH 19/32] Add additional checks + test case fixes --- config/kube_config.py | 13 ++++++++++++- config/kube_config_test.py | 24 ++++++++++++++++-------- 2 files changed, 28 insertions(+), 9 deletions(-) diff --git a/config/kube_config.py b/config/kube_config.py index 3691a18b..b939685e 100644 --- a/config/kube_config.py +++ b/config/kube_config.py @@ -252,12 +252,23 @@ def _load_oid_token(self, provider): if 'config' not in provider: return - parts = provider['config']['id-token'].split('.') + reserved_characters = frozenset(["=", "+", "/"]) + token = provider['config']['id-token'] + if any(char in token for char in reserved_characters): + # Invalid jwt, as it contains url-unsafe chars + return None + + parts = token.split('.') if len(parts) != 3: # Not a valid JWT return None padding = (4 - len(parts[1]) % 4) * '=' + if len(padding) == 3: + # According to spec, 3 padding characters cannot occur + # in a valid jwt + # https://tools.ietf.org/html/rfc7515#appendix-C + return None if PY3: jwt_attributes = json.loads( diff --git a/config/kube_config_test.py b/config/kube_config_test.py index 12d6916d..faa4c417 100644 --- a/config/kube_config_test.py +++ b/config/kube_config_test.py @@ -43,8 +43,8 @@ def _base64(string): return base64.encodestring(string.encode()).decode() -def _unpadded_base64(string): - return base64.b64encode(string.encode()).decode().rstrip('=') +def _urlsafe_unpadded_b64encode(string): + return base64.urlsafe_b64encode(string.encode()).decode().rstrip('=') def _format_expiry_datetime(dt): @@ -91,14 +91,22 @@ def _raise_exception(st): TEST_OIDC_TOKEN = "test-oidc-token" TEST_OIDC_INFO = "{\"name\": \"test\"}" -TEST_OIDC_BASE = _unpadded_base64( - TEST_OIDC_TOKEN) + "." + _unpadded_base64(TEST_OIDC_INFO) -TEST_OIDC_LOGIN = TEST_OIDC_BASE + "." + TEST_CLIENT_CERT_BASE64 +TEST_OIDC_BASE = ".".join([ + _urlsafe_unpadded_b64encode(TEST_OIDC_TOKEN), + _urlsafe_unpadded_b64encode(TEST_OIDC_INFO) +]) +TEST_OIDC_LOGIN = ".".join([ + TEST_OIDC_BASE, + _urlsafe_unpadded_b64encode(TEST_CLIENT_CERT_BASE64) +]) TEST_OIDC_TOKEN = "Bearer %s" % TEST_OIDC_LOGIN TEST_OIDC_EXP = "{\"name\": \"test\",\"exp\": 536457600}" -TEST_OIDC_EXP_BASE = _unpadded_base64( - TEST_OIDC_TOKEN) + "." + _unpadded_base64(TEST_OIDC_EXP) -TEST_OIDC_EXPIRED_LOGIN = TEST_OIDC_EXP_BASE + "." + TEST_CLIENT_CERT_BASE64 +TEST_OIDC_EXP_BASE = _urlsafe_unpadded_b64encode( + TEST_OIDC_TOKEN) + "." + _urlsafe_unpadded_b64encode(TEST_OIDC_EXP) +TEST_OIDC_EXPIRED_LOGIN = ".".join([ + TEST_OIDC_EXP_BASE, + _urlsafe_unpadded_b64encode(TEST_CLIENT_CERT) +]) TEST_OIDC_CA = _base64(TEST_CERTIFICATE_AUTH) From ad06e5c923b2d4e5db86f7e91deddb95a6dc9a43 Mon Sep 17 00:00:00 2001 From: Mitar Date: Mon, 18 Feb 2019 16:43:50 -0800 Subject: [PATCH 20/32] Added tests. --- watch/watch_test.py | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/watch/watch_test.py b/watch/watch_test.py index 08eb36c2..ebc400af 100644 --- a/watch/watch_test.py +++ b/watch/watch_test.py @@ -67,6 +67,35 @@ def test_watch_with_decode(self): fake_resp.close.assert_called_once() fake_resp.release_conn.assert_called_once() + def test_watch_for_follow(self): + fake_resp = Mock() + fake_resp.close = Mock() + fake_resp.release_conn = Mock() + fake_resp.read_chunked = Mock( + return_value=[ + 'log_line_1\n', + 'log_line_2\n']) + + fake_api = Mock() + fake_api.read_namespaced_pod_log = Mock(return_value=fake_resp) + fake_api.read_namespaced_pod_log.__doc__ = ':param bool follow:\n:return: str' + + w = Watch() + count = 1 + for e in w.stream(fake_api.read_namespaced_pod_log): + self.assertEqual("log_line_1", e) + count += 1 + # make sure we can stop the watch and the last event with won't be + # returned + if count == 2: + w.stop() + + fake_api.read_namespaced_pod_log.assert_called_once_with( + _preload_content=False, follow=True) + fake_resp.read_chunked.assert_called_once_with(decode_content=False) + fake_resp.close.assert_called_once() + fake_resp.release_conn.assert_called_once() + def test_watch_resource_version_set(self): # https://github.com/kubernetes-client/python/issues/700 # ensure watching from a resource version does reset to resource From 972a76a83d0133b45db03495b0f9fd05ed2b94a3 Mon Sep 17 00:00:00 2001 From: Mitar Date: Wed, 20 Feb 2019 23:56:38 -0800 Subject: [PATCH 21/32] Don't use break inside finally. It swallows exceptions. --- watch/watch.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/watch/watch.py b/watch/watch.py index 79b2358d..5966eace 100644 --- a/watch/watch.py +++ b/watch/watch.py @@ -151,7 +151,7 @@ def stream(self, func, *args, **kwargs): if self.resource_version is not None: kwargs['resource_version'] = self.resource_version else: - break + self._stop = True if timeouts or self._stop: break From 328b2d12452c9125fa74590e971423970c1d750a Mon Sep 17 00:00:00 2001 From: Tomasz Prus Date: Sat, 20 Oct 2018 00:49:51 +0200 Subject: [PATCH 22/32] feat: merging kubeconfig files --- config/kube_config.py | 134 ++++++++++++++++++++++++------ config/kube_config_test.py | 165 ++++++++++++++++++++++++++++++++++++- 2 files changed, 274 insertions(+), 25 deletions(-) diff --git a/config/kube_config.py b/config/kube_config.py index 300d924e..be6156cb 100644 --- a/config/kube_config.py +++ b/config/kube_config.py @@ -14,10 +14,12 @@ import atexit import base64 +import copy import datetime import json import logging import os +import platform import tempfile import time @@ -38,6 +40,7 @@ EXPIRY_SKEW_PREVENTION_DELAY = datetime.timedelta(minutes=5) KUBE_CONFIG_DEFAULT_LOCATION = os.environ.get('KUBECONFIG', '~/.kube/config') +ENV_KUBECONFIG_PATH_SEPARATOR = ';' if platform.system() == 'Windows' else ':' _temp_files = {} @@ -132,7 +135,12 @@ def __init__(self, config_dict, active_context=None, get_google_credentials=None, config_base_path="", config_persister=None): - self._config = ConfigNode('kube-config', config_dict) + + if isinstance(config_dict, ConfigNode): + self._config = config_dict + else: + self._config = ConfigNode('kube-config', config_dict) + self._current_context = None self._user = None self._cluster = None @@ -361,9 +369,10 @@ def _load_from_exec_plugin(self): logging.error(str(e)) def _load_user_token(self): + base_path = self._get_base_path(self._user.path) token = FileOrData( self._user, 'tokenFile', 'token', - file_base_path=self._config_base_path, + file_base_path=base_path, base64_file_content=False).as_data() if token: self.token = "Bearer %s" % token @@ -376,19 +385,27 @@ def _load_user_pass_token(self): self._user['password'])).get('authorization') return True + def _get_base_path(self, config_path): + if self._config_base_path is not None: + return self._config_base_path + if config_path is not None: + return os.path.abspath(os.path.dirname(config_path)) + return "" + def _load_cluster_info(self): if 'server' in self._cluster: self.host = self._cluster['server'].rstrip('/') if self.host.startswith("https"): + base_path = self._get_base_path(self._cluster.path) self.ssl_ca_cert = FileOrData( self._cluster, 'certificate-authority', - file_base_path=self._config_base_path).as_file() + file_base_path=base_path).as_file() self.cert_file = FileOrData( self._user, 'client-certificate', - file_base_path=self._config_base_path).as_file() + file_base_path=base_path).as_file() self.key_file = FileOrData( self._user, 'client-key', - file_base_path=self._config_base_path).as_file() + file_base_path=base_path).as_file() if 'insecure-skip-tls-verify' in self._cluster: self.verify_ssl = not self._cluster['insecure-skip-tls-verify'] @@ -435,9 +452,10 @@ class ConfigNode(object): message in case of missing keys. The assumption is all access keys are present in a well-formed kube-config.""" - def __init__(self, name, value): + def __init__(self, name, value, path=None): self.name = name self.value = value + self.path = path def __contains__(self, key): return key in self.value @@ -457,7 +475,7 @@ def __getitem__(self, key): 'Invalid kube-config file. Expected key %s in %s' % (key, self.name)) if isinstance(v, dict) or isinstance(v, list): - return ConfigNode('%s/%s' % (self.name, key), v) + return ConfigNode('%s/%s' % (self.name, key), v, self.path) else: return v @@ -482,7 +500,12 @@ def get_with_name(self, name, safe=False): 'Expected only one object with name %s in %s list' % (name, self.name)) if result is not None: - return ConfigNode('%s[name=%s]' % (self.name, name), result) + if isinstance(result, ConfigNode): + return result + else: + return ConfigNode( + '%s[name=%s]' % + (self.name, name), result, self.path) if safe: return None raise ConfigException( @@ -490,18 +513,87 @@ def get_with_name(self, name, safe=False): 'Expected object with name %s in %s list' % (name, self.name)) -def _get_kube_config_loader_for_yaml_file(filename, **kwargs): - with open(filename) as f: - return KubeConfigLoader( - config_dict=yaml.safe_load(f), - config_base_path=os.path.abspath(os.path.dirname(filename)), - **kwargs) +class KubeConfigMerger: + + """Reads and merges configuration from one or more kube-config's. + The propery `config` can be passed to the KubeConfigLoader as config_dict. + + It uses a path attribute from ConfigNode to store the path to kubeconfig. + This path is required to load certs from relative paths. + + A method `save_changes` updates changed kubeconfig's (it compares current + state of dicts with). + """ + + def __init__(self, paths): + self.paths = [] + self.config_files = {} + self.config_merged = None + + for path in paths.split(ENV_KUBECONFIG_PATH_SEPARATOR): + if path: + path = os.path.expanduser(path) + if os.path.exists(path): + self.paths.append(path) + self.load_config(path) + self.config_saved = copy.deepcopy(self.config_files) + + @property + def config(self): + return self.config_merged + + def load_config(self, path): + with open(path) as f: + config = yaml.safe_load(f) + + if self.config_merged is None: + config_merged = copy.deepcopy(config) + for item in ('clusters', 'contexts', 'users'): + config_merged[item] = [] + self.config_merged = ConfigNode(path, config_merged, path) + + for item in ('clusters', 'contexts', 'users'): + self._merge(item, config[item], path) + self.config_files[path] = config + + def _merge(self, item, add_cfg, path): + for new_item in add_cfg: + for exists in self.config_merged.value[item]: + if exists['name'] == new_item['name']: + break + else: + self.config_merged.value[item].append(ConfigNode( + '{}/{}'.format(path, new_item), new_item, path)) + + def save_changes(self): + for path in self.paths: + if self.config_saved[path] != self.config_files[path]: + self.save_config(path) + self.config_saved = copy.deepcopy(self.config_files) + + def save_config(self, path): + with open(path, 'w') as f: + yaml.safe_dump(self.config_files[path], f, + default_flow_style=False) + + +def _get_kube_config_loader_for_yaml_file( + filename, persist_config=False, **kwargs): + + kcfg = KubeConfigMerger(filename) + if persist_config and 'config_persister' not in kwargs: + kwargs['config_persister'] = kcfg.save_changes() + + return KubeConfigLoader( + config_dict=kcfg.config, + config_base_path=None, + **kwargs) def list_kube_config_contexts(config_file=None): if config_file is None: - config_file = os.path.expanduser(KUBE_CONFIG_DEFAULT_LOCATION) + config_file = KUBE_CONFIG_DEFAULT_LOCATION loader = _get_kube_config_loader_for_yaml_file(config_file) return loader.list_contexts(), loader.current_context @@ -523,18 +615,12 @@ def load_kube_config(config_file=None, context=None, """ if config_file is None: - config_file = os.path.expanduser(KUBE_CONFIG_DEFAULT_LOCATION) - - config_persister = None - if persist_config: - def _save_kube_config(config_map): - with open(config_file, 'w') as f: - yaml.safe_dump(config_map, f, default_flow_style=False) - config_persister = _save_kube_config + config_file = KUBE_CONFIG_DEFAULT_LOCATION loader = _get_kube_config_loader_for_yaml_file( config_file, active_context=context, - config_persister=config_persister) + persist_config=persist_config) + if client_configuration is None: config = type.__call__(Configuration) loader.load_and_set(config) diff --git a/config/kube_config_test.py b/config/kube_config_test.py index 37ff3e27..dc783c21 100644 --- a/config/kube_config_test.py +++ b/config/kube_config_test.py @@ -27,7 +27,8 @@ from kubernetes.client import Configuration from .config_exception import ConfigException -from .kube_config import (ConfigNode, FileOrData, KubeConfigLoader, +from .kube_config import (ENV_KUBECONFIG_PATH_SEPARATOR, ConfigNode, + FileOrData, KubeConfigLoader, KubeConfigMerger, _cleanup_temp_files, _create_temp_file_with_content, list_kube_config_contexts, load_kube_config, new_client_from_config) @@ -987,5 +988,167 @@ def fake_get_api_key_with_prefix(identifier): config.auth_settings()['BearerToken']['value']) +class TestKubeConfigMerger(BaseTestCase): + TEST_KUBE_CONFIG_PART1 = { + "current-context": "no_user", + "contexts": [ + { + "name": "no_user", + "context": { + "cluster": "default" + } + }, + ], + "clusters": [ + { + "name": "default", + "cluster": { + "server": TEST_HOST + } + }, + ], + "users": [] + } + + TEST_KUBE_CONFIG_PART2 = { + "current-context": "", + "contexts": [ + { + "name": "ssl", + "context": { + "cluster": "ssl", + "user": "ssl" + } + }, + { + "name": "simple_token", + "context": { + "cluster": "default", + "user": "simple_token" + } + }, + ], + "clusters": [ + { + "name": "ssl", + "cluster": { + "server": TEST_SSL_HOST, + "certificate-authority-data": + TEST_CERTIFICATE_AUTH_BASE64, + } + }, + ], + "users": [ + { + "name": "ssl", + "user": { + "token": TEST_DATA_BASE64, + "client-certificate-data": TEST_CLIENT_CERT_BASE64, + "client-key-data": TEST_CLIENT_KEY_BASE64, + } + }, + ] + } + + TEST_KUBE_CONFIG_PART3 = { + "current-context": "no_user", + "contexts": [ + { + "name": "expired_oidc", + "context": { + "cluster": "default", + "user": "expired_oidc" + } + }, + { + "name": "ssl", + "context": { + "cluster": "skipped-part2-defined-this-context", + "user": "skipped" + } + }, + ], + "clusters": [ + ], + "users": [ + { + "name": "expired_oidc", + "user": { + "auth-provider": { + "name": "oidc", + "config": { + "client-id": "tectonic-kubectl", + "client-secret": "FAKE_SECRET", + "id-token": TEST_OIDC_EXPIRED_LOGIN, + "idp-certificate-authority-data": TEST_OIDC_CA, + "idp-issuer-url": "https://example.org/identity", + "refresh-token": + "lucWJjEhlxZW01cXI3YmVlcYnpxNGhzk" + } + } + } + }, + { + "name": "simple_token", + "user": { + "token": TEST_DATA_BASE64, + "username": TEST_USERNAME, # should be ignored + "password": TEST_PASSWORD, # should be ignored + } + }, + ] + } + + def _create_multi_config(self): + files = [] + for part in ( + self.TEST_KUBE_CONFIG_PART1, + self.TEST_KUBE_CONFIG_PART2, + self.TEST_KUBE_CONFIG_PART3): + files.append(self._create_temp_file(yaml.safe_dump(part))) + return ENV_KUBECONFIG_PATH_SEPARATOR.join(files) + + def test_list_kube_config_contexts(self): + kubeconfigs = self._create_multi_config() + expected_contexts = [ + {'context': {'cluster': 'default'}, 'name': 'no_user'}, + {'context': {'cluster': 'ssl', 'user': 'ssl'}, 'name': 'ssl'}, + {'context': {'cluster': 'default', 'user': 'simple_token'}, + 'name': 'simple_token'}, + {'context': {'cluster': 'default', 'user': 'expired_oidc'}, 'name': 'expired_oidc'}] + + contexts, active_context = list_kube_config_contexts( + config_file=kubeconfigs) + + self.assertEqual(contexts, expected_contexts) + self.assertEqual(active_context, expected_contexts[0]) + + def test_new_client_from_config(self): + kubeconfigs = self._create_multi_config() + client = new_client_from_config( + config_file=kubeconfigs, context="simple_token") + self.assertEqual(TEST_HOST, client.configuration.host) + self.assertEqual(BEARER_TOKEN_FORMAT % TEST_DATA_BASE64, + client.configuration.api_key['authorization']) + + def test_save_changes(self): + kubeconfigs = self._create_multi_config() + + # load configuration, update token, save config + kconf = KubeConfigMerger(kubeconfigs) + user = kconf.config['users'].get_with_name('expired_oidc')['user'] + provider = user['auth-provider']['config'] + provider.value['id-token'] = "token-changed" + kconf.save_changes() + + # re-read configuration + kconf = KubeConfigMerger(kubeconfigs) + user = kconf.config['users'].get_with_name('expired_oidc')['user'] + provider = user['auth-provider']['config'] + + # new token + self.assertEqual(provider.value['id-token'], "token-changed") + + if __name__ == '__main__': unittest.main() From b3ddbd903a45d24091c56060bae3bc9fe74f4e6d Mon Sep 17 00:00:00 2001 From: Ben Picolo Date: Tue, 19 Feb 2019 18:28:50 -0500 Subject: [PATCH 23/32] Add tests for updated pieces --- config/kube_config.py | 6 +-- config/kube_config_test.py | 79 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 82 insertions(+), 3 deletions(-) diff --git a/config/kube_config.py b/config/kube_config.py index b939685e..9f9dcf8a 100644 --- a/config/kube_config.py +++ b/config/kube_config.py @@ -257,18 +257,18 @@ def _load_oid_token(self, provider): if any(char in token for char in reserved_characters): # Invalid jwt, as it contains url-unsafe chars - return None + return parts = token.split('.') if len(parts) != 3: # Not a valid JWT - return None + return padding = (4 - len(parts[1]) % 4) * '=' if len(padding) == 3: # According to spec, 3 padding characters cannot occur # in a valid jwt # https://tools.ietf.org/html/rfc7515#appendix-C - return None + return if PY3: jwt_attributes = json.loads( diff --git a/config/kube_config_test.py b/config/kube_config_test.py index faa4c417..4ddc6f35 100644 --- a/config/kube_config_test.py +++ b/config/kube_config_test.py @@ -107,6 +107,17 @@ def _raise_exception(st): TEST_OIDC_EXP_BASE, _urlsafe_unpadded_b64encode(TEST_CLIENT_CERT) ]) +TEST_OIDC_CONTAINS_RESERVED_CHARACTERS = ".".join([ + _urlsafe_unpadded_b64encode(TEST_OIDC_TOKEN), + _urlsafe_unpadded_b64encode(TEST_OIDC_INFO).replace("a", "+"), + _urlsafe_unpadded_b64encode(TEST_CLIENT_CERT) +]) +TEST_OIDC_INVALID_PADDING_LENGTH = ".".join([ + _urlsafe_unpadded_b64encode(TEST_OIDC_TOKEN), + "aaaaa", + _urlsafe_unpadded_b64encode(TEST_CLIENT_CERT) +]) + TEST_OIDC_CA = _base64(TEST_CERTIFICATE_AUTH) @@ -394,6 +405,22 @@ class TestKubeConfigLoader(BaseTestCase): "user": "expired_oidc_nocert" } }, + { + "name": "oidc_contains_reserved_character", + "context": { + "cluster": "default", + "user": "oidc_contains_reserved_character" + + } + }, + { + "name": "oidc_invalid_padding_length", + "context": { + "cluster": "default", + "user": "oidc_invalid_padding_length" + + } + }, { "name": "user_pass", "context": { @@ -556,6 +583,38 @@ class TestKubeConfigLoader(BaseTestCase): } } }, + { + "name": "oidc_contains_reserved_character", + "user": { + "auth-provider": { + "name": "oidc", + "config": { + "client-id": "tectonic-kubectl", + "client-secret": "FAKE_SECRET", + "id-token": TEST_OIDC_CONTAINS_RESERVED_CHARACTERS, + "idp-issuer-url": "https://example.org/identity", + "refresh-token": + "lucWJjEhlxZW01cXI3YmVlcYnpxNGhzk" + } + } + } + }, + { + "name": "oidc_invalid_padding_length", + "user": { + "auth-provider": { + "name": "oidc", + "config": { + "client-id": "tectonic-kubectl", + "client-secret": "FAKE_SECRET", + "id-token": TEST_OIDC_INVALID_PADDING_LENGTH, + "idp-issuer-url": "https://example.org/identity", + "refresh-token": + "lucWJjEhlxZW01cXI3YmVlcYnpxNGhzk" + } + } + } + }, { "name": "user_pass", "user": { @@ -712,6 +771,26 @@ def test_oidc_with_refresh_nocert( self.assertTrue(loader._load_auth_provider_token()) self.assertEqual("Bearer abc123", loader.token) + def test_oidc_fails_if_contains_reserved_chars(self): + loader = KubeConfigLoader( + config_dict=self.TEST_KUBE_CONFIG, + active_context="oidc_contains_reserved_character", + ) + self.assertEqual( + loader._load_oid_token("oidc_contains_reserved_character"), + None, + ) + + def test_oidc_fails_if_invalid_padding_length(self): + loader = KubeConfigLoader( + config_dict=self.TEST_KUBE_CONFIG, + active_context="oidc_invalid_padding_length", + ) + self.assertEqual( + loader._load_oid_token("oidc_invalid_padding_length"), + None, + ) + def test_user_pass(self): expected = FakeConfig(host=TEST_HOST, token=TEST_BASIC_TOKEN) actual = FakeConfig() From c1a0a37e4e55003de06793a86c616db2e2c6274d Mon Sep 17 00:00:00 2001 From: Dmitry Bazhal Date: Mon, 21 Jan 2019 16:43:38 +0300 Subject: [PATCH 24/32] fix stream data decoding Related to https://github.com/kubernetes-client/python-base/issues/88 and https://github.com/kubernetes-client/python-base/pull/104 I suppose decoding complete data is better than decoding data chunks. --- stream/ws_client.py | 29 +++++++++++++++++++++-------- 1 file changed, 21 insertions(+), 8 deletions(-) diff --git a/stream/ws_client.py b/stream/ws_client.py index cf8a3fe9..c6f1f0da 100644 --- a/stream/ws_client.py +++ b/stream/ws_client.py @@ -45,7 +45,10 @@ def __init__(self, configuration, url, headers): header = [] self._connected = False self._channels = {} - self._all = "" + if six.PY3: + self._all = b"" + else: + self._all = "" # We just need to pass the Authorization, ignore all the other # http headers we get from the generated code @@ -103,8 +106,11 @@ def readline_channel(self, channel, timeout=None): while self.is_open() and time.time() - start < timeout: if channel in self._channels: data = self._channels[channel] - if "\n" in data: - index = data.find("\n") + newline_symbol = "\n" + if six.PY3: + newline_Symbol = b"\n" + if newline_symbol in data: + index = data.find(newline_symbol) ret = data[:index] data = data[index+1:] if data: @@ -152,9 +158,15 @@ def read_all(self): channels mapped for each input. """ out = self._all - self._all = "" + if six.PY3: + self._all = b"" + else: + self._all = "" self._channels = {} - return out + if six.PY3: + return out.decode("utf-8", "replace") + else: + return out def is_open(self): """True if the connection is still alive.""" @@ -180,10 +192,11 @@ def update(self, timeout=0): return elif op_code == ABNF.OPCODE_BINARY or op_code == ABNF.OPCODE_TEXT: data = frame.data - if six.PY3: - data = data.decode("utf-8") if len(data) > 1: - channel = ord(data[0]) + if six.PY3: + channel = data[0] + else: + channel = ord(data[0]) data = data[1:] if data: if channel in [STDOUT_CHANNEL, STDERR_CHANNEL]: From 6cc9fa0b284dec1d7d8c1e269ac1cb2430ff1508 Mon Sep 17 00:00:00 2001 From: Dmitry Bazhal Date: Mon, 21 Jan 2019 16:57:00 +0300 Subject: [PATCH 25/32] complete data decoding --- stream/ws_client.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/stream/ws_client.py b/stream/ws_client.py index c6f1f0da..db10b30d 100644 --- a/stream/ws_client.py +++ b/stream/ws_client.py @@ -85,7 +85,11 @@ def peek_channel(self, channel, timeout=0): empty string otherwise.""" self.update(timeout=timeout) if channel in self._channels: + if six.PY3: + return self._channels[channel].decode("utf-8", "replace") return self._channels[channel] + if six.PY3: + return b"" return "" def read_channel(self, channel, timeout=0): @@ -96,6 +100,8 @@ def read_channel(self, channel, timeout=0): ret = self._channels[channel] if channel in self._channels: del self._channels[channel] + if six.PY3 and isinstance(ret, bytes): + return ret.decode("utf-8", "replace") return ret def readline_channel(self, channel, timeout=None): @@ -117,7 +123,10 @@ def readline_channel(self, channel, timeout=None): self._channels[channel] = data else: del self._channels[channel] - return ret + if six.PY3 and isinstance(ret, bytes): + return ret.decode("utf-8", "replace") + else: + return ret self.update(timeout=(timeout - time.time() + start)) def write_channel(self, channel, data): From 678224ad3d25cbdffbef2981a3fbabe7b5f045d6 Mon Sep 17 00:00:00 2001 From: Dmitry Bazhal Date: Thu, 31 Jan 2019 23:19:16 +0300 Subject: [PATCH 26/32] fix symbols definitions tx to micw comments --- stream/ws_client.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/stream/ws_client.py b/stream/ws_client.py index db10b30d..8d6bdfd0 100644 --- a/stream/ws_client.py +++ b/stream/ws_client.py @@ -45,10 +45,9 @@ def __init__(self, configuration, url, headers): header = [] self._connected = False self._channels = {} + self._all = "" if six.PY3: self._all = b"" - else: - self._all = "" # We just need to pass the Authorization, ignore all the other # http headers we get from the generated code @@ -114,7 +113,7 @@ def readline_channel(self, channel, timeout=None): data = self._channels[channel] newline_symbol = "\n" if six.PY3: - newline_Symbol = b"\n" + newline_symbol = b"\n" if newline_symbol in data: index = data.find(newline_symbol) ret = data[:index] From ef65b0474a1000f226a7db4be2fd55c186609ec1 Mon Sep 17 00:00:00 2001 From: Dmitry Bazhal Date: Fri, 1 Feb 2019 13:06:04 +0300 Subject: [PATCH 27/32] fix segment decode errors for watch ``` Traceback (most recent call last): File "controller_unitproject/run.py", line 33, in run() File "controller_unitproject/run.py", line 29, in run unitproject.run() File "/home/bajal/.local/share/virtualenvs/controller_unitproject-1u3cbEWP/lib/python3.7/site-packages/module_k8s_controller_sdk/controller.py", line 215, in run self.resync_and_watch() File "/home/bajal/.local/share/virtualenvs/controller_unitproject-1u3cbEWP/lib/python3.7/site-packages/module_k8s_controller_sdk/controller.py", line 209, in resync_and_watch from_version=last_known_resource_version File "/home/bajal/.local/share/virtualenvs/controller_unitproject-1u3cbEWP/lib/python3.7/site-packages/module_k8s_controller_sdk/watcher.py", line 92, in watch self.__watch(seconds_to_out) File "/home/bajal/.local/share/virtualenvs/controller_unitproject-1u3cbEWP/lib/python3.7/site-packages/module_k8s_controller_sdk/watcher.py", line 51, in __watch for event in stream: File "/home/bajal/.local/share/virtualenvs/controller_unitproject-1u3cbEWP/lib/python3.7/site-packages/kubernetes/watch/watch.py", line 130, in stream for line in iter_resp_lines(resp): File "/home/bajal/.local/share/virtualenvs/controller_unitproject-1u3cbEWP/lib/python3.7/site-packages/kubernetes/watch/watch.py", line 47, in iter_resp_lines seg = seg.decode('utf8') UnicodeDecodeError: 'utf-8' codec can't decode byte 0xd0 in position 2047: unexpected end of data ``` --- watch/watch.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/watch/watch.py b/watch/watch.py index 5966eace..a8ef58bd 100644 --- a/watch/watch.py +++ b/watch/watch.py @@ -47,7 +47,7 @@ def iter_resp_lines(resp): prev = "" for seg in resp.read_chunked(decode_content=False): if isinstance(seg, bytes): - seg = seg.decode('utf8') + seg = seg.decode('utf8', 'replace') seg = prev + seg lines = seg.split("\n") if not seg.endswith("\n"): From fc1dc36975ce1dfa72b36a599a26271cd21f4cbe Mon Sep 17 00:00:00 2001 From: Dmitry Bazhal Date: Fri, 1 Feb 2019 13:24:30 +0300 Subject: [PATCH 28/32] correct decoding inside of watch --- watch/watch.py | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/watch/watch.py b/watch/watch.py index a8ef58bd..b6ee9169 100644 --- a/watch/watch.py +++ b/watch/watch.py @@ -45,19 +45,26 @@ def _find_return_type(func): def iter_resp_lines(resp): prev = "" + newline_symbol = "\n" + if six.PY3: + prev = b"" + newline_symbol = b"\n" for seg in resp.read_chunked(decode_content=False): - if isinstance(seg, bytes): - seg = seg.decode('utf8', 'replace') seg = prev + seg - lines = seg.split("\n") - if not seg.endswith("\n"): + lines = seg.split(newline_symbol) + if not seg.endswith(newline_symbol): prev = lines[-1] lines = lines[:-1] else: prev = "" + if six.PY3: + prev = b"" for line in lines: if line: - yield line + if isinstance(line, bytes): + yield line.decode('utf8', 'replace') + else: + yield line class Watch(object): From 8a6f093314c2a3535d5256212fb9ab8f5b5b49f3 Mon Sep 17 00:00:00 2001 From: Dmitry Bazhal Date: Mon, 11 Feb 2019 20:01:55 +0300 Subject: [PATCH 29/32] make ws client decode data only on OPCODE_TEXT Following comment https://github.com/kubernetes-client/python-base/pull/112#issuecomment-460427209 Tested on running controller. --- stream/ws_client.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/stream/ws_client.py b/stream/ws_client.py index 8d6bdfd0..101632e6 100644 --- a/stream/ws_client.py +++ b/stream/ws_client.py @@ -45,9 +45,11 @@ def __init__(self, configuration, url, headers): header = [] self._connected = False self._channels = {} + self._channels_opcodes = {} self._all = "" if six.PY3: self._all = b"" + self._all_contains_binary = False # We just need to pass the Authorization, ignore all the other # http headers we get from the generated code @@ -78,13 +80,16 @@ def __init__(self, configuration, url, headers): self.sock = WebSocket(sslopt=ssl_opts, skip_utf8_validation=False) self.sock.connect(url, header=header) self._connected = True + + def __channel_is_text(self, channel): + return self._channels_opcodes.get(channel, ABNF.OPCODE_TEXT) == ABNF.OPCODE_TEXT def peek_channel(self, channel, timeout=0): """Peek a channel and return part of the input, empty string otherwise.""" self.update(timeout=timeout) if channel in self._channels: - if six.PY3: + if six.PY3 and self.__channel_is_text(channel): return self._channels[channel].decode("utf-8", "replace") return self._channels[channel] if six.PY3: @@ -99,7 +104,7 @@ def read_channel(self, channel, timeout=0): ret = self._channels[channel] if channel in self._channels: del self._channels[channel] - if six.PY3 and isinstance(ret, bytes): + if six.PY3 and isinstance(ret, bytes) and self.__channel_is_text(channel): return ret.decode("utf-8", "replace") return ret @@ -171,7 +176,7 @@ def read_all(self): else: self._all = "" self._channels = {} - if six.PY3: + if six.PY3 and not self._all_contains_binary: return out.decode("utf-8", "replace") else: return out @@ -211,7 +216,10 @@ def update(self, timeout=0): # keeping all messages in the order they received # for non-blocking call. self._all += data + if not self._all_contains_binary: + self._all_contains_binary = (op_code == ABNF.OPCODE_BINARY) if channel not in self._channels: + self._channels_opcodes[channel] = op_code self._channels[channel] = data else: self._channels[channel] += data From 90084915171150b0ec4bc4549351ddbd89a8deca Mon Sep 17 00:00:00 2001 From: Dmitry Bazhal Date: Mon, 11 Feb 2019 20:24:23 +0300 Subject: [PATCH 30/32] Update watch.py --- watch/watch.py | 1 + 1 file changed, 1 insertion(+) diff --git a/watch/watch.py b/watch/watch.py index b6ee9169..67c1ce80 100644 --- a/watch/watch.py +++ b/watch/watch.py @@ -16,6 +16,7 @@ import json import pydoc +import six from kubernetes import client From 4295b2de85a4199f440e3026952b14e4eb07733c Mon Sep 17 00:00:00 2001 From: Dmitry Bazhal Date: Mon, 11 Feb 2019 20:49:47 +0300 Subject: [PATCH 31/32] Update watch_test.py --- watch/watch_test.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/watch/watch_test.py b/watch/watch_test.py index ebc400af..56278d2d 100644 --- a/watch/watch_test.py +++ b/watch/watch_test.py @@ -14,6 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import six import unittest from mock import Mock, call @@ -40,6 +41,17 @@ def test_watch_with_decode(self): '{"type": "ADDED", "object": {"metadata": {"name": "test3",' '"resourceVersion": "3"}, "spec": {}, "status": {}}}\n', 'should_not_happened\n']) + if six.PY3: + fake_resp.read_chunked = Mock( + return_value=[ + b'{"type": "ADDED", "object": {"metadata": {"name": "test1",' + b'"resourceVersion": "1"}, "spec": {}, "status": {}}}\n', + b'{"type": "ADDED", "object": {"metadata": {"name": "test2",' + b'"resourceVersion": "2"}, "spec": {}, "sta', + b'tus": {}}}\n' + b'{"type": "ADDED", "object": {"metadata": {"name": "test3",' + b'"resourceVersion": "3"}, "spec": {}, "status": {}}}\n', + b'should_not_happened\n']) fake_api = Mock() fake_api.get_namespaces = Mock(return_value=fake_resp) @@ -172,6 +184,9 @@ def test_watch_stream_twice(self): fake_resp.release_conn = Mock() fake_resp.read_chunked = Mock( return_value=['{"type": "ADDED", "object": 1}\n'] * 4) + if six.PY3: + fake_resp.read_chunked = Mock( + return_value=[b'{"type": "ADDED", "object": 1}\n'] * 4) fake_api = Mock() fake_api.get_namespaces = Mock(return_value=fake_resp) @@ -199,6 +214,9 @@ def test_watch_stream_loop(self): fake_resp.release_conn = Mock() fake_resp.read_chunked = Mock( return_value=['{"type": "ADDED", "object": 1}\n']) + if six.PY3: + fake_resp.read_chunked = Mock( + return_value=[b'{"type": "ADDED", "object": 1}\n']) fake_api = Mock() fake_api.get_namespaces = Mock(return_value=fake_resp) From e8db1ba071fbf2575f0db1c854f497089580e05a Mon Sep 17 00:00:00 2001 From: Dmitry Bazhal Date: Thu, 20 Jun 2019 20:55:48 +0300 Subject: [PATCH 32/32] no need to fix that --- stream/ws_client.py | 45 ++++++++------------------------------------- 1 file changed, 8 insertions(+), 37 deletions(-) diff --git a/stream/ws_client.py b/stream/ws_client.py index 101632e6..cf8a3fe9 100644 --- a/stream/ws_client.py +++ b/stream/ws_client.py @@ -45,11 +45,7 @@ def __init__(self, configuration, url, headers): header = [] self._connected = False self._channels = {} - self._channels_opcodes = {} self._all = "" - if six.PY3: - self._all = b"" - self._all_contains_binary = False # We just need to pass the Authorization, ignore all the other # http headers we get from the generated code @@ -80,20 +76,13 @@ def __init__(self, configuration, url, headers): self.sock = WebSocket(sslopt=ssl_opts, skip_utf8_validation=False) self.sock.connect(url, header=header) self._connected = True - - def __channel_is_text(self, channel): - return self._channels_opcodes.get(channel, ABNF.OPCODE_TEXT) == ABNF.OPCODE_TEXT def peek_channel(self, channel, timeout=0): """Peek a channel and return part of the input, empty string otherwise.""" self.update(timeout=timeout) if channel in self._channels: - if six.PY3 and self.__channel_is_text(channel): - return self._channels[channel].decode("utf-8", "replace") return self._channels[channel] - if six.PY3: - return b"" return "" def read_channel(self, channel, timeout=0): @@ -104,8 +93,6 @@ def read_channel(self, channel, timeout=0): ret = self._channels[channel] if channel in self._channels: del self._channels[channel] - if six.PY3 and isinstance(ret, bytes) and self.__channel_is_text(channel): - return ret.decode("utf-8", "replace") return ret def readline_channel(self, channel, timeout=None): @@ -116,21 +103,15 @@ def readline_channel(self, channel, timeout=None): while self.is_open() and time.time() - start < timeout: if channel in self._channels: data = self._channels[channel] - newline_symbol = "\n" - if six.PY3: - newline_symbol = b"\n" - if newline_symbol in data: - index = data.find(newline_symbol) + if "\n" in data: + index = data.find("\n") ret = data[:index] data = data[index+1:] if data: self._channels[channel] = data else: del self._channels[channel] - if six.PY3 and isinstance(ret, bytes): - return ret.decode("utf-8", "replace") - else: - return ret + return ret self.update(timeout=(timeout - time.time() + start)) def write_channel(self, channel, data): @@ -171,15 +152,9 @@ def read_all(self): channels mapped for each input. """ out = self._all - if six.PY3: - self._all = b"" - else: - self._all = "" + self._all = "" self._channels = {} - if six.PY3 and not self._all_contains_binary: - return out.decode("utf-8", "replace") - else: - return out + return out def is_open(self): """True if the connection is still alive.""" @@ -205,21 +180,17 @@ def update(self, timeout=0): return elif op_code == ABNF.OPCODE_BINARY or op_code == ABNF.OPCODE_TEXT: data = frame.data + if six.PY3: + data = data.decode("utf-8") if len(data) > 1: - if six.PY3: - channel = data[0] - else: - channel = ord(data[0]) + channel = ord(data[0]) data = data[1:] if data: if channel in [STDOUT_CHANNEL, STDERR_CHANNEL]: # keeping all messages in the order they received # for non-blocking call. self._all += data - if not self._all_contains_binary: - self._all_contains_binary = (op_code == ABNF.OPCODE_BINARY) if channel not in self._channels: - self._channels_opcodes[channel] = op_code self._channels[channel] = data else: self._channels[channel] += data