From 4d29af161b3d0e8c531c5829da98ef3ee4f03eb1 Mon Sep 17 00:00:00 2001 From: Sumant Date: Fri, 31 Jul 2020 19:09:24 -0400 Subject: [PATCH] Leader Election issue #434 changed file naming style consistent with the existing go client code Update example.py Changed file and folder names Rename LeaderElection.py to leaderelection.py Rename threadingWithException.py to threadingwithexception.py Rename ConfigMapLock.py to configmaplock.py LeaderElection to leaderelection Added boiler plate headers, updated variable and function names consistent with the guidelines, removed the ctypes dependency by using traces to kill threads, changed logic for leader now it gives up and doesn't re-join as a follower if it fails to update lease added correct boiler plate year Rename threadingWithTrace.py to threadingwithtrace.py Update leaderelection.py Update example.py Changes based on review - logging, OnStoppedLeading is not killed abruptly, OnStartedLeading is not run in a separate thread, adding README Update example.py updated comments set threads as daemon Update README.md Code made consistent with other clients. Update example.py Update leaderelection.py Error & exception handling for the annotation, reduced indentation Adding serializing functions for serializing & de-serializing locks, leader_election_record as a class Adding a test Adding boilerplate header Rename leaderelectiontest.py to leaderelection_test.py Updated boiler plates handling imports for pytest handling 'HTTP not found' compatibility with python 2 & 3, & handling relative imports Update leaderelection.py to check tests for tox assertEquals -> assertEqual Update leaderelection_test.py making Threading compatible for Python 2 changing datetime.timestamp for backward compatibility with Python 2.7 Adding comments for test_Leader_election_with_renew_deadline & making candidates run in parallel for test_leader_election remove redundant daemon = True reassignment common thread lock for MockResourceLock --- leaderelection/README.md | 18 ++ leaderelection/__init__.py | 13 + leaderelection/electionconfig.py | 59 ++++ leaderelection/example.py | 54 ++++ leaderelection/leaderelection.py | 191 +++++++++++++ leaderelection/leaderelection_test.py | 270 +++++++++++++++++++ leaderelection/leaderelectionrecord.py | 22 ++ leaderelection/resourcelock/__init__.py | 13 + leaderelection/resourcelock/configmaplock.py | 129 +++++++++ 9 files changed, 769 insertions(+) create mode 100644 leaderelection/README.md create mode 100644 leaderelection/__init__.py create mode 100644 leaderelection/electionconfig.py create mode 100644 leaderelection/example.py create mode 100644 leaderelection/leaderelection.py create mode 100644 leaderelection/leaderelection_test.py create mode 100644 leaderelection/leaderelectionrecord.py create mode 100644 leaderelection/resourcelock/__init__.py create mode 100644 leaderelection/resourcelock/configmaplock.py diff --git a/leaderelection/README.md b/leaderelection/README.md new file mode 100644 index 00000000..41ed1c48 --- /dev/null +++ b/leaderelection/README.md @@ -0,0 +1,18 @@ +## Leader Election Example +This example demonstrates how to use the leader election library. + +## Running +Run the following command in multiple separate terminals preferably an odd number. +Each running process uses a unique identifier displayed when it starts to run. + +- When a program runs, if a lock object already exists with the specified name, +all candidates will start as followers. +- If a lock object does not exist with the specified name then whichever candidate +creates a lock object first will become the leader and the rest will be followers. +- The user will be prompted about the status of the candidates and transitions. + +### Command to run +```python example.py``` + +Now kill the existing leader. You will see from the terminal outputs that one of the + remaining running processes will be elected as the new leader. diff --git a/leaderelection/__init__.py b/leaderelection/__init__.py new file mode 100644 index 00000000..37da225c --- /dev/null +++ b/leaderelection/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2021 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/leaderelection/electionconfig.py b/leaderelection/electionconfig.py new file mode 100644 index 00000000..7b0db639 --- /dev/null +++ b/leaderelection/electionconfig.py @@ -0,0 +1,59 @@ +# Copyright 2021 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys +import logging +logging.basicConfig(level=logging.INFO) + + +class Config: + # Validate config, exit if an error is detected + def __init__(self, lock, lease_duration, renew_deadline, retry_period, onstarted_leading, onstopped_leading): + self.jitter_factor = 1.2 + + if lock is None: + sys.exit("lock cannot be None") + self.lock = lock + + if lease_duration <= renew_deadline: + sys.exit("lease_duration must be greater than renew_deadline") + + if renew_deadline <= self.jitter_factor * retry_period: + sys.exit("renewDeadline must be greater than retry_period*jitter_factor") + + if lease_duration < 1: + sys.exit("lease_duration must be greater than one") + + if renew_deadline < 1: + sys.exit("renew_deadline must be greater than one") + + if retry_period < 1: + sys.exit("retry_period must be greater than one") + + self.lease_duration = lease_duration + self.renew_deadline = renew_deadline + self.retry_period = retry_period + + if onstarted_leading is None: + sys.exit("callback onstarted_leading cannot be None") + self.onstarted_leading = onstarted_leading + + if onstopped_leading is None: + self.onstopped_leading = self.on_stoppedleading_callback + else: + self.onstopped_leading = onstopped_leading + + # Default callback for when the current candidate if a leader, stops leading + def on_stoppedleading_callback(self): + logging.info("stopped leading".format(self.lock.identity)) diff --git a/leaderelection/example.py b/leaderelection/example.py new file mode 100644 index 00000000..b8d8e616 --- /dev/null +++ b/leaderelection/example.py @@ -0,0 +1,54 @@ +# Copyright 2021 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import uuid +from kubernetes import client, config +from leaderelection import leaderelection +from leaderelection.resourcelock.configmaplock import ConfigMapLock +from leaderelection import electionconfig + + +# Authenticate using config file +config.load_kube_config(config_file=r"") + +# Parameters required from the user + +# A unique identifier for this candidate +candidate_id = uuid.uuid4() + +# Name of the lock object to be created +lock_name = "examplepython" + +# Kubernetes namespace +lock_namespace = "default" + + +# The function that a user wants to run once a candidate is elected as a leader +def example_func(): + print("I am leader") + + +# A user can choose not to provide any callbacks for what to do when a candidate fails to lead - onStoppedLeading() +# In that case, a default callback function will be used + +# Create config +config = electionconfig.Config(ConfigMapLock(lock_name, lock_namespace, candidate_id), lease_duration=17, + renew_deadline=15, retry_period=5, onstarted_leading=example_func, + onstopped_leading=None) + +# Enter leader election +leaderelection.LeaderElection(config).run() + +# User can choose to do another round of election or simply exit +print("Exited leader election") diff --git a/leaderelection/leaderelection.py b/leaderelection/leaderelection.py new file mode 100644 index 00000000..a707fbac --- /dev/null +++ b/leaderelection/leaderelection.py @@ -0,0 +1,191 @@ +# Copyright 2021 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import datetime +import sys +import time +import json +import threading +from .leaderelectionrecord import LeaderElectionRecord +import logging +# if condition to be removed when support for python2 will be removed +if sys.version_info > (3, 0): + from http import HTTPStatus +else: + import httplib +logging.basicConfig(level=logging.INFO) + +""" +This package implements leader election using an annotation in a Kubernetes object. +The onstarted_leading function is run in a thread and when it returns, if it does +it might not be safe to run it again in a process. + +At first all candidates are considered followers. The one to create a lock or update +an existing lock first becomes the leader and remains so until it keeps renewing its +lease. +""" + + +class LeaderElection: + def __init__(self, election_config): + if election_config is None: + sys.exit("argument config not passed") + + # Latest record observed in the created lock object + self.observed_record = None + + # The configuration set for this candidate + self.election_config = election_config + + # Latest update time of the lock + self.observed_time_milliseconds = 0 + + # Point of entry to Leader election + def run(self): + # Try to create/ acquire a lock + if self.acquire(): + logging.info("{} successfully acquired lease".format(self.election_config.lock.identity)) + + # Start leading and call OnStartedLeading() + threading.daemon = True + threading.Thread(target=self.election_config.onstarted_leading).start() + + self.renew_loop() + + # Failed to update lease, run OnStoppedLeading callback + self.election_config.onstopped_leading() + + def acquire(self): + # Follower + logging.info("{} is a follower".format(self.election_config.lock.identity)) + retry_period = self.election_config.retry_period + + while True: + succeeded = self.try_acquire_or_renew() + + if succeeded: + return True + + time.sleep(retry_period) + + def renew_loop(self): + # Leader + logging.info("Leader has entered renew loop and will try to update lease continuously") + + retry_period = self.election_config.retry_period + renew_deadline = self.election_config.renew_deadline * 1000 + + while True: + timeout = int(time.time() * 1000) + renew_deadline + succeeded = False + + while int(time.time() * 1000) < timeout: + succeeded = self.try_acquire_or_renew() + + if succeeded: + break + time.sleep(retry_period) + + if succeeded: + time.sleep(retry_period) + continue + + # failed to renew, return + return + + def try_acquire_or_renew(self): + now_timestamp = time.time() + now = datetime.datetime.fromtimestamp(now_timestamp) + + # Check if lock is created + lock_status, old_election_record = self.election_config.lock.get(self.election_config.lock.name, + self.election_config.lock.namespace) + + # create a default Election record for this candidate + leader_election_record = LeaderElectionRecord(self.election_config.lock.identity, + str(self.election_config.lease_duration), str(now), str(now)) + + # A lock is not created with that name, try to create one + if not lock_status: + # To be removed when support for python2 will be removed + if sys.version_info > (3, 0): + if json.loads(old_election_record.body)['code'] != HTTPStatus.NOT_FOUND: + logging.info("Error retrieving resource lock {} as {}".format(self.election_config.lock.name, + old_election_record.reason)) + return False + else: + if json.loads(old_election_record.body)['code'] != httplib.NOT_FOUND: + logging.info("Error retrieving resource lock {} as {}".format(self.election_config.lock.name, + old_election_record.reason)) + return False + + logging.info("{} is trying to create a lock".format(leader_election_record.holder_identity)) + create_status = self.election_config.lock.create(name=self.election_config.lock.name, + namespace=self.election_config.lock.namespace, + election_record=leader_election_record) + + if create_status is False: + logging.info("{} Failed to create lock".format(leader_election_record.holder_identity)) + return False + + self.observed_record = leader_election_record + self.observed_time_milliseconds = int(time.time() * 1000) + return True + + # A lock exists with that name + # Validate old_election_record + if old_election_record is None: + # try to update lock with proper annotation and election record + return self.update_lock(leader_election_record) + + if (old_election_record.holder_identity is None or old_election_record.lease_duration is None + or old_election_record.acquire_time is None or old_election_record.renew_time is None): + # try to update lock with proper annotation and election record + return self.update_lock(leader_election_record) + + # Report transitions + if self.observed_record and self.observed_record.holder_identity != old_election_record.holder_identity: + logging.info("Leader has switched to {}".format(old_election_record.holder_identity)) + + if self.observed_record is None or old_election_record.__dict__ != self.observed_record.__dict__: + self.observed_record = old_election_record + self.observed_time_milliseconds = int(time.time() * 1000) + + # If This candidate is not the leader and lease duration is yet to finish + if (self.election_config.lock.identity != self.observed_record.holder_identity + and self.observed_time_milliseconds + self.election_config.lease_duration * 1000 > int(now_timestamp * 1000)): + logging.info("yet to finish lease_duration, lease held by {} and has not expired".format(old_election_record.holder_identity)) + return False + + # If this candidate is the Leader + if self.election_config.lock.identity == self.observed_record.holder_identity: + # Leader updates renewTime, but keeps acquire_time unchanged + leader_election_record.acquire_time = self.observed_record.acquire_time + + return self.update_lock(leader_election_record) + + def update_lock(self, leader_election_record): + # Update object with latest election record + update_status = self.election_config.lock.update(self.election_config.lock.name, + self.election_config.lock.namespace, + leader_election_record) + + if update_status is False: + logging.info("{} failed to acquire lease".format(leader_election_record.holder_identity)) + return False + + self.observed_record = leader_election_record + self.observed_time_milliseconds = int(time.time() * 1000) + logging.info("leader {} has successfully acquired lease".format(leader_election_record.holder_identity)) + return True diff --git a/leaderelection/leaderelection_test.py b/leaderelection/leaderelection_test.py new file mode 100644 index 00000000..9fb6d9bc --- /dev/null +++ b/leaderelection/leaderelection_test.py @@ -0,0 +1,270 @@ +# Copyright 2021 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from . import leaderelection +from .leaderelectionrecord import LeaderElectionRecord +from kubernetes.client.rest import ApiException +from . import electionconfig +import unittest +import threading +import json +import time +import pytest + +thread_lock = threading.RLock() + +class LeaderElectionTest(unittest.TestCase): + def test_simple_leader_election(self): + election_history = [] + leadership_history = [] + + def on_create(): + election_history.append("create record") + leadership_history.append("get leadership") + + def on_update(): + election_history.append("update record") + + def on_change(): + election_history.append("change record") + + mock_lock = MockResourceLock("mock", "mock_namespace", "mock", thread_lock, on_create, on_update, on_change, None) + + def on_started_leading(): + leadership_history.append("start leading") + + def on_stopped_leading(): + leadership_history.append("stop leading") + + # Create config 4.5 4 3 + config = electionconfig.Config(lock=mock_lock, lease_duration=2.5, + renew_deadline=2, retry_period=1.5, onstarted_leading=on_started_leading, + onstopped_leading=on_stopped_leading) + + # Enter leader election + leaderelection.LeaderElection(config).run() + + self.assert_history(election_history, ["create record", "update record", "update record", "update record"]) + self.assert_history(leadership_history, ["get leadership", "start leading", "stop leading"]) + + def test_leader_election(self): + election_history = [] + leadership_history = [] + + def on_create_A(): + election_history.append("A creates record") + leadership_history.append("A gets leadership") + + def on_update_A(): + election_history.append("A updates record") + + def on_change_A(): + election_history.append("A gets leadership") + + mock_lock_A = MockResourceLock("mock", "mock_namespace", "MockA", thread_lock, on_create_A, on_update_A, on_change_A, None) + mock_lock_A.renew_count_max = 3 + + def on_started_leading_A(): + leadership_history.append("A starts leading") + + def on_stopped_leading_A(): + leadership_history.append("A stops leading") + + config_A = electionconfig.Config(lock=mock_lock_A, lease_duration=2.5, + renew_deadline=2, retry_period=1.5, onstarted_leading=on_started_leading_A, + onstopped_leading=on_stopped_leading_A) + + def on_create_B(): + election_history.append("B creates record") + leadership_history.append("B gets leadership") + + def on_update_B(): + election_history.append("B updates record") + + def on_change_B(): + leadership_history.append("B gets leadership") + + mock_lock_B = MockResourceLock("mock", "mock_namespace", "MockB", thread_lock, on_create_B, on_update_B, on_change_B, None) + mock_lock_B.renew_count_max = 4 + + def on_started_leading_B(): + leadership_history.append("B starts leading") + + def on_stopped_leading_B(): + leadership_history.append("B stops leading") + + config_B = electionconfig.Config(lock=mock_lock_B, lease_duration=2.5, + renew_deadline=2, retry_period=1.5, onstarted_leading=on_started_leading_B, + onstopped_leading=on_stopped_leading_B) + + mock_lock_B.leader_record = mock_lock_A.leader_record + + threading.daemon = True + # Enter leader election for A + threading.Thread(target=leaderelection.LeaderElection(config_A).run()).start() + + # Enter leader election for B + threading.Thread(target=leaderelection.LeaderElection(config_B).run()).start() + + time.sleep(5) + + self.assert_history(election_history, + ["A creates record", + "A updates record", + "A updates record", + "B updates record", + "B updates record", + "B updates record", + "B updates record"]) + self.assert_history(leadership_history, + ["A gets leadership", + "A starts leading", + "A stops leading", + "B gets leadership", + "B starts leading", + "B stops leading"]) + + + """Expected behavior: to check if the leader stops leading if it fails to update the lock within the renew_deadline + and stops leading after finally timing out. The difference between each try comes out to be approximately the sleep + time. + Example: + create record: 0s + on try update: 1.5s + on update: zzz s + on try update: 3s + on update: zzz s + on try update: 4.5s + on try update: 6s + Timeout - Leader Exits""" + def test_Leader_election_with_renew_deadline(self): + election_history = [] + leadership_history = [] + + def on_create(): + election_history.append("create record") + leadership_history.append("get leadership") + + def on_update(): + election_history.append("update record") + + def on_change(): + election_history.append("change record") + + def on_try_update(): + election_history.append("try update record") + + mock_lock = MockResourceLock("mock", "mock_namespace", "mock", thread_lock, on_create, on_update, on_change, on_try_update) + mock_lock.renew_count_max = 3 + + def on_started_leading(): + leadership_history.append("start leading") + + def on_stopped_leading(): + leadership_history.append("stop leading") + + # Create config + config = electionconfig.Config(lock=mock_lock, lease_duration=2.5, + renew_deadline=2, retry_period=1.5, onstarted_leading=on_started_leading, + onstopped_leading=on_stopped_leading) + + # Enter leader election + leaderelection.LeaderElection(config).run() + + self.assert_history(election_history, + ["create record", + "try update record", + "update record", + "try update record", + "update record", + "try update record", + "try update record"]) + + self.assert_history(leadership_history, ["get leadership", "start leading", "stop leading"]) + + def assert_history(self, history, expected): + self.assertIsNotNone(expected) + self.assertIsNotNone(history) + self.assertEqual(len(expected), len(history)) + + for idx in range(len(history)): + self.assertEqual(history[idx], expected[idx], + msg="Not equal at index {}, expected {}, got {}".format(idx, expected[idx], + history[idx])) + + +class MockResourceLock: + def __init__(self, name, namespace, identity, shared_lock, on_create=None, on_update=None, on_change=None, on_try_update=None): + # self.leader_record is shared between two MockResourceLock objects + self.leader_record = [] + self.renew_count = 0 + self.renew_count_max = 4 + self.name = name + self.namespace = namespace + self.identity = str(identity) + self.lock = shared_lock + + self.on_create = on_create + self.on_update = on_update + self.on_change = on_change + self.on_try_update = on_try_update + + def get(self, name, namespace): + self.lock.acquire() + try: + if self.leader_record: + return True, self.leader_record[0] + + ApiException.body = json.dumps({'code': 404}) + return False, ApiException + finally: + self.lock.release() + + def create(self, name, namespace, election_record): + self.lock.acquire() + try: + if len(self.leader_record) == 1: + return False + self.leader_record.append(election_record) + self.on_create() + self.renew_count += 1 + return True + finally: + self.lock.release() + + def update(self, name, namespace, updated_record): + self.lock.acquire() + try: + if self.on_try_update: + self.on_try_update() + if self.renew_count >= self.renew_count_max: + return False + + old_record = self.leader_record[0] + self.leader_record[0] = updated_record + + self.on_update() + + if old_record.holder_identity != updated_record.holder_identity: + self.on_change() + + self.renew_count += 1 + return True + finally: + self.lock.release() + + +if __name__ == '__main__': + unittest.main() diff --git a/leaderelection/leaderelectionrecord.py b/leaderelection/leaderelectionrecord.py new file mode 100644 index 00000000..ebb550d4 --- /dev/null +++ b/leaderelection/leaderelectionrecord.py @@ -0,0 +1,22 @@ +# Copyright 2021 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +class LeaderElectionRecord: + # Annotation used in the lock object + def __init__(self, holder_identity, lease_duration, acquire_time, renew_time): + self.holder_identity = holder_identity + self.lease_duration = lease_duration + self.acquire_time = acquire_time + self.renew_time = renew_time diff --git a/leaderelection/resourcelock/__init__.py b/leaderelection/resourcelock/__init__.py new file mode 100644 index 00000000..37da225c --- /dev/null +++ b/leaderelection/resourcelock/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2021 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/leaderelection/resourcelock/configmaplock.py b/leaderelection/resourcelock/configmaplock.py new file mode 100644 index 00000000..8d155e29 --- /dev/null +++ b/leaderelection/resourcelock/configmaplock.py @@ -0,0 +1,129 @@ +# Copyright 2021 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from kubernetes.client.rest import ApiException +from kubernetes import client, config +from kubernetes.client.api_client import ApiClient +from leaderelection.leaderelectionrecord import LeaderElectionRecord +import json +import logging +logging.basicConfig(level=logging.INFO) + + +class ConfigMapLock: + def __init__(self, name, namespace, identity): + """ + :param name: name of the lock + :param namespace: namespace + :param identity: A unique identifier that the candidate is using + """ + self.api_instance = client.CoreV1Api() + self.leader_electionrecord_annotationkey = 'control-plane.alpha.kubernetes.io/leader' + self.name = name + self.namespace = namespace + self.identity = str(identity) + self.configmap_reference = None + self.lock_record = { + 'holderIdentity': None, + 'leaseDurationSeconds': None, + 'acquireTime': None, + 'renewTime': None + } + + # get returns the election record from a ConfigMap Annotation + def get(self, name, namespace): + """ + :param name: Name of the configmap object information to get + :param namespace: Namespace in which the configmap object is to be searched + :return: 'True, election record' if object found else 'False, exception response' + """ + try: + api_response = self.api_instance.read_namespaced_config_map(name, namespace) + + # If an annotation does not exist - add the leader_electionrecord_annotationkey + annotations = api_response.metadata.annotations + if annotations is None or annotations == '': + api_response.metadata.annotations = {self.leader_electionrecord_annotationkey: ''} + self.configmap_reference = api_response + return True, None + + # If an annotation exists but, the leader_electionrecord_annotationkey does not then add it as a key + if not annotations.get(self.leader_electionrecord_annotationkey): + api_response.metadata.annotations = {self.leader_electionrecord_annotationkey: ''} + self.configmap_reference = api_response + return True, None + + lock_record = self.get_lock_object(json.loads(annotations[self.leader_electionrecord_annotationkey])) + + self.configmap_reference = api_response + return True, lock_record + except ApiException as e: + return False, e + + def create(self, name, namespace, election_record): + """ + :param electionRecord: Annotation string + :param name: Name of the configmap object to be created + :param namespace: Namespace in which the configmap object is to be created + :return: 'True' if object is created else 'False' if failed + """ + body = client.V1ConfigMap( + metadata={"name": name, + "annotations": {self.leader_electionrecord_annotationkey: json.dumps(self.get_lock_dict(election_record))}}) + + try: + api_response = self.api_instance.create_namespaced_config_map(namespace, body, pretty=True) + return True + except ApiException as e: + logging.info("Failed to create lock as {}".format(e)) + return False + + def update(self, name, namespace, updated_record): + """ + :param name: name of the lock to be updated + :param namespace: namespace the lock is in + :param updated_record: the updated election record + :return: True if update is succesful False if it fails + """ + try: + # Set the updated record + self.configmap_reference.metadata.annotations[self.leader_electionrecord_annotationkey] = json.dumps(self.get_lock_dict(updated_record)) + api_response = self.api_instance.replace_namespaced_config_map(name=name, namespace=namespace, + body=self.configmap_reference) + return True + except ApiException as e: + logging.info("Failed to update lock as {}".format(e)) + return False + + def get_lock_object(self, lock_record): + leader_election_record = LeaderElectionRecord(None, None, None, None) + + if lock_record.get('holderIdentity'): + leader_election_record.holder_identity = lock_record['holderIdentity'] + if lock_record.get('leaseDurationSeconds'): + leader_election_record.lease_duration = lock_record['leaseDurationSeconds'] + if lock_record.get('acquireTime'): + leader_election_record.acquire_time = lock_record['acquireTime'] + if lock_record.get('renewTime'): + leader_election_record.renew_time = lock_record['renewTime'] + + return leader_election_record + + def get_lock_dict(self, leader_election_record): + self.lock_record['holderIdentity'] = leader_election_record.holder_identity + self.lock_record['leaseDurationSeconds'] = leader_election_record.lease_duration + self.lock_record['acquireTime'] = leader_election_record.acquire_time + self.lock_record['renewTime'] = leader_election_record.renew_time + + return self.lock_record \ No newline at end of file