Skip to content
This repository has been archived by the owner on Mar 13, 2022. It is now read-only.

Commit

Permalink
Merge pull request #206 from Invictus17/master
Browse files Browse the repository at this point in the history
Leader Election issue #434
  • Loading branch information
k8s-ci-robot committed Jan 15, 2021
2 parents b002110 + 4d29af1 commit 4bf72d7
Show file tree
Hide file tree
Showing 9 changed files with 769 additions and 0 deletions.
18 changes: 18 additions & 0 deletions leaderelection/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
## Leader Election Example
This example demonstrates how to use the leader election library.

## Running
Run the following command in multiple separate terminals, preferably an odd number of them.
Each running process uses a unique identifier displayed when it starts to run.

- When a program runs, if a lock object already exists with the specified name,
all candidates will start as followers.
- If a lock object does not exist with the specified name then whichever candidate
creates a lock object first will become the leader and the rest will be followers.
- The status of the candidates and any leadership transitions are logged to the terminal.

### Command to run
```python example.py```

Now kill the existing leader. You will see from the terminal outputs that one of the
remaining running processes will be elected as the new leader.
13 changes: 13 additions & 0 deletions leaderelection/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Copyright 2021 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
59 changes: 59 additions & 0 deletions leaderelection/electionconfig.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# Copyright 2021 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import sys
import logging
logging.basicConfig(level=logging.INFO)


class Config:
    """Validated settings for one leader-election candidate.

    The constructor checks every argument and terminates the process via
    ``sys.exit`` (i.e. raises ``SystemExit`` with a message) as soon as an
    invalid value is found.

    Attributes set on success:
        jitter_factor: fixed multiplier (1.2) applied to ``retry_period``
            when validating ``renew_deadline``.
        lock: the lock object passed in by the caller.
        lease_duration, renew_deadline, retry_period: timing values as given.
        onstarted_leading: callback supplied by the caller.
        onstopped_leading: callback supplied by the caller, or
            ``on_stoppedleading_callback`` when ``None`` was passed.
    """

    # Validate config, exit if an error is detected
    def __init__(self, lock, lease_duration, renew_deadline, retry_period, onstarted_leading, onstopped_leading):
        """Validate and store the election parameters.

        Args:
            lock: lock object; must not be None. Its ``identity`` attribute
                is read by the default stop callback.
            lease_duration: must be >= 1 and strictly greater than
                ``renew_deadline``.
            renew_deadline: must be >= 1 and strictly greater than
                ``retry_period * jitter_factor``.
            retry_period: must be >= 1.
            onstarted_leading: callback; must not be None.
            onstopped_leading: callback, or None to use the default one.

        Raises:
            SystemExit: if any of the checks above fails.
        """
        self.jitter_factor = 1.2

        if lock is None:
            sys.exit("lock cannot be None")
        self.lock = lock

        if lease_duration <= renew_deadline:
            sys.exit("lease_duration must be greater than renew_deadline")

        if renew_deadline <= self.jitter_factor * retry_period:
            sys.exit("renew_deadline must be greater than retry_period*jitter_factor")

        # The checks below accept the value 1, so the messages say
        # "at least one" rather than "greater than one".
        if lease_duration < 1:
            sys.exit("lease_duration must be at least one")

        if renew_deadline < 1:
            sys.exit("renew_deadline must be at least one")

        if retry_period < 1:
            sys.exit("retry_period must be at least one")

        self.lease_duration = lease_duration
        self.renew_deadline = renew_deadline
        self.retry_period = retry_period

        if onstarted_leading is None:
            sys.exit("callback onstarted_leading cannot be None")
        self.onstarted_leading = onstarted_leading

        # Fall back to the built-in logging callback when none is supplied.
        if onstopped_leading is None:
            self.onstopped_leading = self.on_stoppedleading_callback
        else:
            self.onstopped_leading = onstopped_leading

    # Default callback used when this candidate, having been leader, stops leading
    def on_stoppedleading_callback(self):
        """Log that the candidate identified by ``self.lock.identity`` stopped leading."""
        logging.info("{} stopped leading".format(self.lock.identity))
54 changes: 54 additions & 0 deletions leaderelection/example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# Copyright 2021 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import uuid
from kubernetes import client, config
from leaderelection import leaderelection
from leaderelection.resourcelock.configmaplock import ConfigMapLock
from leaderelection import electionconfig


# Authenticate using a kube config file; set config_file to the path of your
# own file before running this example.
config.load_kube_config(config_file=r"")

# Parameters required from the user

# A unique identifier for this candidate
candidate_id = uuid.uuid4()

# Name of the lock object to be created
lock_name = "examplepython"

# Kubernetes namespace
lock_namespace = "default"


# The function that a user wants to run once a candidate is elected as a leader
def example_func():
    """Callback passed as onstarted_leading; prints a message and returns."""
    print("I am leader")


# A user can choose not to provide any callbacks for what to do when a candidate fails to lead - onStoppedLeading()
# In that case, a default callback function will be used

# Create the election config. It is bound to its own name so that the imported
# kubernetes `config` module is not shadowed.
election_config = electionconfig.Config(ConfigMapLock(lock_name, lock_namespace, candidate_id), lease_duration=17,
                                        renew_deadline=15, retry_period=5, onstarted_leading=example_func,
                                        onstopped_leading=None)

# Enter leader election
leaderelection.LeaderElection(election_config).run()

# User can choose to do another round of election or simply exit
print("Exited leader election")
191 changes: 191 additions & 0 deletions leaderelection/leaderelection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
# Copyright 2021 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import datetime
import sys
import time
import json
import threading
from .leaderelectionrecord import LeaderElectionRecord
import logging
# if condition to be removed when support for python2 will be removed
if sys.version_info > (3, 0):
from http import HTTPStatus
else:
import httplib
logging.basicConfig(level=logging.INFO)

"""
This package implements leader election using an annotation in a Kubernetes object.
The onstarted_leading function is run in a thread. If that function returns, it
might not be safe to run it again in the same process.
At first all candidates are considered followers. The one to create a lock or update
an existing lock first becomes the leader and remains so for as long as it keeps
renewing its lease.
"""


class LeaderElection:
    """Runs one candidate through lock acquisition and lease renewal.

    The candidate polls the configured lock until it manages to create or
    update it, then starts the ``onstarted_leading`` callback in a daemon
    thread and keeps renewing the lease. When renewal fails for longer than
    ``renew_deadline`` seconds, ``onstopped_leading`` is called and ``run``
    returns.
    """

    def __init__(self, election_config):
        """Store the election configuration.

        Args:
            election_config: configuration object providing ``lock``,
                ``lease_duration``, ``renew_deadline``, ``retry_period``,
                ``onstarted_leading`` and ``onstopped_leading``.

        Raises:
            SystemExit: if ``election_config`` is None.
        """
        if election_config is None:
            sys.exit("argument config not passed")

        # Latest record observed in the created lock object
        self.observed_record = None

        # The configuration set for this candidate
        self.election_config = election_config

        # Latest update time of the lock
        self.observed_time_milliseconds = 0

    # Point of entry to Leader election
    def run(self):
        """Acquire the lock, lead while the lease can be renewed, then stop.

        Blocks until the lease can no longer be renewed.
        """
        # Try to create/ acquire a lock
        if self.acquire():
            logging.info("{} successfully acquired lease".format(self.election_config.lock.identity))

            # Start leading and call OnStartedLeading().
            # The daemon flag has to be set on the Thread object itself;
            # assigning `threading.daemon` only creates an unused attribute on
            # the module and leaves the thread non-daemonic.
            leading_thread = threading.Thread(target=self.election_config.onstarted_leading)
            leading_thread.daemon = True
            leading_thread.start()

            self.renew_loop()

            # Failed to update lease, run OnStoppedLeading callback
            self.election_config.onstopped_leading()

    def acquire(self):
        """Poll the lock every ``retry_period`` seconds until it is obtained.

        Returns:
            True once ``try_acquire_or_renew`` succeeds. Never returns False;
            the loop only ends on success.
        """
        # Follower
        logging.info("{} is a follower".format(self.election_config.lock.identity))
        retry_period = self.election_config.retry_period

        while True:
            succeeded = self.try_acquire_or_renew()

            if succeeded:
                return True

            time.sleep(retry_period)

    def renew_loop(self):
        """Keep renewing the lease; return once a renewal window is exhausted.

        Each round allows ``renew_deadline`` seconds of attempts, spaced
        ``retry_period`` seconds apart. A successful round is followed by a
        ``retry_period`` pause before the next one.
        """
        # Leader
        logging.info("Leader has entered renew loop and will try to update lease continuously")

        retry_period = self.election_config.retry_period
        # Deadlines are tracked in milliseconds.
        renew_deadline = self.election_config.renew_deadline * 1000

        while True:
            timeout = int(time.time() * 1000) + renew_deadline
            succeeded = False

            while int(time.time() * 1000) < timeout:
                succeeded = self.try_acquire_or_renew()

                if succeeded:
                    break
                time.sleep(retry_period)

            if succeeded:
                time.sleep(retry_period)
                continue

            # failed to renew, return
            return

    def try_acquire_or_renew(self):
        """Make one attempt to create, take over, or renew the lock.

        Returns:
            True if the lock now carries this candidate's election record,
            False otherwise (retrieval error, create/update failure, or the
            lease is held by another candidate and has not expired).
        """
        now_timestamp = time.time()
        now = datetime.datetime.fromtimestamp(now_timestamp)

        # Check if lock is created
        lock_status, old_election_record = self.election_config.lock.get(self.election_config.lock.name,
                                                                         self.election_config.lock.namespace)

        # create a default Election record for this candidate
        leader_election_record = LeaderElectionRecord(self.election_config.lock.identity,
                                                      str(self.election_config.lease_duration), str(now), str(now))

        # A lock is not created with that name, try to create one
        if not lock_status:
            # To be removed when support for python2 will be removed
            if sys.version_info > (3, 0):
                not_found = HTTPStatus.NOT_FOUND
            else:
                not_found = httplib.NOT_FOUND

            # On failure the second value returned by lock.get() is used as an
            # error object: its JSON body carries the status code. Anything
            # other than "not found" is a retrieval error, so give up for now.
            if json.loads(old_election_record.body)['code'] != not_found:
                logging.info("Error retrieving resource lock {} as {}".format(self.election_config.lock.name,
                                                                              old_election_record.reason))
                return False

            logging.info("{} is trying to create a lock".format(leader_election_record.holder_identity))
            create_status = self.election_config.lock.create(name=self.election_config.lock.name,
                                                             namespace=self.election_config.lock.namespace,
                                                             election_record=leader_election_record)

            if create_status is False:
                logging.info("{} Failed to create lock".format(leader_election_record.holder_identity))
                return False

            self.observed_record = leader_election_record
            self.observed_time_milliseconds = int(time.time() * 1000)
            return True

        # A lock exists with that name
        # Validate old_election_record
        if old_election_record is None:
            # try to update lock with proper annotation and election record
            return self.update_lock(leader_election_record)

        if (old_election_record.holder_identity is None or old_election_record.lease_duration is None
                or old_election_record.acquire_time is None or old_election_record.renew_time is None):
            # try to update lock with proper annotation and election record
            return self.update_lock(leader_election_record)

        # Report transitions
        if self.observed_record and self.observed_record.holder_identity != old_election_record.holder_identity:
            logging.info("Leader has switched to {}".format(old_election_record.holder_identity))

        # Remember the record, and when it was seen, whenever it changes.
        if self.observed_record is None or old_election_record.__dict__ != self.observed_record.__dict__:
            self.observed_record = old_election_record
            self.observed_time_milliseconds = int(time.time() * 1000)

        # If This candidate is not the leader and lease duration is yet to finish
        if (self.election_config.lock.identity != self.observed_record.holder_identity
                and self.observed_time_milliseconds + self.election_config.lease_duration * 1000
                > int(now_timestamp * 1000)):
            logging.info("yet to finish lease_duration, lease held by {} and has not expired".format(
                old_election_record.holder_identity))
            return False

        # If this candidate is the Leader
        if self.election_config.lock.identity == self.observed_record.holder_identity:
            # Leader updates renewTime, but keeps acquire_time unchanged
            leader_election_record.acquire_time = self.observed_record.acquire_time

        return self.update_lock(leader_election_record)

    def update_lock(self, leader_election_record):
        """Write ``leader_election_record`` to the lock object.

        Args:
            leader_election_record: the record to store in the lock.

        Returns:
            True if the update succeeded (the observed record and time are
            refreshed), False if the lock reported failure.
        """
        # Update object with latest election record
        update_status = self.election_config.lock.update(self.election_config.lock.name,
                                                         self.election_config.lock.namespace,
                                                         leader_election_record)

        if update_status is False:
            logging.info("{} failed to acquire lease".format(leader_election_record.holder_identity))
            return False

        self.observed_record = leader_election_record
        self.observed_time_milliseconds = int(time.time() * 1000)
        logging.info("leader {} has successfully acquired lease".format(leader_election_record.holder_identity))
        return True
Loading

0 comments on commit 4bf72d7

Please sign in to comment.