-
Notifications
You must be signed in to change notification settings - Fork 119
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
rpc graceful shutdown #291
rpc graceful shutdown #291
Conversation
@@ -318,7 +317,6 @@ namespace rpc { | |||
}; | |||
intrusive_list<ThreadLink> m_list; | |||
uint64_t m_serving_count = 0; | |||
bool m_concurrent; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
删除同步模式,降低代码复杂度
@@ -381,48 +373,55 @@ namespace rpc { | |||
auto ctx = (Context*)args_; | |||
Context context(std::move(*ctx)); | |||
ctx->got_it = true; | |||
thread_yield_to(nullptr); | |||
thread_yield(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thread_yield_to 第一步会有一个unlikely的判断,thread_yield 是直接调用
@@ -222,7 +222,7 @@ namespace rpc { | |||
bool got_it; | |||
int* stream_serv_count; | |||
photon::condition_variable *stream_cv; | |||
std::shared_ptr<photon::mutex> w_lock; | |||
photon::mutex* w_lock; | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
现在的代码中已经有cv,能够保证mutex在所有request完成后再析构,所以这里不需要用共享指针了。
9ae94ed
to
e4a5806
Compare
rpc/rpc.cpp
Outdated
DEFER(if (ownership) delete stream;); | ||
DEFER(if (ownership) { | ||
delete stream; | ||
stream = nullptr; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
防止被socket server double delete
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As stream
is a local variable, assigning it with nullptr won't affect the caller side.
// means shutdown called by rpc serve, should return to give chance to shutdown | ||
if ((m_serving_count == 1) && (m_list.front()->thread == nullptr)) | ||
return 0; | ||
m_cond_served.wait_no_lock(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
现有代码已经可以保证在stream析构前等待所有requests结束,因此这段可以删了
e4a5806
to
e16c15d
Compare
rpc/rpc.cpp
Outdated
DEFER(if (ownership) delete stream;); | ||
DEFER(if (ownership) { | ||
delete stream; | ||
stream = nullptr; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As stream
is a local variable, assigning it with nullptr won't affect the caller side.
rpc/rpc.cpp
Outdated
if (!passive) { | ||
m_running = false; | ||
} | ||
std::vector<thread*> stream_threads; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can avoid copying the threads out by:
while(m_list) {
// wait for the 1st thread in m_list to complete
}
if we need to wait for them.
rpc/rpc.h
Outdated
* @warning DO NOT invoke this function within the RPC request. | ||
* You should create a thread to invoke it, or just use shutdown_inplace. | ||
*/ | ||
virtual int shutdown(bool passive = false) = 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
bool no_more_requests = true, bool wait = true
in order to behave as before, and allow for more control
rpc/rpc.h
Outdated
|
||
virtual int shutdown() = 0; | ||
/** @brief Shutdown the server within a RPC request */ | ||
virtual void shutdown_inplace(bool passive = false) = 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
effectively not waiting for the completion of shutndown()
, i.e. shutdown_no_wait()
?
@@ -168,9 +168,16 @@ namespace rpc | |||
// the default allocator is defined in iovector.h/cpp | |||
virtual void set_allocator(IOAlloc allocation) = 0; | |||
|
|||
virtual int shutdown_no_wait() = 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
int shutdown_no_wait(bool no_more_requests = true) {
return shutdown(no_more_requests, false);
}
e1c039f
to
e1eaa1a
Compare
rpc/rpc.h
Outdated
virtual int serve(IStream* stream, bool ownership_stream = false) = 0; | ||
virtual int serve(IStream* stream) = 0; | ||
|
||
[[deprecated]] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use __attribute__((deprecated))
to fit for C++11 (and gcc 4.9)
similar for new_skeleton()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's contradictory to support Clang/Windows and old gcc compiler at the same time
rpc/rpc.cpp
Outdated
m_cond_served.wait_no_lock(); | ||
virtual int shutdown(bool no_more_requests) override { | ||
if (no_more_requests) | ||
m_running = false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
m_running = !no_more_requests;
rpc/rpc.cpp
Outdated
return 0; | ||
} | ||
void shutdown_no_wait(bool no_more_requests) override { | ||
thread_create11(&SkeletonImpl::shutdown, this, no_more_requests); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
simply m_running = ! no_more_requests;
rpc/rpc.cpp
Outdated
if (no_more_requests) | ||
m_running = false; | ||
while (m_list) { | ||
thread_enable_join(m_list.front()->thread); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
auto th = m_list.front()->thread;
then use th
instead.
a8829b9
to
c322fc1
Compare
c322fc1
to
67172d2
Compare
rpc支持graceful shutdown,即等待客户端主动关闭链接。
配合socket server现有的terminate(即只关listen fd,阻止建立新链接;不关active connection),可以用如下代码实现优雅关闭: