diff --git a/trunk/src/app/srs_app_st.cpp b/trunk/src/app/srs_app_st.cpp index a37c5131c2..4ea69dfce2 100755 --- a/trunk/src/app/srs_app_st.cpp +++ b/trunk/src/app/srs_app_st.cpp @@ -61,24 +61,16 @@ namespace internal { } - SrsThread::SrsThread(const char* name, ISrsThreadHandler* thread_handler, int64_t ims, bool joinable) + SrsThread::SrsThread(const char* n, ISrsThreadHandler* h, int64_t ims, bool j) { - _name = name; - handler = thread_handler; + name = n; + handler = h; cims = ims; - tid = NULL; + trd = NULL; loop = false; - really_terminated = true; - _cid = -1; - _joinable = joinable; - disposed = false; - - // in start(), the thread cycle method maybe stop and remove the thread itself, - // and the thread start() is waiting for the _cid, and segment fault then. - // @see https://github.com/ossrs/srs/issues/110 - // thread will set _cid, callback on_thread_start(), then wait for the can_run signal. - can_run = false; + context_id = -1; + joinable = j; } SrsThread::~SrsThread() @@ -88,50 +80,50 @@ namespace internal int SrsThread::cid() { - return _cid; + return context_id; } int SrsThread::start() { int ret = ERROR_SUCCESS; - if(tid) { + if(trd) { srs_info("thread %s already running.", _name); return ret; } - if((tid = st_thread_create(thread_fun, this, (_joinable? 1:0), 0)) == NULL){ + loop = true; + + if((trd = st_thread_create(pfn, this, (joinable? 1:0), 0)) == NULL){ ret = ERROR_ST_CREATE_CYCLE_THREAD; srs_error("st_thread_create failed. ret=%d", ret); return ret; } - disposed = false; - // we set to loop to true for thread to run. - loop = true; - - // wait for cid to ready, for parent thread to get the cid. - while (_cid < 0) { - st_usleep(10 * 1000); - } - - // now, cycle thread can run. - can_run = true; - return ret; } void SrsThread::stop() { - if (!tid) { + if (!trd) { return; } - dispose(); + // notify the cycle to stop loop. + loop = false; + + // the interrupt will cause the socket to read/write error, + // which will terminate the cycle thread. + st_thread_interrupt(trd); + + // when joinable, wait util quit. + if (joinable) { + // wait the thread to exit. + int ret = st_thread_join(trd, NULL); + srs_assert(ret == ERROR_SUCCESS); + } - _cid = -1; - can_run = false; - tid = NULL; + trd = NULL; } bool SrsThread::can_loop() @@ -144,46 +136,7 @@ namespace internal loop = false; } - void SrsThread::dispose() - { - if (disposed) { - return; - } - - // notify the cycle to stop loop. - loop = false; - - // the interrupt will cause the socket to read/write error, - // which will terminate the cycle thread. - st_thread_interrupt(tid); - - // when joinable, wait util quit. - if (_joinable) { - // wait the thread to exit. - int ret = st_thread_join(tid, NULL); - if (ret) { - srs_warn("core: ignore join thread failed."); - } - } - - // wait the thread actually terminated. - // sometimes the thread join return -1, for example, - // when thread use st_recvfrom, the thread join return -1. - // so here, we use a variable to ensure the thread stopped. - // @remark even the thread not joinable, we must ensure the thread stopped when stop. - while (!really_terminated) { - st_usleep(10 * 1000); - - if (really_terminated) { - break; - } - srs_warn("core: wait thread to actually terminated"); - } - - disposed = true; - } - - void SrsThread::thread_cycle() + void SrsThread::cycle() { int ret = ERROR_SUCCESS; @@ -191,37 +144,29 @@ namespace internal // because sometimes we need to merge cid, for example, // the publish thread should use the same cid of connection. _srs_context->generate_id(); - srs_info("thread %s cycle start", _name); - _cid = _srs_context->get_id(); + srs_info("thread %s cycle start", name); + context_id = _srs_context->get_id(); srs_assert(handler); handler->on_thread_start(); - // thread is running now. - really_terminated = false; - - // wait for cid to ready, for parent thread to get the cid. - while (!can_run && loop) { - st_usleep(10 * 1000); - } - while (loop) { if ((ret = handler->on_before_cycle()) != ERROR_SUCCESS) { - srs_warn("thread %s on before cycle failed, ignored and retry, ret=%d", _name, ret); + srs_warn("thread %s on before cycle failed, ignored and retry, ret=%d", name, ret); goto failed; } srs_info("thread %s on before cycle success", _name); if ((ret = handler->cycle()) != ERROR_SUCCESS) { if (!srs_is_client_gracefully_close(ret) && !srs_is_system_control_error(ret)) { - srs_warn("thread %s cycle failed, ignored and retry, ret=%d", _name, ret); + srs_warn("thread %s cycle failed, ignored and retry, ret=%d", name, ret); } goto failed; } srs_info("thread %s cycle success", _name); if ((ret = handler->on_end_cycle()) != ERROR_SUCCESS) { - srs_warn("thread %s on end cycle failed, ignored and retry, ret=%d", _name, ret); + srs_warn("thread %s on end cycle failed, ignored and retry, ret=%d", name, ret); goto failed; } srs_info("thread %s on end cycle success", _name); @@ -239,20 +184,17 @@ namespace internal } } - // readly terminated now. - really_terminated = true; - - srs_info("thread %s cycle finished", _name); + srs_info("thread %s cycle finished", name); // @remark in this callback, user may delete this, so never use this->xxx anymore. handler->on_thread_stop(); } - void* SrsThread::thread_fun(void* arg) + void* SrsThread::pfn(void* arg) { SrsThread* obj = (SrsThread*)arg; srs_assert(obj); - obj->thread_cycle(); + obj->cycle(); // delete cid for valgrind to detect memory leak. SrsThreadContext* ctx = dynamic_cast(_srs_context); diff --git a/trunk/src/app/srs_app_st.hpp b/trunk/src/app/srs_app_st.hpp index 8bcbcafa96..b8024c7013 100644 --- a/trunk/src/app/srs_app_st.hpp +++ b/trunk/src/app/srs_app_st.hpp @@ -100,14 +100,11 @@ namespace internal class SrsThread { private: - st_thread_t tid; - int _cid; + st_thread_t trd; + int context_id; bool loop; - bool can_run; - bool really_terminated; - bool _joinable; - const char* _name; - bool disposed; + bool joinable; + const char* name; private: ISrsThreadHandler* handler; // The cycle interval in ms. @@ -115,10 +112,10 @@ namespace internal public: /** * initialize the thread. - * @param name, human readable name for st debug. - * @param thread_handler, the cycle handler for the thread. + * @param n, human readable name for st debug. + * @param h, the cycle handler for the thread. * @param ims, the sleep interval in ms when cycle finished. - * @param joinable, if joinable, other thread must stop the thread. + * @param j, if joinable, other thread must stop the thread. * @remark if joinable, thread never quit itself, or memory leak. * @see: https://github.com/ossrs/srs/issues/78 * @remark about st debug, see st-1.9/README, _st_iterate_threads_flag @@ -127,7 +124,7 @@ namespace internal * TODO: FIXME: maybe all thread must be reap by others threads, * @see: https://github.com/ossrs/srs/issues/77 */ - SrsThread(const char* name, ISrsThreadHandler* thread_handler, int64_t ims, bool joinable); + SrsThread(const char* n, ISrsThreadHandler* h, int64_t ims, bool j); virtual ~SrsThread(); public: /** @@ -163,9 +160,8 @@ namespace internal */ virtual void stop_loop(); private: - virtual void dispose(); - virtual void thread_cycle(); - static void* thread_fun(void* arg); + virtual void cycle(); + static void* pfn(void* arg); }; }