check: first pass at a couple of validate tests
authorMatthew Waters <matthew@centricular.com>
Mon, 17 Dec 2018 11:34:10 +0000 (22:34 +1100)
committerMatthew Waters <ystreet00@gmail.com>
Wed, 6 May 2020 06:01:57 +0000 (06:01 +0000)
20 files changed:
webrtc/check/basic.py [moved from webrtc/sendrecv/gst/tests/basic.py with 100% similarity]
webrtc/check/meson.build [moved from webrtc/sendrecv/gst/tests/meson.build with 100% similarity]
webrtc/check/validate/README.md [new file with mode: 0644]
webrtc/check/validate/actions.py [new file with mode: 0644]
webrtc/check/validate/apps/__init__.py [new file with mode: 0644]
webrtc/check/validate/apps/gstwebrtc.py [new file with mode: 0644]
webrtc/check/validate/browser.py [new file with mode: 0644]
webrtc/check/validate/client.py [new file with mode: 0644]
webrtc/check/validate/enums.py [new file with mode: 0644]
webrtc/check/validate/observer.py [new file with mode: 0644]
webrtc/check/validate/scenarios/offer_answer.scenario [new file with mode: 0644]
webrtc/check/validate/scenarios/vp8_send_stream.scenario [new file with mode: 0644]
webrtc/check/validate/signalling.py [new file with mode: 0644]
webrtc/check/validate/testsuites/__init__.py [new file with mode: 0644]
webrtc/check/validate/testsuites/webrtc.py [new file with mode: 0644]
webrtc/check/validate/web/single_stream.html [new file with mode: 0644]
webrtc/check/validate/web/webrtc.js [new file with mode: 0644]
webrtc/check/validate/webrtc_validate.py [new file with mode: 0644]
webrtc/meson.build
webrtc/sendrecv/gst/meson.build

diff --git a/webrtc/check/validate/README.md b/webrtc/check/validate/README.md
new file mode 100644 (file)
index 0000000..ea07153
--- /dev/null
@@ -0,0 +1,17 @@
+# What is this?
+
+The entire contents of this folder perform testing of GStreamer's webrtc
+implementation against browser implementations using the selenium webdriver
+testing framework.
+
+# Dependencies:
+
+- gst-validate: https://gitlab.freedesktop.org/gstreamer/gst-devtools/tree/master/validate
+- gst-python: https://gitlab.freedesktop.org/gstreamer/gst-python/
+- selenium: https://www.seleniumhq.org/projects/webdriver/
+- selenium python bindings
+- chrome and firefox with webdriver support
+
+# Run the tests
+
+`GST_VALIDATE_APPS_DIR=/path/to/gstwebrtc-demos/check/validate/apps/ GST_VALIDATE_SCENARIOS_PATH=/path/to/gstwebrtc-demos/check/validate/scenarios/ gst-validate-launcher --testsuites-dir /path/to/gstwebrtc-demos/check/validate/testsuites/ webrtc`
diff --git a/webrtc/check/validate/actions.py b/webrtc/check/validate/actions.py
new file mode 100644 (file)
index 0000000..6ced2cf
--- /dev/null
@@ -0,0 +1,77 @@
+#!/usr/bin/env python3
+#
+# Copyright (c) 2018, Matthew Waters <matthew@centricular.com>
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this program; if not, write to the
+# Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+# Boston, MA 02110-1301, USA.
+
+import gi
+gi.require_version("GstValidate", "1.0")
+from gi.repository import GstValidate
+
+from observer import Signal
+
+class ActionObserver(object):
+    def __init__(self):
+        def _action_continue(val):
+            return val not in [GstValidate.ActionReturn.ERROR, GstValidate.ActionReturn.ERROR_REPORTED]
+        def _action_accum(previous, val):
+            # we want to always keep any errors propagated
+            if val in [GstValidate.ActionReturn.ERROR, GstValidate.ActionReturn.ERROR_REPORTED]:
+                return val
+            if previous in [GstValidate.ActionReturn.ERROR, GstValidate.ActionReturn.ERROR_REPORTED]:
+                return previous
+
+            # we want to always prefer async returns
+            if previous in [GstValidate.ActionReturn.ASYNC, GstValidate.ActionReturn.INTERLACED]:
+                return previous
+            if val in [GstValidate.ActionReturn.ASYNC, GstValidate.ActionReturn.INTERLACED]:
+                return val
+
+            return val
+
+        self.create_offer = Signal(_action_continue, _action_accum)
+        self.wait_for_negotiation_state = Signal(_action_continue, _action_accum)
+        self.add_stream = Signal(_action_continue, _action_accum)
+        self.wait_for_remote_state = Signal(_action_continue, _action_accum)
+
+    def _create_offer(self, scenario, action):
+        print("action create-offer")
+        return self.create_offer.fire()
+    def _wait_for_negotiation_state(self, scenario, action):
+        state = action.structure["state"]
+        print("action wait-for-negotiation-state", state)
+        return self.wait_for_negotiation_state.fire(state)
+    def _add_stream(self, scenario, action):
+        pipeline = action.structure["pipeline"]
+        print("action add-stream", pipeline)
+        return self.add_stream.fire(pipeline)
+
+def register_action_types(observer):
+    if not isinstance(observer, ActionObserver):
+        raise TypeError
+
+    GstValidate.register_action_type("create-offer", "webrtc",
+                                     observer._create_offer, None,
+                                     "Instruct a create-offer to commence",
+                                     GstValidate.ActionTypeFlags.NONE)
+    GstValidate.register_action_type("wait-for-negotiation-state", "webrtc",
+                                     observer._wait_for_negotiation_state, None,
+                                     "Wait for a specific negotiation state to be reached",
+                                     GstValidate.ActionTypeFlags.NONE)
+    GstValidate.register_action_type("add-stream", "webrtc",
+                                     observer._add_stream, None,
+                                     "Add a stream to the webrtcbin",
+                                     GstValidate.ActionTypeFlags.NONE)
diff --git a/webrtc/check/validate/apps/__init__.py b/webrtc/check/validate/apps/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/webrtc/check/validate/apps/gstwebrtc.py b/webrtc/check/validate/apps/gstwebrtc.py
new file mode 100644 (file)
index 0000000..693ac4a
--- /dev/null
@@ -0,0 +1,187 @@
+#*- vi:si:et:sw=4:sts=4:ts=4:syntax=python
+#
+# Copyright (c) 2018 Matthew Waters <matthew@centricular.com>
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this program; if not, write to the
+# Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+# Boston, MA 02110-1301, USA.
+
+import inspect
+import os
+import sys
+import shutil
+
+import tempfile
+
+from launcher.baseclasses import TestsManager, TestsGenerator, GstValidateTest, ScenarioManager
+from launcher.utils import DEFAULT_TIMEOUT
+
+DEFAULT_BROWSERS = ['firefox', 'chrome']
+DEFAULT_SCENARIOS = [
+                "offer_answer",
+                "vp8_send_stream"
+                ]
+
+BROWSER_SCENARIO_BLACKLISTS = {
+    'firefox' : [
+        'offer_answer', # fails to accept an SDP without any media sections
+    ],
+    'chrome' : [
+    ],
+}
+
+class MutableInt(object):
+    def __init__(self, value):
+        self.value = value
+
+class GstWebRTCTest(GstValidateTest):
+    __used_ports = set()
+    __last_id = MutableInt(10)
+
+    @classmethod
+    def __get_open_port(cls):
+        while True:
+            # hackish trick from
+            # http://stackoverflow.com/questions/2838244/get-open-tcp-port-in-python?answertab=votes#tab-top
+            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+            s.bind(("", 0))
+            port = s.getsockname()[1]
+            if port not in cls.__used_ports:
+                cls.__used_ports.add(port)
+                s.close()
+                return port
+
+            s.close()
+
+    @classmethod
+    def __get_available_peer_id(cls):
+        peerid = cls.__last_id.value
+        cls.__last_id.value += 2
+        return peerid
+
+    def __init__(self, classname, tests_manager, scenario, browser, timeout=DEFAULT_TIMEOUT):
+        super().__init__("python3",
+                        classname,
+                        tests_manager.options,
+                        tests_manager.reporter,
+                        timeout=timeout,
+                        scenario=scenario)
+        self.webrtc_server = None
+        filename = inspect.getframeinfo (inspect.currentframe ()).filename
+        self.current_file_path = os.path.dirname (os.path.abspath (filename))
+        self.certdir = None
+        self.browser = browser
+
+    def launch_server(self):
+        if self.options.redirect_logs == 'stdout':
+            self.webrtcserver_logs = sys.stdout
+        elif self.options.redirect_logs == 'stderr':
+            self.webrtcserver_logs = sys.stderr
+        else:
+            self.webrtcserver_logs = open(self.logfile + '_webrtcserver.log', 'w+')
+            self.extra_logfiles.add(self.webrtcserver_logs.name)
+
+        generate_certs_location = os.path.join(self.current_file_path, "..", "..", "..", "signalling", "generate_cert.sh")
+        self.certdir = tempfile.mkdtemp()
+        command = [generate_certs_location, self.certdir]
+
+        server_env = os.environ.copy()
+
+        subprocess.run(command,
+                         stderr=self.webrtcserver_logs,
+                         stdout=self.webrtcserver_logs,
+                         env=server_env)
+
+        self.server_port = self.__get_open_port()
+
+        server_location = os.path.join(self.current_file_path, "..", "..", "..", "signalling", "simple_server.py")
+        command = [server_location, "--cert-path", self.certdir, "--addr", "127.0.0.1", "--port", str(self.server_port)]
+
+        self.webrtc_server = subprocess.Popen(command,
+                                              stderr=self.webrtcserver_logs,
+                                              stdout=self.webrtcserver_logs,
+                                              env=server_env)
+        while True:
+            s = socket.socket()
+            try:
+                s.connect((("127.0.0.1", self.server_port)))
+                break
+            except ConnectionRefusedError:
+                time.sleep(0.1)
+                continue
+            finally:
+                s.close()
+
+        return ' '.join(command)
+
+    def build_arguments(self):
+        gst_id = self.__get_available_peer_id()
+        web_id = gst_id + 1
+
+        self.add_arguments(os.path.join(self.current_file_path, '..', 'webrtc_validate.py'))
+        self.add_arguments('--server')
+        self.add_arguments("wss://127.0.0.1:%s" % (self.server_port,))
+        self.add_arguments('--browser')
+        self.add_arguments(self.browser)
+        self.add_arguments("--html-source")
+        html_page = os.path.join(self.current_file_path, '..', 'web', 'single_stream.html')
+        html_params = '?server=127.0.0.1&port=' + str(self.server_port) + '&id=' + str(web_id)
+        self.add_arguments("file://" + html_page + html_params)
+        self.add_arguments('--peer-id')
+        self.add_arguments(str(web_id))
+        self.add_arguments(str(gst_id))
+
+    def close_logfile(self):
+        super().close_logfile()
+        if not self.options.redirect_logs:
+            self.webrtcserver_logs.close()
+
+    def process_update(self):
+        res = super().process_update()
+        if res:
+            kill_subprocess(self, self.webrtc_server, DEFAULT_TIMEOUT)
+            self.__used_ports.remove(self.server_port)
+            if self.certdir:
+                shutil.rmtree(self.certdir, ignore_errors=True)
+                self.certdir
+
+        return res
+
+class GstWebRTCTestsManager(TestsManager):
+    scenarios_manager = ScenarioManager()
+    name = "webrtc"
+
+    def __init__(self):
+        super(GstWebRTCTestsManager, self).__init__()
+        self.loading_testsuite = self.name
+
+    def webrtc_server_address(self):
+        return "wss://127.0.0.1:8443"
+
+    def populate_testsuite(self):
+        self.add_scenarios (DEFAULT_SCENARIOS)
+
+        scenarios = [(scenario_name, self.scenarios_manager.get_scenario(scenario_name))
+                     for scenario_name in self.get_scenarios()]
+
+        for name, scenario in scenarios:
+            if not scenario:
+                self.warning("Could not find scenario %s" % name)
+                continue
+            for browser in DEFAULT_BROWSERS:
+                if name in BROWSER_SCENARIO_BLACKLISTS[browser]:
+                    self.warning('Skipping broken test', name, 'for browser', browser)
+                    continue
+                classname = browser + '_' + name
+                self.add_test(GstWebRTCTest(classname, self, scenario, browser))
diff --git a/webrtc/check/validate/browser.py b/webrtc/check/validate/browser.py
new file mode 100644 (file)
index 0000000..536a031
--- /dev/null
@@ -0,0 +1,79 @@
+#!/usr/bin/env python3
+#
+# Copyright (c) 2018, Matthew Waters <matthew@centricular.com>
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this program; if not, write to the
+# Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+# Boston, MA 02110-1301, USA.
+
+from selenium import webdriver
+from selenium.webdriver.support.wait import WebDriverWait
+from selenium.webdriver.firefox.firefox_profile import FirefoxProfile
+from selenium.webdriver.chrome.options import Options as COptions
+
+def create_firefox_driver():
+    capabilities = webdriver.DesiredCapabilities().FIREFOX.copy()
+    capabilities['acceptSslCerts'] = True
+    profile = FirefoxProfile()
+    profile.set_preference ('media.navigator.streams.fake', True)
+    profile.set_preference ('media.navigator.permission.disabled', True)
+
+    return webdriver.Firefox(firefox_profile=profile, capabilities=capabilities)
+
+def create_chrome_driver():
+    capabilities = webdriver.DesiredCapabilities().CHROME.copy()
+    capabilities['acceptSslCerts'] = True
+    copts = COptions()
+    copts.add_argument('--allow-file-access-from-files')
+    copts.add_argument('--use-fake-ui-for-media-stream')
+    copts.add_argument('--use-fake-device-for-media-stream')
+    copts.add_argument('--enable-blink-features=RTCUnifiedPlanByDefault')
+
+    return webdriver.Chrome(options=copts, desired_capabilities=capabilities)
+
+def create_driver(name):
+    if name == 'firefox':
+        return create_firefox_driver()
+    elif name == 'chrome':
+        return create_chrome_driver()
+    else:
+        raise ValueError("Unknown browser name " + name)
+
+def valid_int(n):
+    if isinstance(n, int):
+        return True
+    if isinstance(n, str):
+        try:
+            num = int(n)
+            return True
+        except:
+            return False
+    return False
+
+class Browser(object):
+    def __init__(self, driver, html_source):
+        self.driver = driver
+        self.html_source = html_source
+
+    def get_peer_id(self):
+        self.driver.get(self.html_source)
+        peer_id = WebDriverWait(self.driver, 10).until(
+            lambda x: x.find_element_by_id('peer-id'),
+            message='Peer-id element was never seen'
+        )
+        WebDriverWait (self.driver, 10).until(
+            lambda x: valid_int(peer_id.text),
+            message='Peer-id never became a number'
+        )
+        return int(peer_id.text)        
diff --git a/webrtc/check/validate/client.py b/webrtc/check/validate/client.py
new file mode 100644 (file)
index 0000000..24c8bcd
--- /dev/null
@@ -0,0 +1,212 @@
+#!/usr/bin/env python3
+#
+# Copyright (c) 2018, Matthew Waters <matthew@centricular.com>
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this program; if not, write to the
+# Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+# Boston, MA 02110-1301, USA.
+
+import threading
+import copy
+
+from observer import Signal
+from enums import NegotiationState
+
+import gi
+gi.require_version("Gst", "1.0")
+from gi.repository import Gst
+gi.require_version("GstWebRTC", "1.0")
+from gi.repository import GstWebRTC
+gi.require_version("GstSdp", "1.0")
+from gi.repository import GstSdp
+gi.require_version("GstValidate", "1.0")
+from gi.repository import GstValidate
+
+class WebRTCBinObserver(object):
+    def __init__(self, element):
+        self.state = NegotiationState.NEW
+        self.state_cond = threading.Condition()
+        self.element = element
+        self.signal_handlers = []
+        self.signal_handlers.append(element.connect("on-negotiation-needed", self._on_negotiation_needed))
+        self.signal_handlers.append(element.connect("on-ice-candidate", self._on_ice_candidate))
+        self.signal_handlers.append(element.connect("pad-added", self._on_pad_added))
+        self.signal_handlers.append(element.connect("on-new-transceiver", self._on_new_transceiver))
+        self.signal_handlers.append(element.connect("on-data-channel", self._on_data_channel))
+        self.on_offer_created = Signal()
+        self.on_answer_created = Signal()
+        self.on_negotiation_needed = Signal()
+        self.on_ice_candidate = Signal()
+        self.on_pad_added = Signal()
+        self.on_new_transceiver = Signal()
+        self.on_data_channel = Signal()
+        self.on_local_description_set = Signal()
+        self.on_remote_description_set = Signal()
+
+    def _on_negotiation_needed(self, element):
+        self.on_negotiation_needed.fire()
+
+    def _on_ice_candidate(self, element, mline, candidate):
+        self.on_ice_candidate.fire(mline, candidate)
+
+    def _on_pad_added(self, element, pad):
+        self.on_pad_added.fire(pad)
+
+    def _on_local_description_set(self, promise, desc):
+        self._update_negotiation_from_description_state(desc)
+        self.on_local_description_set.fire(desc)
+
+    def _on_remote_description_set(self, promise, desc):
+        self._update_negotiation_from_description_state(desc)
+        self.on_remote_description_set.fire(desc)
+
+    def _on_new_transceiver(self, element, transceiver):
+        self.on_new_transceiver.fire(transceiver)
+
+    def _on_data_channel(self, element):
+        self.on_data_channel.fire(desc)
+
+    def _update_negotiation_state(self, new_state):
+        with self.state_cond:
+            old_state = self.state
+            self.state = new_state
+            self.state_cond.notify_all()
+            print ("observer updated state to", new_state)
+
+    def wait_for_negotiation_states(self, states):
+        ret = None
+        with self.state_cond:
+            while self.state not in states:
+                self.state_cond.wait()
+            print ("observer waited for", states)
+            ret = self.state
+        return ret
+
+    def _update_negotiation_from_description_state(self, desc):
+        new_state = None
+        if desc.type == GstWebRTC.WebRTCSDPType.OFFER:
+            new_state = NegotiationState.OFFER_SET
+        elif desc.type == GstWebRTC.WebRTCSDPType.ANSWER:
+            new_state = NegotiationState.ANSWER_SET
+        assert new_state is not None
+        self._update_negotiation_state(new_state)
+
+    def _deepcopy_session_description(self, desc):
+        # XXX: passing 'offer' to both a promise and an action signal without
+        # a deepcopy will segfault...
+        new_sdp = GstSdp.SDPMessage.new()[1]
+        GstSdp.sdp_message_parse_buffer(bytes(desc.sdp.as_text().encode()), new_sdp)
+        return GstWebRTC.WebRTCSessionDescription.new(desc.type, new_sdp)
+
+    def _on_offer_created(self, promise, element):
+        self._update_negotiation_state(NegotiationState.OFFER_CREATED)
+        reply = promise.get_reply()
+        offer = reply['offer']
+
+        new_offer = self._deepcopy_session_description(offer)
+        promise = Gst.Promise.new_with_change_func(self._on_local_description_set, new_offer)
+
+        new_offer = self._deepcopy_session_description(offer)
+        self.element.emit('set-local-description', new_offer, promise)
+
+        self.on_offer_created.fire(offer)
+
+    def _on_answer_created(self, promise, element):
+        self._update_negotiation_state(NegotiationState.ANSWER_CREATED)
+        reply = promise.get_reply()
+        offer = reply['answer']
+
+        new_offer = self._deepcopy_session_description(offer)
+        promise = Gst.Promise.new_with_change_func(self._on_local_description_set, new_offer)
+
+        new_offer = self._deepcopy_session_description(offer)
+        self.element.emit('set-local-description', new_offer, promise)
+
+        self.on_answer_created.fire(offer)
+
+    def create_offer(self, options=None):
+        promise = Gst.Promise.new_with_change_func(self._on_offer_created, self.element)
+        self.element.emit('create-offer', options, promise)
+
+    def create_answer(self, options=None):
+        promise = Gst.Promise.new_with_change_func(self._on_answer_created, self.element)
+        self.element.emit('create-answer', options, promise)
+
+    def set_remote_description(self, desc):
+        promise = Gst.Promise.new_with_change_func(self._on_remote_description_set, desc)
+        self.element.emit('set-remote-description', desc, promise)
+
+    def add_ice_candidate(self, mline, candidate):
+        self.element.emit('add-ice-candidate', mline, candidate)
+
+class WebRTCStream(object):
+    def __init__(self):
+        self.bin = None
+
+    def set_description(self, desc):
+        assert self.bin is None
+        self.bin = Gst.parse_bin_from_description(desc, True)
+
+    def add_and_link(self, parent, link):
+        assert self.bin is not None
+        self.bin.set_locked_state(True)
+        parent.add(self.bin)
+        src = self.bin.get_static_pad("src")
+        sink = self.bin.get_static_pad("sink")
+        assert src is None or sink is None
+        if src:
+            self.bin.link(link)
+        if sink:
+            link.link(self.bin)
+        self.bin.set_locked_state(False)
+        self.bin.sync_state_with_parent()
+
+    def add_and_link_to(self, parent, link, pad):
+        assert self.bin is not None
+        self.bin.set_locked_state(True)
+        parent.add(self.bin)
+        src = self.bin.get_static_pad("src")
+        sink = self.bin.get_static_pad("sink")
+        assert src is None or sink is None
+        if pad.get_direction() == Gst.PadDirection.SRC:
+            assert sink is not None
+            pad.link(sink)
+        if pad.get_direction() == Gst.PadDirection.SINK:
+            assert src is not None
+            src.link(pad)
+        self.bin.set_locked_state(False)
+        self.bin.sync_state_with_parent()
+
+class WebRTCClient(WebRTCBinObserver):
+    def __init__(self):
+        self.pipeline = Gst.Pipeline(None)
+        self.webrtcbin = Gst.ElementFactory.make("webrtcbin")
+        super().__init__(self.webrtcbin)
+        self.pipeline.add(self.webrtcbin)
+        self._streams = []
+
+    def stop(self):
+        self.pipeline.set_state (Gst.State.NULL)
+
+    def add_stream(self, desc):
+        stream = WebRTCStream()
+        stream.set_description(desc)
+        stream.add_and_link (self.pipeline, self.webrtcbin)
+        self._streams.append(stream)
+
+    def add_stream_with_pad(self, desc, pad):
+        stream = WebRTCStream()
+        stream.set_description(desc)
+        stream.add_and_link_to (self.pipeline, self.webrtcbin, pad)
+        self._streams.append(stream)
diff --git a/webrtc/check/validate/enums.py b/webrtc/check/validate/enums.py
new file mode 100644 (file)
index 0000000..14bc31a
--- /dev/null
@@ -0,0 +1,36 @@
+# Copyright (c) 2018, Matthew Waters <matthew@centricular.com>
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this program; if not, write to the
+# Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+# Boston, MA 02110-1301, USA.
+
+class SignallingState(object):
+    NEW = "new"                 # no connection has been made
+    OPEN = "open"               # websocket connection is open
+    ERROR = "error"             # and error was thrown.  overrides all others
+    HELLO = "hello"             # hello was sent and received
+    SESSION = "session"         # session setup was sent and received
+
+class NegotiationState(object):
+    NEW = "new"
+    ERROR = "error"
+    NEGOTIATION_NEEDED = "negotiation-needed"
+    OFFER_CREATED = "offer-created"
+    ANSWER_CREATED = "answer-created"
+    OFFER_SET = "offer-set"
+    ANSWER_SET = "answer-set"
+
+class RemoteState(object):
+    ERROR = "error"
+    REMOTE_STREAM_RECEIVED = "remote-stream-received"
diff --git a/webrtc/check/validate/observer.py b/webrtc/check/validate/observer.py
new file mode 100644 (file)
index 0000000..b4f6be5
--- /dev/null
@@ -0,0 +1,43 @@
+# Copyright (c) 2018, Matthew Waters <matthew@centricular.com>
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this program; if not, write to the
+# Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+# Boston, MA 02110-1301, USA.
+
+class Signal(object):
+    def __init__(self, cont_func=None, accum_func=None):
+        self._handlers = []
+        if not cont_func:
+            # by default continue when None/no return value is provided or
+            # True is returned
+            cont_func = lambda x: x is None or x
+        self.cont_func = cont_func
+        # default to accumulating truths
+        if not accum_func:
+            accum_func = lambda prev, v: prev and v
+        self.accum_func = accum_func
+
+    def connect(self, handler):
+        self._handlers.append(handler)
+
+    def disconnect(self, handler):
+        self._handlers.remove(handler)
+
+    def fire(self, *args):
+        ret = None
+        for handler in self._handlers:
+            ret = self.accum_func(ret, handler(*args))
+            if not self.cont_func(ret):
+                break
+        return ret
diff --git a/webrtc/check/validate/scenarios/offer_answer.scenario b/webrtc/check/validate/scenarios/offer_answer.scenario
new file mode 100644 (file)
index 0000000..559f65f
--- /dev/null
@@ -0,0 +1,3 @@
+description, summary="Produce an offer"
+create-offer;
+wait-for-negotiation-state, state="answer-set"
diff --git a/webrtc/check/validate/scenarios/vp8_send_stream.scenario b/webrtc/check/validate/scenarios/vp8_send_stream.scenario
new file mode 100644 (file)
index 0000000..2ec15a5
--- /dev/null
@@ -0,0 +1,5 @@
+description, summary="Send a VP8 stream", handles-state=true
+add-stream, pipeline="videotestsrc is-live=1 ! vp8enc ! rtpvp8pay ! queue"
+set-state, state="playing";
+create-offer;
+wait-for-negotiation-state, state="answer-set"
diff --git a/webrtc/check/validate/signalling.py b/webrtc/check/validate/signalling.py
new file mode 100644 (file)
index 0000000..2a7d938
--- /dev/null
@@ -0,0 +1,161 @@
+#!/usr/bin/env python3
+#
+# Copyright (c) 2018, Matthew Waters <matthew@centricular.com>
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this program; if not, write to the
+# Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+# Boston, MA 02110-1301, USA.
+
+import websockets
+import asyncio
+import ssl
+import os
+import sys
+import threading
+import json
+
+from observer import Signal
+from enums import SignallingState, RemoteState
+
+class AsyncIOThread(threading.Thread):
+    def __init__ (self, loop):
+        threading.Thread.__init__(self)
+        self.loop = loop
+
+    def run(self):
+        asyncio.set_event_loop(self.loop)
+        self.loop.run_forever()
+
+    def stop_thread(self):
+        self.loop.call_soon_threadsafe(self.loop.stop)
+
+class SignallingClientThread(object):
+    def __init__(self, server):
+        self.server = server
+
+        self.wss_connected = Signal()
+        self.message = Signal()
+
+        self._init_async()
+
+    def _init_async(self):
+        self.conn = None
+        self._loop = asyncio.new_event_loop()
+
+        self._thread = AsyncIOThread(self._loop)
+        self._thread.start()
+
+        self._loop.call_soon_threadsafe(lambda: asyncio.ensure_future(self._a_loop()))
+
+    async def _a_connect(self):
+        assert not self.conn
+        sslctx = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH)
+        self.conn = await websockets.connect(self.server, ssl=sslctx)
+
+    async def _a_loop(self):
+        await self._a_connect()
+        self.wss_connected.fire()
+        assert self.conn
+        async for message in self.conn:
+            self.message.fire(message)
+
+    def send(self, data):
+        async def _a_send():
+            await self.conn.send(data)
+        self._loop.call_soon_threadsafe(lambda: asyncio.ensure_future(_a_send()))
+
+    def stop(self):
+        cond = threading.Condition()
+
+        # asyncio, why you so complicated to stop ?
+        tasks = asyncio.all_tasks(self._loop)
+        async def _a_stop():
+            if self.conn:
+                await self.conn.close()
+            self.conn = None
+
+            to_wait = [t for t in tasks if not t.done()]
+            if to_wait:
+                done, pending = await asyncio.wait(to_wait)
+            with cond:
+                cond.notify()
+
+        self._loop.call_soon_threadsafe(lambda: asyncio.ensure_future(_a_stop()))
+        with cond:
+                cond.wait()
+        self._thread.stop_thread()
+        self._thread.join()
+
+class WebRTCSignallingClient(SignallingClientThread):
+    def __init__(self, server, id_):
+        super().__init__(server)
+
+        self.wss_connected.connect(self._on_connection)
+        self.message.connect(self._on_message)
+        self.state = SignallingState.NEW
+        self._state_cond = threading.Condition()
+
+        self.id = id_
+        self._peerid = None
+
+        # override that base class
+        self.connected = Signal()
+        self.session_created = Signal()
+        self.error = Signal()
+        self.have_json = Signal()
+
+    def wait_for_states(self, states):
+        ret = None
+        with self._state_cond:
+            while self.state not in states:
+                self._state_cond.wait()
+            ret = self.state
+        return ret
+
+    def _update_state(self, state):
+        with self._state_cond:
+            if self.state is not SignallingState.ERROR:
+                self.state = state
+            self._state_cond.notify_all()
+
+    def hello(self):
+        self.send('HELLO ' + str(self.id))
+        self.wait_for_states([SignallingState.HELLO])
+        print("signalling-client sent HELLO")
+
+    def create_session(self, peerid):
+        self._peerid = peerid
+        self.send('SESSION {}'.format(self._peerid))
+        self.wait_for_states([SignallingState.SESSION, SignallingState.ERROR])
+        print("signalling-client sent SESSION")
+
+    def _on_connection(self):
+        self._update_state (SignallingState.OPEN)
+
+    def _on_message(self, message):
+        print("signalling-client received", message)
+        if message == 'HELLO':
+            self._update_state (SignallingState.HELLO)
+            self.connected.fire()
+        elif message == 'SESSION_OK':
+            self._update_state (SignallingState.SESSION)
+            self.session_created.fire()
+        elif message.startswith('ERROR'):
+            self._update_state (SignallingState.ERROR)
+            self.error.fire(message)
+        else:
+            msg = json.loads(message)
+            self.have_json.fire(msg)
+        return False
+
diff --git a/webrtc/check/validate/testsuites/__init__.py b/webrtc/check/validate/testsuites/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/webrtc/check/validate/testsuites/webrtc.py b/webrtc/check/validate/testsuites/webrtc.py
new file mode 100644 (file)
index 0000000..c078e5d
--- /dev/null
@@ -0,0 +1,36 @@
+#*- vi:si:et:sw=4:sts=4:ts=4:syntax=python
+#
+# Copyright (c) 2018 Matthew Waters <matthew@centricular.com>
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this program; if not, write to the
+# Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+# Boston, MA 02110-1301, USA.
+
+"""
+The GstValidate webrtc streams testsuite
+"""
+
+import os
+
+TEST_MANAGER = "webrtc"
+
+BLACKLIST = [
+    ]
+
+def setup_tests(test_manager, options):
+    print("Setting up webrtc tests")
+#    test_manager.set_default_blacklist(BLACKLIST)
+
+    return True
+
diff --git a/webrtc/check/validate/web/single_stream.html b/webrtc/check/validate/web/single_stream.html
new file mode 100644 (file)
index 0000000..9a95fb7
--- /dev/null
@@ -0,0 +1,35 @@
+<!DOCTYPE html>
+<!--
+  vim: set sts=2 sw=2 et :
+
+
+  Demo Javascript app for negotiating and streaming a sendrecv webrtc stream
+  with a GStreamer app. Runs only in passive mode, i.e., responds to offers
+  with answers, exchanges ICE candidates, and streams.
+
+  Author: Nirbheek Chauhan <nirbheek@centricular.com>
+-->
+<html>
+  <head>
+    <style>
+      .error { color: red; }
+    </style>
+    <script src="https://webrtc.github.io/adapter/adapter-latest.js"></script>
+    <script src="webrtc.js"></script>
+    <script>
+      window.onload = websocketServerConnect;
+    </script>
+  </head>
+
+  <body>
+    <div><video id="stream" autoplay>Your browser doesn't support video</video></div>
+    <div>Status: <span id="status">unknown</span></div>
+    <div><textarea id="text" cols=40 rows=4></textarea></div>
+    <div>Our id is <b id="peer-id">unknown</b></div>
+    <br/>
+    <div>
+      <div>getUserMedia constraints being used:</div>
+      <div><textarea id="constraints" cols=40 rows=4></textarea></div>
+    </div>
+  </body>
+</html>
diff --git a/webrtc/check/validate/web/webrtc.js b/webrtc/check/validate/web/webrtc.js
new file mode 100644 (file)
index 0000000..eb7e00f
--- /dev/null
@@ -0,0 +1,320 @@
+/* vim: set sts=4 sw=4 et :
+ *
+ * Demo Javascript app for negotiating and streaming a sendrecv webrtc stream
+ * with a GStreamer app. Runs only in passive mode, i.e., responds to offers
+ * with answers, exchanges ICE candidates, and streams.
+ *
+ * Author: Nirbheek Chauhan <nirbheek@centricular.com>
+ */
+
+// Set this to override the automatic detection in websocketServerConnect()
+var ws_server;
+var ws_port;
+// Set this to use a specific peer id instead of a random one
+var default_peer_id;
+// Override with your own STUN servers if you want
+var rtc_configuration = {iceServers: [{urls: "stun:stun.services.mozilla.com"},
+                                      {urls: "stun:stun.l.google.com:19302"}]};
+// The default constraints that will be attempted. Can be overriden by the user.
+var default_constraints = {video: true, audio: true};
+
+var connect_attempts = 0;
+var peer_connection;
+var send_channel;
+var ws_conn;
+// Promise for local stream after constraints are approved by the user
+var local_stream_promise;
+
+function getOurId() {
+    return Math.floor(Math.random() * (9000 - 10) + 10).toString();
+}
+
+function resetState() {
+    // This will call onServerClose()
+    ws_conn.close();
+}
+
+function handleIncomingError(error) {
+    setError("ERROR: " + error);
+    resetState();
+}
+
+function getVideoElement() {
+    return document.getElementById("stream");
+}
+
+function setStatus(text) {
+    console.log(text);
+    var span = document.getElementById("status")
+    // Don't set the status if it already contains an error
+    if (!span.classList.contains('error'))
+        span.textContent = text;
+}
+
+function setError(text) {
+    console.error(text);
+    var span = document.getElementById("status")
+    span.textContent = text;
+    span.classList.add('error');
+    ws_conn.send(JSON.stringify({'STATE': 'error'}))
+}
+
+function resetVideo() {
+    // Release the webcam and mic
+    if (local_stream_promise)
+        local_stream_promise.then(stream => {
+            if (stream) {
+                stream.getTracks().forEach(function (track) { track.stop(); });
+            }
+        });
+
+    // Reset the video element and stop showing the last received frame
+    var videoElement = getVideoElement();
+    videoElement.pause();
+    videoElement.src = "";
+    videoElement.load();
+}
+
+// SDP offer received from peer, set remote description and create an answer
+function onIncomingSDP(sdp) {
+    peer_connection.setRemoteDescription(sdp).then(() => {
+        setStatus("Remote SDP set");
+        if (sdp.type != "offer")
+            return;
+        setStatus("Got SDP offer");
+        local_stream_promise.then((stream) => {
+            setStatus("Got local stream, creating answer");
+            peer_connection.createAnswer()
+            .then(onLocalDescription).catch(setError);
+        }).catch(setError);
+    }).catch(setError);
+}
+
+// Local description was set, send it to peer
+function onLocalDescription(desc) {
+    console.log("Got local description: " + JSON.stringify(desc));
+    peer_connection.setLocalDescription(desc).then(function() {
+        setStatus("Sending SDP answer");
+        sdp = {'sdp': peer_connection.localDescription}
+        ws_conn.send(JSON.stringify(sdp));
+    });
+}
+
+// ICE candidate received from peer, add it to the peer connection
+function onIncomingICE(ice) {
+    var candidate = new RTCIceCandidate(ice);
+    peer_connection.addIceCandidate(candidate).catch(setError);
+}
+
+function onServerMessage(event) {
+    console.log("Received " + event.data);
+    switch (event.data) {
+        case "HELLO":
+            setStatus("Registered with server, waiting for call");
+            return;
+        default:
+            if (event.data.startsWith("ERROR")) {
+                handleIncomingError(event.data);
+                return;
+            }
+            // Handle incoming JSON SDP and ICE messages
+            try {
+                msg = JSON.parse(event.data);
+            } catch (e) {
+                if (e instanceof SyntaxError) {
+                    handleIncomingError("Error parsing incoming JSON: " + event.data);
+                } else {
+                    handleIncomingError("Unknown error parsing response: " + event.data);
+                }
+                return;
+            }
+
+            // Incoming JSON signals the beginning of a call
+            if (!peer_connection)
+                createCall(msg);
+
+            if (msg.sdp != null) {
+                onIncomingSDP(msg.sdp);
+            } else if (msg.ice != null) {
+                onIncomingICE(msg.ice);
+            } else {
+                handleIncomingError("Unknown incoming JSON: " + msg);
+            }
+    }
+}
+
+function onServerClose(event) {
+    setStatus('Disconnected from server');
+    resetVideo();
+
+    if (peer_connection) {
+        peer_connection.close();
+        peer_connection = null;
+    }
+
+    // Reset after a second
+    window.setTimeout(websocketServerConnect, 1000);
+}
+
+function onServerError(event) {
+    setError("Unable to connect to server, did you add an exception for the certificate?")
+    // Retry after 3 seconds
+    window.setTimeout(websocketServerConnect, 3000);
+}
+
+function getLocalStream() {
+    var constraints;
+    var textarea = document.getElementById('constraints');
+    try {
+        constraints = JSON.parse(textarea.value);
+    } catch (e) {
+        console.error(e);
+        setError('ERROR parsing constraints: ' + e.message + ', using default constraints');
+        constraints = default_constraints;
+    }
+    console.log(JSON.stringify(constraints));
+
+    // Add local stream
+    if (navigator.mediaDevices.getUserMedia) {
+        return navigator.mediaDevices.getUserMedia(constraints);
+    } else {
+        errorUserMediaHandler();
+    }
+}
+
+function websocketServerConnect() {
+    connect_attempts++;
+    if (connect_attempts > 3) {
+        setError("Too many connection attempts, aborting. Refresh page to try again");
+        return;
+    }
+    // Clear errors in the status span
+    var span = document.getElementById("status");
+    span.classList.remove('error');
+    span.textContent = '';
+    // Populate constraints
+    var textarea = document.getElementById('constraints');
+    if (textarea.value == '')
+        textarea.value = JSON.stringify(default_constraints);
+    // Fetch the peer id to use
+
+    var url = new URL(window.location.href);
+
+    peer_id = url.searchParams.get("id");
+    peer_id = peer_id || default_peer_id || getOurId();
+
+    ws_port = ws_port || url.searchParams.get("port");
+    ws_port = ws_port || '8443';
+
+    ws_server = ws_server || url.searchParams.get("server");
+    if (window.location.protocol.startsWith ("file")) {
+        ws_server = ws_server || "127.0.0.1";
+    } else if (window.location.protocol.startsWith ("http")) {
+        ws_server = ws_server || window.location.hostname;
+    } else {
+        throw new Error ("Don't know how to connect to the signalling server with uri" + window.location);
+    }
+
+    var ws_url = 'wss://' + ws_server + ':' + ws_port
+    setStatus("Connecting to server " + ws_url);
+    ws_conn = new WebSocket(ws_url);
+    /* When connected, immediately register with the server */
+    ws_conn.addEventListener('open', (event) => {
+        document.getElementById("peer-id").textContent = peer_id;
+        ws_conn.send('HELLO ' + peer_id);
+        setStatus("Registering with server");
+    });
+    ws_conn.addEventListener('error', onServerError);
+    ws_conn.addEventListener('message', onServerMessage);
+    ws_conn.addEventListener('close', onServerClose);
+}
+
+function onRemoteStreamAdded(event) {
+    videoTracks = event.stream.getVideoTracks();
+    audioTracks = event.stream.getAudioTracks();
+
+    if (videoTracks.length > 0) {
+        console.log('Incoming stream: ' + videoTracks.length + ' video tracks and ' + audioTracks.length + ' audio tracks');
+        getVideoElement().srcObject = event.stream;
+        ws_conn.send(JSON.stringify({'STATE': 'remote-stream-received'}))
+    } else {
+        handleIncomingError('Stream with unknown tracks added, resetting');
+    }
+}
+
+function errorUserMediaHandler() {
+    setError("Browser doesn't support getUserMedia!");
+}
+
+const handleDataChannelOpen = (event) =>{
+    console.log("dataChannel.OnOpen", event);
+};
+
+const handleDataChannelMessageReceived = (event) =>{
+    console.log("dataChannel.OnMessage:", event, event.data.type);
+
+    setStatus("Received data channel message");
+    if (typeof event.data === 'string' || event.data instanceof String) {
+        console.log('Incoming string message: ' + event.data);
+        textarea = document.getElementById("text")
+        textarea.value = textarea.value + '\n' + event.data
+    } else {
+        console.log('Incoming data message');
+    }
+    send_channel.send("Hi! (from browser)");
+};
+
+const handleDataChannelError = (error) =>{
+    console.log("dataChannel.OnError:", error);
+};
+
+const handleDataChannelClose = (event) =>{
+    console.log("dataChannel.OnClose", event);
+};
+
+function onDataChannel(event) {
+    setStatus("Data channel created");
+    let receiveChannel = event.channel;
+    receiveChannel.onopen = handleDataChannelOpen;
+    receiveChannel.onmessage = handleDataChannelMessageReceived;
+    receiveChannel.onerror = handleDataChannelError;
+    receiveChannel.onclose = handleDataChannelClose;
+}
+
+function createCall(msg) {
+    // Reset connection attempts because we connected successfully
+    connect_attempts = 0;
+
+    console.log('Creating RTCPeerConnection');
+
+    peer_connection = new RTCPeerConnection(rtc_configuration);
+    send_channel = peer_connection.createDataChannel('label', null);
+    send_channel.onopen = handleDataChannelOpen;
+    send_channel.onmessage = handleDataChannelMessageReceived;
+    send_channel.onerror = handleDataChannelError;
+    send_channel.onclose = handleDataChannelClose;
+    peer_connection.ondatachannel = onDataChannel;
+    peer_connection.onaddstream = onRemoteStreamAdded;
+    /* Send our video/audio to the other peer */
+    local_stream_promise = getLocalStream().then((stream) => {
+        console.log('Adding local stream');
+        peer_connection.addStream(stream);
+        return stream;
+    }).catch(setError);
+
+    if (!msg.sdp) {
+        console.log("WARNING: First message wasn't an SDP message!?");
+    }
+
+    peer_connection.onicecandidate = (event) => {
+       // We have a candidate, send it to the remote party with the
+       // same uuid
+       if (event.candidate == null) {
+            console.log("ICE Candidate was null, done");
+            return;
+       }
+       ws_conn.send(JSON.stringify({'ice': event.candidate}));
+    };
+
+    setStatus("Created peer connection for call, waiting for SDP");
+}
diff --git a/webrtc/check/validate/webrtc_validate.py b/webrtc/check/validate/webrtc_validate.py
new file mode 100644 (file)
index 0000000..6215a0a
--- /dev/null
@@ -0,0 +1,203 @@
+#!/usr/bin/env python3
+#
+# Copyright (c) 2018, Matthew Waters <matthew@centricular.com>
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this program; if not, write to the
+# Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+# Boston, MA 02110-1301, USA.
+
+import os
+import sys
+import argparse
+import json
+
+from signalling import WebRTCSignallingClient
+from actions import register_action_types, ActionObserver
+from client import WebRTCClient
+from browser import Browser, create_driver
+from enums import SignallingState, NegotiationState, RemoteState
+
+import gi
+gi.require_version("GLib", "2.0")
+from gi.repository import GLib
+gi.require_version("Gst", "1.0")
+from gi.repository import Gst
+gi.require_version("GstWebRTC", "1.0")
+from gi.repository import GstWebRTC
+gi.require_version("GstSdp", "1.0")
+from gi.repository import GstSdp
+gi.require_version("GstValidate", "1.0")
+from gi.repository import GstValidate
+
+class WebRTCApplication(object):
+    def __init__(self, server, id_, peerid, scenario_name, browser_name, html_source):
+        self.server = server
+        self.peerid = peerid
+        self.html_source = html_source
+        self.id = id_
+        self.scenario_name = scenario_name
+        self.browser_name = browser_name
+
+    def _init_validate(self, scenario_file):
+        self.runner = GstValidate.Runner.new()
+        self.monitor = GstValidate.Monitor.factory_create(
+            self.client.pipeline, self.runner, None)
+        self._scenario = GstValidate.Scenario.factory_create(
+            self.runner, self.client.pipeline, self.scenario_name)
+        self._scenario.connect("done", self._on_scenario_done)
+        self._scenario.props.execute_on_idle = True
+        if not self._scenario.props.handles_states:
+            self.client.pipeline.set_state(Gst.State.PLAYING)
+
+    def _on_scenario_done(self, scenario):
+        self.quit()
+
+    def _connect_actions(self):
+        def create_offer():
+            self.client.create_offer(None)
+            return GstValidate.ActionReturn.OK
+        self.actions.create_offer.connect(create_offer)
+
+        def wait_for_negotiation_state(state):
+            states = [state, NegotiationState.ERROR]
+            state = self.client.wait_for_negotiation_states(states)
+            return GstValidate.ActionReturn.OK if state != RemoteState.ERROR else GstValidate.ActionReturn.ERROR
+        self.actions.wait_for_negotiation_state.connect(wait_for_negotiation_state)
+
+        def add_stream(pipeline):
+            self.client.add_stream(pipeline)
+            return GstValidate.ActionReturn.OK
+        self.actions.add_stream.connect(add_stream)
+
+    def _connect_client_observer(self):
+        def on_offer_created(offer):
+            text = offer.sdp.as_text()
+            msg = json.dumps({'sdp': {'type': 'offer', 'sdp': text}})
+            self.signalling.send(msg)
+        self.client.on_offer_created.connect(on_offer_created)
+
+        def on_ice_candidate(mline, candidate):
+            msg = json.dumps({'ice': {'sdpMLineIndex': str(mline), 'candidate' : candidate}})
+            self.signalling.send(msg)
+        self.client.on_ice_candidate.connect(on_ice_candidate)
+
+        def on_pad_added(pad):
+            if pad.get_direction() != Gst.PadDirection.SRC:
+                return
+            self.client.add_stream_with_pad('fakesink', pad)
+        self.client.on_pad_added.connect(on_pad_added)
+
+    def _connect_signalling_observer(self):
+        def have_json(msg):
+            if 'sdp' in msg:
+                sdp = msg['sdp']
+                res, sdpmsg = GstSdp.SDPMessage.new()
+                GstSdp.sdp_message_parse_buffer(bytes(sdp['sdp'].encode()), sdpmsg)
+                sdptype = GstWebRTC.WebRTCSDPType.ANSWER if sdp['type'] == 'answer' else GstWebRTC.WebRTCSDPType.OFFER
+                desc = GstWebRTC.WebRTCSessionDescription.new(sdptype, sdpmsg)
+                self.client.set_remote_description(desc)
+            elif 'ice' in msg:
+                ice = msg['ice']
+                candidate = ice['candidate']
+                sdpmlineindex = ice['sdpMLineIndex']
+                self.client.add_ice_candidate(sdpmlineindex, candidate)
+        self.signalling.have_json.connect(have_json)
+
+        def error(msg):
+            # errors are unexpected
+            GLib.idle_add(self.quit)
+            GLib.idle_add(sys.exit, -20)
+        self.signalling.error.connect(error)
+
+    def _init(self):
+        self.main_loop = GLib.MainLoop()
+
+        self.client = WebRTCClient()
+        self._connect_client_observer()
+
+        self.actions = ActionObserver()
+        register_action_types(self.actions)
+        self._connect_actions()
+
+        self.signalling = WebRTCSignallingClient(self.server, self.id)
+        self._connect_signalling_observer()
+        self.signalling.wait_for_states([SignallingState.OPEN])
+        self.signalling.hello()
+
+        self.browser = Browser(create_driver(self.browser_name), self.html_source)
+
+        browser_id = self.browser.get_peer_id ()
+        assert browser_id == self.peerid
+
+        self.signalling.create_session(self.peerid)
+
+        self._init_validate(self.scenario_name)
+        print("app initialized")
+
+    def quit(self):
+        # Stop signalling first so asyncio doesn't keep us alive on weird failures
+        self.signalling.stop()
+        self.browser.driver.quit()
+        self.client.stop()
+        self.main_loop.quit()
+
+    def run(self):
+        try:
+            self._init()
+            self.main_loop.run()
+        except:
+            self.quit()
+            raise
+
+def parse_options():
+    parser = argparse.ArgumentParser()
+    parser.add_argument('id', help='ID of this client', type=int)
+    parser.add_argument('--peer-id', help='ID of the peer to connect to', type=int)
+    parser.add_argument('--server', help='Signalling server to connect to, eg "wss://127.0.0.1:8443"')
+    parser.add_argument('--html-source', help='HTML page to open in the browser', default=None)
+    parser.add_argument('--scenario', help='Scenario file to execute', default=None)
+    parser.add_argument('--browser', help='Browser name to use', default=None)
+    return parser.parse_args()
+
+def init():
+    Gst.init(None)
+    GstValidate.init()
+
+    args = parse_options()
+    if not args.scenario:
+        args.scenario = os.environ.get('GST_VALIDATE_SCENARIO', None)
+    # if we have both manual scenario creation and env, then the scenario
+    # is executed twice...
+    if 'GST_VALIDATE_SCENARIO' in os.environ:
+        del os.environ['GST_VALIDATE_SCENARIO']
+    if not args.scenario:
+        raise ValueError("No scenario file provided")
+    if not args.server:
+        raise ValueError("No server location provided")
+    if not args.peer_id:
+        raise ValueError("No peer id provided")
+    if not args.html_source:
+        raise ValueError("No HTML page provided")
+    if not args.browser:
+        raise ValueError("No Browser provided")
+
+    return args
+
+def run():
+    args = init()
+    w = WebRTCApplication (args.server, args.id, args.peer_id, args.scenario, args.browser, args.html_source)
+    return w.run()
+
+if __name__ == "__main__":
+    sys.exit(run())
index 3ee35f7..db32d48 100644 (file)
@@ -37,3 +37,5 @@ endif
 subdir('multiparty-sendrecv')
 subdir('signalling')
 subdir('sendrecv')
+
+subdir('check')
index 85950ad..5c9509c 100644 (file)
@@ -3,5 +3,3 @@ executable('webrtc-sendrecv',
             dependencies : [gst_dep, gstsdp_dep, gstwebrtc_dep, libsoup_dep, json_glib_dep ])
 
 webrtc_py = files('webrtc_sendrecv.py')
-
-subdir('tests')