Skip to content

Commit

Permalink
use uid instead of ip address as key to hash algorithm
Browse files Browse the repository at this point in the history
  • Loading branch information
Zyiqin-Miranda committed Aug 8, 2023
1 parent f0ee3e1 commit 66085cc
Showing 1 changed file with 15 additions and 14 deletions.
29 changes: 15 additions & 14 deletions deltacat/io/memcached_object_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,33 +39,27 @@ def initialize_hasher(self):

def put_many(self, objects: List[object], *args, **kwargs) -> List[Any]:
input = {}
result = []
result = defaultdict(dict)
current_ip = self._get_current_ip()
if self.storage_node_ips:
create_ref_ip = self._get_storage_node_ip(current_ip)
else:
create_ref_ip = current_ip
for obj in objects:
serialized = cloudpickle.dumps(obj)
uid = uuid.uuid4()
create_ref_ip = self._get_create_ref_ip(current_ip, uid)
ref = self._create_ref(uid, create_ref_ip)
input[uid.__str__()] = serialized
input[create_ref_ip][uid.__str__()] = serialized
result.append(ref)

client = self._get_client_by_ip(create_ref_ip)
if client.set_many(input, noreply=False):
raise RuntimeError("Unable to write few keys to cache")
for create_ref_ip, uid_to_object in input.items():
client = self._get_client_by_ip(create_ref_ip)
if client.set_many(uid_to_object, noreply=False):
raise RuntimeError("Unable to write few keys to cache")

return result

def put(self, obj: object, *args, **kwargs) -> Any:
serialized = cloudpickle.dumps(obj)
uid = uuid.uuid4()
current_ip = self._get_current_ip()
if self.storage_node_ips:
create_ref_ip = self._get_storage_node_ip(current_ip)
else:
create_ref_ip = current_ip
create_ref_ip = self._get_create_ref_ip(current_ip, uid)
ref = self._create_ref(uid, create_ref_ip)
client = self._get_client_by_ip(create_ref_ip)

Expand Down Expand Up @@ -123,6 +117,13 @@ def _get_storage_node_ip(self, ip_address: str):
storage_node_ip = self.hasher.get_node(ip_address)
return storage_node_ip

def _get_create_ref_ip(self, current_ip, uid):
if self.storage_node_ips:
create_ref_ip = self._get_storage_node_ip(uid)
else:
create_ref_ip = current_ip
return create_ref_ip

def _get_client_by_ip(self, ip_address: str):
if ip_address in self.client_cache:
return self.client_cache[ip_address]
Expand Down

0 comments on commit 66085cc

Please sign in to comment.