Change GstSdp.sdp_message_parse_buffer to GstSdp.SDPMessage.new_from_text in examples
[platform/upstream/gstreamer.git] / subprojects / gst-examples / webrtc / check / validate / client.py
1 # Copyright (c) 2020, Matthew Waters <matthew@centricular.com>
2 #
3 # This program is free software; you can redistribute it and/or
4 # modify it under the terms of the GNU Lesser General Public
5 # License as published by the Free Software Foundation; either
6 # version 2.1 of the License, or (at your option) any later version.
7 #
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
11 # Lesser General Public License for more details.
12 #
13 # You should have received a copy of the GNU Lesser General Public
14 # License along with this program; if not, write to the
15 # Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
16 # Boston, MA 02110-1301, USA.
17
18 import threading
19 import copy
20
21 from observer import Signal, WebRTCObserver, DataChannelObserver, StateObserver
22 from enums import NegotiationState, DataChannelState
23
24 import gi
25 gi.require_version("Gst", "1.0")
26 from gi.repository import Gst
27 gi.require_version("GstWebRTC", "1.0")
28 from gi.repository import GstWebRTC
29 gi.require_version("GstSdp", "1.0")
30 from gi.repository import GstSdp
31 gi.require_version("GstValidate", "1.0")
32 from gi.repository import GstValidate
33
34
class WebRTCBinObserver(WebRTCObserver):
    """
    Observe a webrtcbin element.

    On construction the observer connects to the element's
    "on-negotiation-needed", "on-ice-candidate", "pad-added",
    "on-new-transceiver" and "on-data-channel" signals and re-publishes
    them through Signal objects.  It also wraps the element's action
    signals (create-offer, create-answer, set-*-description, ...) and
    tracks the negotiation state through the WebRTCObserver base class.
    """
    def __init__(self, element):
        """
        :param element: the webrtcbin element to observe
        """
        WebRTCObserver.__init__(self)
        self.element = element
        # Handler ids returned by connect(), kept in connection order.
        self.signal_handlers = []
        self.signal_handlers.append(element.connect("on-negotiation-needed", self._on_negotiation_needed))
        self.signal_handlers.append(element.connect("on-ice-candidate", self._on_ice_candidate))
        self.signal_handlers.append(element.connect("pad-added", self._on_pad_added))
        self.signal_handlers.append(element.connect("on-new-transceiver", self._on_new_transceiver))
        self.signal_handlers.append(element.connect("on-data-channel", self._on_data_channel))
        # Number of "on-negotiation-needed" emissions seen so far.
        self.negotiation_needed = 0
        self._negotiation_needed_observer = StateObserver(self, "negotiation_needed", threading.Condition())
        self.on_negotiation_needed = Signal()
        self.on_ice_candidate = Signal()
        self.on_pad_added = Signal()
        self.on_new_transceiver = Signal()

    def _on_negotiation_needed(self, element):
        """Count the emission, publish the new count and fire the signal."""
        self.negotiation_needed += 1
        self._negotiation_needed_observer.update(self.negotiation_needed)
        self.on_negotiation_needed.fire()

    def _on_ice_candidate(self, element, mline, candidate):
        """Forward a locally gathered ICE candidate."""
        self.on_ice_candidate.fire(mline, candidate)

    def _on_pad_added(self, element, pad):
        """Forward a newly added pad of the element."""
        self.on_pad_added.fire(pad)

    def _on_description_set(self, promise, desc):
        """
        Promise callback used for both local and remote descriptions.

        Updates the negotiation state from the type of ``desc`` and fires
        the matching on_offer_set / on_answer_set signal.
        """
        new_state = self._update_negotiation_from_description_state(desc)
        if new_state == NegotiationState.OFFER_SET:
            self.on_offer_set.fire(desc)
        elif new_state == NegotiationState.ANSWER_SET:
            self.on_answer_set.fire(desc)

    def _on_new_transceiver(self, element, transceiver):
        """Forward a newly created transceiver."""
        self.on_new_transceiver.fire(transceiver)

    def _on_data_channel(self, element, channel):
        """Wrap a channel announced by the element and register it as 'remote'."""
        observer = WebRTCBinDataChannelObserver(channel, channel.props.label, 'remote')
        self.add_channel(observer)

    def _update_negotiation_from_description_state(self, desc):
        """
        Map the SDP type of ``desc`` to a NegotiationState and record it.

        Only OFFER and ANSWER descriptions are supported; any other type
        trips the assertion.

        :returns: the NegotiationState that was set
        """
        new_state = None
        if desc.type == GstWebRTC.WebRTCSDPType.OFFER:
            new_state = NegotiationState.OFFER_SET
        elif desc.type == GstWebRTC.WebRTCSDPType.ANSWER:
            new_state = NegotiationState.ANSWER_SET
        assert new_state is not None
        self._update_negotiation_state(new_state)
        return new_state

    def _deepcopy_session_description(self, desc):
        """
        Return an independent copy of a WebRTCSessionDescription.

        The copy is made by serialising the SDP to text and parsing it
        again.

        :raises ValueError: if the serialised SDP cannot be parsed back
        """
        result, new_sdp = GstSdp.SDPMessage.new_from_text(desc.sdp.as_text())
        # Do not hand a half-initialised message to the description
        # constructor if parsing failed.
        if result != GstSdp.SDPResult.OK:
            raise ValueError("could not re-parse SDP while copying description: %r" % (result,))
        return GstWebRTC.WebRTCSessionDescription.new(desc.type, new_sdp)

    def _on_offer_created(self, promise, element):
        """
        Promise callback for 'create-offer'.

        Sets the created offer as the local description and then fires
        on_offer_created with the original offer.
        """
        self._update_negotiation_state(NegotiationState.OFFER_CREATED)
        reply = promise.get_reply()
        offer = reply['offer']

        # Two separate copies are made: one travels with the promise as
        # callback data, the other is handed to the element.
        # NOTE(review): presumably so neither side shares a description
        # object with the other - confirm.
        callback_copy = self._deepcopy_session_description(offer)
        set_promise = Gst.Promise.new_with_change_func(self._on_description_set, callback_copy)

        element_copy = self._deepcopy_session_description(offer)
        self.element.emit('set-local-description', element_copy, set_promise)
        self.on_offer_created.fire(offer)

    def _on_answer_created(self, promise, element):
        """
        Promise callback for 'create-answer'.

        Sets the created answer as the local description and then fires
        on_answer_created with the original answer.
        """
        self._update_negotiation_state(NegotiationState.ANSWER_CREATED)
        reply = promise.get_reply()
        answer = reply['answer']

        # Same two-copy scheme as in _on_offer_created.
        callback_copy = self._deepcopy_session_description(answer)
        set_promise = Gst.Promise.new_with_change_func(self._on_description_set, callback_copy)

        element_copy = self._deepcopy_session_description(answer)
        self.element.emit('set-local-description', element_copy, set_promise)
        self.on_answer_created.fire(answer)

    def create_offer(self, options=None):
        """Ask the element for an offer; continues in _on_offer_created."""
        promise = Gst.Promise.new_with_change_func(self._on_offer_created, self.element)
        self.element.emit('create-offer', options, promise)

    def create_answer(self, options=None):
        """Ask the element for an answer; continues in _on_answer_created."""
        promise = Gst.Promise.new_with_change_func(self._on_answer_created, self.element)
        self.element.emit('create-answer', options, promise)

    def set_remote_description(self, desc):
        """Set ``desc`` as the remote description; continues in _on_description_set."""
        promise = Gst.Promise.new_with_change_func(self._on_description_set, desc)
        self.element.emit('set-remote-description', desc, promise)

    def add_ice_candidate(self, mline, candidate):
        """Pass a remote ICE candidate for media line ``mline`` to the element."""
        self.element.emit('add-ice-candidate', mline, candidate)

    def add_data_channel(self, ident):
        """Create a data channel labelled ``ident`` and register it as 'local'."""
        channel = self.element.emit('create-data-channel', ident, None)
        observer = WebRTCBinDataChannelObserver(channel, ident, 'local')
        self.add_channel(observer)

    def wait_for_negotiation_needed(self, generation):
        """
        Wait on the negotiation-needed counter.

        NOTE(review): presumably blocks until the counter equals
        ``generation`` - confirm against StateObserver.wait_for.
        """
        self._negotiation_needed_observer.wait_for((generation,))
140
class WebRTCStream(object):
    """
    A stream attached to a webrtcbin element.

    The stream is a bin built from a textual description.  It must
    expose at most one of the static pads "src" / "sink".
    """
    def __init__(self):
        # Set once by set_description().
        self.bin = None

    def set_description(self, desc):
        """Build the bin from ``desc``; may only be called once."""
        assert self.bin is None
        self.bin = Gst.parse_bin_from_description(desc, True)

    def add_and_link(self, parent, link):
        """
        Add the bin to ``parent`` and link it with the element ``link``.

        A bin with a "src" pad is linked towards ``link``; a bin with a
        "sink" pad is linked from ``link``.
        """
        stream_bin = self.bin
        assert stream_bin is not None
        # Keep the state locked while the bin is added and linked; it is
        # unlocked and synced with the parent at the end.
        stream_bin.set_locked_state(True)
        parent.add(stream_bin)
        src_pad = stream_bin.get_static_pad("src")
        sink_pad = stream_bin.get_static_pad("sink")
        # The bin must not have both pads.
        assert not (src_pad is not None and sink_pad is not None)
        if src_pad:
            stream_bin.link(link)
        if sink_pad:
            link.link(stream_bin)
        stream_bin.set_locked_state(False)
        stream_bin.sync_state_with_parent()

    def add_and_link_to(self, parent, link, pad):
        """
        Add the bin to ``parent`` and link it to the specific ``pad``.

        A source ``pad`` is linked to the bin's "sink" pad, a sink
        ``pad`` is linked from the bin's "src" pad.  ``link`` is not
        used by this method.
        """
        stream_bin = self.bin
        assert stream_bin is not None
        stream_bin.set_locked_state(True)
        parent.add(stream_bin)
        src_pad = stream_bin.get_static_pad("src")
        sink_pad = stream_bin.get_static_pad("sink")
        assert not (src_pad is not None and sink_pad is not None)
        direction = pad.get_direction()
        if direction == Gst.PadDirection.SRC:
            assert sink_pad is not None
            pad.link(sink_pad)
        elif direction == Gst.PadDirection.SINK:
            assert src_pad is not None
            src_pad.link(pad)
        stream_bin.set_locked_state(False)
        stream_bin.sync_state_with_parent()
181
class WebRTCClient(WebRTCBinObserver):
    """
    Client for performing webrtc operations.  Owns the pipeline that
    holds the webrtcbin element being driven.
    """
    def __init__(self):
        pipeline = Gst.Pipeline(None)
        webrtcbin = Gst.ElementFactory.make("webrtcbin")
        self.pipeline = pipeline
        self.webrtcbin = webrtcbin
        # The observer base class connects to the element's signals.
        super().__init__(webrtcbin)
        pipeline.add(webrtcbin)
        # Streams added through add_stream() / add_stream_with_pad().
        self._streams = []

    def stop(self):
        """Set the pipeline to the NULL state."""
        self.pipeline.set_state(Gst.State.NULL)

    def add_stream(self, desc):
        """Create a stream from ``desc`` and link it with webrtcbin."""
        new_stream = WebRTCStream()
        new_stream.set_description(desc)
        new_stream.add_and_link(self.pipeline, self.webrtcbin)
        self._streams.append(new_stream)

    def add_stream_with_pad(self, desc, pad):
        """Create a stream from ``desc`` and link it to ``pad``."""
        new_stream = WebRTCStream()
        new_stream.set_description(desc)
        new_stream.add_and_link_to(self.pipeline, self.webrtcbin, pad)
        self._streams.append(new_stream)

    def set_options(self, opts):
        """
        Apply the options this client understands.

        Only "local-bundle-policy" is read; it is copied to webrtcbin's
        bundle_policy property when the field is present.
        NOTE(review): opts is presumably a Gst.Structure - confirm.
        """
        if not opts.has_field("local-bundle-policy"):
            return
        self.webrtcbin.props.bundle_policy = opts["local-bundle-policy"]
212
213
class WebRTCBinDataChannelObserver(DataChannelObserver):
    """
    Data channel observer for a webrtcbin data channel.

    Connects to the channel's signals and translates them into state
    updates and received messages on the DataChannelObserver base.
    """
    def __init__(self, target, ident, location):
        """
        :param target: the data channel object to observe
        :param ident: identifier passed on to DataChannelObserver
        :param location: location tag passed on to DataChannelObserver
                         (callers in this file use 'local' / 'remote')
        """
        super().__init__(ident, location)
        self.target = target
        # Handler ids returned by connect(), kept in connection order.
        self.signal_handlers = []
        self.signal_handlers.append(target.connect("on-open", self._on_open))
        self.signal_handlers.append(target.connect("on-close", self._on_close))
        self.signal_handlers.append(target.connect("on-error", self._on_error))
        self.signal_handlers.append(target.connect("on-message-data", self._on_message_data))
        self.signal_handlers.append(target.connect("on-message-string", self._on_message_string))
        self.signal_handlers.append(target.connect("on-buffered-amount-low", self._on_buffered_amount_low))

    def _on_open(self, channel):
        """Record that the channel opened."""
        self._update_state(DataChannelState.OPEN)

    def _on_close(self, channel):
        """Record that the channel closed."""
        self._update_state(DataChannelState.CLOSED)

    def _on_error(self, channel, error=None):
        """
        Record that the channel reported an error.

        ``error`` is optional so the handler works whether or not the
        signal delivers the error object alongside the channel.
        """
        self._update_state(DataChannelState.ERROR)

    def _on_message_data(self, channel, data):
        """Store a received binary message."""
        # The payload arrives as ``data``; the previous code appended an
        # undefined name here.
        # NOTE(review): assumes DataChannelObserver provides a list-like
        # ``self.data`` - confirm against the base class.
        self.data.append(data)

    def _on_message_string(self, channel, msg):
        """Hand a received text message to the base class."""
        self.got_message(msg)

    def _on_buffered_amount_low(self, channel):
        """Nothing to do; connected so the signal is handled."""
        pass

    def close(self):
        """Ask the channel to close."""
        self.target.emit('close')

    def send_string(self, msg):
        """Send the text message ``msg`` over the channel."""
        self.target.emit('send-string', msg)