Skip to content

Commit

Permalink
Fix crash when mixer server is killed. (envoyproxy#25)
Browse files Browse the repository at this point in the history
  • Loading branch information
qiwzhang authored Feb 8, 2017
1 parent 48f2f2a commit cd82aaf
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 17 deletions.
6 changes: 3 additions & 3 deletions mixerclient/include/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,11 @@ class ReadInterface {
virtual void OnClose(const ::google::protobuf::util::Status &) = 0;
};

typedef std::unique_ptr<WriteInterface<::istio::mixer::v1::CheckRequest>>
typedef std::shared_ptr<WriteInterface<::istio::mixer::v1::CheckRequest>>
CheckWriterPtr;
typedef std::unique_ptr<WriteInterface<::istio::mixer::v1::ReportRequest>>
typedef std::shared_ptr<WriteInterface<::istio::mixer::v1::ReportRequest>>
ReportWriterPtr;
typedef std::unique_ptr<WriteInterface<::istio::mixer::v1::QuotaRequest>>
typedef std::shared_ptr<WriteInterface<::istio::mixer::v1::QuotaRequest>>
QuotaWriterPtr;

typedef ReadInterface<::istio::mixer::v1::CheckResponse> *CheckReaderRawPtr;
Expand Down
29 changes: 18 additions & 11 deletions mixerclient/src/grpc_transport.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,13 @@ class GrpcStream final : public WriteInterface<RequestType> {
GrpcStream(ReadInterface<ResponseType>* reader, StreamNewFunc create_func)
: reader_(reader), write_closed_(false) {
stream_ = create_func(context_);
worker_thread_ = std::thread([this]() { WorkerThread(); });
}

~GrpcStream() { worker_thread_.join(); }
static void Start(
std::shared_ptr<GrpcStream<RequestType, ResponseType>> grpc_stream) {
std::thread t([grpc_stream]() { grpc_stream->ReadMainLoop(); });
t.detach();
}

void Write(const RequestType& request) override {
if (!stream_->Write(request)) {
Expand All @@ -51,7 +54,7 @@ class GrpcStream final : public WriteInterface<RequestType> {

private:
// The worker loop to read response messages.
void WorkerThread() {
void ReadMainLoop() {
ResponseType response;
while (stream_->Read(&response)) {
reader_->OnRead(response);
Expand All @@ -68,8 +71,6 @@ class GrpcStream final : public WriteInterface<RequestType> {
::grpc::ClientContext context_;
// The reader writer stream.
StreamPtr stream_;
// The thread to read response.
std::thread worker_thread_;
// The reader interface from caller.
ReadInterface<ResponseType>* reader_;
// Indicates if write is closed.
Expand All @@ -94,27 +95,33 @@ GrpcTransport::GrpcTransport(const std::string& mixer_server) {
}

CheckWriterPtr GrpcTransport::NewStream(CheckReaderRawPtr reader) {
return CheckWriterPtr(new CheckGrpcStream(
auto writer = std::make_shared<CheckGrpcStream>(
reader,
[this](::grpc::ClientContext& context) -> CheckGrpcStream::StreamPtr {
return stub_->Check(&context);
}));
});
CheckGrpcStream::Start(writer);
return writer;
}

ReportWriterPtr GrpcTransport::NewStream(ReportReaderRawPtr reader) {
return ReportWriterPtr(new ReportGrpcStream(
auto writer = std::make_shared<ReportGrpcStream>(
reader,
[this](::grpc::ClientContext& context) -> ReportGrpcStream::StreamPtr {
return stub_->Report(&context);
}));
});
ReportGrpcStream::Start(writer);
return writer;
}

QuotaWriterPtr GrpcTransport::NewStream(QuotaReaderRawPtr reader) {
return QuotaWriterPtr(new QuotaGrpcStream(
auto writer = std::make_shared<QuotaGrpcStream>(
reader,
[this](::grpc::ClientContext& context) -> QuotaGrpcStream::StreamPtr {
return stub_->Quota(&context);
}));
});
QuotaGrpcStream::Start(writer);
return writer;
}

} // namespace mixer_client
Expand Down
4 changes: 1 addition & 3 deletions mixerclient/src/stream_transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,7 @@ class StreamTransport {
std::shared_ptr<ReaderImpl<ResponseType>> reader = reader_.lock();
if (!writer || !reader || writer->is_write_closed()) {
reader = std::make_shared<ReaderImpl<ResponseType>>();
auto writer_unique_ptr = transport_->NewStream(reader.get());
// Transfer writer ownership to shared_ptr.
writer.reset(writer_unique_ptr.release());
writer = transport_->NewStream(reader.get());
reader_ = reader;
writer_ = writer;
// Reader and Writer objects are owned by the OnClose callback.
Expand Down

0 comments on commit cd82aaf

Please sign in to comment.