diff --git a/infrastructure/beacon_server.py b/infrastructure/beacon_server.py index 787d352f1c..74e7de25ad 100644 --- a/infrastructure/beacon_server.py +++ b/infrastructure/beacon_server.py @@ -66,9 +66,9 @@ class BeaconServer(SCIONElement): # Timeout for TRC or Certificate requests. REQUESTS_TIMEOUT = 10 # ZK path for incoming PCBs - ZK_INCOMING_PATH = "/incoming" + ZK_INCOMING_PATH = "incoming" # ZK path for recent PCBs - ZK_RECENT_PATH = "/recent" + ZK_RECENT_PATH = "recent" def __init__(self, addr, topo_file, config_file, path_policy_file): SCIONElement.__init__(self, addr, topo_file, config_file=config_file) diff --git a/lib/zookeeper.py b/lib/zookeeper.py index ec174dd188..6908de60d6 100644 --- a/lib/zookeeper.py +++ b/lib/zookeeper.py @@ -16,6 +16,7 @@ ======================================================================== """ +import os.path import logging import threading import time @@ -84,10 +85,8 @@ def __init__(self, isd_id, ad_id, srv_name, srv_id, self._on_disconnect = on_disconnect self._ensure_paths = ensure_paths - self._chroot = "ISD%d-AD%d/%s" % (self._isd_id, self._ad_id, self._srv_name) - self._zk = KazooClient(hosts="%s/%s" % - (",".join(zk_hosts), self._chroot), - timeout=self._timeout) + self._prefix = "/ISD%d-AD%d/%s" % (self._isd_id, self._ad_id, self._srv_name) + self._zk = KazooClient(hosts=",".join(zk_hosts), timeout=self._timeout) # Stop kazoo from drowning the log with debug spam: self._zk.logger.setLevel(logging.ERROR) # FIXME(kormat): remove once stable: @@ -157,8 +156,9 @@ def _state_connected(self): # Might be first connection, or reconnecting after a problem. logging.debug("Connection to Zookeeper succeeded") try: + self._zk.ensure_path(self._prefix) for path in self._ensure_paths: - self._zk.ensure_path(path) + self._zk.ensure_path(os.path.join(self._prefix, path)) except (ConnectionLoss, SessionExpiredError): return self._connected.set() @@ -218,7 +218,8 @@ def join_party(self): raise ZkConnectionLoss if self._party is None: # Initialise the service party - self._party = self._zk.Party("/party", self._srv_id) + party_path = os.path.join(self._prefix, "party") + self._party = self._zk.Party(party_path, self._srv_id) try: self._party.join() except ConnectionLoss: @@ -230,13 +231,14 @@ def watch_children(self, path, func): Register a callback function to be called when a path's children change. This watch does not persist across disconnections. - :param str path: The absolute directory of the path. + :param str path: The path to watch. :param function func: The function to call. :raises: ZkConnectionLoss: if the connection to ZK drops. """ if not self.is_connected(): raise ZkConnectionLoss + path = os.path.join(self._prefix, path) try: self._zk.exists(path) self._zk.ChildrenWatch(path, func=func, allow_session_lost=False) @@ -258,7 +260,9 @@ def get_lock(self, timeout=60.0): if self._lock.is_set(): # We already have the lock return True - self._zk_lock = self._zk.Lock("/lock", self._srv_id) + if self._zk_lock is None: + lock_path = os.path.join(self._prefix, "lock") + self._zk_lock = self._zk.Lock(lock_path, self._srv_id) try: if self._zk_lock.acquire(timeout=timeout): self._lock.set() @@ -282,8 +286,7 @@ def store_shared_item(self, path, name, value): """ Store an item in a shared path. - :param str path: The absolute directory path to store the item in. E.g. - ``"/shared"`` + :param str path: The path to store the item in. E.g. ``"shared"`` :param str name: A prefix for the item entry. E.g. ``"pcb"`` :param bytes value: The value to store in the item. :raises: @@ -291,6 +294,7 @@ def store_shared_item(self, path, name, value): """ if not self.is_connected(): raise ZkConnectionLoss + path = os.path.join(self._prefix, path) try: self._zk.create("%s/%s" % (path, name), value, sequence=True) except (ConnectionLoss, SessionExpiredError): @@ -300,8 +304,7 @@ def get_shared_item(self, path, entry): """ Retrieve a specific item from a shared path. - :param str path: The absolute directory path the item is stored in. E.g. - ``"/shared"`` + :param str path: The path the item is stored in. E.g. ``"shared"`` :param str entry: The name of the entry. E.g. ``"pcb0000002046"`` :return: The value of the item :rtype: :class:`bytes` @@ -311,8 +314,9 @@ def get_shared_item(self, path, entry): """ if not self.is_connected(): raise ZkConnectionLoss + entry_path = os.path.join(self._prefix, path, entry) try: - data, _ = self._zk.get("%s/%s" % (path, entry)) + data, _ = self._zk.get(entry_path) except NoNodeError: raise ZkNoNodeError except (ConnectionLoss, SessionExpiredError): @@ -323,8 +327,7 @@ def get_shared_entries(self, path): """ List the items in a shared path. - :param str path: The absolute directory path the items are stored in. E.g. - ``"/shared"`` + :param str path: The path the items are stored in. E.g. ``"shared"`` :return: The value of the item, if successfully retrieved, otherwise ``None`` :rtype: :class:`bytes` or ``None`` :raises: @@ -332,6 +335,7 @@ def get_shared_entries(self, path): """ if not self.is_connected(): return [] + path = os.path.join(self._prefix, path) try: entries = self._zk.get_children(path) except ConnectionLoss: @@ -342,8 +346,8 @@ def move_shared_items(self, src, dest): """ Move items from one shared path to another - :param str src: The absolute directory of the source - :param str dest: The absolute directory path of the destination + :param str src: The path of the source + :param str dest: The path of the destination :raises: ZkConnectionLoss: if the connection to ZK drops """ @@ -352,6 +356,8 @@ def move_shared_items(self, src, dest): max_entries = 50 if not self.is_connected(): raise ZkConnectionLoss + src = os.path.join(self._prefix, src) + dest = os.path.join(self._prefix, dest) try: src_entries = self._zk.get_children(src) dest_entries = self._zk.get_children(dest)