diff --git a/trunk/conf/full.conf b/trunk/conf/full.conf
index d20284aa6f..31d6def3cd 100644
--- a/trunk/conf/full.conf
+++ b/trunk/conf/full.conf
@@ -157,8 +157,8 @@ stream_caster {
# rtmp://127.0.0.1/live/livestream
output rtmp://127.0.0.1/live/livestream;
# the listen port for stream caster.
- # for mpegts_over_udp caster, listen at udp port.
- # for rtsp caster, listen at tcp port.
+ # for mpegts_over_udp caster, listen at udp port. for example, 8935.
+ # for rtsp caster, listen at tcp port. for example, 554.
listen 8935;
}
stream_caster {
@@ -168,7 +168,7 @@ stream_caster {
listen 8935;
}
stream_caster {
- enabled on;
+ enabled off;
caster rtsp;
output rtmp://127.0.0.1/[app]/[stream];
listen 554;
diff --git a/trunk/configure b/trunk/configure
index 41630abb68..b1e79e28fe 100755
--- a/trunk/configure
+++ b/trunk/configure
@@ -376,7 +376,7 @@ MODULE_DEPENDS=("CORE" "KERNEL")
ModuleLibIncs=(${SRS_OBJS_DIR} ${LibSSLRoot})
MODULE_FILES=("srs_rtmp_amf0" "srs_rtmp_io" "srs_rtmp_stack" "srs_rtmp_sdk"
"srs_rtmp_handshake" "srs_rtmp_utility" "srs_rtmp_msg_array" "srs_rtmp_buffer"
- "srs_raw_avc")
+ "srs_raw_avc" "srs_rtsp_stack")
RTMP_INCS="src/protocol"; MODULE_DIR=${RTMP_INCS} . auto/modules.sh
RTMP_OBJS="${MODULE_OBJS[@]}"
#
diff --git a/trunk/ide/srs_upp/srs_upp.upp b/trunk/ide/srs_upp/srs_upp.upp
index d617bc52dc..245dd7fbd0 100755
--- a/trunk/ide/srs_upp/srs_upp.upp
+++ b/trunk/ide/srs_upp/srs_upp.upp
@@ -36,6 +36,8 @@ file
../../src/kernel/srs_kernel_log.cpp,
../../src/kernel/srs_kernel_mp3.hpp,
../../src/kernel/srs_kernel_mp3.cpp,
+ ../../src/kernel/srs_rtsp_stack.hpp,
+ ../../src/kernel/srs_rtsp_stack.cpp,
../../src/kernel/srs_kernel_stream.hpp,
../../src/kernel/srs_kernel_stream.cpp,
../../src/kernel/srs_kernel_ts.cpp,
diff --git a/trunk/ide/srs_vs2010/srs.vcxproj b/trunk/ide/srs_vs2010/srs.vcxproj
index 0c0afb3018..3013b0eaeb 100755
--- a/trunk/ide/srs_vs2010/srs.vcxproj
+++ b/trunk/ide/srs_vs2010/srs.vcxproj
@@ -89,6 +89,7 @@
+
@@ -125,6 +126,7 @@
+
@@ -166,6 +168,7 @@
+
@@ -203,6 +206,7 @@
+
diff --git a/trunk/ide/srs_vs2010/srs.vcxproj.filters b/trunk/ide/srs_vs2010/srs.vcxproj.filters
index a592ec709c..1c1ad77401 100755
--- a/trunk/ide/srs_vs2010/srs.vcxproj.filters
+++ b/trunk/ide/srs_vs2010/srs.vcxproj.filters
@@ -229,6 +229,12 @@
srs
+
+ srs
+
+
+ srs
+
@@ -420,6 +426,12 @@
srs
+
+ srs
+
+
+ srs
+
diff --git a/trunk/src/app/srs_app_rtsp.cpp b/trunk/src/app/srs_app_rtsp.cpp
index 56c091bf39..1763363804 100644
--- a/trunk/src/app/srs_app_rtsp.cpp
+++ b/trunk/src/app/srs_app_rtsp.cpp
@@ -23,7 +23,15 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include
+#include
+using namespace std;
+
#include
+#include
+#include
+#include
+#include
+#include
#ifdef SRS_AUTO_STREAM_CASTER
@@ -35,13 +43,115 @@ ISrsRtspHandler::~ISrsRtspHandler()
{
}
-SrsRtspConn::SrsRtspConn(SrsConfDirective* c)
+SrsRtspConn::SrsRtspConn(SrsRtspCaster* c, st_netfd_t fd, std::string o)
{
- output = _srs_config->get_stream_caster_output(c);
+ output = o;
+ caster = c;
+ stfd = fd;
+ skt = new SrsStSocket(fd);
+ rtsp = new SrsRtspStack(skt);
+ trd = new SrsThread("rtsp", this, 0, false);
}
SrsRtspConn::~SrsRtspConn()
{
+ srs_close_stfd(stfd);
+ trd->stop();
+
+ srs_freep(trd);
+ srs_freep(skt);
+ srs_freep(rtsp);
+}
+
+int SrsRtspConn::serve()
+{
+ return trd->start();
+}
+
+int SrsRtspConn::do_cycle()
+{
+ int ret = ERROR_SUCCESS;
+
+ // retrieve ip of client.
+ std::string ip = srs_get_peer_ip(st_netfd_fileno(stfd));
+ srs_trace("rtsp: serve %s", ip.c_str());
+
+ return ret;
+}
+
+int SrsRtspConn::cycle()
+{
+ // serve the rtsp client.
+ int ret = do_cycle();
+
+ // if socket io error, set to closed.
+ if (srs_is_client_gracefully_close(ret)) {
+ ret = ERROR_SOCKET_CLOSED;
+ }
+
+ // success.
+ if (ret == ERROR_SUCCESS) {
+ srs_trace("client finished.");
+ }
+
+ // client close peer.
+ if (ret == ERROR_SOCKET_CLOSED) {
+ srs_warn("client disconnect peer. ret=%d", ret);
+ }
+
+ // terminate thread in the thread cycle itself.
+ trd->stop_loop();
+
+ return ERROR_SUCCESS;
+}
+
+void SrsRtspConn::on_thread_stop()
+{
+ caster->remove(this);
+}
+
+SrsRtspCaster::SrsRtspCaster(SrsConfDirective* c)
+{
+ // TODO: FIXME: support reload.
+ output = _srs_config->get_stream_caster_output(c);
+}
+
+SrsRtspCaster::~SrsRtspCaster()
+{
+ std::vector::iterator it;
+ for (it = clients.begin(); it != clients.end(); ++it) {
+ SrsRtspConn* conn = *it;
+ srs_freep(conn);
+ }
+ clients.clear();
+}
+
+int SrsRtspCaster::serve_client(st_netfd_t stfd)
+{
+ int ret = ERROR_SUCCESS;
+
+ SrsRtspConn* conn = new SrsRtspConn(this, stfd, output);
+ if ((ret = conn->serve()) != ERROR_SUCCESS) {
+ srs_error("rtsp: serve client failed. ret=%d", ret);
+ srs_freep(conn);
+ return ret;
+ }
+
+ clients.push_back(conn);
+ srs_info("rtsp: start thread to serve client.");
+
+ return ret;
+}
+
+void SrsRtspCaster::remove(SrsRtspConn* conn)
+{
+ std::vector::iterator it = find(clients.begin(), clients.end(), conn);
+ if (it != clients.end()) {
+ clients.erase(it);
+ }
+ srs_info("rtsp: remove connection from caster.");
+
+ srs_freep(conn);
}
#endif
diff --git a/trunk/src/app/srs_app_rtsp.hpp b/trunk/src/app/srs_app_rtsp.hpp
index acf7995c8f..0fd461e130 100644
--- a/trunk/src/app/srs_app_rtsp.hpp
+++ b/trunk/src/app/srs_app_rtsp.hpp
@@ -31,10 +31,17 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include
#include
+#include
+
+#include
+#include
#ifdef SRS_AUTO_STREAM_CASTER
class SrsConfDirective;
+class SrsStSocket;
+class SrsRtspStack;
+class SrsRtspCaster;
/**
* the handler for rtsp handler.
@@ -44,18 +51,54 @@ class ISrsRtspHandler
public:
ISrsRtspHandler();
virtual ~ISrsRtspHandler();
+public:
+ /**
+ * serve the rtsp connection.
+ */
+ virtual int serve_client(st_netfd_t stfd) = 0;
};
/**
-* the connection for rtsp.
+* the rtsp connection serve the fd.
*/
-class SrsRtspConn : public ISrsRtspHandler
+class SrsRtspConn : public ISrsThreadHandler
{
private:
std::string output;
+ st_netfd_t stfd;
+ SrsStSocket* skt;
+ SrsRtspStack* rtsp;
+ SrsRtspCaster* caster;
+ SrsThread* trd;
public:
- SrsRtspConn(SrsConfDirective* c);
+ SrsRtspConn(SrsRtspCaster* c, st_netfd_t fd, std::string o);
virtual ~SrsRtspConn();
+public:
+ virtual int serve();
+private:
+ virtual int do_cycle();
+// interface ISrsThreadHandler
+public:
+ virtual int cycle();
+ virtual void on_thread_stop();
+};
+
+/**
+* the caster for rtsp.
+*/
+class SrsRtspCaster : public ISrsRtspHandler
+{
+private:
+ std::string output;
+ std::vector clients;
+public:
+ SrsRtspCaster(SrsConfDirective* c);
+ virtual ~SrsRtspCaster();
+public:
+ virtual int serve_client(st_netfd_t stfd);
+// internal methods.
+public:
+ virtual void remove(SrsRtspConn* conn);
};
#endif
diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp
index 626f20640c..100fc34421 100644
--- a/trunk/src/app/srs_app_server.cpp
+++ b/trunk/src/app/srs_app_server.cpp
@@ -240,7 +240,7 @@ SrsRtspListener::SrsRtspListener(SrsServer* server, SrsListenerType type, SrsCon
// we just assert here for unknown stream caster.
srs_assert(_type == SrsListenerRtsp);
if (_type == SrsListenerRtsp) {
- caster = new SrsRtspConn(c);
+ caster = new SrsRtspCaster(c);
}
}
@@ -262,7 +262,7 @@ int SrsRtspListener::cycle()
}
srs_verbose("get a client. fd=%d", st_netfd_fileno(client_stfd));
- if ((ret = _server->accept_client(_type, client_stfd)) != ERROR_SUCCESS) {
+ if ((ret = caster->serve_client(client_stfd)) != ERROR_SUCCESS) {
srs_warn("accept client error. ret=%d", ret);
return ret;
}
diff --git a/trunk/src/app/srs_app_thread.hpp b/trunk/src/app/srs_app_thread.hpp
index ff35305fa1..1af72a99e1 100644
--- a/trunk/src/app/srs_app_thread.hpp
+++ b/trunk/src/app/srs_app_thread.hpp
@@ -46,22 +46,78 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
* which will cause the socket to return error and
* terminate the cycle thread.
*
-* when thread interrupt, the socket maybe not got EINT,
-* espectially on st_usleep(), so the cycle must check the loop,
-* when handler->cycle() has loop itself, for example:
-* while (true):
-* if (read_from_socket(skt) < 0) break;
-* if thread stop when read_from_socket, it's ok, the loop will break,
-* but when thread stop interrupt the s_usleep(0), then the loop is
-* death loop.
-* in a word, the handler->cycle() must:
-* while (pthread->can_loop()):
-* if (read_from_socket(skt) < 0) break;
-* check the loop, then it works.
+* Usage 1: stop by other thread.
+* user can create thread and stop then start again and again,
+* generally must provides a start and stop method, @see SrsIngester.
+* the step to create a thread stop by other thread:
+* 1. create SrsThread field, with joinable true.
+* 2. must use stop to stop and join the thread.
+* for example:
+* class SrsIngester : public ISrsThreadHandler {
+* public: SrsIngester() { pthread = new SrsThread("ingest", this, SRS_AUTO_INGESTER_SLEEP_US, true); }
+* public: virtual int start() { return pthread->start(); }
+* public: virtual void stop() { pthread->stop(); }
+* public: virtual int cycle() {
+* // check status, start ffmpeg when stopped.
+* }
+* };
*
-* in the thread itself, that is the cycle method,
-* if itself want to terminate the thread, should never use stop(),
-* but use stop_loop() to set the loop to false and terminate normally.
+* Usage 2: stop by thread itself.
+* user can create thread which stop itself,
+* generally only need to provides a start method,
+* the object will destroy itself then terminate the thread, @see SrsConnection
+* 1. create SrsThread field, with joinable false.
+* 2. owner stop thread loop, destroy itself when thread stop.
+* for example:
+* class SrsConnection : public ISrsThreadHandler {
+* public: SrsConnection() { pthread = new SrsThread("conn", this, 0, false); }
+* public: virtual int start() { return pthread->start(); }
+* public: virtual int cycle() {
+* // serve client.
+* // set loop to stop to quit, stop thread itself.
+* pthread->stop_loop();
+* }
+* public: virtual int on_thread_stop() {
+* // remove the connection in thread itself.
+* server->remove(this);
+* }
+* };
+*
+* Usage 3: loop in the cycle method.
+* user can use loop code in the cycle method, @see SrsForwarder
+* 1. create SrsThread field, with or without joinable is ok.
+* 2. loop code in cycle method, check the can_loop() for thread to quit.
+* for example:
+* class SrsForwarder : public ISrsThreadHandler {
+* public: virtual int cycle() {
+* while (pthread->can_loop()) {
+* // read msgs from queue and forward to server.
+* }
+* }
+* };
+*
+* @remark why should check can_loop() in cycle method?
+* when thread interrupt, the socket maybe not got EINT,
+* espectially on st_usleep(), so the cycle must check the loop,
+* when handler->cycle() has loop itself, for example:
+* while (true):
+* if (read_from_socket(skt) < 0) break;
+* if thread stop when read_from_socket, it's ok, the loop will break,
+* but when thread stop interrupt the s_usleep(0), then the loop is
+* death loop.
+* in a word, the handler->cycle() must:
+* while (pthread->can_loop()):
+* if (read_from_socket(skt) < 0) break;
+* check the loop, then it works.
+*
+* @remark why should use stop_loop() to terminate thread in itself?
+* in the thread itself, that is the cycle method,
+* if itself want to terminate the thread, should never use stop(),
+* but use stop_loop() to set the loop to false and terminate normally.
+*
+* @remark when should set the interval_us, and when not?
+* the cycle will invoke util cannot loop, eventhough the return code of cycle is error,
+* so the interval_us used to sleep for each cycle.
*/
class ISrsThreadHandler
{
diff --git a/trunk/src/protocol/srs_rtsp_stack.cpp b/trunk/src/protocol/srs_rtsp_stack.cpp
new file mode 100644
index 0000000000..220e9a3931
--- /dev/null
+++ b/trunk/src/protocol/srs_rtsp_stack.cpp
@@ -0,0 +1,40 @@
+/*
+The MIT License (MIT)
+
+Copyright (c) 2013-2015 winlin
+
+Permission is hereby granted, free of charge, to any person obtaining a copy of
+this software and associated documentation files (the "Software"), to deal in
+the Software without restriction, including without limitation the rights to
+use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
+the Software, and to permit persons to whom the Software is furnished to do so,
+subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
+FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
+COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
+IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+*/
+
+#include
+
+#include
+
+#ifdef SRS_AUTO_STREAM_CASTER
+
+SrsRtspStack::SrsRtspStack(ISrsProtocolReaderWriter* s)
+{
+ skt = s;
+}
+
+SrsRtspStack::~SrsRtspStack()
+{
+}
+
+#endif
+
diff --git a/trunk/src/protocol/srs_rtsp_stack.hpp b/trunk/src/protocol/srs_rtsp_stack.hpp
new file mode 100644
index 0000000000..a011e7c699
--- /dev/null
+++ b/trunk/src/protocol/srs_rtsp_stack.hpp
@@ -0,0 +1,55 @@
+/*
+The MIT License (MIT)
+
+Copyright (c) 2013-2015 winlin
+
+Permission is hereby granted, free of charge, to any person obtaining a copy of
+this software and associated documentation files (the "Software"), to deal in
+the Software without restriction, including without limitation the rights to
+use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
+the Software, and to permit persons to whom the Software is furnished to do so,
+subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
+FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
+COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
+IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+*/
+
+#ifndef SRS_PROTOCOL_RTSP_STACK_HPP
+#define SRS_PROTOCOL_RTSP_STACK_HPP
+
+/*
+#include
+*/
+
+#include
+
+#ifdef SRS_AUTO_STREAM_CASTER
+
+class ISrsProtocolReaderWriter;
+
+/**
+* the rtsp protocol stack to parse the rtsp packets.
+*/
+class SrsRtspStack
+{
+private:
+ /**
+ * underlayer socket object, send/recv bytes.
+ */
+ ISrsProtocolReaderWriter* skt;
+public:
+ SrsRtspStack(ISrsProtocolReaderWriter* s);
+ virtual ~SrsRtspStack();
+};
+
+#endif
+
+#endif
+