diff --git a/python/core.cc b/python/core.cc index 70ef4af5..34e1af10 100644 --- a/python/core.cc +++ b/python/core.cc @@ -605,7 +605,7 @@ void bind_blobs(py::module& mod) { }); // RemoteBlob - py::class_>( + py::class_, Object>( mod, "RemoteBlob", py::buffer_protocol(), doc::RemoteBlob) .def_property_readonly( "id", [](RemoteBlob* self) -> ObjectIDWrapper { return self->id(); }, diff --git a/python/vineyard/core/resolver.py b/python/vineyard/core/resolver.py index 0fa77b12..6307c0a7 100644 --- a/python/vineyard/core/resolver.py +++ b/python/vineyard/core/resolver.py @@ -230,12 +230,11 @@ def get( object_id = client.get_name(name) # run resolver - obj = client.get_object(object_id, fetch=fetch) - meta = obj.meta - if not meta.islocal and not meta.isglobal: - raise ValueError( - "Not a local object: for remote object, you can only get its metadata" - ) + if isinstance(client, RPCClient): + obj = client.get_object(object_id) + else: + obj = client.get_object(object_id, fetch=fetch) + if resolver is None: resolver = get_current_resolvers() return resolver(obj, __vineyard_client=client, **kw) diff --git a/python/vineyard/core/tests/test_rpc_client.py b/python/vineyard/core/tests/test_rpc_client.py index dd0996dc..dd4d2e4a 100644 --- a/python/vineyard/core/tests/test_rpc_client.py +++ b/python/vineyard/core/tests/test_rpc_client.py @@ -115,15 +115,6 @@ def test_remote_blob_create_and_get_large_object(vineyard_endpoint): assert memoryview(remote_blob) == memoryview(large_payload) -def test_remote_blob_error(vineyard_endpoint): - vineyard_rpc_client = vineyard.connect(*vineyard_endpoint.split(':')) - - with pytest.raises( - ValueError, match="Vineyard RPC client cannot be used to create local blobs" - ): - vineyard_rpc_client.put(np.ones((2, 3, 4))) - - def test_multiple_remote_blobs(vineyard_endpoint): vineyard_rpc_client = vineyard.connect(*vineyard_endpoint.split(':')) diff --git a/python/vineyard/data/arrow.py b/python/vineyard/data/arrow.py index b0ad65c8..90005c76 100644 --- a/python/vineyard/data/arrow.py +++ b/python/vineyard/data/arrow.py @@ -31,16 +31,16 @@ from vineyard._C import Blob from vineyard._C import IPCClient from vineyard._C import Object +from vineyard._C import ObjectID from vineyard._C import ObjectMeta +from vineyard._C import RemoteBlob from vineyard.core.builder import BuilderContext from vineyard.core.resolver import ResolverContext from vineyard.data.utils import build_buffer from vineyard.data.utils import normalize_dtype -def buffer_builder( - client: IPCClient, buffer: Union[bytes, memoryview], builder: BuilderContext -): +def buffer_builder(client, buffer: Union[bytes, memoryview], builder: BuilderContext): if buffer is None: address = None size = 0 @@ -51,7 +51,16 @@ def buffer_builder( def as_arrow_buffer(blob: Blob): - buffer = blob.buffer + if isinstance(blob, Blob): + buffer = blob.buffer + else: + if ( + blob.id == ObjectID("o8000000000000000") + or (int(blob.id) & int(ObjectID("o8000000000000000"))) == 0 + ): + buffer = b'' + else: + buffer = memoryview(blob) if buffer is None: return pa.py_buffer(bytearray()) return pa.py_buffer(buffer) diff --git a/python/vineyard/data/base.py b/python/vineyard/data/base.py index d4e5ea35..3b3f5993 100644 --- a/python/vineyard/data/base.py +++ b/python/vineyard/data/base.py @@ -130,6 +130,7 @@ def register_base_types( if resolver_ctx is not None: resolver_ctx.register('vineyard::Blob', bytes_resolver) + resolver_ctx.register('vineyard::RemoteBlob', bytes_resolver) resolver_ctx.register('vineyard::Scalar', scalar_resolver) resolver_ctx.register('vineyard::Array', array_resolver) resolver_ctx.register('vineyard::Sequence', sequence_resolver) diff --git a/python/vineyard/data/tests/test_arrow.py b/python/vineyard/data/tests/test_arrow.py index 00469846..4f3a7318 100644 --- a/python/vineyard/data/tests/test_arrow.py +++ b/python/vineyard/data/tests/test_arrow.py @@ -66,13 +66,27 @@ def test_arrow_array(vineyard_client): assert vineyard_client.get(object_id).values.equals(nested_arr.values) -def test_record_batch(vineyard_client): +def test_arrow_array_with_rpc_client(vineyard_rpc_client): + test_arrow_array(vineyard_rpc_client) + + +def test_record_batch_with_rpc_client(vineyard_rpc_client): + test_record_batch(vineyard_rpc_client) + + +@pytest.mark.skip +def build_record_batch(): arrays = [ pa.array([1, 2, 3, 4]), pa.array(['foo', 'bar', 'baz', None]), pa.array([True, None, False, True]), ] batch = pa.RecordBatch.from_arrays(arrays, ['f0', 'f1', 'f2']) + return batch + + +def test_record_batch(vineyard_client): + batch = build_record_batch() _object_id = vineyard_client.put(batch) # noqa: F841 # processing tables that contains string is not roundtrip, as StringArray # will be transformed to LargeStringArray @@ -80,7 +94,12 @@ def test_record_batch(vineyard_client): # assert batch.equals(vineyard_client.get(object_id)) -def test_table(vineyard_client): +def test_table_with_rpc_client(vineyard_rpc_client): + test_table(vineyard_rpc_client) + + +@pytest.mark.skip +def build_tabel(): arrays = [ pa.array([1, 2, 3, 4]), pa.array(['foo', 'bar', 'baz', None]), @@ -89,6 +108,11 @@ def test_table(vineyard_client): batch = pa.RecordBatch.from_arrays(arrays, ['f0', 'f1', 'f2']) batches = [batch] * 5 table = pa.Table.from_batches(batches) + return table + + +def test_table(vineyard_client): + table = build_tabel() _object_id = vineyard_client.put(table) # noqa: F841 # processing tables that contains string is not roundtrip, as StringArray # will be transformed to LargeStringArray @@ -96,6 +120,11 @@ def test_table(vineyard_client): # assert table.equals(vineyard_client.get(object_id)) +@pytest.mark.skipif(polars is None, reason='polars is not installed') +def test_polars_dataframe_with_rpc_client(vineyard_rpc_client): + test_polars_dataframe(vineyard_rpc_client) + + @pytest.mark.skipif(polars is None, reason='polars is not installed') def test_polars_dataframe(vineyard_client): arrays = [ @@ -112,3 +141,20 @@ def test_polars_dataframe(vineyard_client): # will be transformed to LargeStringArray # # assert table.equals(vineyard_client.get(object_id)) + + +@pytest.mark.parametrize( + "value", + [ + pa.array([1, 2, None, 3]), + pa.array(["a", None, None, None]), + pa.array([True, False, True, False]), + build_record_batch(), + build_tabel(), + ], +) +def test_with_ipc_and_rpc(value, vineyard_client, vineyard_rpc_client): + object_id = vineyard_client.put(value) + assert vineyard_client.get(object_id) == vineyard_rpc_client.get(object_id) + object_id = vineyard_rpc_client.put(value) + assert vineyard_client.get(object_id) == vineyard_rpc_client.get(object_id) diff --git a/python/vineyard/data/tests/test_base.py b/python/vineyard/data/tests/test_base.py index ea52b13b..bdef2f57 100644 --- a/python/vineyard/data/tests/test_base.py +++ b/python/vineyard/data/tests/test_base.py @@ -25,38 +25,66 @@ register_builtin_types(default_builder_context, default_resolver_context) +def test_int_with_rpc_client(vineyard_rpc_client): + test_int(vineyard_rpc_client) + + def test_int(vineyard_client): object_id = vineyard_client.put(1) assert vineyard_client.get(object_id) == 1 +def test_double_with_rpc_client(vineyard_rpc_client): + test_double(vineyard_rpc_client) + + def test_double(vineyard_client): object_id = vineyard_client.put(1.234) assert vineyard_client.get(object_id) == pytest.approx(1.234) +def test_string_with_rpc_client(vineyard_rpc_client): + test_string(vineyard_rpc_client) + + def test_string(vineyard_client): object_id = vineyard_client.put('abcde') assert vineyard_client.get(object_id) == 'abcde' +def test_bytes_with_rpc_client(vineyard_rpc_client): + test_bytes(vineyard_rpc_client) + + def test_bytes(vineyard_client): bs = b'abcde' object_id = vineyard_client.put(bs) assert vineyard_client.get(object_id) == memoryview(bs) +def test_memoryview_with_rpc_client(vineyard_rpc_client): + test_memoryview(vineyard_rpc_client) + + def test_memoryview(vineyard_client): bs = memoryview(b'abcde') object_id = vineyard_client.put(bs) assert vineyard_client.get(object_id) == bs +def test_pair_with_rpc_client(vineyard_rpc_client): + test_pair(vineyard_rpc_client) + + def test_pair(vineyard_client): object_id = vineyard_client.put((1, "2")) assert vineyard_client.get(object_id) == (1, "2") +def test_tuple_with_rpc_client(vineyard_rpc_client): + test_tuple(vineyard_rpc_client) + + def test_tuple(vineyard_client): object_id = vineyard_client.put(()) assert vineyard_client.get(object_id) == () @@ -81,3 +109,13 @@ def test_tuple(vineyard_client): 4444, "5.5.5.5.5.5.5", ) + + +@pytest.mark.parametrize( + "value", [1, 1.234, 'abcd', b'abcde', memoryview(b'abcde'), (1, "2")] +) +def test_with_ipc_and_rpc(value, vineyard_client, vineyard_rpc_client): + object_id = vineyard_client.put(value) + assert vineyard_client.get(object_id) == vineyard_rpc_client.get(object_id) + object_id = vineyard_rpc_client.put(value) + assert vineyard_client.get(object_id) == vineyard_rpc_client.get(object_id) diff --git a/python/vineyard/data/tests/test_dataframe.py b/python/vineyard/data/tests/test_dataframe.py index 722012fa..acf6ae82 100644 --- a/python/vineyard/data/tests/test_dataframe.py +++ b/python/vineyard/data/tests/test_dataframe.py @@ -19,6 +19,8 @@ import numpy as np import pandas as pd +import pytest + from vineyard.core import default_builder_context from vineyard.core import default_resolver_context from vineyard.data import register_builtin_types @@ -27,12 +29,20 @@ register_builtin_types(default_builder_context, default_resolver_context) +def test_pandas_dataframe_with_rpc_client(vineyard_rpc_client): + test_pandas_dataframe(vineyard_rpc_client) + + def test_pandas_dataframe(vineyard_client): df = pd.DataFrame({'a': [1, 2, 3, 4], 'b': [5, 6, 7, 8]}) object_id = vineyard_client.put(df) pd.testing.assert_frame_equal(df, vineyard_client.get(object_id)) +def test_pandas_dataframe_string_with_rpc_client(vineyard_rpc_client): + test_pandas_dataframe_string(vineyard_rpc_client) + + def test_pandas_dataframe_string(vineyard_client): # see gh#533 df = pd.DataFrame({'a': ['1', '2', '3', '4'], 'b': ['5', '6', '7', '8']}) @@ -40,12 +50,8 @@ def test_pandas_dataframe_string(vineyard_client): pd.testing.assert_frame_equal(df, vineyard_client.get(object_id)) -def test_pandas_dataframe_empty(vineyard_client): - # see gh#533 - df = pd.DataFrame({'a': [1, 2, 3, 4], 'b': ['5', '6', '7', '8']}) - df = df.iloc[0:0] - object_id = vineyard_client.put(df) - pd.testing.assert_frame_equal(df, vineyard_client.get(object_id)) +def test_pandas_dataframe_complex_columns_with_rpc_client(vineyard_rpc_client): + test_pandas_dataframe_complex_columns(vineyard_rpc_client) def test_pandas_dataframe_complex_columns(vineyard_client): @@ -55,12 +61,20 @@ def test_pandas_dataframe_complex_columns(vineyard_client): pd.testing.assert_frame_equal(df, vineyard_client.get(object_id)) +def test_pandas_dataframe_int_columns_with_rpc_client(vineyard_rpc_client): + test_pandas_dataframe_int_columns(vineyard_rpc_client) + + def test_pandas_dataframe_int_columns(vineyard_client): df = pd.DataFrame({1: [1, 2, 3, 4], 2: [5, 6, 7, 8]}) object_id = vineyard_client.put(df) pd.testing.assert_frame_equal(df, vineyard_client.get(object_id)) +def test_pandas_dataframe_mixed_columns_with_rpc_client(vineyard_rpc_client): + test_pandas_dataframe_mixed_columns(vineyard_rpc_client) + + def test_pandas_dataframe_mixed_columns(vineyard_client): df = pd.DataFrame( {'a': [1, 2, 3, 4], 'b': [5, 6, 7, 8], 1: [9, 10, 11, 12], 2: [13, 14, 15, 16]} @@ -69,6 +83,10 @@ def test_pandas_dataframe_mixed_columns(vineyard_client): pd.testing.assert_frame_equal(df, vineyard_client.get(object_id)) +def test_dataframe_reindex_with_rpc_client(vineyard_rpc_client): + test_dataframe_reindex(vineyard_rpc_client) + + def test_dataframe_reindex(vineyard_client): df = pd.DataFrame(np.random.rand(10, 5), columns=['c1', 'c2', 'c3', 'c4', 'c5']) expected = df.reindex(index=np.arange(10, 1, -1)) @@ -76,6 +94,10 @@ def test_dataframe_reindex(vineyard_client): pd.testing.assert_frame_equal(expected, vineyard_client.get(object_id)) +def test_dataframe_set_index_with_rpc_client(vineyard_rpc_client): + test_dataframe_set_index(vineyard_rpc_client) + + def test_dataframe_set_index(vineyard_client): df1 = pd.DataFrame( [[1, 3, 3], [4, 2, 6], [7, 8, 9]], @@ -87,6 +109,10 @@ def test_dataframe_set_index(vineyard_client): pd.testing.assert_frame_equal(expected, vineyard_client.get(object_id)) +def test_sparse_array_with_rpc_client(vineyard_rpc_client): + test_sparse_array(vineyard_rpc_client) + + def test_sparse_array(vineyard_client): arr = np.random.randn(10) arr[2:5] = np.nan @@ -96,6 +122,10 @@ def test_sparse_array(vineyard_client): pd.testing.assert_extension_array_equal(sparr, vineyard_client.get(object_id)) +def test_dataframe_with_sparse_array_with_rpc_client(vineyard_rpc_client): + test_dataframe_with_sparse_array(vineyard_rpc_client) + + def test_dataframe_with_sparse_array(vineyard_client): df = pd.DataFrame(np.random.randn(100, 4), columns=['x', 'y', 'z', 'a']) df.iloc[:98] = np.nan @@ -104,6 +134,10 @@ def test_dataframe_with_sparse_array(vineyard_client): pd.testing.assert_frame_equal(df, vineyard_client.get(object_id)) +def test_dataframe_with_sparse_array_int_columns_with_rpc_client(vineyard_rpc_client): + test_dataframe_with_sparse_array_int_columns(vineyard_rpc_client) + + def test_dataframe_with_sparse_array_int_columns(vineyard_client): df = pd.DataFrame(np.random.randn(100, 4), columns=[1, 2, 3, 4]) df.iloc[:98] = np.nan @@ -112,6 +146,10 @@ def test_dataframe_with_sparse_array_int_columns(vineyard_client): pd.testing.assert_frame_equal(df, vineyard_client.get(object_id)) +def test_dataframe_with_sparse_array_mixed_columns_with_rpc_client(vineyard_rpc_client): + test_dataframe_with_sparse_array_mixed_columns(vineyard_rpc_client) + + def test_dataframe_with_sparse_array_mixed_columns(vineyard_client): df = pd.DataFrame(np.random.randn(100, 4), columns=['x', 'y', 'z', 0]) df.iloc[:98] = np.nan @@ -120,6 +158,10 @@ def test_dataframe_with_sparse_array_mixed_columns(vineyard_client): pd.testing.assert_frame_equal(df, vineyard_client.get(object_id)) +def test_dataframe_with_datetime_with_rpc_client(vineyard_rpc_client): + test_dataframe_with_datetime(vineyard_rpc_client) + + def test_dataframe_with_datetime(vineyard_client): # GH-575 dates = [ @@ -133,6 +175,10 @@ def test_dataframe_with_datetime(vineyard_client): pd.testing.assert_frame_equal(df, vineyard_client.get(object_id)) +def test_dataframe_with_multidimensional_with_rpc_client(vineyard_rpc_client): + test_dataframe_with_multidimensional(vineyard_rpc_client) + + def test_dataframe_with_multidimensional(vineyard_client): df = pd.DataFrame( { @@ -165,3 +211,56 @@ def test_dataframe_reusing(vineyard_client): meta1['__values_-value-0']['buffer_'].id == meta2['__values_-value-0']['buffer_'].id ) + + +@pytest.mark.parametrize( + "value", + [ + pd.DataFrame({'a': [1, 2, 3, 4], 'b': [5, 6, 7, 8]}), + pd.DataFrame({'a': ['1', '2', '3', '4'], 'b': ['5', '6', '7', '8']}), + pd.DataFrame([1, 2, 3, 4], columns=[['x']]), + pd.DataFrame({1: [1, 2, 3, 4], 2: [5, 6, 7, 8]}), + pd.DataFrame( + { + 'a': [1, 2, 3, 4], + 'b': [5, 6, 7, 8], + 1: [9, 10, 11, 12], + 2: [13, 14, 15, 16], + } + ), + pd.DataFrame(np.random.rand(10, 5), columns=['c1', 'c2', 'c3', 'c4', 'c5']), + pd.DataFrame( + [[1, 3, 3], [4, 2, 6], [7, 8, 9]], + index=['a1', 'a2', 'a3'], + columns=['x', 'y', 'z'], + ), + pd.arrays.SparseArray(np.random.randn(10)), + pd.DataFrame(np.random.randn(100, 4), columns=['x', 'y', 'z', 'a']).astype( + pd.SparseDtype("float", np.nan) + ), + pd.DataFrame(np.random.randn(100, 4), columns=[1, 2, 3, 4]).astype( + pd.SparseDtype("float", np.nan) + ), + pd.DataFrame(np.random.randn(100, 4), columns=['x', 'y', 'z', 0]).astype( + pd.SparseDtype("float", np.nan) + ), + pd.DataFrame( + pd.Series( + [ + pd.Timestamp("2012-05-01"), + pd.Timestamp("2012-05-02"), + pd.Timestamp("2012-05-03"), + ] + ) + ), + ], +) +def test_with_ipc_and_rpc(value, vineyard_client, vineyard_rpc_client): + object_id = vineyard_client.put(value) + df1 = vineyard_client.get(object_id) + df2 = vineyard_rpc_client.get(object_id) + assert df1.equals(df2) + object_id = vineyard_rpc_client.put(value) + df1 = vineyard_client.get(object_id) + df2 = vineyard_rpc_client.get(object_id) + assert df1.equals(df2) diff --git a/python/vineyard/data/tests/test_default.py b/python/vineyard/data/tests/test_default.py index c579c50a..1fad314a 100644 --- a/python/vineyard/data/tests/test_default.py +++ b/python/vineyard/data/tests/test_default.py @@ -18,6 +18,8 @@ import numpy as np +import pytest + from vineyard.core import default_builder_context from vineyard.core import default_resolver_context from vineyard.data import register_builtin_types @@ -25,6 +27,10 @@ register_builtin_types(default_builder_context, default_resolver_context) +def test_bool_with_rpc_client(vineyard_rpc_client): + test_bool(vineyard_rpc_client) + + def test_bool(vineyard_client): value = True object_id = vineyard_client.put(value) @@ -35,6 +41,10 @@ def test_bool(vineyard_client): assert vineyard_client.get(object_id) == value +def test_np_bool_with_rpc_client(vineyard_rpc_client): + test_np_bool(vineyard_rpc_client) + + def test_np_bool(vineyard_client): value = np.bool_(True) object_id = vineyard_client.put(value) @@ -45,13 +55,40 @@ def test_np_bool(vineyard_client): assert vineyard_client.get(object_id) == value +def test_list_with_rpc_client(vineyard_rpc_client): + test_list(vineyard_rpc_client) + + def test_list(vineyard_client): value = [1, 2, 3, 4, 5, 6, None, None, 9] object_id = vineyard_client.put(value) assert vineyard_client.get(object_id) == tuple(value) +def test_dict_with_rpc_client(vineyard_rpc_client): + test_dict(vineyard_rpc_client) + + def test_dict(vineyard_client): value = {1: 2, 3: 4, 5: None, None: 6} object_id = vineyard_client.put(value) assert vineyard_client.get(object_id) == value + + +@pytest.mark.parametrize( + "value", + [ + True, + False, + np.bool_(True), + np.bool_(False), + [1, 2, 3, 4, 5, 6, None, None, 9], + {1: 2, 3: 4, 5: None, None: 6}, + ], +) +def test_ipc_and_rpc(value, vineyard_client, vineyard_rpc_client): + object_id = vineyard_client.put(value) + assert vineyard_client.get(object_id) == vineyard_rpc_client.get(object_id) + + object_id = vineyard_rpc_client.put(value) + assert vineyard_client.get(object_id) == vineyard_rpc_client.get(object_id) diff --git a/python/vineyard/data/tests/test_pickle.py b/python/vineyard/data/tests/test_pickle.py index c07eff21..1e214aac 100644 --- a/python/vineyard/data/tests/test_pickle.py +++ b/python/vineyard/data/tests/test_pickle.py @@ -131,6 +131,10 @@ def test_bytes_io_roundtrip(block_size, value): assert target == value +def test_bytes_io_numpy_ndarray_with_rpc_client(vineyard_rpc_client): + test_bytes_io_numpy_ndarray(vineyard_rpc_client) + + def test_bytes_io_numpy_ndarray(vineyard_client): arr = np.random.rand(4, 5, 6) object_id = vineyard_client.put(arr) @@ -138,6 +142,10 @@ def test_bytes_io_numpy_ndarray(vineyard_client): np.testing.assert_allclose(arr, target) +def test_bytes_io_empty_ndarray_with_rpc_client(vineyard_rpc_client): + test_bytes_io_empty_ndarray(vineyard_rpc_client) + + def test_bytes_io_empty_ndarray(vineyard_client): arr = np.ones(()) object_id = vineyard_client.put(arr) @@ -180,6 +188,10 @@ def test_bytes_io_empty_ndarray(vineyard_client): np.testing.assert_allclose(arr, target) +def test_bytes_io_str_ndarray_with_rpc_client(vineyard_rpc_client): + test_bytes_io_str_ndarray(vineyard_rpc_client) + + def test_bytes_io_str_ndarray(vineyard_client): arr = np.array(['', 'x', 'yz', 'uvw']) object_id = vineyard_client.put(arr) @@ -187,6 +199,10 @@ def test_bytes_io_str_ndarray(vineyard_client): np.testing.assert_equal(arr, target) +def test_object_ndarray_with_rpc_client(vineyard_rpc_client): + test_object_ndarray(vineyard_rpc_client) + + def test_object_ndarray(vineyard_client): arr = np.array([1, 'x', 3.14, (1, 4)], dtype=object) object_id = vineyard_client.put(arr) @@ -199,6 +215,10 @@ def test_object_ndarray(vineyard_client): np.testing.assert_equal(arr, target) +def test_bytes_io_tensor_order_with_rpc_client(vineyard_rpc_client): + test_bytes_io_tensor_order(vineyard_rpc_client) + + def test_bytes_io_tensor_order(vineyard_client): arr = np.asfortranarray(np.random.rand(10, 7)) object_id = vineyard_client.put(arr) @@ -207,6 +227,10 @@ def test_bytes_io_tensor_order(vineyard_client): assert res.flags['F_CONTIGUOUS'] == arr.flags['F_CONTIGUOUS'] +def test_bytes_io_pandas_dataframe_with_rpc_client(vineyard_rpc_client): + test_bytes_io_pandas_dataframe(vineyard_rpc_client) + + def test_bytes_io_pandas_dataframe(vineyard_client): df = pd.DataFrame({'a': [1, 2, 3, 4], 'b': [5, 6, 7, 8]}) object_id = vineyard_client.put(df) @@ -214,6 +238,10 @@ def test_bytes_io_pandas_dataframe(vineyard_client): pd.testing.assert_frame_equal(df, target) +def test_bytes_io_pandas_dataframe_int_columns_with_rpc_client(vineyard_rpc_client): + test_bytes_io_pandas_dataframe_int_columns(vineyard_rpc_client) + + def test_bytes_io_pandas_dataframe_int_columns(vineyard_client): df = pd.DataFrame({1: [1, 2, 3, 4], 2: [5, 6, 7, 8]}) object_id = vineyard_client.put(df) @@ -221,6 +249,10 @@ def test_bytes_io_pandas_dataframe_int_columns(vineyard_client): pd.testing.assert_frame_equal(df, target) +def test_bytes_io_pandas_dataframe_mixed_columns_with_rpc_client(vineyard_rpc_client): + test_bytes_io_pandas_dataframe_mixed_columns(vineyard_rpc_client) + + def test_bytes_io_pandas_dataframe_mixed_columns(vineyard_client): df = pd.DataFrame( {'a': [1, 2, 3, 4], 'b': [5, 6, 7, 8], 1: [9, 10, 11, 12], 2: [13, 14, 15, 16]} @@ -230,8 +262,35 @@ def test_bytes_io_pandas_dataframe_mixed_columns(vineyard_client): pd.testing.assert_frame_equal(df, target) +def test_bytes_io_pandas_series_with_rpc_client(vineyard_rpc_client): + test_bytes_io_pandas_series(vineyard_rpc_client) + + def test_bytes_io_pandas_series(vineyard_client): s = pd.Series([1, 3, 5, np.nan, 6, 8], name='foo') object_id = vineyard_client.put(s) target = read_and_build(b1m, vineyard_client.get(object_id)) pd.testing.assert_series_equal(s, target) + + +@pytest.mark.parametrize( + "value", + [ + np.ones((0, 1, 2, 3)), + np.zeros((0, 1, 2, 3), dtype='int'), + np.array(['', 'x', 'ht', 'yyds']), + np.array([1, 'x', 3.14, (1, 4)], dtype=object), + np.ones((), dtype='object'), + np.asfortranarray(np.random.rand(10, 7)), + ], +) +def test_ipc_and_rpc(value, vineyard_client, vineyard_rpc_client): + object_id = vineyard_client.put(value) + v1 = vineyard_client.get(object_id) + v2 = vineyard_rpc_client.get(object_id) + assert np.array_equal(v1, v2) + + object_id = vineyard_rpc_client.put(value) + v1 = vineyard_client.get(object_id) + v2 = vineyard_rpc_client.get(object_id) + assert np.array_equal(v1, v2) diff --git a/python/vineyard/data/tests/test_tensor.py b/python/vineyard/data/tests/test_tensor.py index 7d48bd1a..54694224 100644 --- a/python/vineyard/data/tests/test_tensor.py +++ b/python/vineyard/data/tests/test_tensor.py @@ -33,12 +33,20 @@ register_builtin_types(default_builder_context, default_resolver_context) +def test_numpy_ndarray_with_rpc_client(vineyard_rpc_client): + test_numpy_ndarray(vineyard_rpc_client) + + def test_numpy_ndarray(vineyard_client): arr = np.random.rand(4, 5, 6) object_id = vineyard_client.put(arr) np.testing.assert_allclose(arr, vineyard_client.get(object_id)) +def test_empty_ndarray_with_rpc_client(vineyard_rpc_client): + test_empty_ndarray(vineyard_rpc_client) + + def test_empty_ndarray(vineyard_client): arr = np.ones(()) object_id = vineyard_client.put(arr) @@ -73,12 +81,20 @@ def test_empty_ndarray(vineyard_client): np.testing.assert_allclose(arr, vineyard_client.get(object_id)) +def test_str_ndarray_with_rpc_client(vineyard_rpc_client): + test_str_ndarray(vineyard_rpc_client) + + def test_str_ndarray(vineyard_client): arr = np.array(['', 'x', 'yz', 'uvw']) object_id = vineyard_client.put(arr) np.testing.assert_equal(arr, vineyard_client.get(object_id)) +def test_object_ndarray_with_rpc_client(vineyard_rpc_client): + test_object_ndarray(vineyard_rpc_client) + + def test_object_ndarray(vineyard_client): arr = np.array([1, 'x', 3.14, (1, 4)], dtype=object) object_id = vineyard_client.put(arr) @@ -89,6 +105,10 @@ def test_object_ndarray(vineyard_client): np.testing.assert_equal(arr, vineyard_client.get(object_id)) +def test_tensor_order_with_rpc_client(vineyard_rpc_client): + test_tensor_order(vineyard_rpc_client) + + def test_tensor_order(vineyard_client): arr = np.asfortranarray(np.random.rand(10, 7)) object_id = vineyard_client.put(arr) @@ -97,6 +117,11 @@ def test_tensor_order(vineyard_client): assert res.flags['F_CONTIGUOUS'] == arr.flags['F_CONTIGUOUS'] +@pytest.mark.skipif(sp is None, reason="scipy.sparse is not available") +def test_bsr_matrix_with_rpc_client(vineyard_rpc_client): + test_bsr_matrix(vineyard_rpc_client) + + @pytest.mark.skipif(sp is None, reason="scipy.sparse is not available") def test_bsr_matrix(vineyard_client): arr = sp.sparse.bsr_matrix((3, 4), dtype=np.int8) @@ -104,6 +129,11 @@ def test_bsr_matrix(vineyard_client): np.testing.assert_allclose(arr.A, vineyard_client.get(object_id).A) +@pytest.mark.skipif(sp is None, reason="scipy.sparse is not available") +def test_coo_matrix_with_rpc_client(vineyard_rpc_client): + test_coo_matrix(vineyard_rpc_client) + + @pytest.mark.skipif(sp is None, reason="scipy.sparse is not available") def test_coo_matrix(vineyard_client): arr = sp.sparse.coo_matrix((3, 4), dtype=np.int8) @@ -111,6 +141,11 @@ def test_coo_matrix(vineyard_client): np.testing.assert_allclose(arr.A, vineyard_client.get(object_id).A) +@pytest.mark.skipif(sp is None, reason="scipy.sparse is not available") +def test_csc_matrix_with_rpc_client(vineyard_rpc_client): + test_csc_matrix(vineyard_rpc_client) + + @pytest.mark.skipif(sp is None, reason="scipy.sparse is not available") def test_csc_matrix(vineyard_client): arr = sp.sparse.csc_matrix((3, 4), dtype=np.int8) @@ -118,6 +153,11 @@ def test_csc_matrix(vineyard_client): np.testing.assert_allclose(arr.A, vineyard_client.get(object_id).A) +@pytest.mark.skipif(sp is None, reason="scipy.sparse is not available") +def test_csr_matrix_with_rpc_client(vineyard_rpc_client): + test_csr_matrix(vineyard_rpc_client) + + @pytest.mark.skipif(sp is None, reason="scipy.sparse is not available") def test_csr_matrix(vineyard_client): arr = sp.sparse.csr_matrix((3, 4), dtype=np.int8) @@ -125,8 +165,27 @@ def test_csr_matrix(vineyard_client): np.testing.assert_allclose(arr.A, vineyard_client.get(object_id).A) +@pytest.mark.skipif(sp is None, reason="scipy.sparse is not available") +def test_dia_matrix_with_rpc_client(vineyard_rpc_client): + test_dia_matrix(vineyard_rpc_client) + + @pytest.mark.skipif(sp is None, reason="scipy.sparse is not available") def test_dia_matrix(vineyard_client): arr = sp.sparse.dia_matrix((3, 4), dtype=np.int8) object_id = vineyard_client.put(arr) np.testing.assert_allclose(arr.A, vineyard_client.get(object_id).A) + + +@pytest.mark.skipif(sp is None, reason="scipy.sparse is not available") +def test_ipc_and_rpc(vineyard_client, vineyard_rpc_client): + value = sp.sparse.bsr_matrix((3, 4), dtype=np.int8) + object_id = vineyard_client.put(value) + v1 = vineyard_client.get(object_id) + v2 = vineyard_rpc_client.get(object_id) + assert np.array_equal(v1.todense(), v2.todense()) + + object_id = vineyard_rpc_client.put(value) + v1 = vineyard_client.get(object_id) + v2 = vineyard_rpc_client.get(object_id) + assert np.array_equal(v1.todense(), v2.todense()) diff --git a/python/vineyard/data/utils.py b/python/vineyard/data/utils.py index 8619ad0b..023b6318 100644 --- a/python/vineyard/data/utils.py +++ b/python/vineyard/data/utils.py @@ -16,6 +16,7 @@ # limitations under the License. # +import ctypes import json import pickle from typing import Union @@ -26,6 +27,8 @@ from vineyard._C import Object from vineyard._C import ObjectID from vineyard._C import ObjectMeta +from vineyard._C import RemoteBlobBuilder +from vineyard._C import RPCClient if pickle.HIGHEST_PROTOCOL < 5: import pickle5 as pickle # pylint: disable=import-error @@ -144,15 +147,30 @@ def ensure_ipc_client(client, error_message=None): def build_buffer( client, address, size, *args, **kwargs ) -> Union["Object", "ObjectMeta", "ObjectID"]: - '''Build a blob in vineyard server for the given bytes or memoryview. + '''Build a blob or a remote blob in vineyard server + for the given bytes or memoryview. If address is None or size is 0, an empty blob will be returned. ''' - ensure_ipc_client( - client, - "Vineyard RPC client cannot be used to create local blobs, " - "try using an IPC client or `rpc_client.create_remote_blob()`", - ) + if isinstance(client, RPCClient): + # copy the address with size to a local payloads + if size == 0 or address is None: + meta = ObjectMeta() + meta.id = ObjectID("o8000000000000000") + meta.nbytes = 0 + meta.typename = "vineyard::RemoteBlob" + return client.create_metadata(meta) + if isinstance(address, bytes): + payload = address + else: + payload = bytearray(size) + address_bytes = (ctypes.c_byte * size).from_address(address) + payload[:size] = memoryview(address_bytes)[:size] + buffer = RemoteBlobBuilder(size) + buffer.copy(0, payload) + id = client.create_remote_blob(buffer) + meta = client.get_meta(id) + return meta if size == 0: return client.create_empty_blob() diff --git a/src/client/client_base.cc b/src/client/client_base.cc index 8b72cca6..bb17d2d3 100644 --- a/src/client/client_base.cc +++ b/src/client/client_base.cc @@ -72,6 +72,11 @@ Status ClientBase::CreateData(const json& tree, ObjectID& id, } Status ClientBase::CreateMetaData(ObjectMeta& meta_data, ObjectID& id) { + if (this->instance_id_ == UnspecifiedInstanceID() - 1) { + std::shared_ptr instance_status = nullptr; + VINEYARD_CHECK_OK(this->InstanceStatus(instance_status)); + this->instance_id_ = instance_status->instance_id; + } return this->CreateMetaData(meta_data, this->instance_id_, std::ref(id)); } diff --git a/src/client/ds/blob.cc b/src/client/ds/blob.cc index e243bf35..057d76f7 100644 --- a/src/client/ds/blob.cc +++ b/src/client/ds/blob.cc @@ -132,7 +132,7 @@ void Blob::Construct(ObjectMeta const& meta) { if (meta.GetBuffer(meta.GetId(), this->buffer_).ok()) { if (this->buffer_ == nullptr) { throw std::runtime_error( - "Blob::Construct(): Invalid internal state: local blob found bit it " + "Blob::Construct(): Invalid internal state: local blob found but it " "is nullptr: " + ObjectIDToString(meta.GetId())); } diff --git a/src/client/ds/object_meta.cc b/src/client/ds/object_meta.cc index 0e913c3a..f0c2a1d0 100644 --- a/src/client/ds/object_meta.cc +++ b/src/client/ds/object_meta.cc @@ -107,6 +107,22 @@ bool const ObjectMeta::IsLocal() const { } } +bool const ObjectMeta::IsLocated() const { + auto instance_id = meta_["instance_id"]; + if (instance_id.is_null()) { + // it is a newly created metadata + return true; + } else { + if (client_) { + std::shared_ptr instance_status = nullptr; + VINEYARD_CHECK_OK(client_->InstanceStatus(instance_status)); + return instance_status->instance_id == instance_id.get(); + } else { + return false; + } + } +} + void ObjectMeta::ForceLocal() const { this->force_local_ = true; } bool const ObjectMeta::Haskey(std::string const& key) const { @@ -347,9 +363,15 @@ void ObjectMeta::SetMetaData(ClientBase* client, const json& meta) { ObjectID member_id = ObjectIDFromString(tree["id"].get_ref()); if (IsBlob(member_id)) { - if (client_ == nullptr /* traverse to account blobs */ || - tree["instance_id"].get() == client_->instance_id()) { + if (client_ == nullptr) { VINEYARD_CHECK_OK(buffer_set_->EmplaceBuffer(member_id)); + } else { + std::shared_ptr instance_status = nullptr; + VINEYARD_CHECK_OK(client_->InstanceStatus(instance_status)); + if (tree["instance_id"].get() == + instance_status->instance_id) { + VINEYARD_CHECK_OK(buffer_set_->EmplaceBuffer(member_id)); + } } } else { for (auto& item : tree) { diff --git a/src/client/ds/object_meta.h b/src/client/ds/object_meta.h index 947d77f6..f18d5f9f 100644 --- a/src/client/ds/object_meta.h +++ b/src/client/ds/object_meta.h @@ -131,6 +131,11 @@ class ObjectMeta { */ bool const IsLocal() const; + /** + * @brief Whether the object meta and rpc client is located on the same node. + */ + bool const IsLocated() const; + /** * @brief Mark the metadata as a "local" metadata to make sure the construct * process proceed. @@ -796,6 +801,7 @@ class ObjectMeta { friend class RPCClient; friend class Blob; + friend class RemoteBlob; friend class BlobWriter; }; diff --git a/src/client/ds/remote_blob.cc b/src/client/ds/remote_blob.cc index 72e3b61b..ba9da83a 100644 --- a/src/client/ds/remote_blob.cc +++ b/src/client/ds/remote_blob.cc @@ -18,6 +18,7 @@ limitations under the License. #include #include #include +#include #include #include "client/ds/blob.h" @@ -148,6 +149,48 @@ const std::shared_ptr& RemoteBlobWriter::Buffer() const { Status RemoteBlobWriter::Abort() { return Status::OK(); } +void RemoteBlob::Construct(ObjectMeta const& meta) { + std::string __type_name = type_name(); + VINEYARD_ASSERT(meta.GetTypeName() == __type_name, + "Expect typename '" + __type_name + "', but got '" + + meta.GetTypeName() + "'"); + this->meta_ = meta; + this->id_ = meta.GetId(); + + if (this->buffer_ != nullptr) { + return; + } + if (this->id_ == EmptyBlobID() || meta.GetNBytes() == 0) { + this->size_ = 0; + return; + } + + if (!meta.IsLocated()) { + throw std::runtime_error( + "RemoteBlob::Construct(): Invalid internal state: remote blob found " + "but it " + "is not located with rpc client"); + return; + } + + if (meta.GetBuffer(meta.GetId(), this->buffer_).ok()) { + if (this->buffer_ == nullptr) { + throw std::runtime_error( + "RemoteBlob::Construct(): Invalid internal state: remote blob found " + "but it " + "is nullptr: " + + ObjectIDToString(meta.GetId())); + } + this->size_ = this->buffer_->size(); + } else { + throw std::runtime_error( + "RemoteBlob::Construct(): Invalid internal state: failed to construct " + "local " + "blob since payload is missing: " + + ObjectIDToString(meta.GetId())); + } +} + void RemoteBlobWriter::Dump() const { #ifndef NDEBUG std::stringstream ss; diff --git a/src/client/ds/remote_blob.h b/src/client/ds/remote_blob.h index f90561fe..288236c6 100644 --- a/src/client/ds/remote_blob.h +++ b/src/client/ds/remote_blob.h @@ -17,6 +17,7 @@ limitations under the License. #define SRC_CLIENT_DS_REMOTE_BLOB_H_ #include +#include #include #include "client/ds/i_object.h" @@ -41,7 +42,7 @@ class RPCClient; * a chunk of memory from its memory space to the client space in a * zero-copy fashion. */ -class RemoteBlob { +class RemoteBlob : public Registered { public: ObjectID id() const; @@ -83,6 +84,13 @@ class RemoteBlob { */ const std::shared_ptr& Buffer() const; + /** + * @brief Construct the blob locally for the given object meta. + * + * @param meta The given object meta. + */ + void Construct(ObjectMeta const& meta) override; + /** * @brief Get the arrow buffer of the blob. * @@ -109,12 +117,26 @@ class RemoteBlob { */ const std::shared_ptr ArrowBufferOrEmpty() const; + static std::unique_ptr Create() __attribute__((used)) { + return std::static_pointer_cast( + std::unique_ptr{new RemoteBlob()}); + } + /** * @brief Dump the buffer for debugging. */ void Dump() const; private: + /** + * @brief Construct an empty RemoteBlob + */ + RemoteBlob() { + this->id_ = InvalidObjectID(); + this->size_ = std::numeric_limits::max(); + this->buffer_ = nullptr; + } + RemoteBlob(const ObjectID id, const InstanceID instance_id, const size_t size); @@ -127,6 +149,7 @@ class RemoteBlob { friend class RPCClient; friend class RemoteBlobWriter; + friend class ObjectMeta; }; /** diff --git a/src/client/rpc_client.cc b/src/client/rpc_client.cc index 75515762..5fec843c 100644 --- a/src/client/rpc_client.cc +++ b/src/client/rpc_client.cc @@ -16,6 +16,7 @@ limitations under the License. #include "client/rpc_client.h" #include +#include #include #include #include @@ -23,6 +24,7 @@ limitations under the License. #include #include +#include "client/ds/blob.h" #include "client/ds/object_factory.h" #include "client/ds/remote_blob.h" #include "client/io.h" @@ -180,11 +182,49 @@ Status RPCClient::GetObject(const ObjectID id, ObjectMeta meta; RETURN_ON_ERROR(this->GetMetaData(id, meta, true)); RETURN_ON_ASSERT(!meta.MetaData().empty()); - object = ObjectFactory::Create(meta.GetTypeName()); + std::map> buffers; + std::function travel = [&](json& meta_tree) -> json& { + if (meta_tree.is_object()) { + auto sub_id = + ObjectIDFromString(meta_tree["id"].get_ref()); + if (IsBlob(sub_id)) { + std::shared_ptr remote_blob; + VINEYARD_CHECK_OK(GetRemoteBlob(sub_id, remote_blob)); + ObjectMeta sub_meta; + sub_meta.Reset(); + VINEYARD_CHECK_OK(GetMetaData(sub_id, sub_meta)); + sub_meta.SetTypeName(type_name()); + VINEYARD_CHECK_OK(sub_meta.buffer_set_->EmplaceBuffer(sub_id)); + VINEYARD_CHECK_OK( + sub_meta.buffer_set_->EmplaceBuffer(sub_id, remote_blob->Buffer())); + buffers.emplace(sub_id, remote_blob->Buffer()); + meta_tree = sub_meta.MetaData(); + return meta_tree; + } else { + for (auto& item : meta_tree.items()) { + if (item.value().is_object() && !item.value().empty()) { + meta_tree[item.key()] = travel(item.value()); + } + } + } + } + return meta_tree; + }; + auto meta_tree = meta.MetaData(); + auto new_meta_tree = travel(meta_tree); + ObjectMeta new_meta; + new_meta.Reset(); + new_meta.SetMetaData(this, new_meta_tree); + for (auto& item : buffers) { + VINEYARD_CHECK_OK(new_meta.buffer_set_->EmplaceBuffer(item.first)); + VINEYARD_CHECK_OK( + new_meta.buffer_set_->EmplaceBuffer(item.first, item.second)); + } + object = ObjectFactory::Create(new_meta.GetTypeName()); if (object == nullptr) { object = std::unique_ptr(new Object()); } - object->Construct(meta); + object->Construct(new_meta); return Status::OK(); } diff --git a/thirdparty/libcuckoo b/thirdparty/libcuckoo index 735a74b0..e1d74917 160000 --- a/thirdparty/libcuckoo +++ b/thirdparty/libcuckoo @@ -1 +1 @@ -Subproject commit 735a74b01610fb8d3614019429af370856e4a88c +Subproject commit e1d749174b550bd534cb96e4e82acb144c4ff3dc