[1.0.27] Add ConnectionStats abstraction
[platform/core/api/mediatransporter.git] / src / MediaTransporterSenderSrt.cpp
1 /**
2  * Copyright (c) 2022 Samsung Electronics Co., Ltd All Rights Reserved
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <gio/gio.h>
18
19 #include "MediaTransporterBase.h"
20 #include "MediaTransporterException.h"
21 #include "MediaTransporterSenderSrt.h"
22 #include "MediaTransporterLog.h"
23 #include "MediaTransporterUtil.h"
24 #include "MediaTransporterParseIni.h"
25 #include "MediaTransporterConnectionStats.h"
26
27 using namespace tizen_media_transporter;
28 using namespace tizen_media_transporter::param::srt;
29
30 constexpr unsigned int STATS_UPDATE_INTERVAL = 1000;
31
32 static void __callerAddedCb(GstElement* element, gint socket, GSocketAddress* address, gpointer userData)
33 {
34         g_autofree gchar* ip_addr = g_inet_address_to_string(g_inet_socket_address_get_address((GInetSocketAddress*)address));
35
36         LOG_INFO("socket %d, address %s", socket, ip_addr);
37 }
38
39 static void __callerRemovedCb(GstElement* element, gint socket, GSocketAddress* address, gpointer userData)
40 {
41         g_autofree gchar* ip_addr = g_inet_address_to_string(g_inet_socket_address_get_address((GInetSocketAddress*)address));
42
43         LOG_INFO("socket %d, address %s", socket, ip_addr);
44 }
45
46 static void __callerRejectedCb(GstElement* element, GSocketAddress* peerAddress, const gchar* stream_id, gpointer userData)
47 {
48         g_autofree gchar* ip_addr = g_inet_address_to_string(g_inet_socket_address_get_address((GInetSocketAddress*)peerAddress));
49
50         LOG_INFO("stream_id %s, peer_address %s", stream_id, ip_addr);
51 }
52
53 static void __callerConnectingCb(GstElement* element, GSocketAddress* peerAddress, const gchar* stream_id, gpointer userData)
54 {
55         g_autofree gchar* ip_addr = g_inet_address_to_string(g_inet_socket_address_get_address((GInetSocketAddress*)peerAddress));
56
57         LOG_INFO("stream_id %s, peer_address %s", stream_id, ip_addr);
58 }
59
60 void MediaTransporterSenderSrt::startStatsMonitoring() {
61         GError* error = NULL;
62         _statsMonitor = g_thread_try_new("stats_monitor", (GThreadFunc)_statsMonitorThread, (gpointer)this, &error);
63
64         if (error)
65                 LOG_ERROR("failed to create stats_monitor thread %s", error->message);
66 }
67
68 void MediaTransporterSenderSrt::stopStatsMonitoring() {
69         _threadExit = true;
70         if (_statsMonitor)
71                 g_thread_join(_statsMonitor);
72         _statsMonitor = NULL;
73 }
74
75 gpointer MediaTransporterSenderSrt::_statsMonitorThread(gpointer userData)
76 {
77         MediaTransporterSenderSrt* srt = static_cast<MediaTransporterSenderSrt*>(userData);
78
79         while (!srt->_threadExit) {
80                 _updateStats(userData);
81                 std::this_thread::sleep_for(std::chrono::milliseconds(STATS_UPDATE_INTERVAL));
82         }
83
84         return NULL;
85 }
86
87 gpointer MediaTransporterSenderSrt::_updateStats(gpointer userData)
88 {
89         GstStructure *structure = NULL;
90         gchar *str = NULL;
91         gint latency = 0;
92         MediaTransporterSenderSrt* srt = static_cast<MediaTransporterSenderSrt*>(userData);
93
94         g_object_get(srt->_srtSink, "stats", &structure, NULL);
95         g_object_get(srt->_srtSink, "latency", &latency, NULL);
96
97         str = gst_structure_to_string(structure);
98         SECURE_LOG_DEBUG("SENDER >>> latency %d, stats : %s", latency, str);
99         g_free(str);
100
101         if (gst_structure_has_field(structure, "callers")) {
102           GValueArray *array;
103
104           array = (GValueArray *)g_value_get_boxed(gst_structure_get_value(structure, "callers"));
105
106           LOG_DEBUG("SENDER >>> size of callers array : %d", array->n_values);
107           structure = (GstStructure *)g_value_get_boxed(&array->values[array->n_values - 1]);
108         }
109
110         gst_structure_get_int64(structure, "packets-sent", &srt->_stats.packetsSent);
111         gst_structure_get_int(structure, "packets-sent-lost", &srt->_stats.packetsSentLost);
112         gst_structure_get_double(structure, "send-rate-mbps", &srt->_stats.sendRateMbps);
113         gst_structure_get_double(structure, "bandwidth-mbps", &srt->_stats.bandwidthMbps);
114         gst_structure_get_double(structure, "rtt-ms", &srt->_stats.rttMs);
115
116         LOG_DEBUG("SENDER >>> packets_sent %" G_GINT64_FORMAT
117                 ", packets-sent-lost %d, send-rate-mbps %lf, bandwidth-mbps %lf, rtt-ms %lf",
118                 srt->_stats.packetsSent, srt->_stats.packetsSentLost,
119                 srt->_stats.sendRateMbps, srt->_stats.bandwidthMbps,
120                 srt->_stats.rttMs);
121
122         return NULL;
123 }
124
125 MediaTransporterSenderSrt::MediaTransporterSenderSrt()
126 {
127         LOG_DEBUG("ctor: %p", this);
128 }
129
130 MediaTransporterSenderSrt::~MediaTransporterSenderSrt()
131 {
132         LOG_DEBUG("dtor: %p", this);
133 }
134
135 void MediaTransporterSenderSrt::buildPipeline()
136 {
137         GstElement* mux = NULL;
138         GstElement* sink = NULL;
139
140         if (_senderAddress.empty())
141                 throw MediaTransporterException(MTPR_ERROR_INVALID_OPERATION, "address is empty");
142
143         if (_mediaSources.empty())
144                 throw MediaTransporterException(MTPR_ERROR_INVALID_OPERATION, "source is empty");
145
146         /* create mux to sink */
147         try {
148                 mux = gst::_createElement(gst::DEFAULT_ELEMENT_TSMUX);
149                 g_object_set(G_OBJECT(mux), "alignment", 7, NULL);
150
151                 sink = gst::_createElement(gst::DEFAULT_ELEMENT_SRTSINK);
152                 g_object_set(G_OBJECT(sink),
153                         "uri", util::addProtocolPrefix(_senderAddress, "srt").c_str(),
154                         "wait-for-connection", FALSE,
155                         "sync", FALSE,
156                         "latency", 10,
157                         NULL);
158
159                 gst_bin_add_many(GST_BIN(_gst.pipeline), mux, sink, NULL);
160                 if (!gst_element_link(mux, sink)) {
161                         LOG_ERROR("failed to gst_element_link()");
162                         throw MediaTransporterException(MTPR_ERROR_INVALID_OPERATION, "failed to gst_element_link()");
163                 }
164
165                 if (_connectionParam.mode != MTPR_CONNECTION_SRT_MODE_NONE)
166                         g_object_set(G_OBJECT(sink),
167                                 "mode", static_cast<int>(_connectionParam.mode), NULL);
168
169                 if (_connectionParam.pbKeyLen != MTPR_CONNECTION_SRT_NO_KEY)
170                         g_object_set(G_OBJECT(sink),
171                                 "pbkeylen", static_cast<int>(_connectionParam.pbKeyLen), NULL);
172
173                 if (!_connectionParam.passPhrase.empty())
174                         g_object_set(G_OBJECT(sink),
175                                 "passphrase", _connectionParam.passPhrase.c_str(), NULL);
176
177                 if (!_connectionParam.streamId.empty())
178                         g_object_set(G_OBJECT(sink),
179                                 "streamid", _connectionParam.streamId.c_str(), NULL);
180
181                 gst::_connectAndAppendSignal(&_gst.signals, G_OBJECT(sink), "caller-added", G_CALLBACK(__callerAddedCb), &_gst);
182                 gst::_connectAndAppendSignal(&_gst.signals, G_OBJECT(sink), "caller-removed", G_CALLBACK(__callerRemovedCb), &_gst);
183                 gst::_connectAndAppendSignal(&_gst.signals, G_OBJECT(sink), "caller-rejected", G_CALLBACK(__callerRejectedCb), &_gst);
184                 gst::_connectAndAppendSignal(&_gst.signals, G_OBJECT(sink), "caller-connecting", G_CALLBACK(__callerConnectingCb), &_gst);
185
186                 linkMediaSourceToMuxer(mux);
187
188                 _srtSink = sink;
189                 LOG_INFO("linked mux and sink");
190
191         } catch (const MediaTransporterException& e) {
192                 LOG_ERROR("%s", e.what());
193
194                 gst::_disconnectSignal(&_gst.signals, G_OBJECT(sink));
195
196                 gst::_destroyElementFromParent(mux);
197                 gst::_destroyElementFromParent(sink);
198
199                 throw;
200         }
201 }
202
203 void MediaTransporterSenderSrt::startPipeline()
204 {
205         gst::_setPipelineState(_gst.pipeline, GST_STATE_PLAYING,
206                                                 MediaTransporterIni::get().general().timeout, _asyncStart);
207         startStatsMonitoring();
208 }
209
210 void MediaTransporterSenderSrt::stopPipeline()
211 {
212         stopStatsMonitoring();
213
214         _srtSink = NULL;
215         gst::_setPipelineState(_gst.pipeline, GST_STATE_NULL,
216                                                                 MediaTransporterIni::get().general().timeout);
217 }
218
219 void MediaTransporterSenderSrt::setConnection(std::string name, std::string val)
220 {
221         setConnectionParam(name, val, &_connectionParam);
222 }
223
224 std::string MediaTransporterSenderSrt::getConnection(std::string name)
225 {
226         return getConnectionParam(_connectionParam, name);
227 }
228
229 void MediaTransporterSenderSrt::setSenderAddress(std::string address)
230 {
231         _senderAddress = address;
232 }
233
234 void MediaTransporterSenderSrt::foreachConnectionStats(InvokablePtr callback)
235 {
236         ConnectionStatsFactory::createSenderSrtConnectionStats(_srtSink, std::move(callback))->foreach();
237 }