* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#include <string>
-#include <gst/rtsp-server/rtsp-server.h>
-#include <gst/rtsp-server/rtsp-media-factory-mtpr.h>
#include "MediaTransporterBase.h"
#include "MediaTransporterException.h"
#define ADDR_PATH_DELIM "/"
#define ADDR_RTSP_PREFIX "rtsp://"
-
-
static GstRTSPFilterResult
-__clientFilter(GstRTSPServer *server, GstRTSPClient *client, gpointer userData)
+__clientFilter(GstRTSPServer* server, GstRTSPClient* client, gpointer userData)
{
/* Simple filter that shuts down all clients. */
return GST_RTSP_FILTER_REMOVE;
void MediaTransporterSenderRtsp::stopPipeline()
{
- stopRtspServer();
-}
-
-void MediaTransporterSenderRtsp::setSenderAddress(std::string address)
-{
- _senderAddress = address; // rtsp server addr
-}
-
-static void __clientConnectedCb(GstRTSPServer* server, GstRTSPClient* client)
-{
- LOG_DEBUG("client connected %p", client);
-}
-
-static void __mediaConstructedCb(GstRTSPMediaFactory* factory, GstRTSPMedia* media, gpointer userData)
-{
- LOG_DEBUG("media %p is constructed and has %u streams", media, gst_rtsp_media_n_streams(media));
-}
-
-static void __onSsrcActive(GObject* session, GObject* source, GstRTSPMedia* media)
-{
- GstStructure* stats;
-
- LOG_DEBUG("source %p in session %p is active", source, session);
-
- g_object_get(source, "stats", &stats, NULL);
- if (stats) {
- gchar* sstr;
-
- sstr = gst_structure_to_string(stats);
- LOG_DEBUG("structure: %s", sstr);
- g_free(sstr);
-
- gst_structure_free(stats);
- }
-}
-
-static void __onSenderSsrcActive(GObject* session, GObject* source, GstRTSPMedia* media)
-{
- GstStructure* stats;
-
- LOG_DEBUG("source %p in session %p is active", source, session);
-
- g_object_get(source, "stats", &stats, NULL);
- if (stats) {
- gchar* sstr;
+ if (_rtspPipeline) {
+ GstBus* bus = gst_pipeline_get_bus(GST_PIPELINE(_rtspPipeline));
+ gst_bus_set_sync_handler(bus, NULL, NULL, NULL);
+ gst_object_unref(bus);
- sstr = gst_structure_to_string(stats);
- LOG_DEBUG("Sender stats:\nstructure: %s", sstr);
- g_free(sstr);
-
- gst_structure_free(stats);
+ gst_object_unref(_rtspPipeline);
+ _rtspPipeline = NULL;
}
-}
-
-static void __mediaPreparedCb(GstRTSPMedia* media)
-{
- guint i, n_streams;
-
- n_streams = gst_rtsp_media_n_streams(media);
-
- LOG_DEBUG("media %p is prepared and has %u streams", media, n_streams);
- for (i = 0; i < n_streams; i++) {
- GstRTSPStream *stream;
- GObject *session;
-
- stream = gst_rtsp_media_get_stream(media, i);
- if (!stream)
- continue;
-
- session = gst_rtsp_stream_get_rtpsession(stream);
- LOG_DEBUG("watching session %p on stream %u", session, i);
-
- g_signal_connect(session, "on-ssrc-active",
- G_CALLBACK(__onSsrcActive), media);
- g_signal_connect(session, "on-sender-ssrc-active",
- G_CALLBACK(__onSenderSsrcActive), media);
- }
+ stopRtspServer();
+ removeIdleEventSource();
}
-static void __mediaConfigureCb(GstRTSPMediaFactory* factory, GstRTSPMedia* media)
+void MediaTransporterSenderRtsp::setSenderAddress(std::string address)
{
- LOG_DEBUG("media %p is prepared and has %u streams", media, gst_rtsp_media_n_streams(media));
- g_signal_connect(media, "prepared", G_CALLBACK(__mediaPreparedCb), factory);
+ _senderAddress = address; // rtsp server addr
}
void MediaTransporterSenderRtsp::startRtspServer()
{
- GstRTSPMountPoints *mounts;
- GstRTSPMediaFactoryMTPR *factoryMtpr;
- GstRTSPServer *server = NULL;
+ GstRTSPMountPoints* mounts = NULL;
+ GstRTSPMediaFactoryMTPR* factoryMtpr = NULL;
+ GstRTSPServer* server = NULL;
std::string rtspPrefix("rtsp://");
std::string address;
std::string ipAddr;
gst_rtsp_server_set_address(GST_RTSP_SERVER(server), ipAddr.c_str());
gst_rtsp_server_set_service(GST_RTSP_SERVER(server), portNum.c_str());
- g_signal_connect(server, "client-connected", G_CALLBACK(__clientConnectedCb), NULL);
+ g_signal_connect(server, "client-connected", G_CALLBACK(_clientConnectedCb), NULL);
mounts = gst_rtsp_server_get_mount_points(server);
factoryMtpr = gst_rtsp_media_factory_mtpr_new();
- g_object_ref(_gst.pipeline);
+ gst_object_ref(_gst.pipeline);
gst_rtsp_media_factory_mtpr_set_custom_element(factoryMtpr, GST_ELEMENT(_gst.pipeline));
gst::_generateDot(_gst.pipeline, "rtsp-sender");
gst_rtsp_mount_points_add_factory(mounts, mountPoint.c_str(), GST_RTSP_MEDIA_FACTORY(factoryMtpr));
g_object_unref(mounts);
- g_signal_connect(GST_RTSP_MEDIA_FACTORY(factoryMtpr), "media-constructed", G_CALLBACK(__mediaConstructedCb), NULL);
- g_signal_connect(GST_RTSP_MEDIA_FACTORY(factoryMtpr), "media-configure", G_CALLBACK(__mediaConfigureCb), NULL);
+ g_signal_connect(GST_RTSP_MEDIA_FACTORY(factoryMtpr), "media-constructed", G_CALLBACK(_mediaConstructedCb), this);
+ g_signal_connect(GST_RTSP_MEDIA_FACTORY(factoryMtpr), "media-configure", G_CALLBACK(_mediaConfigureCb), this);
if ((_sourceId = gst_rtsp_server_attach(server, NULL)) == 0) {
LOG_ERROR("failed to attach server to default context");
void MediaTransporterSenderRtsp::stopRtspServer()
{
-
if (!_rtspServer) {
LOG_ERROR("there is no running server");
throw MediaTransporterException(MTPR_ERROR_INVALID_OPERATION, "there is no running server");
_rtspServer = NULL;
_rtspMountPoint.clear();
}
+
+void MediaTransporterSenderRtsp::_clientConnectedCb(GstRTSPServer* server, GstRTSPClient* client)
+{
+ LOG_DEBUG("client connected %p", client);
+}
+
+void MediaTransporterSenderRtsp::_mediaConstructedCb(GstRTSPMediaFactory* factory, GstRTSPMedia* media, gpointer userData)
+{
+ LOG_DEBUG("media %p is constructed and has %u streams", media, gst_rtsp_media_n_streams(media));
+}
+
+void MediaTransporterSenderRtsp::_mediaConfigureCb(GstRTSPMediaFactory* factory, GstRTSPMedia* media, gpointer userData)
+{
+ LOG_DEBUG("media %p is prepared and has %u streams", media, gst_rtsp_media_n_streams(media));
+ g_signal_connect(media, "prepared", G_CALLBACK(_mediaPreparedCb), userData);
+}
+
+void MediaTransporterSenderRtsp::_mediaPreparedCb(GstRTSPMedia* media, gpointer userData)
+{
+ guint i, n_streams;
+ auto handle = static_cast<MediaTransporterSenderRtsp*>(userData);
+
+ n_streams = gst_rtsp_media_n_streams(media);
+ LOG_DEBUG("media %p is prepared and has %u streams", media, n_streams);
+
+ for (i = 0; i < n_streams; i++) {
+ GstRTSPStream* stream = gst_rtsp_media_get_stream(media, i);
+ if (!stream)
+ continue;
+
+ GObject* session = gst_rtsp_stream_get_rtpsession(stream);
+ LOG_DEBUG("watching session %p on stream %u", session, i);
+
+ g_signal_connect(session, "on-ssrc-active",
+ G_CALLBACK(_onSsrcActive), media);
+ g_signal_connect(session, "on-sender-ssrc-active",
+ G_CALLBACK(_onSenderSsrcActive), media);
+ }
+
+ /* REF: pipeline ref count +1, it will be unref at stopPipeline() */
+ handle->_rtspPipeline = gst_rtsp_media_get_pipeline(media);
+
+ GstBus* bus = gst_pipeline_get_bus(GST_PIPELINE(handle->_rtspPipeline));
+ gst_bus_set_sync_handler(bus, (GstBusSyncHandler)_busSyncCb, userData, NULL);
+ gst_object_unref(bus);
+
+ gst::_generateDot(handle->_rtspPipeline, "rtsp-sender-prepared");
+}
+
+/* received RTCP packet from the client */
+void MediaTransporterSenderRtsp::_onSsrcActive(GObject* session, GObject* source, GstRTSPMedia* media)
+{
+ GstStructure* stats;
+
+ LOG_DEBUG("source %p in session %p is active", source, session);
+
+ g_object_get(source, "stats", &stats, NULL);
+ RET_IF(!stats, "stats is NULL");
+
+ gchar* sstr = gst_structure_to_string(stats);
+ LOG_DEBUG("structure: %s", sstr);
+ g_free(sstr);
+
+ gst_structure_free(stats);
+}
+
+void MediaTransporterSenderRtsp::_onSenderSsrcActive(GObject* session, GObject* source, GstRTSPMedia* media)
+{
+ GstStructure* stats;
+
+ LOG_DEBUG("source %p in session %p is active", source, session);
+
+ g_object_get(source, "stats", &stats, NULL);
+ RET_IF(!stats, "stats is NULL");
+
+ gchar* sstr = gst_structure_to_string(stats);
+ LOG_DEBUG("Sender stats:\nstructure: %s", sstr);
+ g_free(sstr);
+
+ gst_structure_free(stats);
+}
+
+GstBusSyncReply MediaTransporterSenderRtsp::_busSyncCb(GstBus* bus, GstMessage* message, gpointer userData)
+{
+ GstMessageType type = GST_MESSAGE_TYPE(message);
+ auto handle = static_cast<MediaTransporterSenderRtsp*>(userData);
+
+ switch (type) {
+ case GST_MESSAGE_ERROR: {
+ GError* error = NULL;
+
+ gst_message_parse_error(message, &error, NULL);
+
+ LOG_ERROR("Error[from %s]: message[%s], code[%d]",
+ GST_OBJECT_NAME(GST_OBJECT_CAST(GST_ELEMENT(GST_MESSAGE_SRC(message)))), error->message, error->code);
+
+ handle->postMsgOnIdle(message);
+ g_error_free (error);
+ break;
+ }
+ default:
+ break;
+ }
+
+ return GST_BUS_PASS;
+}
+
+gboolean MediaTransporterSenderRtsp::_idleCb(gpointer userData)
+{
+ auto data = static_cast<idleData*>(userData);
+ RET_VAL_IF(!data, G_SOURCE_REMOVE, "userData is NULL");
+
+ auto handle = static_cast<MediaTransporterSenderRtsp*>(data->handle);
+ RET_VAL_IF(!handle, G_SOURCE_REMOVE, "handle is NULL");
+
+ std::unique_lock<std::mutex> mutex(handle->_idleEventMutex);
+
+ LOG_DEBUG("_idleSourceId %d will be handled", handle->_idleSourceId);
+ handle->_idleSourceId = 0;
+
+ mutex.unlock();
+
+ if (handle->_errorCallback)
+ handle->_errorCallback->invoke(VariantData{data->error});
+
+ return G_SOURCE_REMOVE;
+}
+
+void MediaTransporterSenderRtsp::postMsgOnIdle(GstMessage* message)
+{
+ RET_IF(!message, "message is NULL");
+
+ idleData* data = g_new0(idleData, 1);
+ data->handle = (void *)this;
+ data->error = MTPR_ERROR_INVALID_OPERATION; /* FIXME: error code could be different depends on the message */
+
+ std::lock_guard<std::mutex> mutex(_idleEventMutex);
+ removeIdleEventSource();
+ _idleSourceId = g_idle_add_full(G_PRIORITY_DEFAULT_IDLE, _idleCb, data, g_free);
+
+ LOG_DEBUG("_idleSourceId %d", _idleSourceId);
+}
+
+void MediaTransporterSenderRtsp::removeIdleEventSource()
+{
+ RET_IF(_idleSourceId == 0, "_idleSourceId is 0");
+
+ g_source_remove(_idleSourceId);
+ LOG_DEBUG("_idleSourceId %d is removed", _idleSourceId);
+ _idleSourceId = 0;
+}