Skip to content

Commit

Permalink
Markstream to enable 1-producer-1-consumer on server side (#138)
Browse files Browse the repository at this point in the history
* mark stream for one-producer-one-consumer on server side
  • Loading branch information
andydiwenzhu authored Jan 8, 2021
1 parent a783e86 commit b80529a
Show file tree
Hide file tree
Showing 24 changed files with 223 additions and 35 deletions.
20 changes: 13 additions & 7 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,6 @@ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fPIC \
-Wno-unknown-pragmas"
)

check_cxx_compiler_flag(-std=c++14 HAVE_FLAG_STD_CXX14)
if(BUILD_VINEYARD_PYPI_PACKAGES AND NOT HAVE_FLAG_STD_CXX14)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
else()
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++14")
endif()

check_cxx_compiler_flag(-Wno-class-memaccess W_NO_CLASS_MEMACCESS)
check_cxx_compiler_flag(-Wno-deprecated-declarations W_NO_DEPRECATED_DECLARATIONS)
check_cxx_compiler_flag(-Wno-defaulted-function-deleted W_NO_DEFAULTED_FUNCTION_DELETED)
Expand Down Expand Up @@ -189,6 +182,19 @@ endmacro(find_openssl_libraries)

find_common_libraries()

# Select the C++ language standard used for the build.
if (NOT CMAKE_VERSION VERSION_LESS "3.1")
  # CMake 3.1 and newer: request C++14 through the CMAKE_CXX_STANDARD variable.
  set (CMAKE_CXX_STANDARD 14)
else ()
  # Older CMake: append the -std flag by hand. This is only done for GNU
  # compilers; other compilers are left with their default standard.
  if (CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
    check_cxx_compiler_flag(-std=c++14 HAVE_FLAG_STD_CXX14)
    # PyPI package builds fall back to C++11 when the compiler rejects
    # -std=c++14; every other configuration uses C++14.
    if(BUILD_VINEYARD_PYPI_PACKAGES AND NOT HAVE_FLAG_STD_CXX14)
      set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
    else()
      set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++14")
    endif()
  endif ()
endif ()

include_directories(${PROJECT_SOURCE_DIR}/src)
include_directories(${PROJECT_SOURCE_DIR}/modules)
include_directories(SYSTEM ${PROJECT_SOURCE_DIR}/thirdparty)
Expand Down
17 changes: 11 additions & 6 deletions modules/basic/stream/byte_stream.vineyard-mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ limitations under the License.
#include "arrow/status.h"

#include "basic/ds/arrow_utils.h"
#include "basic/stream/stream_utils.h"
#include "client/client.h"
#include "client/ds/blob.h"
#include "client/ds/i_object.h"
Expand Down Expand Up @@ -150,22 +151,26 @@ class ByteStream : public Registered<ByteStream> {
* @brief Open a reader to consume data from the byte stream
*
* @param client The client connected to the vineyard server
* @return The unique pointer to the reader
* @param reader The unique pointer to the reader
*/
std::unique_ptr<ByteStreamReader> OpenReader(Client& client) {
return std::unique_ptr<ByteStreamReader>(
Status OpenReader(Client& client, std::unique_ptr<ByteStreamReader>& reader) {
RETURN_ON_ERROR(client.OpenStream(id_, OpenStreamMode::read));
reader = std::unique_ptr<ByteStreamReader>(
new ByteStreamReader(client, id_, meta_));
return Status::OK();
}

/**
* @brief Open a writer to produce data to the byte stream
*
* @param client The client connected to the vineyard server
* @return The unique pointer to the writer
* @param writer The unique pointer to the writer
*/
std::unique_ptr<ByteStreamWriter> OpenWriter(Client& client) {
return std::unique_ptr<ByteStreamWriter>(
Status OpenWriter(Client& client, std::unique_ptr<ByteStreamWriter>& writer) {
RETURN_ON_ERROR(client.OpenStream(id_, OpenStreamMode::write));
writer = std::unique_ptr<ByteStreamWriter>(
new ByteStreamWriter(client, id_, meta_));
return Status::OK();
}

std::unordered_map<std::string, std::string> GetParams() { return params_; }
Expand Down
15 changes: 11 additions & 4 deletions modules/basic/stream/dataframe_stream.vineyard-mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ limitations under the License.
#include "arrow/util/key_value_metadata.h"

#include "basic/ds/dataframe.vineyard.h"
#include "basic/stream/stream_utils.h"
#include "client/client.h"
#include "client/ds/blob.h"
#include "client/ds/i_object.h"
Expand Down Expand Up @@ -342,14 +343,20 @@ class __attribute__((annotate("no-vineyard"))) DataframeStreamReader {

class DataframeStream : public Registered<DataframeStream> {
public:
std::unique_ptr<DataframeStreamReader> OpenReader(Client& client) {
return std::unique_ptr<DataframeStreamReader>(
Status OpenReader(Client& client,
std::unique_ptr<DataframeStreamReader>& reader) {
RETURN_ON_ERROR(client.OpenStream(id_, OpenStreamMode::read));
reader = std::unique_ptr<DataframeStreamReader>(
new DataframeStreamReader(client, id_, meta_, params_));
return Status::OK();
}

std::unique_ptr<DataframeStreamWriter> OpenWriter(Client& client) {
return std::unique_ptr<DataframeStreamWriter>(
Status OpenWriter(Client& client,
std::unique_ptr<DataframeStreamWriter>& writer) {
RETURN_ON_ERROR(client.OpenStream(id_, OpenStreamMode::write));
writer = std::unique_ptr<DataframeStreamWriter>(
new DataframeStreamWriter(client, id_, meta_));
return Status::OK();
}

std::unordered_map<std::string, std::string> GetParams() { return params_; }
Expand Down
28 changes: 28 additions & 0 deletions modules/basic/stream/stream_utils.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/** Copyright 2020 Alibaba Group Holding Limited.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

#ifndef MODULES_BASIC_STREAM_STREAM_UTILS_H_
#define MODULES_BASIC_STREAM_STREAM_UTILS_H_

namespace vineyard {

/**
 * @brief The mode in which a client opens a stream.
 *
 * Client::OpenStream converts the enumerator to int64_t and sends it as the
 * "mode" field of the open-stream request.
 * NOTE(review): the numeric values presumably have to match what the server
 * expects for that field -- confirm before renumbering.
 */
enum class OpenStreamMode {
  read = 1,   // open the stream for reading
  write = 2,  // open the stream for writing
};

} // namespace vineyard

#endif // MODULES_BASIC_STREAM_STREAM_UTILS_H_
6 changes: 4 additions & 2 deletions modules/graph/loader/arrow_fragment_loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ inline Status ReadRecordBatchesFromVineyard(
// use a local client, since reading from stream may block the client.
Client local_client;
RETURN_ON_ERROR(local_client.Connect(client.IPCSocket()));
auto reader = local_streams[idx]->OpenReader(local_client);
std::unique_ptr<DataframeStreamReader> reader;
VINEYARD_CHECK_OK(local_streams[idx]->OpenReader(local_client, reader));
std::vector<std::shared_ptr<arrow::RecordBatch>> read_batches;
RETURN_ON_ERROR(reader->ReadRecordBatches(read_batches));
{
Expand Down Expand Up @@ -119,7 +120,8 @@ inline Status ReadTableFromVineyard(Client& client, const ObjectID object_id,
// use a local client, since reading from stream may block the client.
Client local_client;
RETURN_ON_ERROR(local_client.Connect(client.IPCSocket()));
auto reader = local_streams[idx]->OpenReader(local_client);
std::unique_ptr<DataframeStreamReader> reader;
VINEYARD_CHECK_OK(local_streams[idx]->OpenReader(local_client, reader));
std::shared_ptr<arrow::Table> table;
RETURN_ON_ERROR(reader->ReadTable(table));
VLOG(10) << "table from stream: " << table->schema()->ToString();
Expand Down
4 changes: 4 additions & 0 deletions modules/graph/utils/error.h
Original file line number Diff line number Diff line change
Expand Up @@ -246,14 +246,17 @@ sync_gs_error(const grape::CommSpec& comm_spec, F_T&& f, ARGS_T&&... args) {
(lhs) = std::move(status_name).ValueOrDie(); \
} while (0)

// ARROW_CHECK_OK(expr): evaluates `expr` once and, if the resulting status is
// not ok(), emits a LOG(FATAL) message containing status.ToString().
// The #ifndef guard keeps this definition from replacing an ARROW_CHECK_OK
// macro that has already been defined elsewhere.
// NOTE(review): the result is stored in a local named `status`, so an `expr`
// that itself mentions a variable called `status` would refer to this local
// instead of the caller's variable.
#ifndef ARROW_CHECK_OK
#define ARROW_CHECK_OK(expr) \
do { \
auto status = (expr); \
if (!status.ok()) { \
LOG(FATAL) << "Arrow check failed: " << status.ToString(); \
} \
} while (0)
#endif // ARROW_CHECK_OK

#ifndef ARROW_CHECK_OK_AND_ASSIGN
#define ARROW_CHECK_OK_AND_ASSIGN(lhs, expr) \
do { \
auto status_name = (expr); \
Expand All @@ -262,6 +265,7 @@ sync_gs_error(const grape::CommSpec& comm_spec, F_T&& f, ARGS_T&&... args) {
} \
(lhs) = std::move(status_name).ValueOrDie(); \
} while (0)
#endif // ARROW_CHECK_OK_AND_ASSIGN

} // namespace vineyard
#endif // MODULES_GRAPH_UTILS_ERROR_H_
4 changes: 3 additions & 1 deletion modules/io/adaptors/dump_dataframe.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ int main(int argc, const char** argv) {
auto s =
std::dynamic_pointer_cast<DataframeStream>(client.GetObject(stream_id));
LOG(INFO) << "Got dataframe stream: " << s->id();
auto reader = s->OpenReader(client);

std::unique_ptr<DataframeStreamReader> reader;
VINEYARD_CHECK_OK(s->OpenReader(client, reader));

std::shared_ptr<arrow::Table> table;
VINEYARD_CHECK_OK(reader->ReadTable(table));
Expand Down
6 changes: 4 additions & 2 deletions modules/io/adaptors/parse_bytes_to_dataframe.cc
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,10 @@ int main(int argc, const char** argv) {
LOG(INFO) << "Created dataframe stream " << bs->id() << " at " << proc_index;
ReportStatus("return", VYObjectIDToString(bs->id()));

auto reader = ls->OpenReader(client);
auto writer = bs->OpenWriter(client);
std::unique_ptr<ByteStreamReader> reader;
std::unique_ptr<DataframeStreamWriter> writer;
VINEYARD_CHECK_OK(ls->OpenReader(client, reader));
VINEYARD_CHECK_OK(bs->OpenWriter(client, writer));

while (true) {
std::unique_ptr<arrow::Buffer> buffer;
Expand Down
3 changes: 2 additions & 1 deletion modules/io/adaptors/read_kafka_bytes.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ int main(int argc, char** argv) {
VINEYARD_CHECK_OK(client.Persist(bstream->id()));
ReportStatus("return", VYObjectIDToString(bstream->id()));

auto writer = bstream->OpenWriter(client);
std::unique_ptr<ByteStreamWriter> writer;
VINEYARD_CHECK_OK(bstream->OpenWriter(client, writer));
writer->SetBufferSizeLimit(2 * 1024 * 1024);

std::string line;
Expand Down
3 changes: 2 additions & 1 deletion modules/io/adaptors/read_local_bytes.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ int main(int argc, const char** argv) {
LOG(INFO) << "Local stream: " << proc << " " << lstream->id();
ReportStatus("return", VYObjectIDToString(lstream->id()));

auto writer = lstream->OpenWriter(client);
std::unique_ptr<ByteStreamWriter> writer;
VINEYARD_CHECK_OK(lstream->OpenWriter(client, writer));
writer->SetBufferSizeLimit(2 * 1024 * 1024);

std::string line;
Expand Down
3 changes: 2 additions & 1 deletion modules/io/adaptors/read_oss_dataframe.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ int main(int argc, const char** argv) {
LOG(INFO) << "Local stream: " << proc << " " << lstream->id();
ReportStatus("return", VYObjectIDToString(lstream->id()));

auto writer = lstream->OpenWriter(client);
std::unique_ptr<DataframeStreamWriter> writer;
VINEYARD_CHECK_OK(lstream->OpenWriter(client, writer));

std::shared_ptr<arrow::Table> table;
VINEYARD_CHECK_OK(oss_io_adaptor->ReadTable(&table));
Expand Down
3 changes: 2 additions & 1 deletion modules/io/adaptors/write_kafka_bytes.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ int main(int argc, char** argv) {
auto ls = s->GetStream<ByteStream>(proc_index);
LOG(INFO) << "Got dataframe stream " << ls->id() << " at " << proc_index;

auto reader = ls->OpenReader(client);
std::unique_ptr<ByteStreamReader> reader;
VINEYARD_CHECK_OK(ls->OpenReader(client, reader));

std::string line;
while (reader->ReadLine(line).ok()) {
Expand Down
3 changes: 2 additions & 1 deletion modules/io/adaptors/write_kafka_dataframe.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ int main(int argc, char** argv) {
auto ls = s->GetStream<DataframeStream>(proc_index);
LOG(INFO) << "Got dataframe stream " << ls->id() << " at " << proc_index;

auto reader = ls->OpenReader(client);
std::unique_ptr<DataframeStreamReader> reader;
VINEYARD_CHECK_OK(ls->OpenReader(client, reader));

std::string line;
while (reader->ReadLine(line).ok()) {
Expand Down
16 changes: 12 additions & 4 deletions python/stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,18 @@ void bind_stream(py::module& mod) {
"open_reader",
[](ByteStream* self,
Client& client) -> std::unique_ptr<ByteStreamReader> {
return self->OpenReader(client);
std::unique_ptr<ByteStreamReader> reader = nullptr;
throw_on_error(self->OpenReader(client, reader));
return reader;
},
"client"_a)
.def(
"open_writer",
[](ByteStream* self,
Client& client) -> std::unique_ptr<ByteStreamWriter> {
return self->OpenWriter(client);
std::unique_ptr<ByteStreamWriter> writer = nullptr;
throw_on_error(self->OpenWriter(client, writer));
return writer;
},
"client"_a)
.def("__getitem__",
Expand Down Expand Up @@ -150,14 +154,18 @@ void bind_stream(py::module& mod) {
"open_reader",
[](DataframeStream* self,
Client& client) -> std::unique_ptr<DataframeStreamReader> {
return self->OpenReader(client);
std::unique_ptr<DataframeStreamReader> reader = nullptr;
throw_on_error(self->OpenReader(client, reader));
return reader;
},
"client"_a)
.def(
"open_writer",
[](DataframeStream* self,
Client& client) -> std::unique_ptr<DataframeStreamWriter> {
return self->OpenWriter(client);
std::unique_ptr<DataframeStreamWriter> writer = nullptr;
throw_on_error(self->OpenWriter(client, writer));
return writer;
},
"client"_a)
.def("__getitem__",
Expand Down
11 changes: 11 additions & 0 deletions src/client/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,17 @@ Status Client::CreateStream(const ObjectID& id) {
return Status::OK();
}

// Ask the server to open stream `id` in the given mode.
//
// Serializes an open-stream request (the mode is sent as an int64_t), writes
// it to the server, reads one reply and returns the status produced by
// validating that reply. Any failing step returns its error status.
Status Client::OpenStream(const ObjectID& id, OpenStreamMode mode) {
  ENSURE_CONNECTED(this);

  std::string request;
  WriteOpenStreamRequest(id, static_cast<int64_t>(mode), request);
  RETURN_ON_ERROR(doWrite(request));

  json reply;
  RETURN_ON_ERROR(doRead(reply));
  return ReadOpenStreamReply(reply);
}

Status Client::GetNextStreamChunk(ObjectID const id, size_t const size,
std::unique_ptr<arrow::MutableBuffer>& blob) {
ENSURE_CONNECTED(this);
Expand Down
12 changes: 12 additions & 0 deletions src/client/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ limitations under the License.

#include "arrow/buffer.h"

#include "basic/stream/stream_utils.h"
#include "client/client_base.h"
#include "client/ds/i_object.h"
#include "client/ds/object_meta.h"
Expand Down Expand Up @@ -201,6 +202,17 @@ class Client : public ClientBase {
*/
Status CreateStream(const ObjectID& id);

/**
* @brief Open a stream on vineyard. Fails if the stream is already opened in
* the given mode.
*
* @param id The id of the stream to open.
* @param mode The mode, OpenStreamMode::read or OpenStreamMode::write.
*
* @return Status that indicates whether the open action has succeeded.
*/
Status OpenStream(const ObjectID& id, OpenStreamMode mode);

/**
* @brief Allocate a chunk of given size in vineyard for a stream. When the
* request cannot be satisfied immediately, e.g., vineyard doesn't have
Expand Down
32 changes: 32 additions & 0 deletions src/common/util/protocols.cc
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ CommandType ParseCommandType(const std::string& str_type) {
return CommandType::InstanceStatusRequest;
} else if (str_type == "shallow_copy_request") {
return CommandType::ShallowCopyRequest;
} else if (str_type == "open_stream_request") {
return CommandType::OpenStreamRequest;
} else {
return CommandType::NullCommand;
}
Expand Down Expand Up @@ -634,6 +636,36 @@ Status ReadCreateStreamReply(const json& root) {
return Status::OK();
}

void WriteOpenStreamRequest(const ObjectID& object_id, const int64_t& mode,
std::string& msg) {
json root;
root["type"] = "open_stream_request";
root["object_id"] = object_id;
root["mode"] = mode;

encode_msg(root, msg);
}

// Decode an "open_stream_request" message.
//
// Checks that the "type" field equals "open_stream_request", then copies the
// "object_id" and "mode" fields into the output parameters.
//
// @param root       The parsed request message.
// @param object_id  Output: value of the "object_id" field.
// @param mode       Output: value of the "mode" field, as int64_t.
// @return Status::OK() once both fields have been extracted.
// NOTE(review): the failure path depends on RETURN_ON_ASSERT, which is
// defined elsewhere -- presumably it returns a non-OK status; confirm.
Status ReadOpenStreamRequest(const json& root, ObjectID& object_id,
int64_t& mode) {
RETURN_ON_ASSERT(root["type"] == "open_stream_request");
object_id = root["object_id"].get<ObjectID>();
mode = root["mode"].get<int64_t>();
return Status::OK();
}

void WriteOpenStreamReply(std::string& msg) {
json root;
root["type"] = "open_stream_reply";

encode_msg(root, msg);
}

// Validate the reply to an open-stream request.
//
// Passes the message and the expected type name "open_stream_reply" to
// CHECK_IPC_ERROR and returns Status::OK() when execution continues past it.
// NOTE(review): CHECK_IPC_ERROR is defined elsewhere -- presumably it returns
// an error status when the reply reports a failure or has a different type;
// confirm against the macro definition.
//
// @param root The parsed reply message.
Status ReadOpenStreamReply(const json& root) {
CHECK_IPC_ERROR(root, "open_stream_reply");
return Status::OK();
}

void WriteGetNextStreamChunkRequest(const ObjectID stream_id, const size_t size,
std::string& msg) {
json root;
Expand Down
Loading

0 comments on commit b80529a

Please sign in to comment.