forked from xapi-project/sm
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
CP-39600: Rework LVM locking to use fair lock queue (xapi-project#658)
This is intended to prevent a storm of later LVM commands from usurping the lock that an earlier command is waiting for, just because they happened to manage to start waiting before the original holder was done. In some pathalogical circumstances, this has been known to result in a commant taking many minutes to get the lock. Signed-off-by: Tim Smith <tim.smith@cloud.com> Co-authored-by: Tim Smith <tim.smith@cloud.com>
- Loading branch information
1 parent
4dd5587
commit 125aa9e
Showing
12 changed files
with
327 additions
and
18 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,120 @@ | ||
# Copyright (C) Cloud Software Group, Inc. | ||
# | ||
# This program is free software; you can redistribute it and/or modify | ||
# it under the terms of the GNU Lesser General Public License as published | ||
# by the Free Software Foundation; version 2.1 only. | ||
# | ||
# This program is distributed in the hope that it will be useful, | ||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
# GNU Lesser General Public License for more details. | ||
# | ||
# You should have received a copy of the GNU Lesser General Public License | ||
# along with this program; if not, write to the Free Software Foundation, Inc., | ||
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA | ||
|
||
import os | ||
import pickle | ||
import sys | ||
import time | ||
|
||
import lock | ||
import util | ||
|
||
|
||
DEBUG_LOG = True | ||
|
||
def debug_log(msg): | ||
if DEBUG_LOG: | ||
util.SMlog("LockQueue: " + msg) | ||
|
||
def get_process_start_time(pid): | ||
proc_file = f"/proc/{pid}/stat" | ||
with open(proc_file, 'r') as f: | ||
return f.read().split(')')[-1].split(' ')[20] | ||
|
||
def process_is_valid(pid, start_time): | ||
proc_file = f"/proc/{pid}/stat" | ||
|
||
try: | ||
if start_time != get_process_start_time(pid): | ||
debug_log(f"Process {pid} has incorrect start time real:{get_process_start_time(pid)} vs expected:{start_time}") | ||
return False | ||
except FileNotFoundError: | ||
debug_log(f"Process {pid} is dead") | ||
return False | ||
|
||
return True | ||
|
||
class LockQueue: | ||
def __init__(self, name): | ||
self.name = name | ||
self._queue_lock = lock.Lock(name, f"ql-{name}") | ||
self._action_lock = lock.Lock(name, f"al-{name}") | ||
# Filename to hold the process queue | ||
self._mem = f"/tmp/mem-{name}" | ||
|
||
def load_queue(self): | ||
try: | ||
with open(self._mem, "rb") as f: | ||
queue = pickle.load(f) | ||
debug_log("load_queue {}".format(queue)) | ||
except EOFError: | ||
queue = [] | ||
except FileNotFoundError: | ||
queue = [] | ||
return queue | ||
|
||
def save_queue(self, queue): | ||
with open(self._mem, "w+b") as f: | ||
pickle.dump(queue, f) | ||
debug_log("save_queue {}".format(queue)) | ||
|
||
def push_into_process_queue(self): | ||
self._queue_lock.acquire() | ||
|
||
queue = self.load_queue() | ||
queue.append((os.getpid(), get_process_start_time(os.getpid()))) | ||
self.save_queue(queue) | ||
|
||
self._queue_lock.release() | ||
|
||
def __enter__(self): | ||
# Add ourselves to the process queue. | ||
self.push_into_process_queue() | ||
|
||
# Keep reading the process queue until we are at the front | ||
while True: | ||
self._queue_lock.acquire() | ||
queue = self.load_queue() | ||
front_pid, front_start_time = queue.pop(0) | ||
print(f"Testing for PID {front_pid}") | ||
if front_pid == os.getpid(): | ||
# We are at the front, it is now our turn to wait on the action lock | ||
# and then do our work | ||
debug_log(f"{front_pid} taking action lock") | ||
self._action_lock.acquire() | ||
# When we have the action lock, save the queue (which no longer | ||
# includes us) and release the queue lock to let others join. | ||
self.save_queue(queue) | ||
self._queue_lock.release() | ||
break | ||
|
||
# Getting here means it was not our turn to do stuff | ||
# If the process at the front of the queue is not alive then remove it | ||
if not process_is_valid(front_pid, front_start_time): | ||
# front pid has already been popped from queue so just save it | ||
debug_log(f"Removing invalid process {front_pid}") | ||
self.save_queue(queue) | ||
# Release the lock and try again later. Most waiting will be on the queue lock, | ||
# waiting for the single Action lock waiter to release it when it has the action | ||
# lock. We sleep a short while before our next check to make it easier for new | ||
# waiters to join the queue without really wasting our own time. | ||
self._queue_lock.release() | ||
time.sleep(0.1) | ||
|
||
debug_log("In manager") | ||
return self | ||
|
||
def __exit__(self, type, value, tbck): | ||
self._action_lock.release() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
import builtins | ||
import copy | ||
import os | ||
import sys | ||
import unittest | ||
import unittest.mock as mock | ||
|
||
import lock | ||
import lock_queue | ||
|
||
## Instead of saving the process queue to disk the mocks will save it here. | ||
## It needs to be global because it is shared between threads. | ||
saved_queue = [] | ||
|
||
def mock_pickle_dump_fn(*args): | ||
global saved_queue | ||
saved_queue = copy.deepcopy(args[0]) | ||
|
||
def mock_pickle_load_fn(*args): | ||
global saved_queue | ||
return copy.deepcopy(saved_queue) | ||
|
||
|
||
class Test_LockQueue(unittest.TestCase): | ||
def setUp(self): | ||
# Re-initialize queue to empty for each test | ||
global saved_queue | ||
saved_queue = [] | ||
|
||
def get_lock_name(self): | ||
return "bacon" | ||
|
||
@mock.patch('lock_queue.pickle.load', side_effect=mock_pickle_load_fn) | ||
@mock.patch('lock_queue.pickle.dump', side_effect=mock_pickle_dump_fn) | ||
@mock.patch('lock_queue.os.getpid') | ||
@mock.patch('lock_queue.get_process_start_time') | ||
@mock.patch('lock.Lock', autospec=False) | ||
def test_push_to_queue_3x(self, lock, start_time, getpid, pdump, pload): | ||
global saved_queue | ||
|
||
lq = lock_queue.LockQueue(self.get_lock_name()) | ||
# Push to queue 3 times using these PID and Start Time combinations | ||
test_pids = [997, 993, 996] | ||
test_sts = [360, 430, 458] | ||
for p, s in zip(test_pids, test_sts): | ||
start_time.return_value = s | ||
getpid.return_value = p | ||
lq.push_into_process_queue() | ||
|
||
# Test the queue includes the PID and Start Time pairs in the order we expect | ||
self.assertEqual(list(zip(test_pids, test_sts)), saved_queue) | ||
|
||
@mock.patch('lock_queue.pickle.load', side_effect=mock_pickle_load_fn) | ||
@mock.patch('lock_queue.pickle.dump', side_effect=mock_pickle_dump_fn) | ||
@mock.patch('lock_queue.os.getpid') | ||
@mock.patch('lock_queue.get_process_start_time') | ||
@mock.patch('lock.Lock', autospec=False) | ||
def test_context_manager(self, lock, start_time, getpid, pdump, pload): | ||
global saved_queue | ||
|
||
getpid.return_value = 959 | ||
start_time.return_value = 575 | ||
|
||
# Queue is empty | ||
self.assertEqual(saved_queue, []) | ||
|
||
with lock_queue.LockQueue(self.get_lock_name()) as lq: | ||
# Should have removed from the queue before completing entry to the context manager | ||
self.assertEqual(saved_queue, []) | ||
|
||
@mock.patch('lock_queue.pickle.load', side_effect=mock_pickle_load_fn) | ||
@mock.patch('lock_queue.pickle.dump', side_effect=mock_pickle_dump_fn) | ||
@mock.patch('lock_queue.os.getpid') | ||
@mock.patch('lock_queue.get_process_start_time') | ||
@mock.patch('lock.Lock', autospec=False) | ||
def test_context_manager_bad_entry(self, lock, start_time, getpid, pdump, pload): | ||
global saved_queue | ||
|
||
# Initialise saved_queue with non-existent pid | ||
saved_queue = [(0, 67867)] | ||
|
||
getpid.return_value = 959 | ||
start_time.return_value = 575 | ||
with lock_queue.LockQueue(self.get_lock_name()) as lq: | ||
# Should have removed from the queue before completing entry to the context manager | ||
self.assertEqual(saved_queue, []) | ||
|
Oops, something went wrong.