2 * Copyright (c) 2022 Samsung Electronics Co., Ltd All Rights Reserved
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
19 #include "MediaTransporterBase.h"
20 #include "MediaTransporterException.h"
21 #include "MediaTransporterSenderSrt.h"
22 #include "MediaTransporterLog.h"
23 #include "MediaTransporterUtil.h"
24 #include "MediaTransporterParseIni.h"
25 #include "MediaTransporterConnectionStats.h"
27 using namespace tizen_media_transporter;
28 using namespace tizen_media_transporter::param::srt;
30 constexpr unsigned int STATS_UPDATE_INTERVAL = 1000;
32 static void __callerAddedCb(GstElement* element, gint socket, GSocketAddress* address, gpointer userData)
34 g_autofree gchar* ip_addr = g_inet_address_to_string(g_inet_socket_address_get_address((GInetSocketAddress*)address));
36 LOG_INFO("socket %d, address %s", socket, ip_addr);
39 static void __callerRemovedCb(GstElement* element, gint socket, GSocketAddress* address, gpointer userData)
41 g_autofree gchar* ip_addr = g_inet_address_to_string(g_inet_socket_address_get_address((GInetSocketAddress*)address));
43 LOG_INFO("socket %d, address %s", socket, ip_addr);
46 static void __callerRejectedCb(GstElement* element, GSocketAddress* peerAddress, const gchar* stream_id, gpointer userData)
48 g_autofree gchar* ip_addr = g_inet_address_to_string(g_inet_socket_address_get_address((GInetSocketAddress*)peerAddress));
50 LOG_INFO("stream_id %s, peer_address %s", stream_id, ip_addr);
53 static void __callerConnectingCb(GstElement* element, GSocketAddress* peerAddress, const gchar* stream_id, gpointer userData)
55 g_autofree gchar* ip_addr = g_inet_address_to_string(g_inet_socket_address_get_address((GInetSocketAddress*)peerAddress));
57 LOG_INFO("stream_id %s, peer_address %s", stream_id, ip_addr);
60 void MediaTransporterSenderSrt::startStatsMonitoring() {
62 _statsMonitor = g_thread_try_new("stats_monitor", (GThreadFunc)_statsMonitorThread, (gpointer)this, &error);
65 LOG_ERROR("failed to create stats_monitor thread %s", error->message);
68 void MediaTransporterSenderSrt::stopStatsMonitoring() {
71 g_thread_join(_statsMonitor);
75 gpointer MediaTransporterSenderSrt::_statsMonitorThread(gpointer userData)
77 MediaTransporterSenderSrt* srt = static_cast<MediaTransporterSenderSrt*>(userData);
79 while (!srt->_threadExit) {
80 _updateStats(userData);
81 std::this_thread::sleep_for(std::chrono::milliseconds(STATS_UPDATE_INTERVAL));
87 gpointer MediaTransporterSenderSrt::_updateStats(gpointer userData)
89 GstStructure *structure = NULL;
92 MediaTransporterSenderSrt* srt = static_cast<MediaTransporterSenderSrt*>(userData);
94 g_object_get(srt->_srtSink, "stats", &structure, NULL);
95 g_object_get(srt->_srtSink, "latency", &latency, NULL);
97 str = gst_structure_to_string(structure);
98 SECURE_LOG_DEBUG("SENDER >>> latency %d, stats : %s", latency, str);
101 if (gst_structure_has_field(structure, "callers")) {
104 array = (GValueArray *)g_value_get_boxed(gst_structure_get_value(structure, "callers"));
106 LOG_DEBUG("SENDER >>> size of callers array : %d", array->n_values);
107 structure = (GstStructure *)g_value_get_boxed(&array->values[array->n_values - 1]);
110 gst_structure_get_int64(structure, "packets-sent", &srt->_stats.packetsSent);
111 gst_structure_get_int(structure, "packets-sent-lost", &srt->_stats.packetsSentLost);
112 gst_structure_get_double(structure, "send-rate-mbps", &srt->_stats.sendRateMbps);
113 gst_structure_get_double(structure, "bandwidth-mbps", &srt->_stats.bandwidthMbps);
114 gst_structure_get_double(structure, "rtt-ms", &srt->_stats.rttMs);
116 LOG_DEBUG("SENDER >>> packets_sent %" G_GINT64_FORMAT
117 ", packets-sent-lost %d, send-rate-mbps %lf, bandwidth-mbps %lf, rtt-ms %lf",
118 srt->_stats.packetsSent, srt->_stats.packetsSentLost,
119 srt->_stats.sendRateMbps, srt->_stats.bandwidthMbps,
125 MediaTransporterSenderSrt::MediaTransporterSenderSrt()
127 LOG_DEBUG("ctor: %p", this);
130 MediaTransporterSenderSrt::~MediaTransporterSenderSrt()
132 LOG_DEBUG("dtor: %p", this);
135 void MediaTransporterSenderSrt::buildPipeline()
137 GstElement* mux = NULL;
138 GstElement* sink = NULL;
140 if (_senderAddress.empty())
141 throw MediaTransporterException(MTPR_ERROR_INVALID_OPERATION, "address is empty");
143 if (_mediaSources.empty())
144 throw MediaTransporterException(MTPR_ERROR_INVALID_OPERATION, "source is empty");
146 /* create mux to sink */
148 mux = gst::_createElement(gst::DEFAULT_ELEMENT_TSMUX);
149 g_object_set(G_OBJECT(mux), "alignment", 7, NULL);
151 sink = gst::_createElement(gst::DEFAULT_ELEMENT_SRTSINK);
152 g_object_set(G_OBJECT(sink),
153 "uri", util::addProtocolPrefix(_senderAddress, "srt").c_str(),
154 "wait-for-connection", FALSE,
159 gst_bin_add_many(GST_BIN(_gst.pipeline), mux, sink, NULL);
160 if (!gst_element_link(mux, sink)) {
161 LOG_ERROR("failed to gst_element_link()");
162 throw MediaTransporterException(MTPR_ERROR_INVALID_OPERATION, "failed to gst_element_link()");
165 if (_connectionParam.mode != MTPR_CONNECTION_SRT_MODE_NONE)
166 g_object_set(G_OBJECT(sink),
167 "mode", static_cast<int>(_connectionParam.mode), NULL);
169 if (_connectionParam.pbKeyLen != MTPR_CONNECTION_SRT_NO_KEY)
170 g_object_set(G_OBJECT(sink),
171 "pbkeylen", static_cast<int>(_connectionParam.pbKeyLen), NULL);
173 if (!_connectionParam.passPhrase.empty())
174 g_object_set(G_OBJECT(sink),
175 "passphrase", _connectionParam.passPhrase.c_str(), NULL);
177 if (!_connectionParam.streamId.empty())
178 g_object_set(G_OBJECT(sink),
179 "streamid", _connectionParam.streamId.c_str(), NULL);
181 gst::_connectAndAppendSignal(&_gst.signals, G_OBJECT(sink), "caller-added", G_CALLBACK(__callerAddedCb), &_gst);
182 gst::_connectAndAppendSignal(&_gst.signals, G_OBJECT(sink), "caller-removed", G_CALLBACK(__callerRemovedCb), &_gst);
183 gst::_connectAndAppendSignal(&_gst.signals, G_OBJECT(sink), "caller-rejected", G_CALLBACK(__callerRejectedCb), &_gst);
184 gst::_connectAndAppendSignal(&_gst.signals, G_OBJECT(sink), "caller-connecting", G_CALLBACK(__callerConnectingCb), &_gst);
186 linkMediaSourceToMuxer(mux);
189 LOG_INFO("linked mux and sink");
191 } catch (const MediaTransporterException& e) {
192 LOG_ERROR("%s", e.what());
194 gst::_disconnectSignal(&_gst.signals, G_OBJECT(sink));
196 gst::_destroyElementFromParent(mux);
197 gst::_destroyElementFromParent(sink);
203 void MediaTransporterSenderSrt::startPipeline()
205 gst::_setPipelineState(_gst.pipeline, GST_STATE_PLAYING,
206 MediaTransporterIni::get().general().timeout, _asyncStart);
207 startStatsMonitoring();
210 void MediaTransporterSenderSrt::stopPipeline()
212 stopStatsMonitoring();
215 gst::_setPipelineState(_gst.pipeline, GST_STATE_NULL,
216 MediaTransporterIni::get().general().timeout);
219 void MediaTransporterSenderSrt::setConnection(std::string name, std::string val)
221 setConnectionParam(name, val, &_connectionParam);
224 std::string MediaTransporterSenderSrt::getConnection(std::string name)
226 return getConnectionParam(_connectionParam, name);
229 void MediaTransporterSenderSrt::setSenderAddress(std::string address)
231 _senderAddress = address;
234 void MediaTransporterSenderSrt::foreachConnectionStats(InvokablePtr callback)
236 ConnectionStatsFactory::createSenderSrtConnectionStats(_srtSink, std::move(callback))->foreach();