Skip to content

Commit

Permalink
Threads-Hybrid: Change global variables to thread-local or with lock
Browse files Browse the repository at this point in the history
1. Thread-Safe: Config subscribes, subscribe or unsubscribe.
2. Global-Shared: Async SRTP/RECV/SEND/Log use thread-safe objects.
3. Global-Shared: SRTP and DTLS certificate, without critical data.
4. Thread-Local: Blackhole, ResourceManager, StreamManager, ObjectCache, by design.
5. Global-Shared: Log and context, which use thread-safe objects.
6. Thread-Local: Pithy print for each thread.
  • Loading branch information
winlinvip committed Apr 26, 2021
1 parent 7dbac15 commit eab4f89
Show file tree
Hide file tree
Showing 22 changed files with 93 additions and 50 deletions.
8 changes: 8 additions & 0 deletions trunk/src/app/srs_app_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ using namespace std;
#include <srs_app_http_hooks.hpp>
#include <srs_kernel_utility.hpp>
#include <srs_rtmp_stack.hpp>
#include <srs_app_threads.hpp>

using namespace srs_internal;

Expand Down Expand Up @@ -1189,15 +1190,20 @@ SrsConfig::SrsConfig()
root = new SrsConfDirective();
root->conf_line = 0;
root->name = "root";

lock_ = new SrsThreadMutex();
}

SrsConfig::~SrsConfig()
{
srs_freep(lock_);
srs_freep(root);
}

void SrsConfig::subscribe(ISrsReloadHandler* handler)
{
SrsThreadLocker(lock_);

std::vector<ISrsReloadHandler*>::iterator it;

it = std::find(subscribes.begin(), subscribes.end(), handler);
Expand All @@ -1210,6 +1216,8 @@ void SrsConfig::subscribe(ISrsReloadHandler* handler)

void SrsConfig::unsubscribe(ISrsReloadHandler* handler)
{
SrsThreadLocker(lock_);

std::vector<ISrsReloadHandler*>::iterator it;

it = std::find(subscribes.begin(), subscribes.end(), handler);
Expand Down
3 changes: 3 additions & 0 deletions trunk/src/app/srs_app_config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class SrsConfig;
class SrsRequest;
class SrsJsonArray;
class SrsConfDirective;
class SrsThreadMutex;

/**
* whether the two vector actual equals, for instance,
Expand Down Expand Up @@ -140,6 +141,7 @@ extern srs_error_t srs_config_transform_vhost(SrsConfDirective* root);
extern srs_error_t srs_config_transform_vhost2(SrsConfDirective* root);

// TODO: FIXME: It should be thread-local or thread-safe.
// TODO: FIXME: We should use channel to deliver changes of config.
extern SrsConfig* _srs_config;

// The config directive.
Expand Down Expand Up @@ -299,6 +301,7 @@ class SrsConfig
private:
// The reload subscribers, when reload, callback all handlers.
std::vector<ISrsReloadHandler*> subscribes;
SrsThreadMutex* lock_;
public:
SrsConfig();
virtual ~SrsConfig();
Expand Down
4 changes: 2 additions & 2 deletions trunk/src/app/srs_app_pithy_print.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,8 @@ bool SrsErrorPithyPrint::can_print(int error_code, uint32_t* pnn)
return new_stage || stage->can_print();
}

// The global stage manager for pithy print, multiple stages.
static SrsStageManager* _srs_stages = new SrsStageManager();
// It MUST be thread-local, by design.
__thread SrsStageManager* _srs_stages = NULL;

SrsPithyPrint::SrsPithyPrint(int _stage_id)
{
Expand Down
1 change: 1 addition & 0 deletions trunk/src/app/srs_app_rtc_conn.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -582,6 +582,7 @@ class ISrsRtcHijacker
};

// TODO: FIXME: It should be thread-local or thread-safe.
// TODO: FIXME: It seems thread-local make sense.
extern ISrsRtcHijacker* _srs_rtc_hijacker;

#endif
Expand Down
3 changes: 3 additions & 0 deletions trunk/src/app/srs_app_rtc_dtls.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,9 @@ srs_error_t SrsDtlsCertificate::initialize()
// OPENSSL_init_ssl();
#endif

// Initialize SRTP first.
srs_assert(srtp_init() == 0);

// Whether use ECDSA certificate.
ecdsa_mode = _srs_config->get_rtc_server_ecdsa();

Expand Down
2 changes: 1 addition & 1 deletion trunk/src/app/srs_app_rtc_dtls.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class SrsDtlsCertificate
bool is_ecdsa();
};

// TODO: FIXME: It should be thread-local or thread-safe.
// It's shared global object, MUST be thread-safe.
extern SrsDtlsCertificate* _srs_rtc_dtls_certificate;

// @remark: play the role of DTLS_CLIENT, will send handshake
Expand Down
6 changes: 2 additions & 4 deletions trunk/src/app/srs_app_rtc_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ void SrsRtcBlackhole::sendto(void* data, int len)
srs_sendto(blackhole_stfd, data, len, (sockaddr*)blackhole_addr, sizeof(sockaddr_in), SRS_UTIME_NO_TIMEOUT);
}

SrsRtcBlackhole* _srs_blackhole = new SrsRtcBlackhole();
__thread SrsRtcBlackhole* _srs_blackhole = NULL;

// @global dtls certficate for rtc module.
SrsDtlsCertificate* _srs_rtc_dtls_certificate = new SrsDtlsCertificate();
Expand Down Expand Up @@ -749,7 +749,6 @@ srs_error_t RtcServerAdapter::run()
return srs_error_wrap(err, "listen udp");
}

// TODO: FIXME: It should be thread-local or thread-safe.
if ((err = _srs_rtc_manager->start()) != srs_success) {
return srs_error_wrap(err, "start manager");
}
Expand All @@ -761,6 +760,5 @@ void RtcServerAdapter::stop()
{
}

// TODO: FIXME: It should be thread-local or thread-safe.
SrsResourceManager* _srs_rtc_manager = new SrsResourceManager("RTC", true);
__thread SrsResourceManager* _srs_rtc_manager = NULL;

8 changes: 4 additions & 4 deletions trunk/src/app/srs_app_rtc_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ class SrsRtcBlackhole
void sendto(void* data, int len);
};

// TODO: FIXME: It should be thread-local or thread-safe.
extern SrsRtcBlackhole* _srs_blackhole;
// It MUST be thread-local, because it create ST socket.
extern __thread SrsRtcBlackhole* _srs_blackhole;

// The handler for RTC server to call.
class ISrsRtcServerHandler
Expand Down Expand Up @@ -145,8 +145,8 @@ class RtcServerAdapter : public ISrsHybridServer
virtual void stop();
};

// TODO: FIXME: It should be thread-local or thread-safe.
extern SrsResourceManager* _srs_rtc_manager;
// It SHOULD be thread-local, because used to find connection for each UDP packet.
extern __thread SrsResourceManager* _srs_rtc_manager;

#endif

3 changes: 1 addition & 2 deletions trunk/src/app/srs_app_rtc_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -320,8 +320,7 @@ SrsRtcStream* SrsRtcStreamManager::fetch(SrsRequest* r)
return source;
}

// TODO: FIXME: It should be thread-local or thread-safe.
SrsRtcStreamManager* _srs_rtc_sources = new SrsRtcStreamManager();
__thread SrsRtcStreamManager* _srs_rtc_sources = NULL;

ISrsRtcPublishStream::ISrsRtcPublishStream()
{
Expand Down
4 changes: 2 additions & 2 deletions trunk/src/app/srs_app_rtc_source.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,8 @@ class SrsRtcStreamManager
virtual SrsRtcStream* fetch(SrsRequest* r);
};

// TODO: FIXME: It should be thread-local or thread-safe.
extern SrsRtcStreamManager* _srs_rtc_sources;
// It SHOULD be thread-local, because stream source is isolated by threads.
extern __thread SrsRtcStreamManager* _srs_rtc_sources;

// A publish stream interface, for source to callback with.
class ISrsRtcPublishStream
Expand Down
2 changes: 2 additions & 0 deletions trunk/src/app/srs_app_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -977,12 +977,14 @@ srs_error_t SrsServer::cycle()
{
srs_error_t err = srs_success;

// TODO: FIXME: It should be thread-local or thread-safe.
// Start the inotify auto reload by watching config file.
SrsInotifyWorker inotify(this);
if ((err = inotify.start()) != srs_success) {
return srs_error_wrap(err, "start inotify");
}

// TODO: FIXME: It should be thread-local or thread-safe.
// Do server main cycle.
err = do_cycle();

Expand Down
34 changes: 29 additions & 5 deletions trunk/src/app/srs_app_threads.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@
#include <srs_app_utility.hpp>
#include <srs_app_hybrid.hpp>
#include <srs_app_source.hpp>
#include <srs_app_rtc_server.hpp>
#include <srs_app_conn.hpp>
#include <srs_app_rtc_source.hpp>
#include <srs_kernel_rtc_rtp.hpp>
#include <srs_app_pithy_print.hpp>

#include <unistd.h>

Expand Down Expand Up @@ -492,6 +497,7 @@ bool SrsThreadPool::hybrid_dying_water_level()
// Thread local objects.
extern const int LOG_MAX_SIZE;
extern __thread char* _srs_log_data;
extern __thread SrsStageManager* _srs_stages;

// Setup the thread-local variables, MUST call when each thread starting.
srs_error_t SrsThreadPool::setup()
Expand All @@ -513,6 +519,25 @@ srs_error_t SrsThreadPool::setup()
// Create the source manager for server.
_srs_sources = new SrsSourceManager();

// The blackhole for RTC server.
_srs_blackhole = new SrsRtcBlackhole();

// The resource manager for RTC server.
_srs_rtc_manager = new SrsResourceManager("RTC", true);

// The source manager for RTC streams.
_srs_rtc_sources = new SrsRtcStreamManager();

// The object cache for RTC server.
_srs_rtp_cache = new SrsRtpObjectCacheManager<SrsRtpPacket2>(sizeof(SrsRtpPacket2));
_srs_rtp_raw_cache = new SrsRtpObjectCacheManager<SrsRtpRawPayload>(sizeof(SrsRtpRawPayload));
_srs_rtp_fua_cache = new SrsRtpObjectCacheManager<SrsRtpFUAPayload2>(sizeof(SrsRtpFUAPayload2));
_srs_rtp_msg_cache_buffers = new SrsRtpObjectCacheManager<SrsSharedPtrMessage>(sizeof(SrsSharedPtrMessage) + kRtpPacketSize);
_srs_rtp_msg_cache_objs = new SrsRtpObjectCacheManager<SrsSharedPtrMessage>(sizeof(SrsSharedPtrMessage));

// The pithy print for each thread.
_srs_stages = new SrsStageManager();

return err;
}

Expand All @@ -521,8 +546,6 @@ srs_error_t SrsThreadPool::initialize()
srs_error_t err = srs_success;

// Initialize global shared thread-safe objects once.
srs_assert(srtp_init() == 0);

if ((err = _srs_rtc_dtls_certificate->initialize()) != srs_success) {
return srs_error_wrap(err, "rtc dtls certificate initialize");
}
Expand Down Expand Up @@ -581,7 +604,7 @@ srs_error_t SrsThreadPool::execute(string label, srs_error_t (*start)(void* arg)
SrsThreadEntry* entry = new SrsThreadEntry();

// Update the hybrid thread entry for circuit breaker.
if (label == "hybrid") {
if (label == "hybrid" && !hybrid_) {
hybrid_ = entry;
}

Expand Down Expand Up @@ -664,6 +687,7 @@ srs_error_t SrsThreadPool::run()
}

// Update the Circuit-Breaker by water-level.
// TODO: FIXME: Should stat all hybrid servers.
if (hybrid_ && hybrid_->stat) {
// Reset the high water-level when CPU is low for N times.
if (hybrid_->stat->percent * 100 > high_threshold_) {
Expand Down Expand Up @@ -823,7 +847,7 @@ void* SrsThreadPool::start(void* arg)
return NULL;
}

// TODO: FIXME: It should be thread-local or thread-safe.
// It MUST be thread-safe, global and shared object.
SrsThreadPool* _srs_thread_pool = new SrsThreadPool();

SrsAsyncFileWriter::SrsAsyncFileWriter(std::string p)
Expand Down Expand Up @@ -1072,7 +1096,7 @@ srs_error_t SrsAsyncLogManager::do_start()
return err;
}

// TODO: FIXME: It should be thread-local or thread-safe.
// It MUST be thread-safe, global shared object.
SrsAsyncLogManager* _srs_async_log = new SrsAsyncLogManager();

SrsAsyncSRTP::SrsAsyncSRTP(SrsSecurityTransport* transport)
Expand Down
10 changes: 5 additions & 5 deletions trunk/src/app/srs_app_threads.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ class SrsThreadPool
static void* start(void* arg);
};

// TODO: FIXME: It should be thread-local or thread-safe.
// It MUST be thread-safe, global and shared object.
extern SrsThreadPool* _srs_thread_pool;

// Async file writer, it's thread safe.
Expand Down Expand Up @@ -426,7 +426,7 @@ class SrsAsyncLogManager
srs_error_t do_start();
};

// TODO: FIXME: It should be thread-local or thread-safe.
// It MUST be thread-safe, global shared object.
extern SrsAsyncLogManager* _srs_async_log;

// The async SRTP codec.
Expand Down Expand Up @@ -527,7 +527,7 @@ class SrsAsyncSRTPManager
virtual srs_error_t consume(SrsThreadEntry* entry, int* nn_consumed);
};

// TODO: FIXME: It should be thread-local or thread-safe.
// It MUST be thread-safe, because it runs in multiple threads by design.
extern SrsAsyncSRTPManager* _srs_async_srtp;

// A thread-safe UDP listener.
Expand Down Expand Up @@ -608,7 +608,7 @@ class SrsAsyncRecvManager
bool consume_by_tunnel(SrsUdpMuxSocket* skt);
};

// TODO: FIXME: It should be thread-local or thread-safe.
// It MUST be thread-safe, because it runs in multiple threads by design.
extern SrsAsyncRecvManager* _srs_async_recv;

// The async UDP packet.
Expand Down Expand Up @@ -651,7 +651,7 @@ class SrsAsyncSendManager
srs_error_t do_start();
};

// TODO: FIXME: It should be thread-local or thread-safe.
// It MUST be thread-safe, because it runs in multiple threads by design.
extern SrsAsyncSendManager* _srs_async_send;

#endif
8 changes: 8 additions & 0 deletions trunk/src/app/srs_app_utility.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ srs_error_t srs_kill_forced(int& pid)
return err;
}

// TODO: FIXME: It should be thread-local or thread-safe.
static SrsRusage _srs_system_rusage;

SrsRusage::SrsRusage()
Expand All @@ -247,6 +248,7 @@ void srs_update_system_rusage()
_srs_system_rusage.ok = true;
}

// TODO: FIXME: It should be thread-local or thread-safe.
static SrsProcSelfStat _srs_system_cpu_self_stat;
static SrsProcSystemStat _srs_system_cpu_system_stat;

Expand Down Expand Up @@ -523,6 +525,7 @@ SrsDiskStat::SrsDiskStat()
wr_ticks = nb_current = ticks = aveq = 0;
}

// TODO: FIXME: It should be thread-local or thread-safe.
static SrsDiskStat _srs_disk_stat;

SrsDiskStat* srs_get_disk_stat()
Expand Down Expand Up @@ -717,6 +720,7 @@ SrsMemInfo::SrsMemInfo()
SwapFree = 0;
}

// TODO: FIXME: It should be thread-local or thread-safe.
static SrsMemInfo _srs_system_meminfo;

SrsMemInfo* srs_get_meminfo()
Expand Down Expand Up @@ -809,6 +813,7 @@ SrsPlatformInfo::SrsPlatformInfo()
load_fifteen_minutes = 0;
}

// TODO: FIXME: It should be thread-local or thread-safe.
static SrsPlatformInfo _srs_system_platform_info;

SrsPlatformInfo* srs_get_platform_info()
Expand Down Expand Up @@ -909,6 +914,7 @@ SrsSnmpUdpStat::~SrsSnmpUdpStat()
{
}

// TODO: FIXME: It should be thread-local or thread-safe.
static SrsSnmpUdpStat _srs_snmp_udp_stat;

void srs_update_udp_snmp_statistic()
Expand Down Expand Up @@ -987,6 +993,7 @@ SrsNetworkDevices::SrsNetworkDevices()
scompressed = 0;
}

// TODO: FIXME: It should be thread-local or thread-safe.
#define MAX_NETWORK_DEVICES_COUNT 16
static SrsNetworkDevices _srs_system_network_devices[MAX_NETWORK_DEVICES_COUNT];
static int _nb_srs_system_network_devices = -1;
Expand Down Expand Up @@ -1055,6 +1062,7 @@ SrsNetworkRtmpServer::SrsNetworkRtmpServer()
rkbps_5m = skbps_5m = 0;
}

// TODO: FIXME: It should be thread-local or thread-safe.
static SrsNetworkRtmpServer _srs_network_rtmp_server;

SrsNetworkRtmpServer* srs_get_network_rtmp_server()
Expand Down
4 changes: 2 additions & 2 deletions trunk/src/kernel/srs_kernel_log.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,10 @@ class ISrsContext
virtual const SrsContextId& set_id(const SrsContextId& v) = 0;
};

// TODO: FIXME: It should be thread-local or thread-safe.
// It SHOULD be thread-safe, because it use async log and thread-local buffer.
extern ISrsLog* _srs_log;

// TODO: FIXME: It should be thread-local or thread-safe.
// It SHOULD be thread-safe, because it use thread-local thread private data.
extern ISrsContext* _srs_context;

// Log style.
Expand Down
Loading

0 comments on commit eab4f89

Please sign in to comment.