From 97c84976a61c0c0793a5ad521401c0e7643057b9 Mon Sep 17 00:00:00 2001 From: Flynn Date: Sun, 27 Jan 2019 00:53:05 -0500 Subject: [PATCH 1/4] Don't read secrets we haven't been told to use. Do make startup more stable. Don't re-parse the config for every diag page load. --- Dockerfile | 5 +- ambassador/ambassador/cli.py | 14 +- .../ambassador/diagnostics/envoy_stats.py | 6 + ambassador/ambassador/envoy/common.py | 42 ++-- ambassador/ambassador/envoy/v1/v1config.py | 11 +- ambassador/ambassador/envoy/v2/v2config.py | 7 +- ambassador/ambassador/ir/ir.py | 6 +- ambassador/ambassador/ir/irtlscontext.py | 2 +- ambassador/ambassador/utils.py | 118 ++++++++--- ambassador/ambassador_diag/diagd.py | 200 +++++++++++++----- ambassador/entrypoint.sh | 40 +++- ambassador/kubewatch.py | 5 +- ambassador/post_update.py | 20 ++ ambassador/tests/ambassador_test.py | 2 +- ambassador/tests/kat/abstract_tests.py | 7 +- 15 files changed, 355 insertions(+), 130 deletions(-) create mode 100644 ambassador/post_update.py diff --git a/Dockerfile b/Dockerfile index 402f12a8a9..0b78987a03 100644 --- a/Dockerfile +++ b/Dockerfile @@ -63,7 +63,7 @@ RUN releng/install-py.sh prd install */requirements.txt RUN rm -rf ./multi ./ambassador # Grab kubewatch -RUN wget -q https://s3.amazonaws.com/datawire-static-files/kubewatch/0.3.13/$(go env GOOS)/$(go env GOARCH)/kubewatch +RUN wget -q https://s3.amazonaws.com/datawire-static-files/kubewatch/0.3.15/$(go env GOOS)/$(go env GOARCH)/kubewatch RUN chmod +x kubewatch # Clean up no-longer-needed dev stuff. @@ -109,7 +109,8 @@ RUN chgrp -R 0 ${AMBASSADOR_ROOT} && \ # COPY the entrypoint and Python-kubewatch and make them runnable. COPY ambassador/kubewatch.py . COPY ambassador/entrypoint.sh . -RUN chmod 755 kubewatch.py entrypoint.sh +COPY ambassador/post_update.py . +RUN chmod 755 kubewatch.py entrypoint.sh post_update.py # Grab ambex, too. RUN wget -q https://s3.amazonaws.com/datawire-static-files/ambex/0.1.1/ambex diff --git a/ambassador/ambassador/cli.py b/ambassador/ambassador/cli.py index a1e3132397..9d024f1623 100644 --- a/ambassador/ambassador/cli.py +++ b/ambassador/ambassador/cli.py @@ -30,7 +30,7 @@ from .envoy import EnvoyConfig, V1Config, V2Config from .ir.irtlscontext import IRTLSContext -from .utils import RichStatus, SavedSecret, SplitConfigChecker +from .utils import RichStatus, SavedSecret, SecretSaver __version__ = Version @@ -104,10 +104,12 @@ def file_checker(path: str) -> bool: return True -def cli_secret_reader(context: IRTLSContext, secret_name: str, namespace: str, secret_root: str) -> SavedSecret: +def cli_secret_reader(context: IRTLSContext, secret_name: str, namespace: str) -> SavedSecret: # In the Real World, the secret reader should, y'know, read secrets.. # Here we're just gonna fake it. + secret_root = os.environ.get('AMBASSADOR_CONFIG_BASE_DIR', "/ambassador") + cert_path = os.path.join(secret_root, namespace, "cli-secrets", secret_name, "tls.crt") key_path = os.path.join(secret_root, namespace, "cli-secrets", secret_name, "tls.key") @@ -371,7 +373,7 @@ def splitconfig(root_path: Parameter.REQUIRED, *, ambex_pid: int=0, # root_path contains directories for each resource type: services, secrets, optional # crd-whatever paths. - scc = SplitConfigChecker(logger, root_path) + scc = SecretSaver(logger, root_path, root_path) # Start by assuming that we're going to look at everything. config_root = root_path @@ -384,10 +386,8 @@ def splitconfig(root_path: Parameter.REQUIRED, *, ambex_pid: int=0, aconf = Config() aconf.load_from_directory(config_root, k8s=k8s, recurse=True) - # Use the SplitConfigChecker to resolve secrets. We don't pass a file checker - # because anything in the config using an actual path needs to be passing a - # correct path by this point. - ir = IR(aconf, secret_reader=scc.secret_reader) + # Use the SecretSaver to read secret from the filesystem. + ir = IR(aconf, secret_reader=scc.file_reader) # Generate a V2Config from that, and grab the split bootstrap and ADS configs. v2config = V2Config(ir) diff --git a/ambassador/ambassador/diagnostics/envoy_stats.py b/ambassador/ambassador/diagnostics/envoy_stats.py index f51b840b2b..f810d861f9 100644 --- a/ambassador/ambassador/diagnostics/envoy_stats.py +++ b/ambassador/ambassador/diagnostics/envoy_stats.py @@ -141,6 +141,8 @@ def cluster_stats(self, name): return cstat def update_log_levels(self, last_attempt, level=None): + # logging.info("updating levels") + try: url = "http://127.0.0.1:8001/logging" @@ -182,6 +184,8 @@ def update_log_levels(self, last_attempt, level=None): return True def update_envoy_stats(self, last_attempt): + # logging.info("updating stats") + try: r = requests.get("http://127.0.0.1:8001/stats") except OSError as e: @@ -297,6 +301,8 @@ def update_envoy_stats(self, last_attempt): "envoy": envoy_stats }) + # logging.info("stats updated") + # def update(self, active_mapping_names): def update(self): try: diff --git a/ambassador/ambassador/envoy/common.py b/ambassador/ambassador/envoy/common.py index 2e24b153d0..8582e7df77 100644 --- a/ambassador/ambassador/envoy/common.py +++ b/ambassador/ambassador/envoy/common.py @@ -12,13 +12,28 @@ # See the License for the specific language governing permissions and # limitations under the License -from typing import Any, Dict, Optional +from typing import Any, Dict, Optional, Tuple + +import json from abc import abstractmethod from ..ir import IR, IRResource from ..ir.irmapping import IRMappingGroup +def sanitize_pre_json(input): + # Removes all potential null values + if isinstance(input, dict): + for key, value in list(input.items()): + if value is None: + del input[key] + else: + sanitize_pre_json(value) + elif isinstance(input, list): + for item in input: + sanitize_pre_json(item) + return input + class EnvoyConfig: """ Base class for Envoy configuration that permits fetching configuration @@ -49,6 +64,17 @@ def save_element(self, kind: str, resource: IRResource, obj: Any): self.add_element(kind, resource.location, obj) return obj + @abstractmethod + def split_config(self) -> Tuple[Dict[str, Any], Dict[str, Any]]: + pass + + @abstractmethod + def as_dict(self) -> Dict[str, Any]: + pass + + def as_json(self): + return json.dumps(sanitize_pre_json(self.as_dict()), sort_keys=True, indent=4) + @classmethod def generate(cls, ir: IR, version: str="V2") -> 'EnvoyConfig': if version == "V1": @@ -70,17 +96,3 @@ def _get_envoy_route(self, group: IRMappingGroup) -> str: return self.regex else: return self.prefix - - -def sanitize_pre_json(input): - # Removes all potential null values - if isinstance(input, dict): - for key, value in list(input.items()): - if value is None: - del input[key] - else: - sanitize_pre_json(value) - elif isinstance(input, list): - for item in input: - sanitize_pre_json(item) - return input \ No newline at end of file diff --git a/ambassador/ambassador/envoy/v1/v1config.py b/ambassador/ambassador/envoy/v1/v1config.py index 0c1958014c..cb6f336ab8 100644 --- a/ambassador/ambassador/envoy/v1/v1config.py +++ b/ambassador/ambassador/envoy/v1/v1config.py @@ -12,10 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License -from typing import Any, ClassVar, Dict, List, Optional, Tuple, Union - -import json -import logging +from typing import Any, Dict, List, Optional, Tuple from ...ir import IR from ..common import EnvoyConfig @@ -55,7 +52,7 @@ def __init__(self, ir: IR) -> None: V1Tracing.generate(self) V1GRPCService.generate(self) - def as_dict(self): + def as_dict(self) -> Dict[str, Any]: d = { 'admin': self.admin, 'listeners': self.listeners, @@ -74,5 +71,5 @@ def as_dict(self): return d - def as_json(self): - return json.dumps(self.as_dict(), sort_keys=True, indent=4) + def split_config(self) -> Tuple[Dict[str, Any], Dict[str, Any]]: + raise NotImplementedError \ No newline at end of file diff --git a/ambassador/ambassador/envoy/v2/v2config.py b/ambassador/ambassador/envoy/v2/v2config.py index dfb28f228c..e3a834e69f 100644 --- a/ambassador/ambassador/envoy/v2/v2config.py +++ b/ambassador/ambassador/envoy/v2/v2config.py @@ -57,7 +57,7 @@ def __init__(self, ir: IR) -> None: V2StaticResources.generate(self) V2Bootstrap.generate(self) - def as_dict(self): + def as_dict(self) -> Dict[str, Any]: d = { 'bootstrap': self.bootstrap, 'static_resources': self.static_resources @@ -65,10 +65,7 @@ def as_dict(self): return d - def as_json(self): - return json.dumps(sanitize_pre_json(self.as_dict()), sort_keys=True, indent=4) - - def split_config(self) -> tuple: + def split_config(self) -> Tuple[Dict[str, Any], Dict[str, Any]]: ads_config = { '@type': '/envoy.config.bootstrap.v2.Bootstrap', 'static_resources': self.static_resources diff --git a/ambassador/ambassador/ir/ir.py b/ambassador/ambassador/ir/ir.py index 50b723d002..06bc42b969 100644 --- a/ambassador/ambassador/ir/ir.py +++ b/ambassador/ambassador/ir/ir.py @@ -74,12 +74,10 @@ def __init__(self, aconf: Config, secret_reader=None, file_checker=None) -> None self.logger = logging.getLogger("ambassador.ir") # We're using setattr since since mypy complains about assigning directly to a method. - setattr(self, 'secret_reader', secret_reader or KubeSecretReader()) + secret_root = os.environ.get('AMBASSADOR_CONFIG_BASE_DIR', "/ambassador") + setattr(self, 'secret_reader', secret_reader or KubeSecretReader(secret_root)) setattr(self, 'file_checker', file_checker if file_checker is not None else os.path.isfile) - # OK. Remember the root of the secret store... - self.secret_root = os.environ.get('AMBASSADOR_CONFIG_BASE_DIR', "/ambassador") - self.logger.debug("IR __init__:") self.logger.debug("IR: Version %s built from %s on %s" % (Version, Build.git.commit, Build.git.branch)) self.logger.debug("IR: AMBASSADOR_ID %s" % self.ambassador_id) diff --git a/ambassador/ambassador/ir/irtlscontext.py b/ambassador/ambassador/ir/irtlscontext.py index 9d9c7aeb6f..a3dc25e3e4 100644 --- a/ambassador/ambassador/ir/irtlscontext.py +++ b/ambassador/ambassador/ir/irtlscontext.py @@ -127,7 +127,7 @@ def load_secret(self, secret_name: str) -> SavedSecret: secret_name, namespace = secret_name.split('.', 1) sr = getattr(self.ir, 'secret_reader') - return sr(self, secret_name, namespace, self.ir.secret_root) + return sr(self, secret_name, namespace) def resolve(self) -> bool: # is_valid determines if the TLS context is valid diff --git a/ambassador/ambassador/utils.py b/ambassador/ambassador/utils.py index 5fe1f0a012..16259d5a7a 100644 --- a/ambassador/ambassador/utils.py +++ b/ambassador/ambassador/utils.py @@ -14,14 +14,16 @@ # See the License for the specific language governing permissions and # limitations under the License -from typing import Dict, Optional, TYPE_CHECKING +from typing import Dict, Optional, TextIO, TYPE_CHECKING import binascii +import io import socket import threading import time import os import logging +import requests import yaml from kubernetes import client, config @@ -36,6 +38,42 @@ logger.setLevel(logging.INFO) +def _load_url_contents(logger: logging.Logger, url: str, stream: TextIO) -> bool: + saved = False + + try: + with requests.get(url, stream=True) as r: + if r.status_code == 200: + + # All's well, pull the config down. + try: + for chunk in r.iter_content(chunk_size=65536, decode_unicode=True): + stream.write(chunk) + saved = True + except IOError as e: + logger.error("couldn't save Kubernetes service resources: %s" % e) + except Exception as e: + logger.error("couldn't read Kubernetes service resources: %s" % e) + except requests.exceptions.RequestException as e: + logger.error("could not load new snapshot: %s" % e) + + return saved + +def save_url_contents(logger: logging.Logger, url: str, path: str) -> bool: + with open(path, 'w', encoding='utf-8') as stream: + return _load_url_contents(logger, url, stream) + +def load_url_contents(logger: logging.Logger, url: str) -> Optional[str]: + stream = io.StringIO() + + saved = _load_url_contents(logger, url, stream) + + if saved: + return stream.getvalue() + else: + return None + + class TLSPaths(Enum): mount_cert_dir = "/etc/certs" mount_tls_crt = os.path.join(mount_cert_dir, "tls.crt") @@ -58,6 +96,7 @@ def generate(directory): 'key': os.path.join(directory, 'tls.key') } + class SystemInfo: MyHostName = 'localhost' MyResolvedName = '127.0.0.1' @@ -133,6 +172,7 @@ def referenced_by(self, source): if source not in refby: refby.append(source) + class DelayTrigger (threading.Thread): def __init__(self, onfired, timeout=5, name=None): super().__init__() @@ -212,11 +252,12 @@ def __str__(self) -> str: class KubeSecretReader: - def __init__(self) -> None: + def __init__(self, secret_root: str) -> None: self.v1 = None self.__name__ = 'KubeSecretReader' + self.secret_root = secret_root - def __call__(self, context: 'IRTLSContext', secret_name: str, namespace: str, secret_root: str): + def __call__(self, context: 'IRTLSContext', secret_name: str, namespace: str): # Make sure we have a Kube connection. if not self.v1: self.v1 = kube_v1() @@ -246,7 +287,7 @@ def __call__(self, context: 'IRTLSContext', secret_name: str, namespace: str, se if key: key = binascii.a2b_base64(key) - secret_dir = os.path.join(secret_root, namespace, "secrets", secret_name) + secret_dir = os.path.join(self.secret_root, namespace, "secrets", secret_name) cert_path = None key_path = None @@ -267,36 +308,57 @@ def __call__(self, context: 'IRTLSContext', secret_name: str, namespace: str, se return SavedSecret(secret_name, namespace, cert_path, key_path, cert_data) -class SplitConfigChecker: - def __init__(self, logger, root_path: str) -> None: +class SecretSaver: + def __init__(self, logger: logging.Logger, source_root: str, cache_dir: str) -> None: self.logger = logger - self.root = root_path + self.source_root = source_root + self.cache_dir = cache_dir - def secret_reader(self, context: 'IRTLSContext', secret_name: str, namespace: str, secret_root: str): - yaml_path = os.path.join(self.root, namespace, "secrets", "%s.yaml" % secret_name) + def file_reader(self, context: 'IRTLSContext', secret_name: str, namespace: str): + self.context = context + self.secret_name = secret_name + self.namespace = namespace + + self.source = os.path.join(self.source_root, namespace, "secrets", "%s.yaml" % secret_name) - serialization = None + self.serialization = None + + try: + self.serialization = open(self.source, "r").read() + except IOError as e: + self.logger.error("TLSContext %s: SCC.file_reader could not open %s" % (context.name, self.source)) + + return self.secret_parser() + + def url_reader(self, context: 'IRTLSContext', secret_name: str, namespace: str): + self.context = context + self.secret_name = secret_name + self.namespace = namespace + + self.source = "%s/secrets/%s/%s" % (self.source_root, namespace, secret_name) + self.serialization = load_url_contents(self.logger, self.source) + + if not self.serialization: + self.logger.error("TLSContext %s: SCC.url_reader could not load %s" % (context.name, self.source)) + + return self.secret_parser() + + def secret_parser(self) -> SavedSecret: objects = [] cert_data = None cert = None key = None cert_path = None key_path = None + ocount = 0 + errors = 0 - try: - serialization = open(yaml_path, "r").read() - except IOError as e: - self.logger.error("TLSContext %s: SCC.secret_reader could not open %s" % (context.name, yaml_path)) - - if serialization: + if self.serialization: try: - objects.extend(list(yaml.safe_load_all(serialization))) + objects.extend(list(yaml.safe_load_all(self.serialization))) except yaml.error.YAMLError as e: self.logger.error("TLSContext %s: SCC.secret_reader could not parse %s: %s" % - (context.name, yaml_path, e)) - - ocount = 0 - errors = 0 + (self.context.name, self.source, e)) for obj in objects: ocount += 1 @@ -304,7 +366,7 @@ def secret_reader(self, context: 'IRTLSContext', secret_name: str, namespace: st if kind != "Secret": self.logger.error("TLSContext %s: SCC.secret_reader found K8s %s at %s.%d?" % - (context.name, kind, yaml_path, ocount)) + (self.context.name, kind, self.source, ocount)) errors += 1 continue @@ -312,14 +374,14 @@ def secret_reader(self, context: 'IRTLSContext', secret_name: str, namespace: st if not metadata: self.logger.error("TLSContext %s: SCC.secret_reader found K8s Secret with no metadata at %s.%d?" % - (context.name, yaml_path, ocount)) + (self.context.name, self.source, ocount)) errors += 1 continue if 'data' in obj: if cert_data: self.logger.error("TLSContext %s: SCC.secret_reader found multiple Secrets in %s?" % - (context.name, yaml_path)) + (self.context.name, self.source)) errors += 1 continue @@ -330,7 +392,7 @@ def secret_reader(self, context: 'IRTLSContext', secret_name: str, namespace: st # # if not cert_data: # self.logger.error("TLSContext %s: SCC.secret_reader found no certificate in %s?" % - # (context.name, yaml_path)) + # (self.context.name, self.source)) # return None # OK, we have something to work with. Hopefully. @@ -348,11 +410,11 @@ def secret_reader(self, context: 'IRTLSContext', secret_name: str, namespace: st # if not cert: # # This is an error. Having a cert but no key might be OK, we'll let our caller decide. # self.logger.error("TLSContext %s: SCC.secret_reader found data but no cert in %s?" % - # (context.name, yaml_path)) + # (self.context.name, yaml_path)) # return None if cert: - secret_dir = os.path.join(self.root, namespace, "secrets-decoded", secret_name) + secret_dir = os.path.join(self.cache_dir, self.namespace, "secrets-decoded", self.secret_name) try: os.makedirs(secret_dir) @@ -366,7 +428,7 @@ def secret_reader(self, context: 'IRTLSContext', secret_name: str, namespace: st key_path = os.path.join(secret_dir, "tls.key") open(key_path, "w").write(key.decode("utf-8")) - return SavedSecret(secret_name, namespace, cert_path, key_path, cert_data) + return SavedSecret(self.secret_name, self.namespace, cert_path, key_path, cert_data) def kube_v1(): diff --git a/ambassador/ambassador_diag/diagd.py b/ambassador/ambassador_diag/diagd.py index 675d95aaf6..3e167d887c 100644 --- a/ambassador/ambassador_diag/diagd.py +++ b/ambassador/ambassador_diag/diagd.py @@ -22,6 +22,9 @@ import logging import multiprocessing import os +import queue +import signal +import threading import time import uuid @@ -34,7 +37,7 @@ from gunicorn.six import iteritems from ambassador import Config, IR, EnvoyConfig, Diagnostics, Scout, Version -from ambassador.utils import SystemInfo, PeriodicTrigger, SplitConfigChecker +from ambassador.utils import SystemInfo, PeriodicTrigger, SecretSaver, save_url_contents from ambassador.diagnostics import EnvoyStats @@ -60,10 +63,10 @@ 'module': 'https://www.getambassador.io/reference/configuration#modules', } -envoy_targets = { - 'route': 'https://envoyproxy.github.io/envoy/configuration/http_conn_man/route_config/route.html', - 'cluster': 'https://envoyproxy.github.io/envoy/configuration/cluster_manager/cluster.html', -} +# envoy_targets = { +# 'route': 'https://envoyproxy.github.io/envoy/configuration/http_conn_man/route_config/route.html', +# 'cluster': 'https://envoyproxy.github.io/envoy/configuration/cluster_manager/cluster.html', +# } def number_of_workers(): @@ -71,20 +74,28 @@ def number_of_workers(): class DiagApp (Flask): + ambex_pid: int estats: EnvoyStats config_dir_prefix: str + bootstrap_path: str + ads_path: str health_checks: bool debugging: bool verbose: bool k8s: bool notice_path: str logger: logging.Logger - scc: SplitConfigChecker + # scc: SecretSaver aconf: Config + ir: IR + econf: EnvoyConfig + diag: Diagnostics notices: 'Notices' scout: Scout scout_args: Dict[str, Any] scout_result: Dict[str, Any] + watcher: 'AmbassadorEventWatcher' + # Get the Flask app defined early. app = DiagApp(__name__, @@ -173,35 +184,6 @@ def extend(self, notices): self.post(notice) -def get_aconf(app) -> Config: - # We need to find the sync-# directory with the highest number... - sync_dirs = [] - latest = app.config_dir_prefix - - for subdir in os.listdir(app.config_dir_prefix): - if subdir.startswith("sync-"): - try: - sync_dirs.append(int(subdir.replace("sync-", ""))) - except ValueError: - pass - - if sync_dirs: - latest_generation = sorted(sync_dirs, reverse=True)[0] - latest = os.path.join(app.config_dir_prefix, "sync-%d" % latest_generation) - - app.logger.debug("Fetching resources from %s" % latest) - - app.scc = SplitConfigChecker(app.logger, latest) - - aconf = Config() - aconf.load_from_directory(latest, k8s=app.k8s, recurse=True) - - app.notices = Notices(app.notice_path) - app.notices.reset() - - return aconf - - def check_scout(app, what: str, ir: Optional[IR]=None) -> None: uptime = datetime.datetime.now() - boot_time hr_uptime = td_format(uptime) @@ -283,6 +265,25 @@ def envoy_status(estats): } +@app.route('/_internal/v0/ping', methods=[ 'GET' ]) +def handle_ping(): + return "ACK", 200 + + +@app.route('/_internal/v0/update', methods=[ 'POST' ]) +def handle_update(): + url = request.args.get('url', None) + + if not url: + app.logger.error("error: update requested with no URL") + return "error: update requested with no URL", 400 + + app.logger.info("Update requested from %s" % url) + app.watcher.post('CONFIG', url) + + return "update requested", 200 + + @app.route('/ambassador/v0/favicon.ico', methods=[ 'GET' ]) def favicon(): template_path = resource_filename(Requirement.parse("ambassador"), "templates") @@ -315,12 +316,10 @@ def check_ready(): def show_overview(reqid=None): app.logger.debug("OV %s - showing overview" % reqid) - aconf = get_aconf(app) - ir = IR(aconf, secret_reader=app.scc.secret_reader) - check_scout(app, "overview", ir) + ir = app.ir + diag = app.diag - econf = EnvoyConfig.generate(ir, "V2") - diag = Diagnostics(ir, econf) + check_scout(app, "overview", ir) if app.verbose: app.logger.debug("OV %s: DIAG" % reqid) @@ -404,12 +403,10 @@ def collect_errors_and_notices(request, reqid, what: str, diag: Diagnostics) -> def show_intermediate(source=None, reqid=None): app.logger.debug("SRC %s - getting intermediate for '%s'" % (reqid, source)) - aconf = get_aconf(app) - ir = IR(aconf, secret_reader=app.scc.secret_reader) - check_scout(app, "detail: %s" % source, ir) + ir = app.ir + diag = app.diag - econf = EnvoyConfig.generate(ir, "V2") - diag = Diagnostics(ir, econf) + check_scout(app, "detail: %s" % source, ir) method = request.args.get('method', None) resource = request.args.get('resource', None) @@ -474,7 +471,8 @@ def source_lookup(name, sources): return source.get('_source', name) -def create_diag_app(config_dir_path, do_checks=False, reload=False, debug=False, k8s=True, verbose=False, notices=None): +def create_diag_app(config_dir_path, bootstrap_path, ads_path, ambex_pid, + do_checks=False, reload=False, debug=False, k8s=True, verbose=False, notices=None): app.estats = EnvoyStats() app.health_checks = False app.debugging = reload @@ -482,6 +480,9 @@ def create_diag_app(config_dir_path, do_checks=False, reload=False, debug=False, app.k8s = k8s app.notice_path = notices + # This will raise an exception and crash if you pass it a string. That's intentional. + app.ambex_pid = int(ambex_pid) + # This feels like overkill. app.logger = logging.getLogger("ambassador.diagd") app.logger.setLevel(logging.INFO) @@ -494,10 +495,99 @@ def create_diag_app(config_dir_path, do_checks=False, reload=False, debug=False, app.health_checks = True app.config_dir_prefix = config_dir_path + app.bootstrap_path = bootstrap_path + app.ads_path = ads_path + + app.events = queue.Queue() + # app.scc = SecretSaver(app.logger, app.config_dir_prefix, app.config_dir_prefix) return app +class AmbassadorEventWatcher(threading.Thread): + def __init__(self, app: DiagApp) -> None: + super().__init__(name="AmbassadorEventWatcher", daemon=True) + self.app = app + self.logger = self.app.logger + self.events: queue.Queue = queue.Queue() + + def post(self, cmd: str, arg: str) -> None: + self.events.put((cmd, arg)) + + def update_estats(self) -> None: + self.post('ESTATS', '') + + def run(self): + self.logger.info("starting event watcher") + + while True: + cmd, arg = self.events.get() + + if cmd == 'ESTATS': + # self.logger.info("updating estats") + try: + self.app.estats.update() + except Exception as e: + self.logger.error("could not update estats: %s" % e) + self.logger.exception(e) + elif cmd == 'CONFIG': + try: + self.load_config(arg) + except Exception as e: + self.logger.error("could not reconfigure: %s" % e) + self.logger.exception(e) + else: + self.logger.error("unknown event type: '%s' '%s'" % (cmd, arg)) + + def load_config(self, url): + snapshot = url.split('/')[-1] + aconf_path = os.path.join(app.config_dir_prefix, "snapshot-%s.yaml" % snapshot) + ir_path = os.path.join(app.config_dir_prefix, "ir-%s.json" % snapshot) + + self.logger.info("copying configuration from %s to %s" % (url, aconf_path)) + + saved = save_url_contents(self.logger, "%s/services" % url, aconf_path) + + if saved: + scc = SecretSaver(app.logger, url, app.config_dir_prefix) + + aconf = Config() + # Yeah yeah yeah. It's not really a directory. Whatever. + aconf.load_from_directory(aconf_path, k8s=app.k8s, recurse=True) + + app.notices = Notices(app.notice_path) + app.notices.reset() + + ir = IR(aconf, secret_reader=scc.url_reader) + open(ir_path, "w").write(ir.as_json()) + + check_scout(app, "update", ir) + + econf = EnvoyConfig.generate(ir, "V2") + diag = Diagnostics(ir, econf) + + bootstrap_config, ads_config = econf.split_config() + + self.logger.info("saving Envoy configuration for snapshot %s" % snapshot) + + with open(app.bootstrap_path, "w") as output: + output.write(json.dumps(bootstrap_config, sort_keys=True, indent=4)) + + with open(app.ads_path, "w") as output: + output.write(json.dumps(ads_config, sort_keys=True, indent=4)) + + app.aconf = aconf + app.ir = ir + app.econf = econf + app.diag = diag + + if app.ambex_pid != 0: + self.logger.info("notifying PID %d ambex" % app.ambex_pid) + os.kill(app.ambex_pid, signal.SIGHUP) + + self.logger.info("configuration updated") + + class StandaloneApplication(gunicorn.app.base.BaseApplication): def __init__(self, app, options=None): self.options = options or {} @@ -511,19 +601,28 @@ def load_config(self): self.cfg.set(key.lower(), value) def load(self): + # This is a little weird, but whatever. + self.application.watcher = AmbassadorEventWatcher(self.application) + self.application.watcher.start() + if self.application.health_checks: self.application.logger.info("Starting periodic updates") - self.application.stats_updater = PeriodicTrigger(self.application.estats.update, period=5) + self.application.stats_updater = PeriodicTrigger(self.application.watcher.update_estats, period=5) return self.application -def _main(config_dir_path: Parameter.REQUIRED, *, no_checks=False, reload=False, debug=False, verbose=False, +def _main(config_dir_path: Parameter.REQUIRED, bootstrap_path: Parameter.REQUIRED, ads_path: Parameter.REQUIRED, + ambex_pid: Parameter.REQUIRED, *, + no_checks=False, reload=False, debug=False, verbose=False, workers=None, port=8877, host='0.0.0.0', k8s=False, notices=None): """ Run the diagnostic daemon. :param config_dir_path: Configuration directory to scan for Ambassador YAML files + :param bootstrap_path: Path to which to write bootstrap Envoy configuration + :param ads_path: Path to which to write ADS Envoy configuration + :param ambex_pid: PID to signal with HUP after updating Envoy configuration :param no_checks: If True, don't do Envoy-cluster health checking :param reload: If True, run Flask in debug mode for live reloading :param debug: If True, do debug logging @@ -533,9 +632,10 @@ def _main(config_dir_path: Parameter.REQUIRED, *, no_checks=False, reload=False, :param port: Port on which to listen (default 8877) :param notices: Optional file to read for local notices """ - + # Create the application itself. - flask_app = create_diag_app(config_dir_path, not no_checks, reload, debug, k8s, verbose, notices) + flask_app = create_diag_app(config_dir_path, bootstrap_path, ads_path, ambex_pid, + not no_checks, reload, debug, k8s, verbose, notices) if not workers: workers = number_of_workers() diff --git a/ambassador/entrypoint.sh b/ambassador/entrypoint.sh index effd303a63..ecc57b3428 100644 --- a/ambassador/entrypoint.sh +++ b/ambassador/entrypoint.sh @@ -37,6 +37,8 @@ fi ENVOY_DIR="${AMBASSADOR_CONFIG_BASE_DIR}/envoy" ENVOY_CONFIG_FILE="${ENVOY_DIR}/envoy.json" +# The bootstrap file really is in the config base dir, not the Envoy dir. +ENVOY_BOOTSTRAP_FILE="${AMBASSADOR_CONFIG_BASE_DIR}/bootstrap-ads.json" # Set AMBASSADOR_DEBUG to things separated by spaces to enable debugging. check_debug () { @@ -177,21 +179,45 @@ AMBASSADOR_CLUSTER_ID="${cluster_id}" export AMBASSADOR_CLUSTER_ID echo "AMBASSADOR: using cluster ID $AMBASSADOR_CLUSTER_ID" -echo "AMBASSADOR: starting diagd" -diagd "${CONFIG_DIR}" $DIAGD_DEBUG $DIAGD_K8S --notices "${AMBASSADOR_CONFIG_BASE_DIR}/notices.json" & -pids="${pids:+${pids} }$!:diagd" - echo "AMBASSADOR: starting ads" ./ambex "${ENVOY_DIR}" & AMBEX_PID="$!" pids="${pids:+${pids} }${AMBEX_PID}:ambex" echo "AMBASSADOR: starting Envoy" -envoy $ENVOY_DEBUG -c "${AMBASSADOR_CONFIG_BASE_DIR}/bootstrap-ads.json" & +envoy $ENVOY_DEBUG -c "${ENVOY_BOOTSTRAP_FILE}" & pids="${pids:+${pids} }$!:envoy" +echo "AMBASSADOR: starting diagd" +diagd "${CONFIG_DIR}" "${ENVOY_BOOTSTRAP_FILE}" "${ENVOY_CONFIG_FILE}" $AMBEX_PID $DIAGD_DEBUG $DIAGD_K8S --notices "${AMBASSADOR_CONFIG_BASE_DIR}/notices.json" & +pids="${pids:+${pids} }$!:diagd" + +# Wait for diagd to start +tries_left=10 +delay=1 +while [ $tries_left -gt 0 ]; do + echo "AMBASSADOR: pinging diagd ($tries_left)..." + + status=$(curl -s -o /dev/null -w "%{http_code}" http://localhost:8877/_internal/v0/ping) + + if [ "$status" = "200" ]; then + break + fi + + tries_left=$(( $tries_left - 1 )) + sleep $delay + delay=$(( $delay * 2 )) + if [ $delay -gt 10 ]; then delay=5; fi +done + +if [ $tries_left -le 0 ]; then + echo "AMBASSADOR: giving up on diagd and hoping for the best..." +else + echo "AMBASSADOR: diagd running" +fi + if [ -z "${AMBASSADOR_NO_KUBERNETES}" ]; then - KUBEWATCH_SYNC_CMD="ambassador splitconfig --debug --k8s --bootstrap-path=${AMBASSADOR_CONFIG_BASE_DIR}/bootstrap-ads.json --ads-path=${ENVOY_CONFIG_FILE} --ambex-pid=${AMBEX_PID}" + KUBEWATCH_SYNC_CMD="python3 /ambassador/post_update.py" KUBEWATCH_NAMESPACE_ARG="" @@ -200,7 +226,7 @@ if [ -z "${AMBASSADOR_NO_KUBERNETES}" ]; then fi set -x - "$APPDIR/kubewatch" ${KUBEWATCH_NAMESPACE_ARG} --root "$CONFIG_DIR" --sync "$KUBEWATCH_SYNC_CMD" --warmup-delay 10s secrets services & + "$APPDIR/kubewatch" ${KUBEWATCH_NAMESPACE_ARG} --sync "$KUBEWATCH_SYNC_CMD" --warmup-delay 10s secrets services & set +x pids="${pids:+${pids} }$!:kubewatch" fi diff --git a/ambassador/kubewatch.py b/ambassador/kubewatch.py index 3319295e5f..bbf4ff2990 100644 --- a/ambassador/kubewatch.py +++ b/ambassador/kubewatch.py @@ -30,7 +30,7 @@ from kubernetes import watch from kubernetes.client.rest import ApiException from ambassador import Config, Scout -from ambassador.utils import kube_v1 +from ambassador.utils import kube_v1, KubeSecretReader from ambassador.ir import IR from ambassador.ir.irtlscontext import IRTLSContext from ambassador.envoy import V2Config @@ -39,6 +39,7 @@ __version__ = Version ambassador_id = os.getenv("AMBASSADOR_ID", "default") +secret_root = os.environ.get('AMBASSADOR_CONFIG_BASE_DIR', "/ambassador") logging.basicConfig( level=logging.INFO, # if appDebug else logging.INFO, @@ -221,7 +222,7 @@ def generate_config(self, changes, output): aconf = Config() aconf.load_from_directory(output) - ir = IR(aconf) + ir = IR(aconf, secret_reader=KubeSecretReader(secret_root)) envoy_config = V2Config(ir) bootstrap_config, ads_config = envoy_config.split_config() diff --git a/ambassador/post_update.py b/ambassador/post_update.py new file mode 100644 index 0000000000..03721fa337 --- /dev/null +++ b/ambassador/post_update.py @@ -0,0 +1,20 @@ +import sys + +import os +import urllib + +import requests + +base_url = os.environ.get('AMBASSADOR_EVENT_URL', 'http://localhost:8877/_internal/v0/update') + +if len(sys.argv) < 2: + sys.stderr.write("Usage: %s update-url\n" % os.path.basename(sys.argv[0])) + sys.exit(1) + +r = requests.post(base_url, params={ 'url': sys.argv[1] }) + +if r.status_code != 200: + sys.stderr.write("update to %s failed:\nstatus %d: %s" % (r.url, r.status_code, r.text)) + sys.exit(1) +else: + sys.exit(0) diff --git a/ambassador/tests/ambassador_test.py b/ambassador/tests/ambassador_test.py index bb9104c292..45ec71d9b0 100644 --- a/ambassador/tests/ambassador_test.py +++ b/ambassador/tests/ambassador_test.py @@ -666,7 +666,7 @@ def file_always_exists(filename): return True -def atest_secret_reader(context: 'IRTLSContext', secret_name: str, namespace: str, secret_root: str) -> SavedSecret: +def atest_secret_reader(context: 'IRTLSContext', secret_name: str, namespace: str) -> SavedSecret: # In the Real World, the secret reader should, y'know, read secrets.. # Here we're just gonna fake it. diff --git a/ambassador/tests/kat/abstract_tests.py b/ambassador/tests/kat/abstract_tests.py index 81cbfd3363..8cae35f7ef 100644 --- a/ambassador/tests/kat/abstract_tests.py +++ b/ambassador/tests/kat/abstract_tests.py @@ -187,7 +187,12 @@ def post_manifest(self): with open(fname, "wb") as fd: fd.write(result.stdout) content = result.stdout - secret = yaml.load(content) + try: + secret = yaml.load(content) + except Exception as e: + print("could not parse YAML:\n%s" % content) + raise e + data = secret['data'] # secret_dir = tempfile.mkdtemp(prefix=self.path.k8s, suffix="secret") secret_dir = "/tmp/%s-ambassadormixin-%s" % (self.path.k8s, 'secret') From 50f29671d7f7ceea74d288ebf4f7293fc47e08a0 Mon Sep 17 00:00:00 2001 From: Flynn Date: Sun, 27 Jan 2019 01:20:31 -0500 Subject: [PATCH 2/4] Don't allow Ambassador's check_ready to return 200 until it's gotten an update from kubewatch. --- ambassador/ambassador_diag/diagd.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/ambassador/ambassador_diag/diagd.py b/ambassador/ambassador_diag/diagd.py index 3e167d887c..837cd92435 100644 --- a/ambassador/ambassador_diag/diagd.py +++ b/ambassador/ambassador_diag/diagd.py @@ -303,6 +303,9 @@ def check_alive(): @app.route('/ambassador/v0/check_ready', methods=[ 'GET' ]) def check_ready(): + if not app.ir: + return "ambassador waiting for config", 503 + status = envoy_status(app.estats) if status['ready']: From 0b424889758af0b80d01bc80fa72fc5acbd0504a Mon Sep 17 00:00:00 2001 From: Flynn Date: Sun, 27 Jan 2019 01:22:27 -0500 Subject: [PATCH 3/4] Ditch some dead code. --- ambassador/ambassador/utils.py | 58 ---------------------------------- ambassador/entrypoint.sh | 17 ---------- 2 files changed, 75 deletions(-) diff --git a/ambassador/ambassador/utils.py b/ambassador/ambassador/utils.py index 16259d5a7a..525f4df6b2 100644 --- a/ambassador/ambassador/utils.py +++ b/ambassador/ambassador/utils.py @@ -27,7 +27,6 @@ import yaml from kubernetes import client, config -from enum import Enum from .VERSION import Version @@ -74,29 +73,6 @@ def load_url_contents(logger: logging.Logger, url: str) -> Optional[str]: return None -class TLSPaths(Enum): - mount_cert_dir = "/etc/certs" - mount_tls_crt = os.path.join(mount_cert_dir, "tls.crt") - mount_tls_key = os.path.join(mount_cert_dir, "tls.key") - - client_mount_dir = "/etc/cacert" - client_mount_crt = os.path.join(client_mount_dir, "tls.crt") - - cert_dir = "/ambassador/certs" - tls_crt = os.path.join(cert_dir, "tls.crt") - tls_key = os.path.join(cert_dir, "tls.key") - - client_cert_dir = "/ambassador/cacert" - client_tls_crt = os.path.join(client_cert_dir, "tls.crt") - - @staticmethod - def generate(directory): - return { - 'crt': os.path.join(directory, 'tls.crt'), - 'key': os.path.join(directory, 'tls.key') - } - - class SystemInfo: MyHostName = 'localhost' MyResolvedName = '127.0.0.1' @@ -155,24 +131,6 @@ def fromError(self, error, **kwargs): def OK(self, **kwargs): return RichStatus(True, **kwargs) -class SourcedDict (dict): - def __init__(self, _source="--internal--", _from=None, **kwargs): - super().__init__(self, **kwargs) - - if _from and ('_source' in _from): - self['_source'] = _from['_source'] - else: - self['_source'] = _source - - # self['_referenced_by'] = [] - - def referenced_by(self, source): - refby = self.setdefault('_referenced_by', []) - - if source not in refby: - refby.append(source) - - class DelayTrigger (threading.Thread): def __init__(self, onfired, timeout=5, name=None): super().__init__() @@ -458,19 +416,3 @@ def kube_v1(): pass return k8s_api - - -def check_cert_file(path): - readable = False - - try: - data = open(path, "r").read() - - if data and (len(data) > 0): - readable = True - except OSError: - pass - except IOError: - pass - - return readable diff --git a/ambassador/entrypoint.sh b/ambassador/entrypoint.sh index ecc57b3428..e0d08b709e 100644 --- a/ambassador/entrypoint.sh +++ b/ambassador/entrypoint.sh @@ -140,23 +140,6 @@ handle_int() { echo "Exiting due to Control-C" } -wait_for_ready() { - host=$1 - is_ready=1 - sleep_for_seconds=4 - while true; do - sleep ${sleep_for_seconds} - if getent hosts ${host}; then - echo "$host exists" - is_ready=0 - break - else - echo "$host is not reachable, trying again in ${sleep_for_seconds} seconds ..." - fi - done - return ${is_ready} -} - # set -o monitor trap "handle_chld" CHLD trap "handle_int" INT From 12f4f027840ed26fda82cb51cd05069b778528bd Mon Sep 17 00:00:00 2001 From: Flynn Date: Sun, 27 Jan 2019 02:03:51 -0500 Subject: [PATCH 4/4] Don't endlessly reduplicate notices in the diag service. --- ambassador/ambassador_diag/diagd.py | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/ambassador/ambassador_diag/diagd.py b/ambassador/ambassador_diag/diagd.py index 837cd92435..207e281525 100644 --- a/ambassador/ambassador_diag/diagd.py +++ b/ambassador/ambassador_diag/diagd.py @@ -172,12 +172,17 @@ def reset(self): local_notices.append({ 'level': 'ERROR', 'message': 'bad local notices: %s' % local_data }) self.notices = local_notices + # app.logger.info("Notices: after RESET: %s" % json.dumps(self.notices)) def post(self, notice): + # app.logger.debug("Notices: POST %s" % notice) self.notices.append(notice) + # app.logger.info("Notices: after POST: %s" % json.dumps(self.notices)) def prepend(self, notice): + # app.logger.debug("Notices: PREPEND %s" % notice) self.notices.insert(0, notice) + # app.logger.info("Notices: after PREPEND: %s" % json.dumps(self.notices)) def extend(self, notices): for notice in notices: @@ -188,6 +193,8 @@ def check_scout(app, what: str, ir: Optional[IR]=None) -> None: uptime = datetime.datetime.now() - boot_time hr_uptime = td_format(uptime) + app.notices.reset() + app.scout = Scout() app.scout_args = { "uptime": int(uptime.total_seconds()), @@ -198,10 +205,12 @@ def check_scout(app, what: str, ir: Optional[IR]=None) -> None: app.scout_args["features"] = ir.features() app.scout_result = app.scout.report(mode="diagd", action=what, **app.scout_args) - app.notices.extend(app.scout_result.pop('notices', [])) + scout_notices = app.scout_result.pop('notices', []) + app.notices.extend(scout_notices) app.logger.info("Scout reports %s" % json.dumps(app.scout_result)) - app.logger.info("Scout notices: %s" % json.dumps(app.notices.notices)) + app.logger.info("Scout notices: %s" % json.dumps(scout_notices)) + app.logger.info("App notices after scout: %s" % json.dumps(app.notices.notices)) def td_format(td_object): @@ -393,7 +402,6 @@ def collect_errors_and_notices(request, reqid, what: str, diag: Diagnostics) -> for notice_key, notice_list in dnotices.items(): for notice in notice_list: - app.logger.debug("POSTING NOTICE %s %s" % (notice_key, notice)) app.notices.post({'level': 'NOTICE', 'message': "%s: %s" % (notice_key, notice)}) ddict['errors'] = errors @@ -482,6 +490,8 @@ def create_diag_app(config_dir_path, bootstrap_path, ads_path, ambex_pid, app.verbose = verbose app.k8s = k8s app.notice_path = notices + app.notices = Notices(app.notice_path) + app.notices.reset() # This will raise an exception and crash if you pass it a string. That's intentional. app.ambex_pid = int(ambex_pid) @@ -558,9 +568,6 @@ def load_config(self, url): # Yeah yeah yeah. It's not really a directory. Whatever. aconf.load_from_directory(aconf_path, k8s=app.k8s, recurse=True) - app.notices = Notices(app.notice_path) - app.notices.reset() - ir = IR(aconf, secret_reader=scc.url_reader) open(ir_path, "w").write(ir.as_json())