Skip to content

Commit

Permalink
Add builder and resolver for rpc client.
Browse files Browse the repository at this point in the history
Signed-off-by: Ye Cao <caoye.cao@alibaba-inc.com>
  • Loading branch information
dashanji committed Nov 24, 2023
1 parent 14f4bf7 commit 99426c8
Show file tree
Hide file tree
Showing 20 changed files with 536 additions and 41 deletions.
2 changes: 1 addition & 1 deletion python/core.cc
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,7 @@ void bind_blobs(py::module& mod) {
});

// RemoteBlob
py::class_<RemoteBlob, std::shared_ptr<RemoteBlob>>(
py::class_<RemoteBlob, std::shared_ptr<RemoteBlob>, Object>(
mod, "RemoteBlob", py::buffer_protocol(), doc::RemoteBlob)
.def_property_readonly(
"id", [](RemoteBlob* self) -> ObjectIDWrapper { return self->id(); },
Expand Down
11 changes: 5 additions & 6 deletions python/vineyard/core/resolver.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,12 +230,11 @@ def get(
object_id = client.get_name(name)

# run resolver
obj = client.get_object(object_id, fetch=fetch)
meta = obj.meta
if not meta.islocal and not meta.isglobal:
raise ValueError(
"Not a local object: for remote object, you can only get its metadata"
)
if isinstance(client, RPCClient):
obj = client.get_object(object_id)
else:
obj = client.get_object(object_id, fetch=fetch)

if resolver is None:
resolver = get_current_resolvers()
return resolver(obj, __vineyard_client=client, **kw)
Expand Down
9 changes: 0 additions & 9 deletions python/vineyard/core/tests/test_rpc_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,15 +115,6 @@ def test_remote_blob_create_and_get_large_object(vineyard_endpoint):
assert memoryview(remote_blob) == memoryview(large_payload)


def test_remote_blob_error(vineyard_endpoint):
vineyard_rpc_client = vineyard.connect(*vineyard_endpoint.split(':'))

with pytest.raises(
ValueError, match="Vineyard RPC client cannot be used to create local blobs"
):
vineyard_rpc_client.put(np.ones((2, 3, 4)))


def test_multiple_remote_blobs(vineyard_endpoint):
vineyard_rpc_client = vineyard.connect(*vineyard_endpoint.split(':'))

Expand Down
17 changes: 13 additions & 4 deletions python/vineyard/data/arrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,16 @@
from vineyard._C import Blob
from vineyard._C import IPCClient
from vineyard._C import Object
from vineyard._C import ObjectID

Check failure on line 34 in python/vineyard/data/arrow.py

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

python/vineyard/data/arrow.py#L34

No name '_C' in module 'vineyard'
from vineyard._C import ObjectMeta
from vineyard._C import RemoteBlob

Check failure on line 36 in python/vineyard/data/arrow.py

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

python/vineyard/data/arrow.py#L36

No name '_C' in module 'vineyard'

Check warning on line 36 in python/vineyard/data/arrow.py

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

python/vineyard/data/arrow.py#L36

Unused RemoteBlob imported from vineyard._C
from vineyard.core.builder import BuilderContext
from vineyard.core.resolver import ResolverContext
from vineyard.data.utils import build_buffer
from vineyard.data.utils import normalize_dtype


def buffer_builder(
client: IPCClient, buffer: Union[bytes, memoryview], builder: BuilderContext
):
def buffer_builder(client, buffer: Union[bytes, memoryview], builder: BuilderContext):
if buffer is None:
address = None
size = 0
Expand All @@ -51,7 +51,16 @@ def buffer_builder(


def as_arrow_buffer(blob: Blob):
buffer = blob.buffer
if isinstance(blob, Blob):
buffer = blob.buffer
else:
if (
blob.id == ObjectID("o8000000000000000")
or (int(blob.id) & int(ObjectID("o8000000000000000"))) == 0
):
buffer = b''
else:
buffer = memoryview(blob)
if buffer is None:
return pa.py_buffer(bytearray())
return pa.py_buffer(buffer)
Expand Down
1 change: 1 addition & 0 deletions python/vineyard/data/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ def register_base_types(

if resolver_ctx is not None:
resolver_ctx.register('vineyard::Blob', bytes_resolver)
resolver_ctx.register('vineyard::RemoteBlob', bytes_resolver)
resolver_ctx.register('vineyard::Scalar', scalar_resolver)
resolver_ctx.register('vineyard::Array', array_resolver)
resolver_ctx.register('vineyard::Sequence', sequence_resolver)
50 changes: 48 additions & 2 deletions python/vineyard/data/tests/test_arrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,21 +66,40 @@ def test_arrow_array(vineyard_client):
assert vineyard_client.get(object_id).values.equals(nested_arr.values)


def test_record_batch(vineyard_client):
def test_arrow_array_with_rpc_client(vineyard_rpc_client):
test_arrow_array(vineyard_rpc_client)


def test_record_batch_with_rpc_client(vineyard_rpc_client):
test_record_batch(vineyard_rpc_client)


@pytest.mark.skip
def build_record_batch():
arrays = [
pa.array([1, 2, 3, 4]),
pa.array(['foo', 'bar', 'baz', None]),
pa.array([True, None, False, True]),
]
batch = pa.RecordBatch.from_arrays(arrays, ['f0', 'f1', 'f2'])
return batch


def test_record_batch(vineyard_client):
batch = build_record_batch()
_object_id = vineyard_client.put(batch) # noqa: F841
# processing tables that contains string is not roundtrip, as StringArray
# will be transformed to LargeStringArray
#
# assert batch.equals(vineyard_client.get(object_id))


def test_table(vineyard_client):
def test_table_with_rpc_client(vineyard_rpc_client):
test_table(vineyard_rpc_client)


@pytest.mark.skip
def build_tabel():
arrays = [
pa.array([1, 2, 3, 4]),
pa.array(['foo', 'bar', 'baz', None]),
Expand All @@ -89,13 +108,23 @@ def test_table(vineyard_client):
batch = pa.RecordBatch.from_arrays(arrays, ['f0', 'f1', 'f2'])
batches = [batch] * 5
table = pa.Table.from_batches(batches)
return table


def test_table(vineyard_client):
table = build_tabel()
_object_id = vineyard_client.put(table) # noqa: F841
# processing tables that contains string is not roundtrip, as StringArray
# will be transformed to LargeStringArray
#
# assert table.equals(vineyard_client.get(object_id))


@pytest.mark.skipif(polars is None, reason='polars is not installed')
def test_polars_dataframe_with_rpc_client(vineyard_rpc_client):
test_polars_dataframe(vineyard_rpc_client)


@pytest.mark.skipif(polars is None, reason='polars is not installed')
def test_polars_dataframe(vineyard_client):
arrays = [
Expand All @@ -112,3 +141,20 @@ def test_polars_dataframe(vineyard_client):
# will be transformed to LargeStringArray
#
# assert table.equals(vineyard_client.get(object_id))


@pytest.mark.parametrize(
"value",
[
pa.array([1, 2, None, 3]),
pa.array(["a", None, None, None]),
pa.array([True, False, True, False]),
build_record_batch(),
build_tabel(),
],
)
def test_with_ipc_and_rpc(value, vineyard_client, vineyard_rpc_client):
object_id = vineyard_client.put(value)
assert vineyard_client.get(object_id) == vineyard_rpc_client.get(object_id)
object_id = vineyard_rpc_client.put(value)
assert vineyard_client.get(object_id) == vineyard_rpc_client.get(object_id)
38 changes: 38 additions & 0 deletions python/vineyard/data/tests/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,38 +25,66 @@
register_builtin_types(default_builder_context, default_resolver_context)


def test_int_with_rpc_client(vineyard_rpc_client):
test_int(vineyard_rpc_client)


def test_int(vineyard_client):
object_id = vineyard_client.put(1)
assert vineyard_client.get(object_id) == 1


def test_double_with_rpc_client(vineyard_rpc_client):
test_double(vineyard_rpc_client)


def test_double(vineyard_client):
object_id = vineyard_client.put(1.234)
assert vineyard_client.get(object_id) == pytest.approx(1.234)


def test_string_with_rpc_client(vineyard_rpc_client):
test_string(vineyard_rpc_client)


def test_string(vineyard_client):
object_id = vineyard_client.put('abcde')
assert vineyard_client.get(object_id) == 'abcde'


def test_bytes_with_rpc_client(vineyard_rpc_client):
test_bytes(vineyard_rpc_client)


def test_bytes(vineyard_client):
bs = b'abcde'
object_id = vineyard_client.put(bs)
assert vineyard_client.get(object_id) == memoryview(bs)


def test_memoryview_with_rpc_client(vineyard_rpc_client):
test_memoryview(vineyard_rpc_client)


def test_memoryview(vineyard_client):
bs = memoryview(b'abcde')
object_id = vineyard_client.put(bs)
assert vineyard_client.get(object_id) == bs


def test_pair_with_rpc_client(vineyard_rpc_client):
test_pair(vineyard_rpc_client)


def test_pair(vineyard_client):
object_id = vineyard_client.put((1, "2"))
assert vineyard_client.get(object_id) == (1, "2")


def test_tuple_with_rpc_client(vineyard_rpc_client):
test_tuple(vineyard_rpc_client)


def test_tuple(vineyard_client):
object_id = vineyard_client.put(())
assert vineyard_client.get(object_id) == ()
Expand All @@ -81,3 +109,13 @@ def test_tuple(vineyard_client):
4444,
"5.5.5.5.5.5.5",
)


@pytest.mark.parametrize(
"value", [1, 1.234, 'abcd', b'abcde', memoryview(b'abcde'), (1, "2")]
)
def test_with_ipc_and_rpc(value, vineyard_client, vineyard_rpc_client):
object_id = vineyard_client.put(value)
assert vineyard_client.get(object_id) == vineyard_rpc_client.get(object_id)
object_id = vineyard_rpc_client.put(value)
assert vineyard_client.get(object_id) == vineyard_rpc_client.get(object_id)
Loading

0 comments on commit 99426c8

Please sign in to comment.