Skip to content

Commit

Permalink
implement operator and topic create&delete via api
Browse files Browse the repository at this point in the history
  • Loading branch information
mabulgu committed Nov 4, 2023
1 parent 2a4e967 commit 611369f
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 71 deletions.
48 changes: 18 additions & 30 deletions kfk/commands/operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,36 +22,24 @@
@kfk.command()
def operator(is_install, is_uninstall, namespace):
"""Installs/Uninstalls Strimzi Kafka Operator."""
if is_install:
for directory_name, dirs, files in os.walk(
"{strimzi_path}/install/cluster-operator/".format(strimzi_path=STRIMZI_PATH)
):
for file_name in files:
file_path = os.path.join(directory_name, file_name)

if SpecialTexts.OPERATOR_ROLE_BINDING in file_name:
with open(file_path) as file:
stream = file.read().replace(
SpecialTexts.OPERATOR_MY_PROJECT, namespace
)
temp_file = create_temp_file(stream)
file_path = temp_file.name
create_using_yaml(file_path, namespace)
elif is_uninstall:
# TODO: refactor here
for directory_name, dirs, files in os.walk(
"{strimzi_path}/install/cluster-operator/".format(strimzi_path=STRIMZI_PATH)
):
for file_name in files:
file_path = os.path.join(directory_name, file_name)
for directory_name, dirs, files in os.walk(
"{strimzi_path}/install/cluster-operator/".format(strimzi_path=STRIMZI_PATH)
):
for file_name in files:
file_path = os.path.join(directory_name, file_name)

if SpecialTexts.OPERATOR_ROLE_BINDING in file_name:
with open(file_path) as file:
stream = file.read().replace(
SpecialTexts.OPERATOR_MY_PROJECT, namespace
)
temp_file = create_temp_file(stream)
file_path = temp_file.name
if SpecialTexts.OPERATOR_ROLE_BINDING in file_name:
with open(file_path) as file:
stream = file.read().replace(
SpecialTexts.OPERATOR_MY_PROJECT, namespace
)
temp_file = create_temp_file(stream)
file_path = temp_file.name
if is_install:
create_using_yaml(file_path, namespace)
elif is_uninstall:
delete_using_yaml(file_path, namespace)
else:
print_missing_options_for_command("operator")
else:
print_missing_options_for_command("operator")
break
23 changes: 19 additions & 4 deletions kfk/commands/topics.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from kfk.config import STRIMZI_PATH, STRIMZI_VERSION
from kfk.constants import KAFKA_PORT
from kfk.kubectl_command_builder import Kubectl
from kfk.kubernetes_commons import create_using_yaml, delete_object
from kfk.kubernetes_commons import create_using_yaml, delete_using_yaml
from kfk.option_extensions import NotRequiredIf, RequiredIf


Expand Down Expand Up @@ -113,7 +113,7 @@ def topics(
elif is_describe:
describe(topic, output, native, command_config, cluster, namespace)
elif is_delete:
delete(topic, namespace)
delete(topic, cluster, namespace)
elif is_alter:
alter(
topic,
Expand Down Expand Up @@ -205,8 +205,23 @@ def describe(topic, output, native, command_config, cluster, namespace):
)


def delete(topic, namespace):
delete_object(topic, "topic", namespace)
def delete(topic, cluster, namespace):
with open(
"{strimzi_path}/examples/topic/kafka-topic.yaml".format(
strimzi_path=STRIMZI_PATH
).format(version=STRIMZI_VERSION)
) as file:
topic_dict = yaml.full_load(file)

topic_dict["metadata"]["name"] = topic
topic_dict["metadata"]["labels"]["strimzi.io/cluster"] = cluster

topic_yaml = yaml.dump(topic_dict)
topic_temp_file = create_temp_file(topic_yaml)

delete_using_yaml(topic_temp_file.name, namespace)

topic_temp_file.close()


def alter(
Expand Down
77 changes: 52 additions & 25 deletions kfk/kubernetes_commons.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,15 @@
k8s_client = client.ApiClient()


def delete_object(name, kind, namespace):
k8s_api = client.CoreV1Api(k8s_client)
_delete_object(k8s_api, name, kind, namespace=namespace)
def yaml_object_argument_filter(func):
def inner(k8s_api, yml_object, kind, **kwargs):
if kind != "custom_object":
kwargs.pop("version")
kwargs.pop("group")
kwargs.pop("plural")
return func(k8s_api, yml_object, kind, **kwargs)

return inner


def create_using_yaml(file_path, namespace):
Expand Down Expand Up @@ -45,7 +51,7 @@ def _operate_using_yaml(
namespace="default",
**kwargs,
):
def operate_with(objects):
def _operate_with(objects):
failures = []
k8s_objects = []
for yml_object in objects:
Expand All @@ -69,11 +75,11 @@ def operate_with(objects):

if yaml_objects:
yml_object_all = yaml_objects
return operate_with(yml_object_all)
return _operate_with(yml_object_all)
elif yaml_file:
with open(path.abspath(yaml_file)) as f:
yml_object_all = yaml.safe_load_all(f)
return operate_with(yml_object_all)
return _operate_with(yml_object_all)
else:
raise ValueError(
"One of `yaml_file` or `yaml_objects` arguments must be provided"
Expand Down Expand Up @@ -121,68 +127,89 @@ def _operate_using_dict(
def _operate_using_dict_single_object(
k8s_client, yml_object, operation, verbose=False, namespace="default", **kwargs
):
object_type = ""
# get group and version from apiVersion
group, _, version = yml_object["apiVersion"].partition("/")
if version == "":
version = group
group = "core"
# Take care for the case e.g. api_type is "apiextensions.k8s.io"
group = "".join(group.rsplit(".k8s.io", 1))
group_prefix = "".join(group.rsplit(".k8s.io", 1))
# convert group name from DNS subdomain format to
# python class name convention
group = "".join(word.capitalize() for word in group.split("."))
func = "{0}{1}Api".format(group, version.capitalize())
k8s_api = getattr(client, func)(k8s_client)
group_prefix = "".join(word.capitalize() for word in group_prefix.split("."))
func = "{0}{1}Api".format(group_prefix, version.capitalize())

try:
k8s_api = getattr(client, func)(k8s_client)
except AttributeError:
func = "CustomObjectsApi"
k8s_api = getattr(client, func)(k8s_client)
object_type = "custom_object"

kind = yml_object["kind"]
kind = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", kind)
kind = re.sub("([a-z0-9])([A-Z])", r"\1_\2", kind).lower()
kind_snake_case = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", kind)
object_type_from_kind = re.sub(
"([a-z0-9])([A-Z])", r"\1_\2", kind_snake_case
).lower()
name = yml_object["metadata"]["name"]

if not object_type:
object_type = object_type_from_kind

getattr(sys.modules[__name__], f"_{operation}_using_yaml_object")(
k8s_api, yml_object, kind, version.capitalize(), namespace=namespace
k8s_api,
yml_object,
object_type,
version=version,
group=group,
plural=kind.lower() + "s",
namespace=namespace,
)
if verbose:
msg = f"{kind} `{name}` {operation}d."
print(msg)


def _create_using_yaml_object(k8s_api, yml_object, kind, version, **kwargs):
if hasattr(k8s_api, f"create_namespaced_{kind}"):
@yaml_object_argument_filter
def _create_using_yaml_object(k8s_api, yml_object, object_type, **kwargs):
if hasattr(k8s_api, f"create_namespaced_{object_type}"):
if "namespace" in yml_object["metadata"]:
namespace = yml_object["metadata"]["namespace"]
kwargs["namespace"] = namespace
resp = getattr(k8s_api, "create_namespaced_{0}".format(kind))(
resp = getattr(k8s_api, f"create_namespaced_{object_type}")(
body=yml_object, **kwargs
)
else:
kwargs.pop("namespace", None)
resp = getattr(k8s_api, "create_{0}".format(kind))(body=yml_object, **kwargs)
resp = getattr(k8s_api, f"create_{object_type}")(body=yml_object, **kwargs)
return resp


def _delete_using_yaml_object(k8s_api, yml_object, kind, version, **kwargs):
@yaml_object_argument_filter
def _delete_using_yaml_object(k8s_api, yml_object, object_type, **kwargs):
if "namespace" in yml_object["metadata"]:
namespace = yml_object["metadata"]["namespace"]
kwargs["namespace"] = namespace
name = yml_object["metadata"]["name"]
delete_object(k8s_api, name, kind, version, **kwargs)
_delete_object(k8s_api, name, object_type, **kwargs)


def _delete_object(k8s_api, name, kind, version="V1", **kwargs):
def _delete_object(k8s_api, name, object_type, delete_options_version="V1", **kwargs):
try:
if hasattr(k8s_api, "delete_namespaced_{0}".format(kind)):
resp = getattr(k8s_api, "delete_namespaced_{}".format(kind))(
if hasattr(k8s_api, f"delete_namespaced_{object_type}"):
resp = getattr(k8s_api, f"delete_namespaced_{object_type}")(
name=name,
body=getattr(client, f"{version}DeleteOptions")(
body=getattr(client, f"{delete_options_version}DeleteOptions")(
propagation_policy="Background", grace_period_seconds=5
),
**kwargs,
)
else:
kwargs.pop("namespace", None)
resp = getattr(k8s_api, "delete_{}".format(kind))(
resp = getattr(k8s_api, f"delete_{object_type}")(
name=name,
body=getattr(client, f"{version}DeleteOptions")(
body=getattr(client, f"{delete_options_version}DeleteOptions")(
propagation_policy="Background", grace_period_seconds=5
),
**kwargs,
Expand Down
14 changes: 11 additions & 3 deletions tests/test_operator_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,18 @@ def setUp(self):
self.namespace = "kafka"
self.installation_file_count = 28

@mock.patch("kfk.commands.operator.os.system")
def test_install_strimzi(self, mock_os_system):
@mock.patch("kfk.commands.operator.create_using_yaml")
def test_install_strimzi(self, mock_create_using_yaml):
result = self.runner.invoke(
kfk, ["operator", "--install", "-n", self.namespace]
)
assert result.exit_code == 0
assert mock_os_system.call_count == self.installation_file_count
assert mock_create_using_yaml.call_count == self.installation_file_count

@mock.patch("kfk.commands.operator.delete_using_yaml")
def test_uninstall_strimzi(self, mock_delete_using_yaml):
result = self.runner.invoke(
kfk, ["operator", "--uninstall", "-n", self.namespace]
)
assert result.exit_code == 0
assert mock_delete_using_yaml.call_count == self.installation_file_count
18 changes: 9 additions & 9 deletions tests/test_topics_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,8 @@ def test_describe_topic_native_with_command_config(
)

@mock.patch("kfk.commands.topics.create_temp_file")
@mock.patch("kfk.commands.topics.os")
def test_create_topic(self, mock_os, mock_create_temp_file):
@mock.patch("kfk.commands.topics.create_using_yaml")
def test_create_topic(self, mock_create_using_yaml, mock_create_temp_file):
result = self.runner.invoke(
kfk,
[
Expand All @@ -185,8 +185,10 @@ def test_create_topic(self, mock_os, mock_create_temp_file):
assert expected_topic_yaml == result_topic_yaml

@mock.patch("kfk.commands.topics.create_temp_file")
@mock.patch("kfk.commands.topics.os")
def test_create_topic_with_config(self, mock_os, mock_create_temp_file):
@mock.patch("kfk.commands.topics.create_using_yaml")
def test_create_topic_with_config(
self, mock_create_using_yaml, mock_create_temp_file
):
result = self.runner.invoke(
kfk,
[
Expand Down Expand Up @@ -230,8 +232,8 @@ def test_create_topic_without_required_params(self):
)
assert result.exit_code == 2

@mock.patch("kfk.commands.topics.os")
def test_delete_topic(self, mock_os):
@mock.patch("kfk.commands.topics.delete_using_yaml")
def test_delete_topic(self, mock_delete_using_yaml):
result = self.runner.invoke(
kfk,
[
Expand All @@ -248,9 +250,7 @@ def test_delete_topic(self, mock_os):

assert result.exit_code == 0

mock_os.system.assert_called_with(
Kubectl().delete().kafkatopics(self.topic).namespace(self.namespace).build()
)
mock_delete_using_yaml.assert_called_once()

@mock.patch("kfk.commands.topics.create_temp_file")
@mock.patch("kfk.commons.get_resource_yaml")
Expand Down

0 comments on commit 611369f

Please sign in to comment.