e595073ad060222f994c91b85aa6005a9c9c8332
[platform/core/api/mediatransporter.git] / src / MediaTransporterSenderSrt.cpp
1 /**
2  * Copyright (c) 2022 Samsung Electronics Co., Ltd All Rights Reserved
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <gio/gio.h>
18
19 #include "MediaTransporterBase.h"
20 #include "MediaTransporterException.h"
21 #include "MediaTransporterSenderSrt.h"
22 #include "MediaTransporterLog.h"
23 #include "MediaTransporterUtil.h"
24 #include "MediaTransporterParseIni.h"
25
26 using namespace tizen_media_transporter;
27 using namespace tizen_media_transporter::param::srt;
28
/* Pause between two stats refreshes; the monitor thread passes it to std::chrono::milliseconds. */
constexpr unsigned int STATS_UPDATE_INTERVAL{1000U};
30
31 static const std::map<std::string, mtprConnectionStatsProp> __senderProp = {
32         { "packets-sent", MTPR_CONNECTION_STATS_PROP_PACKET_SENT },
33         { "packets-sent-lost", MTPR_CONNECTION_STATS_PROP_PACKET_SENT_LOST },
34         { "send-rate-mbps", MTPR_CONNECTION_STATS_PROP_SEND_RATE_MPBS },
35         { "negotiated-latency-ms", MTPR_CONNECTION_STATS_PROP_NEGOTIATED_LATENCY_MS },
36         { "bandwidth-mbps", MTPR_CONNECTION_STATS_PROP_BANDWIDTH_MPBS },
37         { "rtt-ms", MTPR_CONNECTION_STATS_PROP_RTT_MS },
38         { "caller-address", MTPR_CONNECTION_STATS_PROP_CALLER_ADDRESS },
39         { "bytes-sent-total", MTPR_CONNECTION_STATS_PROP_BYTES_SENT_TOTAL }};
40
41 static void __callerAddedCb(GstElement* element, gint socket, GSocketAddress* address, gpointer userData)
42 {
43         g_autofree gchar* ip_addr = g_inet_address_to_string(g_inet_socket_address_get_address((GInetSocketAddress*)address));
44
45         LOG_INFO("socket %d, address %s", socket, ip_addr);
46 }
47
48 static void __callerRemovedCb(GstElement* element, gint socket, GSocketAddress* address, gpointer userData)
49 {
50         g_autofree gchar* ip_addr = g_inet_address_to_string(g_inet_socket_address_get_address((GInetSocketAddress*)address));
51
52         LOG_INFO("socket %d, address %s", socket, ip_addr);
53 }
54
55 static void __callerRejectedCb(GstElement* element, GSocketAddress* peerAddress, const gchar* stream_id, gpointer userData)
56 {
57         g_autofree gchar* ip_addr = g_inet_address_to_string(g_inet_socket_address_get_address((GInetSocketAddress*)peerAddress));
58
59         LOG_INFO("stream_id %s, peer_address %s", stream_id, ip_addr);
60 }
61
62 static void __callerConnectingCb(GstElement* element, GSocketAddress* peerAddress, const gchar* stream_id, gpointer userData)
63 {
64         g_autofree gchar* ip_addr = g_inet_address_to_string(g_inet_socket_address_get_address((GInetSocketAddress*)peerAddress));
65
66         LOG_INFO("stream_id %s, peer_address %s", stream_id, ip_addr);
67 }
68
69 void MediaTransporterSenderSrt::startStatsMonitoring() {
70         GError* error = NULL;
71         _statsMonitor = g_thread_try_new("stats_monitor", (GThreadFunc)_statsMonitorThread, (gpointer)this, &error);
72
73         if (error)
74                 LOG_ERROR("failed to create stats_monitor thread %s", error->message);
75 }
76
77 void MediaTransporterSenderSrt::stopStatsMonitoring() {
78         _threadExit = true;
79         if (_statsMonitor)
80                 g_thread_join(_statsMonitor);
81         _statsMonitor = NULL;
82 }
83
84 gpointer MediaTransporterSenderSrt::_statsMonitorThread(gpointer userData)
85 {
86         MediaTransporterSenderSrt* srt = static_cast<MediaTransporterSenderSrt*>(userData);
87
88         while (!srt->_threadExit) {
89                 _updateStats(userData);
90                 std::this_thread::sleep_for(std::chrono::milliseconds(STATS_UPDATE_INTERVAL));
91         }
92
93         return NULL;
94 }
95
96 gpointer MediaTransporterSenderSrt::_updateStats(gpointer userData)
97 {
98         GstStructure *structure = NULL;
99         gchar *str = NULL;
100         gint latency = 0;
101         MediaTransporterSenderSrt* srt = static_cast<MediaTransporterSenderSrt*>(userData);
102
103         g_object_get(srt->_srtSink, "stats", &structure, NULL);
104         g_object_get(srt->_srtSink, "latency", &latency, NULL);
105
106         str = gst_structure_to_string(structure);
107         SECURE_LOG_DEBUG("SENDER >>> latency %d, stats : %s", latency, str);
108         g_free(str);
109
110         if (gst_structure_has_field(structure, "callers")) {
111           GValueArray *array;
112
113           array = (GValueArray *)g_value_get_boxed(gst_structure_get_value(structure, "callers"));
114
115           LOG_DEBUG("SENDER >>> size of callers array : %d", array->n_values);
116           structure = (GstStructure *)g_value_get_boxed(&array->values[array->n_values - 1]);
117         }
118
119         gst_structure_get_int64(structure, "packets-sent", &srt->_stats.packetsSent);
120         gst_structure_get_int(structure, "packets-sent-lost", &srt->_stats.packetsSentLost);
121         gst_structure_get_double(structure, "send-rate-mbps", &srt->_stats.sendRateMbps);
122         gst_structure_get_double(structure, "bandwidth-mbps", &srt->_stats.bandwidthMbps);
123         gst_structure_get_double(structure, "rtt-ms", &srt->_stats.rttMs);
124
125         LOG_DEBUG("SENDER >>> packets_sent %" G_GINT64_FORMAT
126                 ", packets-sent-lost %d, send-rate-mbps %lf, bandwidth-mbps %lf, rtt-ms %lf",
127                 srt->_stats.packetsSent, srt->_stats.packetsSentLost,
128                 srt->_stats.sendRateMbps, srt->_stats.bandwidthMbps,
129                 srt->_stats.rttMs);
130
131         return NULL;
132 }
133
134 MediaTransporterSenderSrt::MediaTransporterSenderSrt()
135 {
136         LOG_DEBUG("ctor: %p", this);
137 }
138
139 MediaTransporterSenderSrt::~MediaTransporterSenderSrt()
140 {
141         LOG_DEBUG("dtor: %p", this);
142 }
143
144 void MediaTransporterSenderSrt::buildPipeline()
145 {
146         GstElement* mux = NULL;
147         GstElement* sink = NULL;
148
149         if (_senderAddress.empty())
150                 throw MediaTransporterException(MTPR_ERROR_INVALID_OPERATION, "address is empty");
151
152         if (_mediaSources.empty())
153                 throw MediaTransporterException(MTPR_ERROR_INVALID_OPERATION, "source is empty");
154
155         /* create mux to sink */
156         try {
157                 mux = gst::_createElement(gst::DEFAULT_ELEMENT_TSMUX);
158                 g_object_set(G_OBJECT(mux), "alignment", 7, NULL);
159
160                 sink = gst::_createElement(gst::DEFAULT_ELEMENT_SRTSINK);
161                 g_object_set(G_OBJECT(sink),
162                         "uri", util::addProtocolPrefix(_senderAddress, "srt").c_str(),
163                         "wait-for-connection", FALSE,
164                         "sync", FALSE,
165                         "latency", 10,
166                         NULL);
167
168                 gst_bin_add_many(GST_BIN(_gst.pipeline), mux, sink, NULL);
169                 if (!gst_element_link(mux, sink)) {
170                         LOG_ERROR("failed to gst_element_link()");
171                         throw MediaTransporterException(MTPR_ERROR_INVALID_OPERATION, "failed to gst_element_link()");
172                 }
173
174                 if (_connectionParam.mode != MTPR_CONNECTION_SRT_MODE_NONE)
175                         g_object_set(G_OBJECT(sink),
176                                 "mode", static_cast<int>(_connectionParam.mode), NULL);
177
178                 if (_connectionParam.pbKeyLen != MTPR_CONNECTION_SRT_NO_KEY)
179                         g_object_set(G_OBJECT(sink),
180                                 "pbkeylen", static_cast<int>(_connectionParam.pbKeyLen), NULL);
181
182                 if (!_connectionParam.passPhrase.empty())
183                         g_object_set(G_OBJECT(sink),
184                                 "passphrase", _connectionParam.passPhrase.c_str(), NULL);
185
186                 if (!_connectionParam.streamId.empty())
187                         g_object_set(G_OBJECT(sink),
188                                 "streamid", _connectionParam.streamId.c_str(), NULL);
189
190                 gst::_connectAndAppendSignal(&_gst.signals, G_OBJECT(sink), "caller-added", G_CALLBACK(__callerAddedCb), &_gst);
191                 gst::_connectAndAppendSignal(&_gst.signals, G_OBJECT(sink), "caller-removed", G_CALLBACK(__callerRemovedCb), &_gst);
192                 gst::_connectAndAppendSignal(&_gst.signals, G_OBJECT(sink), "caller-rejected", G_CALLBACK(__callerRejectedCb), &_gst);
193                 gst::_connectAndAppendSignal(&_gst.signals, G_OBJECT(sink), "caller-connecting", G_CALLBACK(__callerConnectingCb), &_gst);
194
195                 linkMediaSourceToMuxer(mux);
196
197                 _srtSink = sink;
198                 LOG_INFO("linked mux and sink");
199
200         } catch (const MediaTransporterException& e) {
201                 LOG_ERROR("%s", e.what());
202
203                 gst::_disconnectSignal(&_gst.signals, G_OBJECT(sink));
204
205                 gst::_destroyElementFromParent(mux);
206                 gst::_destroyElementFromParent(sink);
207
208                 throw;
209         }
210 }
211
212 void MediaTransporterSenderSrt::startPipeline()
213 {
214         gst::_setPipelineState(_gst.pipeline, GST_STATE_PLAYING,
215                                                 MediaTransporterIni::get().general().timeout, _asyncStart);
216         startStatsMonitoring();
217 }
218
219 void MediaTransporterSenderSrt::stopPipeline()
220 {
221         stopStatsMonitoring();
222
223         _srtSink = NULL;
224         gst::_setPipelineState(_gst.pipeline, GST_STATE_NULL,
225                                                                 MediaTransporterIni::get().general().timeout);
226 }
227
228 void MediaTransporterSenderSrt::setConnection(std::string name, std::string val)
229 {
230         setConnectionParam(name, val, &_connectionParam);
231 }
232
233 std::string MediaTransporterSenderSrt::getConnection(std::string name)
234 {
235         return getConnectionParam(_connectionParam, name);
236 }
237
238 void MediaTransporterSenderSrt::setSenderAddress(std::string address)
239 {
240         _senderAddress = address;
241 }
242
243 gboolean MediaTransporterSenderSrt::_statsForeachCb(GQuark fieldId, const GValue* val, gpointer userData)
244 {
245         auto callback = static_cast<IInvokable*>(userData);
246         std::string fieldName = g_quark_to_string(fieldId);
247
248         if (!callback ||
249                 !callback->invoke(fieldName, val, __senderProp)) {
250                 LOG_WARNING("stop calling stats callback");
251                 return FALSE;
252         }
253
254         return TRUE;
255 }
256
257 void MediaTransporterSenderSrt::foreachConnectionStats(InvokablePtr callback)
258 {
259         const GstStructure* structure = NULL;
260
261         g_object_get(_srtSink, "stats", &structure, NULL);
262         gchar* str = gst_structure_to_string(structure);
263         SECURE_LOG_DEBUG("SENDER >>> stats : %s", str);
264         g_free(str);
265
266         gboolean ret = FALSE;
267
268         if (gst_structure_has_field(structure, "callers")) {
269                 GValueArray* callers = (GValueArray *)g_value_get_boxed(gst_structure_get_value(structure, "callers"));
270
271                 LOG_DEBUG("SENDER >>> num of callers : %d", callers->n_values);
272                 for (int i = 0; i < (int)callers->n_values; i++) {
273                         const GstStructure* callerStats = (GstStructure *)g_value_get_boxed(&callers->values[i]);
274
275                         LOG_DEBUG("SENDER >>> caller idx : %d", i);
276                         ret = gst_structure_foreach(callerStats, _statsForeachCb, callback.get());
277                         if (!ret)
278                                 break;
279                 }
280         }
281
282         if (ret && gst_structure_has_field(structure, "bytes-sent-total"))
283                 callback->invoke("bytes-sent-total",
284                                 gst_structure_get_value(structure, "bytes-sent-total"), __senderProp);
285
286 }