Skip to content

Commit

Permalink
Refactor variable substitution (#198)
Browse files Browse the repository at this point in the history
  • Loading branch information
sujuka99 authored Jun 13, 2023
1 parent 4bbf6fd commit d8918a4
Show file tree
Hide file tree
Showing 30 changed files with 607 additions and 365 deletions.
1 change: 1 addition & 0 deletions examples/bakdata/atm-fraud-detection/defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ kafka-app:
streams:
brokers: ${broker}
schemaRegistryUrl: ${schema_registry_url}
optimizeLeaveGroupBehavior: false

producer:
to:
Expand Down
9 changes: 3 additions & 6 deletions kpops/component_handlers/helm_wrapper/helm.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,7 @@ def uninstall(
release_name: str,
dry_run: bool,
) -> str | None:
"""
Prepares and executes the helm uninstall command
"""
"""Prepares and executes the helm uninstall command"""
command = [
"helm",
"uninstall",
Expand All @@ -144,8 +142,7 @@ def template(
values: dict,
flags: HelmTemplateFlags = HelmTemplateFlags(),
) -> str:
"""
From HELM: Render chart templates locally and display the output.
"""From HELM: Render chart templates locally and display the output.
Any values that would normally be looked up or retrieved in-cluster will
be faked locally. Additionally, none of the server-side testing of chart
Expand All @@ -158,7 +155,7 @@ def template(
:param namespace: The Kubernetes namespace the command should execute in
:type namespace: str
:param values: `values.yaml` to be used
:type values: dict[str, str]
:type values: dict
:param flags: the flags to be set for `helm template`, defaults to HelmTemplateFlags()
:type flags: HelmTemplateFlags, optional
:return: the output of `helm template`
Expand Down
43 changes: 2 additions & 41 deletions kpops/components/base_components/base_defaults_component.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import inspect
import logging
from collections import deque
from collections.abc import Mapping, Sequence
from collections.abc import Sequence
from pathlib import Path
from typing import TypeVar

Expand All @@ -10,6 +10,7 @@

from kpops.cli.pipeline_config import PipelineConfig
from kpops.component_handlers import ComponentHandlers
from kpops.utils.dict_ops import update_nested
from kpops.utils.docstring import describe_attr
from kpops.utils.environment import ENV
from kpops.utils.pydantic import DescConfig
Expand Down Expand Up @@ -208,43 +209,3 @@ def deduplicate(seq: Sequence[T]) -> list[T]:
:rtype: list[T]
"""
return list(dict.fromkeys(seq))


def update_nested_pair(original_dict: dict, other_dict: Mapping) -> dict:
"""Nested update for 2 dictionaries
:param original_dict: Dictionary to be updated
:type original_dict: dict
:param other_dict: Mapping that contains new or updated key-value pairs
:type other_dict: Mapping
:return: Updated dictionary
:rtype: dict
"""
for key, value in other_dict.items():
if isinstance(value, Mapping):
nested_val = original_dict.get(key, {})
if nested_val is not None:
original_dict[key] = update_nested_pair(nested_val, value)
else:
if key not in original_dict:
original_dict[key] = value
return original_dict


def update_nested(*argv: dict) -> dict:
"""Merge multiple configuration dicts.
The dicts have multiple layers. These layers will be merged recursively.
:param argv: n dictionaries
:type argv: dict
:returns: Merged configuration dict
:rtype: dict
"""
if len(argv) == 0:
return {}
if len(argv) == 1:
return argv[0]
if len(argv) == 2:
return update_nested_pair(argv[0], argv[1])
return update_nested(update_nested_pair(argv[0], argv[1]), *argv[2:])
12 changes: 0 additions & 12 deletions kpops/components/base_components/kafka_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
)
from kpops.utils.docstring import describe_attr, describe_object
from kpops.utils.pydantic import CamelCaseConfig, DescConfig
from kpops.utils.yaml_loading import substitute

log = logging.getLogger("KafkaApp")

Expand Down Expand Up @@ -100,17 +99,6 @@ class KafkaApp(KubernetesApp):
description=describe_attr("version", __doc__),
)

def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)
self.app.streams.brokers = substitute(
self.app.streams.brokers, {"broker": self.config.broker}
)
if self.app.streams.schema_registry_url:
self.app.streams.schema_registry_url = substitute(
self.app.streams.schema_registry_url,
{"schema_registry_url": self.config.schema_registry_url},
)

@property
def clean_up_helm_chart(self) -> str:
"""Helm chart used to destroy and clean this component"""
Expand Down
10 changes: 0 additions & 10 deletions kpops/components/base_components/kafka_connector.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from __future__ import annotations

import json
import logging
from abc import ABC
from functools import cached_property
Expand Down Expand Up @@ -92,7 +91,6 @@ class Config(CamelCaseConfig):

def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)
self.prepare_connector_config()

@cached_property
def helm(self) -> Helm:
Expand Down Expand Up @@ -124,14 +122,6 @@ def kafka_connect_resetter_chart(self) -> str:
"""Resetter chart for this component"""
return f"{self.repo_config.repository_name}/kafka-connect-resetter"

def prepare_connector_config(self) -> None:
"""Substitute component related variables in config"""
substituted_config = self.substitute_component_variables(
json.dumps(self.app.dict())
)
out: dict = json.loads(substituted_config)
self.app = KafkaConnectConfig(**out)

@override
def deploy(self, dry_run: bool) -> None:
if self.to:
Expand Down
33 changes: 19 additions & 14 deletions kpops/components/base_components/kubernetes_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,26 +35,21 @@ class Config(CamelCaseConfig, DescConfig):
extra = Extra.allow


# TODO: label and annotations
class KubernetesApp(PipelineComponent):
"""Base class for all Kubernetes apps.
All built-in components are Kubernetes apps, except for the Kafka connectors.
:param type: Component type, defaults to "kubernetes-app"
:type type: str, optional
:param schema_type: Used for schema generation, same as :param:`type`,
defaults to "kubernetes-app"
:type schema_type: Literal["kubernetes-app"], optional
:param validate_name: Whether to check if the name of the component is
compatible with Kubernetes, defaults to True
:param app: Application-specific settings
:type app: KubernetesAppConfig
:param repo_config: Configuration of the Helm chart repo to be used for
deploying the component, defaults to None
:type repo_config: HelmRepoConfig, optional
:param namespace: Namespace in which the component shall be deployed
:type namespace: str
:param version: Helm chart version, defaults to None
:type version: str, optional
"""

type: str = Field(
Expand All @@ -67,6 +62,12 @@ class KubernetesApp(PipelineComponent):
description=describe_object(__doc__),
exclude=True,
)
validate_name: bool = Field(
default=True,
description=describe_attr("validate_name", __doc__),
exclude=True,
hidden_from_schema=True,
)
app: KubernetesAppConfig = Field(
default=...,
description=describe_attr("app", __doc__),
Expand All @@ -88,8 +89,9 @@ class Config(CamelCaseConfig, DescConfig):
pass

def __init__(self, **kwargs):
if kwargs.get("validate_name", True):
self.validate_kubernetes_name(kwargs["name"])
super().__init__(**kwargs)
self.__check_compatible_name()

@cached_property
def helm(self) -> Helm:
Expand Down Expand Up @@ -192,12 +194,15 @@ def get_helm_chart(self) -> str:
f"Please implement the get_helm_chart() method of the {self.__module__} module."
)

def __check_compatible_name(self) -> None:
"""Check if the component's name `self.name` is valid for Kubernetes"""
if not bool(KUBERNETES_NAME_CHECK_PATTERN.match(self.name)): # TODO: SMARTER
raise ValueError(
f"The component name {self.name} is invalid for Kubernetes."
)
@staticmethod
def validate_kubernetes_name(name: str) -> None:
"""Check if a name is valid for a Kubernetes resource
:param name: Name that is to be used for the resource
:raises ValueError: The component name {name} is invalid for Kubernetes.
"""
if not bool(KUBERNETES_NAME_CHECK_PATTERN.match(name)):
raise ValueError(f"The component name {name} is invalid for Kubernetes.")

@override
def dict(self, *, exclude=None, **kwargs) -> dict[str, Any]:
Expand Down
65 changes: 1 addition & 64 deletions kpops/components/base_components/pipeline_component.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@
ToSection,
)
from kpops.utils.docstring import describe_attr, describe_object
from kpops.utils.environment import ENV
from kpops.utils.pydantic import CamelCaseConfig, DescConfig
from kpops.utils.yaml_loading import substitute


class PipelineComponent(BaseDefaultsComponent):
Expand Down Expand Up @@ -80,59 +78,9 @@ class Config(CamelCaseConfig, DescConfig):

def __init__(self, **kwargs):
super().__init__(**kwargs)
self.substitute_output_topic_names()
self.substitute_name()
self.substitute_prefix()
self.set_input_topics()
self.set_output_topics()

def substitute_output_topic_names(self) -> None:
"""Substitute component and topic sepcific names in output topics"""
if self.to:
updated_to = {}
for name, topic in self.to.topics.items():
name = self.substitute_component_variables(name)
updated_to[name] = topic
self.to.topics = updated_to

@staticmethod
def substitute_component_names(key: str, _type: str, **kwargs) -> str:
"""Substitute component field name, e.g., `error_topic_name`
:param key: The raw input containing $-placeholders
:type key: str
:param _type: The key-value mapping containing substitutions
:type _type: str
:param **kwargs: Additional key-value mappings that contain substitutions
:return: Substituted input string
:rtype: str
"""
return substitute(key, {"component_type": _type, **kwargs})

def substitute_component_variables(self, topic_name: str) -> str:
"""Substitute component, env and topic-specific variables in topic's name
:param topic_name: topic name
:return: final topic name
"""
error_topic_name = self.substitute_component_names(
self.config.topic_name_config.default_error_topic_name,
self.type,
**ENV,
)
output_topic_name = self.substitute_component_names(
self.config.topic_name_config.default_output_topic_name,
self.type,
**ENV,
)
return self.substitute_component_names(
topic_name,
self.type,
component_name=self.name,
error_topic_name=error_topic_name,
output_topic_name=output_topic_name,
)

def add_input_topics(self, topics: list[str]) -> None:
"""Add given topics to the list of input topics.
Expand Down Expand Up @@ -259,15 +207,8 @@ def weave_from_topics(
for input_topic in input_topics:
self.apply_from_inputs(input_topic, from_topic)

def substitute_name(self) -> None:
"""Substitute $ placeholders in `self.name` with `self.type`"""
if self.name:
self.name = self.substitute_component_names(self.name, self.type)
else:
raise ValueError("Every component must have a name in the end.")

def inflate(self) -> list[PipelineComponent]:
"""Inflate a component.
"""Inflate a component
This is helpful if one component should result in multiple components.
To support this, override this method and return a list of components
Expand Down Expand Up @@ -325,7 +266,3 @@ def clean(self, dry_run: bool) -> None:
:param dry_run: Whether to do a dry run of the command
:type dry_run: bool
"""

def substitute_prefix(self) -> None:
"""Substitute $-placeholders in self.prefix with environment variables"""
self.prefix = substitute(self.prefix, ENV)
13 changes: 0 additions & 13 deletions kpops/components/streams_bootstrap/streams/streams_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ class Config(DescConfig):

def __init__(self, **kwargs):
super().__init__(**kwargs)
self.__substitute_autoscaling_topic_names()

@override
def add_input_topics(self, topics: list[str]) -> None:
Expand Down Expand Up @@ -106,15 +105,3 @@ def __run_streams_clean_up_job(self, dry_run: bool, delete_output: bool) -> None
dry_run=dry_run,
retain_clean_jobs=self.config.retain_clean_jobs,
)

def __substitute_autoscaling_topic_names(self) -> None:
"""Substitute autoscaling topics' names"""
if not self.app.autoscaling:
return
self.app.autoscaling.topics = [
self.substitute_component_variables(topic)
for topic in self.app.autoscaling.topics
]
self.app.autoscaling.consumer_group = self.substitute_component_variables(
self.app.autoscaling.consumer_group
)
Loading

0 comments on commit d8918a4

Please sign in to comment.