check/validate: a few more tests and improvements
authorMatthew Waters <matthew@centricular.com>
Wed, 12 Feb 2020 10:56:34 +0000 (21:56 +1100)
committerMatthew Waters <ystreet00@gmail.com>
Wed, 6 May 2020 06:01:57 +0000 (06:01 +0000)
Tests a matrix of options:
- local/remote negotiation initiator
- 'most' bundle-policy combinations (some combinations will never work)
- firefox or chrome browser

Across 4 test scenarios:
- simple negotiation with default browser streams (or none if gstreamer
  initiates)
- sending a vp8 stream
- opening a data channel
- sending a message over the data channel

for a total of 112 tests!

29 files changed:
webrtc/check/validate/actions.py
webrtc/check/validate/apps/gstwebrtc.py
webrtc/check/validate/browser.py
webrtc/check/validate/client.py
webrtc/check/validate/enums.py
webrtc/check/validate/observer.py
webrtc/check/validate/scenarios/bundle_local_balanced_remote_balanced/bundle_policy.scenario [new file with mode: 0644]
webrtc/check/validate/scenarios/bundle_local_balanced_remote_max_bundle/bundle_policy.scenario [new file with mode: 0644]
webrtc/check/validate/scenarios/bundle_local_balanced_remote_max_compat/bundle_policy.scenario [new file with mode: 0644]
webrtc/check/validate/scenarios/bundle_local_max_bundle_remote_balanced/bundle_policy.scenario [new file with mode: 0644]
webrtc/check/validate/scenarios/bundle_local_max_bundle_remote_max_bundle/bundle_policy.scenario [new file with mode: 0644]
webrtc/check/validate/scenarios/bundle_local_max_bundle_remote_max_compat/bundle_policy.scenario [new file with mode: 0644]
webrtc/check/validate/scenarios/bundle_local_max_compat_remote_balanced/bundle_policy.scenario [new file with mode: 0644]
webrtc/check/validate/scenarios/bundle_local_max_compat_remote_max_bundle/bundle_policy.scenario [new file with mode: 0644]
webrtc/check/validate/scenarios/bundle_local_max_compat_remote_max_compat/bundle_policy.scenario [new file with mode: 0644]
webrtc/check/validate/scenarios/bundle_local_none_remote_balanced/bundle_policy.scenario [new file with mode: 0644]
webrtc/check/validate/scenarios/bundle_local_none_remote_max_bundle/bundle_policy.scenario [new file with mode: 0644]
webrtc/check/validate/scenarios/bundle_local_none_remote_max_compat/bundle_policy.scenario [new file with mode: 0644]
webrtc/check/validate/scenarios/local_initiates_negotiation/negotiation_initiator.scenario [new file with mode: 0644]
webrtc/check/validate/scenarios/offer_answer.scenario
webrtc/check/validate/scenarios/open_data_channel.scenario [new file with mode: 0644]
webrtc/check/validate/scenarios/remote_initiates_negotiation/negotiation_initiator.scenario [new file with mode: 0644]
webrtc/check/validate/scenarios/send_data_channel_string.scenario [new file with mode: 0644]
webrtc/check/validate/scenarios/vp8_send_stream.scenario
webrtc/check/validate/signalling.py
webrtc/check/validate/testsuites/webrtc.py
webrtc/check/validate/web/single_stream.html
webrtc/check/validate/web/webrtc.js
webrtc/check/validate/webrtc_validate.py

index 6ced2cf..98a3cb8 100644 (file)
@@ -1,6 +1,4 @@
-#!/usr/bin/env python3
-#
-# Copyright (c) 2018, Matthew Waters <matthew@centricular.com>
+# Copyright (c) 2020, Matthew Waters <matthew@centricular.com>
 #
 # This program is free software; you can redistribute it and/or
 # modify it under the terms of the GNU Lesser General Public
@@ -22,6 +20,11 @@ gi.require_version("GstValidate", "1.0")
 from gi.repository import GstValidate
 
 from observer import Signal
+from enums import Actions
+
+import logging
+
+l = logging.getLogger(__name__)
 
 class ActionObserver(object):
     def __init__(self):
@@ -42,36 +45,62 @@ class ActionObserver(object):
 
             return val
 
-        self.create_offer = Signal(_action_continue, _action_accum)
-        self.wait_for_negotiation_state = Signal(_action_continue, _action_accum)
-        self.add_stream = Signal(_action_continue, _action_accum)
-        self.wait_for_remote_state = Signal(_action_continue, _action_accum)
+        self.action = Signal(_action_continue, _action_accum)
+
+    def _action(self, scenario, action):
+        l.debug('executing action: ' + str(action.structure))
+        return self.action.fire (Actions(action.structure.get_name()), action)
 
-    def _create_offer(self, scenario, action):
-        print("action create-offer")
-        return self.create_offer.fire()
-    def _wait_for_negotiation_state(self, scenario, action):
-        state = action.structure["state"]
-        print("action wait-for-negotiation-state", state)
-        return self.wait_for_negotiation_state.fire(state)
-    def _add_stream(self, scenario, action):
-        pipeline = action.structure["pipeline"]
-        print("action add-stream", pipeline)
-        return self.add_stream.fire(pipeline)
 
-def register_action_types(observer):
-    if not isinstance(observer, ActionObserver):
-        raise TypeError
+    def register_action_types(observer):
+        if not isinstance(observer, ActionObserver):
+            raise TypeError
 
-    GstValidate.register_action_type("create-offer", "webrtc",
-                                     observer._create_offer, None,
-                                     "Instruct a create-offer to commence",
-                                     GstValidate.ActionTypeFlags.NONE)
-    GstValidate.register_action_type("wait-for-negotiation-state", "webrtc",
-                                     observer._wait_for_negotiation_state, None,
-                                     "Wait for a specific negotiation state to be reached",
-                                     GstValidate.ActionTypeFlags.NONE)
-    GstValidate.register_action_type("add-stream", "webrtc",
-                                     observer._add_stream, None,
-                                     "Add a stream to the webrtcbin",
-                                     GstValidate.ActionTypeFlags.NONE)
+        GstValidate.register_action_type(Actions.CREATE_OFFER.value,
+                                        "webrtc", observer._action, None,
+                                         "Instruct a create-offer to commence",
+                                         GstValidate.ActionTypeFlags.NONE)
+        GstValidate.register_action_type(Actions.CREATE_ANSWER.value,
+                                        "webrtc", observer._action, None,
+                                         "Create answer",
+                                         GstValidate.ActionTypeFlags.NONE)
+        GstValidate.register_action_type(Actions.WAIT_FOR_NEGOTIATION_STATE.value,
+                                        "webrtc", observer._action, None,
+                                         "Wait for a specific negotiation state to be reached",
+                                         GstValidate.ActionTypeFlags.NONE)
+        GstValidate.register_action_type(Actions.ADD_STREAM.value,
+                                        "webrtc", observer._action, None,
+                                         "Add a stream to the webrtcbin",
+                                         GstValidate.ActionTypeFlags.NONE)
+        GstValidate.register_action_type(Actions.ADD_DATA_CHANNEL.value,
+                                        "webrtc", observer._action, None,
+                                         "Add a data channel to the webrtcbin",
+                                         GstValidate.ActionTypeFlags.NONE)
+        GstValidate.register_action_type(Actions.SEND_DATA_CHANNEL_STRING.value,
+                                        "webrtc", observer._action, None,
+                                         "Send a message using a data channel",
+                                         GstValidate.ActionTypeFlags.NONE)
+        GstValidate.register_action_type(Actions.WAIT_FOR_DATA_CHANNEL_STATE.value,
+                                        "webrtc", observer._action, None,
+                                         "Wait for data channel to reach state",
+                                         GstValidate.ActionTypeFlags.NONE)
+        GstValidate.register_action_type(Actions.CLOSE_DATA_CHANNEL.value,
+                                        "webrtc", observer._action, None,
+                                         "Close a data channel",
+                                         GstValidate.ActionTypeFlags.NONE)
+        GstValidate.register_action_type(Actions.WAIT_FOR_DATA_CHANNEL.value,
+                                        "webrtc", observer._action, None,
+                                         "Wait for a data channel to appear",
+                                         GstValidate.ActionTypeFlags.NONE)
+        GstValidate.register_action_type(Actions.WAIT_FOR_DATA_CHANNEL_STRING.value,
+                                        "webrtc", observer._action, None,
+                                         "Wait for a data channel to receive a message",
+                                         GstValidate.ActionTypeFlags.NONE)
+        GstValidate.register_action_type(Actions.WAIT_FOR_NEGOTIATION_NEEDED.value,
+                                        "webrtc", observer._action, None,
+                                         "Wait for a the on-negotiation-needed signal to fire",
+                                         GstValidate.ActionTypeFlags.NONE)
+        GstValidate.register_action_type(Actions.SET_WEBRTC_OPTIONS.value,
+                                        "webrtc", observer._action, None,
+                                         "Set some webrtc options",
+                                         GstValidate.ActionTypeFlags.NONE)
index 693ac4a..27d4088 100644 (file)
@@ -1,6 +1,4 @@
-#*- vi:si:et:sw=4:sts=4:ts=4:syntax=python
-#
-# Copyright (c) 2018 Matthew Waters <matthew@centricular.com>
+# Copyright (c) 2020, Matthew Waters <matthew@centricular.com>
 #
 # This program is free software; you can redistribute it and/or
 # modify it under the terms of the GNU Lesser General Public
@@ -21,26 +19,82 @@ import inspect
 import os
 import sys
 import shutil
+import itertools
 
 import tempfile
 
-from launcher.baseclasses import TestsManager, TestsGenerator, GstValidateTest, ScenarioManager
+from launcher.baseclasses import TestsManager, GstValidateTest, ScenarioManager
 from launcher.utils import DEFAULT_TIMEOUT
 
 DEFAULT_BROWSERS = ['firefox', 'chrome']
+
+# list of scenarios. These are the names of the actual scenario files stored
+# on disk.
 DEFAULT_SCENARIOS = [
-                "offer_answer",
-                "vp8_send_stream"
-                ]
-
-BROWSER_SCENARIO_BLACKLISTS = {
-    'firefox' : [
-        'offer_answer', # fails to accept an SDP without any media sections
-    ],
-    'chrome' : [
-    ],
+        "offer_answer",
+        "vp8_send_stream",
+        "open_data_channel",
+        "send_data_channel_string",
+    ]
+
+# various configuration changes that are included from other scenarios.
+# key is the name of the override used in the name of the test
+# value is the subdirectory where the override is placed
+# changes some things about the test like:
+# - who initiates the negotiation
+# - bundle settings
+SCENARIO_OVERRIDES = {
+    # name : directory
+
+    # who starts the negotiation
+    'local' : 'local_initiates_negotiation',
+    'remote' : 'remote_initiates_negotiation',
+
+    # bundle-policy configuration
+    # XXX: webrtcbin's bundle-policy=none is not part of the spec
+    'none_compat' : 'bundle_local_none_remote_max_compat',
+    'none_balanced' : 'bundle_local_none_remote_balanced',
+    'none_bundle' : 'bundle_local_none_remote_max_bundle',
+    'compat_compat' : 'bundle_local_max_compat_remote_max_compat',
+    'compat_balanced' : 'bundle_local_max_compat_remote_balanced',
+    'compat_bundle' : 'bundle_local_max_compat_remote_max_bundle',
+    'balanced_compat' : 'bundle_local_balanced_remote_max_compat',
+    'balanced_balanced' : 'bundle_local_balanced_remote_balanced',
+    'balanced_bundle' : 'bundle_local_balanced_remote_bundle',
+    'bundle_compat' : 'bundle_local_max_bundle_remote_max_compat',
+    'bundle_balanced' : 'bundle_local_max_bundle_remote_balanced',
+    'bundle_bundle' : 'bundle_local_max_bundle_remote_max_bundle',
 }
 
+bundle_options = ['compat', 'balanced', 'bundle']
+
+# Given an override, these are the choices to choose from.  Each choice is a
+# separate test
+OVERRIDE_CHOICES = {
+    'initiator' : ['local', 'remote'],
+    'bundle' : ['_'.join(opt) for opt in itertools.product(['none'] + bundle_options, bundle_options)],
+}
+
+# Which scenarios support which override.  All the overrides will be chosen
+SCENARIO_OVERRIDES_SUPPORTED = {
+    "offer_answer" : ['initiator', 'bundle'],
+    "vp8_send_stream" : ['initiator', 'bundle'],
+    "open_data_channel" : ['initiator', 'bundle'],
+    "send_data_channel_string" : ['initiator', 'bundle'],
+}
+
+# Things that don't work for some reason or another.
+DEFAULT_BLACKLIST = [
+    (r"webrtc\.firefox\.local\..*offer_answer",
+     "Firefox doesn't like a SDP without any media"),
+    (r"webrtc.*remote.*vp8_send_stream",
+     "We can't match payload types with a remote offer and a sending stream"),
+    (r"webrtc.*\.balanced_.*",
+     "webrtcbin doesn't implement bundle-policy=balanced"),
+    (r"webrtc.*\.none_bundle.*",
+     "Browsers want a BUNDLE group if in max-bundle mode"),
+]
+
 class MutableInt(object):
     def __init__(self, value):
         self.value = value
@@ -66,11 +120,12 @@ class GstWebRTCTest(GstValidateTest):
 
     @classmethod
     def __get_available_peer_id(cls):
+        # each connection uses two peer ids
         peerid = cls.__last_id.value
         cls.__last_id.value += 2
         return peerid
 
-    def __init__(self, classname, tests_manager, scenario, browser, timeout=DEFAULT_TIMEOUT):
+    def __init__(self, classname, tests_manager, scenario, browser, scenario_override_includes=None, timeout=DEFAULT_TIMEOUT):
         super().__init__("python3",
                         classname,
                         tests_manager.options,
@@ -82,6 +137,7 @@ class GstWebRTCTest(GstValidateTest):
         self.current_file_path = os.path.dirname (os.path.abspath (filename))
         self.certdir = None
         self.browser = browser
+        self.scenario_override_includes = scenario_override_includes
 
     def launch_server(self):
         if self.options.redirect_logs == 'stdout':
@@ -138,6 +194,8 @@ class GstWebRTCTest(GstValidateTest):
         html_page = os.path.join(self.current_file_path, '..', 'web', 'single_stream.html')
         html_params = '?server=127.0.0.1&port=' + str(self.server_port) + '&id=' + str(web_id)
         self.add_arguments("file://" + html_page + html_params)
+        self.add_arguments("--name")
+        self.add_arguments(self.classname)
         self.add_arguments('--peer-id')
         self.add_arguments(str(web_id))
         self.add_arguments(str(gst_id))
@@ -154,10 +212,27 @@ class GstWebRTCTest(GstValidateTest):
             self.__used_ports.remove(self.server_port)
             if self.certdir:
                 shutil.rmtree(self.certdir, ignore_errors=True)
-                self.certdir
 
         return res
 
+    def get_subproc_env(self):
+        env = super().get_subproc_env()
+        if not self.scenario_override_includes:
+            return env
+
+        # this feels gross...
+        paths = env.get('GST_VALIDATE_SCENARIOS_PATH', '').split(os.pathsep)
+        new_paths = []
+        for p in paths:
+            new_paths.append(p)
+            for override_path in self.scenario_override_includes:
+                new_p = os.path.join(p, override_path)
+                if os.path.exists (new_p):
+                    new_paths.append(new_p)
+        env['GST_VALIDATE_SCENARIOS_PATH'] = os.pathsep.join(new_paths)
+
+        return env
+
 class GstWebRTCTestsManager(TestsManager):
     scenarios_manager = ScenarioManager()
     name = "webrtc"
@@ -165,23 +240,50 @@ class GstWebRTCTestsManager(TestsManager):
     def __init__(self):
         super(GstWebRTCTestsManager, self).__init__()
         self.loading_testsuite = self.name
+        self._scenarios = []
+
+    def add_scenarios(self, scenarios):
+        if isinstance(scenarios, list):
+            self._scenarios.extend(scenarios)
+        else:
+            self._scenarios.append(scenarios)
 
-    def webrtc_server_address(self):
-        return "wss://127.0.0.1:8443"
+        self._scenarios = list(set(self._scenarios))
+
+    def set_scenarios(self, scenarios):
+        self._scenarios = []
+        self.add_scenarios(scenarios)
+
+    def get_scenarios(self):
+        return self._scenarios
 
     def populate_testsuite(self):
         self.add_scenarios (DEFAULT_SCENARIOS)
+        self.set_default_blacklist(DEFAULT_BLACKLIST)
+
+    def list_tests(self):
+        if self.tests:
+            return self.tests
 
         scenarios = [(scenario_name, self.scenarios_manager.get_scenario(scenario_name))
                      for scenario_name in self.get_scenarios()]
 
-        for name, scenario in scenarios:
-            if not scenario:
-                self.warning("Could not find scenario %s" % name)
-                continue
-            for browser in DEFAULT_BROWSERS:
-                if name in BROWSER_SCENARIO_BLACKLISTS[browser]:
-                    self.warning('Skipping broken test', name, 'for browser', browser)
+        for browser in DEFAULT_BROWSERS:
+            for name, scenario in scenarios:
+                if not scenario:
+                    self.warning("Could not find scenario %s" % name)
                     continue
-                classname = browser + '_' + name
-                self.add_test(GstWebRTCTest(classname, self, scenario, browser))
+                if not SCENARIO_OVERRIDES_SUPPORTED[name]:
+                    # no override choices supported
+                    classname = browser + '.' + name
+                    print ("adding", classname)
+                    self.add_test(GstWebRTCTest(classname, self, scenario, browser))
+                else:
+                    for overrides in itertools.product(*[OVERRIDE_CHOICES[c] for c in SCENARIO_OVERRIDES_SUPPORTED[name]]):
+                        oname = '.'.join (overrides)
+                        opaths = [SCENARIO_OVERRIDES[p] for p in overrides]
+                        classname = browser + '.' + oname + '.' + name
+                        print ("adding", classname)
+                        self.add_test(GstWebRTCTest(classname, self, scenario, browser, opaths))
+
+        return self.tests
index 536a031..7db9b0b 100644 (file)
@@ -1,6 +1,4 @@
-#!/usr/bin/env python3
-#
-# Copyright (c) 2018, Matthew Waters <matthew@centricular.com>
+# Copyright (c) 2020, Matthew Waters <matthew@centricular.com>
 #
 # This program is free software; you can redistribute it and/or
 # modify it under the terms of the GNU Lesser General Public
 # Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
 # Boston, MA 02110-1301, USA.
 
+import logging
 from selenium import webdriver
 from selenium.webdriver.support.wait import WebDriverWait
 from selenium.webdriver.firefox.firefox_profile import FirefoxProfile
 from selenium.webdriver.chrome.options import Options as COptions
 
+l = logging.getLogger(__name__)
+
 def create_firefox_driver():
     capabilities = webdriver.DesiredCapabilities().FIREFOX.copy()
     capabilities['acceptSslCerts'] = True
@@ -39,6 +40,9 @@ def create_chrome_driver():
     copts.add_argument('--use-fake-ui-for-media-stream')
     copts.add_argument('--use-fake-device-for-media-stream')
     copts.add_argument('--enable-blink-features=RTCUnifiedPlanByDefault')
+    # XXX: until libnice can deal with mdns candidates
+    local_state = {"enabled_labs_experiments" : ["enable-webrtc-hide-local-ips-with-mdns@2"] }
+    copts.add_experimental_option("localState", {"browser" : local_state})
 
     return webdriver.Chrome(options=copts, desired_capabilities=capabilities)
 
@@ -48,7 +52,7 @@ def create_driver(name):
     elif name == 'chrome':
         return create_chrome_driver()
     else:
-        raise ValueError("Unknown browser name " + name)
+        raise ValueError("Unknown browser name \'" + name + "\'")
 
 def valid_int(n):
     if isinstance(n, int):
@@ -62,18 +66,23 @@ def valid_int(n):
     return False
 
 class Browser(object):
-    def __init__(self, driver, html_source):
+    """
+    A browser as connected through selenium.
+    """
+    def __init__(self, driver):
+        l.info('Using driver \'' + driver.name + '\' with capabilities ' + str(driver.capabilities))
         self.driver = driver
-        self.html_source = html_source
+
+    def open(self, html_source):
+        self.driver.get(html_source)
 
     def get_peer_id(self):
-        self.driver.get(self.html_source)
-        peer_id = WebDriverWait(self.driver, 10).until(
+        peer_id = WebDriverWait(self.driver, 5).until(
             lambda x: x.find_element_by_id('peer-id'),
             message='Peer-id element was never seen'
         )
-        WebDriverWait (self.driver, 10).until(
+        WebDriverWait (self.driver, 5).until(
             lambda x: valid_int(peer_id.text),
             message='Peer-id never became a number'
         )
-        return int(peer_id.text)        
+        return int(peer_id.text)
index 24c8bcd..4a19982 100644 (file)
@@ -1,6 +1,4 @@
-#!/usr/bin/env python3
-#
-# Copyright (c) 2018, Matthew Waters <matthew@centricular.com>
+# Copyright (c) 2020, Matthew Waters <matthew@centricular.com>
 #
 # This program is free software; you can redistribute it and/or
 # modify it under the terms of the GNU Lesser General Public
@@ -20,8 +18,8 @@
 import threading
 import copy
 
-from observer import Signal
-from enums import NegotiationState
+from observer import Signal, WebRTCObserver, DataChannelObserver, StateObserver
+from enums import NegotiationState, DataChannelState
 
 import gi
 gi.require_version("Gst", "1.0")
@@ -33,10 +31,13 @@ from gi.repository import GstSdp
 gi.require_version("GstValidate", "1.0")
 from gi.repository import GstValidate
 
-class WebRTCBinObserver(object):
+
+class WebRTCBinObserver(WebRTCObserver):
+    """
+    Observe a webrtcbin element.
+    """
     def __init__(self, element):
-        self.state = NegotiationState.NEW
-        self.state_cond = threading.Condition()
+        WebRTCObserver.__init__(self)
         self.element = element
         self.signal_handlers = []
         self.signal_handlers.append(element.connect("on-negotiation-needed", self._on_negotiation_needed))
@@ -44,17 +45,16 @@ class WebRTCBinObserver(object):
         self.signal_handlers.append(element.connect("pad-added", self._on_pad_added))
         self.signal_handlers.append(element.connect("on-new-transceiver", self._on_new_transceiver))
         self.signal_handlers.append(element.connect("on-data-channel", self._on_data_channel))
-        self.on_offer_created = Signal()
-        self.on_answer_created = Signal()
+        self.negotiation_needed = 0
+        self._negotiation_needed_observer = StateObserver(self, "negotiation_needed", threading.Condition())
         self.on_negotiation_needed = Signal()
         self.on_ice_candidate = Signal()
         self.on_pad_added = Signal()
         self.on_new_transceiver = Signal()
-        self.on_data_channel = Signal()
-        self.on_local_description_set = Signal()
-        self.on_remote_description_set = Signal()
 
     def _on_negotiation_needed(self, element):
+        self.negotiation_needed += 1
+        self._negotiation_needed_observer.update(self.negotiation_needed)
         self.on_negotiation_needed.fire()
 
     def _on_ice_candidate(self, element, mline, candidate):
@@ -63,35 +63,19 @@ class WebRTCBinObserver(object):
     def _on_pad_added(self, element, pad):
         self.on_pad_added.fire(pad)
 
-    def _on_local_description_set(self, promise, desc):
-        self._update_negotiation_from_description_state(desc)
-        self.on_local_description_set.fire(desc)
-
-    def _on_remote_description_set(self, promise, desc):
-        self._update_negotiation_from_description_state(desc)
-        self.on_remote_description_set.fire(desc)
+    def _on_description_set(self, promise, desc):
+        new_state = self._update_negotiation_from_description_state(desc)
+        if new_state == NegotiationState.OFFER_SET:
+            self.on_offer_set.fire (desc)
+        elif new_state == NegotiationState.ANSWER_SET:
+            self.on_answer_set.fire (desc)
 
     def _on_new_transceiver(self, element, transceiver):
         self.on_new_transceiver.fire(transceiver)
 
-    def _on_data_channel(self, element):
-        self.on_data_channel.fire(desc)
-
-    def _update_negotiation_state(self, new_state):
-        with self.state_cond:
-            old_state = self.state
-            self.state = new_state
-            self.state_cond.notify_all()
-            print ("observer updated state to", new_state)
-
-    def wait_for_negotiation_states(self, states):
-        ret = None
-        with self.state_cond:
-            while self.state not in states:
-                self.state_cond.wait()
-            print ("observer waited for", states)
-            ret = self.state
-        return ret
+    def _on_data_channel(self, element, channel):
+        observer = WebRTCBinDataChannelObserver(channel, channel.props.label, 'remote')
+        self.add_channel(observer)
 
     def _update_negotiation_from_description_state(self, desc):
         new_state = None
@@ -101,6 +85,7 @@ class WebRTCBinObserver(object):
             new_state = NegotiationState.ANSWER_SET
         assert new_state is not None
         self._update_negotiation_state(new_state)
+        return new_state
 
     def _deepcopy_session_description(self, desc):
         # XXX: passing 'offer' to both a promise and an action signal without
@@ -115,11 +100,10 @@ class WebRTCBinObserver(object):
         offer = reply['offer']
 
         new_offer = self._deepcopy_session_description(offer)
-        promise = Gst.Promise.new_with_change_func(self._on_local_description_set, new_offer)
+        promise = Gst.Promise.new_with_change_func(self._on_description_set, new_offer)
 
         new_offer = self._deepcopy_session_description(offer)
         self.element.emit('set-local-description', new_offer, promise)
-
         self.on_offer_created.fire(offer)
 
     def _on_answer_created(self, promise, element):
@@ -128,11 +112,10 @@ class WebRTCBinObserver(object):
         offer = reply['answer']
 
         new_offer = self._deepcopy_session_description(offer)
-        promise = Gst.Promise.new_with_change_func(self._on_local_description_set, new_offer)
+        promise = Gst.Promise.new_with_change_func(self._on_description_set, new_offer)
 
         new_offer = self._deepcopy_session_description(offer)
         self.element.emit('set-local-description', new_offer, promise)
-
         self.on_answer_created.fire(offer)
 
     def create_offer(self, options=None):
@@ -144,13 +127,24 @@ class WebRTCBinObserver(object):
         self.element.emit('create-answer', options, promise)
 
     def set_remote_description(self, desc):
-        promise = Gst.Promise.new_with_change_func(self._on_remote_description_set, desc)
+        promise = Gst.Promise.new_with_change_func(self._on_description_set, desc)
         self.element.emit('set-remote-description', desc, promise)
 
     def add_ice_candidate(self, mline, candidate):
         self.element.emit('add-ice-candidate', mline, candidate)
 
+    def add_data_channel(self, ident):
+        channel = self.element.emit('create-data-channel', ident, None)
+        observer = WebRTCBinDataChannelObserver(channel, ident, 'local')
+        self.add_channel(observer)
+
+    def wait_for_negotiation_needed(self, generation):
+        self._negotiation_needed_observer.wait_for ((generation,))
+
 class WebRTCStream(object):
+    """
+    An stream attached to a webrtcbin element
+    """
     def __init__(self):
         self.bin = None
 
@@ -189,6 +183,10 @@ class WebRTCStream(object):
         self.bin.sync_state_with_parent()
 
 class WebRTCClient(WebRTCBinObserver):
+    """
+    Client for performing webrtc operations.  Controls the pipeline that
+    contains a webrtcbin element.
+    """
     def __init__(self):
         self.pipeline = Gst.Pipeline(None)
         self.webrtcbin = Gst.ElementFactory.make("webrtcbin")
@@ -210,3 +208,42 @@ class WebRTCClient(WebRTCBinObserver):
         stream.set_description(desc)
         stream.add_and_link_to (self.pipeline, self.webrtcbin, pad)
         self._streams.append(stream)
+
+    def set_options (self, opts):
+        if opts.has_field("local-bundle-policy"):
+            self.webrtcbin.props.bundle_policy = opts["local-bundle-policy"]
+
+
+class WebRTCBinDataChannelObserver(DataChannelObserver):
+    """
+    Data channel observer for a webrtcbin data channel.
+    """
+    def __init__(self, target, ident, location):
+        super().__init__(ident, location)
+        self.target = target
+        self.signal_handlers = []
+        self.signal_handlers.append(target.connect("on-open", self._on_open))
+        self.signal_handlers.append(target.connect("on-close", self._on_close))
+        self.signal_handlers.append(target.connect("on-error", self._on_error))
+        self.signal_handlers.append(target.connect("on-message-data", self._on_message_data))
+        self.signal_handlers.append(target.connect("on-message-string", self._on_message_string))
+        self.signal_handlers.append(target.connect("on-buffered-amount-low", self._on_buffered_amount_low))
+
+    def _on_open(self, channel):
+        self._update_state (DataChannelState.OPEN)
+    def _on_close(self, channel):
+        self._update_state (DataChannelState.CLOSED)
+    def _on_error(self, channel):
+        self._update_state (DataChannelState.ERROR)
+    def _on_message_data(self, channel, data):
+        self.data.append(msg)
+    def _on_message_string(self, channel, msg):
+        self.got_message (msg)
+    def _on_buffered_amount_low(self, channel):
+        pass
+
+    def close(self):
+        self.target.emit('close')
+
+    def send_string (self, msg):
+        self.target.emit('send-string', msg)
index 14bc31a..a23d2c9 100644 (file)
@@ -1,4 +1,4 @@
-# Copyright (c) 2018, Matthew Waters <matthew@centricular.com>
+# Copyright (c) 2020, Matthew Waters <matthew@centricular.com>
 #
 # This program is free software; you can redistribute it and/or
 # modify it under the terms of the GNU Lesser General Public
 # Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
 # Boston, MA 02110-1301, USA.
 
-class SignallingState(object):
+from enum import Enum, unique
+
+@unique
+class SignallingState(Enum):
+    """
+    State of the signalling protocol.
+    """
     NEW = "new"                 # no connection has been made
+    ERROR = "error"             # an error was thrown.  overrides all others
     OPEN = "open"               # websocket connection is open
-    ERROR = "error"             # and error was thrown.  overrides all others
     HELLO = "hello"             # hello was sent and received
     SESSION = "session"         # session setup was sent and received
 
-class NegotiationState(object):
-    NEW = "new"
-    ERROR = "error"
-    NEGOTIATION_NEEDED = "negotiation-needed"
-    OFFER_CREATED = "offer-created"
-    ANSWER_CREATED = "answer-created"
-    OFFER_SET = "offer-set"
-    ANSWER_SET = "answer-set"
+@unique
+class NegotiationState(Enum):
+    """
+    State of the webrtc negotiation.  Both peers have separate states and are
+    tracked separately.
+    """
+    NEW = "new"                                 # No negotiation has been performed
+    ERROR = "error"                             # an error occured
+    OFFER_CREATED = "offer-created"             # offer was created
+    ANSWER_CREATED = "answer-created"           # answer was created
+    OFFER_SET = "offer-set"                     # offer has been set
+    ANSWER_SET = "answer-set"                   # answer has been set
+
+@unique
+class DataChannelState(Enum):
+    """
+    State of a data channel.  Each data channel is tracked individually
+    """
+    NEW = "new"                 # data channel created but not connected
+    OPEN = "open"               # data channel is open, data can flow
+    CLOSED = "closed"           # data channel is closed, sending data will fail
+    ERROR = "error"             # data channel encountered an error
 
-class RemoteState(object):
-    ERROR = "error"
-    REMOTE_STREAM_RECEIVED = "remote-stream-received"
+@unique
+class Actions(Enum):
+    """
+    Action names that we implement.  Each name is the structure name for each
+    action as stored in the scenario file.
+    """
+    CREATE_OFFER = "create-offer"                                   # create an offer and send it to the peer
+    CREATE_ANSWER = "create-answer"                                 # create an answer and send it to the peer
+    WAIT_FOR_NEGOTIATION_STATE = "wait-for-negotiation-state"       # wait for the @NegotiationState to reach a certain value
+    ADD_STREAM = "add-stream"                                       # add a stream to send to the peer. local only
+    ADD_DATA_CHANNEL = "add-data-channel"                           # add a stream to send to the peer. local only
+    WAIT_FOR_DATA_CHANNEL = "wait-for-data-channel"                 # wait for a data channel to appear
+    WAIT_FOR_DATA_CHANNEL_STATE = "wait-for-data-channel-state"     # wait for a data channel to have a certain state
+    SEND_DATA_CHANNEL_STRING = "send-data-channel-string"           # send a string over the data channel
+    WAIT_FOR_DATA_CHANNEL_STRING = "wait-for-data-channel-string"   # wait for a string on the data channel
+    CLOSE_DATA_CHANNEL = "close-data-channel"                       # close a data channel
+    WAIT_FOR_NEGOTIATION_NEEDED = "wait-for-negotiation-needed"     # wait for negotiation needed to fire
+    SET_WEBRTC_OPTIONS = "set-webrtc-options"                       # set some options
index b4f6be5..d607858 100644 (file)
@@ -1,4 +1,4 @@
-# Copyright (c) 2018, Matthew Waters <matthew@centricular.com>
+# Copyright (c) 2020, Matthew Waters <matthew@centricular.com>
 #
 # This program is free software; you can redistribute it and/or
 # modify it under the terms of the GNU Lesser General Public
 # Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
 # Boston, MA 02110-1301, USA.
 
+import logging
+import threading
+
+from enums import NegotiationState, DataChannelState
+
+l = logging.getLogger(__name__)
+
 class Signal(object):
+    """
+    A class for callback-based signal handlers.
+    """
     def __init__(self, cont_func=None, accum_func=None):
         self._handlers = []
         if not cont_func:
@@ -41,3 +51,119 @@ class Signal(object):
             if not self.cont_func(ret):
                 break
         return ret
+
+
+class StateObserver(object):
+    """
+    Observe some state.  Allows waiting for specific states to occur and
+    notifying waiters of updated values.  Will hold previous states to ensure
+    @update cannot change the state before @wait_for can look at the state.
+    """
+    def __init__(self, target, attr_name, cond):
+        self.target = target
+        self.attr_name = attr_name
+        self.cond = cond
+        # track previous states of the value so that the notification still
+        # occurs even if the field has moved on to another state
+        self.previous_states = []
+
+    def wait_for(self, states):
+        ret = None
+        with self.cond:
+            state = getattr (self.target, self.attr_name)
+            l.debug (str(self.target) + " \'" + self.attr_name +
+                    "\' waiting for " + str(states))
+            while True:
+                l.debug(str(self.target) + " \'" + self.attr_name +
+                        "\' previous states: " + str(self.previous_states))
+                for i, s in enumerate (self.previous_states):
+                    if s in states:
+                        l.debug(str(self.target) + " \'" + self.attr_name +
+                                "\' " + str(s) + " found at position " +
+                                str(i) + " of " + str(self.previous_states))
+                        self.previous_states = self.previous_states[i:]
+                        return s
+                self.cond.wait()
+
+    def update (self, new_state):
+        with self.cond:
+            self.previous_states += [new_state]
+            setattr(self.target, self.attr_name, new_state)
+            self.cond.notify_all()
+            l.debug (str(self.target) + " updated \'" + self.attr_name + "\' to " + str(new_state))
+
+
+class WebRTCObserver(object):
+    """
+    Base webrtc observer class.  Stores a lot of duplicated functionality
+    between the local and remove peer implementations.
+    """
+    def __init__(self):
+        self.state = NegotiationState.NEW
+        self._state_observer = StateObserver(self, "state", threading.Condition())
+        self.on_offer_created = Signal()
+        self.on_answer_created = Signal()
+        self.on_offer_set = Signal()
+        self.on_answer_set = Signal()
+        self.on_data_channel = Signal()
+        self.data_channels = []
+        self._xxxxxxxdata_channel_ids = None
+        self._data_channels_observer = StateObserver(self, "_xxxxxxxdata_channel_ids", threading.Condition())
+
+    def _update_negotiation_state(self, new_state):
+        self._state_observer.update (new_state)
+
+    def wait_for_negotiation_states(self, states):
+        return self._state_observer.wait_for (states)
+
+    def find_channel (self, ident):
+        for c in self.data_channels:
+            if c.ident == ident:
+                return c
+
+    def add_channel (self, channel):
+        l.debug('adding channel ' + str (channel) + ' with name ' + str(channel.ident))
+        self.data_channels.append (channel)
+        self._data_channels_observer.update (channel.ident)
+        self.on_data_channel.fire(channel)
+
+    def wait_for_data_channel(self, ident):
+        return self._data_channels_observer.wait_for (ident)
+
+    def create_offer(self, options):
+        raise NotImplementedError()
+
+    def add_data_channel(self, ident):
+        raise NotImplementedError()
+
+
+class DataChannelObserver(object):
+    """
+    Base webrtc data channelobserver class.  Stores a lot of duplicated
+    functionality between the local and remove peer implementations.
+    """
+    def __init__(self, ident, location):
+        self.state = DataChannelState.NEW
+        self._state_observer = StateObserver(self, "state", threading.Condition())
+        self.ident = ident
+        self.location = location
+        self.message = None
+        self._message_observer = StateObserver(self, "message", threading.Condition())
+
+    def _update_state(self, new_state):
+        self._state_observer.update (new_state)
+
+    def wait_for_states(self, states):
+        return self._state_observer.wait_for (states)
+
+    def wait_for_message (self, msg):
+        return self._message_observer.wait_for (msg)
+
+    def got_message(self, msg):
+        self._message_observer.update (msg)
+
+    def close (self):
+        raise NotImplementedError()
+
+    def send_string (self, msg):
+        raise NotImplementedError()
diff --git a/webrtc/check/validate/scenarios/bundle_local_balanced_remote_balanced/bundle_policy.scenario b/webrtc/check/validate/scenarios/bundle_local_balanced_remote_balanced/bundle_policy.scenario
new file mode 100644 (file)
index 0000000..b3220c4
--- /dev/null
@@ -0,0 +1 @@
+set-vars, local_bundle_policy=balanced, remote_bundle_policy=balanced
diff --git a/webrtc/check/validate/scenarios/bundle_local_balanced_remote_max_bundle/bundle_policy.scenario b/webrtc/check/validate/scenarios/bundle_local_balanced_remote_max_bundle/bundle_policy.scenario
new file mode 100644 (file)
index 0000000..fd29616
--- /dev/null
@@ -0,0 +1 @@
+set-vars, local_bundle_policy=balanced, remote_bundle_policy=max-bundle
diff --git a/webrtc/check/validate/scenarios/bundle_local_balanced_remote_max_compat/bundle_policy.scenario b/webrtc/check/validate/scenarios/bundle_local_balanced_remote_max_compat/bundle_policy.scenario
new file mode 100644 (file)
index 0000000..d770994
--- /dev/null
@@ -0,0 +1 @@
+set-vars, local_bundle_policy=balanced, remote_bundle_policy=max-compat
diff --git a/webrtc/check/validate/scenarios/bundle_local_max_bundle_remote_balanced/bundle_policy.scenario b/webrtc/check/validate/scenarios/bundle_local_max_bundle_remote_balanced/bundle_policy.scenario
new file mode 100644 (file)
index 0000000..34c2946
--- /dev/null
@@ -0,0 +1 @@
+set-vars, local_bundle_policy=max-bundle, remote_bundle_policy=balanced
diff --git a/webrtc/check/validate/scenarios/bundle_local_max_bundle_remote_max_bundle/bundle_policy.scenario b/webrtc/check/validate/scenarios/bundle_local_max_bundle_remote_max_bundle/bundle_policy.scenario
new file mode 100644 (file)
index 0000000..6504adf
--- /dev/null
@@ -0,0 +1 @@
+set-vars, local_bundle_policy=max-bundle, remote_bundle_policy=max-bundle
diff --git a/webrtc/check/validate/scenarios/bundle_local_max_bundle_remote_max_compat/bundle_policy.scenario b/webrtc/check/validate/scenarios/bundle_local_max_bundle_remote_max_compat/bundle_policy.scenario
new file mode 100644 (file)
index 0000000..cbe82f4
--- /dev/null
@@ -0,0 +1 @@
+set-vars, local_bundle_policy=max-bundle, remote_bundle_policy=max-compat
diff --git a/webrtc/check/validate/scenarios/bundle_local_max_compat_remote_balanced/bundle_policy.scenario b/webrtc/check/validate/scenarios/bundle_local_max_compat_remote_balanced/bundle_policy.scenario
new file mode 100644 (file)
index 0000000..30bd7d1
--- /dev/null
@@ -0,0 +1 @@
+set-vars, local_bundle_policy=max-compat, remote_bundle_policy=balanced
diff --git a/webrtc/check/validate/scenarios/bundle_local_max_compat_remote_max_bundle/bundle_policy.scenario b/webrtc/check/validate/scenarios/bundle_local_max_compat_remote_max_bundle/bundle_policy.scenario
new file mode 100644 (file)
index 0000000..2b564b9
--- /dev/null
@@ -0,0 +1 @@
+set-vars, local_bundle_policy=max-compat, remote_bundle_policy=max-bundle
diff --git a/webrtc/check/validate/scenarios/bundle_local_max_compat_remote_max_compat/bundle_policy.scenario b/webrtc/check/validate/scenarios/bundle_local_max_compat_remote_max_compat/bundle_policy.scenario
new file mode 100644 (file)
index 0000000..34508fa
--- /dev/null
@@ -0,0 +1 @@
+set-vars, local_bundle_policy=max-compat, remote_bundle_policy=max-compat
diff --git a/webrtc/check/validate/scenarios/bundle_local_none_remote_balanced/bundle_policy.scenario b/webrtc/check/validate/scenarios/bundle_local_none_remote_balanced/bundle_policy.scenario
new file mode 100644 (file)
index 0000000..57d208a
--- /dev/null
@@ -0,0 +1 @@
+set-vars, local_bundle_policy=none, remote_bundle_policy=balanced
diff --git a/webrtc/check/validate/scenarios/bundle_local_none_remote_max_bundle/bundle_policy.scenario b/webrtc/check/validate/scenarios/bundle_local_none_remote_max_bundle/bundle_policy.scenario
new file mode 100644 (file)
index 0000000..adee196
--- /dev/null
@@ -0,0 +1 @@
+set-vars, local_bundle_policy=none, remote_bundle_policy=max-bundle
diff --git a/webrtc/check/validate/scenarios/bundle_local_none_remote_max_compat/bundle_policy.scenario b/webrtc/check/validate/scenarios/bundle_local_none_remote_max_compat/bundle_policy.scenario
new file mode 100644 (file)
index 0000000..3708815
--- /dev/null
@@ -0,0 +1 @@
+set-vars, local_bundle_policy=none, remote_bundle_policy=max-compat
diff --git a/webrtc/check/validate/scenarios/local_initiates_negotiation/negotiation_initiator.scenario b/webrtc/check/validate/scenarios/local_initiates_negotiation/negotiation_initiator.scenario
new file mode 100644 (file)
index 0000000..022d235
--- /dev/null
@@ -0,0 +1 @@
+set-vars, negotiation_initiator=local, negotiation_responder=remote
index 559f65f..4e797bd 100644 (file)
@@ -1,3 +1,15 @@
-description, summary="Produce an offer"
-create-offer;
-wait-for-negotiation-state, state="answer-set"
+description, summary="Produce an offer and negotiate it with the peer"
+include,location=negotiation_initiator.scenario
+include,location=bundle_policy.scenario
+
+set-webrtc-options, local-bundle-policy="$(local_bundle_policy)", remote-bundle-policy="$(remote_bundle_policy)"
+
+create-offer, which="$(negotiation_initiator)";
+# all of these waits are technically unnecessary and only the last is needed
+wait-for-negotiation-state, which="$(negotiation_initiator)", state="offer-created"
+wait-for-negotiation-state, which="$(negotiation_initiator)", state="offer-set"
+wait-for-negotiation-state, which="$(negotiation_responder)", state="offer-set"
+create-answer, which="$(negotiation_responder)";
+wait-for-negotiation-state, which="$(negotiation_responder)", state="answer-created"
+wait-for-negotiation-state, which="$(negotiation_responder)", state="answer-set"
+wait-for-negotiation-state, which="$(negotiation_initiator)", state="answer-set"
diff --git a/webrtc/check/validate/scenarios/open_data_channel.scenario b/webrtc/check/validate/scenarios/open_data_channel.scenario
new file mode 100644 (file)
index 0000000..8c1bc39
--- /dev/null
@@ -0,0 +1,23 @@
+description, summary="Open a data channel"
+include,location=negotiation_initiator.scenario
+include,location=bundle_policy.scenario
+
+set-webrtc-options, local-bundle-policy="$(local_bundle_policy)", remote-bundle-policy="$(remote_bundle_policy)"
+
+# add the channel on the initiator so that datachannel is added to the sdp
+add-data-channel, which="$(negotiation_initiator)", id="gstreamer";
+
+# negotiate
+create-offer, which="$(negotiation_initiator)";
+wait-for-negotiation-state, which="$(negotiation_responder)", state="offer-set"
+create-answer, which="$(negotiation_responder)";
+wait-for-negotiation-state, which="$(negotiation_initiator)", state="answer-set"
+
+# ensure data channel is created
+wait-for-data-channel, which="$(negotiation_responder)", id="gstreamer";
+wait-for-data-channel, which="$(negotiation_initiator)", id="gstreamer";
+wait-for-data-channel-state, which="$(negotiation_initiator)", id="gstreamer", state="open";
+
+# only the browser closing works at the moment
+close-data-channel, which="remote", id="gstreamer"
+wait-for-data-channel-state, which="local", id="gstreamer", state="closed";
diff --git a/webrtc/check/validate/scenarios/remote_initiates_negotiation/negotiation_initiator.scenario b/webrtc/check/validate/scenarios/remote_initiates_negotiation/negotiation_initiator.scenario
new file mode 100644 (file)
index 0000000..07ba3bb
--- /dev/null
@@ -0,0 +1 @@
+set-vars, negotiation_initiator=remote, negotiation_responder=local
diff --git a/webrtc/check/validate/scenarios/send_data_channel_string.scenario b/webrtc/check/validate/scenarios/send_data_channel_string.scenario
new file mode 100644 (file)
index 0000000..5d247ba
--- /dev/null
@@ -0,0 +1,21 @@
+description, summary="Send data over a data channel"
+include,location=negotiation_initiator.scenario
+include,location=bundle_policy.scenario
+
+set-webrtc-options, local-bundle-policy="$(local_bundle_policy)", remote-bundle-policy="$(remote_bundle_policy)"
+
+add-data-channel, which="$(negotiation_initiator)", id="gstreamer";
+
+create-offer, which="$(negotiation_initiator)";
+wait-for-negotiation-state, which="$(negotiation_responder)", state="offer-set"
+create-answer, which="$(negotiation_responder)";
+wait-for-negotiation-state, which="$(negotiation_initiator)", state="answer-set"
+
+# wait for the data channel to appear
+wait-for-data-channel, which="$(negotiation_initiator)", id="gstreamer";
+wait-for-data-channel, which="$(negotiation_responder)", id="gstreamer";
+wait-for-data-channel-state, which="$(negotiation_initiator)", id="gstreamer", state="open";
+
+# send something
+send-data-channel-string, which="local", id="gstreamer", msg="some data";
+wait-for-data-channel-string, which="remote", id="gstreamer", msg="some data";
index 2ec15a5..3081488 100644 (file)
@@ -1,5 +1,15 @@
 description, summary="Send a VP8 stream", handles-state=true
+include,location=negotiation_initiator.scenario
+include,location=bundle_policy.scenario
+
+set-webrtc-options, local-bundle-policy="$(local_bundle_policy)", remote-bundle-policy="$(remote_bundle_policy)"
+
 add-stream, pipeline="videotestsrc is-live=1 ! vp8enc ! rtpvp8pay ! queue"
 set-state, state="playing";
-create-offer;
-wait-for-negotiation-state, state="answer-set"
+wait-for-negotiation-needed, generation=1;
+
+# negotiate
+create-offer, which="$(negotiation_initiator)";
+wait-for-negotiation-state, which="$(negotiation_responder)", state="offer-set"
+create-answer, which="$(negotiation_responder)";
+wait-for-negotiation-state, which="$(negotiation_initiator)", state="answer-set"
index 2a7d938..2e6d674 100644 (file)
@@ -1,6 +1,4 @@
-#!/usr/bin/env python3
-#
-# Copyright (c) 2018, Matthew Waters <matthew@centricular.com>
+# Copyright (c) 2020, Matthew Waters <matthew@centricular.com>
 #
 # This program is free software; you can redistribute it and/or
 # modify it under the terms of the GNU Lesser General Public
@@ -24,11 +22,17 @@ import os
 import sys
 import threading
 import json
+import logging
+
+from observer import Signal, StateObserver, WebRTCObserver, DataChannelObserver
+from enums import SignallingState, NegotiationState, DataChannelState
 
-from observer import Signal
-from enums import SignallingState, RemoteState
+l = logging.getLogger(__name__)
 
 class AsyncIOThread(threading.Thread):
+    """
+    Run an asyncio loop in another thread.
+    """
     def __init__ (self, loop):
         threading.Thread.__init__(self)
         self.loop = loop
@@ -40,16 +44,24 @@ class AsyncIOThread(threading.Thread):
     def stop_thread(self):
         self.loop.call_soon_threadsafe(self.loop.stop)
 
+
 class SignallingClientThread(object):
+    """
+    Connect to a signalling server
+    """
     def __init__(self, server):
+        # server string to connect to.  Passed directly to websockets.connect()
         self.server = server
 
+        # fired after we have connected to the signalling server
         self.wss_connected = Signal()
+        # fired every time we receive a message from the signalling server
         self.message = Signal()
 
         self._init_async()
 
     def _init_async(self):
+        self._running = False
         self.conn = None
         self._loop = asyncio.new_event_loop()
 
@@ -59,23 +71,31 @@ class SignallingClientThread(object):
         self._loop.call_soon_threadsafe(lambda: asyncio.ensure_future(self._a_loop()))
 
     async def _a_connect(self):
+        # connect to the signalling server
         assert not self.conn
         sslctx = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH)
         self.conn = await websockets.connect(self.server, ssl=sslctx)
 
     async def _a_loop(self):
+        self._running = True
+        l.info('loop started')
         await self._a_connect()
         self.wss_connected.fire()
         assert self.conn
         async for message in self.conn:
             self.message.fire(message)
+        l.info('loop exited')
 
     def send(self, data):
+        # send some information to the peer
         async def _a_send():
             await self.conn.send(data)
         self._loop.call_soon_threadsafe(lambda: asyncio.ensure_future(_a_send()))
 
     def stop(self):
+        if self._running == False:
+            return
+
         cond = threading.Condition()
 
         # asyncio, why you so complicated to stop ?
@@ -87,64 +107,70 @@ class SignallingClientThread(object):
 
             to_wait = [t for t in tasks if not t.done()]
             if to_wait:
+                l.info('waiting for ' + str(to_wait))
                 done, pending = await asyncio.wait(to_wait)
             with cond:
+                l.error('notifying cond')
                 cond.notify()
+            self._running = False
 
-        self._loop.call_soon_threadsafe(lambda: asyncio.ensure_future(_a_stop()))
         with cond:
-                cond.wait()
+            self._loop.call_soon_threadsafe(lambda: asyncio.ensure_future(_a_stop()))
+            l.error('cond waiting')
+            cond.wait()
+            l.error('cond waited')
         self._thread.stop_thread()
         self._thread.join()
+        l.error('thread joined')
+
 
 class WebRTCSignallingClient(SignallingClientThread):
+    """
+    Signalling client implementation.  Deals wit session management over the
+    signalling protocol.  Sends and receives from a peer.
+    """
     def __init__(self, server, id_):
         super().__init__(server)
 
         self.wss_connected.connect(self._on_connection)
         self.message.connect(self._on_message)
         self.state = SignallingState.NEW
-        self._state_cond = threading.Condition()
+        self._state_observer = StateObserver(self, "state", threading.Condition())
 
         self.id = id_
         self._peerid = None
 
-        # override that base class
+        # fired when the hello has been received
         self.connected = Signal()
+        # fired when the signalling server responds that the session creation is ok
         self.session_created = Signal()
+        # fired on an error
         self.error = Signal()
+        # fired when the peer receives some json data
         self.have_json = Signal()
 
+    def _update_state(self, new_state):
+        self._state_observer.update (new_state)
+
     def wait_for_states(self, states):
-        ret = None
-        with self._state_cond:
-            while self.state not in states:
-                self._state_cond.wait()
-            ret = self.state
-        return ret
-
-    def _update_state(self, state):
-        with self._state_cond:
-            if self.state is not SignallingState.ERROR:
-                self.state = state
-            self._state_cond.notify_all()
+        return self._state_observer.wait_for (states)
 
     def hello(self):
         self.send('HELLO ' + str(self.id))
+        l.info("sent HELLO")
         self.wait_for_states([SignallingState.HELLO])
-        print("signalling-client sent HELLO")
 
     def create_session(self, peerid):
         self._peerid = peerid
         self.send('SESSION {}'.format(self._peerid))
-        self.wait_for_states([SignallingState.SESSION, SignallingState.ERROR])
-        print("signalling-client sent SESSION")
+        l.info("sent SESSION")
+        self.wait_for_states([SignallingState.SESSION])
 
     def _on_connection(self):
         self._update_state (SignallingState.OPEN)
 
     def _on_message(self, message):
-        print("signalling-client received", message)
+        l.debug("received: " + message)
         if message == 'HELLO':
             self._update_state (SignallingState.HELLO)
             self.connected.fire()
@@ -159,3 +185,82 @@ class WebRTCSignallingClient(SignallingClientThread):
             self.have_json.fire(msg)
         return False
 
+
+class RemoteWebRTCObserver(WebRTCObserver):
+    """
+    Use information sent over the signalling channel to construct the current
+    state of a remote peer. Allow performing actions by sending requests over
+    the signalling channel.
+    """
+    def __init__(self, signalling):
+        super().__init__()
+        self.signalling = signalling
+
+        def on_json(msg):
+            if 'STATE' in msg:
+                state = NegotiationState (msg['STATE'])
+                self._update_negotiation_state(state)
+                if state == NegotiationState.OFFER_CREATED:
+                    self.on_offer_created.fire(msg['description'])
+                elif state == NegotiationState.ANSWER_CREATED:
+                    self.on_answer_created.fire(msg['description'])
+                elif state == NegotiationState.OFFER_SET:
+                    self.on_offer_set.fire (msg['description'])
+                elif state == NegotiationState.ANSWER_SET:
+                    self.on_answer_set.fire (msg['description'])
+            elif 'DATA-NEW' in msg:
+                new = msg['DATA-NEW']
+                observer = RemoteDataChannelObserver(new['id'], new['location'], self)
+                self.add_channel (observer)
+            elif 'DATA-STATE' in msg:
+                ident = msg['id']
+                channel = self.find_channel(ident)
+                channel._update_state (DataChannelState(msg['DATA-STATE']))
+            elif 'DATA-MSG' in msg:
+                ident = msg['id']
+                channel = self.find_channel(ident)
+                channel.got_message(msg['DATA-MSG'])
+        self.signalling.have_json.connect (on_json)
+
+    def add_data_channel (self, ident):
+        msg = json.dumps({'DATA_CREATE': {'id': ident}})
+        self.signalling.send (msg)
+
+    def create_offer (self):
+        msg = json.dumps({'CREATE_OFFER': ""})
+        self.signalling.send (msg)
+
+    def create_answer (self):
+        msg = json.dumps({'CREATE_ANSWER': ""})
+        self.signalling.send (msg)
+
+    def set_title (self, title):
+        # entirely for debugging purposes
+        msg = json.dumps({'SET_TITLE': title})
+        self.signalling.send (msg)
+
+    def set_options (self, opts):
+        options = {}
+        if opts.has_field("remote-bundle-policy"):
+            options["bundlePolicy"] = opts["remote-bundle-policy"]
+        msg = json.dumps({'OPTIONS' : options})
+        self.signalling.send (msg)
+
+
+class RemoteDataChannelObserver(DataChannelObserver):
+    """
+    Use information sent over the signalling channel to construct the current
+    state of a remote peer's data channel. Allow performing actions by sending
+    requests over the signalling channel.
+    """
+    def __init__(self, ident, location, webrtc):
+        super().__init__(ident, location)
+        self.webrtc = webrtc
+
+    def send_string(self, msg):
+        msg = json.dumps({'DATA_SEND_MSG': {'msg' : msg, 'id': self.ident}})
+        self.webrtc.signalling.send (msg)
+
+    def close (self):
+        msg = json.dumps({'DATA_CLOSE': {'id': self.ident}})
+        self.webrtc.signalling.send (msg)
index c078e5d..3101098 100644 (file)
@@ -1,6 +1,4 @@
-#*- vi:si:et:sw=4:sts=4:ts=4:syntax=python
-#
-# Copyright (c) 2018 Matthew Waters <matthew@centricular.com>
+# Copyright (c) 2020 Matthew Waters <matthew@centricular.com>
 #
 # This program is free software; you can redistribute it and/or
 # modify it under the terms of the GNU Lesser General Public
@@ -25,12 +23,9 @@ import os
 
 TEST_MANAGER = "webrtc"
 
-BLACKLIST = [
-    ]
 
 def setup_tests(test_manager, options):
     print("Setting up webrtc tests")
-#    test_manager.set_default_blacklist(BLACKLIST)
 
     return True
 
index 9a95fb7..218b200 100644 (file)
@@ -27,9 +27,5 @@
     <div><textarea id="text" cols=40 rows=4></textarea></div>
     <div>Our id is <b id="peer-id">unknown</b></div>
     <br/>
-    <div>
-      <div>getUserMedia constraints being used:</div>
-      <div><textarea id="constraints" cols=40 rows=4></textarea></div>
-    </div>
   </body>
 </html>
index eb7e00f..2a0d535 100644 (file)
@@ -14,13 +14,12 @@ var ws_port;
 var default_peer_id;
 // Override with your own STUN servers if you want
 var rtc_configuration = {iceServers: [{urls: "stun:stun.services.mozilla.com"},
-                                      {urls: "stun:stun.l.google.com:19302"}]};
-// The default constraints that will be attempted. Can be overriden by the user.
-var default_constraints = {video: true, audio: true};
+                                      {urls: "stun:stun.l.google.com:19302"},]};
+var default_constraints = {video: true, audio: false};
 
 var connect_attempts = 0;
 var peer_connection;
-var send_channel;
+var channels = []
 var ws_conn;
 // Promise for local stream after constraints are approved by the user
 var local_stream_promise;
@@ -56,7 +55,7 @@ function setError(text) {
     var span = document.getElementById("status")
     span.textContent = text;
     span.classList.add('error');
-    ws_conn.send(JSON.stringify({'STATE': 'error'}))
+    ws_conn.send(JSON.stringify({'STATE': 'error', 'msg' : text}))
 }
 
 function resetVideo() {
@@ -75,27 +74,40 @@ function resetVideo() {
     videoElement.load();
 }
 
+function updateRemoteStateFromSetSDPJson(sdp) {
+  if (sdp.type == "offer")
+      ws_conn.send(JSON.stringify({'STATE': 'offer-set', 'description' : sdp}))
+  else if (sdp.type == "answer")
+      ws_conn.send(JSON.stringify({'STATE': 'answer-set', 'description' : sdp}))
+  else
+      throw new Error ("Unknown SDP type!");
+}
+
+function updateRemoteStateFromGeneratedSDPJson(sdp) {
+  if (sdp.type == "offer")
+      ws_conn.send(JSON.stringify({'STATE': 'offer-created', 'description' : sdp}))
+  else if (sdp.type == "answer")
+      ws_conn.send(JSON.stringify({'STATE': 'answer-created', 'description' : sdp}))
+  else
+      throw new Error ("Unknown SDP type!");
+}
+
 // SDP offer received from peer, set remote description and create an answer
 function onIncomingSDP(sdp) {
     peer_connection.setRemoteDescription(sdp).then(() => {
-        setStatus("Remote SDP set");
-        if (sdp.type != "offer")
-            return;
-        setStatus("Got SDP offer");
-        local_stream_promise.then((stream) => {
-            setStatus("Got local stream, creating answer");
-            peer_connection.createAnswer()
-            .then(onLocalDescription).catch(setError);
-        }).catch(setError);
+        updateRemoteStateFromSetSDPJson(sdp)
+        setStatus("Set remote SDP", sdp.type);
     }).catch(setError);
 }
 
 // Local description was set, send it to peer
 function onLocalDescription(desc) {
+    updateRemoteStateFromGeneratedSDPJson(desc)
     console.log("Got local description: " + JSON.stringify(desc));
     peer_connection.setLocalDescription(desc).then(function() {
-        setStatus("Sending SDP answer");
-        sdp = {'sdp': peer_connection.localDescription}
+        updateRemoteStateFromSetSDPJson(desc)
+        sdp = {'sdp': desc}
+        setStatus("Sending SDP", sdp.type);
         ws_conn.send(JSON.stringify(sdp));
     });
 }
@@ -103,9 +115,33 @@ function onLocalDescription(desc) {
 // ICE candidate received from peer, add it to the peer connection
 function onIncomingICE(ice) {
     var candidate = new RTCIceCandidate(ice);
+    console.log("adding candidate", candidate)
     peer_connection.addIceCandidate(candidate).catch(setError);
 }
 
+function createOffer(offer) {
+    local_stream_promise.then((stream) => {
+        setStatus("Got local stream, creating offer");
+        peer_connection.createOffer()
+            .then(onLocalDescription).catch(setError);
+    }).catch(setError)
+}
+
+function createAnswer(offer) {
+    local_stream_promise.then((stream) => {
+        setStatus("Got local stream, creating answer");
+        peer_connection.createAnswer()
+            .then(onLocalDescription).catch(setError);
+    }).catch(setError)
+}
+
+function handleOptions(options) {
+    console.log ('received options', options);
+    if (options.bundlePolicy != null) {
+        rtc_configuration['bundlePolicy'] = options.bundlePolicy;
+    }
+}
+
 function onServerMessage(event) {
     console.log("Received " + event.data);
     switch (event.data) {
@@ -129,14 +165,33 @@ function onServerMessage(event) {
                 return;
             }
 
+            if (msg.SET_TITLE != null) {
+                // some debugging for tests that hang around
+                document.title = msg['SET_TITLE']
+                return;
+            } else if (msg.OPTIONS != null) {
+                handleOptions(msg.OPTIONS);
+                return;
+            }
+
             // Incoming JSON signals the beginning of a call
             if (!peer_connection)
-                createCall(msg);
+                createCall();
 
             if (msg.sdp != null) {
                 onIncomingSDP(msg.sdp);
             } else if (msg.ice != null) {
                 onIncomingICE(msg.ice);
+            } else if (msg.CREATE_OFFER != null) {
+                createOffer(msg.CREATE_OFFER)
+            } else if (msg.CREATE_ANSWER != null) {
+                createAnswer(msg.CREATE_ANSWER)
+            } else if (msg.DATA_CREATE != null) {
+                addDataChannel(msg.DATA_CREATE.id)
+            } else if (msg.DATA_CLOSE != null) {
+                closeDataChannel(msg.DATA_CLOSE.id)
+            } else if (msg.DATA_SEND_MSG != null) {
+                sendDataChannelMessage(msg.DATA_SEND_MSG)
             } else {
                 handleIncomingError("Unknown incoming JSON: " + msg);
             }
@@ -151,6 +206,7 @@ function onServerClose(event) {
         peer_connection.close();
         peer_connection = null;
     }
+    channels = []
 
     // Reset after a second
     window.setTimeout(websocketServerConnect, 1000);
@@ -164,14 +220,7 @@ function onServerError(event) {
 
 function getLocalStream() {
     var constraints;
-    var textarea = document.getElementById('constraints');
-    try {
-        constraints = JSON.parse(textarea.value);
-    } catch (e) {
-        console.error(e);
-        setError('ERROR parsing constraints: ' + e.message + ', using default constraints');
-        constraints = default_constraints;
-    }
+    constraints = default_constraints;
     console.log(JSON.stringify(constraints));
 
     // Add local stream
@@ -192,12 +241,7 @@ function websocketServerConnect() {
     var span = document.getElementById("status");
     span.classList.remove('error');
     span.textContent = '';
-    // Populate constraints
-    var textarea = document.getElementById('constraints');
-    if (textarea.value == '')
-        textarea.value = JSON.stringify(default_constraints);
     // Fetch the peer id to use
-
     var url = new URL(window.location.href);
 
     peer_id = url.searchParams.get("id");
@@ -236,7 +280,6 @@ function onRemoteStreamAdded(event) {
     if (videoTracks.length > 0) {
         console.log('Incoming stream: ' + videoTracks.length + ' video tracks and ' + audioTracks.length + ' audio tracks');
         getVideoElement().srcObject = event.stream;
-        ws_conn.send(JSON.stringify({'STATE': 'remote-stream-received'}))
     } else {
         handleIncomingError('Stream with unknown tracks added, resetting');
     }
@@ -246,53 +289,80 @@ function errorUserMediaHandler() {
     setError("Browser doesn't support getUserMedia!");
 }
 
-const handleDataChannelOpen = (event) =>{
-    console.log("dataChannel.OnOpen", event);
-};
-
 const handleDataChannelMessageReceived = (event) =>{
     console.log("dataChannel.OnMessage:", event, event.data.type);
-
     setStatus("Received data channel message");
-    if (typeof event.data === 'string' || event.data instanceof String) {
-        console.log('Incoming string message: ' + event.data);
-        textarea = document.getElementById("text")
-        textarea.value = textarea.value + '\n' + event.data
-    } else {
-        console.log('Incoming data message');
-    }
-    send_channel.send("Hi! (from browser)");
+    ws_conn.send(JSON.stringify({'DATA-MSG' : event.data, 'id' : event.target.label}));
+};
+
+const handleDataChannelOpen = (event) =>{
+    console.log("dataChannel.OnOpen", event);
+    ws_conn.send(JSON.stringify({'DATA-STATE' : 'open', 'id' : event.target.label}));
 };
 
 const handleDataChannelError = (error) =>{
     console.log("dataChannel.OnError:", error);
+    ws_conn.send(JSON.stringify({'DATA-STATE' : error, 'id' : event.target.label}));
 };
 
 const handleDataChannelClose = (event) =>{
     console.log("dataChannel.OnClose", event);
+    ws_conn.send(JSON.stringify({'DATA-STATE' : 'closed', 'id' : event.target.label}));
 };
 
 function onDataChannel(event) {
     setStatus("Data channel created");
-    let receiveChannel = event.channel;
-    receiveChannel.onopen = handleDataChannelOpen;
-    receiveChannel.onmessage = handleDataChannelMessageReceived;
-    receiveChannel.onerror = handleDataChannelError;
-    receiveChannel.onclose = handleDataChannelClose;
+    let channel = event.channel;
+    console.log('adding remote data channel with label', channel.label)
+    ws_conn.send(JSON.stringify({'DATA-NEW' : {'id' : channel.label, 'location' : 'remote'}}));
+    channel.onopen = handleDataChannelOpen;
+    channel.onmessage = handleDataChannelMessageReceived;
+    channel.onerror = handleDataChannelError;
+    channel.onclose = handleDataChannelClose;
+    channels.push(channel)
 }
 
-function createCall(msg) {
+function addDataChannel(label) {
+    channel = peer_connection.createDataChannel(label, null);
+    console.log('adding local data channel with label', label)
+    ws_conn.send(JSON.stringify({'DATA-NEW' : {'id' : label, 'location' : 'local'}}));
+    channel.onopen = handleDataChannelOpen;
+    channel.onmessage = handleDataChannelMessageReceived;
+    channel.onerror = handleDataChannelError;
+    channel.onclose = handleDataChannelClose;
+    channels.push(channel)
+}
+
+function find_channel(label) {
+    console.log('find', label, 'in', channels)
+    for (var c in channels) {
+        if (channels[c].label === label) {
+            console.log('found', label, c)
+            return channels[c];
+        }
+    }
+    return null;
+}
+
+function closeDataChannel(label) {
+    channel = find_channel (label)
+    console.log('closing data channel with label', label)
+    channel.close()
+}
+
+function sendDataChannelMessage(msg) {
+    channel = find_channel (msg.id)
+    console.log('sending on data channel', msg.id, 'message', msg.msg)
+    channel.send(msg.msg)
+}
+
+function createCall() {
     // Reset connection attempts because we connected successfully
     connect_attempts = 0;
 
-    console.log('Creating RTCPeerConnection');
+    console.log('Creating RTCPeerConnection with configuration', rtc_configuration);
 
     peer_connection = new RTCPeerConnection(rtc_configuration);
-    send_channel = peer_connection.createDataChannel('label', null);
-    send_channel.onopen = handleDataChannelOpen;
-    send_channel.onmessage = handleDataChannelMessageReceived;
-    send_channel.onerror = handleDataChannelError;
-    send_channel.onclose = handleDataChannelClose;
     peer_connection.ondatachannel = onDataChannel;
     peer_connection.onaddstream = onRemoteStreamAdded;
     /* Send our video/audio to the other peer */
@@ -302,18 +372,15 @@ function createCall(msg) {
         return stream;
     }).catch(setError);
 
-    if (!msg.sdp) {
-        console.log("WARNING: First message wasn't an SDP message!?");
-    }
-
     peer_connection.onicecandidate = (event) => {
-       // We have a candidate, send it to the remote party with the
-       // same uuid
-       if (event.candidate == null) {
+        // We have a candidate, send it to the remote party with the
+        // same uuid
+        if (event.candidate == null) {
             console.log("ICE Candidate was null, done");
             return;
-       }
-       ws_conn.send(JSON.stringify({'ice': event.candidate}));
+        }
+        console.log("generated ICE Candidate", event.candidate);
+        ws_conn.send(JSON.stringify({'ice': event.candidate}));
     };
 
     setStatus("Created peer connection for call, waiting for SDP");
index 6215a0a..67f3aeb 100644 (file)
@@ -21,12 +21,13 @@ import os
 import sys
 import argparse
 import json
+import logging
 
-from signalling import WebRTCSignallingClient
-from actions import register_action_types, ActionObserver
+from signalling import WebRTCSignallingClient, RemoteWebRTCObserver
+from actions import ActionObserver
 from client import WebRTCClient
 from browser import Browser, create_driver
-from enums import SignallingState, NegotiationState, RemoteState
+from enums import SignallingState, NegotiationState, DataChannelState, Actions
 
 import gi
 gi.require_version("GLib", "2.0")
@@ -40,14 +41,20 @@ from gi.repository import GstSdp
 gi.require_version("GstValidate", "1.0")
 from gi.repository import GstValidate
 
+FORMAT = '%(asctime)-23s %(levelname)-7s  %(thread)d   %(name)-24s\t%(funcName)-24s %(message)s'
+LEVEL = os.environ.get("LOGLEVEL", "DEBUG")
+logging.basicConfig(level=LEVEL, format=FORMAT)
+l = logging.getLogger(__name__)
+
 class WebRTCApplication(object):
-    def __init__(self, server, id_, peerid, scenario_name, browser_name, html_source):
+    def __init__(self, server, id_, peerid, scenario_name, browser_name, html_source, test_name=None):
         self.server = server
         self.peerid = peerid
         self.html_source = html_source
         self.id = id_
         self.scenario_name = scenario_name
         self.browser_name = browser_name
+        self.test_name = test_name
 
     def _init_validate(self, scenario_file):
         self.runner = GstValidate.Runner.new()
@@ -61,24 +68,79 @@ class WebRTCApplication(object):
             self.client.pipeline.set_state(Gst.State.PLAYING)
 
     def _on_scenario_done(self, scenario):
-        self.quit()
-
-    def _connect_actions(self):
-        def create_offer():
-            self.client.create_offer(None)
-            return GstValidate.ActionReturn.OK
-        self.actions.create_offer.connect(create_offer)
+        l.error ('scenario done')
+        GLib.idle_add(self.quit)
 
-        def wait_for_negotiation_state(state):
-            states = [state, NegotiationState.ERROR]
-            state = self.client.wait_for_negotiation_states(states)
-            return GstValidate.ActionReturn.OK if state != RemoteState.ERROR else GstValidate.ActionReturn.ERROR
-        self.actions.wait_for_negotiation_state.connect(wait_for_negotiation_state)
+    def _connect_actions(self, actions):
+        def on_action(atype, action):
+            """
+            From a validate action, perform the action as required
+            """
+            if atype == Actions.CREATE_OFFER:
+                assert action.structure["which"] in ("local", "remote")
+                c = self.client if action.structure["which"] == "local" else self.remote_client
+                c.create_offer()
+                return GstValidate.ActionReturn.OK
+            elif atype == Actions.CREATE_ANSWER:
+                assert action.structure["which"] in ("local", "remote")
+                c = self.client if action.structure["which"] == "local" else self.remote_client
+                c.create_answer()
+                return GstValidate.ActionReturn.OK
+            elif atype == Actions.WAIT_FOR_NEGOTIATION_STATE:
+                states = [NegotiationState(action.structure["state"]), NegotiationState.ERROR]
+                assert action.structure["which"] in ("local", "remote")
+                c = self.client if action.structure["which"] == "local" else self.remote_client
+                state = c.wait_for_negotiation_states(states)
+                return GstValidate.ActionReturn.OK if state != NegotiationState.ERROR else GstValidate.ActionReturn.ERROR
+            elif atype == Actions.ADD_STREAM:
+                self.client.add_stream(action.structure["pipeline"])
+                return GstValidate.ActionReturn.OK
+            elif atype == Actions.ADD_DATA_CHANNEL:
+                assert action.structure["which"] in ("local", "remote")
+                c = self.client if action.structure["which"] == "local" else self.remote_client
+                c.add_data_channel(action.structure["id"])
+                return GstValidate.ActionReturn.OK
+            elif atype == Actions.SEND_DATA_CHANNEL_STRING:
+                assert action.structure["which"] in ("local", "remote")
+                c = self.client if action.structure["which"] == "local" else self.remote_client
+                channel = c.find_channel (action.structure["id"])
+                channel.send_string (action.structure["msg"])
+                return GstValidate.ActionReturn.OK
+            elif atype == Actions.WAIT_FOR_DATA_CHANNEL_STATE:
+                assert action.structure["which"] in ("local", "remote")
+                c = self.client if action.structure["which"] == "local" else self.remote_client
+                states = [DataChannelState(action.structure["state"]), DataChannelState.ERROR]
+                channel = c.find_channel (action.structure["id"])
+                state = channel.wait_for_states(states)
+                return GstValidate.ActionReturn.OK if state != DataChannelState.ERROR else GstValidate.ActionReturn.ERROR
+            elif atype == Actions.CLOSE_DATA_CHANNEL:
+                assert action.structure["which"] in ("local", "remote")
+                c = self.client if action.structure["which"] == "local" else self.remote_client
+                channel = c.find_channel (action.structure["id"])
+                channel.close()
+                return GstValidate.ActionReturn.OK
+            elif atype == Actions.WAIT_FOR_DATA_CHANNEL:
+                assert action.structure["which"] in ("local", "remote")
+                c = self.client if action.structure["which"] == "local" else self.remote_client
+                state = c.wait_for_data_channel(action.structure["id"])
+                return GstValidate.ActionReturn.OK
+            elif atype == Actions.WAIT_FOR_DATA_CHANNEL_STRING:
+                assert action.structure["which"] in ("local", "remote")
+                c = self.client if action.structure["which"] == "local" else self.remote_client
+                channel = c.find_channel (action.structure["id"])
+                channel.wait_for_message(action.structure["msg"])
+                return GstValidate.ActionReturn.OK
+            elif atype == Actions.WAIT_FOR_NEGOTIATION_NEEDED:
+                self.client.wait_for_negotiation_needed(action.structure["generation"])
+                return GstValidate.ActionReturn.OK
+            elif atype == Actions.SET_WEBRTC_OPTIONS:
+                self.client.set_options (action.structure)
+                self.remote_client.set_options (action.structure)
+                return GstValidate.ActionReturn.OK
+            else:
+                assert "Not reached" == ""
 
-        def add_stream(pipeline):
-            self.client.add_stream(pipeline)
-            return GstValidate.ActionReturn.OK
-        self.actions.add_stream.connect(add_stream)
+        actions.action.connect (on_action)
 
     def _connect_client_observer(self):
         def on_offer_created(offer):
@@ -87,6 +149,12 @@ class WebRTCApplication(object):
             self.signalling.send(msg)
         self.client.on_offer_created.connect(on_offer_created)
 
+        def on_answer_created(answer):
+            text = answer.sdp.as_text()
+            msg = json.dumps({'sdp': {'type': 'answer', 'sdp': text}})
+            self.signalling.send(msg)
+        self.client.on_answer_created.connect(on_answer_created)
+
         def on_ice_candidate(mline, candidate):
             msg = json.dumps({'ice': {'sdpMLineIndex': str(mline), 'candidate' : candidate}})
             self.signalling.send(msg)
@@ -116,6 +184,7 @@ class WebRTCApplication(object):
 
         def error(msg):
             # errors are unexpected
+            l.error ('Unexpected error: ' + msg)
             GLib.idle_add(self.quit)
             GLib.idle_add(sys.exit, -20)
         self.signalling.error.connect(error)
@@ -126,37 +195,50 @@ class WebRTCApplication(object):
         self.client = WebRTCClient()
         self._connect_client_observer()
 
-        self.actions = ActionObserver()
-        register_action_types(self.actions)
-        self._connect_actions()
-
         self.signalling = WebRTCSignallingClient(self.server, self.id)
+        self.remote_client = RemoteWebRTCObserver (self.signalling)
         self._connect_signalling_observer()
+
+        actions = ActionObserver()
+        actions.register_action_types()
+        self._connect_actions(actions)
+
+        # wait for the signalling server to start up before creating the browser
         self.signalling.wait_for_states([SignallingState.OPEN])
         self.signalling.hello()
 
-        self.browser = Browser(create_driver(self.browser_name), self.html_source)
+        self.browser = Browser(create_driver(self.browser_name))
+        self.browser.open(self.html_source)
 
         browser_id = self.browser.get_peer_id ()
         assert browser_id == self.peerid
 
         self.signalling.create_session(self.peerid)
+        test_name = self.test_name if self.test_name else self.scenario_name
+        self.remote_client.set_title (test_name)
 
         self._init_validate(self.scenario_name)
-        print("app initialized")
 
     def quit(self):
         # Stop signalling first so asyncio doesn't keep us alive on weird failures
+        l.info('quiting')
         self.signalling.stop()
-        self.browser.driver.quit()
-        self.client.stop()
+        l.info('signalling stopped')
         self.main_loop.quit()
+        l.info('main loop stopped')
+        self.client.stop()
+        l.info('client stopped')
+        self.browser.driver.quit()
+        l.info('browser exitted')
 
     def run(self):
         try:
             self._init()
+            l.info("app initialized")
             self.main_loop.run()
+            l.info("loop exited")
         except:
+            l.exception("Fatal error")
             self.quit()
             raise
 
@@ -168,6 +250,7 @@ def parse_options():
     parser.add_argument('--html-source', help='HTML page to open in the browser', default=None)
     parser.add_argument('--scenario', help='Scenario file to execute', default=None)
     parser.add_argument('--browser', help='Browser name to use', default=None)
+    parser.add_argument('--name', help='Name of the test', default=None)
     return parser.parse_args()
 
 def init():
@@ -196,7 +279,7 @@ def init():
 
 def run():
     args = init()
-    w = WebRTCApplication (args.server, args.id, args.peer_id, args.scenario, args.browser, args.html_source)
+    w = WebRTCApplication (args.server, args.id, args.peer_id, args.scenario, args.browser, args.html_source, test_name=args.name)
     return w.run()
 
 if __name__ == "__main__":