Skip to content

Commit

Permalink
Threads-Hybrid: Move acquire pid file from hybrid to pool.
Browse files Browse the repository at this point in the history
  • Loading branch information
winlinvip committed Apr 26, 2021
1 parent 97f6684 commit 9eb9b19
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 87 deletions.
80 changes: 1 addition & 79 deletions trunk/src/app/srs_app_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -687,7 +687,6 @@ SrsServer::SrsServer()
signal_gmc_stop = false;
signal_fast_quit = false;
signal_gracefully_quit = false;
pid_fd = -1;

signal_manager = new SrsSignalManager(this);
conn_manager = new SrsResourceManager("TCP", true);
Expand Down Expand Up @@ -723,11 +722,6 @@ void SrsServer::destroy()
srs_freep(http_heartbeat);
srs_freep(ingester);

if (pid_fd > 0) {
::close(pid_fd);
pid_fd = -1;
}

srs_freep(signal_manager);
srs_freep(conn_manager);

Expand Down Expand Up @@ -847,64 +841,6 @@ srs_error_t SrsServer::initialize_signal()
return signal_manager->initialize();
}

srs_error_t SrsServer::acquire_pid_file()
{
std::string pid_file = _srs_config->get_pid_file();

// -rw-r--r--
// 644
int mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;

int fd;
// open pid file
if ((fd = ::open(pid_file.c_str(), O_WRONLY | O_CREAT, mode)) == -1) {
return srs_error_new(ERROR_SYSTEM_PID_ACQUIRE, "open pid file=%s", pid_file.c_str());
}

// require write lock
struct flock lock;

lock.l_type = F_WRLCK; // F_RDLCK, F_WRLCK, F_UNLCK
lock.l_start = 0; // type offset, relative to l_whence
lock.l_whence = SEEK_SET; // SEEK_SET, SEEK_CUR, SEEK_END
lock.l_len = 0;

if (fcntl(fd, F_SETLK, &lock) == -1) {
if(errno == EACCES || errno == EAGAIN) {
::close(fd);
srs_error("srs is already running!");
return srs_error_new(ERROR_SYSTEM_PID_ALREADY_RUNNING, "srs is already running");
}
return srs_error_new(ERROR_SYSTEM_PID_LOCK, "access to pid=%s", pid_file.c_str());
}

// truncate file
if (ftruncate(fd, 0) != 0) {
return srs_error_new(ERROR_SYSTEM_PID_TRUNCATE_FILE, "truncate pid file=%s", pid_file.c_str());
}

// write the pid
string pid = srs_int2str(getpid());
if (write(fd, pid.c_str(), pid.length()) != (int)pid.length()) {
return srs_error_new(ERROR_SYSTEM_PID_WRITE_FILE, "write pid=%s to file=%s", pid.c_str(), pid_file.c_str());
}

// auto close when fork child process.
int val;
if ((val = fcntl(fd, F_GETFD, 0)) < 0) {
return srs_error_new(ERROR_SYSTEM_PID_GET_FILE_INFO, "fcntl fd=%d", fd);
}
val |= FD_CLOEXEC;
if (fcntl(fd, F_SETFD, val) < 0) {
return srs_error_new(ERROR_SYSTEM_PID_SET_FILE_INFO, "lock file=%s fd=%d", pid_file.c_str(), fd);
}

srs_trace("write pid=%s to %s success!", pid.c_str(), pid_file.c_str());
pid_fd = fd;

return srs_success;
}

srs_error_t SrsServer::listen()
{
srs_error_t err = srs_success;
Expand Down Expand Up @@ -1534,16 +1470,7 @@ srs_error_t SrsServer::on_reload_listen()
srs_error_t SrsServer::on_reload_pid()
{
srs_error_t err = srs_success;

if (pid_fd > 0) {
::close(pid_fd);
pid_fd = -1;
}

if ((err = acquire_pid_file()) != srs_success) {
return srs_error_wrap(err, "reload pid");
}

// TODO: FIXME: Do not support reload pid.
return err;
}

Expand Down Expand Up @@ -1682,11 +1609,6 @@ srs_error_t SrsServerAdapter::run()
return srs_error_wrap(err, "initialize st");
}

// TODO: FIXME: It should be thread-local or thread-safe.
if ((err = srs->acquire_pid_file()) != srs_success) {
return srs_error_wrap(err, "acquire pid file");
}

// TODO: FIXME: It should be thread-local or thread-safe.
if ((err = srs->initialize_signal()) != srs_success) {
return srs_error_wrap(err, "initialize signal");
Expand Down
6 changes: 0 additions & 6 deletions trunk/src/app/srs_app_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -287,11 +287,6 @@ class SrsServer : virtual public ISrsReloadHandler, virtual public ISrsSourceHan
SrsCoroutine* trd_;
SrsHourGlass* timer_;
private:
// The pid file fd, lock the file write when server is running.
// @remark the init.d script should cleanup the pid file, when stop service,
// for the server never delete the file; when system startup, the pid in pid file
// maybe valid but the process is not SRS, the init.d script will never start server.
int pid_fd;
// All listners, listener manager.
std::vector<SrsListener*> listeners;
// Signal manager which convert gignal to io message.
Expand Down Expand Up @@ -327,7 +322,6 @@ class SrsServer : virtual public ISrsReloadHandler, virtual public ISrsSourceHan
virtual srs_error_t initialize(ISrsServerCycle* ch);
virtual srs_error_t initialize_st();
virtual srs_error_t initialize_signal();
virtual srs_error_t acquire_pid_file();
virtual srs_error_t listen();
virtual srs_error_t register_signal();
virtual srs_error_t ingest();
Expand Down
75 changes: 73 additions & 2 deletions trunk/src/app/srs_app_threads.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include <srs_app_pithy_print.hpp>

#include <unistd.h>
#include <fcntl.h>

#ifdef SRS_OSX
pid_t gettid() {
Expand Down Expand Up @@ -475,12 +476,19 @@ SrsThreadPool::SrsThreadPool()
char buf[256];
snprintf(buf, sizeof(buf), "srs-master-%d", entry->num);
entry->name = buf;

pid_fd = -1;
}

// TODO: FIMXE: If free the pool, we should stop all threads.
SrsThreadPool::~SrsThreadPool()
{
srs_freep(lock_);

if (pid_fd > 0) {
::close(pid_fd);
pid_fd = -1;
}
}

bool SrsThreadPool::hybrid_high_water_level()
Expand Down Expand Up @@ -554,6 +562,10 @@ srs_error_t SrsThreadPool::initialize()
return srs_error_wrap(err, "rtc dtls certificate initialize");
}

if ((err = acquire_pid_file()) != srs_success) {
return srs_error_wrap(err, "acquire pid file");
}

// Initialize the master primordial thread.
SrsThreadEntry* entry = (SrsThreadEntry*)entry_;
#ifndef SRS_OSX
Expand Down Expand Up @@ -601,6 +613,64 @@ srs_error_t SrsThreadPool::initialize()
return err;
}

srs_error_t SrsThreadPool::acquire_pid_file()
{
std::string pid_file = _srs_config->get_pid_file();

// -rw-r--r--
// 644
int mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;

int fd;
// open pid file
if ((fd = ::open(pid_file.c_str(), O_WRONLY | O_CREAT, mode)) == -1) {
return srs_error_new(ERROR_SYSTEM_PID_ACQUIRE, "open pid file=%s", pid_file.c_str());
}

// require write lock
struct flock lock;

lock.l_type = F_WRLCK; // F_RDLCK, F_WRLCK, F_UNLCK
lock.l_start = 0; // type offset, relative to l_whence
lock.l_whence = SEEK_SET; // SEEK_SET, SEEK_CUR, SEEK_END
lock.l_len = 0;

if (fcntl(fd, F_SETLK, &lock) == -1) {
if(errno == EACCES || errno == EAGAIN) {
::close(fd);
srs_error("srs is already running!");
return srs_error_new(ERROR_SYSTEM_PID_ALREADY_RUNNING, "srs is already running");
}
return srs_error_new(ERROR_SYSTEM_PID_LOCK, "access to pid=%s", pid_file.c_str());
}

// truncate file
if (ftruncate(fd, 0) != 0) {
return srs_error_new(ERROR_SYSTEM_PID_TRUNCATE_FILE, "truncate pid file=%s", pid_file.c_str());
}

// write the pid
string pid = srs_int2str(getpid());
if (write(fd, pid.c_str(), pid.length()) != (int)pid.length()) {
return srs_error_new(ERROR_SYSTEM_PID_WRITE_FILE, "write pid=%s to file=%s", pid.c_str(), pid_file.c_str());
}

// auto close when fork child process.
int val;
if ((val = fcntl(fd, F_GETFD, 0)) < 0) {
return srs_error_new(ERROR_SYSTEM_PID_GET_FILE_INFO, "fcntl fd=%d", fd);
}
val |= FD_CLOEXEC;
if (fcntl(fd, F_SETFD, val) < 0) {
return srs_error_new(ERROR_SYSTEM_PID_SET_FILE_INFO, "lock file=%s fd=%d", pid_file.c_str(), fd);
}

srs_trace("write pid=%s to %s success!", pid.c_str(), pid_file.c_str());
pid_fd = fd;

return srs_success;
}

srs_error_t SrsThreadPool::execute(string label, srs_error_t (*start)(void* arg), void* arg)
{
srs_error_t err = srs_success;
Expand Down Expand Up @@ -673,8 +743,9 @@ srs_error_t SrsThreadPool::run()
for (int i = 0; i < (int)threads.size(); i++) {
SrsThreadEntry* entry = threads.at(i);
if (entry->err != srs_success) {
err = srs_error_wrap(entry->err, "thread #%d(%s)", entry->num, entry->label.c_str());
return srs_error_copy(err);
err = srs_error_copy(entry->err);
err = srs_error_wrap(err, "thread #%d(%s)", entry->num, entry->label.c_str());
return err;
}
}

Expand Down
10 changes: 10 additions & 0 deletions trunk/src/app/srs_app_threads.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,12 @@ class SrsThreadPool
int critical_pulse_;
int dying_threshold_;
int dying_pulse_;
private:
// The pid file fd, lock the file write when server is running.
// @remark the init.d script should cleanup the pid file, when stop service,
// for the server never delete the file; when system startup, the pid in pid file
// maybe valid but the process is not SRS, the init.d script will never start server.
int pid_fd;
public:
SrsThreadPool();
virtual ~SrsThreadPool();
Expand All @@ -350,6 +356,10 @@ class SrsThreadPool
static srs_error_t setup();
// Initialize the thread pool.
srs_error_t initialize();
private:
// Require the PID file for the whole process.
virtual srs_error_t acquire_pid_file();
public:
// Execute start function with label in thread.
srs_error_t execute(std::string label, srs_error_t (*start)(void* arg), void* arg);
// Run in the primordial thread, util stop or quit.
Expand Down

0 comments on commit 9eb9b19

Please sign in to comment.