Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Markstream to enable 1-producer-1-consumer on server side #138

Merged
merged 4 commits into from
Jan 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 13 additions & 7 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,6 @@ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fPIC \
-Wno-unknown-pragmas"
)

check_cxx_compiler_flag(-std=c++14 HAVE_FLAG_STD_CXX14)
if(BUILD_VINEYARD_PYPI_PACKAGES AND NOT HAVE_FLAG_STD_CXX14)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
else()
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++14")
endif()

check_cxx_compiler_flag(-Wno-class-memaccess W_NO_CLASS_MEMACCESS)
check_cxx_compiler_flag(-Wno-deprecated-declarations W_NO_DEPRECATED_DECLARATIONS)
check_cxx_compiler_flag(-Wno-defaulted-function-deleted W_NO_DEFAULTED_FUNCTION_DELETED)
Expand Down Expand Up @@ -189,6 +182,19 @@ endmacro(find_openssl_libraries)

find_common_libraries()

if (CMAKE_VERSION VERSION_LESS "3.1")
if (CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
check_cxx_compiler_flag(-std=c++14 HAVE_FLAG_STD_CXX14)
if(BUILD_VINEYARD_PYPI_PACKAGES AND NOT HAVE_FLAG_STD_CXX14)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
else()
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++14")
endif()
endif ()
else ()
set (CMAKE_CXX_STANDARD 14)
endif ()

include_directories(${PROJECT_SOURCE_DIR}/src)
include_directories(${PROJECT_SOURCE_DIR}/modules)
include_directories(SYSTEM ${PROJECT_SOURCE_DIR}/thirdparty)
Expand Down
17 changes: 11 additions & 6 deletions modules/basic/stream/byte_stream.vineyard-mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ limitations under the License.
#include "arrow/status.h"

#include "basic/ds/arrow_utils.h"
#include "basic/stream/stream_utils.h"
#include "client/client.h"
#include "client/ds/blob.h"
#include "client/ds/i_object.h"
Expand Down Expand Up @@ -150,22 +151,26 @@ class ByteStream : public Registered<ByteStream> {
* @brief Open a reader to consume data from the byte stream
*
* @param client The client connected to the vineyard server
* @return The unique pointer to the reader
* @param The unique pointer to the reader
*/
std::unique_ptr<ByteStreamReader> OpenReader(Client& client) {
return std::unique_ptr<ByteStreamReader>(
Status OpenReader(Client& client, std::unique_ptr<ByteStreamReader>& reader) {
RETURN_ON_ERROR(client.OpenStream(id_, OpenStreamMode::read));
reader = std::unique_ptr<ByteStreamReader>(
new ByteStreamReader(client, id_, meta_));
return Status::OK();
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest to have API like

Status OpenReader(Client &client, std::unique_ptr<ByteStreamReader> &r) {
   RETURN_ON_ERROR(client.MarkStream(id_, 2))....
   r = ...
   return Status::OK();
}

which will be more consistent will the style of our other APIs, and more importantly, let user know why the operation fail. Return nullptr sliently usually the root of many unintended bugs.


/**
* @brief Open a writer to produce data to the byte stream
*
* @param client The client connected to the vineyard server
* @return The unique pointer to the writer
* @param The unique pointer to the writer
*/
std::unique_ptr<ByteStreamWriter> OpenWriter(Client& client) {
return std::unique_ptr<ByteStreamWriter>(
Status OpenWriter(Client& client, std::unique_ptr<ByteStreamWriter>& writer) {
RETURN_ON_ERROR(client.OpenStream(id_, OpenStreamMode::write));
writer = std::unique_ptr<ByteStreamWriter>(
new ByteStreamWriter(client, id_, meta_));
return Status::OK();
}

std::unordered_map<std::string, std::string> GetParams() { return params_; }
Expand Down
15 changes: 11 additions & 4 deletions modules/basic/stream/dataframe_stream.vineyard-mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ limitations under the License.
#include "arrow/util/key_value_metadata.h"

#include "basic/ds/dataframe.vineyard.h"
#include "basic/stream/stream_utils.h"
#include "client/client.h"
#include "client/ds/blob.h"
#include "client/ds/i_object.h"
Expand Down Expand Up @@ -342,14 +343,20 @@ class __attribute__((annotate("no-vineyard"))) DataframeStreamReader {

class DataframeStream : public Registered<DataframeStream> {
public:
std::unique_ptr<DataframeStreamReader> OpenReader(Client& client) {
return std::unique_ptr<DataframeStreamReader>(
Status OpenReader(Client& client,
std::unique_ptr<DataframeStreamReader>& reader) {
RETURN_ON_ERROR(client.OpenStream(id_, OpenStreamMode::read));
reader = std::unique_ptr<DataframeStreamReader>(
new DataframeStreamReader(client, id_, meta_, params_));
return Status::OK();
}

std::unique_ptr<DataframeStreamWriter> OpenWriter(Client& client) {
return std::unique_ptr<DataframeStreamWriter>(
Status OpenWriter(Client& client,
std::unique_ptr<DataframeStreamWriter>& writer) {
RETURN_ON_ERROR(client.OpenStream(id_, OpenStreamMode::write));
writer = std::unique_ptr<DataframeStreamWriter>(
new DataframeStreamWriter(client, id_, meta_));
return Status::OK();
}

std::unordered_map<std::string, std::string> GetParams() { return params_; }
Expand Down
28 changes: 28 additions & 0 deletions modules/basic/stream/stream_utils.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/** Copyright 2020 Alibaba Group Holding Limited.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

#ifndef MODULES_BASIC_STREAM_STREAM_UTILS_H_
#define MODULES_BASIC_STREAM_STREAM_UTILS_H_

namespace vineyard {

enum class OpenStreamMode {
read = 1,
write = 2,
};

} // namespace vineyard

#endif // MODULES_BASIC_STREAM_STREAM_UTILS_H_
6 changes: 4 additions & 2 deletions modules/graph/loader/arrow_fragment_loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ inline Status ReadRecordBatchesFromVineyard(
// use a local client, since reading from stream may block the client.
Client local_client;
RETURN_ON_ERROR(local_client.Connect(client.IPCSocket()));
auto reader = local_streams[idx]->OpenReader(local_client);
std::unique_ptr<DataframeStreamReader> reader;
VINEYARD_CHECK_OK(local_streams[idx]->OpenReader(local_client, reader));
std::vector<std::shared_ptr<arrow::RecordBatch>> read_batches;
RETURN_ON_ERROR(reader->ReadRecordBatches(read_batches));
{
Expand Down Expand Up @@ -119,7 +120,8 @@ inline Status ReadTableFromVineyard(Client& client, const ObjectID object_id,
// use a local client, since reading from stream may block the client.
Client local_client;
RETURN_ON_ERROR(local_client.Connect(client.IPCSocket()));
auto reader = local_streams[idx]->OpenReader(local_client);
std::unique_ptr<DataframeStreamReader> reader;
VINEYARD_CHECK_OK(local_streams[idx]->OpenReader(local_client, reader));
std::shared_ptr<arrow::Table> table;
RETURN_ON_ERROR(reader->ReadTable(table));
VLOG(10) << "table from stream: " << table->schema()->ToString();
Expand Down
4 changes: 4 additions & 0 deletions modules/graph/utils/error.h
Original file line number Diff line number Diff line change
Expand Up @@ -246,14 +246,17 @@ sync_gs_error(const grape::CommSpec& comm_spec, F_T&& f, ARGS_T&&... args) {
(lhs) = std::move(status_name).ValueOrDie(); \
} while (0)

#ifndef ARROW_CHECK_OK
#define ARROW_CHECK_OK(expr) \
do { \
auto status = (expr); \
if (!status.ok()) { \
LOG(FATAL) << "Arrow check failed: " << status.ToString(); \
} \
} while (0)
#endif // ARROW_CHECK_OK

#ifndef ARROW_CHECK_OK_AND_ASSIGN
#define ARROW_CHECK_OK_AND_ASSIGN(lhs, expr) \
do { \
auto status_name = (expr); \
Expand All @@ -262,6 +265,7 @@ sync_gs_error(const grape::CommSpec& comm_spec, F_T&& f, ARGS_T&&... args) {
} \
(lhs) = std::move(status_name).ValueOrDie(); \
} while (0)
#endif // ARROW_CHECK_OK_AND_ASSIGN

} // namespace vineyard
#endif // MODULES_GRAPH_UTILS_ERROR_H_
4 changes: 3 additions & 1 deletion modules/io/adaptors/dump_dataframe.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ int main(int argc, const char** argv) {
auto s =
std::dynamic_pointer_cast<DataframeStream>(client.GetObject(stream_id));
LOG(INFO) << "Got dataframe stream: " << s->id();
auto reader = s->OpenReader(client);

std::unique_ptr<DataframeStreamReader> reader;
VINEYARD_CHECK_OK(s->OpenReader(client, reader));

std::shared_ptr<arrow::Table> table;
VINEYARD_CHECK_OK(reader->ReadTable(table));
Expand Down
6 changes: 4 additions & 2 deletions modules/io/adaptors/parse_bytes_to_dataframe.cc
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,10 @@ int main(int argc, const char** argv) {
LOG(INFO) << "Created dataframe stream " << bs->id() << " at " << proc_index;
ReportStatus("return", VYObjectIDToString(bs->id()));

auto reader = ls->OpenReader(client);
auto writer = bs->OpenWriter(client);
std::unique_ptr<ByteStreamReader> reader;
std::unique_ptr<DataframeStreamWriter> writer;
VINEYARD_CHECK_OK(ls->OpenReader(client, reader));
VINEYARD_CHECK_OK(bs->OpenWriter(client, writer));

while (true) {
std::unique_ptr<arrow::Buffer> buffer;
Expand Down
3 changes: 2 additions & 1 deletion modules/io/adaptors/read_kafka_bytes.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ int main(int argc, char** argv) {
VINEYARD_CHECK_OK(client.Persist(bstream->id()));
ReportStatus("return", VYObjectIDToString(bstream->id()));

auto writer = bstream->OpenWriter(client);
std::unique_ptr<ByteStreamWriter> writer;
VINEYARD_CHECK_OK(bstream->OpenWriter(client, writer));
writer->SetBufferSizeLimit(2 * 1024 * 1024);

std::string line;
Expand Down
3 changes: 2 additions & 1 deletion modules/io/adaptors/read_local_bytes.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ int main(int argc, const char** argv) {
LOG(INFO) << "Local stream: " << proc << " " << lstream->id();
ReportStatus("return", VYObjectIDToString(lstream->id()));

auto writer = lstream->OpenWriter(client);
std::unique_ptr<ByteStreamWriter> writer;
VINEYARD_CHECK_OK(lstream->OpenWriter(client, writer));
writer->SetBufferSizeLimit(2 * 1024 * 1024);

std::string line;
Expand Down
3 changes: 2 additions & 1 deletion modules/io/adaptors/read_oss_dataframe.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ int main(int argc, const char** argv) {
LOG(INFO) << "Local stream: " << proc << " " << lstream->id();
ReportStatus("return", VYObjectIDToString(lstream->id()));

auto writer = lstream->OpenWriter(client);
std::unique_ptr<DataframeStreamWriter> writer;
VINEYARD_CHECK_OK(lstream->OpenWriter(client, writer));

std::shared_ptr<arrow::Table> table;
VINEYARD_CHECK_OK(oss_io_adaptor->ReadTable(&table));
Expand Down
3 changes: 2 additions & 1 deletion modules/io/adaptors/write_kafka_bytes.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ int main(int argc, char** argv) {
auto ls = s->GetStream<ByteStream>(proc_index);
LOG(INFO) << "Got dataframe stream " << ls->id() << " at " << proc_index;

auto reader = ls->OpenReader(client);
std::unique_ptr<ByteStreamReader> reader;
VINEYARD_CHECK_OK(ls->OpenReader(client, reader));

std::string line;
while (reader->ReadLine(line).ok()) {
Expand Down
3 changes: 2 additions & 1 deletion modules/io/adaptors/write_kafka_dataframe.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ int main(int argc, char** argv) {
auto ls = s->GetStream<DataframeStream>(proc_index);
LOG(INFO) << "Got dataframe stream " << ls->id() << " at " << proc_index;

auto reader = ls->OpenReader(client);
std::unique_ptr<DataframeStreamReader> reader;
VINEYARD_CHECK_OK(ls->OpenReader(client, reader));

std::string line;
while (reader->ReadLine(line).ok()) {
Expand Down
16 changes: 12 additions & 4 deletions python/stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,18 @@ void bind_stream(py::module& mod) {
"open_reader",
[](ByteStream* self,
Client& client) -> std::unique_ptr<ByteStreamReader> {
return self->OpenReader(client);
std::unique_ptr<ByteStreamReader> reader = nullptr;
throw_on_error(self->OpenReader(client, reader));
return reader;
},
"client"_a)
.def(
"open_writer",
[](ByteStream* self,
Client& client) -> std::unique_ptr<ByteStreamWriter> {
return self->OpenWriter(client);
std::unique_ptr<ByteStreamWriter> writer = nullptr;
throw_on_error(self->OpenWriter(client, writer));
return writer;
},
"client"_a)
.def("__getitem__",
Expand Down Expand Up @@ -150,14 +154,18 @@ void bind_stream(py::module& mod) {
"open_reader",
[](DataframeStream* self,
Client& client) -> std::unique_ptr<DataframeStreamReader> {
return self->OpenReader(client);
std::unique_ptr<DataframeStreamReader> reader = nullptr;
throw_on_error(self->OpenReader(client, reader));
return reader;
},
"client"_a)
.def(
"open_writer",
[](DataframeStream* self,
Client& client) -> std::unique_ptr<DataframeStreamWriter> {
return self->OpenWriter(client);
std::unique_ptr<DataframeStreamWriter> writer = nullptr;
throw_on_error(self->OpenWriter(client, writer));
return writer;
},
"client"_a)
.def("__getitem__",
Expand Down
11 changes: 11 additions & 0 deletions src/client/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,17 @@ Status Client::CreateStream(const ObjectID& id) {
return Status::OK();
}

Status Client::OpenStream(const ObjectID& id, OpenStreamMode mode) {
ENSURE_CONNECTED(this);
std::string message_out;
WriteOpenStreamRequest(id, static_cast<int64_t>(mode), message_out);
RETURN_ON_ERROR(doWrite(message_out));
json message_in;
RETURN_ON_ERROR(doRead(message_in));
RETURN_ON_ERROR(ReadOpenStreamReply(message_in));
return Status::OK();
}

Status Client::GetNextStreamChunk(ObjectID const id, size_t const size,
std::unique_ptr<arrow::MutableBuffer>& blob) {
ENSURE_CONNECTED(this);
Expand Down
12 changes: 12 additions & 0 deletions src/client/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ limitations under the License.

#include "arrow/buffer.h"

#include "basic/stream/stream_utils.h"
#include "client/client_base.h"
#include "client/ds/i_object.h"
#include "client/ds/object_meta.h"
Expand Down Expand Up @@ -201,6 +202,17 @@ class Client : public ClientBase {
*/
Status CreateStream(const ObjectID& id);

/**
* @brief open a stream on vineyard. Failed if the stream is already opened on
* the given mode.
*
* @param id The id of stream to mark.
* @param mode The mode, OpenStreamMode::read or OpenStreamMode::write.
*
* @return Status that indicates whether the open action has succeeded.
*/
Status OpenStream(const ObjectID& id, OpenStreamMode mode);

/**
* @brief Allocate a chunk of given size in vineyard for a stream. When the
* request cannot be statisfied immediately, e.g., vineyard doesn't have
Expand Down
32 changes: 32 additions & 0 deletions src/common/util/protocols.cc
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ CommandType ParseCommandType(const std::string& str_type) {
return CommandType::InstanceStatusRequest;
} else if (str_type == "shallow_copy_request") {
return CommandType::ShallowCopyRequest;
} else if (str_type == "open_stream_request") {
return CommandType::OpenStreamRequest;
} else {
return CommandType::NullCommand;
}
Expand Down Expand Up @@ -634,6 +636,36 @@ Status ReadCreateStreamReply(const json& root) {
return Status::OK();
}

void WriteOpenStreamRequest(const ObjectID& object_id, const int64_t& mode,
std::string& msg) {
json root;
root["type"] = "open_stream_request";
root["object_id"] = object_id;
root["mode"] = mode;

encode_msg(root, msg);
}

Status ReadOpenStreamRequest(const json& root, ObjectID& object_id,
int64_t& mode) {
RETURN_ON_ASSERT(root["type"] == "open_stream_request");
object_id = root["object_id"].get<ObjectID>();
mode = root["mode"].get<int64_t>();
return Status::OK();
}

void WriteOpenStreamReply(std::string& msg) {
json root;
root["type"] = "open_stream_reply";

encode_msg(root, msg);
}

Status ReadOpenStreamReply(const json& root) {
CHECK_IPC_ERROR(root, "open_stream_reply");
return Status::OK();
}

void WriteGetNextStreamChunkRequest(const ObjectID stream_id, const size_t size,
std::string& msg) {
json root;
Expand Down
Loading