2 * Copyright (c) 2022 Samsung Electronics Co., Ltd All Rights Reserved
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
19 #include "MediaTransporterBase.h"
20 #include "MediaTransporterException.h"
21 #include "MediaTransporterSenderSrt.h"
22 #include "MediaTransporterLog.h"
23 #include "MediaTransporterUtil.h"
24 #include "MediaTransporterParseIni.h"
26 using namespace tizen_media_transporter;
27 using namespace tizen_media_transporter::param::srt;
29 constexpr unsigned int STATS_UPDATE_INTERVAL = 1000;
31 static const std::map<std::string, mtprConnectionStatsProp> __senderProp = {
32 { "packets-sent", MTPR_CONNECTION_STATS_PROP_PACKET_SENT },
33 { "packets-sent-lost", MTPR_CONNECTION_STATS_PROP_PACKET_SENT_LOST },
34 { "send-rate-mbps", MTPR_CONNECTION_STATS_PROP_SEND_RATE_MPBS },
35 { "negotiated-latency-ms", MTPR_CONNECTION_STATS_PROP_NEGOTIATED_LATENCY_MS },
36 { "bandwidth-mbps", MTPR_CONNECTION_STATS_PROP_BANDWIDTH_MPBS },
37 { "rtt-ms", MTPR_CONNECTION_STATS_PROP_RTT_MS },
38 { "caller-address", MTPR_CONNECTION_STATS_PROP_CALLER_ADDRESS },
39 { "bytes-sent-total", MTPR_CONNECTION_STATS_PROP_BYTES_SENT_TOTAL }};
41 static void __callerAddedCb(GstElement* element, gint socket, GSocketAddress* address, gpointer userData)
43 g_autofree gchar* ip_addr = g_inet_address_to_string(g_inet_socket_address_get_address((GInetSocketAddress*)address));
45 LOG_INFO("socket %d, address %s", socket, ip_addr);
48 static void __callerRemovedCb(GstElement* element, gint socket, GSocketAddress* address, gpointer userData)
50 g_autofree gchar* ip_addr = g_inet_address_to_string(g_inet_socket_address_get_address((GInetSocketAddress*)address));
52 LOG_INFO("socket %d, address %s", socket, ip_addr);
55 static void __callerRejectedCb(GstElement* element, GSocketAddress* peerAddress, const gchar* stream_id, gpointer userData)
57 g_autofree gchar* ip_addr = g_inet_address_to_string(g_inet_socket_address_get_address((GInetSocketAddress*)peerAddress));
59 LOG_INFO("stream_id %s, peer_address %s", stream_id, ip_addr);
62 static void __callerConnectingCb(GstElement* element, GSocketAddress* peerAddress, const gchar* stream_id, gpointer userData)
64 g_autofree gchar* ip_addr = g_inet_address_to_string(g_inet_socket_address_get_address((GInetSocketAddress*)peerAddress));
66 LOG_INFO("stream_id %s, peer_address %s", stream_id, ip_addr);
69 void MediaTransporterSenderSrt::startStatsMonitoring() {
71 _statsMonitor = g_thread_try_new("stats_monitor", (GThreadFunc)_statsMonitorThread, (gpointer)this, &error);
74 LOG_ERROR("failed to create stats_monitor thread %s", error->message);
77 void MediaTransporterSenderSrt::stopStatsMonitoring() {
80 g_thread_join(_statsMonitor);
84 gpointer MediaTransporterSenderSrt::_statsMonitorThread(gpointer userData)
86 MediaTransporterSenderSrt* srt = static_cast<MediaTransporterSenderSrt*>(userData);
88 while (!srt->_threadExit) {
89 _updateStats(userData);
90 std::this_thread::sleep_for(std::chrono::milliseconds(STATS_UPDATE_INTERVAL));
96 gpointer MediaTransporterSenderSrt::_updateStats(gpointer userData)
98 GstStructure *structure = NULL;
101 MediaTransporterSenderSrt* srt = static_cast<MediaTransporterSenderSrt*>(userData);
103 g_object_get(srt->_srtSink, "stats", &structure, NULL);
104 g_object_get(srt->_srtSink, "latency", &latency, NULL);
106 str = gst_structure_to_string(structure);
107 SECURE_LOG_DEBUG("SENDER >>> latency %d, stats : %s", latency, str);
110 if (gst_structure_has_field(structure, "callers")) {
113 array = (GValueArray *)g_value_get_boxed(gst_structure_get_value(structure, "callers"));
115 LOG_DEBUG("SENDER >>> size of callers array : %d", array->n_values);
116 structure = (GstStructure *)g_value_get_boxed(&array->values[array->n_values - 1]);
119 gst_structure_get_int64(structure, "packets-sent", &srt->_stats.packetsSent);
120 gst_structure_get_int(structure, "packets-sent-lost", &srt->_stats.packetsSentLost);
121 gst_structure_get_double(structure, "send-rate-mbps", &srt->_stats.sendRateMbps);
122 gst_structure_get_double(structure, "bandwidth-mbps", &srt->_stats.bandwidthMbps);
123 gst_structure_get_double(structure, "rtt-ms", &srt->_stats.rttMs);
125 LOG_DEBUG("SENDER >>> packets_sent %" G_GINT64_FORMAT
126 ", packets-sent-lost %d, send-rate-mbps %lf, bandwidth-mbps %lf, rtt-ms %lf",
127 srt->_stats.packetsSent, srt->_stats.packetsSentLost,
128 srt->_stats.sendRateMbps, srt->_stats.bandwidthMbps,
134 MediaTransporterSenderSrt::MediaTransporterSenderSrt()
136 LOG_DEBUG("ctor: %p", this);
139 MediaTransporterSenderSrt::~MediaTransporterSenderSrt()
141 LOG_DEBUG("dtor: %p", this);
144 void MediaTransporterSenderSrt::buildPipeline()
146 GstElement* mux = NULL;
147 GstElement* sink = NULL;
149 if (_senderAddress.empty())
150 throw MediaTransporterException(MTPR_ERROR_INVALID_OPERATION, "address is empty");
152 if (_mediaSources.empty())
153 throw MediaTransporterException(MTPR_ERROR_INVALID_OPERATION, "source is empty");
155 /* create mux to sink */
157 mux = gst::_createElement(gst::DEFAULT_ELEMENT_TSMUX);
158 g_object_set(G_OBJECT(mux), "alignment", 7, NULL);
160 sink = gst::_createElement(gst::DEFAULT_ELEMENT_SRTSINK);
161 g_object_set(G_OBJECT(sink),
162 "uri", util::addProtocolPrefix(_senderAddress, "srt").c_str(),
163 "wait-for-connection", FALSE,
168 gst_bin_add_many(GST_BIN(_gst.pipeline), mux, sink, NULL);
169 if (!gst_element_link(mux, sink)) {
170 LOG_ERROR("failed to gst_element_link()");
171 throw MediaTransporterException(MTPR_ERROR_INVALID_OPERATION, "failed to gst_element_link()");
174 if (_connectionParam.mode != MTPR_CONNECTION_SRT_MODE_NONE)
175 g_object_set(G_OBJECT(sink),
176 "mode", static_cast<int>(_connectionParam.mode), NULL);
178 if (_connectionParam.pbKeyLen != MTPR_CONNECTION_SRT_NO_KEY)
179 g_object_set(G_OBJECT(sink),
180 "pbkeylen", static_cast<int>(_connectionParam.pbKeyLen), NULL);
182 if (!_connectionParam.passPhrase.empty())
183 g_object_set(G_OBJECT(sink),
184 "passphrase", _connectionParam.passPhrase.c_str(), NULL);
186 if (!_connectionParam.streamId.empty())
187 g_object_set(G_OBJECT(sink),
188 "streamid", _connectionParam.streamId.c_str(), NULL);
190 gst::_connectAndAppendSignal(&_gst.signals, G_OBJECT(sink), "caller-added", G_CALLBACK(__callerAddedCb), &_gst);
191 gst::_connectAndAppendSignal(&_gst.signals, G_OBJECT(sink), "caller-removed", G_CALLBACK(__callerRemovedCb), &_gst);
192 gst::_connectAndAppendSignal(&_gst.signals, G_OBJECT(sink), "caller-rejected", G_CALLBACK(__callerRejectedCb), &_gst);
193 gst::_connectAndAppendSignal(&_gst.signals, G_OBJECT(sink), "caller-connecting", G_CALLBACK(__callerConnectingCb), &_gst);
195 linkMediaSourceToMuxer(mux);
198 LOG_INFO("linked mux and sink");
200 } catch (const MediaTransporterException& e) {
201 LOG_ERROR("%s", e.what());
203 gst::_disconnectSignal(&_gst.signals, G_OBJECT(sink));
205 gst::_destroyElementFromParent(mux);
206 gst::_destroyElementFromParent(sink);
212 void MediaTransporterSenderSrt::startPipeline()
214 gst::_setPipelineState(_gst.pipeline, GST_STATE_PLAYING,
215 MediaTransporterIni::get().general().timeout, _asyncStart);
216 startStatsMonitoring();
219 void MediaTransporterSenderSrt::stopPipeline()
221 stopStatsMonitoring();
224 gst::_setPipelineState(_gst.pipeline, GST_STATE_NULL,
225 MediaTransporterIni::get().general().timeout);
228 void MediaTransporterSenderSrt::setConnection(std::string name, std::string val)
230 setConnectionParam(name, val, &_connectionParam);
233 std::string MediaTransporterSenderSrt::getConnection(std::string name)
235 return getConnectionParam(_connectionParam, name);
238 void MediaTransporterSenderSrt::setSenderAddress(std::string address)
240 _senderAddress = address;
243 gboolean MediaTransporterSenderSrt::_statsForeachCb(GQuark fieldId, const GValue* val, gpointer userData)
245 auto callback = static_cast<IInvokable*>(userData);
246 std::string fieldName = g_quark_to_string(fieldId);
249 !callback->invoke(fieldName, val, __senderProp)) {
250 LOG_WARNING("stop calling stats callback");
257 void MediaTransporterSenderSrt::foreachConnectionStats(InvokablePtr callback)
259 const GstStructure* structure = NULL;
261 g_object_get(_srtSink, "stats", &structure, NULL);
262 gchar* str = gst_structure_to_string(structure);
263 SECURE_LOG_DEBUG("SENDER >>> stats : %s", str);
266 gboolean ret = FALSE;
268 if (gst_structure_has_field(structure, "callers")) {
269 GValueArray* callers = (GValueArray *)g_value_get_boxed(gst_structure_get_value(structure, "callers"));
271 LOG_DEBUG("SENDER >>> num of callers : %d", callers->n_values);
272 for (int i = 0; i < (int)callers->n_values; i++) {
273 const GstStructure* callerStats = (GstStructure *)g_value_get_boxed(&callers->values[i]);
275 LOG_DEBUG("SENDER >>> caller idx : %d", i);
276 ret = gst_structure_foreach(callerStats, _statsForeachCb, callback.get());
282 if (ret && gst_structure_has_field(structure, "bytes-sent-total"))
283 callback->invoke("bytes-sent-total",
284 gst_structure_get_value(structure, "bytes-sent-total"), __senderProp);