Skip to content

Commit

Permalink
Implements the API to check if a blob is inside the shared memory reg…
Browse files Browse the repository at this point in the history
…ion. (#552)

Add the `IsSharedMemory` API (and `is_shared_memory` in Python) to check if a pointer is inside the shared memory region.

Signed-off-by: Tao He <sighingnow@gmail.com>
  • Loading branch information
sighingnow authored Oct 26, 2021
1 parent 2541bf9 commit fa8089c
Show file tree
Hide file tree
Showing 7 changed files with 255 additions and 75 deletions.
4 changes: 4 additions & 0 deletions python/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,10 @@ void bind_client(py::module& mod) {
return size;
},
"target"_a)
.def("is_shared_memory",
[](Client* self, const uintptr_t target) {
return self->IsSharedMemory(target);
})
.def("close",
[](Client* self) {
return ClientManager<Client>::GetManager()->Disconnect(
Expand Down
15 changes: 15 additions & 0 deletions python/vineyard/_vineyard_docs.py
Original file line number Diff line number Diff line change
Expand Up @@ -745,6 +745,21 @@ def add_doc(target, doc):
int
''')

add_doc(
IPCClient.is_shared_memory, r'''
.. method:: allocated_size(target: ptr) -> bool
:noindex:
Check if the address is on the shared memory region.
Parameters:
target: address, in int format
The given address.
Returns:
bool
''')

add_doc(IPCClient.close, r'''
Close the client.
''')
Expand Down
186 changes: 119 additions & 67 deletions src/client/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,58 +35,9 @@ limitations under the License.

namespace vineyard {

MmapEntry::MmapEntry(int fd, int64_t map_size, bool readonly, bool realign)
: fd_(fd), ro_pointer_(nullptr), rw_pointer_(nullptr), length_(0) {
// fake_mmap in malloc.h leaves a gap between memory segments, to make
// map_size page-aligned again.
if (realign) {
length_ = map_size - sizeof(size_t);
} else {
length_ = map_size;
}
}

MmapEntry::~MmapEntry() {
if (ro_pointer_) {
int r = munmap(ro_pointer_, length_);
if (r != 0) {
LOG(ERROR) << "munmap returned " << r << ", errno = " << errno << ": "
<< strerror(errno);
}
}
if (rw_pointer_) {
int r = munmap(rw_pointer_, length_);
if (r != 0) {
LOG(ERROR) << "munmap returned " << r << ", errno = " << errno << ": "
<< strerror(errno);
}
}
close(fd_);
}

uint8_t* MmapEntry::map_readonly() {
if (!ro_pointer_) {
ro_pointer_ = reinterpret_cast<uint8_t*>(
mmap(NULL, length_, PROT_READ, MAP_SHARED, fd_, 0));
if (ro_pointer_ == MAP_FAILED) {
LOG(ERROR) << "mmap failed: errno = " << errno << ": " << strerror(errno);
ro_pointer_ = nullptr;
}
}
return ro_pointer_;
}
Client::Client() : shm_(new detail::SharedMemoryManager(-1)) {}

uint8_t* MmapEntry::map_readwrite() {
if (!rw_pointer_) {
rw_pointer_ = reinterpret_cast<uint8_t*>(
mmap(NULL, length_, PROT_READ | PROT_WRITE, MAP_SHARED, fd_, 0));
if (rw_pointer_ == MAP_FAILED) {
LOG(ERROR) << "mmap failed: errno = " << errno << ": " << strerror(errno);
rw_pointer_ = nullptr;
}
}
return rw_pointer_;
}
Client::~Client() { Disconnect(); }

Status Client::Connect() {
auto ep = read_env("VINEYARD_IPC_SOCKET");
Expand Down Expand Up @@ -124,6 +75,7 @@ Status Client::Connect(const std::string& ipc_socket) {
<< ", while the server's version is " << server_version_;
}

shm_.reset(new detail::SharedMemoryManager(vineyard_conn_));
return Status::OK();
}

Expand Down Expand Up @@ -238,8 +190,8 @@ Status Client::GetNextStreamChunk(ObjectID const id, size_t const size,
"The size of returned chunk doesn't match");
uint8_t *mmapped_ptr = nullptr, *dist = nullptr;
if (object.data_size > 0) {
RETURN_ON_ERROR(mmapToClient(object.store_fd, object.map_size, false, true,
&mmapped_ptr));
RETURN_ON_ERROR(shm_->Mmap(object.store_fd, object.map_size, false, true,
&mmapped_ptr));
dist = mmapped_ptr + object.data_offset;
}
blob.reset(new arrow::MutableBuffer(dist, object.data_size));
Expand All @@ -258,8 +210,8 @@ Status Client::PullNextStreamChunk(ObjectID const id,
RETURN_ON_ERROR(ReadPullNextStreamChunkReply(message_in, object));
uint8_t *mmapped_ptr = nullptr, *dist = nullptr;
if (object.data_size > 0) {
RETURN_ON_ERROR(mmapToClient(object.store_fd, object.map_size, true, true,
&mmapped_ptr));
RETURN_ON_ERROR(
shm_->Mmap(object.store_fd, object.map_size, true, true, &mmapped_ptr));
dist = mmapped_ptr + object.data_offset;
}
blob.reset(new arrow::Buffer(dist, object.data_size));
Expand Down Expand Up @@ -365,6 +317,14 @@ std::vector<std::shared_ptr<Object>> Client::ListObjects(
return objects;
}

bool Client::IsSharedMemory(const void* target) const {
return shm_->Exists(target);
}

bool Client::IsSharedMemory(const uintptr_t target) const {
return shm_->Exists(target);
}

Status Client::AllocatedSize(const ObjectID id, size_t& size) {
ENSURE_CONNECTED(this);
json tree;
Expand Down Expand Up @@ -395,8 +355,7 @@ Status Client::CreateArena(const size_t size, int& fd, size_t& available_size,
VINEYARD_ASSERT(size == std::numeric_limits<size_t>::max() ||
size == available_size);
uint8_t* mmapped_ptr = nullptr;
VINEYARD_CHECK_OK(
mmapToClient(fd, available_size, false, false, &mmapped_ptr));
VINEYARD_CHECK_OK(shm_->Mmap(fd, available_size, false, false, &mmapped_ptr));
space = reinterpret_cast<uintptr_t>(mmapped_ptr);
return Status::OK();
}
Expand Down Expand Up @@ -427,7 +386,7 @@ Status Client::CreateBuffer(const size_t size, ObjectID& id, Payload& payload,
uint8_t *shared = nullptr, *dist = nullptr;
if (payload.data_size > 0) {
RETURN_ON_ERROR(
mmapToClient(payload.store_fd, payload.map_size, false, true, &shared));
shm_->Mmap(payload.store_fd, payload.map_size, false, true, &shared));
dist = shared + payload.data_offset;
}
buffer = std::make_shared<arrow::MutableBuffer>(dist, payload.data_size);
Expand Down Expand Up @@ -465,7 +424,7 @@ Status Client::GetBuffers(
uint8_t *shared = nullptr, *dist = nullptr;
if (item.data_size > 0) {
VINEYARD_CHECK_OK(
mmapToClient(item.store_fd, item.map_size, true, true, &shared));
shm_->Mmap(item.store_fd, item.map_size, true, true, &shared));
dist = shared + item.data_offset;
}
buffer = std::make_shared<arrow::Buffer>(dist, item.data_size);
Expand All @@ -491,7 +450,7 @@ Status Client::GetBufferSizes(const std::set<ObjectID>& ids,
uint8_t* shared = nullptr;
if (item.data_size > 0) {
VINEYARD_CHECK_OK(
mmapToClient(item.store_fd, item.map_size, true, true, &shared));
shm_->Mmap(item.store_fd, item.map_size, true, true, &shared));
}
sizes.emplace(item.object_id, item.data_size);
}
Expand All @@ -502,10 +461,13 @@ Status Client::DropBuffer(const ObjectID id, const int fd) {
ENSURE_CONNECTED(this);

// unmap from client
auto entry = mmap_table_.find(fd);
if (entry != mmap_table_.end()) {
mmap_table_.erase(entry);
}
//
// FIXME: the erase may cause re-recv fd problem, needs further inspection.

// auto entry = mmap_table_.find(fd);
// if (entry != mmap_table_.end()) {
// mmap_table_.erase(entry);
// }

// free on server
std::string message_out;
Expand All @@ -517,8 +479,66 @@ Status Client::DropBuffer(const ObjectID id, const int fd) {
return Status::OK();
}

Status Client::mmapToClient(int fd, int64_t map_size, bool readonly,
bool realign, uint8_t** ptr) {
namespace detail {

MmapEntry::MmapEntry(int fd, int64_t map_size, bool readonly, bool realign)
: fd_(fd), ro_pointer_(nullptr), rw_pointer_(nullptr), length_(0) {
// fake_mmap in malloc.h leaves a gap between memory segments, to make
// map_size page-aligned again.
if (realign) {
length_ = map_size - sizeof(size_t);
} else {
length_ = map_size;
}
}

MmapEntry::~MmapEntry() {
if (ro_pointer_) {
int r = munmap(ro_pointer_, length_);
if (r != 0) {
LOG(ERROR) << "munmap returned " << r << ", errno = " << errno << ": "
<< strerror(errno);
}
}
if (rw_pointer_) {
int r = munmap(rw_pointer_, length_);
if (r != 0) {
LOG(ERROR) << "munmap returned " << r << ", errno = " << errno << ": "
<< strerror(errno);
}
}
close(fd_);
}

uint8_t* MmapEntry::map_readonly() {
if (!ro_pointer_) {
ro_pointer_ = reinterpret_cast<uint8_t*>(
mmap(NULL, length_, PROT_READ, MAP_SHARED, fd_, 0));
if (ro_pointer_ == MAP_FAILED) {
LOG(ERROR) << "mmap failed: errno = " << errno << ": " << strerror(errno);
ro_pointer_ = nullptr;
}
}
return ro_pointer_;
}

uint8_t* MmapEntry::map_readwrite() {
if (!rw_pointer_) {
rw_pointer_ = reinterpret_cast<uint8_t*>(
mmap(NULL, length_, PROT_READ | PROT_WRITE, MAP_SHARED, fd_, 0));
if (rw_pointer_ == MAP_FAILED) {
LOG(ERROR) << "mmap failed: errno = " << errno << ": " << strerror(errno);
rw_pointer_ = nullptr;
}
}
return rw_pointer_;
}

SharedMemoryManager::SharedMemoryManager(int vineyard_conn)
: vineyard_conn_(vineyard_conn) {}

Status SharedMemoryManager::Mmap(int fd, int64_t map_size, bool readonly,
bool realign, uint8_t** ptr) {
auto entry = mmap_table_.find(fd);
if (entry == mmap_table_.end()) {
int client_fd = recv_fd(vineyard_conn_);
Expand All @@ -541,9 +561,41 @@ Status Client::mmapToClient(int fd, int64_t map_size, bool readonly,
return Status::IOError("Failed to mmap received fd as a writable buffer");
}
}
segments_.emplace(reinterpret_cast<uintptr_t>(*ptr), map_size);
return Status::OK();
}

Client::~Client() { Disconnect(); }
bool SharedMemoryManager::Exists(const uintptr_t target) {
if (segments_.empty()) {
return false;
}
#ifndef NDEBUG
VLOG(100) << "-------- Shared memory segments: ";
for (auto const& item : segments_) {
VLOG(100) << "[" << item.first << ", " << (item.first + item.second) << ")";
}
#endif

VLOG(100) << "Check address: " << target;
auto loc = segments_.upper_bound(
std::make_pair(target, std::numeric_limits<size_t>::max()));
if (loc == segments_.begin()) {
return false;
} else if (loc == segments_.end()) {
// check rbegin
auto const item = segments_.rbegin();
return target < item->first + item->second;
} else {
// check prev
auto const item = std::prev(loc);
return target < item->first + item->second;
}
}

bool SharedMemoryManager::Exists(const void* target) {
return Exists(reinterpret_cast<const uintptr_t>(target));
}

} // namespace detail

} // namespace vineyard
50 changes: 46 additions & 4 deletions src/client/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ limitations under the License.
#include <set>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

#include "arrow/buffer.h"
Expand All @@ -38,6 +39,8 @@ namespace vineyard {
class Blob;
class BlobWriter;

namespace detail {

/**
* @brief MmapEntry represents a memory-mapped fd on the client side. The fd
* can be mmapped as readonly or readwrite memory.
Expand Down Expand Up @@ -73,13 +76,39 @@ class MmapEntry {
size_t length_;
};

class SharedMemoryManager {
public:
explicit SharedMemoryManager(int vineyard_conn);

Status Mmap(int fd, int64_t map_size, bool readonly, bool realign,
uint8_t** ptr);

bool Exists(const uintptr_t target);

bool Exists(const void* target);

private:
// UNIX-domain socket
int vineyard_conn_ = -1;

// mmap table
std::unordered_map<int, std::unique_ptr<MmapEntry>> mmap_table_;

// sorted shm segments for fast "if exists" query
std::set<std::pair<uintptr_t, size_t>> segments_;
};

} // namespace detail

/**
* @brief Vineyard's IPC Client connects to to UNIX domain socket of the
* vineyard server. Vineyard's IPC Client talks to vineyard server
* and manipulate objects in vineyard.
*/
class Client : public ClientBase {
public:
Client();

~Client() override;

/**
Expand Down Expand Up @@ -334,6 +363,22 @@ class Client : public ClientBase {
const bool regex = false,
size_t const limit = 5);

/**
* Check if the given address belongs to the shared memory region.
*
* Return true if the address (client-side address) comes from the vineyard
* server.
*/
bool IsSharedMemory(const void* target) const;

/**
* Check if the given address belongs to the shared memory region.
*
* Return true if the address (client-side address) comes from the vineyard
* server.
*/
bool IsSharedMemory(const uintptr_t target) const;

/**
* Get the allocated size for the given object.
*/
Expand Down Expand Up @@ -368,10 +413,7 @@ class Client : public ClientBase {
Status DropBuffer(const ObjectID id, const int fd);

private:
Status mmapToClient(int fd, int64_t map_size, bool readonly, bool realign,
uint8_t** ptr);

std::unordered_map<int, std::unique_ptr<MmapEntry>> mmap_table_;
std::shared_ptr<detail::SharedMemoryManager> shm_;

private:
friend class Blob;
Expand Down
Loading

0 comments on commit fa8089c

Please sign in to comment.