From: Nirbheek Chauhan Date: Sat, 5 Mar 2022 14:36:37 +0000 (+0530) Subject: webrtc_sendrecv.py: Implement all negotiation modes X-Git-Tag: 1.22.0~2107 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=583408c312f546e95f73a1ca3013077e79324904;p=platform%2Fupstream%2Fgstreamer.git webrtc_sendrecv.py: Implement all negotiation modes Earlier, the example only supported one negotiation mode: * Browser client is running, gstreamer starts a call and sends offer Now these three modes are also supported: * Browser client is running, gstreamer starts a call and sends an offer request * gstreamer connects and waits for browser client to start a call and send an offer * gstreamer connects and waits for browser client to start a call and send an offer request The following features are still missing: * Data channel support * TWCC support + stats logging Part-of: --- diff --git a/subprojects/gst-examples/webrtc/sendrecv/gst/webrtc-sendrecv.c b/subprojects/gst-examples/webrtc/sendrecv/gst/webrtc-sendrecv.c index 6ba6909..98f2155 100644 --- a/subprojects/gst-examples/webrtc/sendrecv/gst/webrtc-sendrecv.c +++ b/subprojects/gst-examples/webrtc/sendrecv/gst/webrtc-sendrecv.c @@ -59,7 +59,7 @@ static GOptionEntry entries[] = { {"peer-id", 0, 0, G_OPTION_ARG_STRING, &peer_id, "String ID of the peer to connect to", "ID"}, {"our-id", 0, 0, G_OPTION_ARG_STRING, &our_id, - "String ID of the session that peer can connect to us", "ID"}, + "String ID that the peer can use to connect to us", "ID"}, {"server", 0, 0, G_OPTION_ARG_STRING, &server_url, "Signalling server to connect to", "URL"}, {"disable-ssl", 0, 0, G_OPTION_ARG_NONE, &disable_ssl, "Disable ssl", NULL}, diff --git a/subprojects/gst-examples/webrtc/sendrecv/gst/webrtc_sendrecv.py b/subprojects/gst-examples/webrtc/sendrecv/gst/webrtc_sendrecv.py index 6822434..1502eec 100755 --- a/subprojects/gst-examples/webrtc/sendrecv/gst/webrtc_sendrecv.py +++ b/subprojects/gst-examples/webrtc/sendrecv/gst/webrtc_sendrecv.py @@ -37,50 +37,81 @@ webrtcbin name=sendrecv bundle-policy=max-bundle stun-server=stun://stun.l.googl from websockets.version import version as wsv +def print_status(msg): + print(f'--- {msg}') + + +def print_error(msg): + print(f'!!! {msg}', file=sys.stderr) + + class WebRTCClient: - def __init__(self, loop, id_, peer_id, server): - self.event_loop = loop - self.id_ = id_ + def __init__(self, loop, our_id, peer_id, server, remote_is_offerer): self.conn = None self.pipe = None self.webrtc = None - self.peer_id = peer_id + self.event_loop = loop self.server = server + # An optional user-specified ID we can use to register + self.our_id = our_id + # The actual ID we used to register + self.id_ = None + # An optional peer ID we should connect to + self.peer_id = peer_id + # Whether we will send the offer or the remote peer will + self.remote_is_offerer = remote_is_offerer async def send(self, msg): assert self.conn - print(f'>>> Sending {msg}') + print(f'>>> {msg}') await self.conn.send(msg) async def connect(self): self.conn = await websockets.connect(self.server) - await self.send('HELLO %d' % self.id_) + if self.our_id is None: + self.id_ = str(random.randrange(10, 10000)) + else: + self.id_ = self.our_id + await self.send(f'HELLO {self.id_}') async def setup_call(self): - await self.send('SESSION {}'.format(self.peer_id)) + assert self.peer_id + await self.send(f'SESSION {self.peer_id}') def send_soon(self, msg): asyncio.run_coroutine_threadsafe(self.send(msg), self.event_loop) - def send_sdp_offer(self, offer): + def send_sdp(self, offer): text = offer.sdp.as_text() - print('Sending offer:\n%s' % text) - msg = json.dumps({'sdp': {'type': 'offer', 'sdp': text}}) + if offer.type == GstWebRTC.WebRTCSDPType.OFFER: + print_status('Sending offer:\n%s' % text) + msg = json.dumps({'sdp': {'type': 'offer', 'sdp': text}}) + elif offer.type == GstWebRTC.WebRTCSDPType.ANSWER: + print_status('Sending answer:\n%s' % text) + msg = json.dumps({'sdp': {'type': 'answer', 'sdp': text}}) + else: + raise AssertionError(offer.type) self.send_soon(msg) def on_offer_created(self, promise, _, __): - promise.wait() + assert(promise.wait() == Gst.PromiseResult.REPLIED) reply = promise.get_reply() offer = reply['offer'] promise = Gst.Promise.new() - print('Offer created, setting local description') + print_status('Offer created, setting local description') self.webrtc.emit('set-local-description', offer, promise) - promise.interrupt() - self.send_sdp_offer(offer) + promise.interrupt() # we don't care about the result, discard it + self.send_sdp(offer) - def on_negotiation_needed(self, element): - promise = Gst.Promise.new_with_change_func(self.on_offer_created, element, None) - element.emit('create-offer', None, promise) + def on_negotiation_needed(self, _, create_offer): + if create_offer: + print_status('Call was connected: creating offer') + promise = Gst.Promise.new_with_change_func(self.on_offer_created, None, None) + self.webrtc.emit('create-offer', None, promise) + elif self.remote_is_offerer: + # We are initiating the call, but we want the remote peer to create the offer + print_status('Call was connected: requesting remote peer for offer') + self.send_soon('OFFER_REQUEST') def send_ice_candidate_message(self, _, mlineindex, candidate): icemsg = json.dumps({'ice': {'candidate': candidate, 'sdpMLineIndex': mlineindex}}) @@ -88,7 +119,7 @@ class WebRTCClient: def on_incoming_decodebin_stream(self, _, pad): if not pad.has_current_caps(): - print(pad, 'has no caps, ignoring') + print_error(pad, 'has no caps, ignoring') return caps = pad.get_current_caps() @@ -116,6 +147,10 @@ class WebRTCClient: conv.link(resample) resample.link(sink) + def on_ice_gathering_state_notify(self, pspec, _): + state = self.webrtc.get_property('ice-gathering-state') + print_status(f'ICE gathering state changed to {state}') + def on_incoming_stream(self, _, pad): if pad.direction != Gst.PadDirection.SRC: return @@ -126,28 +161,54 @@ class WebRTCClient: decodebin.sync_state_with_parent() self.webrtc.link(decodebin) - def start_pipeline(self): + def start_pipeline(self, create_offer=True): + print_status(f'Creating pipeline, create_offer: {create_offer}') self.pipe = Gst.parse_launch(PIPELINE_DESC) self.webrtc = self.pipe.get_by_name('sendrecv') - self.webrtc.connect('on-negotiation-needed', self.on_negotiation_needed) + self.webrtc.connect('on-negotiation-needed', self.on_negotiation_needed, create_offer) self.webrtc.connect('on-ice-candidate', self.send_ice_candidate_message) + self.webrtc.connect('notify::ice-gathering-state', self.on_ice_gathering_state_notify) self.webrtc.connect('pad-added', self.on_incoming_stream) self.pipe.set_state(Gst.State.PLAYING) - def handle_sdp(self, message): + def on_answer_created(self, promise, _, __): + assert(promise.wait() == Gst.PromiseResult.REPLIED) + reply = promise.get_reply() + answer = reply['answer'] + promise = Gst.Promise.new() + self.webrtc.emit('set-local-description', answer, promise) + promise.interrupt() # we don't care about the result, discard it + self.send_sdp(answer) + + def on_offer_set(self, promise, _, __): + assert(promise.wait() == Gst.PromiseResult.REPLIED) + promise = Gst.Promise.new_with_change_func(self.on_answer_created, None, None) + self.webrtc.emit('create-answer', None, promise) + + def handle_json(self, message): assert (self.webrtc) - msg = json.loads(message) + try: + msg = json.loads(message) + except json.decoder.JSONDecoderError: + print_error('Failed to parse JSON message, this might be a bug') + raise if 'sdp' in msg: - sdp = msg['sdp'] - assert(sdp['type'] == 'answer') - sdp = sdp['sdp'] - print('Received answer:\n%s' % sdp) - res, sdpmsg = GstSdp.SDPMessage.new() - GstSdp.sdp_message_parse_buffer(bytes(sdp.encode()), sdpmsg) - answer = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.ANSWER, sdpmsg) - promise = Gst.Promise.new() - self.webrtc.emit('set-remote-description', answer, promise) - promise.interrupt() + sdp = msg['sdp']['sdp'] + if msg['sdp']['type'] == 'answer': + print_status('Received answer:\n%s' % sdp) + res, sdpmsg = GstSdp.SDPMessage.new() + GstSdp.sdp_message_parse_buffer(bytes(sdp.encode()), sdpmsg) + answer = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.ANSWER, sdpmsg) + promise = Gst.Promise.new() + self.webrtc.emit('set-remote-description', answer, promise) + promise.interrupt() # we don't care about the result, discard it + else: + print_status('Received offer:\n%s' % sdp) + res, sdpmsg = GstSdp.SDPMessage.new() + GstSdp.sdp_message_parse_buffer(bytes(sdp.encode()), sdpmsg) + offer = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.OFFER, sdpmsg) + promise = Gst.Promise.new_with_change_func(self.on_offer_set, None, None) + self.webrtc.emit('set-remote-description', offer, promise) elif 'ice' in msg: ice = msg['ice'] candidate = ice['candidate'] @@ -160,19 +221,46 @@ class WebRTCClient: self.pipe = None self.webrtc = None + def is_incoming_offer(self, msg): + if self.webrtc: + return False + if self.remote_is_offerer: + return True + return True + async def loop(self): assert self.conn async for message in self.conn: + print(f'<<< {message}') if message == 'HELLO': - await self.setup_call() + assert self.id_ + # If a peer ID is specified, we want to connect to it. If not, + # we wait for an incoming call. + if not self.peer_id: + print_status(f'Waiting for incoming call: ID is {self.id_}') + else: + if self.remote_is_offerer: + print_status('Have peer ID: initiating call (will request remote peer to create offer)') + else: + print_status('Have peer ID: initiating call (will create offer)') + await self.setup_call() elif message == 'SESSION_OK': + if self.remote_is_offerer: + self.start_pipeline(create_offer=False) + else: + self.start_pipeline() + elif message == 'OFFER_REQUEST': + print_status('Incoming call: we have been asked to create the offer') self.start_pipeline() elif message.startswith('ERROR'): - print(message) + print_error(message) self.close_pipeline() return 1 else: - self.handle_sdp(message) + if self.is_incoming_offer(message): + print_status('Incoming call: received an offer, creating pipeline') + self.start_pipeline(create_offer=False) + self.handle_json(message) self.close_pipeline() return 0 @@ -187,7 +275,7 @@ def check_plugins(): "rtpmanager", "videotestsrc", "audiotestsrc"] missing = list(filter(lambda p: Gst.Registry.get().find_plugin(p) is None, needed)) if len(missing): - print('Missing gstreamer plugins:', missing) + print_error('Missing gstreamer plugins:', missing) return False return True @@ -197,13 +285,19 @@ if __name__ == '__main__': if not check_plugins(): sys.exit(1) parser = argparse.ArgumentParser() - parser.add_argument('peerid', help='String ID of the peer to connect to') + parser.add_argument('--peer-id', help='String ID of the peer to connect to') + parser.add_argument('--our-id', help='String ID that the peer can use to connect to us') parser.add_argument('--server', default='wss://webrtc.nirbheek.in:8443', help='Signalling server to connect to, eg "wss://127.0.0.1:8443"') + parser.add_argument('--remote-offerer', default=False, action='store_true', + dest='remote_is_offerer', + help='Request that the peer generate the offer and we\'ll answer') args = parser.parse_args() - our_id = random.randrange(10, 10000) + if not args.peer_id and not args.our_id: + print('You must pass either --peer-id or --our-id') + sys.exit(1) loop = asyncio.new_event_loop() - c = WebRTCClient(loop, our_id, args.peerid, args.server) + c = WebRTCClient(loop, args.our_id, args.peer_id, args.server, args.remote_is_offerer) loop.run_until_complete(c.connect()) res = loop.run_until_complete(c.loop()) sys.exit(res)