From 09d9a5c67922b6ca1907a8933e66191a9cdffa28 Mon Sep 17 00:00:00 2001 From: andydiwenzhu Date: Fri, 8 Jan 2021 16:26:45 +0800 Subject: [PATCH 1/4] mark stream for one-producer-one-consumer on server side --- CMakeLists.txt | 20 ++++++++----- modules/basic/stream/byte_stream.vineyard-mod | 14 ++++++--- .../stream/dataframe_stream.vineyard-mod | 14 ++++++--- modules/graph/utils/error.h | 4 +++ src/client/client.cc | 11 +++++++ src/client/client.h | 10 +++++++ src/common/util/protocols.cc | 30 +++++++++++++++++++ src/common/util/protocols.h | 9 ++++++ src/common/util/status.cc | 3 ++ src/common/util/status.h | 6 ++++ src/server/async/socket_server.cc | 14 +++++++++ src/server/memory/stream_store.cc | 12 ++++++++ src/server/memory/stream_store.h | 3 ++ test/stream_test.cc | 6 ++++ 14 files changed, 141 insertions(+), 15 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index ddfb2be7..f0a27dbd 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -91,13 +91,6 @@ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fPIC \ -Wno-unknown-pragmas" ) -check_cxx_compiler_flag(-std=c++14 HAVE_FLAG_STD_CXX14) -if(BUILD_VINEYARD_PYPI_PACKAGES AND NOT HAVE_FLAG_STD_CXX14) - set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11") -else() - set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++14") -endif() - check_cxx_compiler_flag(-Wno-class-memaccess W_NO_CLASS_MEMACCESS) check_cxx_compiler_flag(-Wno-deprecated-declarations W_NO_DEPRECATED_DECLARATIONS) check_cxx_compiler_flag(-Wno-defaulted-function-deleted W_NO_DEFAULTED_FUNCTION_DELETED) @@ -189,6 +182,19 @@ endmacro(find_openssl_libraries) find_common_libraries() +if (CMAKE_VERSION VERSION_LESS "3.1") + if (CMAKE_CXX_COMPILER_ID STREQUAL "GNU") + check_cxx_compiler_flag(-std=c++14 HAVE_FLAG_STD_CXX14) + if(BUILD_VINEYARD_PYPI_PACKAGES AND NOT HAVE_FLAG_STD_CXX14) + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11") + else() + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++14") + endif() + endif () +else () + set (CMAKE_CXX_STANDARD 14) +endif () + include_directories(${PROJECT_SOURCE_DIR}/src) include_directories(${PROJECT_SOURCE_DIR}/modules) include_directories(SYSTEM ${PROJECT_SOURCE_DIR}/thirdparty) diff --git a/modules/basic/stream/byte_stream.vineyard-mod b/modules/basic/stream/byte_stream.vineyard-mod index cf0525fb..2f56e1a5 100644 --- a/modules/basic/stream/byte_stream.vineyard-mod +++ b/modules/basic/stream/byte_stream.vineyard-mod @@ -153,8 +153,11 @@ class ByteStream : public Registered { * @return The unique pointer to the reader */ std::unique_ptr OpenReader(Client& client) { - return std::unique_ptr( - new ByteStreamReader(client, id_, meta_)); + if (client.MarkStream(id_, 2).ok()) { + return std::unique_ptr( + new ByteStreamReader(client, id_, meta_)); + } + return nullptr; } /** @@ -164,8 +167,11 @@ class ByteStream : public Registered { * @return The unique pointer to the writer */ std::unique_ptr OpenWriter(Client& client) { - return std::unique_ptr( - new ByteStreamWriter(client, id_, meta_)); + if (client.MarkStream(id_, 1).ok()) { + return std::unique_ptr( + new ByteStreamWriter(client, id_, meta_)); + } + return nullptr; } std::unordered_map GetParams() { return params_; } diff --git a/modules/basic/stream/dataframe_stream.vineyard-mod b/modules/basic/stream/dataframe_stream.vineyard-mod index 2c66f096..bfb86284 100644 --- a/modules/basic/stream/dataframe_stream.vineyard-mod +++ b/modules/basic/stream/dataframe_stream.vineyard-mod @@ -343,13 +343,19 @@ class __attribute__((annotate("no-vineyard"))) DataframeStreamReader { class DataframeStream : public Registered { public: std::unique_ptr OpenReader(Client& client) { - return std::unique_ptr( - new DataframeStreamReader(client, id_, meta_, params_)); + if (client.MarkStream(id_, 2).ok()) { + return std::unique_ptr( + new DataframeStreamReader(client, id_, meta_, params_)); + } + return nullptr; } std::unique_ptr OpenWriter(Client& client) { - return std::unique_ptr( - new DataframeStreamWriter(client, id_, meta_)); + if (client.MarkStream(id_, 1).ok()) { + return std::unique_ptr( + new DataframeStreamWriter(client, id_, meta_)); + } + return nullptr; } std::unordered_map GetParams() { return params_; } diff --git a/modules/graph/utils/error.h b/modules/graph/utils/error.h index 0af7f5a4..a8123559 100644 --- a/modules/graph/utils/error.h +++ b/modules/graph/utils/error.h @@ -246,6 +246,7 @@ sync_gs_error(const grape::CommSpec& comm_spec, F_T&& f, ARGS_T&&... args) { (lhs) = std::move(status_name).ValueOrDie(); \ } while (0) +#ifndef ARROW_CHECK_OK #define ARROW_CHECK_OK(expr) \ do { \ auto status = (expr); \ @@ -253,7 +254,9 @@ sync_gs_error(const grape::CommSpec& comm_spec, F_T&& f, ARGS_T&&... args) { LOG(FATAL) << "Arrow check failed: " << status.ToString(); \ } \ } while (0) +#endif // ARROW_CHECK_OK +#ifndef ARROW_CHECK_OK_AND_ASSIGN #define ARROW_CHECK_OK_AND_ASSIGN(lhs, expr) \ do { \ auto status_name = (expr); \ @@ -262,6 +265,7 @@ sync_gs_error(const grape::CommSpec& comm_spec, F_T&& f, ARGS_T&&... args) { } \ (lhs) = std::move(status_name).ValueOrDie(); \ } while (0) +#endif // ARROW_CHECK_OK_AND_ASSIGN } // namespace vineyard #endif // MODULES_GRAPH_UTILS_ERROR_H_ diff --git a/src/client/client.cc b/src/client/client.cc index ad281b4b..35db16a5 100644 --- a/src/client/client.cc +++ b/src/client/client.cc @@ -166,6 +166,17 @@ Status Client::CreateStream(const ObjectID& id) { return Status::OK(); } +Status Client::MarkStream(const ObjectID& id, const int64_t& mark) { + ENSURE_CONNECTED(this); + std::string message_out; + WriteMarkStreamRequest(id, mark, message_out); + RETURN_ON_ERROR(doWrite(message_out)); + json message_in; + RETURN_ON_ERROR(doRead(message_in)); + RETURN_ON_ERROR(ReadMarkStreamReply(message_in)); + return Status::OK(); +} + Status Client::GetNextStreamChunk(ObjectID const id, size_t const size, std::unique_ptr& blob) { ENSURE_CONNECTED(this); diff --git a/src/client/client.h b/src/client/client.h index 3151129a..2586bac4 100644 --- a/src/client/client.h +++ b/src/client/client.h @@ -201,6 +201,16 @@ class Client : public ClientBase { */ Status CreateStream(const ObjectID& id); + /** + * @brief Mark a stream on vineyard to ensure 1-producer-1-consumer. + * + * @param id The id of stream to mark. + * @param mark The mark, 1 for producer, 2 for consumer. + * + * @return Status that indicates whether the mark action has succeeded. + */ + Status MarkStream(const ObjectID& id, const int64_t& mark); + /** * @brief Allocate a chunk of given size in vineyard for a stream. When the * request cannot be statisfied immediately, e.g., vineyard doesn't have diff --git a/src/common/util/protocols.cc b/src/common/util/protocols.cc index b5d51571..846d1a73 100644 --- a/src/common/util/protocols.cc +++ b/src/common/util/protocols.cc @@ -86,6 +86,8 @@ CommandType ParseCommandType(const std::string& str_type) { return CommandType::InstanceStatusRequest; } else if (str_type == "shallow_copy_request") { return CommandType::ShallowCopyRequest; + } else if (str_type == "mark_stream_request") { + return CommandType::MarkStreamRequest; } else { return CommandType::NullCommand; } @@ -634,6 +636,34 @@ Status ReadCreateStreamReply(const json& root) { return Status::OK(); } +void WriteMarkStreamRequest(const ObjectID& object_id, const int64_t& mark, std::string& msg) { + json root; + root["type"] = "mark_stream_request"; + root["object_id"] = object_id; + root["mark"] = mark; + + encode_msg(root, msg); +} + +Status ReadMarkStreamRequest(const json& root, ObjectID& object_id, int64_t& mark) { + RETURN_ON_ASSERT(root["type"] == "mark_stream_request"); + object_id = root["object_id"].get(); + mark = root["mark"].get(); + return Status::OK(); +} + +void WriteMarkStreamReply(std::string& msg) { + json root; + root["type"] = "mark_stream_reply"; + + encode_msg(root, msg); +} + +Status ReadMarkStreamReply(const json& root) { + CHECK_IPC_ERROR(root, "mark_stream_reply"); + return Status::OK(); +} + void WriteGetNextStreamChunkRequest(const ObjectID stream_id, const size_t size, std::string& msg) { json root; diff --git a/src/common/util/protocols.h b/src/common/util/protocols.h index 636432c9..0ccb33b8 100644 --- a/src/common/util/protocols.h +++ b/src/common/util/protocols.h @@ -56,6 +56,7 @@ enum class CommandType { IfPersistRequest = 25, InstanceStatusRequest = 26, ShallowCopyRequest = 27, + MarkStreamRequest = 28, }; CommandType ParseCommandType(const std::string& str_type); @@ -218,6 +219,14 @@ void WriteCreateStreamReply(std::string& msg); Status ReadCreateStreamReply(const json& root); +void WriteMarkStreamRequest(const ObjectID& object_id, const int64_t& mark, std::string& msg); + +Status ReadMarkStreamRequest(const json& root, ObjectID& object_id, int64_t& mark); + +void WriteMarkStreamReply(std::string& msg); + +Status ReadMarkStreamReply(const json& root); + void WriteGetNextStreamChunkRequest(const ObjectID stream_id, const size_t size, std::string& msg); diff --git a/src/common/util/status.cc b/src/common/util/status.cc index 53a6a7ee..10b36bb9 100644 --- a/src/common/util/status.cc +++ b/src/common/util/status.cc @@ -137,6 +137,9 @@ std::string Status::CodeAsString() const { case StatusCode::kInvalidStreamState: type = "Invalid stream state"; break; + case StatusCode::kStreamOpened: + type = "Stream opened"; + break; case StatusCode::kUserInputError: type = "User input error"; break; diff --git a/src/common/util/status.h b/src/common/util/status.h index e0de6175..17436b54 100644 --- a/src/common/util/status.h +++ b/src/common/util/status.h @@ -163,6 +163,7 @@ enum class StatusCode : unsigned char { kStreamDrained = 42, kStreamFailed = 43, kInvalidStreamState = 44, + kStreamOpened = 45, kUserInputError = 51, @@ -379,6 +380,11 @@ class VINEYARD_MUST_USE_TYPE Status { return Status(StatusCode::kInvalidStreamState, error_message); } + /// Return a status code that indicates the stream has been opened. + static Status StreamOpened() { + return Status(StatusCode::kStreamOpened, "Stream already opened"); + } + /// Return a status code indicates invalid user input. static Status UserInputError(std::string const& message = "") { return Status(StatusCode::kUserInputError, message); diff --git a/src/server/async/socket_server.cc b/src/server/async/socket_server.cc index 7b3a15ae..5285cd32 100644 --- a/src/server/async/socket_server.cc +++ b/src/server/async/socket_server.cc @@ -350,6 +350,20 @@ bool SocketConnection::processMessage(const std::string& message_in) { } this->doWrite(message_out); } break; + case CommandType::MarkStreamRequest: { + ObjectID stream_id; + int64_t mark; + TRY_READ_REQUEST(ReadMarkStreamRequest(root, stream_id, mark)); + auto status = server_ptr_->GetStreamStore()->Mark(stream_id, mark); + std::string message_out; + if (status.ok()) { + WriteMarkStreamReply(message_out); + } else { + LOG(ERROR) << status.ToString(); + WriteErrorReply(status, message_out); + } + this->doWrite(message_out); + } break; case CommandType::GetNextStreamChunkRequest: { ObjectID stream_id; size_t size; diff --git a/src/server/memory/stream_store.cc b/src/server/memory/stream_store.cc index 89670077..d7fac1b4 100644 --- a/src/server/memory/stream_store.cc +++ b/src/server/memory/stream_store.cc @@ -42,6 +42,18 @@ Status StreamStore::Create(ObjectID const stream_id) { return Status::ObjectExists(); } streams_.emplace(stream_id, std::make_shared()); + stream_marks_.emplace(stream_id, 0); + return Status::OK(); +} + +Status StreamStore::Mark(ObjectID const stream_id, int64_t const mark) { + if (stream_marks_.find(stream_id) == stream_marks_.end()) { + return Status::ObjectNotExists(); + } + if (stream_marks_[stream_id] & mark) { + return Status::StreamOpened(); + } + stream_marks_[stream_id] |= mark; return Status::OK(); } diff --git a/src/server/memory/stream_store.h b/src/server/memory/stream_store.h index 77bc5772..51a7e709 100644 --- a/src/server/memory/stream_store.h +++ b/src/server/memory/stream_store.h @@ -52,6 +52,8 @@ class StreamStore { Status Create(ObjectID const stream_id); + Status Mark(ObjectID const stream_id, int64_t const mark); + /** * @brief This is called by the producer of the steram and it makes current * chunk available for the consumer to read @@ -86,6 +88,7 @@ class StreamStore { std::shared_ptr store_; size_t threshold_; std::unordered_map> streams_; + std::unordered_map stream_marks_; }; } // namespace vineyard diff --git a/test/stream_test.cc b/test/stream_test.cc index 418fbf15..a264ec84 100644 --- a/test/stream_test.cc +++ b/test/stream_test.cc @@ -61,6 +61,9 @@ int main(int argc, char** argv) { CHECK(byte_stream != nullptr); auto reader = byte_stream->OpenReader(reader_client); + auto failed_reader = byte_stream->OpenReader(reader_client); + CHECK(failed_reader == nullptr); + while (true) { std::unique_ptr buffer = nullptr; auto status = reader->GetNext(buffer); @@ -83,6 +86,9 @@ int main(int argc, char** argv) { CHECK(byte_stream != nullptr); auto writer = byte_stream->OpenWriter(writer_client); + auto failed_writer = byte_stream->OpenWriter(writer_client); + CHECK(failed_writer == nullptr); + CHECK(writer != nullptr); for (size_t idx = 1; idx <= 11; ++idx) { std::unique_ptr buffer = nullptr; From 45bc1ee90ef37a0f87930a326bf85c42302895ed Mon Sep 17 00:00:00 2001 From: andydiwenzhu Date: Fri, 8 Jan 2021 16:31:18 +0800 Subject: [PATCH 2/4] cpplint --- src/common/util/protocols.cc | 6 ++++-- src/common/util/protocols.h | 6 ++++-- test/stream_test.cc | 2 +- 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/src/common/util/protocols.cc b/src/common/util/protocols.cc index 846d1a73..f0c4349b 100644 --- a/src/common/util/protocols.cc +++ b/src/common/util/protocols.cc @@ -636,7 +636,8 @@ Status ReadCreateStreamReply(const json& root) { return Status::OK(); } -void WriteMarkStreamRequest(const ObjectID& object_id, const int64_t& mark, std::string& msg) { +void WriteMarkStreamRequest(const ObjectID& object_id, const int64_t& mark, + std::string& msg) { json root; root["type"] = "mark_stream_request"; root["object_id"] = object_id; @@ -645,7 +646,8 @@ void WriteMarkStreamRequest(const ObjectID& object_id, const int64_t& mark, std: encode_msg(root, msg); } -Status ReadMarkStreamRequest(const json& root, ObjectID& object_id, int64_t& mark) { +Status ReadMarkStreamRequest(const json& root, ObjectID& object_id, + int64_t& mark) { RETURN_ON_ASSERT(root["type"] == "mark_stream_request"); object_id = root["object_id"].get(); mark = root["mark"].get(); diff --git a/src/common/util/protocols.h b/src/common/util/protocols.h index 0ccb33b8..a6bdbc25 100644 --- a/src/common/util/protocols.h +++ b/src/common/util/protocols.h @@ -219,9 +219,11 @@ void WriteCreateStreamReply(std::string& msg); Status ReadCreateStreamReply(const json& root); -void WriteMarkStreamRequest(const ObjectID& object_id, const int64_t& mark, std::string& msg); +void WriteMarkStreamRequest(const ObjectID& object_id, const int64_t& mark, + std::string& msg); -Status ReadMarkStreamRequest(const json& root, ObjectID& object_id, int64_t& mark); +Status ReadMarkStreamRequest(const json& root, ObjectID& object_id, + int64_t& mark); void WriteMarkStreamReply(std::string& msg); diff --git a/test/stream_test.cc b/test/stream_test.cc index a264ec84..bb205194 100644 --- a/test/stream_test.cc +++ b/test/stream_test.cc @@ -88,7 +88,7 @@ int main(int argc, char** argv) { auto writer = byte_stream->OpenWriter(writer_client); auto failed_writer = byte_stream->OpenWriter(writer_client); CHECK(failed_writer == nullptr); - + CHECK(writer != nullptr); for (size_t idx = 1; idx <= 11; ++idx) { std::unique_ptr buffer = nullptr; From cff4b19dc2745a0abda0ac1416f9c27d22c8dca4 Mon Sep 17 00:00:00 2001 From: andydiwenzhu Date: Fri, 8 Jan 2021 19:46:58 +0800 Subject: [PATCH 3/4] fix for comments --- modules/basic/stream/byte_stream.vineyard-mod | 27 +++++++++--------- .../stream/dataframe_stream.vineyard-mod | 25 +++++++++-------- modules/basic/stream/stream_utils.h | 28 +++++++++++++++++++ modules/graph/loader/arrow_fragment_loader.h | 6 ++-- modules/io/adaptors/dump_dataframe.cc | 4 ++- .../io/adaptors/parse_bytes_to_dataframe.cc | 6 ++-- modules/io/adaptors/read_kafka_bytes.cc | 3 +- modules/io/adaptors/read_local_bytes.cc | 3 +- modules/io/adaptors/write_kafka_bytes.cc | 3 +- modules/io/adaptors/write_kafka_dataframe.cc | 3 +- python/stream.cc | 16 ++++++++--- src/client/client.cc | 6 ++-- src/client/client.h | 10 ++++--- src/common/util/protocols.cc | 26 ++++++++--------- src/common/util/protocols.h | 12 ++++---- src/common/util/status.h | 2 ++ src/server/async/socket_server.cc | 10 +++---- src/server/memory/stream_store.cc | 9 +++--- src/server/memory/stream_store.h | 4 +-- test/stream_test.cc | 24 ++++++++++------ 20 files changed, 142 insertions(+), 85 deletions(-) create mode 100644 modules/basic/stream/stream_utils.h diff --git a/modules/basic/stream/byte_stream.vineyard-mod b/modules/basic/stream/byte_stream.vineyard-mod index 2f56e1a5..cd00bbd6 100644 --- a/modules/basic/stream/byte_stream.vineyard-mod +++ b/modules/basic/stream/byte_stream.vineyard-mod @@ -25,6 +25,7 @@ limitations under the License. #include "arrow/status.h" #include "basic/ds/arrow_utils.h" +#include "basic/stream/stream_utils.h" #include "client/client.h" #include "client/ds/blob.h" #include "client/ds/i_object.h" @@ -150,28 +151,26 @@ class ByteStream : public Registered { * @brief Open a reader to consume data from the byte stream * * @param client The client connected to the vineyard server - * @return The unique pointer to the reader + * @param The unique pointer to the reader */ - std::unique_ptr OpenReader(Client& client) { - if (client.MarkStream(id_, 2).ok()) { - return std::unique_ptr( - new ByteStreamReader(client, id_, meta_)); - } - return nullptr; + Status OpenReader(Client& client, std::unique_ptr& reader) { + RETURN_ON_ERROR(client.OpenStream(id_, OpenStreamMode::read)); + reader = std::unique_ptr( + new ByteStreamReader(client, id_, meta_)); + return Status::OK(); } /** * @brief Open a writer to produce data to the byte stream * * @param client The client connected to the vineyard server - * @return The unique pointer to the writer + * @param The unique pointer to the writer */ - std::unique_ptr OpenWriter(Client& client) { - if (client.MarkStream(id_, 1).ok()) { - return std::unique_ptr( - new ByteStreamWriter(client, id_, meta_)); - } - return nullptr; + Status OpenWriter(Client& client, std::unique_ptr& writer) { + RETURN_ON_ERROR(client.OpenStream(id_, OpenStreamMode::write)); + writer = std::unique_ptr( + new ByteStreamWriter(client, id_, meta_)); + return Status::OK(); } std::unordered_map GetParams() { return params_; } diff --git a/modules/basic/stream/dataframe_stream.vineyard-mod b/modules/basic/stream/dataframe_stream.vineyard-mod index bfb86284..c6f8e9b7 100644 --- a/modules/basic/stream/dataframe_stream.vineyard-mod +++ b/modules/basic/stream/dataframe_stream.vineyard-mod @@ -25,6 +25,7 @@ limitations under the License. #include "arrow/util/key_value_metadata.h" #include "basic/ds/dataframe.vineyard.h" +#include "basic/stream/stream_utils.h" #include "client/client.h" #include "client/ds/blob.h" #include "client/ds/i_object.h" @@ -342,20 +343,20 @@ class __attribute__((annotate("no-vineyard"))) DataframeStreamReader { class DataframeStream : public Registered { public: - std::unique_ptr OpenReader(Client& client) { - if (client.MarkStream(id_, 2).ok()) { - return std::unique_ptr( - new DataframeStreamReader(client, id_, meta_, params_)); - } - return nullptr; + Status OpenReader(Client& client, + std::unique_ptr& reader) { + RETURN_ON_ERROR(client.OpenStream(id_, OpenStreamMode::read)); + reader = std::unique_ptr( + new DataframeStreamReader(client, id_, meta_, params_)); + return Status::OK(); } - std::unique_ptr OpenWriter(Client& client) { - if (client.MarkStream(id_, 1).ok()) { - return std::unique_ptr( - new DataframeStreamWriter(client, id_, meta_)); - } - return nullptr; + Status OpenWriter(Client& client, + std::unique_ptr& writer) { + RETURN_ON_ERROR(client.OpenStream(id_, OpenStreamMode::write)); + writer = std::unique_ptr( + new DataframeStreamWriter(client, id_, meta_)); + return Status::OK(); } std::unordered_map GetParams() { return params_; } diff --git a/modules/basic/stream/stream_utils.h b/modules/basic/stream/stream_utils.h new file mode 100644 index 00000000..6811d914 --- /dev/null +++ b/modules/basic/stream/stream_utils.h @@ -0,0 +1,28 @@ +/** Copyright 2020 Alibaba Group Holding Limited. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +#ifndef MODULES_BASIC_STREAM_STREAM_UTILS_H_ +#define MODULES_BASIC_STREAM_STREAM_UTILS_H_ + +namespace vineyard { + +enum class OpenStreamMode { + read = 1, + write = 2, +}; + +} // namespace vineyard + +#endif // MODULES_BASIC_STREAM_STREAM_UTILS_H_ diff --git a/modules/graph/loader/arrow_fragment_loader.h b/modules/graph/loader/arrow_fragment_loader.h index 26c44c0f..682f69c7 100644 --- a/modules/graph/loader/arrow_fragment_loader.h +++ b/modules/graph/loader/arrow_fragment_loader.h @@ -75,7 +75,8 @@ inline Status ReadRecordBatchesFromVineyard( // use a local client, since reading from stream may block the client. Client local_client; RETURN_ON_ERROR(local_client.Connect(client.IPCSocket())); - auto reader = local_streams[idx]->OpenReader(local_client); + std::unique_ptr reader; + VINEYARD_CHECK_OK(local_streams[idx]->OpenReader(local_client, reader)); std::vector> read_batches; RETURN_ON_ERROR(reader->ReadRecordBatches(read_batches)); { @@ -119,7 +120,8 @@ inline Status ReadTableFromVineyard(Client& client, const ObjectID object_id, // use a local client, since reading from stream may block the client. Client local_client; RETURN_ON_ERROR(local_client.Connect(client.IPCSocket())); - auto reader = local_streams[idx]->OpenReader(local_client); + std::unique_ptr reader; + VINEYARD_CHECK_OK(local_streams[idx]->OpenReader(local_client, reader)); std::shared_ptr table; RETURN_ON_ERROR(reader->ReadTable(table)); VLOG(10) << "table from stream: " << table->schema()->ToString(); diff --git a/modules/io/adaptors/dump_dataframe.cc b/modules/io/adaptors/dump_dataframe.cc index 9b7c9c1c..dacc9c6d 100644 --- a/modules/io/adaptors/dump_dataframe.cc +++ b/modules/io/adaptors/dump_dataframe.cc @@ -38,7 +38,9 @@ int main(int argc, const char** argv) { auto s = std::dynamic_pointer_cast(client.GetObject(stream_id)); LOG(INFO) << "Got dataframe stream: " << s->id(); - auto reader = s->OpenReader(client); + + std::unique_ptr reader; + VINEYARD_CHECK_OK(s->OpenReader(client, reader)); std::shared_ptr table; VINEYARD_CHECK_OK(reader->ReadTable(table)); diff --git a/modules/io/adaptors/parse_bytes_to_dataframe.cc b/modules/io/adaptors/parse_bytes_to_dataframe.cc index f5d43ec5..5a53bc6d 100644 --- a/modules/io/adaptors/parse_bytes_to_dataframe.cc +++ b/modules/io/adaptors/parse_bytes_to_dataframe.cc @@ -203,8 +203,10 @@ int main(int argc, const char** argv) { LOG(INFO) << "Created dataframe stream " << bs->id() << " at " << proc_index; ReportStatus("return", VYObjectIDToString(bs->id())); - auto reader = ls->OpenReader(client); - auto writer = bs->OpenWriter(client); + std::unique_ptr reader; + std::unique_ptr writer; + VINEYARD_CHECK_OK(ls->OpenReader(client, reader)); + VINEYARD_CHECK_OK(bs->OpenWriter(client, writer)); while (true) { std::unique_ptr buffer; diff --git a/modules/io/adaptors/read_kafka_bytes.cc b/modules/io/adaptors/read_kafka_bytes.cc index be971417..cd74bcf8 100644 --- a/modules/io/adaptors/read_kafka_bytes.cc +++ b/modules/io/adaptors/read_kafka_bytes.cc @@ -54,7 +54,8 @@ int main(int argc, char** argv) { VINEYARD_CHECK_OK(client.Persist(bstream->id())); ReportStatus("return", VYObjectIDToString(bstream->id())); - auto writer = bstream->OpenWriter(client); + std::unique_ptr writer; + VINEYARD_CHECK_OK(bstream->OpenWriter(client, writer)); writer->SetBufferSizeLimit(2 * 1024 * 1024); std::string line; diff --git a/modules/io/adaptors/read_local_bytes.cc b/modules/io/adaptors/read_local_bytes.cc index 129aad0d..8608fcd2 100644 --- a/modules/io/adaptors/read_local_bytes.cc +++ b/modules/io/adaptors/read_local_bytes.cc @@ -59,7 +59,8 @@ int main(int argc, const char** argv) { LOG(INFO) << "Local stream: " << proc << " " << lstream->id(); ReportStatus("return", VYObjectIDToString(lstream->id())); - auto writer = lstream->OpenWriter(client); + std::unique_ptr writer; + VINEYARD_CHECK_OK(lstream->OpenWriter(client, writer)); writer->SetBufferSizeLimit(2 * 1024 * 1024); std::string line; diff --git a/modules/io/adaptors/write_kafka_bytes.cc b/modules/io/adaptors/write_kafka_bytes.cc index 9099b9b7..e91b256f 100644 --- a/modules/io/adaptors/write_kafka_bytes.cc +++ b/modules/io/adaptors/write_kafka_bytes.cc @@ -60,7 +60,8 @@ int main(int argc, char** argv) { auto ls = s->GetStream(proc_index); LOG(INFO) << "Got dataframe stream " << ls->id() << " at " << proc_index; - auto reader = ls->OpenReader(client); + std::unique_ptr reader; + VINEYARD_CHECK_OK(ls->OpenReader(client, reader)); std::string line; while (reader->ReadLine(line).ok()) { diff --git a/modules/io/adaptors/write_kafka_dataframe.cc b/modules/io/adaptors/write_kafka_dataframe.cc index e86ae503..e72f742a 100644 --- a/modules/io/adaptors/write_kafka_dataframe.cc +++ b/modules/io/adaptors/write_kafka_dataframe.cc @@ -60,7 +60,8 @@ int main(int argc, char** argv) { auto ls = s->GetStream(proc_index); LOG(INFO) << "Got dataframe stream " << ls->id() << " at " << proc_index; - auto reader = ls->OpenReader(client); + std::unique_ptr reader; + VINEYARD_CHECK_OK(ls->OpenReader(client, reader)); std::string line; while (reader->ReadLine(line).ok()) { diff --git a/python/stream.cc b/python/stream.cc index 51c66095..173e8f83 100644 --- a/python/stream.cc +++ b/python/stream.cc @@ -73,14 +73,18 @@ void bind_stream(py::module& mod) { "open_reader", [](ByteStream* self, Client& client) -> std::unique_ptr { - return self->OpenReader(client); + std::unique_ptr reader = nullptr; + throw_on_error(self->OpenReader(client, reader)); + return reader; }, "client"_a) .def( "open_writer", [](ByteStream* self, Client& client) -> std::unique_ptr { - return self->OpenWriter(client); + std::unique_ptr writer = nullptr; + throw_on_error(self->OpenWriter(client, writer)); + return writer; }, "client"_a) .def("__getitem__", @@ -150,14 +154,18 @@ void bind_stream(py::module& mod) { "open_reader", [](DataframeStream* self, Client& client) -> std::unique_ptr { - return self->OpenReader(client); + std::unique_ptr reader = nullptr; + throw_on_error(self->OpenReader(client, reader)); + return reader; }, "client"_a) .def( "open_writer", [](DataframeStream* self, Client& client) -> std::unique_ptr { - return self->OpenWriter(client); + std::unique_ptr writer = nullptr; + throw_on_error(self->OpenWriter(client, writer)); + return writer; }, "client"_a) .def("__getitem__", diff --git a/src/client/client.cc b/src/client/client.cc index 35db16a5..df740fa0 100644 --- a/src/client/client.cc +++ b/src/client/client.cc @@ -166,14 +166,14 @@ Status Client::CreateStream(const ObjectID& id) { return Status::OK(); } -Status Client::MarkStream(const ObjectID& id, const int64_t& mark) { +Status Client::OpenStream(const ObjectID& id, OpenStreamMode mode) { ENSURE_CONNECTED(this); std::string message_out; - WriteMarkStreamRequest(id, mark, message_out); + WriteOpenStreamRequest(id, static_cast(mode), message_out); RETURN_ON_ERROR(doWrite(message_out)); json message_in; RETURN_ON_ERROR(doRead(message_in)); - RETURN_ON_ERROR(ReadMarkStreamReply(message_in)); + RETURN_ON_ERROR(ReadOpenStreamReply(message_in)); return Status::OK(); } diff --git a/src/client/client.h b/src/client/client.h index 2586bac4..099ec7c1 100644 --- a/src/client/client.h +++ b/src/client/client.h @@ -26,6 +26,7 @@ limitations under the License. #include "arrow/buffer.h" +#include "basic/stream/stream_utils.h" #include "client/client_base.h" #include "client/ds/i_object.h" #include "client/ds/object_meta.h" @@ -202,14 +203,15 @@ class Client : public ClientBase { Status CreateStream(const ObjectID& id); /** - * @brief Mark a stream on vineyard to ensure 1-producer-1-consumer. + * @brief open a stream on vineyard. Failed if the stream is already opened on + * the given mode. * * @param id The id of stream to mark. - * @param mark The mark, 1 for producer, 2 for consumer. + * @param mode The mode, OpenStreamMode::read or OpenStreamMode::write. * - * @return Status that indicates whether the mark action has succeeded. + * @return Status that indicates whether the open action has succeeded. */ - Status MarkStream(const ObjectID& id, const int64_t& mark); + Status OpenStream(const ObjectID& id, OpenStreamMode mode); /** * @brief Allocate a chunk of given size in vineyard for a stream. When the diff --git a/src/common/util/protocols.cc b/src/common/util/protocols.cc index f0c4349b..fb54d45a 100644 --- a/src/common/util/protocols.cc +++ b/src/common/util/protocols.cc @@ -86,8 +86,8 @@ CommandType ParseCommandType(const std::string& str_type) { return CommandType::InstanceStatusRequest; } else if (str_type == "shallow_copy_request") { return CommandType::ShallowCopyRequest; - } else if (str_type == "mark_stream_request") { - return CommandType::MarkStreamRequest; + } else if (str_type == "open_stream_request") { + return CommandType::OpenStreamRequest; } else { return CommandType::NullCommand; } @@ -636,33 +636,33 @@ Status ReadCreateStreamReply(const json& root) { return Status::OK(); } -void WriteMarkStreamRequest(const ObjectID& object_id, const int64_t& mark, +void WriteOpenStreamRequest(const ObjectID& object_id, const int64_t& mode, std::string& msg) { json root; - root["type"] = "mark_stream_request"; + root["type"] = "open_stream_request"; root["object_id"] = object_id; - root["mark"] = mark; + root["mode"] = mode; encode_msg(root, msg); } -Status ReadMarkStreamRequest(const json& root, ObjectID& object_id, - int64_t& mark) { - RETURN_ON_ASSERT(root["type"] == "mark_stream_request"); +Status ReadOpenStreamRequest(const json& root, ObjectID& object_id, + int64_t& mode) { + RETURN_ON_ASSERT(root["type"] == "open_stream_request"); object_id = root["object_id"].get(); - mark = root["mark"].get(); + mode = root["mode"].get(); return Status::OK(); } -void WriteMarkStreamReply(std::string& msg) { +void WriteOpenStreamReply(std::string& msg) { json root; - root["type"] = "mark_stream_reply"; + root["type"] = "open_stream_reply"; encode_msg(root, msg); } -Status ReadMarkStreamReply(const json& root) { - CHECK_IPC_ERROR(root, "mark_stream_reply"); +Status ReadOpenStreamReply(const json& root) { + CHECK_IPC_ERROR(root, "open_stream_reply"); return Status::OK(); } diff --git a/src/common/util/protocols.h b/src/common/util/protocols.h index a6bdbc25..a9321afa 100644 --- a/src/common/util/protocols.h +++ b/src/common/util/protocols.h @@ -56,7 +56,7 @@ enum class CommandType { IfPersistRequest = 25, InstanceStatusRequest = 26, ShallowCopyRequest = 27, - MarkStreamRequest = 28, + OpenStreamRequest = 28, }; CommandType ParseCommandType(const std::string& str_type); @@ -219,15 +219,15 @@ void WriteCreateStreamReply(std::string& msg); Status ReadCreateStreamReply(const json& root); -void WriteMarkStreamRequest(const ObjectID& object_id, const int64_t& mark, +void WriteOpenStreamRequest(const ObjectID& object_id, const int64_t& mode, std::string& msg); -Status ReadMarkStreamRequest(const json& root, ObjectID& object_id, - int64_t& mark); +Status ReadOpenStreamRequest(const json& root, ObjectID& object_id, + int64_t& mode); -void WriteMarkStreamReply(std::string& msg); +void WriteOpenStreamReply(std::string& msg); -Status ReadMarkStreamReply(const json& root); +Status ReadOpenStreamReply(const json& root); void WriteGetNextStreamChunkRequest(const ObjectID stream_id, const size_t size, std::string& msg); diff --git a/src/common/util/status.h b/src/common/util/status.h index 17436b54..8c6cbe76 100644 --- a/src/common/util/status.h +++ b/src/common/util/status.h @@ -467,6 +467,8 @@ class VINEYARD_MUST_USE_TYPE Status { bool IsInvalidStreamState() const { return code() == StatusCode::kInvalidStreamState; } + /// Return true iff the stream has been opened. + bool IsStreamOpened() const { return code() == StatusCode::kStreamOpened; } /// Return true iff there's some problems in user's input. bool IsUserInputError() const { return code() == StatusCode::kUserInputError; diff --git a/src/server/async/socket_server.cc b/src/server/async/socket_server.cc index 5285cd32..dc0dae44 100644 --- a/src/server/async/socket_server.cc +++ b/src/server/async/socket_server.cc @@ -350,14 +350,14 @@ bool SocketConnection::processMessage(const std::string& message_in) { } this->doWrite(message_out); } break; - case CommandType::MarkStreamRequest: { + case CommandType::OpenStreamRequest: { ObjectID stream_id; - int64_t mark; - TRY_READ_REQUEST(ReadMarkStreamRequest(root, stream_id, mark)); - auto status = server_ptr_->GetStreamStore()->Mark(stream_id, mark); + int64_t mode; + TRY_READ_REQUEST(ReadOpenStreamRequest(root, stream_id, mode)); + auto status = server_ptr_->GetStreamStore()->Open(stream_id, mode); std::string message_out; if (status.ok()) { - WriteMarkStreamReply(message_out); + WriteOpenStreamReply(message_out); } else { LOG(ERROR) << status.ToString(); WriteErrorReply(status, message_out); diff --git a/src/server/memory/stream_store.cc b/src/server/memory/stream_store.cc index d7fac1b4..9c48593e 100644 --- a/src/server/memory/stream_store.cc +++ b/src/server/memory/stream_store.cc @@ -42,18 +42,17 @@ Status StreamStore::Create(ObjectID const stream_id) { return Status::ObjectExists(); } streams_.emplace(stream_id, std::make_shared()); - stream_marks_.emplace(stream_id, 0); return Status::OK(); } -Status StreamStore::Mark(ObjectID const stream_id, int64_t const mark) { - if (stream_marks_.find(stream_id) == stream_marks_.end()) { +Status StreamStore::Open(ObjectID const stream_id, int64_t const mode) { + if (streams_.find(stream_id) == streams_.end()) { return Status::ObjectNotExists(); } - if (stream_marks_[stream_id] & mark) { + if (streams_[stream_id]->open_mark & mode) { return Status::StreamOpened(); } - stream_marks_[stream_id] |= mark; + streams_[stream_id]->open_mark |= mode; return Status::OK(); } diff --git a/src/server/memory/stream_store.h b/src/server/memory/stream_store.h index 51a7e709..1a35d13d 100644 --- a/src/server/memory/stream_store.h +++ b/src/server/memory/stream_store.h @@ -39,6 +39,7 @@ struct StreamHolder { boost::optional> reader_; boost::optional>> writer_; bool drained{false}, failed{false}; + int64_t open_mark{0}; }; /** @@ -52,7 +53,7 @@ class StreamStore { Status Create(ObjectID const stream_id); - Status Mark(ObjectID const stream_id, int64_t const mark); + Status Open(ObjectID const stream_id, int64_t const mode); /** * @brief This is called by the producer of the steram and it makes current @@ -88,7 +89,6 @@ class StreamStore { std::shared_ptr store_; size_t threshold_; std::unordered_map> streams_; - std::unordered_map stream_marks_; }; } // namespace vineyard diff --git a/test/stream_test.cc b/test/stream_test.cc index bb205194..ae087190 100644 --- a/test/stream_test.cc +++ b/test/stream_test.cc @@ -60,9 +60,12 @@ int main(int argc, char** argv) { auto byte_stream = reader_client.GetObject(stream_id); CHECK(byte_stream != nullptr); - auto reader = byte_stream->OpenReader(reader_client); - auto failed_reader = byte_stream->OpenReader(reader_client); - CHECK(failed_reader == nullptr); + std::unique_ptr reader; + VINEYARD_CHECK_OK(byte_stream->OpenReader(reader_client, reader)); + + std::unique_ptr failed_reader; + auto status1 = byte_stream->OpenReader(reader_client, failed_reader); + CHECK(status1.IsStreamOpened()); while (true) { std::unique_ptr buffer = nullptr; @@ -85,9 +88,12 @@ int main(int argc, char** argv) { auto byte_stream = writer_client.GetObject(stream_id); CHECK(byte_stream != nullptr); - auto writer = byte_stream->OpenWriter(writer_client); - auto failed_writer = byte_stream->OpenWriter(writer_client); - CHECK(failed_writer == nullptr); + std::unique_ptr writer; + VINEYARD_CHECK_OK(byte_stream->OpenWriter(writer_client, writer)); + + std::unique_ptr failed_writer; + auto status1 = byte_stream->OpenWriter(writer_client, failed_writer); + CHECK(status1.IsStreamOpened()); CHECK(writer != nullptr); for (size_t idx = 1; idx <= 11; ++idx) { @@ -122,8 +128,10 @@ int main(int argc, char** argv) { auto failed_byte_stream = client.GetObject(stream_id); - auto reader = failed_byte_stream->OpenReader(client); - auto writer = failed_byte_stream->OpenWriter(client); + std::unique_ptr reader = nullptr; + std::unique_ptr writer = nullptr; + VINEYARD_CHECK_OK(failed_byte_stream->OpenReader(client, reader)); + VINEYARD_CHECK_OK(failed_byte_stream->OpenWriter(client, writer)); CHECK(reader != nullptr); CHECK(writer != nullptr); VINEYARD_CHECK_OK(writer->Abort()); From 25b7f1fae0e876b38f3085496d0aedf183bb166a Mon Sep 17 00:00:00 2001 From: andydiwenzhu Date: Fri, 8 Jan 2021 21:23:07 +0800 Subject: [PATCH 4/4] minor fix --- modules/io/adaptors/read_oss_dataframe.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/modules/io/adaptors/read_oss_dataframe.cc b/modules/io/adaptors/read_oss_dataframe.cc index 2d84dedc..bfae2582 100644 --- a/modules/io/adaptors/read_oss_dataframe.cc +++ b/modules/io/adaptors/read_oss_dataframe.cc @@ -62,7 +62,8 @@ int main(int argc, const char** argv) { LOG(INFO) << "Local stream: " << proc << " " << lstream->id(); ReportStatus("return", VYObjectIDToString(lstream->id())); - auto writer = lstream->OpenWriter(client); + std::unique_ptr writer; + VINEYARD_CHECK_OK(lstream->OpenWriter(client, writer)); std::shared_ptr table; VINEYARD_CHECK_OK(oss_io_adaptor->ReadTable(&table));