From 95b4baee7cb2af9246759577bd7cfceef26ea794 Mon Sep 17 00:00:00 2001 From: winlin Date: Sat, 21 Feb 2015 16:25:04 +0800 Subject: [PATCH] for #179, refine dvr code to more simple. --- trunk/conf/full.conf | 29 +- trunk/src/app/srs_app_config.cpp | 18 + trunk/src/app/srs_app_config.hpp | 9 +- trunk/src/app/srs_app_dvr.cpp | 677 ++++++++++++++++------------- trunk/src/app/srs_app_dvr.hpp | 118 +++-- trunk/src/app/srs_app_http_api.cpp | 3 +- 6 files changed, 517 insertions(+), 337 deletions(-) diff --git a/trunk/conf/full.conf b/trunk/conf/full.conf index a8a92994ba..c5d6181d17 100644 --- a/trunk/conf/full.conf +++ b/trunk/conf/full.conf @@ -281,6 +281,12 @@ vhost dvr.srs.com { # whether enabled dvr features # default: off enabled on; + # the dvr plan. canbe: + # session reap flv when session end(unpublish). + # segment reap flv when flv duration exceed the specified dvr_duration. + # api reap flv when api required. + # default: session + dvr_plan session; # the dvr output path. # we supports some variables to generate the filename. # [vhost], the vhost of stream. @@ -314,22 +320,28 @@ vhost dvr.srs.com { # dvr_path /data/ossrs.net/live/2015/01/livestream-03-10.57.30.776.flv; # @see https://github.com/winlinvip/simple-rtmp-server/wiki/v2_CN_DVR#custom-path # @see https://github.com/winlinvip/simple-rtmp-server/wiki/v2_EN_DVR#custom-path + # segment,session apply it. + # api apply before api specified the path. # default: ./objs/nginx/html dvr_path ./objs/nginx/html; - # the dvr plan. canbe: - # session reap flv when session end(unpublish). - # segment reap flv when flv duration exceed the specified dvr_duration. - # default: session - dvr_plan session; - # the param for plan(segment), in seconds. + # the duration for dvr file, reap if exeed, in seconds. + # segment apply it. + # session,api ignore. # default: 30 dvr_duration 30; - # the param for plan(segment), # whether wait keyframe to reap segment, # if off, reap segment when duration exceed the dvr_duration, # if on, reap segment when duration exceed and got keyframe. + # segment apply it. + # session,api ignore. # default: on dvr_wait_keyframe on; + # whether dvr auto start when publish. + # if off, dvr wait for api to start it. + # api apply it. + # segment,session ignore. + # default: on + dvr_autostart on; # about the stream monotonically increasing: # 1. video timestamp is monotonically increasing, # 2. audio timestamp is monotonically increasing, @@ -340,10 +352,11 @@ vhost dvr.srs.com { # 1. full, to ensure stream start at zero, and ensure stream monotonically increasing. # 2. zero, only ensure sttream start at zero, ignore timestamp jitter. # 3. off, disable the time jitter algorithm, like atc. + # apply for all dvr plan. # default: full time_jitter full; - # on_dvr + # on_dvr, never config in here, should config in http_hooks. # for the dvr http callback, @see http_hooks.on_dvr of vhost hooks.callback.srs.com # @read https://github.com/winlinvip/simple-rtmp-server/wiki/v2_CN_DVR#http-callback # @read https://github.com/winlinvip/simple-rtmp-server/wiki/v2_EN_DVR#http-callback diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index f0d92fdbb7..dc3614154a 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -1418,6 +1418,7 @@ int SrsConfig::check_config() string m = conf->at(j)->name.c_str(); if (m != "enabled" && m != "dvr_path" && m != "dvr_plan" && m != "dvr_duration" && m != "dvr_wait_keyframe" && m != "time_jitter" + && m != "dvr_autostart" ) { ret = ERROR_SYSTEM_CONFIG_INVALID; srs_error("unsupported vhost dvr directive %s, ret=%d", m.c_str(), ret); @@ -3377,6 +3378,23 @@ bool SrsConfig::get_dvr_wait_keyframe(string vhost) return false; } +bool SrsConfig::get_dvr_autostart(string vhost) +{ + SrsConfDirective* dvr = get_dvr(vhost); + + if (!dvr) { + return true; + } + + SrsConfDirective* conf = dvr->get("dvr_autostart"); + + if (!conf || conf->arg0() != "off") { + return true; + } + + return false; +} + int SrsConfig::get_dvr_time_jitter(string vhost) { SrsConfDirective* dvr = get_dvr(vhost); diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp index dad115ee60..afa0b0aa5a 100644 --- a/trunk/src/app/srs_app_config.hpp +++ b/trunk/src/app/srs_app_config.hpp @@ -60,6 +60,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #define SRS_CONF_DEFAULT_DVR_PATH "./objs/nginx/html" #define SRS_CONF_DEFAULT_DVR_PLAN_SESSION "session" #define SRS_CONF_DEFAULT_DVR_PLAN_SEGMENT "segment" +#define SRS_CONF_DEFAULT_DVR_PLAN_API "api" #define SRS_CONF_DEFAULT_DVR_PLAN SRS_CONF_DEFAULT_DVR_PLAN_SESSION #define SRS_CONF_DEFAULT_DVR_DURATION 30 #define SRS_CONF_DEFAULT_TIME_JITTER "full" @@ -921,14 +922,18 @@ class SrsConfig */ virtual std::string get_dvr_plan(std::string vhost); /** - * get the duration of dvr flv, for segment plan. + * get the duration of dvr flv. */ virtual int get_dvr_duration(std::string vhost); /** - * whether wait keyframe to reap segment, for segment plan. + * whether wait keyframe to reap segment. */ virtual bool get_dvr_wait_keyframe(std::string vhost); /** + * whether autostart for dvr. wait api to start dvr if false. + */ + virtual bool get_dvr_autostart(std::string vhost); + /** * get the time_jitter algorithm for dvr. */ virtual int get_dvr_time_jitter(std::string vhost); diff --git a/trunk/src/app/srs_app_dvr.cpp b/trunk/src/app/srs_app_dvr.cpp index 676e1a978b..d83d1c69e7 100644 --- a/trunk/src/app/srs_app_dvr.cpp +++ b/trunk/src/app/srs_app_dvr.cpp @@ -39,8 +39,17 @@ using namespace std; #include #include -SrsFlvSegment::SrsFlvSegment() +SrsFlvSegment::SrsFlvSegment(SrsDvrPlan* p) { + req = NULL; + source = NULL; + jitter = NULL; + plan = p; + + fs = new SrsFileWriter(); + enc = new SrsFlvEncoder(); + jitter_algorithm = SrsRtmpJitterAlgorithmOFF; + path = ""; has_keyframe = false; duration = 0; @@ -48,95 +57,238 @@ SrsFlvSegment::SrsFlvSegment() stream_starttime = 0; stream_previous_pkt_time = -1; stream_duration = 0; + + _srs_config->subscribe(this); } SrsFlvSegment::~SrsFlvSegment() { + _srs_config->unsubscribe(this); + + srs_freep(jitter); + srs_freep(fs); + srs_freep(enc); } -void SrsFlvSegment::reset() +int SrsFlvSegment::initialize(SrsSource* s, SrsRequest* r) { - has_keyframe = false; - starttime = -1; - duration = 0; + int ret = ERROR_SUCCESS; + + source = s; + req = r; + jitter_algorithm = (SrsRtmpJitterAlgorithm)_srs_config->get_dvr_time_jitter(req->vhost); + + return ret; } -SrsDvrPlan::SrsDvrPlan() +bool SrsFlvSegment::is_overflow(int64_t max_duration) { - _source = NULL; - _req = NULL; - jitter = NULL; - dvr_enabled = false; - fs = new SrsFileWriter(); - enc = new SrsFlvEncoder(); - segment = new SrsFlvSegment(); - jitter_algorithm = SrsRtmpJitterAlgorithmOFF; - - _srs_config->subscribe(this); + return duration >= max_duration; } -SrsDvrPlan::~SrsDvrPlan() +int SrsFlvSegment::open() { - _srs_config->unsubscribe(this); + int ret = ERROR_SUCCESS; - srs_freep(jitter); - srs_freep(fs); - srs_freep(enc); - srs_freep(segment); + // ignore when already open. + if (fs->is_open()) { + return ret; + } + + path = generate_path(); + bool fresh_flv_file = !srs_path_exists(path); + + // create dir first. + std::string dir = path.substr(0, path.rfind("/")); + if ((ret = srs_create_dir_recursively(dir)) != ERROR_SUCCESS) { + srs_error("create dir=%s failed. ret=%d", dir.c_str(), ret); + return ret; + } + srs_info("create dir=%s ok", dir.c_str()); + + // create jitter. + if ((ret = create_jitter(!fresh_flv_file)) != ERROR_SUCCESS) { + srs_error("create jitter failed, path=%s, fresh=%d. ret=%d", path.c_str(), fresh_flv_file, ret); + return ret; + } + + // generate the tmp flv path. + if (!fresh_flv_file) { + // when path exists, always append to it. + // so we must use the target flv path as output flv. + tmp_flv_file = path; + } else { + // when path not exists, dvr to tmp file. + tmp_flv_file = path + ".tmp"; + } + + // open file writer, in append or create mode. + if (!fresh_flv_file) { + if ((ret = fs->open_append(tmp_flv_file)) != ERROR_SUCCESS) { + srs_error("append file stream for file %s failed. ret=%d", path.c_str(), ret); + return ret; + } + srs_trace("dvr: always append to when exists, file=%s.", path.c_str()); + } else { + if ((ret = fs->open(tmp_flv_file)) != ERROR_SUCCESS) { + srs_error("open file stream for file %s failed. ret=%d", path.c_str(), ret); + return ret; + } + } + + // when exists, donot write flv header. + if (fresh_flv_file) { + // initialize the encoder. + if ((ret = enc->initialize(fs)) != ERROR_SUCCESS) { + srs_error("initialize enc by fs for file %s failed. ret=%d", path.c_str(), ret); + return ret; + } + + // write the flv header to writer. + if ((ret = enc->write_header()) != ERROR_SUCCESS) { + srs_error("write flv header failed. ret=%d", ret); + return ret; + } + } + + srs_trace("dvr stream %s to file %s", req->stream.c_str(), path.c_str()); + + return ret; } -int SrsDvrPlan::initialize(SrsSource* source, SrsRequest* req) +int SrsFlvSegment::close() { int ret = ERROR_SUCCESS; - _source = source; - _req = req; + // ignore when already closed. + if (!fs->is_open()) { + return ret; + } - jitter_algorithm = (SrsRtmpJitterAlgorithm)_srs_config->get_dvr_time_jitter(_req->vhost); + fs->close(); + + // when tmp flv file exists, reap it. + if (tmp_flv_file != path) { + if (rename(tmp_flv_file.c_str(), path.c_str()) < 0) { + ret = ERROR_SYSTEM_FILE_RENAME; + srs_error("rename flv file failed, %s => %s. ret=%d", + tmp_flv_file.c_str(), path.c_str(), ret); + return ret; + } + } + +#ifdef SRS_AUTO_HTTP_CALLBACK + if (_srs_config->get_vhost_http_hooks_enabled(req->vhost)) { + // HTTP: on_dvr + SrsConfDirective* on_dvr = _srs_config->get_vhost_on_dvr(req->vhost); + if (!on_dvr) { + srs_info("ignore the empty http callback: on_dvr"); + return ret; + } + + int connection_id = _srs_context->get_id(); + std::string ip = req->ip; + std::string cwd = _srs_config->cwd(); + std::string file = path; + for (int i = 0; i < (int)on_dvr->args.size(); i++) { + std::string url = on_dvr->args.at(i); + if ((ret = SrsHttpHooks::on_dvr(url, connection_id, ip, req, cwd, file)) != ERROR_SUCCESS) { + srs_error("hook client on_dvr failed. url=%s, ret=%d", url.c_str(), ret); + return ret; + } + } + } +#endif return ret; } -int SrsDvrPlan::on_publish() +int SrsFlvSegment::write_metadata(SrsOnMetaDataPacket* metadata) { int ret = ERROR_SUCCESS; - // support multiple publish. - if (dvr_enabled) { + int size = 0; + char* payload = NULL; + if ((ret = metadata->encode(size, payload)) != ERROR_SUCCESS) { return ret; } + SrsAutoFree(char, payload); - SrsRequest* req = _req; - - if (!_srs_config->get_dvr_enabled(req->vhost)) { + if ((ret = enc->write_metadata(18, payload, size)) != ERROR_SUCCESS) { return ret; } - // jitter when publish, ensure whole stream start from 0. - srs_freep(jitter); - jitter = new SrsRtmpJitter(); - - // always update time cache. - srs_update_system_time_ms(); + return ret; +} + +int SrsFlvSegment::write_audio(SrsSharedPtrMessage* __audio) +{ + int ret = ERROR_SUCCESS; + + SrsSharedPtrMessage* audio = __audio->copy(); + SrsAutoFree(SrsSharedPtrMessage, audio); - // when republish, stream starting. - segment->stream_previous_pkt_time = -1; - segment->stream_starttime = srs_get_system_time_ms(); - segment->stream_duration = 0; + if ((jitter->correct(audio, 0, 0, jitter_algorithm)) != ERROR_SUCCESS) { + return ret; + } - if ((ret = open_new_segment()) != ERROR_SUCCESS) { + char* payload = audio->payload; + int size = audio->size; + int64_t timestamp = plan->filter_timestamp(audio->timestamp); + if ((ret = enc->write_audio(timestamp, payload, size)) != ERROR_SUCCESS) { + return ret; + } + + if ((ret = on_update_duration(audio)) != ERROR_SUCCESS) { return ret; } return ret; } -int SrsDvrPlan::open_new_segment() +int SrsFlvSegment::write_video(SrsSharedPtrMessage* __video) { int ret = ERROR_SUCCESS; + + SrsSharedPtrMessage* video = __video->copy(); + SrsAutoFree(SrsSharedPtrMessage, video); + + char* payload = video->payload; + int size = video->size; - SrsRequest* req = _req; +#ifdef SRS_AUTO_HTTP_CALLBACK + bool is_key_frame = SrsFlvCodec::video_is_h264(payload, size) + && SrsFlvCodec::video_is_keyframe(payload, size) + && !SrsFlvCodec::video_is_sequence_header(payload, size); + if (is_key_frame) { + has_keyframe = true; + if ((ret = plan->on_video_keyframe()) != ERROR_SUCCESS) { + return ret; + } + } + srs_verbose("dvr video is key: %d", is_key_frame); +#endif + if ((jitter->correct(video, 0, 0, jitter_algorithm)) != ERROR_SUCCESS) { + return ret; + } + + // update segment duration, session plan just update the duration, + // the segment plan will reap segment if exceed, this video will write to next segment. + if ((ret = on_update_duration(video)) != ERROR_SUCCESS) { + return ret; + } + + int32_t timestamp = plan->filter_timestamp(video->timestamp); + if ((ret = enc->write_video(timestamp, payload, size)) != ERROR_SUCCESS) { + return ret; + } + + return ret; +} + +string SrsFlvSegment::generate_path() +{ // the path in config, for example, // /data/[vhost]/[app]/[stream]/[2006]/[01]/[02]/[15].[04].[05].[999].flv std::string path_config = _srs_config->get_dvr_path(req->vhost); @@ -147,26 +299,26 @@ int SrsDvrPlan::open_new_segment() } // the flv file path - std::string path = path_config; + std::string flv_path = path_config; // variable [vhost] - path = srs_string_replace(path, "[vhost]", req->vhost); + flv_path = srs_string_replace(flv_path, "[vhost]", req->vhost); // variable [app] - path = srs_string_replace(path, "[app]", req->app); + flv_path = srs_string_replace(flv_path, "[app]", req->app); // variable [stream] - path = srs_string_replace(path, "[stream]", req->stream); + flv_path = srs_string_replace(flv_path, "[stream]", req->stream); // date and time substitude // clock time timeval tv; if (gettimeofday(&tv, NULL) == -1) { - return ERROR_SYSTEM_TIME; + return flv_path; } // to calendar time struct tm* tm; if ((tm = localtime(&tv.tv_sec)) == NULL) { - return ERROR_SYSTEM_TIME; + return flv_path; } // the buffer to format the date and time. @@ -175,316 +327,212 @@ int SrsDvrPlan::open_new_segment() // [2006], replace with current year. if (true) { snprintf(buf, sizeof(buf), "%d", 1900 + tm->tm_year); - path = srs_string_replace(path, "[2006]", buf); + flv_path = srs_string_replace(flv_path, "[2006]", buf); } // [2006], replace with current year. if (true) { snprintf(buf, sizeof(buf), "%d", 1900 + tm->tm_year); - path = srs_string_replace(path, "[2006]", buf); + flv_path = srs_string_replace(flv_path, "[2006]", buf); } // [01], replace this const to current month. if (true) { snprintf(buf, sizeof(buf), "%d", 1 + tm->tm_mon); - path = srs_string_replace(path, "[01]", buf); + flv_path = srs_string_replace(flv_path, "[01]", buf); } // [02], replace this const to current date. if (true) { snprintf(buf, sizeof(buf), "%d", tm->tm_mday); - path = srs_string_replace(path, "[02]", buf); + flv_path = srs_string_replace(flv_path, "[02]", buf); } // [15], replace this const to current hour. if (true) { snprintf(buf, sizeof(buf), "%d", tm->tm_hour); - path = srs_string_replace(path, "[15]", buf); + flv_path = srs_string_replace(flv_path, "[15]", buf); } // [04], repleace this const to current minute. if (true) { snprintf(buf, sizeof(buf), "%d", tm->tm_min); - path = srs_string_replace(path, "[04]", buf); + flv_path = srs_string_replace(flv_path, "[04]", buf); } // [05], repleace this const to current second. if (true) { snprintf(buf, sizeof(buf), "%d", tm->tm_sec); - path = srs_string_replace(path, "[05]", buf); + flv_path = srs_string_replace(flv_path, "[05]", buf); } // [999], repleace this const to current millisecond. if (true) { snprintf(buf, sizeof(buf), "%03d", (int)(tv.tv_usec / 1000)); - path = srs_string_replace(path, "[999]", buf); + flv_path = srs_string_replace(flv_path, "[999]", buf); } // [timestamp],replace this const to current UNIX timestamp in ms. if (true) { int64_t now_us = ((int64_t)tv.tv_sec) * 1000 * 1000 + (int64_t)tv.tv_usec; snprintf(buf, sizeof(buf), "%"PRId64, now_us / 1000); - path = srs_string_replace(path, "[timestamp]", buf); + flv_path = srs_string_replace(flv_path, "[timestamp]", buf); } + + return flv_path; +} + +int SrsFlvSegment::create_jitter(bool loads_from_flv) +{ + int ret = ERROR_SUCCESS; - // create dir first. - std::string dir = path.substr(0, path.rfind("/")); - if ((ret = srs_create_dir_recursively(dir)) != ERROR_SUCCESS) { - srs_error("create dir=%s failed. ret=%d", dir.c_str(), ret); + // when path exists, use exists jitter. + if (!loads_from_flv) { + // jitter when publish, ensure whole stream start from 0. + srs_freep(jitter); + jitter = new SrsRtmpJitter(); + + // fresh stream starting. + starttime = -1; + stream_previous_pkt_time = -1; + stream_starttime = srs_update_system_time_ms(); + stream_duration = 0; + + // fresh segment starting. + has_keyframe = false; + duration = 0; + return ret; } - srs_info("create dir=%s ok", dir.c_str()); - - if ((ret = flv_open(req->get_stream_url(), path)) != ERROR_SUCCESS) { + + // when jitter ok, do nothing. + if (jitter) { return ret; } - dvr_enabled = true; - + + // always ensure the jitter crote. + // for the first time, initialize jitter from exists file. + jitter = new SrsRtmpJitter(); + + // TODO: FIXME: implements it. + return ret; } -int SrsDvrPlan::on_dvr_request_sh() +int SrsFlvSegment::on_update_duration(SrsSharedPtrMessage* msg) { int ret = ERROR_SUCCESS; - // the dvr is enabled, notice the source to push the data. - if ((ret = _source->on_dvr_request_sh()) != ERROR_SUCCESS) { - return ret; + // we must assumpt that the stream timestamp is monotonically increase, + // that is, always use time jitter to correct the timestamp. + // except the time jitter is disabled in config. + + // set the segment starttime at first time + if (starttime < 0) { + starttime = msg->timestamp; } + // no previous packet or timestamp overflow. + if (stream_previous_pkt_time < 0 || stream_previous_pkt_time > msg->timestamp) { + stream_previous_pkt_time = msg->timestamp; + } + + // collect segment and stream duration, timestamp overflow is ok. + duration += msg->timestamp - stream_previous_pkt_time; + stream_duration += msg->timestamp - stream_previous_pkt_time; + + // update previous packet time + stream_previous_pkt_time = msg->timestamp; + return ret; } -int SrsDvrPlan::on_video_keyframe() +int SrsFlvSegment::on_reload_vhost_dvr(std::string /*vhost*/) { int ret = ERROR_SUCCESS; + + jitter_algorithm = (SrsRtmpJitterAlgorithm)_srs_config->get_dvr_time_jitter(req->vhost); + return ret; } -int64_t SrsDvrPlan::filter_timestamp(int64_t timestamp) +SrsDvrPlan::SrsDvrPlan() { - return timestamp; + source = NULL; + req = NULL; + + dvr_enabled = false; + segment = new SrsFlvSegment(this); } -int SrsDvrPlan::on_meta_data(SrsOnMetaDataPacket* metadata) +SrsDvrPlan::~SrsDvrPlan() { - int ret = ERROR_SUCCESS; - - if (!dvr_enabled) { - return ret; - } - - int size = 0; - char* payload = NULL; - if ((ret = metadata->encode(size, payload)) != ERROR_SUCCESS) { - return ret; - } - SrsAutoFree(char, payload); - - if ((ret = enc->write_metadata(18, payload, size)) != ERROR_SUCCESS) { - return ret; - } - - return ret; + srs_freep(segment); } -int SrsDvrPlan::on_audio(SrsSharedPtrMessage* __audio) +int SrsDvrPlan::initialize(SrsSource* s, SrsRequest* r) { int ret = ERROR_SUCCESS; - - if (!dvr_enabled) { - return ret; - } - SrsSharedPtrMessage* audio = __audio->copy(); - SrsAutoFree(SrsSharedPtrMessage, audio); - - if ((jitter->correct(audio, 0, 0, jitter_algorithm)) != ERROR_SUCCESS) { - return ret; - } - - char* payload = audio->payload; - int size = audio->size; - int64_t timestamp = filter_timestamp(audio->timestamp); - if ((ret = enc->write_audio(timestamp, payload, size)) != ERROR_SUCCESS) { - return ret; - } - - if ((ret = update_duration(audio)) != ERROR_SUCCESS) { + source = s; + req = r; + + if ((ret = segment->initialize(s, r)) != ERROR_SUCCESS) { return ret; } - + return ret; } -int SrsDvrPlan::on_video(SrsSharedPtrMessage* __video) +int SrsDvrPlan::on_dvr_request_sh() { int ret = ERROR_SUCCESS; - if (!dvr_enabled) { - return ret; - } - - SrsSharedPtrMessage* video = __video->copy(); - SrsAutoFree(SrsSharedPtrMessage, video); - - char* payload = video->payload; - int size = video->size; - -#ifdef SRS_AUTO_HTTP_CALLBACK - bool is_key_frame = SrsFlvCodec::video_is_h264(payload, size) - && SrsFlvCodec::video_is_keyframe(payload, size) - && !SrsFlvCodec::video_is_sequence_header(payload, size); - if (is_key_frame) { - segment->has_keyframe = true; - if ((ret = on_video_keyframe()) != ERROR_SUCCESS) { - return ret; - } - } - srs_verbose("dvr video is key: %d", is_key_frame); -#endif - - if ((jitter->correct(video, 0, 0, jitter_algorithm)) != ERROR_SUCCESS) { - return ret; - } - - // update segment duration, session plan just update the duration, - // the segment plan will reap segment if exceed, this video will write to next segment. - if ((ret = update_duration(video)) != ERROR_SUCCESS) { - return ret; - } - - int32_t timestamp = filter_timestamp(video->timestamp); - if ((ret = enc->write_video(timestamp, payload, size)) != ERROR_SUCCESS) { + // the dvr is enabled, notice the source to push the data. + if ((ret = source->on_dvr_request_sh()) != ERROR_SUCCESS) { return ret; } return ret; } -int SrsDvrPlan::on_reload_vhost_dvr(std::string /*vhost*/) +int SrsDvrPlan::on_video_keyframe() { - int ret = ERROR_SUCCESS; - - jitter_algorithm = (SrsRtmpJitterAlgorithm)_srs_config->get_dvr_time_jitter(_req->vhost); - - return ret; + return ERROR_SUCCESS; } -int SrsDvrPlan::flv_open(string stream, string path) +int64_t SrsDvrPlan::filter_timestamp(int64_t timestamp) { - int ret = ERROR_SUCCESS; - - segment->reset(); - - if (srs_path_exists(path)) { - // when path exists, always append to it. - // so we must use the target flv path as output flv. - tmp_flv_file = path; - } else { - // when path not exists, dvr to tmp file. - tmp_flv_file = path + ".tmp"; - } - - if (srs_path_exists(path)) { - if ((ret = fs->open_append(tmp_flv_file)) != ERROR_SUCCESS) { - srs_error("append file stream for file %s failed. ret=%d", path.c_str(), ret); - return ret; - } - srs_trace("dvr: always append to when exists, file=%s.", path.c_str()); - } else { - if ((ret = fs->open(tmp_flv_file)) != ERROR_SUCCESS) { - srs_error("open file stream for file %s failed. ret=%d", path.c_str(), ret); - return ret; - } - } - - if ((ret = enc->initialize(fs)) != ERROR_SUCCESS) { - srs_error("initialize enc by fs for file %s failed. ret=%d", path.c_str(), ret); - return ret; - } - - // when exists, donot write flv header. - if (tmp_flv_file != path) { - if ((ret = write_flv_header()) != ERROR_SUCCESS) { - return ret; - } - } - - segment->path = path; - - srs_trace("dvr stream %s to file %s", stream.c_str(), path.c_str()); - return ret; + return timestamp; } -int SrsDvrPlan::flv_close() +int SrsDvrPlan::on_meta_data(SrsOnMetaDataPacket* metadata) { int ret = ERROR_SUCCESS; - fs->close(); - - // when tmp flv file exists, reap it. - if (tmp_flv_file != segment->path) { - if (rename(tmp_flv_file.c_str(), segment->path.c_str()) < 0) { - ret = ERROR_SYSTEM_FILE_RENAME; - srs_error("rename flv file failed, %s => %s. ret=%d", - tmp_flv_file.c_str(), segment->path.c_str(), ret); - return ret; - } + if (!dvr_enabled) { + return ret; } -#ifdef SRS_AUTO_HTTP_CALLBACK - SrsRequest* req = _req; - if (_srs_config->get_vhost_http_hooks_enabled(req->vhost)) { - // HTTP: on_dvr - SrsConfDirective* on_dvr = _srs_config->get_vhost_on_dvr(req->vhost); - if (!on_dvr) { - srs_info("ignore the empty http callback: on_dvr"); - return ret; - } - - int connection_id = _srs_context->get_id(); - std::string ip = req->ip; - std::string cwd = _srs_config->cwd(); - std::string file = segment->path; - for (int i = 0; i < (int)on_dvr->args.size(); i++) { - std::string url = on_dvr->args.at(i); - if ((ret = SrsHttpHooks::on_dvr(url, connection_id, ip, req, cwd, file)) != ERROR_SUCCESS) { - srs_error("hook client on_dvr failed. url=%s, ret=%d", url.c_str(), ret); - return ret; - } - } - } -#endif - - return ret; + return segment->write_metadata(metadata); } -int SrsDvrPlan::update_duration(SrsSharedPtrMessage* msg) +int SrsDvrPlan::on_audio(SrsSharedPtrMessage* __audio) { int ret = ERROR_SUCCESS; - - // we must assumpt that the stream timestamp is monotonically increase, - // that is, always use time jitter to correct the timestamp. - // set the segment starttime at first time - if (segment->starttime < 0) { - segment->starttime = msg->timestamp; + if (!dvr_enabled) { + return ret; } - - // no previous packet or timestamp overflow. - if (segment->stream_previous_pkt_time < 0 || segment->stream_previous_pkt_time > msg->timestamp) { - segment->stream_previous_pkt_time = msg->timestamp; + + if ((ret = segment->write_audio(__audio)) != ERROR_SUCCESS) { + return ret; } - // collect segment and stream duration, timestamp overflow is ok. - segment->duration += msg->timestamp - segment->stream_previous_pkt_time; - segment->stream_duration += msg->timestamp - segment->stream_previous_pkt_time; - - // update previous packet time - segment->stream_previous_pkt_time = msg->timestamp; - return ret; } -int SrsDvrPlan::write_flv_header() +int SrsDvrPlan::on_video(SrsSharedPtrMessage* __video) { int ret = ERROR_SUCCESS; - if ((ret = enc->write_header()) != ERROR_SUCCESS) { - srs_error("write flv header failed. ret=%d", ret); + if (!dvr_enabled) { + return ret; + } + + if ((ret = segment->write_video(__video)) != ERROR_SUCCESS) { return ret; } @@ -511,6 +559,32 @@ SrsDvrSessionPlan::~SrsDvrSessionPlan() { } +int SrsDvrSessionPlan::on_publish() +{ + int ret = ERROR_SUCCESS; + + // support multiple publish. + if (dvr_enabled) { + return ret; + } + + if (!_srs_config->get_dvr_enabled(req->vhost)) { + return ret; + } + + if ((ret = segment->close()) != ERROR_SUCCESS) { + return ret; + } + + if ((ret = segment->open()) != ERROR_SUCCESS) { + return ret; + } + + dvr_enabled = true; + + return ret; +} + void SrsDvrSessionPlan::on_unpublish() { // support multiple publish. @@ -519,7 +593,7 @@ void SrsDvrSessionPlan::on_unpublish() } // ignore error. - int ret = flv_close(); + int ret = segment->close(); if (ret != ERROR_SUCCESS) { srs_warn("ignore flv close error. ret=%d", ret); } @@ -558,65 +632,86 @@ int SrsDvrSegmentPlan::on_publish() { int ret = ERROR_SUCCESS; - // if already opened, continue to dvr. - // the segment plan maybe keep running longer than the encoder. - // for example, segment running, encoder restart, - // the segment plan will just continue going and donot open new segment. - if (fs->is_open()) { - dvr_enabled = true; + // support multiple publish. + if (dvr_enabled) { return ret; } - - return SrsDvrPlan::on_publish(); + + if (!_srs_config->get_dvr_enabled(req->vhost)) { + return ret; + } + + if ((ret = segment->close()) != ERROR_SUCCESS) { + return ret; + } + + if ((ret = segment->open()) != ERROR_SUCCESS) { + return ret; + } + + dvr_enabled = true; + + return ret; } void SrsDvrSegmentPlan::on_unpublish() { - // support multiple publish. - if (!dvr_enabled) { - return; - } - dvr_enabled = false; } int SrsDvrSegmentPlan::on_audio(SrsSharedPtrMessage* audio) { + int ret = ERROR_SUCCESS; + if (SrsFlvCodec::audio_is_sequence_header(audio->payload, audio->size)) { srs_freep(sh_audio); sh_audio = audio->copy(); } + + if ((ret = update_duration(audio)) != ERROR_SUCCESS) { + return ret; + } - return SrsDvrPlan::on_audio(audio); + if ((ret = SrsDvrPlan::on_audio(audio)) != ERROR_SUCCESS) { + return ret; + } + + return ret; } int SrsDvrSegmentPlan::on_video(SrsSharedPtrMessage* video) { + int ret = ERROR_SUCCESS; + if (SrsFlvCodec::video_is_sequence_header(video->payload, video->size)) { srs_freep(sh_video); sh_video = video->copy(); } + + if ((ret = update_duration(video)) != ERROR_SUCCESS) { + return ret; + } - return SrsDvrPlan::on_video(video); + if ((ret = SrsDvrPlan::on_video(video)) != ERROR_SUCCESS) { + return ret; + } + + return ret; } int SrsDvrSegmentPlan::update_duration(SrsSharedPtrMessage* msg) { int ret = ERROR_SUCCESS; - if ((ret = SrsDvrPlan::update_duration(msg)) != ERROR_SUCCESS) { - return ret; - } - srs_assert(segment); // ignore if duration ok. - if (segment_duration <= 0 || segment->duration < segment_duration) { + if (segment_duration <= 0 || !segment->is_overflow(segment_duration)) { return ret; } // when wait keyframe, ignore if no frame arrived. // @see https://github.com/winlinvip/simple-rtmp-server/issues/177 - if (_srs_config->get_dvr_wait_keyframe(_req->vhost)) { + if (_srs_config->get_dvr_wait_keyframe(req->vhost)) { if (!msg->is_video()) { return ret; } @@ -632,14 +727,12 @@ int SrsDvrSegmentPlan::update_duration(SrsSharedPtrMessage* msg) } // reap segment - if ((ret = flv_close()) != ERROR_SUCCESS) { - segment->reset(); + if ((ret = segment->close()) != ERROR_SUCCESS) { return ret; } - on_unpublish(); // open new flv file - if ((ret = open_new_segment()) != ERROR_SUCCESS) { + if ((ret = segment->open()) != ERROR_SUCCESS) { return ret; } @@ -654,9 +747,9 @@ int SrsDvrSegmentPlan::update_duration(SrsSharedPtrMessage* msg) return ret; } -SrsDvr::SrsDvr(SrsSource* source) +SrsDvr::SrsDvr(SrsSource* s) { - _source = source; + source = s; plan = NULL; } @@ -665,21 +758,21 @@ SrsDvr::~SrsDvr() srs_freep(plan); } -int SrsDvr::initialize(SrsRequest* req) +int SrsDvr::initialize(SrsRequest* r) { int ret = ERROR_SUCCESS; srs_freep(plan); - plan = SrsDvrPlan::create_plan(req->vhost); + plan = SrsDvrPlan::create_plan(r->vhost); - if ((ret = plan->initialize(_source, req)) != ERROR_SUCCESS) { + if ((ret = plan->initialize(source, r)) != ERROR_SUCCESS) { return ret; } return ret; } -int SrsDvr::on_publish(SrsRequest* /*req*/) +int SrsDvr::on_publish(SrsRequest* /*r*/) { int ret = ERROR_SUCCESS; @@ -695,11 +788,11 @@ void SrsDvr::on_unpublish() plan->on_unpublish(); } -int SrsDvr::on_meta_data(SrsOnMetaDataPacket* metadata) +int SrsDvr::on_meta_data(SrsOnMetaDataPacket* m) { int ret = ERROR_SUCCESS; - if ((ret = plan->on_meta_data(metadata)) != ERROR_SUCCESS) { + if ((ret = plan->on_meta_data(m)) != ERROR_SUCCESS) { return ret; } diff --git a/trunk/src/app/srs_app_dvr.hpp b/trunk/src/app/srs_app_dvr.hpp index d879c9e0ce..93a35b95fa 100644 --- a/trunk/src/app/srs_app_dvr.hpp +++ b/trunk/src/app/srs_app_dvr.hpp @@ -41,16 +41,34 @@ class SrsOnMetaDataPacket; class SrsSharedPtrMessage; class SrsFileWriter; class SrsFlvEncoder; +class SrsDvrPlan; #include #include /** * a piece of flv segment. +* when open segment, support start at 0 or not. */ -class SrsFlvSegment +class SrsFlvSegment : public ISrsReloadHandler { -public: +private: + SrsSource* source; + SrsRequest* req; + SrsDvrPlan* plan; +private: + /** + * the underlayer dvr stream. + * if close, the flv is reap and closed. + * if open, new flv file is crote. + */ + SrsFlvEncoder* enc; + SrsRtmpJitter* jitter; + SrsRtmpJitterAlgorithm jitter_algorithm; + SrsFileWriter* fs; +private: + std::string tmp_flv_file; +private: /** * current segment flv file path. */ @@ -81,10 +99,56 @@ class SrsFlvSegment */ int64_t stream_previous_pkt_time; public: - SrsFlvSegment(); + SrsFlvSegment(SrsDvrPlan* p); virtual ~SrsFlvSegment(); public: - virtual void reset(); + /** + * initialize the segment. + */ + virtual int initialize(SrsSource* s, SrsRequest* r); + /** + * whether segment is overflow. + */ + virtual bool is_overflow(int64_t max_duration); + /** + * open new segment file, timestamp start at 0 for fresh flv file. + * @remark ignore when already open. + */ + virtual int open(); + /** + * close current segment. + * @remark ignore when already closed. + */ + virtual int close(); + /** + * write the metadata to segment. + */ + virtual int write_metadata(SrsOnMetaDataPacket* metadata); + /** + * @param __audio, directly ptr, copy it if need to save it. + */ + virtual int write_audio(SrsSharedPtrMessage* __audio); + /** + * @param __video, directly ptr, copy it if need to save it. + */ + virtual int write_video(SrsSharedPtrMessage* __video); +private: + /** + * generate the flv segment path. + */ + virtual std::string generate_path(); + /** + * create flv jitter. load jitter when flv exists. + * @param loads_from_flv whether loads the jitter from exists flv file. + */ + virtual int create_jitter(bool loads_from_flv); + /** + * when update the duration of segment by rtmp msg. + */ + virtual int on_update_duration(SrsSharedPtrMessage* msg); +// interface ISrsReloadHandler +public: + virtual int on_reload_vhost_dvr(std::string vhost); }; /** @@ -94,32 +158,25 @@ class SrsFlvSegment * 2. reap flv: when to reap the flv and start new piece. */ // TODO: FIXME: the plan is too fat, refine me. -class SrsDvrPlan : public ISrsReloadHandler +class SrsDvrPlan { -private: - /** - * the underlayer dvr stream. - * if close, the flv is reap and closed. - * if open, new flv file is crote. - */ - SrsFlvEncoder* enc; - SrsSource* _source; - SrsRtmpJitter* jitter; - SrsRtmpJitterAlgorithm jitter_algorithm; +public: + friend class SrsFlvSegment; protected: + SrsSource* source; + SrsRequest* req; SrsFlvSegment* segment; - SrsRequest* _req; bool dvr_enabled; - SrsFileWriter* fs; -private: - std::string tmp_flv_file; public: SrsDvrPlan(); virtual ~SrsDvrPlan(); public: - virtual int initialize(SrsSource* source, SrsRequest* req); - virtual int on_publish(); + virtual int initialize(SrsSource* s, SrsRequest* r); + virtual int on_publish() = 0; virtual void on_unpublish() = 0; + /** + * when got metadata. + */ virtual int on_meta_data(SrsOnMetaDataPacket* metadata); /** * @param __audio, directly ptr, copy it if need to save it. @@ -129,15 +186,7 @@ class SrsDvrPlan : public ISrsReloadHandler * @param __video, directly ptr, copy it if need to save it. */ virtual int on_video(SrsSharedPtrMessage* __video); -// interface ISrsReloadHandler -public: - virtual int on_reload_vhost_dvr(std::string vhost); protected: - virtual int flv_open(std::string stream, std::string path); - virtual int flv_close(); - virtual int open_new_segment(); - virtual int update_duration(SrsSharedPtrMessage* msg); - virtual int write_flv_header(); virtual int on_dvr_request_sh(); virtual int on_video_keyframe(); virtual int64_t filter_timestamp(int64_t timestamp); @@ -154,6 +203,7 @@ class SrsDvrSessionPlan : public SrsDvrPlan SrsDvrSessionPlan(); virtual ~SrsDvrSessionPlan(); public: + virtual int on_publish(); virtual void on_unpublish(); }; @@ -193,11 +243,11 @@ class SrsDvrSegmentPlan : public SrsDvrPlan class SrsDvr { private: - SrsSource* _source; + SrsSource* source; private: SrsDvrPlan* plan; public: - SrsDvr(SrsSource* source); + SrsDvr(SrsSource* s); virtual ~SrsDvr(); public: /** @@ -205,12 +255,12 @@ class SrsDvr * when system initialize(encoder publish at first time, or reload), * initialize the dvr will reinitialize the plan, the whole dvr framework. */ - virtual int initialize(SrsRequest* req); + virtual int initialize(SrsRequest* r); /** * publish stream event, * when encoder start to publish RTMP stream. */ - virtual int on_publish(SrsRequest* req); + virtual int on_publish(SrsRequest* r); /** * the unpublish event., * when encoder stop(unpublish) to publish RTMP stream. @@ -219,7 +269,7 @@ class SrsDvr /** * get some information from metadata, it's optinal. */ - virtual int on_meta_data(SrsOnMetaDataPacket* metadata); + virtual int on_meta_data(SrsOnMetaDataPacket* m); /** * mux the audio packets to dvr. * @param __audio, directly ptr, copy it if need to save it. diff --git a/trunk/src/app/srs_app_http_api.cpp b/trunk/src/app/srs_app_http_api.cpp index 7df84f1c5c..8f7dfa6dcc 100644 --- a/trunk/src/app/srs_app_http_api.cpp +++ b/trunk/src/app/srs_app_http_api.cpp @@ -106,7 +106,8 @@ int SrsGoApiV1::serve_http(ISrsGoHttpResponseWriter* w, SrsHttpMessage* r) << __SRS_JFIELD_STR("authors", "the primary authors and contributors") << __SRS_JFIELD_CONT << __SRS_JFIELD_STR("requests", "the request itself, for http debug") << __SRS_JFIELD_CONT << __SRS_JFIELD_STR("vhosts", "dumps vhost to json") << __SRS_JFIELD_CONT - << __SRS_JFIELD_STR("streams", "dumps streams to json") + << __SRS_JFIELD_STR("streams", "dumps streams to json") << __SRS_JFIELD_CONT + << __SRS_JFIELD_STR("dvrs", "query or control the dvr plan") << __SRS_JOBJECT_END << __SRS_JOBJECT_END;