Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch away from kazoo chroot to explicit prefix #62

Merged
merged 1 commit into from
Apr 17, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions infrastructure/beacon_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ class BeaconServer(SCIONElement):
# Timeout for TRC or Certificate requests.
REQUESTS_TIMEOUT = 10
# ZK path for incoming PCBs
ZK_INCOMING_PATH = "/incoming"
ZK_INCOMING_PATH = "incoming"
# ZK path for recent PCBs
ZK_RECENT_PATH = "/recent"
ZK_RECENT_PATH = "recent"

def __init__(self, addr, topo_file, config_file, path_policy_file):
SCIONElement.__init__(self, addr, topo_file, config_file=config_file)
Expand Down
40 changes: 23 additions & 17 deletions lib/zookeeper.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
========================================================================
"""

import os.path
import logging
import threading
import time
Expand Down Expand Up @@ -84,10 +85,8 @@ def __init__(self, isd_id, ad_id, srv_name, srv_id,
self._on_disconnect = on_disconnect
self._ensure_paths = ensure_paths

self._chroot = "ISD%d-AD%d/%s" % (self._isd_id, self._ad_id, self._srv_name)
self._zk = KazooClient(hosts="%s/%s" %
(",".join(zk_hosts), self._chroot),
timeout=self._timeout)
self._prefix = "/ISD%d-AD%d/%s" % (self._isd_id, self._ad_id, self._srv_name)
self._zk = KazooClient(hosts=",".join(zk_hosts), timeout=self._timeout)
# Stop kazoo from drowning the log with debug spam:
self._zk.logger.setLevel(logging.ERROR)
# FIXME(kormat): remove once stable:
Expand Down Expand Up @@ -157,8 +156,9 @@ def _state_connected(self):
# Might be first connection, or reconnecting after a problem.
logging.debug("Connection to Zookeeper succeeded")
try:
self._zk.ensure_path(self._prefix)
for path in self._ensure_paths:
self._zk.ensure_path(path)
self._zk.ensure_path(os.path.join(self._prefix, path))
except (ConnectionLoss, SessionExpiredError):
return
self._connected.set()
Expand Down Expand Up @@ -218,7 +218,8 @@ def join_party(self):
raise ZkConnectionLoss
if self._party is None:
# Initialise the service party
self._party = self._zk.Party("/party", self._srv_id)
party_path = os.path.join(self._prefix, "party")
self._party = self._zk.Party(party_path, self._srv_id)
try:
self._party.join()
except ConnectionLoss:
Expand All @@ -230,13 +231,14 @@ def watch_children(self, path, func):
Register a callback function to be called when a path's children
change. This watch does not persist across disconnections.

:param str path: The absolute directory of the path.
:param str path: The path to watch.
:param function func: The function to call.
:raises:
ZkConnectionLoss: if the connection to ZK drops.
"""
if not self.is_connected():
raise ZkConnectionLoss
path = os.path.join(self._prefix, path)
try:
self._zk.exists(path)
self._zk.ChildrenWatch(path, func=func, allow_session_lost=False)
Expand All @@ -258,7 +260,9 @@ def get_lock(self, timeout=60.0):
if self._lock.is_set():
# We already have the lock
return True
self._zk_lock = self._zk.Lock("/lock", self._srv_id)
if self._zk_lock is None:
lock_path = os.path.join(self._prefix, "lock")
self._zk_lock = self._zk.Lock(lock_path, self._srv_id)
try:
if self._zk_lock.acquire(timeout=timeout):
self._lock.set()
Expand All @@ -282,15 +286,15 @@ def store_shared_item(self, path, name, value):
"""
Store an item in a shared path.

:param str path: The absolute directory path to store the item in. E.g.
``"/shared"``
:param str path: The path to store the item in. E.g. ``"shared"``
:param str name: A prefix for the item entry. E.g. ``"pcb"``
:param bytes value: The value to store in the item.
:raises:
ZkConnectionLoss: if the connection to ZK drops
"""
if not self.is_connected():
raise ZkConnectionLoss
path = os.path.join(self._prefix, path)
try:
self._zk.create("%s/%s" % (path, name), value, sequence=True)
except (ConnectionLoss, SessionExpiredError):
Expand All @@ -300,8 +304,7 @@ def get_shared_item(self, path, entry):
"""
Retrieve a specific item from a shared path.

:param str path: The absolute directory path the item is stored in. E.g.
``"/shared"``
:param str path: The path the item is stored in. E.g. ``"shared"``
:param str entry: The name of the entry. E.g. ``"pcb0000002046"``
:return: The value of the item
:rtype: :class:`bytes`
Expand All @@ -311,8 +314,9 @@ def get_shared_item(self, path, entry):
"""
if not self.is_connected():
raise ZkConnectionLoss
entry_path = os.path.join(self._prefix, path, entry)
try:
data, _ = self._zk.get("%s/%s" % (path, entry))
data, _ = self._zk.get(entry_path)
except NoNodeError:
raise ZkNoNodeError
except (ConnectionLoss, SessionExpiredError):
Expand All @@ -323,15 +327,15 @@ def get_shared_entries(self, path):
"""
List the items in a shared path.

:param str path: The absolute directory path the items are stored in. E.g.
``"/shared"``
:param str path: The path the items are stored in. E.g. ``"shared"``
:return: The value of the item, if successfully retrieved, otherwise ``None``
:rtype: :class:`bytes` or ``None``
:raises:
ZkConnectionLoss: if the connection to ZK drops
"""
if not self.is_connected():
return []
path = os.path.join(self._prefix, path)
try:
entries = self._zk.get_children(path)
except ConnectionLoss:
Expand All @@ -342,8 +346,8 @@ def move_shared_items(self, src, dest):
"""
Move items from one shared path to another

:param str src: The absolute directory of the source
:param str dest: The absolute directory path of the destination
:param str src: The path of the source
:param str dest: The path of the destination
:raises:
ZkConnectionLoss: if the connection to ZK drops
"""
Expand All @@ -352,6 +356,8 @@ def move_shared_items(self, src, dest):
max_entries = 50
if not self.is_connected():
raise ZkConnectionLoss
src = os.path.join(self._prefix, src)
dest = os.path.join(self._prefix, dest)
try:
src_entries = self._zk.get_children(src)
dest_entries = self._zk.get_children(dest)
Expand Down