Skip to content
This repository has been archived by the owner on Mar 13, 2022. It is now read-only.

Leader Election issue #434 #206

Merged
merged 1 commit into from
Jan 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions leaderelection/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
## Leader Election Example
This example demonstrates how to use the leader election library.

## Running
Run the following command in multiple separate terminals preferably an odd number.
Each running process uses a unique identifier displayed when it starts to run.

- When a program runs, if a lock object already exists with the specified name,
all candidates will start as followers.
- If a lock object does not exist with the specified name then whichever candidate
creates a lock object first will become the leader and the rest will be followers.
- The user will be prompted about the status of the candidates and transitions.

### Command to run
```python example.py```

Now kill the existing leader. You will see from the terminal outputs that one of the
remaining running processes will be elected as the new leader.
13 changes: 13 additions & 0 deletions leaderelection/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Copyright 2021 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
59 changes: 59 additions & 0 deletions leaderelection/electionconfig.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# Copyright 2021 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import sys
roycaihw marked this conversation as resolved.
Show resolved Hide resolved
import logging
logging.basicConfig(level=logging.INFO)


class Config:
# Validate config, exit if an error is detected
def __init__(self, lock, lease_duration, renew_deadline, retry_period, onstarted_leading, onstopped_leading):
self.jitter_factor = 1.2

if lock is None:
sys.exit("lock cannot be None")
self.lock = lock

if lease_duration <= renew_deadline:
sys.exit("lease_duration must be greater than renew_deadline")

if renew_deadline <= self.jitter_factor * retry_period:
sys.exit("renewDeadline must be greater than retry_period*jitter_factor")

if lease_duration < 1:
sys.exit("lease_duration must be greater than one")

if renew_deadline < 1:
sys.exit("renew_deadline must be greater than one")

if retry_period < 1:
sys.exit("retry_period must be greater than one")

self.lease_duration = lease_duration
self.renew_deadline = renew_deadline
self.retry_period = retry_period

if onstarted_leading is None:
sys.exit("callback onstarted_leading cannot be None")
self.onstarted_leading = onstarted_leading

if onstopped_leading is None:
self.onstopped_leading = self.on_stoppedleading_callback
else:
self.onstopped_leading = onstopped_leading

# Default callback for when the current candidate if a leader, stops leading
def on_stoppedleading_callback(self):
logging.info("stopped leading".format(self.lock.identity))
54 changes: 54 additions & 0 deletions leaderelection/example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# Copyright 2021 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import uuid
from kubernetes import client, config
from leaderelection import leaderelection
from leaderelection.resourcelock.configmaplock import ConfigMapLock
from leaderelection import electionconfig


# Authenticate using config file
config.load_kube_config(config_file=r"")

# Parameters required from the user

# A unique identifier for this candidate
candidate_id = uuid.uuid4()

# Name of the lock object to be created
lock_name = "examplepython"

# Kubernetes namespace
lock_namespace = "default"


# The function that a user wants to run once a candidate is elected as a leader
def example_func():
print("I am leader")


# A user can choose not to provide any callbacks for what to do when a candidate fails to lead - onStoppedLeading()
# In that case, a default callback function will be used

# Create config
config = electionconfig.Config(ConfigMapLock(lock_name, lock_namespace, candidate_id), lease_duration=17,
renew_deadline=15, retry_period=5, onstarted_leading=example_func,
onstopped_leading=None)

# Enter leader election
leaderelection.LeaderElection(config).run()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

usually more than one is run to see how leaderelection is working. a simple README would be helpful.

Go example:
https://github.com/kubernetes/kubernetes/tree/master/staging/src/k8s.io/client-go/examples/leader-election


# User can choose to do another round of election or simply exit
print("Exited leader election")
191 changes: 191 additions & 0 deletions leaderelection/leaderelection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
# Copyright 2021 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import datetime
import sys
import time
import json
import threading
from .leaderelectionrecord import LeaderElectionRecord
import logging
# if condition to be removed when support for python2 will be removed
if sys.version_info > (3, 0):
from http import HTTPStatus
else:
import httplib
logging.basicConfig(level=logging.INFO)

"""
This package implements leader election using an annotation in a Kubernetes object.
The onstarted_leading function is run in a thread and when it returns, if it does
it might not be safe to run it again in a process.

At first all candidates are considered followers. The one to create a lock or update
an existing lock first becomes the leader and remains so until it keeps renewing its
lease.
"""


class LeaderElection:
def __init__(self, election_config):
if election_config is None:
sys.exit("argument config not passed")

# Latest record observed in the created lock object
self.observed_record = None

# The configuration set for this candidate
self.election_config = election_config

# Latest update time of the lock
self.observed_time_milliseconds = 0

# Point of entry to Leader election
def run(self):
# Try to create/ acquire a lock
if self.acquire():
logging.info("{} successfully acquired lease".format(self.election_config.lock.identity))

# Start leading and call OnStartedLeading()
threading.daemon = True
threading.Thread(target=self.election_config.onstarted_leading).start()

self.renew_loop()

# Failed to update lease, run OnStoppedLeading callback
self.election_config.onstopped_leading()

def acquire(self):
# Follower
logging.info("{} is a follower".format(self.election_config.lock.identity))
retry_period = self.election_config.retry_period

while True:
succeeded = self.try_acquire_or_renew()

if succeeded:
return True

time.sleep(retry_period)

def renew_loop(self):
# Leader
logging.info("Leader has entered renew loop and will try to update lease continuously")

retry_period = self.election_config.retry_period
renew_deadline = self.election_config.renew_deadline * 1000

while True:
timeout = int(time.time() * 1000) + renew_deadline
succeeded = False

while int(time.time() * 1000) < timeout:
succeeded = self.try_acquire_or_renew()

if succeeded:
break
time.sleep(retry_period)

if succeeded:
time.sleep(retry_period)
continue

# failed to renew, return
return

def try_acquire_or_renew(self):
now_timestamp = time.time()
now = datetime.datetime.fromtimestamp(now_timestamp)

# Check if lock is created
lock_status, old_election_record = self.election_config.lock.get(self.election_config.lock.name,
self.election_config.lock.namespace)

# create a default Election record for this candidate
leader_election_record = LeaderElectionRecord(self.election_config.lock.identity,
str(self.election_config.lease_duration), str(now), str(now))

# A lock is not created with that name, try to create one
if not lock_status:
# To be removed when support for python2 will be removed
if sys.version_info > (3, 0):
if json.loads(old_election_record.body)['code'] != HTTPStatus.NOT_FOUND:
logging.info("Error retrieving resource lock {} as {}".format(self.election_config.lock.name,
old_election_record.reason))
return False
else:
if json.loads(old_election_record.body)['code'] != httplib.NOT_FOUND:
logging.info("Error retrieving resource lock {} as {}".format(self.election_config.lock.name,
old_election_record.reason))
return False

logging.info("{} is trying to create a lock".format(leader_election_record.holder_identity))
create_status = self.election_config.lock.create(name=self.election_config.lock.name,
namespace=self.election_config.lock.namespace,
election_record=leader_election_record)

if create_status is False:
logging.info("{} Failed to create lock".format(leader_election_record.holder_identity))
return False
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a log for the create failure


self.observed_record = leader_election_record
self.observed_time_milliseconds = int(time.time() * 1000)
return True

# A lock exists with that name
# Validate old_election_record
if old_election_record is None:
# try to update lock with proper annotation and election record
return self.update_lock(leader_election_record)

if (old_election_record.holder_identity is None or old_election_record.lease_duration is None
or old_election_record.acquire_time is None or old_election_record.renew_time is None):
# try to update lock with proper annotation and election record
return self.update_lock(leader_election_record)

# Report transitions
if self.observed_record and self.observed_record.holder_identity != old_election_record.holder_identity:
logging.info("Leader has switched to {}".format(old_election_record.holder_identity))

if self.observed_record is None or old_election_record.__dict__ != self.observed_record.__dict__:
self.observed_record = old_election_record
self.observed_time_milliseconds = int(time.time() * 1000)

# If This candidate is not the leader and lease duration is yet to finish
if (self.election_config.lock.identity != self.observed_record.holder_identity
and self.observed_time_milliseconds + self.election_config.lease_duration * 1000 > int(now_timestamp * 1000)):
logging.info("yet to finish lease_duration, lease held by {} and has not expired".format(old_election_record.holder_identity))
return False

# If this candidate is the Leader
if self.election_config.lock.identity == self.observed_record.holder_identity:
# Leader updates renewTime, but keeps acquire_time unchanged
leader_election_record.acquire_time = self.observed_record.acquire_time

return self.update_lock(leader_election_record)

def update_lock(self, leader_election_record):
# Update object with latest election record
update_status = self.election_config.lock.update(self.election_config.lock.name,
self.election_config.lock.namespace,
leader_election_record)

if update_status is False:
logging.info("{} failed to acquire lease".format(leader_election_record.holder_identity))
return False

self.observed_record = leader_election_record
self.observed_time_milliseconds = int(time.time() * 1000)
logging.info("leader {} has successfully acquired lease".format(leader_election_record.holder_identity))
return True
Loading