-
Notifications
You must be signed in to change notification settings - Fork 23
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add hashed memcached client support #173
Merged
Zyiqin-Miranda
merged 6 commits into
ray-project:phash_main
from
Zyiqin-Miranda:tracking_phash_main_zyiqin_memcached
Aug 9, 2023
Merged
Changes from 5 commits
Commits
Show all changes
6 commits
Select commit
Hold shift + click to select a range
91d2637
Adding support for reading table into ParquetFile (#163)
raghumdani fb9c70b
Add hashed memcached client support
Zyiqin-Miranda 02e0752
Adding support for reading table into ParquetFile (#163)
raghumdani 6b9f9a8
Merge branch 'phash_main' of https://github.com/ray-project/deltacat …
Zyiqin-Miranda f0ee3e1
Merge branch 'phash_main' of https://github.com/ray-project/deltacat …
Zyiqin-Miranda 1130bd7
use uid instead of ip address as key to hash algorithm
Zyiqin-Miranda File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,13 +3,14 @@ | |
from collections import defaultdict | ||
import time | ||
from deltacat.io.object_store import IObjectStore | ||
from typing import Any, List | ||
from typing import Any, List, Optional | ||
from deltacat import logs | ||
import uuid | ||
import socket | ||
from pymemcache.client.base import Client | ||
from pymemcache.client.retrying import RetryingClient | ||
from pymemcache.exceptions import MemcacheUnexpectedCloseError | ||
from pymemcache.client.rendezvous import RendezvousHash | ||
|
||
logger = logs.configure_deltacat_logger(logging.getLogger(__name__)) | ||
|
||
|
@@ -19,25 +20,39 @@ class MemcachedObjectStore(IObjectStore): | |
An implementation of object store that uses Memcached. | ||
""" | ||
|
||
def __init__(self, port=11212) -> None: | ||
def __init__( | ||
self, storage_node_ips: Optional[List[str]] = None, port: Optional[int] = 11212 | ||
) -> None: | ||
self.client_cache = {} | ||
self.current_ip = None | ||
self.SEPARATOR = "_" | ||
self.port = port | ||
self.storage_node_ips = storage_node_ips | ||
self.hasher = None | ||
super().__init__() | ||
|
||
def initialize_hasher(self): | ||
if not self.hasher and self.storage_node_ips: | ||
self.hasher = RendezvousHash() | ||
for n in self.storage_node_ips: | ||
self.hasher.add_node(n) | ||
|
||
def put_many(self, objects: List[object], *args, **kwargs) -> List[Any]: | ||
input = {} | ||
result = [] | ||
current_ip = self._get_current_ip() | ||
if self.storage_node_ips: | ||
create_ref_ip = self._get_storage_node_ip(current_ip) | ||
else: | ||
create_ref_ip = current_ip | ||
for obj in objects: | ||
serialized = cloudpickle.dumps(obj) | ||
uid = uuid.uuid4() | ||
ref = self._create_ref(uid, current_ip) | ||
ref = self._create_ref(uid, create_ref_ip) | ||
input[uid.__str__()] = serialized | ||
result.append(ref) | ||
|
||
client = self._get_client_by_ip(current_ip) | ||
client = self._get_client_by_ip(create_ref_ip) | ||
if client.set_many(input, noreply=False): | ||
raise RuntimeError("Unable to write few keys to cache") | ||
|
||
|
@@ -47,8 +62,12 @@ def put(self, obj: object, *args, **kwargs) -> Any: | |
serialized = cloudpickle.dumps(obj) | ||
uid = uuid.uuid4() | ||
current_ip = self._get_current_ip() | ||
ref = self._create_ref(uid, current_ip) | ||
client = self._get_client_by_ip(current_ip) | ||
if self.storage_node_ips: | ||
create_ref_ip = self._get_storage_node_ip(current_ip) | ||
else: | ||
create_ref_ip = current_ip | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we move this to a single function so that we don't have to have this if statement in every method? |
||
ref = self._create_ref(uid, create_ref_ip) | ||
client = self._get_client_by_ip(create_ref_ip) | ||
|
||
if client.set(uid.__str__(), serialized): | ||
return ref | ||
|
@@ -99,6 +118,11 @@ def get(self, ref: Any, *args, **kwargs) -> object: | |
def _create_ref(self, uid, ip) -> str: | ||
return f"{uid}{self.SEPARATOR}{ip}" | ||
|
||
def _get_storage_node_ip(self, ip_address: str): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. rename argument to |
||
self.initialize_hasher() | ||
storage_node_ip = self.hasher.get_node(ip_address) | ||
raghumdani marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return storage_node_ip | ||
|
||
def _get_client_by_ip(self, ip_address: str): | ||
if ip_address in self.client_cache: | ||
return self.client_cache[ip_address] | ||
|
@@ -108,7 +132,7 @@ def _get_client_by_ip(self, ip_address: str): | |
base_client, | ||
attempts=3, | ||
retry_delay=0.01, | ||
retry_for=[MemcacheUnexpectedCloseError], | ||
retry_for=[MemcacheUnexpectedCloseError, ConnectionResetError], | ||
) | ||
|
||
self.client_cache[ip_address] = client | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we call get_node with the key instead of current ip to ensure the uniform distribution?