Skip to content

Commit

Permalink
Use quorum=True for etcd reads in IPAM.
Browse files (browse the repository at this point in the history)
  • Loading branch information
Spike Curtis committed Nov 4, 2015
1 parent 5ecb2af commit d01d2ec
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 19 deletions.
30 changes: 21 additions & 9 deletions calico_containers/pycalico/ipam.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@
get_block_cidr_for_address,
BLOCK_PREFIXLEN,
AlreadyAssignedError,
AddressNotAssignedError)
AddressNotAssignedError,
NoHostAffinityWarning)
from pycalico.handle import (AllocationHandle,
AddressCountTooLow)
from pycalico.util import get_hostname
Expand Down Expand Up @@ -57,7 +58,10 @@ def _read_block(self, block_cidr):
"""
key = _block_datastore_key(block_cidr)
try:
result = self.etcd_client.read(key)
# Use quorum=True to ensure we don't get stale reads. Without this
# we allow many subtle race conditions, such as creating a block,
# then later reading it and finding it doesn't exist.
result = self.etcd_client.read(key, quorum=True)
except EtcdKeyNotFound:
raise KeyError(str(block_cidr))
block = AllocationBlock.from_etcd_result(result)
Expand Down Expand Up @@ -98,7 +102,7 @@ def _get_affine_blocks(self, host, version, pool):
"version": version}
block_ids = []
try:
result = self.etcd_client.read(path).children
result = self.etcd_client.read(path, quorum=True).children
for child in result:
packed = child.key.split("/")
if len(packed) == 9:
Expand Down Expand Up @@ -141,7 +145,7 @@ def _new_affine_block(self, host, version, pool):
_log.debug("Checking if block %s is free.", block_id)
key = _block_datastore_key(block_cidr)
try:
_ = self.etcd_client.read(key)
_ = self.etcd_client.read(key, quorum=True)
except EtcdKeyNotFound:
_log.debug("Found block %s free.", block_id)
try:
Expand Down Expand Up @@ -292,7 +296,7 @@ def _read_handle(self, handle_id):
"""
key = _handle_datastore_key(handle_id)
try:
result = self.etcd_client.read(key)
result = self.etcd_client.read(key, quorum=True)
except EtcdKeyNotFound:
raise KeyError(handle_id)
handle = AllocationHandle.from_etcd_result(result)
Expand Down Expand Up @@ -455,10 +459,18 @@ def _auto_assign(self, ip_version, num, handle_id,
_log.info("Ran out of affine blocks for %s in pool %s",
hostname, pool)
break
ips = self._auto_assign_block(block_id,
num_remaining,
handle_id,
attributes)
try:
ips = self._auto_assign_block(block_id,
num_remaining,
handle_id,
attributes)
except (KeyError, NoHostAffinityWarning):
# In certain rare race conditions, _get_affine_blocks above
# can return block_ids that don't exist or don't actually have
# affinity to this host (due to multiple IPAM clients on this
# host running simultaneously). If that happens, just move to
# the next one.
continue
allocated_ips.extend(ips)
num_remaining = num - len(allocated_ips)

Expand Down
27 changes: 17 additions & 10 deletions calico_containers/tests/unit/ipam_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,8 +252,9 @@ def m_get_affine_blocks(self, host, ip_version, pool):
# Read returns appropriate result based on key.
read_results = {m_resultb.key: m_resultb,
m_resulth.key: m_resulth}
def read(key):
def read(key, quorum):
""" Return a copy of the current stored value depending on key."""
assert quorum
return copy.copy(read_results[key])
self.m_etcd_client.read.side_effect = read

Expand Down Expand Up @@ -304,8 +305,9 @@ def m_get_affine_blocks(self, host, ip_version, pool):
m_resultb.value = block.to_json()
m_resultb.key = "/calico/ipam/v2/assignment/ipv4/block/10.11.12.0-24"

def read(key):
def read(key, quorum):
""" Return a copy of the current stored value depending on key."""
assert quorum
return copy.copy(m_resultb)
self.m_etcd_client.read.side_effect = read
self.m_etcd_client.update.side_effect = EtcdCompareFailed()
Expand Down Expand Up @@ -464,7 +466,7 @@ def test_assign_with_handle_cas_fails(self, m_get_hostname):
# Read returns appropriate result based on key.
read_results = {m_resultb.key: m_resultb,
m_resulth.key: m_resulth}
def read(key):
def read(key, quorum):
""" Return a copy of the current stored value depending on key."""
return copy.copy(read_results[key])
self.m_etcd_client.read.side_effect = read
Expand Down Expand Up @@ -513,7 +515,7 @@ def test_assign_persistent_cas_fails(self, m_get_hostname):
block = _test_block_empty_v4()
m_result0 = Mock(spec=EtcdResult)
m_result0.value = block.to_json()
def read(key):
def read(key, quorum):
return copy.copy(m_result0)
self.m_etcd_client.read.side_effect = read

Expand Down Expand Up @@ -941,7 +943,8 @@ def test_release_ip_by_handle_cas_error(self):
read_results = {m_resulth.key: m_resulth,
m_resultb4.key: m_resultb4,
m_resultb6.key: m_resultb6}
def read(key):
def read(key, quorum):
assert quorum
return copy.copy(read_results[key])
self.m_etcd_client.read.side_effect = read

Expand Down Expand Up @@ -1022,7 +1025,8 @@ def test_release_ip_by_handle_no_ips(self):
# Mock out read.
read_results = {m_resulth.key: m_resulth,
m_resultb4.key: m_resultb4}
def read(key):
def read(key, quorum):
assert quorum
return read_results[key]
self.m_etcd_client.read.side_effect = read

Expand Down Expand Up @@ -1172,7 +1176,8 @@ def test_get_affine_blocks(self):
expected_ids = ["192.168.3.0/26", "192.168.5.0/26"]

# Return some blocks.
def m_read(path):
def m_read(path, quorum):
assert quorum
assert path == "/calico/ipam/v2/host/test_host/ipv4/block/"
result = Mock(spec=EtcdResult)
children = []
Expand All @@ -1195,7 +1200,8 @@ def test_get_affine_blocks_empty(self):
expected_ids = []

# Return no blocks (an empty children iterator).
def m_read(path):
def m_read(path, quorum):
assert quorum
assert path == "/calico/ipam/v2/host/test_host/ipv4/block/"
result = Mock(spec=EtcdResult)
result.children = iter([])
Expand Down Expand Up @@ -1224,7 +1230,8 @@ def test_get_affine_blocks_pool(self):
returned_ids = ["192.168.3.0/26", "10.10.1.0/26"]

# Return some blocks.
def m_read(path):
def m_read(path, quorum):
assert quorum
assert path == "/calico/ipam/v2/host/test_host/ipv4/block/"
result = Mock(spec=EtcdResult)
children = []
Expand Down Expand Up @@ -1269,7 +1276,7 @@ def test_claim_block_affinity_already_owned(self):
self.m_etcd_client.write.assert_has_calls([call(ANY, ""),
call(key, value,
prevExist=False)])
self.m_etcd_client.read.assert_called_once_with(key)
self.m_etcd_client.read.assert_called_once_with(key, quorum=True)

def test_new_affine_block_race(self):
"""
Expand Down

0 comments on commit d01d2ec

Please sign in to comment.