diff --git a/python/vineyard/core/client.py b/python/vineyard/core/client.py
index 90a7c410..63686988 100644
--- a/python/vineyard/core/client.py
+++ b/python/vineyard/core/client.py
@@ -29,6 +29,7 @@
 from vineyard._C import Blob
 from vineyard._C import BlobBuilder
 from vineyard._C import IPCClient
+from vineyard._C import NotEnoughMemoryException
 from vineyard._C import Object
 from vineyard._C import ObjectID
 from vineyard._C import ObjectMeta
@@ -613,6 +614,43 @@ def _locate_and_fetch(self, meta) -> Object:
         )
         return remote_client.get_object(meta.id)
 
+    def _find_the_most_available_memory_instance(self) -> int:
+        """
+        Find the other vineyard instance with the most available memory.
+
+        The instance this client is currently connected to is skipped;
+        every other instance in the cluster metadata is asked, through
+        its RPC endpoint, for its memory limit and memory usage.
+
+        Returns:
+            int: The id of the instance with the most available memory,
+            or -1 if the cluster has no other instance.
+        """
+        cluster_info = self.default_client().meta
+        instance_id_with_most_available_memory = -1
+        available_memory = float('-inf')
+
+        for instance_id, status in cluster_info.items():
+            # Skip the instance this client is connected to. Both the
+            # ipc_socket and the rpc_endpoint must match, because the same
+            # ipc_socket path can exist on different machines.
+            if (
+                status['ipc_socket'] == self.ipc_socket
+                and status['rpc_endpoint'] == self.rpc_endpoint
+            ):
+                continue
+            host, port = status['rpc_endpoint'].split(':')
+            new_client = _connect(host, port)
+            # read the status once so both figures come from the same object
+            remote_status = new_client.status
+            current_available_memory = (
+                remote_status.memory_limit - remote_status.memory_usage
+            )
+            if current_available_memory > available_memory:
+                instance_id_with_most_available_memory = instance_id
+                available_memory = current_available_memory
+        return instance_id_with_most_available_memory
+
     @_apply_docstring(get)
     def get(
         self,
@@ -633,7 +671,29 @@ def put(
         name: Optional[str] = None,
         **kwargs,
     ):
-        return put(self, value, builder, persist, name, **kwargs)
+        try:
+            return put(self, value, builder, persist, name, **kwargs)
+        except NotEnoughMemoryException:
+            # the connected instance is out of memory: retry on the other
+            # instance with the most available memory, if there is one
+            instance_id = self._find_the_most_available_memory_instance()
+            if instance_id == -1:
+                warnings.warn("No other vineyard instance available")
+                # bare raise keeps the original exception and traceback
+                raise
+            meta = self.default_client().meta
+            warnings.warn(
+                f"Put object to the vineyard instance {instance_id} "
+                "with the most available memory."
+            )
+            # connect to the instance with the most available memory; the
+            # client keeps using these connections after this call
+            self._ipc_client = None
+            if os.path.exists(meta[instance_id]['ipc_socket']):
+                self._ipc_client = _connect(meta[instance_id]['ipc_socket'])
+            host, port = meta[instance_id]['rpc_endpoint'].split(':')
+            self._rpc_client = _connect(host, port)
+            return put(self, value, builder, persist, name, **kwargs)
 
     @contextlib.contextmanager
     def with_compression(self, enabled: bool = True):
diff --git a/python/vineyard/deploy/tests/test_distributed.py b/python/vineyard/deploy/tests/test_distributed.py
index aa581cd1..64dd6fa2 100644
--- a/python/vineyard/deploy/tests/test_distributed.py
+++ b/python/vineyard/deploy/tests/test_distributed.py
@@ -644,3 +644,24 @@ def test_get_and_put_with_different_vineyard_instances(
                 pd.testing.assert_series_equal(value, v)
             else:
                 assert value == v
+
+
+def test_connected_vineyardd_out_of_memory(vineyard_ipc_sockets):
+    """The final put must succeed even though the connected vineyardd is full."""
+    client = vineyard.connect(vineyard_ipc_sockets[0])
+
+    # generate 10 * 1024 * 1024 * 8 bytes = 80 MB data
+    data = np.ones((10, 1024, 1024))
+
+    # put data until the connected vineyardd's memory is full
+    objects = []
+    while client.status.memory_limit - client.status.memory_usage > data.nbytes:
+        objects.append(client.put(data))
+
+    # test whether the client can put data to other vineyardd instances
+    objects.append(client.put(data))
+
+    # delete every object created above, not only the last one; the list
+    # also avoids an unbound name when the loop body never runs
+    for object_id in objects:
+        client.delete(object_id)
diff --git a/python/vineyard/deploy/tests/test_migration.py b/python/vineyard/deploy/tests/test_migration.py
index 6caab104..5078ba12 100644
--- a/python/vineyard/deploy/tests/test_migration.py
+++ b/python/vineyard/deploy/tests/test_migration.py
@@ -256,6 +256,10 @@ def test_migration_large_object(
     client1 = vineyard.connect(vineyard_ipc_sockets[0])
     client2 = vineyard.connect(vineyard_ipc_sockets[1])
 
+    # clear both instances first, presumably so that objects left behind by
+    # earlier tests cannot exhaust memory - TODO confirm
+    client1.clear()
+    client2.clear()
     data1 = np.ones((1024, 102400))
     o1 = client1.put(data1)
     client1.persist(o1)