diff --git a/mixerclient/include/transport.h b/mixerclient/include/transport.h index 3dafe266a19e..35376b10590f 100644 --- a/mixerclient/include/transport.h +++ b/mixerclient/include/transport.h @@ -53,11 +53,11 @@ class ReadInterface { virtual void OnClose(const ::google::protobuf::util::Status &) = 0; }; -typedef std::unique_ptr> +typedef std::shared_ptr> CheckWriterPtr; -typedef std::unique_ptr> +typedef std::shared_ptr> ReportWriterPtr; -typedef std::unique_ptr> +typedef std::shared_ptr> QuotaWriterPtr; typedef ReadInterface<::istio::mixer::v1::CheckResponse> *CheckReaderRawPtr; diff --git a/mixerclient/src/grpc_transport.cc b/mixerclient/src/grpc_transport.cc index 356a2aebea61..5de534098f9a 100644 --- a/mixerclient/src/grpc_transport.cc +++ b/mixerclient/src/grpc_transport.cc @@ -31,10 +31,13 @@ class GrpcStream final : public WriteInterface { GrpcStream(ReadInterface* reader, StreamNewFunc create_func) : reader_(reader), write_closed_(false) { stream_ = create_func(context_); - worker_thread_ = std::thread([this]() { WorkerThread(); }); } - ~GrpcStream() { worker_thread_.join(); } + static void Start( + std::shared_ptr> grpc_stream) { + std::thread t([grpc_stream]() { grpc_stream->ReadMainLoop(); }); + t.detach(); + } void Write(const RequestType& request) override { if (!stream_->Write(request)) { @@ -51,7 +54,7 @@ class GrpcStream final : public WriteInterface { private: // The worker loop to read response messages. - void WorkerThread() { + void ReadMainLoop() { ResponseType response; while (stream_->Read(&response)) { reader_->OnRead(response); @@ -68,8 +71,6 @@ class GrpcStream final : public WriteInterface { ::grpc::ClientContext context_; // The reader writer stream. StreamPtr stream_; - // The thread to read response. - std::thread worker_thread_; // The reader interface from caller. ReadInterface* reader_; // Indicates if write is closed. @@ -94,27 +95,33 @@ GrpcTransport::GrpcTransport(const std::string& mixer_server) { } CheckWriterPtr GrpcTransport::NewStream(CheckReaderRawPtr reader) { - return CheckWriterPtr(new CheckGrpcStream( + auto writer = std::make_shared( reader, [this](::grpc::ClientContext& context) -> CheckGrpcStream::StreamPtr { return stub_->Check(&context); - })); + }); + CheckGrpcStream::Start(writer); + return writer; } ReportWriterPtr GrpcTransport::NewStream(ReportReaderRawPtr reader) { - return ReportWriterPtr(new ReportGrpcStream( + auto writer = std::make_shared( reader, [this](::grpc::ClientContext& context) -> ReportGrpcStream::StreamPtr { return stub_->Report(&context); - })); + }); + ReportGrpcStream::Start(writer); + return writer; } QuotaWriterPtr GrpcTransport::NewStream(QuotaReaderRawPtr reader) { - return QuotaWriterPtr(new QuotaGrpcStream( + auto writer = std::make_shared( reader, [this](::grpc::ClientContext& context) -> QuotaGrpcStream::StreamPtr { return stub_->Quota(&context); - })); + }); + QuotaGrpcStream::Start(writer); + return writer; } } // namespace mixer_client diff --git a/mixerclient/src/stream_transport.h b/mixerclient/src/stream_transport.h index 011ac6fa6aee..716d99c0ad98 100644 --- a/mixerclient/src/stream_transport.h +++ b/mixerclient/src/stream_transport.h @@ -145,9 +145,7 @@ class StreamTransport { std::shared_ptr> reader = reader_.lock(); if (!writer || !reader || writer->is_write_closed()) { reader = std::make_shared>(); - auto writer_unique_ptr = transport_->NewStream(reader.get()); - // Transfer writer ownership to shared_ptr. - writer.reset(writer_unique_ptr.release()); + writer = transport_->NewStream(reader.get()); reader_ = reader; writer_ = writer; // Reader and Writer objects are owned by the OnClose callback.