Skip to content

Commit

Permalink
Implement concurrent memcpy for building Python objects into vineyard (
Browse files Browse the repository at this point in the history
…#1646)

Remove the problematic `.buffer` property (as it cannot bind the
lifetime of the underlying blob to the memoryview object) and add
concurrent support for memcpy for faster object building.

Fixes #1631

Signed-off-by: Tao He <linzhu.ht@alibaba-inc.com>
  • Loading branch information
sighingnow authored Dec 13, 2023
1 parent a219eae commit 89530bf
Show file tree
Hide file tree
Showing 12 changed files with 282 additions and 123 deletions.
115 changes: 45 additions & 70 deletions python/core.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ limitations under the License.
#include "client/ds/object_meta.h"
#include "client/ds/remote_blob.h"
#include "client/rpc_client.h"
#include "common/memory/memcpy.h"
#include "common/util/json.h"
#include "common/util/status.h"
#include "common/util/uuid.h"
Expand Down Expand Up @@ -481,18 +482,6 @@ void bind_blobs(py::module& mod) {
"address",
[](Blob* self) { return reinterpret_cast<uintptr_t>(self->data()); },
doc::Blob_address)
.def_property_readonly(
"buffer",
[](Blob& blob) -> py::object {
auto buffer = blob.Buffer();
if (buffer == nullptr) {
return py::none();
} else {
return py::memoryview::from_memory(
const_cast<uint8_t*>(buffer->data()), buffer->size(), true);
}
},
doc::Blob_buffer)
.def_buffer([](Blob& blob) -> py::buffer_info {
return py::buffer_info(const_cast<char*>(blob.data()), sizeof(int8_t),
py::format_descriptor<int8_t>::format(), 1,
Expand Down Expand Up @@ -550,21 +539,29 @@ void bind_blobs(py::module& mod) {
.def(
"copy",
[](BlobWriter* self, size_t const offset, uintptr_t ptr,
size_t const size) {
std::memcpy(self->data() + offset, reinterpret_cast<void*>(ptr),
size);
},
"offset"_a, "address"_a, "size"_a, doc::BlobBuilder_copy)
size_t const size,
size_t const concurrency = memory::default_memcpy_concurrency) {
memory::concurrent_memcpy(self->data() + offset,
reinterpret_cast<void*>(ptr), size,
concurrency);
},
"offset"_a, "address"_a, "size"_a,
py::arg("concurrency") = memory::default_memcpy_concurrency,
doc::BlobBuilder_copy)
.def(
"copy",
[](BlobWriter* self, size_t offset, py::buffer const& buffer) {
[](BlobWriter* self, size_t offset, py::buffer const& buffer,
size_t const concurrency = memory::default_memcpy_concurrency) {
throw_on_error(copy_memoryview(buffer.ptr(), self->data(),
self->size(), offset));
self->size(), offset, concurrency));
},
"offset"_a, "buffer"_a)
"offset"_a, "buffer"_a,
py::arg("concurrency") = memory::default_memcpy_concurrency,
doc::BlobBuilder_copy)
.def(
"copy",
[](BlobWriter* self, size_t offset, py::bytes const& bs) {
[](BlobWriter* self, size_t offset, py::bytes const& bs,
size_t const concurrency = memory::default_memcpy_concurrency) {
char* buffer = nullptr;
ssize_t length = 0;
if (PYBIND11_BYTES_AS_STRING_AND_SIZE(bs.ptr(), &buffer, &length)) {
Expand All @@ -577,27 +574,18 @@ void bind_blobs(py::module& mod) {
"', but the buffer size is '" + std::to_string(length) +
"'"));
}
std::memcpy(self->data() + offset, buffer, length);
memory::concurrent_memcpy(self->data() + offset, buffer, length,
concurrency);
},
"offset"_a, "bytes"_a)
"offset"_a, "bytes"_a,
py::arg("concurrency") = memory::default_memcpy_concurrency,
doc::BlobBuilder_copy)
.def_property_readonly(
"address",
[](BlobWriter* self) {
return reinterpret_cast<uintptr_t>(self->data());
},
doc::BlobBuilder_address)
.def_property_readonly(
"buffer",
[](BlobWriter& blob) -> py::object {
auto buffer = blob.Buffer();
if (buffer == nullptr) {
return py::none();
} else {
return py::memoryview::from_memory(buffer->mutable_data(),
buffer->size(), false);
}
},
doc::BlobBuilder_buffer)
.def_buffer([](BlobWriter& blob) -> py::buffer_info {
return py::buffer_info(blob.data(), sizeof(int8_t),
py::format_descriptor<int8_t>::format(), 1,
Expand Down Expand Up @@ -641,18 +629,6 @@ void bind_blobs(py::module& mod) {
return reinterpret_cast<uintptr_t>(self->data());
},
doc::RemoteBlob_address)
.def_property_readonly(
"buffer",
[](RemoteBlob& blob) -> py::object {
auto buffer = blob.Buffer();
if (buffer == nullptr) {
return py::none();
} else {
return py::memoryview::from_memory(
const_cast<uint8_t*>(buffer->data()), buffer->size(), true);
}
},
doc::RemoteBlob_buffer)
.def_buffer([](RemoteBlob& blob) -> py::buffer_info {
return py::buffer_info(const_cast<char*>(blob.data()), sizeof(int8_t),
py::format_descriptor<int8_t>::format(), 1,
Expand Down Expand Up @@ -733,21 +709,29 @@ void bind_blobs(py::module& mod) {
.def(
"copy",
[](RemoteBlobWriter* self, size_t const offset, uintptr_t ptr,
size_t const size) {
std::memcpy(self->data() + offset, reinterpret_cast<void*>(ptr),
size);
},
"offset"_a, "address"_a, "size"_a, doc::RemoteBlobBuilder_copy)
size_t const size,
size_t const concurrency = memory::default_memcpy_concurrency) {
memory::concurrent_memcpy(self->data() + offset,
reinterpret_cast<void*>(ptr), size,
concurrency);
},
"offset"_a, "address"_a, "size"_a,
py::arg("concurrency") = memory::default_memcpy_concurrency,
doc::RemoteBlobBuilder_copy)
.def(
"copy",
[](RemoteBlobWriter* self, size_t offset, py::buffer const& buffer) {
[](RemoteBlobWriter* self, size_t offset, py::buffer const& buffer,
size_t const concurrency = memory::default_memcpy_concurrency) {
throw_on_error(copy_memoryview(buffer.ptr(), self->data(),
self->size(), offset));
self->size(), offset, concurrency));
},
"offset"_a, "buffer"_a)
"offset"_a, "buffer"_a,
py::arg("concurrency") = memory::default_memcpy_concurrency,
doc::RemoteBlobBuilder_copy)
.def(
"copy",
[](RemoteBlobWriter* self, size_t offset, py::bytes const& bs) {
[](RemoteBlobWriter* self, size_t offset, py::bytes const& bs,
size_t const concurrency = memory::default_memcpy_concurrency) {
char* buffer = nullptr;
ssize_t length = 0;
if (PYBIND11_BYTES_AS_STRING_AND_SIZE(bs.ptr(), &buffer, &length)) {
Expand All @@ -760,27 +744,18 @@ void bind_blobs(py::module& mod) {
"', but the buffer size is '" + std::to_string(length) +
"'"));
}
std::memcpy(self->data() + offset, buffer, length);
memory::concurrent_memcpy(self->data() + offset, buffer, length,
concurrency);
},
"offset"_a, "bytes"_a)
"offset"_a, "bytes"_a,
py::arg("concurrency") = memory::default_memcpy_concurrency,
doc::RemoteBlobBuilder_copy)
.def_property_readonly(
"address",
[](RemoteBlobWriter* self) {
return reinterpret_cast<uintptr_t>(self->data());
},
doc::RemoteBlobBuilder_address)
.def_property_readonly(
"buffer",
[](RemoteBlobWriter& blob) -> py::object {
auto buffer = blob.Buffer();
if (buffer == nullptr) {
return py::none();
} else {
return py::memoryview::from_memory(buffer->mutable_data(),
buffer->size(), false);
}
},
doc::RemoteBlobBuilder_buffer)
.def_buffer([](RemoteBlobWriter& blob) -> py::buffer_info {
return py::buffer_info(blob.data(), sizeof(int8_t),
py::format_descriptor<int8_t>::format(), 1,
Expand Down
24 changes: 2 additions & 22 deletions python/pybind11_docs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -339,11 +339,6 @@ const char* Blob_address = R"doc(
The memory address value of this blob.
)doc";

const char* Blob_buffer = R"doc(
The readonly buffer behind this blob. The result buffer has type
:code:`memoryview`.
)doc";

const char* BlobBuilder = R"doc(
:class:`BlobBuilder` is the builder for creating a finally immutable blob in
vineyard server.
Expand Down Expand Up @@ -373,7 +368,7 @@ Shrink the blob builder to the given size if it is not sealed yet.
)doc";

const char* BlobBuilder_copy = R"doc(
.. method:: copy(self, offset: int, ptr: int, size: int)
.. method:: copy(self, offset: int, ptr: int, size: int, concurrency: int = 6)
:noindex:
Copy the given address to the given offset.
Expand All @@ -383,11 +378,6 @@ const char* BlobBuilder_address = R"doc(
The memory address value of this blob builder.
)doc";

const char* BlobBuilder_buffer = R"doc(
The writeable buffer behind this blob builder. The result buffer has type
:code:`memoryview`, and it is a mutable one.
)doc";

const char* RemoteBlob = R"doc(
:class:`RemoteBlob` is a holder for :class:`Blob` in cases like the :class:`Blob`
doesn't exist in the local vineyardd instance but the client still want th access
Expand All @@ -414,11 +404,6 @@ const char* RemoteBlob_address = R"doc(
The memory address value of this blob.
)doc";

const char* RemoteBlob_buffer = R"doc(
The readonly buffer behind this blob. The result buffer has type
:code:`memoryview`.
)doc";

const char* RemoteBlobBuilder = R"doc(
:class:`RemoteBlobBuilder` is the builder for creating a finally immutable blob in
vineyard server over a RPC client.
Expand Down Expand Up @@ -476,7 +461,7 @@ Abort the blob builder if it is not sealed yet.
)doc";

const char* RemoteBlobBuilder_copy = R"doc(
.. method:: copy(self, offset: int, ptr: int, size: int)
.. method:: copy(self, offset: int, ptr: int, size: int, concurrency: int = 6)
:noindex:
Copy the given address to the given offset.
Expand All @@ -486,11 +471,6 @@ const char* RemoteBlobBuilder_address = R"doc(
The memory address value of this blob builder.
)doc";

const char* RemoteBlobBuilder_buffer = R"doc(
The writeable buffer behind this blob builder. The result buffer has type
:code:`memoryview`, and it is a mutable one.
)doc";

const char* InstanceStatus = R"doc(
:class:`InstanceStatus` represents the status of connected vineyard instance, including
the instance identity, memory statistics and workloads on this instance.
Expand Down
4 changes: 0 additions & 4 deletions python/pybind11_docs.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ extern const char* Blob_is_empty;
extern const char* Blob_empty;
extern const char* Blob__len__;
extern const char* Blob_address;
extern const char* Blob_buffer;

extern const char* BlobBuilder;
extern const char* BlobBuilder_id;
Expand All @@ -67,22 +66,19 @@ extern const char* BlobBuilder_abort;
extern const char* BlobBuilder_shrink;
extern const char* BlobBuilder_copy;
extern const char* BlobBuilder_address;
extern const char* BlobBuilder_buffer;

extern const char* RemoteBlob;
extern const char* RemoteBlob_id;
extern const char* RemoteBlob_instance_id;
extern const char* RemoteBlob_is_empty;
extern const char* RemoteBlob__len__;
extern const char* RemoteBlob_address;
extern const char* RemoteBlob_buffer;

extern const char* RemoteBlobBuilder;
extern const char* RemoteBlobBuilder_size;
extern const char* RemoteBlobBuilder_abort;
extern const char* RemoteBlobBuilder_copy;
extern const char* RemoteBlobBuilder_address;
extern const char* RemoteBlobBuilder_buffer;

extern const char* InstanceStatus;
extern const char* InstanceStatus_instance_id;
Expand Down
25 changes: 14 additions & 11 deletions python/pybind11_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ limitations under the License.
#include <cstdlib>
#include <cstring>
#include <string>
#include <unordered_map>
#include <utility>

#include "common/memory/memcpy.h"
Expand Down Expand Up @@ -133,11 +132,13 @@ void bind_utils(py::module& mod) {
mod.def(
"memory_copy",
[](py::buffer const dst, size_t offset, py::buffer const src,
size_t const size) {
throw_on_error(
copy_memoryview_to_memoryview(src.ptr(), dst.ptr(), size, offset));
size_t const size,
size_t const concurrency = memory::default_memcpy_concurrency) {
throw_on_error(copy_memoryview_to_memoryview(src.ptr(), dst.ptr(), size,
offset, concurrency));
},
"dst"_a, "offset"_a, "src"_a, py::arg("size") = 0 /* not checked */);
"dst"_a, "offset"_a, "src"_a, py::arg("size") = 0 /* not checked */,
py::arg("concurrency") = memory::default_memcpy_concurrency);

PyModule_AddFunctions(mod.ptr(), vineyard_utils_methods);
}
Expand Down Expand Up @@ -177,7 +178,7 @@ class PyBufferGetter {
} // namespace detail

Status copy_memoryview(PyObject* src, void* dst, size_t const size,
size_t const offset) {
size_t const offset, size_t const concurrency) {
detail::PyBufferGetter src_buffer(src);
if (!src_buffer.has_buffer()) {
return Status::AssertionFailed(
Expand All @@ -201,15 +202,17 @@ Status copy_memoryview(PyObject* src, void* dst, size_t const size,
{
py::gil_scoped_release release;
// memcpy
memory::inline_memcpy(reinterpret_cast<uint8_t*>(dst) + offset,
src_buffer.data(), src_buffer.size());
memory::concurrent_memcpy(reinterpret_cast<uint8_t*>(dst) + offset,
src_buffer.data(), src_buffer.size(),
concurrency);
}

return Status::OK();
}

Status copy_memoryview_to_memoryview(PyObject* src, PyObject* dst,
size_t const size, size_t const offset) {
size_t const size, size_t const offset,
size_t const concurrency) {
detail::PyBufferGetter src_buffer(src);
if (!src_buffer.has_buffer()) {
return Status::AssertionFailed(
Expand Down Expand Up @@ -256,8 +259,8 @@ Status copy_memoryview_to_memoryview(PyObject* src, PyObject* dst,
{
py::gil_scoped_release release;
// memcpy
memory::inline_memcpy(dst_buffer.data() + offset, src_buffer.data(),
src_buffer.size());
memory::concurrent_memcpy(dst_buffer.data() + offset, src_buffer.data(),
src_buffer.size(), concurrency);
}

return Status::OK();
Expand Down
12 changes: 7 additions & 5 deletions python/pybind11_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ limitations under the License.
#include <string>
#include <utility>

#include "common/memory/memcpy.h"
#include "common/util/json.h"
#include "common/util/status.h"
#include "common/util/uuid.h"
Expand Down Expand Up @@ -107,17 +108,18 @@ void throw_on_error(Status const& status);
*
* @size: capacity of the dst memory block.
*/
Status copy_memoryview(PyObject* src, void* dst, size_t const size,
size_t const offset = 0);
Status copy_memoryview(
PyObject* src, void* dst, size_t const size, size_t const offset = 0,
size_t const concurrency = memory::default_memcpy_concurrency);

/**
* Copy a memoryview/buffer to a dst pointer.
*
* @size: capacity of the dst memoryview.
*/
Status copy_memoryview_to_memoryview(PyObject* src, PyObject* dst,
size_t const size,
size_t const offset = 0);
Status copy_memoryview_to_memoryview(
PyObject* src, PyObject* dst, size_t const size, size_t const offset = 0,
size_t const concurrency = memory::default_memcpy_concurrency);

namespace detail {
py::object from_json(const json& value);
Expand Down
Loading

0 comments on commit 89530bf

Please sign in to comment.