CP-39600: Rework LVM locking to use fair lock queue #658

Merged
Makefile (1 addition, 0 deletions)
@@ -45,6 +45,7 @@ SM_LIBS += journaler
 SM_LIBS += fjournaler
 SM_LIBS += lock
 SM_LIBS += flock
+SM_LIBS += lock_queue
 SM_LIBS += ipc
 SM_LIBS += srmetadata
 SM_LIBS += metadata
drivers/LVHDSR.py (2 additions, 6 deletions)
@@ -1758,9 +1758,7 @@ def _snapshot(self, snapType, cloneOp=False, cbtlog=None, cbt_consistency=None):
         if snapType == VDI.SNAPSHOT_DOUBLE:
             clonUuid = util.gen_uuid()
             jval = "%s_%s" % (baseUuid, clonUuid)
-        with lvutil.LvmLockContext():
-            # This makes multiple LVM calls so take the lock early
-            self.sr.journaler.create(self.JRN_CLONE, origUuid, jval)
+        self.sr.journaler.create(self.JRN_CLONE, origUuid, jval)
         util.fistpoint.activate("LVHDRT_clone_vdi_after_create_journal", self.sr.uuid)

         try:
@@ -1835,9 +1833,7 @@ def _snapshot(self, snapType, cloneOp=False, cbtlog=None, cbt_consistency=None):
             self._failClone(origUuid, jval, str(e))
         util.fistpoint.activate("LVHDRT_clone_vdi_before_remove_journal", self.sr.uuid)

-        with lvutil.LvmLockContext():
-            # This makes multiple LVM calls so take the lock early
-            self.sr.journaler.remove(self.JRN_CLONE, origUuid)
+        self.sr.journaler.remove(self.JRN_CLONE, origUuid)

         return self._finishSnapshot(snapVDI, snapVDI2, hostRefs, cloneOp, snapType)

drivers/lock.py (0 additions, 1 deletion)
@@ -49,7 +49,6 @@ def __new__(cls, name, ns=None, *args, **kwargs):
             instances[name] = LockImplementation(name, ns)
         return instances[name]

-    # These are required to pacify pylint as it doesn't understand the __new__
     def acquire(self):
         raise NotImplementedError("Lock methods implemented in LockImplementation")
drivers/lock_queue.py (new file, 120 additions)
@@ -0,0 +1,120 @@
# Copyright (C) Cloud Software Group, Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation; version 2.1 only.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

import os
import pickle
import sys
import time

import lock
import util


DEBUG_LOG = True

def debug_log(msg):
    if DEBUG_LOG:
        util.SMlog("LockQueue: " + msg)

def get_process_start_time(pid):
    proc_file = f"/proc/{pid}/stat"
    with open(proc_file, 'r') as f:
        return f.read().split(')')[-1].split(' ')[20]

def process_is_valid(pid, start_time):
    proc_file = f"/proc/{pid}/stat"

    try:
        if start_time != get_process_start_time(pid):
            debug_log(f"Process {pid} has incorrect start time real:{get_process_start_time(pid)} vs expected:{start_time}")
            return False
    except FileNotFoundError:
        debug_log(f"Process {pid} is dead")
        return False

    return True

class LockQueue:
    def __init__(self, name):
        self.name = name
        self._queue_lock = lock.Lock(name, f"ql-{name}")
        self._action_lock = lock.Lock(name, f"al-{name}")
        # Filename to hold the process queue
        self._mem = f"/tmp/mem-{name}"

    def load_queue(self):
        try:
            with open(self._mem, "rb") as f:
                queue = pickle.load(f)
            debug_log("load_queue {}".format(queue))
        except EOFError:
            queue = []
        except FileNotFoundError:
            queue = []
        return queue

    def save_queue(self, queue):
        with open(self._mem, "w+b") as f:
            pickle.dump(queue, f)
        debug_log("save_queue {}".format(queue))

    def push_into_process_queue(self):
        self._queue_lock.acquire()

        queue = self.load_queue()
        queue.append((os.getpid(), get_process_start_time(os.getpid())))
        self.save_queue(queue)

        self._queue_lock.release()

    def __enter__(self):
        # Add ourselves to the process queue.
        self.push_into_process_queue()

        # Keep reading the process queue until we are at the front
        while True:
            self._queue_lock.acquire()
            queue = self.load_queue()
            front_pid, front_start_time = queue.pop(0)
            print(f"Testing for PID {front_pid}")
            if front_pid == os.getpid():
                # We are at the front, it is now our turn to wait on the action lock
                # and then do our work
                debug_log(f"{front_pid} taking action lock")
                self._action_lock.acquire()
                # When we have the action lock, save the queue (which no longer
                # includes us) and release the queue lock to let others join.
                self.save_queue(queue)
                self._queue_lock.release()
                break

            # Getting here means it was not our turn to do stuff
            # If the process at the front of the queue is not alive then remove it
            if not process_is_valid(front_pid, front_start_time):
                # front pid has already been popped from queue so just save it
                debug_log(f"Removing invalid process {front_pid}")
                self.save_queue(queue)
            # Release the lock and try again later. Most waiting will be on the queue lock,
            # waiting for the single action lock waiter to release it when it has the action
            # lock. We sleep a short while before our next check to make it easier for new
            # waiters to join the queue without really wasting our own time.
            self._queue_lock.release()
            time.sleep(0.1)

        debug_log("In manager")
        return self

    def __exit__(self, type, value, tbck):
        self._action_lock.release()
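
The algorithm above gives a first-come-first-served discipline: a process appends its (pid, start time) pair under the queue lock, polls until that pair reaches the front, then holds the action lock for its critical section, having removed itself from the queue. A minimal usage sketch; the lock name and the work inside the block are illustrative, not taken from this PR:

from lock_queue import LockQueue

def do_serialized_work():
    # Entering the context blocks until this process reaches the front of
    # the fair queue and has acquired the action lock; leaving the context
    # releases the action lock to the next waiter in line.
    with LockQueue("example-name"):
        pass  # work that must be serialized across processes goes here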
drivers/lvutil.py (22 additions, 5 deletions)
@@ -16,12 +16,14 @@
 # Miscellaneous LVM utility functions
 #

+import traceback
 import re
 import os
 import errno
 import time

 import lock
+import lock_queue
 import util
 import xs_errors
 import xml.dom.minidom
@@ -110,7 +112,7 @@ def extract_vgname(str_in):

     return None

-class LvmLockContext(object):
+class LvmLockContext(lock_queue.LockQueue):
     """
     Context manager around the LVM lock.

@@ -120,20 +122,35 @@ class LvmLockContext(object):
     """

     def __init__(self, cmd=None):
-        self.lock = lock.Lock(LVM_LOCK)
         self.cmd = cmd
         self.locked = False
+        try:
+            super().__init__(LVM_LOCK)
+        except Exception as e:
+            util.SMlog(f"===> LvmLockContext __init__ {e} {traceback.format_exc()}")
+            raise

     def __enter__(self):
         if self.cmd and '--readonly' in self.cmd:
             return

-        self.lock.acquire()
+        while True:
+            try:
+                super().__enter__()
+            except Exception as e:
+                util.SMlog(f"===> LvmLockContext __enter__ {e} {traceback.format_exc()}")
+                continue
+            break
+
         self.locked = True

-    def __exit__(self, exc_type, value, traceback):
+    def __exit__(self, exc_type, value, tbck):
         if self.locked:
-            self.lock.release()
+            try:
+                super().__exit__(exc_type, value, tbck)
+            except Exception as e:
+                util.SMlog(f"===> LvmLockContext __exit__ {e} {traceback.format_exc()}")
+                raise


 LVM_RETRY_ERRORS = [
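
Since LvmLockContext now derives from LockQueue, mutating LVM commands wait their turn in the fair queue, while a command containing '--readonly' still returns from __enter__ before any locking, so pure queries are never serialized behind writers. A sketch of the resulting caller pattern, assuming the existing sm helper util.pread2; the command line itself is hypothetical:

import util
from lvutil import LvmLockContext

cmd = ['lvchange', '-ay', '/dev/VG_XenStorage-example/example-lv']  # hypothetical
with LvmLockContext(cmd):
    util.pread2(cmd)  # runs only once this process holds the fair LVM lock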
mk/sm.spec.in (1 addition, 0 deletions)
@@ -169,6 +169,7 @@ tests/run_python_unittests.sh
 /opt/xensource/sm/journaler.py
 /opt/xensource/sm/lcache.py
 /opt/xensource/sm/lock.py
+/opt/xensource/sm/lock_queue.py
 /opt/xensource/sm/lvhdutil.py
 /opt/xensource/sm/lvmanager.py
 /opt/xensource/sm/lvmcache.py
tests/test_LVHDSR.py (2 additions, 1 deletion)
@@ -54,10 +54,11 @@ def create_LVHDSR(self, master=False, command='foo', sr_uuid=None):
             sr_uuid = str(uuid.uuid4())
         return LVHDSR.LVHDSR(srcmd, sr_uuid)

+    @mock.patch('lvutil.LvmLockContext', autospec=True)
     @mock.patch('lvhdutil.getVDIInfo', autospec=True)
     @mock.patch('LVHDSR.Lock', autospec=True)
     @mock.patch('SR.XenAPI')
-    def test_loadvids(self, mock_xenapi, mock_lock, mock_getVDIInfo):
+    def test_loadvids(self, mock_xenapi, mock_lock, mock_getVDIInfo, mock_lvlock):
         """sr.allVDIs populated by _loadvdis"""

         vdi_uuid = 'some VDI UUID'
tests/test_LVHDoHBASR.py (2 additions, 0 deletions)
@@ -87,6 +87,8 @@ def setUp(self):
         self.mock_lvhdsr = lvhdsr_patcher.start()
         util_patcher = mock.patch('LVHDoHBASR.util', autospec=True)
         self.mock_util = util_patcher.start()
+        lc_patcher = mock.patch('LVHDSR.lvmcache.lvutil.LvmLockContext', autospec=True)
+        self.mock_lc = lc_patcher.start()
         xenapi_patcher = mock.patch('SR.XenAPI')
         self.mock_xapi = xenapi_patcher.start()
tests/test_LVHDoISCSISR.py (2 additions, 0 deletions)
@@ -148,6 +148,8 @@ def deepcopy(to_copy):

         lock_patcher = mock.patch('LVHDSR.Lock')
         self.mock_lock = lock_patcher.start()
+        lvlock_patcher = mock.patch('LVHDSR.lvutil.LvmLockContext')
+        self.mock_lvlock = lvlock_patcher.start()

         self.addCleanup(mock.patch.stopall)
tests/test_SRCommand.py (3 additions, 1 deletion)
@@ -126,11 +126,13 @@ def test_run_wrapped_if_not_SRException(
                 new=lambda s: s.encode("ascii", "surrogateescape"))
     @mock.patch("os.fsdecode",
                 new=lambda bs: bs.decode("ascii", "surrogateescape"))
-    def test_parse_handles_wide_chars(self):
+    @mock.patch('util.gen_uuid', autospec=True)
+    def test_parse_handles_wide_chars(self, gen_uuid):
         import os
         import xmlrpc.client
         from DummySR import DRIVER_INFO

+        gen_uuid.return_value = '13c4384e-897b-e745-6b3e-9a89c06537be'
         xmlrpc_method = "vdi_create"
         xmlrpc_params = {
             'host_ref': 'OpaqueRef:133c7c46-f4d9-3695-83c4-bf8574b89fb9',
tests/test_lock_queue.py (new file, 87 additions)
@@ -0,0 +1,87 @@
import builtins
import copy
import os
import sys
import unittest
import unittest.mock as mock

import lock
import lock_queue

## Instead of saving the process queue to disk the mocks will save it here.
## It needs to be global because it is shared between threads.
saved_queue = []

def mock_pickle_dump_fn(*args):
    global saved_queue
    saved_queue = copy.deepcopy(args[0])

def mock_pickle_load_fn(*args):
    global saved_queue
    return copy.deepcopy(saved_queue)


class Test_LockQueue(unittest.TestCase):
    def setUp(self):
        # Re-initialize queue to empty for each test
        global saved_queue
        saved_queue = []

    def get_lock_name(self):
        return "bacon"

    @mock.patch('lock_queue.pickle.load', side_effect=mock_pickle_load_fn)
    @mock.patch('lock_queue.pickle.dump', side_effect=mock_pickle_dump_fn)
    @mock.patch('lock_queue.os.getpid')
    @mock.patch('lock_queue.get_process_start_time')
    @mock.patch('lock.Lock', autospec=False)
    def test_push_to_queue_3x(self, lock, start_time, getpid, pdump, pload):
        global saved_queue

        lq = lock_queue.LockQueue(self.get_lock_name())
        # Push to queue 3 times using these PID and Start Time combinations
        test_pids = [997, 993, 996]
        test_sts = [360, 430, 458]
        for p, s in zip(test_pids, test_sts):
            start_time.return_value = s
            getpid.return_value = p
            lq.push_into_process_queue()

        # Test the queue includes the PID and Start Time pairs in the order we expect
        self.assertEqual(list(zip(test_pids, test_sts)), saved_queue)

    @mock.patch('lock_queue.pickle.load', side_effect=mock_pickle_load_fn)
    @mock.patch('lock_queue.pickle.dump', side_effect=mock_pickle_dump_fn)
    @mock.patch('lock_queue.os.getpid')
    @mock.patch('lock_queue.get_process_start_time')
    @mock.patch('lock.Lock', autospec=False)
    def test_context_manager(self, lock, start_time, getpid, pdump, pload):
        global saved_queue

        getpid.return_value = 959
        start_time.return_value = 575

        # Queue is empty
        self.assertEqual(saved_queue, [])

        with lock_queue.LockQueue(self.get_lock_name()) as lq:
            # Should have removed from the queue before completing entry to the context manager
            self.assertEqual(saved_queue, [])

    @mock.patch('lock_queue.pickle.load', side_effect=mock_pickle_load_fn)
    @mock.patch('lock_queue.pickle.dump', side_effect=mock_pickle_dump_fn)
    @mock.patch('lock_queue.os.getpid')
    @mock.patch('lock_queue.get_process_start_time')
    @mock.patch('lock.Lock', autospec=False)
    def test_context_manager_bad_entry(self, lock, start_time, getpid, pdump, pload):
        global saved_queue

        # Initialise saved_queue with non-existent pid
        saved_queue = [(0, 67867)]

        getpid.return_value = 959
        start_time.return_value = 575
        with lock_queue.LockQueue(self.get_lock_name()) as lq:
            # Should have removed from the queue before completing entry to the context manager
            self.assertEqual(saved_queue, [])
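
These tests mock out pickle and lock.Lock, so queue fairness is only exercised within a single process. A hedged out-of-suite smoke sketch using real processes, assuming the sm lock module can create its lock files (normally under /var/lock/sm) and that /tmp is writable for the queue file:

import multiprocessing

import lock_queue

def worker(idx, order):
    # Each worker queues up fairly; append records the order the lock was won.
    with lock_queue.LockQueue("smoke"):
        order.append(idx)

if __name__ == "__main__":
    with multiprocessing.Manager() as mgr:
        order = mgr.list()
        procs = [multiprocessing.Process(target=worker, args=(i, order))
                 for i in range(4)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        print(list(order))  # with a fair queue this tracks arrival order closely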
