Skip to content

Commit

Permalink
Use UID instead of IP address as the key for the hash algorithm
Browse files (browse the repository at this point in the history)
  • Loading branch information
Zyiqin-Miranda committed Aug 8, 2023
1 parent f0ee3e1 commit 571f800
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 19 deletions.
33 changes: 17 additions & 16 deletions deltacat/io/memcached_object_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,34 +38,28 @@ def initialize_hasher(self):
self.hasher.add_node(n)

def put_many(self, objects: List[object], *args, **kwargs) -> List[Any]:
input = {}
input = defaultdict(dict)
result = []
current_ip = self._get_current_ip()
if self.storage_node_ips:
create_ref_ip = self._get_storage_node_ip(current_ip)
else:
create_ref_ip = current_ip
for obj in objects:
serialized = cloudpickle.dumps(obj)
uid = uuid.uuid4()
create_ref_ip = self._get_create_ref_ip(current_ip, uid.__str__())
ref = self._create_ref(uid, create_ref_ip)
input[uid.__str__()] = serialized
input[create_ref_ip][uid.__str__()] = serialized
result.append(ref)

client = self._get_client_by_ip(create_ref_ip)
if client.set_many(input, noreply=False):
raise RuntimeError("Unable to write few keys to cache")
for create_ref_ip, uid_to_object in input.items():
client = self._get_client_by_ip(create_ref_ip)
if client.set_many(uid_to_object, noreply=False):
raise RuntimeError("Unable to write few keys to cache")

return result

def put(self, obj: object, *args, **kwargs) -> Any:
serialized = cloudpickle.dumps(obj)
uid = uuid.uuid4()
current_ip = self._get_current_ip()
if self.storage_node_ips:
create_ref_ip = self._get_storage_node_ip(current_ip)
else:
create_ref_ip = current_ip
create_ref_ip = self._get_create_ref_ip(current_ip, uid.__str__())
ref = self._create_ref(uid, create_ref_ip)
client = self._get_client_by_ip(create_ref_ip)

Expand Down Expand Up @@ -118,11 +112,18 @@ def get(self, ref: Any, *args, **kwargs) -> object:
def _create_ref(self, uid, ip) -> str:
return f"{uid}{self.SEPARATOR}{ip}"

def _get_storage_node_ip(self, ip_address: str):
def _get_storage_node_ip(self, key: str):
self.initialize_hasher()
storage_node_ip = self.hasher.get_node(ip_address)
storage_node_ip = self.hasher.get_node(key)
return storage_node_ip

def _get_create_ref_ip(self, current_ip, uid):
if self.storage_node_ips:
create_ref_ip = self._get_storage_node_ip(uid)
else:
create_ref_ip = current_ip
return create_ref_ip

def _get_client_by_ip(self, ip_address: str):
if ip_address in self.client_cache:
return self.client_cache[ip_address]
Expand Down
5 changes: 2 additions & 3 deletions deltacat/tests/io/test_memcached_object_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,10 @@ def test_put_many_sanity(
mock_retrying_client.return_value = mock_client.return_value
mock_client.return_value.set_many.return_value = []

result = self.object_store.put_many(["a", "b"])
result = self.object_store.put_many(["a", "b", "c"])

self.assertEqual(2, len(result))
self.assertEqual(3, len(result))
self.assertRegex(result[0], ".*_.*")
self.assertEqual(1, mock_client.return_value.set_many.call_count)

@mock.patch("deltacat.io.memcached_object_store.Client")
@mock.patch("deltacat.io.memcached_object_store.RetryingClient")
Expand Down

0 comments on commit 571f800

Please sign in to comment.