From: Matthew Waters Date: Wed, 12 Feb 2020 10:56:34 +0000 (+1100) Subject: check/validate: a few more tests and improvements X-Git-Tag: 1.19.3~487^2~52^2~15 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=615813ef93f6a7b977280bd55d6ffd17c25fcbd2;p=platform%2Fupstream%2Fgstreamer.git check/validate: a few more tests and improvements Tests a matrix of options: - local/remote negotiation initiator - 'most' bundle-policy combinations (some combinations will never work) - firefox or chrome browser Across 4 test scenarios: - simple negotiation with default browser streams (or none if gstreamer initiates) - sending a vp8 stream - opening a data channel - sending a message over the data channel for a total of 112 tests! --- diff --git a/webrtc/check/validate/actions.py b/webrtc/check/validate/actions.py index 6ced2cf..98a3cb8 100644 --- a/webrtc/check/validate/actions.py +++ b/webrtc/check/validate/actions.py @@ -1,6 +1,4 @@ -#!/usr/bin/env python3 -# -# Copyright (c) 2018, Matthew Waters +# Copyright (c) 2020, Matthew Waters # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public @@ -22,6 +20,11 @@ gi.require_version("GstValidate", "1.0") from gi.repository import GstValidate from observer import Signal +from enums import Actions + +import logging + +l = logging.getLogger(__name__) class ActionObserver(object): def __init__(self): @@ -42,36 +45,62 @@ class ActionObserver(object): return val - self.create_offer = Signal(_action_continue, _action_accum) - self.wait_for_negotiation_state = Signal(_action_continue, _action_accum) - self.add_stream = Signal(_action_continue, _action_accum) - self.wait_for_remote_state = Signal(_action_continue, _action_accum) + self.action = Signal(_action_continue, _action_accum) + + def _action(self, scenario, action): + l.debug('executing action: ' + str(action.structure)) + return self.action.fire (Actions(action.structure.get_name()), action) - def _create_offer(self, scenario, action): - print("action create-offer") - return self.create_offer.fire() - def _wait_for_negotiation_state(self, scenario, action): - state = action.structure["state"] - print("action wait-for-negotiation-state", state) - return self.wait_for_negotiation_state.fire(state) - def _add_stream(self, scenario, action): - pipeline = action.structure["pipeline"] - print("action add-stream", pipeline) - return self.add_stream.fire(pipeline) -def register_action_types(observer): - if not isinstance(observer, ActionObserver): - raise TypeError + def register_action_types(observer): + if not isinstance(observer, ActionObserver): + raise TypeError - GstValidate.register_action_type("create-offer", "webrtc", - observer._create_offer, None, - "Instruct a create-offer to commence", - GstValidate.ActionTypeFlags.NONE) - GstValidate.register_action_type("wait-for-negotiation-state", "webrtc", - observer._wait_for_negotiation_state, None, - "Wait for a specific negotiation state to be reached", - GstValidate.ActionTypeFlags.NONE) - GstValidate.register_action_type("add-stream", "webrtc", - observer._add_stream, None, - "Add a stream to the webrtcbin", - GstValidate.ActionTypeFlags.NONE) + GstValidate.register_action_type(Actions.CREATE_OFFER.value, + "webrtc", observer._action, None, + "Instruct a create-offer to commence", + GstValidate.ActionTypeFlags.NONE) + GstValidate.register_action_type(Actions.CREATE_ANSWER.value, + "webrtc", observer._action, None, + "Create answer", + GstValidate.ActionTypeFlags.NONE) + GstValidate.register_action_type(Actions.WAIT_FOR_NEGOTIATION_STATE.value, + "webrtc", observer._action, None, + "Wait for a specific negotiation state to be reached", + GstValidate.ActionTypeFlags.NONE) + GstValidate.register_action_type(Actions.ADD_STREAM.value, + "webrtc", observer._action, None, + "Add a stream to the webrtcbin", + GstValidate.ActionTypeFlags.NONE) + GstValidate.register_action_type(Actions.ADD_DATA_CHANNEL.value, + "webrtc", observer._action, None, + "Add a data channel to the webrtcbin", + GstValidate.ActionTypeFlags.NONE) + GstValidate.register_action_type(Actions.SEND_DATA_CHANNEL_STRING.value, + "webrtc", observer._action, None, + "Send a message using a data channel", + GstValidate.ActionTypeFlags.NONE) + GstValidate.register_action_type(Actions.WAIT_FOR_DATA_CHANNEL_STATE.value, + "webrtc", observer._action, None, + "Wait for data channel to reach state", + GstValidate.ActionTypeFlags.NONE) + GstValidate.register_action_type(Actions.CLOSE_DATA_CHANNEL.value, + "webrtc", observer._action, None, + "Close a data channel", + GstValidate.ActionTypeFlags.NONE) + GstValidate.register_action_type(Actions.WAIT_FOR_DATA_CHANNEL.value, + "webrtc", observer._action, None, + "Wait for a data channel to appear", + GstValidate.ActionTypeFlags.NONE) + GstValidate.register_action_type(Actions.WAIT_FOR_DATA_CHANNEL_STRING.value, + "webrtc", observer._action, None, + "Wait for a data channel to receive a message", + GstValidate.ActionTypeFlags.NONE) + GstValidate.register_action_type(Actions.WAIT_FOR_NEGOTIATION_NEEDED.value, + "webrtc", observer._action, None, + "Wait for a the on-negotiation-needed signal to fire", + GstValidate.ActionTypeFlags.NONE) + GstValidate.register_action_type(Actions.SET_WEBRTC_OPTIONS.value, + "webrtc", observer._action, None, + "Set some webrtc options", + GstValidate.ActionTypeFlags.NONE) diff --git a/webrtc/check/validate/apps/gstwebrtc.py b/webrtc/check/validate/apps/gstwebrtc.py index 693ac4a..27d4088 100644 --- a/webrtc/check/validate/apps/gstwebrtc.py +++ b/webrtc/check/validate/apps/gstwebrtc.py @@ -1,6 +1,4 @@ -#*- vi:si:et:sw=4:sts=4:ts=4:syntax=python -# -# Copyright (c) 2018 Matthew Waters +# Copyright (c) 2020, Matthew Waters # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public @@ -21,26 +19,82 @@ import inspect import os import sys import shutil +import itertools import tempfile -from launcher.baseclasses import TestsManager, TestsGenerator, GstValidateTest, ScenarioManager +from launcher.baseclasses import TestsManager, GstValidateTest, ScenarioManager from launcher.utils import DEFAULT_TIMEOUT DEFAULT_BROWSERS = ['firefox', 'chrome'] + +# list of scenarios. These are the names of the actual scenario files stored +# on disk. DEFAULT_SCENARIOS = [ - "offer_answer", - "vp8_send_stream" - ] - -BROWSER_SCENARIO_BLACKLISTS = { - 'firefox' : [ - 'offer_answer', # fails to accept an SDP without any media sections - ], - 'chrome' : [ - ], + "offer_answer", + "vp8_send_stream", + "open_data_channel", + "send_data_channel_string", + ] + +# various configuration changes that are included from other scenarios. +# key is the name of the override used in the name of the test +# value is the subdirectory where the override is placed +# changes some things about the test like: +# - who initiates the negotiation +# - bundle settings +SCENARIO_OVERRIDES = { + # name : directory + + # who starts the negotiation + 'local' : 'local_initiates_negotiation', + 'remote' : 'remote_initiates_negotiation', + + # bundle-policy configuration + # XXX: webrtcbin's bundle-policy=none is not part of the spec + 'none_compat' : 'bundle_local_none_remote_max_compat', + 'none_balanced' : 'bundle_local_none_remote_balanced', + 'none_bundle' : 'bundle_local_none_remote_max_bundle', + 'compat_compat' : 'bundle_local_max_compat_remote_max_compat', + 'compat_balanced' : 'bundle_local_max_compat_remote_balanced', + 'compat_bundle' : 'bundle_local_max_compat_remote_max_bundle', + 'balanced_compat' : 'bundle_local_balanced_remote_max_compat', + 'balanced_balanced' : 'bundle_local_balanced_remote_balanced', + 'balanced_bundle' : 'bundle_local_balanced_remote_bundle', + 'bundle_compat' : 'bundle_local_max_bundle_remote_max_compat', + 'bundle_balanced' : 'bundle_local_max_bundle_remote_balanced', + 'bundle_bundle' : 'bundle_local_max_bundle_remote_max_bundle', } +bundle_options = ['compat', 'balanced', 'bundle'] + +# Given an override, these are the choices to choose from. Each choice is a +# separate test +OVERRIDE_CHOICES = { + 'initiator' : ['local', 'remote'], + 'bundle' : ['_'.join(opt) for opt in itertools.product(['none'] + bundle_options, bundle_options)], +} + +# Which scenarios support which override. All the overrides will be chosen +SCENARIO_OVERRIDES_SUPPORTED = { + "offer_answer" : ['initiator', 'bundle'], + "vp8_send_stream" : ['initiator', 'bundle'], + "open_data_channel" : ['initiator', 'bundle'], + "send_data_channel_string" : ['initiator', 'bundle'], +} + +# Things that don't work for some reason or another. +DEFAULT_BLACKLIST = [ + (r"webrtc\.firefox\.local\..*offer_answer", + "Firefox doesn't like a SDP without any media"), + (r"webrtc.*remote.*vp8_send_stream", + "We can't match payload types with a remote offer and a sending stream"), + (r"webrtc.*\.balanced_.*", + "webrtcbin doesn't implement bundle-policy=balanced"), + (r"webrtc.*\.none_bundle.*", + "Browsers want a BUNDLE group if in max-bundle mode"), +] + class MutableInt(object): def __init__(self, value): self.value = value @@ -66,11 +120,12 @@ class GstWebRTCTest(GstValidateTest): @classmethod def __get_available_peer_id(cls): + # each connection uses two peer ids peerid = cls.__last_id.value cls.__last_id.value += 2 return peerid - def __init__(self, classname, tests_manager, scenario, browser, timeout=DEFAULT_TIMEOUT): + def __init__(self, classname, tests_manager, scenario, browser, scenario_override_includes=None, timeout=DEFAULT_TIMEOUT): super().__init__("python3", classname, tests_manager.options, @@ -82,6 +137,7 @@ class GstWebRTCTest(GstValidateTest): self.current_file_path = os.path.dirname (os.path.abspath (filename)) self.certdir = None self.browser = browser + self.scenario_override_includes = scenario_override_includes def launch_server(self): if self.options.redirect_logs == 'stdout': @@ -138,6 +194,8 @@ class GstWebRTCTest(GstValidateTest): html_page = os.path.join(self.current_file_path, '..', 'web', 'single_stream.html') html_params = '?server=127.0.0.1&port=' + str(self.server_port) + '&id=' + str(web_id) self.add_arguments("file://" + html_page + html_params) + self.add_arguments("--name") + self.add_arguments(self.classname) self.add_arguments('--peer-id') self.add_arguments(str(web_id)) self.add_arguments(str(gst_id)) @@ -154,10 +212,27 @@ class GstWebRTCTest(GstValidateTest): self.__used_ports.remove(self.server_port) if self.certdir: shutil.rmtree(self.certdir, ignore_errors=True) - self.certdir return res + def get_subproc_env(self): + env = super().get_subproc_env() + if not self.scenario_override_includes: + return env + + # this feels gross... + paths = env.get('GST_VALIDATE_SCENARIOS_PATH', '').split(os.pathsep) + new_paths = [] + for p in paths: + new_paths.append(p) + for override_path in self.scenario_override_includes: + new_p = os.path.join(p, override_path) + if os.path.exists (new_p): + new_paths.append(new_p) + env['GST_VALIDATE_SCENARIOS_PATH'] = os.pathsep.join(new_paths) + + return env + class GstWebRTCTestsManager(TestsManager): scenarios_manager = ScenarioManager() name = "webrtc" @@ -165,23 +240,50 @@ class GstWebRTCTestsManager(TestsManager): def __init__(self): super(GstWebRTCTestsManager, self).__init__() self.loading_testsuite = self.name + self._scenarios = [] + + def add_scenarios(self, scenarios): + if isinstance(scenarios, list): + self._scenarios.extend(scenarios) + else: + self._scenarios.append(scenarios) - def webrtc_server_address(self): - return "wss://127.0.0.1:8443" + self._scenarios = list(set(self._scenarios)) + + def set_scenarios(self, scenarios): + self._scenarios = [] + self.add_scenarios(scenarios) + + def get_scenarios(self): + return self._scenarios def populate_testsuite(self): self.add_scenarios (DEFAULT_SCENARIOS) + self.set_default_blacklist(DEFAULT_BLACKLIST) + + def list_tests(self): + if self.tests: + return self.tests scenarios = [(scenario_name, self.scenarios_manager.get_scenario(scenario_name)) for scenario_name in self.get_scenarios()] - for name, scenario in scenarios: - if not scenario: - self.warning("Could not find scenario %s" % name) - continue - for browser in DEFAULT_BROWSERS: - if name in BROWSER_SCENARIO_BLACKLISTS[browser]: - self.warning('Skipping broken test', name, 'for browser', browser) + for browser in DEFAULT_BROWSERS: + for name, scenario in scenarios: + if not scenario: + self.warning("Could not find scenario %s" % name) continue - classname = browser + '_' + name - self.add_test(GstWebRTCTest(classname, self, scenario, browser)) + if not SCENARIO_OVERRIDES_SUPPORTED[name]: + # no override choices supported + classname = browser + '.' + name + print ("adding", classname) + self.add_test(GstWebRTCTest(classname, self, scenario, browser)) + else: + for overrides in itertools.product(*[OVERRIDE_CHOICES[c] for c in SCENARIO_OVERRIDES_SUPPORTED[name]]): + oname = '.'.join (overrides) + opaths = [SCENARIO_OVERRIDES[p] for p in overrides] + classname = browser + '.' + oname + '.' + name + print ("adding", classname) + self.add_test(GstWebRTCTest(classname, self, scenario, browser, opaths)) + + return self.tests diff --git a/webrtc/check/validate/browser.py b/webrtc/check/validate/browser.py index 536a031..7db9b0b 100644 --- a/webrtc/check/validate/browser.py +++ b/webrtc/check/validate/browser.py @@ -1,6 +1,4 @@ -#!/usr/bin/env python3 -# -# Copyright (c) 2018, Matthew Waters +# Copyright (c) 2020, Matthew Waters # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public @@ -17,11 +15,14 @@ # Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, # Boston, MA 02110-1301, USA. +import logging from selenium import webdriver from selenium.webdriver.support.wait import WebDriverWait from selenium.webdriver.firefox.firefox_profile import FirefoxProfile from selenium.webdriver.chrome.options import Options as COptions +l = logging.getLogger(__name__) + def create_firefox_driver(): capabilities = webdriver.DesiredCapabilities().FIREFOX.copy() capabilities['acceptSslCerts'] = True @@ -39,6 +40,9 @@ def create_chrome_driver(): copts.add_argument('--use-fake-ui-for-media-stream') copts.add_argument('--use-fake-device-for-media-stream') copts.add_argument('--enable-blink-features=RTCUnifiedPlanByDefault') + # XXX: until libnice can deal with mdns candidates + local_state = {"enabled_labs_experiments" : ["enable-webrtc-hide-local-ips-with-mdns@2"] } + copts.add_experimental_option("localState", {"browser" : local_state}) return webdriver.Chrome(options=copts, desired_capabilities=capabilities) @@ -48,7 +52,7 @@ def create_driver(name): elif name == 'chrome': return create_chrome_driver() else: - raise ValueError("Unknown browser name " + name) + raise ValueError("Unknown browser name \'" + name + "\'") def valid_int(n): if isinstance(n, int): @@ -62,18 +66,23 @@ def valid_int(n): return False class Browser(object): - def __init__(self, driver, html_source): + """ + A browser as connected through selenium. + """ + def __init__(self, driver): + l.info('Using driver \'' + driver.name + '\' with capabilities ' + str(driver.capabilities)) self.driver = driver - self.html_source = html_source + + def open(self, html_source): + self.driver.get(html_source) def get_peer_id(self): - self.driver.get(self.html_source) - peer_id = WebDriverWait(self.driver, 10).until( + peer_id = WebDriverWait(self.driver, 5).until( lambda x: x.find_element_by_id('peer-id'), message='Peer-id element was never seen' ) - WebDriverWait (self.driver, 10).until( + WebDriverWait (self.driver, 5).until( lambda x: valid_int(peer_id.text), message='Peer-id never became a number' ) - return int(peer_id.text) + return int(peer_id.text) diff --git a/webrtc/check/validate/client.py b/webrtc/check/validate/client.py index 24c8bcd..4a19982 100644 --- a/webrtc/check/validate/client.py +++ b/webrtc/check/validate/client.py @@ -1,6 +1,4 @@ -#!/usr/bin/env python3 -# -# Copyright (c) 2018, Matthew Waters +# Copyright (c) 2020, Matthew Waters # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public @@ -20,8 +18,8 @@ import threading import copy -from observer import Signal -from enums import NegotiationState +from observer import Signal, WebRTCObserver, DataChannelObserver, StateObserver +from enums import NegotiationState, DataChannelState import gi gi.require_version("Gst", "1.0") @@ -33,10 +31,13 @@ from gi.repository import GstSdp gi.require_version("GstValidate", "1.0") from gi.repository import GstValidate -class WebRTCBinObserver(object): + +class WebRTCBinObserver(WebRTCObserver): + """ + Observe a webrtcbin element. + """ def __init__(self, element): - self.state = NegotiationState.NEW - self.state_cond = threading.Condition() + WebRTCObserver.__init__(self) self.element = element self.signal_handlers = [] self.signal_handlers.append(element.connect("on-negotiation-needed", self._on_negotiation_needed)) @@ -44,17 +45,16 @@ class WebRTCBinObserver(object): self.signal_handlers.append(element.connect("pad-added", self._on_pad_added)) self.signal_handlers.append(element.connect("on-new-transceiver", self._on_new_transceiver)) self.signal_handlers.append(element.connect("on-data-channel", self._on_data_channel)) - self.on_offer_created = Signal() - self.on_answer_created = Signal() + self.negotiation_needed = 0 + self._negotiation_needed_observer = StateObserver(self, "negotiation_needed", threading.Condition()) self.on_negotiation_needed = Signal() self.on_ice_candidate = Signal() self.on_pad_added = Signal() self.on_new_transceiver = Signal() - self.on_data_channel = Signal() - self.on_local_description_set = Signal() - self.on_remote_description_set = Signal() def _on_negotiation_needed(self, element): + self.negotiation_needed += 1 + self._negotiation_needed_observer.update(self.negotiation_needed) self.on_negotiation_needed.fire() def _on_ice_candidate(self, element, mline, candidate): @@ -63,35 +63,19 @@ class WebRTCBinObserver(object): def _on_pad_added(self, element, pad): self.on_pad_added.fire(pad) - def _on_local_description_set(self, promise, desc): - self._update_negotiation_from_description_state(desc) - self.on_local_description_set.fire(desc) - - def _on_remote_description_set(self, promise, desc): - self._update_negotiation_from_description_state(desc) - self.on_remote_description_set.fire(desc) + def _on_description_set(self, promise, desc): + new_state = self._update_negotiation_from_description_state(desc) + if new_state == NegotiationState.OFFER_SET: + self.on_offer_set.fire (desc) + elif new_state == NegotiationState.ANSWER_SET: + self.on_answer_set.fire (desc) def _on_new_transceiver(self, element, transceiver): self.on_new_transceiver.fire(transceiver) - def _on_data_channel(self, element): - self.on_data_channel.fire(desc) - - def _update_negotiation_state(self, new_state): - with self.state_cond: - old_state = self.state - self.state = new_state - self.state_cond.notify_all() - print ("observer updated state to", new_state) - - def wait_for_negotiation_states(self, states): - ret = None - with self.state_cond: - while self.state not in states: - self.state_cond.wait() - print ("observer waited for", states) - ret = self.state - return ret + def _on_data_channel(self, element, channel): + observer = WebRTCBinDataChannelObserver(channel, channel.props.label, 'remote') + self.add_channel(observer) def _update_negotiation_from_description_state(self, desc): new_state = None @@ -101,6 +85,7 @@ class WebRTCBinObserver(object): new_state = NegotiationState.ANSWER_SET assert new_state is not None self._update_negotiation_state(new_state) + return new_state def _deepcopy_session_description(self, desc): # XXX: passing 'offer' to both a promise and an action signal without @@ -115,11 +100,10 @@ class WebRTCBinObserver(object): offer = reply['offer'] new_offer = self._deepcopy_session_description(offer) - promise = Gst.Promise.new_with_change_func(self._on_local_description_set, new_offer) + promise = Gst.Promise.new_with_change_func(self._on_description_set, new_offer) new_offer = self._deepcopy_session_description(offer) self.element.emit('set-local-description', new_offer, promise) - self.on_offer_created.fire(offer) def _on_answer_created(self, promise, element): @@ -128,11 +112,10 @@ class WebRTCBinObserver(object): offer = reply['answer'] new_offer = self._deepcopy_session_description(offer) - promise = Gst.Promise.new_with_change_func(self._on_local_description_set, new_offer) + promise = Gst.Promise.new_with_change_func(self._on_description_set, new_offer) new_offer = self._deepcopy_session_description(offer) self.element.emit('set-local-description', new_offer, promise) - self.on_answer_created.fire(offer) def create_offer(self, options=None): @@ -144,13 +127,24 @@ class WebRTCBinObserver(object): self.element.emit('create-answer', options, promise) def set_remote_description(self, desc): - promise = Gst.Promise.new_with_change_func(self._on_remote_description_set, desc) + promise = Gst.Promise.new_with_change_func(self._on_description_set, desc) self.element.emit('set-remote-description', desc, promise) def add_ice_candidate(self, mline, candidate): self.element.emit('add-ice-candidate', mline, candidate) + def add_data_channel(self, ident): + channel = self.element.emit('create-data-channel', ident, None) + observer = WebRTCBinDataChannelObserver(channel, ident, 'local') + self.add_channel(observer) + + def wait_for_negotiation_needed(self, generation): + self._negotiation_needed_observer.wait_for ((generation,)) + class WebRTCStream(object): + """ + An stream attached to a webrtcbin element + """ def __init__(self): self.bin = None @@ -189,6 +183,10 @@ class WebRTCStream(object): self.bin.sync_state_with_parent() class WebRTCClient(WebRTCBinObserver): + """ + Client for performing webrtc operations. Controls the pipeline that + contains a webrtcbin element. + """ def __init__(self): self.pipeline = Gst.Pipeline(None) self.webrtcbin = Gst.ElementFactory.make("webrtcbin") @@ -210,3 +208,42 @@ class WebRTCClient(WebRTCBinObserver): stream.set_description(desc) stream.add_and_link_to (self.pipeline, self.webrtcbin, pad) self._streams.append(stream) + + def set_options (self, opts): + if opts.has_field("local-bundle-policy"): + self.webrtcbin.props.bundle_policy = opts["local-bundle-policy"] + + +class WebRTCBinDataChannelObserver(DataChannelObserver): + """ + Data channel observer for a webrtcbin data channel. + """ + def __init__(self, target, ident, location): + super().__init__(ident, location) + self.target = target + self.signal_handlers = [] + self.signal_handlers.append(target.connect("on-open", self._on_open)) + self.signal_handlers.append(target.connect("on-close", self._on_close)) + self.signal_handlers.append(target.connect("on-error", self._on_error)) + self.signal_handlers.append(target.connect("on-message-data", self._on_message_data)) + self.signal_handlers.append(target.connect("on-message-string", self._on_message_string)) + self.signal_handlers.append(target.connect("on-buffered-amount-low", self._on_buffered_amount_low)) + + def _on_open(self, channel): + self._update_state (DataChannelState.OPEN) + def _on_close(self, channel): + self._update_state (DataChannelState.CLOSED) + def _on_error(self, channel): + self._update_state (DataChannelState.ERROR) + def _on_message_data(self, channel, data): + self.data.append(msg) + def _on_message_string(self, channel, msg): + self.got_message (msg) + def _on_buffered_amount_low(self, channel): + pass + + def close(self): + self.target.emit('close') + + def send_string (self, msg): + self.target.emit('send-string', msg) diff --git a/webrtc/check/validate/enums.py b/webrtc/check/validate/enums.py index 14bc31a..a23d2c9 100644 --- a/webrtc/check/validate/enums.py +++ b/webrtc/check/validate/enums.py @@ -1,4 +1,4 @@ -# Copyright (c) 2018, Matthew Waters +# Copyright (c) 2020, Matthew Waters # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public @@ -15,22 +15,57 @@ # Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, # Boston, MA 02110-1301, USA. -class SignallingState(object): +from enum import Enum, unique + +@unique +class SignallingState(Enum): + """ + State of the signalling protocol. + """ NEW = "new" # no connection has been made + ERROR = "error" # an error was thrown. overrides all others OPEN = "open" # websocket connection is open - ERROR = "error" # and error was thrown. overrides all others HELLO = "hello" # hello was sent and received SESSION = "session" # session setup was sent and received -class NegotiationState(object): - NEW = "new" - ERROR = "error" - NEGOTIATION_NEEDED = "negotiation-needed" - OFFER_CREATED = "offer-created" - ANSWER_CREATED = "answer-created" - OFFER_SET = "offer-set" - ANSWER_SET = "answer-set" +@unique +class NegotiationState(Enum): + """ + State of the webrtc negotiation. Both peers have separate states and are + tracked separately. + """ + NEW = "new" # No negotiation has been performed + ERROR = "error" # an error occured + OFFER_CREATED = "offer-created" # offer was created + ANSWER_CREATED = "answer-created" # answer was created + OFFER_SET = "offer-set" # offer has been set + ANSWER_SET = "answer-set" # answer has been set + +@unique +class DataChannelState(Enum): + """ + State of a data channel. Each data channel is tracked individually + """ + NEW = "new" # data channel created but not connected + OPEN = "open" # data channel is open, data can flow + CLOSED = "closed" # data channel is closed, sending data will fail + ERROR = "error" # data channel encountered an error -class RemoteState(object): - ERROR = "error" - REMOTE_STREAM_RECEIVED = "remote-stream-received" +@unique +class Actions(Enum): + """ + Action names that we implement. Each name is the structure name for each + action as stored in the scenario file. + """ + CREATE_OFFER = "create-offer" # create an offer and send it to the peer + CREATE_ANSWER = "create-answer" # create an answer and send it to the peer + WAIT_FOR_NEGOTIATION_STATE = "wait-for-negotiation-state" # wait for the @NegotiationState to reach a certain value + ADD_STREAM = "add-stream" # add a stream to send to the peer. local only + ADD_DATA_CHANNEL = "add-data-channel" # add a stream to send to the peer. local only + WAIT_FOR_DATA_CHANNEL = "wait-for-data-channel" # wait for a data channel to appear + WAIT_FOR_DATA_CHANNEL_STATE = "wait-for-data-channel-state" # wait for a data channel to have a certain state + SEND_DATA_CHANNEL_STRING = "send-data-channel-string" # send a string over the data channel + WAIT_FOR_DATA_CHANNEL_STRING = "wait-for-data-channel-string" # wait for a string on the data channel + CLOSE_DATA_CHANNEL = "close-data-channel" # close a data channel + WAIT_FOR_NEGOTIATION_NEEDED = "wait-for-negotiation-needed" # wait for negotiation needed to fire + SET_WEBRTC_OPTIONS = "set-webrtc-options" # set some options diff --git a/webrtc/check/validate/observer.py b/webrtc/check/validate/observer.py index b4f6be5..d607858 100644 --- a/webrtc/check/validate/observer.py +++ b/webrtc/check/validate/observer.py @@ -1,4 +1,4 @@ -# Copyright (c) 2018, Matthew Waters +# Copyright (c) 2020, Matthew Waters # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public @@ -15,7 +15,17 @@ # Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, # Boston, MA 02110-1301, USA. +import logging +import threading + +from enums import NegotiationState, DataChannelState + +l = logging.getLogger(__name__) + class Signal(object): + """ + A class for callback-based signal handlers. + """ def __init__(self, cont_func=None, accum_func=None): self._handlers = [] if not cont_func: @@ -41,3 +51,119 @@ class Signal(object): if not self.cont_func(ret): break return ret + + +class StateObserver(object): + """ + Observe some state. Allows waiting for specific states to occur and + notifying waiters of updated values. Will hold previous states to ensure + @update cannot change the state before @wait_for can look at the state. + """ + def __init__(self, target, attr_name, cond): + self.target = target + self.attr_name = attr_name + self.cond = cond + # track previous states of the value so that the notification still + # occurs even if the field has moved on to another state + self.previous_states = [] + + def wait_for(self, states): + ret = None + with self.cond: + state = getattr (self.target, self.attr_name) + l.debug (str(self.target) + " \'" + self.attr_name + + "\' waiting for " + str(states)) + while True: + l.debug(str(self.target) + " \'" + self.attr_name + + "\' previous states: " + str(self.previous_states)) + for i, s in enumerate (self.previous_states): + if s in states: + l.debug(str(self.target) + " \'" + self.attr_name + + "\' " + str(s) + " found at position " + + str(i) + " of " + str(self.previous_states)) + self.previous_states = self.previous_states[i:] + return s + self.cond.wait() + + def update (self, new_state): + with self.cond: + self.previous_states += [new_state] + setattr(self.target, self.attr_name, new_state) + self.cond.notify_all() + l.debug (str(self.target) + " updated \'" + self.attr_name + "\' to " + str(new_state)) + + +class WebRTCObserver(object): + """ + Base webrtc observer class. Stores a lot of duplicated functionality + between the local and remove peer implementations. + """ + def __init__(self): + self.state = NegotiationState.NEW + self._state_observer = StateObserver(self, "state", threading.Condition()) + self.on_offer_created = Signal() + self.on_answer_created = Signal() + self.on_offer_set = Signal() + self.on_answer_set = Signal() + self.on_data_channel = Signal() + self.data_channels = [] + self._xxxxxxxdata_channel_ids = None + self._data_channels_observer = StateObserver(self, "_xxxxxxxdata_channel_ids", threading.Condition()) + + def _update_negotiation_state(self, new_state): + self._state_observer.update (new_state) + + def wait_for_negotiation_states(self, states): + return self._state_observer.wait_for (states) + + def find_channel (self, ident): + for c in self.data_channels: + if c.ident == ident: + return c + + def add_channel (self, channel): + l.debug('adding channel ' + str (channel) + ' with name ' + str(channel.ident)) + self.data_channels.append (channel) + self._data_channels_observer.update (channel.ident) + self.on_data_channel.fire(channel) + + def wait_for_data_channel(self, ident): + return self._data_channels_observer.wait_for (ident) + + def create_offer(self, options): + raise NotImplementedError() + + def add_data_channel(self, ident): + raise NotImplementedError() + + +class DataChannelObserver(object): + """ + Base webrtc data channelobserver class. Stores a lot of duplicated + functionality between the local and remove peer implementations. + """ + def __init__(self, ident, location): + self.state = DataChannelState.NEW + self._state_observer = StateObserver(self, "state", threading.Condition()) + self.ident = ident + self.location = location + self.message = None + self._message_observer = StateObserver(self, "message", threading.Condition()) + + def _update_state(self, new_state): + self._state_observer.update (new_state) + + def wait_for_states(self, states): + return self._state_observer.wait_for (states) + + def wait_for_message (self, msg): + return self._message_observer.wait_for (msg) + + def got_message(self, msg): + self._message_observer.update (msg) + + def close (self): + raise NotImplementedError() + + def send_string (self, msg): + raise NotImplementedError() diff --git a/webrtc/check/validate/scenarios/bundle_local_balanced_remote_balanced/bundle_policy.scenario b/webrtc/check/validate/scenarios/bundle_local_balanced_remote_balanced/bundle_policy.scenario new file mode 100644 index 0000000..b3220c4 --- /dev/null +++ b/webrtc/check/validate/scenarios/bundle_local_balanced_remote_balanced/bundle_policy.scenario @@ -0,0 +1 @@ +set-vars, local_bundle_policy=balanced, remote_bundle_policy=balanced diff --git a/webrtc/check/validate/scenarios/bundle_local_balanced_remote_max_bundle/bundle_policy.scenario b/webrtc/check/validate/scenarios/bundle_local_balanced_remote_max_bundle/bundle_policy.scenario new file mode 100644 index 0000000..fd29616 --- /dev/null +++ b/webrtc/check/validate/scenarios/bundle_local_balanced_remote_max_bundle/bundle_policy.scenario @@ -0,0 +1 @@ +set-vars, local_bundle_policy=balanced, remote_bundle_policy=max-bundle diff --git a/webrtc/check/validate/scenarios/bundle_local_balanced_remote_max_compat/bundle_policy.scenario b/webrtc/check/validate/scenarios/bundle_local_balanced_remote_max_compat/bundle_policy.scenario new file mode 100644 index 0000000..d770994 --- /dev/null +++ b/webrtc/check/validate/scenarios/bundle_local_balanced_remote_max_compat/bundle_policy.scenario @@ -0,0 +1 @@ +set-vars, local_bundle_policy=balanced, remote_bundle_policy=max-compat diff --git a/webrtc/check/validate/scenarios/bundle_local_max_bundle_remote_balanced/bundle_policy.scenario b/webrtc/check/validate/scenarios/bundle_local_max_bundle_remote_balanced/bundle_policy.scenario new file mode 100644 index 0000000..34c2946 --- /dev/null +++ b/webrtc/check/validate/scenarios/bundle_local_max_bundle_remote_balanced/bundle_policy.scenario @@ -0,0 +1 @@ +set-vars, local_bundle_policy=max-bundle, remote_bundle_policy=balanced diff --git a/webrtc/check/validate/scenarios/bundle_local_max_bundle_remote_max_bundle/bundle_policy.scenario b/webrtc/check/validate/scenarios/bundle_local_max_bundle_remote_max_bundle/bundle_policy.scenario new file mode 100644 index 0000000..6504adf --- /dev/null +++ b/webrtc/check/validate/scenarios/bundle_local_max_bundle_remote_max_bundle/bundle_policy.scenario @@ -0,0 +1 @@ +set-vars, local_bundle_policy=max-bundle, remote_bundle_policy=max-bundle diff --git a/webrtc/check/validate/scenarios/bundle_local_max_bundle_remote_max_compat/bundle_policy.scenario b/webrtc/check/validate/scenarios/bundle_local_max_bundle_remote_max_compat/bundle_policy.scenario new file mode 100644 index 0000000..cbe82f4 --- /dev/null +++ b/webrtc/check/validate/scenarios/bundle_local_max_bundle_remote_max_compat/bundle_policy.scenario @@ -0,0 +1 @@ +set-vars, local_bundle_policy=max-bundle, remote_bundle_policy=max-compat diff --git a/webrtc/check/validate/scenarios/bundle_local_max_compat_remote_balanced/bundle_policy.scenario b/webrtc/check/validate/scenarios/bundle_local_max_compat_remote_balanced/bundle_policy.scenario new file mode 100644 index 0000000..30bd7d1 --- /dev/null +++ b/webrtc/check/validate/scenarios/bundle_local_max_compat_remote_balanced/bundle_policy.scenario @@ -0,0 +1 @@ +set-vars, local_bundle_policy=max-compat, remote_bundle_policy=balanced diff --git a/webrtc/check/validate/scenarios/bundle_local_max_compat_remote_max_bundle/bundle_policy.scenario b/webrtc/check/validate/scenarios/bundle_local_max_compat_remote_max_bundle/bundle_policy.scenario new file mode 100644 index 0000000..2b564b9 --- /dev/null +++ b/webrtc/check/validate/scenarios/bundle_local_max_compat_remote_max_bundle/bundle_policy.scenario @@ -0,0 +1 @@ +set-vars, local_bundle_policy=max-compat, remote_bundle_policy=max-bundle diff --git a/webrtc/check/validate/scenarios/bundle_local_max_compat_remote_max_compat/bundle_policy.scenario b/webrtc/check/validate/scenarios/bundle_local_max_compat_remote_max_compat/bundle_policy.scenario new file mode 100644 index 0000000..34508fa --- /dev/null +++ b/webrtc/check/validate/scenarios/bundle_local_max_compat_remote_max_compat/bundle_policy.scenario @@ -0,0 +1 @@ +set-vars, local_bundle_policy=max-compat, remote_bundle_policy=max-compat diff --git a/webrtc/check/validate/scenarios/bundle_local_none_remote_balanced/bundle_policy.scenario b/webrtc/check/validate/scenarios/bundle_local_none_remote_balanced/bundle_policy.scenario new file mode 100644 index 0000000..57d208a --- /dev/null +++ b/webrtc/check/validate/scenarios/bundle_local_none_remote_balanced/bundle_policy.scenario @@ -0,0 +1 @@ +set-vars, local_bundle_policy=none, remote_bundle_policy=balanced diff --git a/webrtc/check/validate/scenarios/bundle_local_none_remote_max_bundle/bundle_policy.scenario b/webrtc/check/validate/scenarios/bundle_local_none_remote_max_bundle/bundle_policy.scenario new file mode 100644 index 0000000..adee196 --- /dev/null +++ b/webrtc/check/validate/scenarios/bundle_local_none_remote_max_bundle/bundle_policy.scenario @@ -0,0 +1 @@ +set-vars, local_bundle_policy=none, remote_bundle_policy=max-bundle diff --git a/webrtc/check/validate/scenarios/bundle_local_none_remote_max_compat/bundle_policy.scenario b/webrtc/check/validate/scenarios/bundle_local_none_remote_max_compat/bundle_policy.scenario new file mode 100644 index 0000000..3708815 --- /dev/null +++ b/webrtc/check/validate/scenarios/bundle_local_none_remote_max_compat/bundle_policy.scenario @@ -0,0 +1 @@ +set-vars, local_bundle_policy=none, remote_bundle_policy=max-compat diff --git a/webrtc/check/validate/scenarios/local_initiates_negotiation/negotiation_initiator.scenario b/webrtc/check/validate/scenarios/local_initiates_negotiation/negotiation_initiator.scenario new file mode 100644 index 0000000..022d235 --- /dev/null +++ b/webrtc/check/validate/scenarios/local_initiates_negotiation/negotiation_initiator.scenario @@ -0,0 +1 @@ +set-vars, negotiation_initiator=local, negotiation_responder=remote diff --git a/webrtc/check/validate/scenarios/offer_answer.scenario b/webrtc/check/validate/scenarios/offer_answer.scenario index 559f65f..4e797bd 100644 --- a/webrtc/check/validate/scenarios/offer_answer.scenario +++ b/webrtc/check/validate/scenarios/offer_answer.scenario @@ -1,3 +1,15 @@ -description, summary="Produce an offer" -create-offer; -wait-for-negotiation-state, state="answer-set" +description, summary="Produce an offer and negotiate it with the peer" +include,location=negotiation_initiator.scenario +include,location=bundle_policy.scenario + +set-webrtc-options, local-bundle-policy="$(local_bundle_policy)", remote-bundle-policy="$(remote_bundle_policy)" + +create-offer, which="$(negotiation_initiator)"; +# all of these waits are technically unnecessary and only the last is needed +wait-for-negotiation-state, which="$(negotiation_initiator)", state="offer-created" +wait-for-negotiation-state, which="$(negotiation_initiator)", state="offer-set" +wait-for-negotiation-state, which="$(negotiation_responder)", state="offer-set" +create-answer, which="$(negotiation_responder)"; +wait-for-negotiation-state, which="$(negotiation_responder)", state="answer-created" +wait-for-negotiation-state, which="$(negotiation_responder)", state="answer-set" +wait-for-negotiation-state, which="$(negotiation_initiator)", state="answer-set" diff --git a/webrtc/check/validate/scenarios/open_data_channel.scenario b/webrtc/check/validate/scenarios/open_data_channel.scenario new file mode 100644 index 0000000..8c1bc39 --- /dev/null +++ b/webrtc/check/validate/scenarios/open_data_channel.scenario @@ -0,0 +1,23 @@ +description, summary="Open a data channel" +include,location=negotiation_initiator.scenario +include,location=bundle_policy.scenario + +set-webrtc-options, local-bundle-policy="$(local_bundle_policy)", remote-bundle-policy="$(remote_bundle_policy)" + +# add the channel on the initiator so that datachannel is added to the sdp +add-data-channel, which="$(negotiation_initiator)", id="gstreamer"; + +# negotiate +create-offer, which="$(negotiation_initiator)"; +wait-for-negotiation-state, which="$(negotiation_responder)", state="offer-set" +create-answer, which="$(negotiation_responder)"; +wait-for-negotiation-state, which="$(negotiation_initiator)", state="answer-set" + +# ensure data channel is created +wait-for-data-channel, which="$(negotiation_responder)", id="gstreamer"; +wait-for-data-channel, which="$(negotiation_initiator)", id="gstreamer"; +wait-for-data-channel-state, which="$(negotiation_initiator)", id="gstreamer", state="open"; + +# only the browser closing works at the moment +close-data-channel, which="remote", id="gstreamer" +wait-for-data-channel-state, which="local", id="gstreamer", state="closed"; diff --git a/webrtc/check/validate/scenarios/remote_initiates_negotiation/negotiation_initiator.scenario b/webrtc/check/validate/scenarios/remote_initiates_negotiation/negotiation_initiator.scenario new file mode 100644 index 0000000..07ba3bb --- /dev/null +++ b/webrtc/check/validate/scenarios/remote_initiates_negotiation/negotiation_initiator.scenario @@ -0,0 +1 @@ +set-vars, negotiation_initiator=remote, negotiation_responder=local diff --git a/webrtc/check/validate/scenarios/send_data_channel_string.scenario b/webrtc/check/validate/scenarios/send_data_channel_string.scenario new file mode 100644 index 0000000..5d247ba --- /dev/null +++ b/webrtc/check/validate/scenarios/send_data_channel_string.scenario @@ -0,0 +1,21 @@ +description, summary="Send data over a data channel" +include,location=negotiation_initiator.scenario +include,location=bundle_policy.scenario + +set-webrtc-options, local-bundle-policy="$(local_bundle_policy)", remote-bundle-policy="$(remote_bundle_policy)" + +add-data-channel, which="$(negotiation_initiator)", id="gstreamer"; + +create-offer, which="$(negotiation_initiator)"; +wait-for-negotiation-state, which="$(negotiation_responder)", state="offer-set" +create-answer, which="$(negotiation_responder)"; +wait-for-negotiation-state, which="$(negotiation_initiator)", state="answer-set" + +# wait for the data channel to appear +wait-for-data-channel, which="$(negotiation_initiator)", id="gstreamer"; +wait-for-data-channel, which="$(negotiation_responder)", id="gstreamer"; +wait-for-data-channel-state, which="$(negotiation_initiator)", id="gstreamer", state="open"; + +# send something +send-data-channel-string, which="local", id="gstreamer", msg="some data"; +wait-for-data-channel-string, which="remote", id="gstreamer", msg="some data"; diff --git a/webrtc/check/validate/scenarios/vp8_send_stream.scenario b/webrtc/check/validate/scenarios/vp8_send_stream.scenario index 2ec15a5..3081488 100644 --- a/webrtc/check/validate/scenarios/vp8_send_stream.scenario +++ b/webrtc/check/validate/scenarios/vp8_send_stream.scenario @@ -1,5 +1,15 @@ description, summary="Send a VP8 stream", handles-state=true +include,location=negotiation_initiator.scenario +include,location=bundle_policy.scenario + +set-webrtc-options, local-bundle-policy="$(local_bundle_policy)", remote-bundle-policy="$(remote_bundle_policy)" + add-stream, pipeline="videotestsrc is-live=1 ! vp8enc ! rtpvp8pay ! queue" set-state, state="playing"; -create-offer; -wait-for-negotiation-state, state="answer-set" +wait-for-negotiation-needed, generation=1; + +# negotiate +create-offer, which="$(negotiation_initiator)"; +wait-for-negotiation-state, which="$(negotiation_responder)", state="offer-set" +create-answer, which="$(negotiation_responder)"; +wait-for-negotiation-state, which="$(negotiation_initiator)", state="answer-set" diff --git a/webrtc/check/validate/signalling.py b/webrtc/check/validate/signalling.py index 2a7d938..2e6d674 100644 --- a/webrtc/check/validate/signalling.py +++ b/webrtc/check/validate/signalling.py @@ -1,6 +1,4 @@ -#!/usr/bin/env python3 -# -# Copyright (c) 2018, Matthew Waters +# Copyright (c) 2020, Matthew Waters # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public @@ -24,11 +22,17 @@ import os import sys import threading import json +import logging + +from observer import Signal, StateObserver, WebRTCObserver, DataChannelObserver +from enums import SignallingState, NegotiationState, DataChannelState -from observer import Signal -from enums import SignallingState, RemoteState +l = logging.getLogger(__name__) class AsyncIOThread(threading.Thread): + """ + Run an asyncio loop in another thread. + """ def __init__ (self, loop): threading.Thread.__init__(self) self.loop = loop @@ -40,16 +44,24 @@ class AsyncIOThread(threading.Thread): def stop_thread(self): self.loop.call_soon_threadsafe(self.loop.stop) + class SignallingClientThread(object): + """ + Connect to a signalling server + """ def __init__(self, server): + # server string to connect to. Passed directly to websockets.connect() self.server = server + # fired after we have connected to the signalling server self.wss_connected = Signal() + # fired every time we receive a message from the signalling server self.message = Signal() self._init_async() def _init_async(self): + self._running = False self.conn = None self._loop = asyncio.new_event_loop() @@ -59,23 +71,31 @@ class SignallingClientThread(object): self._loop.call_soon_threadsafe(lambda: asyncio.ensure_future(self._a_loop())) async def _a_connect(self): + # connect to the signalling server assert not self.conn sslctx = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH) self.conn = await websockets.connect(self.server, ssl=sslctx) async def _a_loop(self): + self._running = True + l.info('loop started') await self._a_connect() self.wss_connected.fire() assert self.conn async for message in self.conn: self.message.fire(message) + l.info('loop exited') def send(self, data): + # send some information to the peer async def _a_send(): await self.conn.send(data) self._loop.call_soon_threadsafe(lambda: asyncio.ensure_future(_a_send())) def stop(self): + if self._running == False: + return + cond = threading.Condition() # asyncio, why you so complicated to stop ? @@ -87,64 +107,70 @@ class SignallingClientThread(object): to_wait = [t for t in tasks if not t.done()] if to_wait: + l.info('waiting for ' + str(to_wait)) done, pending = await asyncio.wait(to_wait) with cond: + l.error('notifying cond') cond.notify() + self._running = False - self._loop.call_soon_threadsafe(lambda: asyncio.ensure_future(_a_stop())) with cond: - cond.wait() + self._loop.call_soon_threadsafe(lambda: asyncio.ensure_future(_a_stop())) + l.error('cond waiting') + cond.wait() + l.error('cond waited') self._thread.stop_thread() self._thread.join() + l.error('thread joined') + class WebRTCSignallingClient(SignallingClientThread): + """ + Signalling client implementation. Deals wit session management over the + signalling protocol. Sends and receives from a peer. + """ def __init__(self, server, id_): super().__init__(server) self.wss_connected.connect(self._on_connection) self.message.connect(self._on_message) self.state = SignallingState.NEW - self._state_cond = threading.Condition() + self._state_observer = StateObserver(self, "state", threading.Condition()) self.id = id_ self._peerid = None - # override that base class + # fired when the hello has been received self.connected = Signal() + # fired when the signalling server responds that the session creation is ok self.session_created = Signal() + # fired on an error self.error = Signal() + # fired when the peer receives some json data self.have_json = Signal() + def _update_state(self, new_state): + self._state_observer.update (new_state) + def wait_for_states(self, states): - ret = None - with self._state_cond: - while self.state not in states: - self._state_cond.wait() - ret = self.state - return ret - - def _update_state(self, state): - with self._state_cond: - if self.state is not SignallingState.ERROR: - self.state = state - self._state_cond.notify_all() + return self._state_observer.wait_for (states) def hello(self): self.send('HELLO ' + str(self.id)) + l.info("sent HELLO") self.wait_for_states([SignallingState.HELLO]) - print("signalling-client sent HELLO") def create_session(self, peerid): self._peerid = peerid self.send('SESSION {}'.format(self._peerid)) - self.wait_for_states([SignallingState.SESSION, SignallingState.ERROR]) - print("signalling-client sent SESSION") + l.info("sent SESSION") + self.wait_for_states([SignallingState.SESSION]) def _on_connection(self): self._update_state (SignallingState.OPEN) def _on_message(self, message): - print("signalling-client received", message) + l.debug("received: " + message) if message == 'HELLO': self._update_state (SignallingState.HELLO) self.connected.fire() @@ -159,3 +185,82 @@ class WebRTCSignallingClient(SignallingClientThread): self.have_json.fire(msg) return False + +class RemoteWebRTCObserver(WebRTCObserver): + """ + Use information sent over the signalling channel to construct the current + state of a remote peer. Allow performing actions by sending requests over + the signalling channel. + """ + def __init__(self, signalling): + super().__init__() + self.signalling = signalling + + def on_json(msg): + if 'STATE' in msg: + state = NegotiationState (msg['STATE']) + self._update_negotiation_state(state) + if state == NegotiationState.OFFER_CREATED: + self.on_offer_created.fire(msg['description']) + elif state == NegotiationState.ANSWER_CREATED: + self.on_answer_created.fire(msg['description']) + elif state == NegotiationState.OFFER_SET: + self.on_offer_set.fire (msg['description']) + elif state == NegotiationState.ANSWER_SET: + self.on_answer_set.fire (msg['description']) + elif 'DATA-NEW' in msg: + new = msg['DATA-NEW'] + observer = RemoteDataChannelObserver(new['id'], new['location'], self) + self.add_channel (observer) + elif 'DATA-STATE' in msg: + ident = msg['id'] + channel = self.find_channel(ident) + channel._update_state (DataChannelState(msg['DATA-STATE'])) + elif 'DATA-MSG' in msg: + ident = msg['id'] + channel = self.find_channel(ident) + channel.got_message(msg['DATA-MSG']) + self.signalling.have_json.connect (on_json) + + def add_data_channel (self, ident): + msg = json.dumps({'DATA_CREATE': {'id': ident}}) + self.signalling.send (msg) + + def create_offer (self): + msg = json.dumps({'CREATE_OFFER': ""}) + self.signalling.send (msg) + + def create_answer (self): + msg = json.dumps({'CREATE_ANSWER': ""}) + self.signalling.send (msg) + + def set_title (self, title): + # entirely for debugging purposes + msg = json.dumps({'SET_TITLE': title}) + self.signalling.send (msg) + + def set_options (self, opts): + options = {} + if opts.has_field("remote-bundle-policy"): + options["bundlePolicy"] = opts["remote-bundle-policy"] + msg = json.dumps({'OPTIONS' : options}) + self.signalling.send (msg) + + +class RemoteDataChannelObserver(DataChannelObserver): + """ + Use information sent over the signalling channel to construct the current + state of a remote peer's data channel. Allow performing actions by sending + requests over the signalling channel. + """ + def __init__(self, ident, location, webrtc): + super().__init__(ident, location) + self.webrtc = webrtc + + def send_string(self, msg): + msg = json.dumps({'DATA_SEND_MSG': {'msg' : msg, 'id': self.ident}}) + self.webrtc.signalling.send (msg) + + def close (self): + msg = json.dumps({'DATA_CLOSE': {'id': self.ident}}) + self.webrtc.signalling.send (msg) diff --git a/webrtc/check/validate/testsuites/webrtc.py b/webrtc/check/validate/testsuites/webrtc.py index c078e5d..3101098 100644 --- a/webrtc/check/validate/testsuites/webrtc.py +++ b/webrtc/check/validate/testsuites/webrtc.py @@ -1,6 +1,4 @@ -#*- vi:si:et:sw=4:sts=4:ts=4:syntax=python -# -# Copyright (c) 2018 Matthew Waters +# Copyright (c) 2020 Matthew Waters # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public @@ -25,12 +23,9 @@ import os TEST_MANAGER = "webrtc" -BLACKLIST = [ - ] def setup_tests(test_manager, options): print("Setting up webrtc tests") -# test_manager.set_default_blacklist(BLACKLIST) return True diff --git a/webrtc/check/validate/web/single_stream.html b/webrtc/check/validate/web/single_stream.html index 9a95fb7..218b200 100644 --- a/webrtc/check/validate/web/single_stream.html +++ b/webrtc/check/validate/web/single_stream.html @@ -27,9 +27,5 @@
Our id is unknown

-
-
getUserMedia constraints being used:
-
-
diff --git a/webrtc/check/validate/web/webrtc.js b/webrtc/check/validate/web/webrtc.js index eb7e00f..2a0d535 100644 --- a/webrtc/check/validate/web/webrtc.js +++ b/webrtc/check/validate/web/webrtc.js @@ -14,13 +14,12 @@ var ws_port; var default_peer_id; // Override with your own STUN servers if you want var rtc_configuration = {iceServers: [{urls: "stun:stun.services.mozilla.com"}, - {urls: "stun:stun.l.google.com:19302"}]}; -// The default constraints that will be attempted. Can be overriden by the user. -var default_constraints = {video: true, audio: true}; + {urls: "stun:stun.l.google.com:19302"},]}; +var default_constraints = {video: true, audio: false}; var connect_attempts = 0; var peer_connection; -var send_channel; +var channels = [] var ws_conn; // Promise for local stream after constraints are approved by the user var local_stream_promise; @@ -56,7 +55,7 @@ function setError(text) { var span = document.getElementById("status") span.textContent = text; span.classList.add('error'); - ws_conn.send(JSON.stringify({'STATE': 'error'})) + ws_conn.send(JSON.stringify({'STATE': 'error', 'msg' : text})) } function resetVideo() { @@ -75,27 +74,40 @@ function resetVideo() { videoElement.load(); } +function updateRemoteStateFromSetSDPJson(sdp) { + if (sdp.type == "offer") + ws_conn.send(JSON.stringify({'STATE': 'offer-set', 'description' : sdp})) + else if (sdp.type == "answer") + ws_conn.send(JSON.stringify({'STATE': 'answer-set', 'description' : sdp})) + else + throw new Error ("Unknown SDP type!"); +} + +function updateRemoteStateFromGeneratedSDPJson(sdp) { + if (sdp.type == "offer") + ws_conn.send(JSON.stringify({'STATE': 'offer-created', 'description' : sdp})) + else if (sdp.type == "answer") + ws_conn.send(JSON.stringify({'STATE': 'answer-created', 'description' : sdp})) + else + throw new Error ("Unknown SDP type!"); +} + // SDP offer received from peer, set remote description and create an answer function onIncomingSDP(sdp) { peer_connection.setRemoteDescription(sdp).then(() => { - setStatus("Remote SDP set"); - if (sdp.type != "offer") - return; - setStatus("Got SDP offer"); - local_stream_promise.then((stream) => { - setStatus("Got local stream, creating answer"); - peer_connection.createAnswer() - .then(onLocalDescription).catch(setError); - }).catch(setError); + updateRemoteStateFromSetSDPJson(sdp) + setStatus("Set remote SDP", sdp.type); }).catch(setError); } // Local description was set, send it to peer function onLocalDescription(desc) { + updateRemoteStateFromGeneratedSDPJson(desc) console.log("Got local description: " + JSON.stringify(desc)); peer_connection.setLocalDescription(desc).then(function() { - setStatus("Sending SDP answer"); - sdp = {'sdp': peer_connection.localDescription} + updateRemoteStateFromSetSDPJson(desc) + sdp = {'sdp': desc} + setStatus("Sending SDP", sdp.type); ws_conn.send(JSON.stringify(sdp)); }); } @@ -103,9 +115,33 @@ function onLocalDescription(desc) { // ICE candidate received from peer, add it to the peer connection function onIncomingICE(ice) { var candidate = new RTCIceCandidate(ice); + console.log("adding candidate", candidate) peer_connection.addIceCandidate(candidate).catch(setError); } +function createOffer(offer) { + local_stream_promise.then((stream) => { + setStatus("Got local stream, creating offer"); + peer_connection.createOffer() + .then(onLocalDescription).catch(setError); + }).catch(setError) +} + +function createAnswer(offer) { + local_stream_promise.then((stream) => { + setStatus("Got local stream, creating answer"); + peer_connection.createAnswer() + .then(onLocalDescription).catch(setError); + }).catch(setError) +} + +function handleOptions(options) { + console.log ('received options', options); + if (options.bundlePolicy != null) { + rtc_configuration['bundlePolicy'] = options.bundlePolicy; + } +} + function onServerMessage(event) { console.log("Received " + event.data); switch (event.data) { @@ -129,14 +165,33 @@ function onServerMessage(event) { return; } + if (msg.SET_TITLE != null) { + // some debugging for tests that hang around + document.title = msg['SET_TITLE'] + return; + } else if (msg.OPTIONS != null) { + handleOptions(msg.OPTIONS); + return; + } + // Incoming JSON signals the beginning of a call if (!peer_connection) - createCall(msg); + createCall(); if (msg.sdp != null) { onIncomingSDP(msg.sdp); } else if (msg.ice != null) { onIncomingICE(msg.ice); + } else if (msg.CREATE_OFFER != null) { + createOffer(msg.CREATE_OFFER) + } else if (msg.CREATE_ANSWER != null) { + createAnswer(msg.CREATE_ANSWER) + } else if (msg.DATA_CREATE != null) { + addDataChannel(msg.DATA_CREATE.id) + } else if (msg.DATA_CLOSE != null) { + closeDataChannel(msg.DATA_CLOSE.id) + } else if (msg.DATA_SEND_MSG != null) { + sendDataChannelMessage(msg.DATA_SEND_MSG) } else { handleIncomingError("Unknown incoming JSON: " + msg); } @@ -151,6 +206,7 @@ function onServerClose(event) { peer_connection.close(); peer_connection = null; } + channels = [] // Reset after a second window.setTimeout(websocketServerConnect, 1000); @@ -164,14 +220,7 @@ function onServerError(event) { function getLocalStream() { var constraints; - var textarea = document.getElementById('constraints'); - try { - constraints = JSON.parse(textarea.value); - } catch (e) { - console.error(e); - setError('ERROR parsing constraints: ' + e.message + ', using default constraints'); - constraints = default_constraints; - } + constraints = default_constraints; console.log(JSON.stringify(constraints)); // Add local stream @@ -192,12 +241,7 @@ function websocketServerConnect() { var span = document.getElementById("status"); span.classList.remove('error'); span.textContent = ''; - // Populate constraints - var textarea = document.getElementById('constraints'); - if (textarea.value == '') - textarea.value = JSON.stringify(default_constraints); // Fetch the peer id to use - var url = new URL(window.location.href); peer_id = url.searchParams.get("id"); @@ -236,7 +280,6 @@ function onRemoteStreamAdded(event) { if (videoTracks.length > 0) { console.log('Incoming stream: ' + videoTracks.length + ' video tracks and ' + audioTracks.length + ' audio tracks'); getVideoElement().srcObject = event.stream; - ws_conn.send(JSON.stringify({'STATE': 'remote-stream-received'})) } else { handleIncomingError('Stream with unknown tracks added, resetting'); } @@ -246,53 +289,80 @@ function errorUserMediaHandler() { setError("Browser doesn't support getUserMedia!"); } -const handleDataChannelOpen = (event) =>{ - console.log("dataChannel.OnOpen", event); -}; - const handleDataChannelMessageReceived = (event) =>{ console.log("dataChannel.OnMessage:", event, event.data.type); - setStatus("Received data channel message"); - if (typeof event.data === 'string' || event.data instanceof String) { - console.log('Incoming string message: ' + event.data); - textarea = document.getElementById("text") - textarea.value = textarea.value + '\n' + event.data - } else { - console.log('Incoming data message'); - } - send_channel.send("Hi! (from browser)"); + ws_conn.send(JSON.stringify({'DATA-MSG' : event.data, 'id' : event.target.label})); +}; + +const handleDataChannelOpen = (event) =>{ + console.log("dataChannel.OnOpen", event); + ws_conn.send(JSON.stringify({'DATA-STATE' : 'open', 'id' : event.target.label})); }; const handleDataChannelError = (error) =>{ console.log("dataChannel.OnError:", error); + ws_conn.send(JSON.stringify({'DATA-STATE' : error, 'id' : event.target.label})); }; const handleDataChannelClose = (event) =>{ console.log("dataChannel.OnClose", event); + ws_conn.send(JSON.stringify({'DATA-STATE' : 'closed', 'id' : event.target.label})); }; function onDataChannel(event) { setStatus("Data channel created"); - let receiveChannel = event.channel; - receiveChannel.onopen = handleDataChannelOpen; - receiveChannel.onmessage = handleDataChannelMessageReceived; - receiveChannel.onerror = handleDataChannelError; - receiveChannel.onclose = handleDataChannelClose; + let channel = event.channel; + console.log('adding remote data channel with label', channel.label) + ws_conn.send(JSON.stringify({'DATA-NEW' : {'id' : channel.label, 'location' : 'remote'}})); + channel.onopen = handleDataChannelOpen; + channel.onmessage = handleDataChannelMessageReceived; + channel.onerror = handleDataChannelError; + channel.onclose = handleDataChannelClose; + channels.push(channel) } -function createCall(msg) { +function addDataChannel(label) { + channel = peer_connection.createDataChannel(label, null); + console.log('adding local data channel with label', label) + ws_conn.send(JSON.stringify({'DATA-NEW' : {'id' : label, 'location' : 'local'}})); + channel.onopen = handleDataChannelOpen; + channel.onmessage = handleDataChannelMessageReceived; + channel.onerror = handleDataChannelError; + channel.onclose = handleDataChannelClose; + channels.push(channel) +} + +function find_channel(label) { + console.log('find', label, 'in', channels) + for (var c in channels) { + if (channels[c].label === label) { + console.log('found', label, c) + return channels[c]; + } + } + return null; +} + +function closeDataChannel(label) { + channel = find_channel (label) + console.log('closing data channel with label', label) + channel.close() +} + +function sendDataChannelMessage(msg) { + channel = find_channel (msg.id) + console.log('sending on data channel', msg.id, 'message', msg.msg) + channel.send(msg.msg) +} + +function createCall() { // Reset connection attempts because we connected successfully connect_attempts = 0; - console.log('Creating RTCPeerConnection'); + console.log('Creating RTCPeerConnection with configuration', rtc_configuration); peer_connection = new RTCPeerConnection(rtc_configuration); - send_channel = peer_connection.createDataChannel('label', null); - send_channel.onopen = handleDataChannelOpen; - send_channel.onmessage = handleDataChannelMessageReceived; - send_channel.onerror = handleDataChannelError; - send_channel.onclose = handleDataChannelClose; peer_connection.ondatachannel = onDataChannel; peer_connection.onaddstream = onRemoteStreamAdded; /* Send our video/audio to the other peer */ @@ -302,18 +372,15 @@ function createCall(msg) { return stream; }).catch(setError); - if (!msg.sdp) { - console.log("WARNING: First message wasn't an SDP message!?"); - } - peer_connection.onicecandidate = (event) => { - // We have a candidate, send it to the remote party with the - // same uuid - if (event.candidate == null) { + // We have a candidate, send it to the remote party with the + // same uuid + if (event.candidate == null) { console.log("ICE Candidate was null, done"); return; - } - ws_conn.send(JSON.stringify({'ice': event.candidate})); + } + console.log("generated ICE Candidate", event.candidate); + ws_conn.send(JSON.stringify({'ice': event.candidate})); }; setStatus("Created peer connection for call, waiting for SDP"); diff --git a/webrtc/check/validate/webrtc_validate.py b/webrtc/check/validate/webrtc_validate.py index 6215a0a..67f3aeb 100644 --- a/webrtc/check/validate/webrtc_validate.py +++ b/webrtc/check/validate/webrtc_validate.py @@ -21,12 +21,13 @@ import os import sys import argparse import json +import logging -from signalling import WebRTCSignallingClient -from actions import register_action_types, ActionObserver +from signalling import WebRTCSignallingClient, RemoteWebRTCObserver +from actions import ActionObserver from client import WebRTCClient from browser import Browser, create_driver -from enums import SignallingState, NegotiationState, RemoteState +from enums import SignallingState, NegotiationState, DataChannelState, Actions import gi gi.require_version("GLib", "2.0") @@ -40,14 +41,20 @@ from gi.repository import GstSdp gi.require_version("GstValidate", "1.0") from gi.repository import GstValidate +FORMAT = '%(asctime)-23s %(levelname)-7s %(thread)d %(name)-24s\t%(funcName)-24s %(message)s' +LEVEL = os.environ.get("LOGLEVEL", "DEBUG") +logging.basicConfig(level=LEVEL, format=FORMAT) +l = logging.getLogger(__name__) + class WebRTCApplication(object): - def __init__(self, server, id_, peerid, scenario_name, browser_name, html_source): + def __init__(self, server, id_, peerid, scenario_name, browser_name, html_source, test_name=None): self.server = server self.peerid = peerid self.html_source = html_source self.id = id_ self.scenario_name = scenario_name self.browser_name = browser_name + self.test_name = test_name def _init_validate(self, scenario_file): self.runner = GstValidate.Runner.new() @@ -61,24 +68,79 @@ class WebRTCApplication(object): self.client.pipeline.set_state(Gst.State.PLAYING) def _on_scenario_done(self, scenario): - self.quit() - - def _connect_actions(self): - def create_offer(): - self.client.create_offer(None) - return GstValidate.ActionReturn.OK - self.actions.create_offer.connect(create_offer) + l.error ('scenario done') + GLib.idle_add(self.quit) - def wait_for_negotiation_state(state): - states = [state, NegotiationState.ERROR] - state = self.client.wait_for_negotiation_states(states) - return GstValidate.ActionReturn.OK if state != RemoteState.ERROR else GstValidate.ActionReturn.ERROR - self.actions.wait_for_negotiation_state.connect(wait_for_negotiation_state) + def _connect_actions(self, actions): + def on_action(atype, action): + """ + From a validate action, perform the action as required + """ + if atype == Actions.CREATE_OFFER: + assert action.structure["which"] in ("local", "remote") + c = self.client if action.structure["which"] == "local" else self.remote_client + c.create_offer() + return GstValidate.ActionReturn.OK + elif atype == Actions.CREATE_ANSWER: + assert action.structure["which"] in ("local", "remote") + c = self.client if action.structure["which"] == "local" else self.remote_client + c.create_answer() + return GstValidate.ActionReturn.OK + elif atype == Actions.WAIT_FOR_NEGOTIATION_STATE: + states = [NegotiationState(action.structure["state"]), NegotiationState.ERROR] + assert action.structure["which"] in ("local", "remote") + c = self.client if action.structure["which"] == "local" else self.remote_client + state = c.wait_for_negotiation_states(states) + return GstValidate.ActionReturn.OK if state != NegotiationState.ERROR else GstValidate.ActionReturn.ERROR + elif atype == Actions.ADD_STREAM: + self.client.add_stream(action.structure["pipeline"]) + return GstValidate.ActionReturn.OK + elif atype == Actions.ADD_DATA_CHANNEL: + assert action.structure["which"] in ("local", "remote") + c = self.client if action.structure["which"] == "local" else self.remote_client + c.add_data_channel(action.structure["id"]) + return GstValidate.ActionReturn.OK + elif atype == Actions.SEND_DATA_CHANNEL_STRING: + assert action.structure["which"] in ("local", "remote") + c = self.client if action.structure["which"] == "local" else self.remote_client + channel = c.find_channel (action.structure["id"]) + channel.send_string (action.structure["msg"]) + return GstValidate.ActionReturn.OK + elif atype == Actions.WAIT_FOR_DATA_CHANNEL_STATE: + assert action.structure["which"] in ("local", "remote") + c = self.client if action.structure["which"] == "local" else self.remote_client + states = [DataChannelState(action.structure["state"]), DataChannelState.ERROR] + channel = c.find_channel (action.structure["id"]) + state = channel.wait_for_states(states) + return GstValidate.ActionReturn.OK if state != DataChannelState.ERROR else GstValidate.ActionReturn.ERROR + elif atype == Actions.CLOSE_DATA_CHANNEL: + assert action.structure["which"] in ("local", "remote") + c = self.client if action.structure["which"] == "local" else self.remote_client + channel = c.find_channel (action.structure["id"]) + channel.close() + return GstValidate.ActionReturn.OK + elif atype == Actions.WAIT_FOR_DATA_CHANNEL: + assert action.structure["which"] in ("local", "remote") + c = self.client if action.structure["which"] == "local" else self.remote_client + state = c.wait_for_data_channel(action.structure["id"]) + return GstValidate.ActionReturn.OK + elif atype == Actions.WAIT_FOR_DATA_CHANNEL_STRING: + assert action.structure["which"] in ("local", "remote") + c = self.client if action.structure["which"] == "local" else self.remote_client + channel = c.find_channel (action.structure["id"]) + channel.wait_for_message(action.structure["msg"]) + return GstValidate.ActionReturn.OK + elif atype == Actions.WAIT_FOR_NEGOTIATION_NEEDED: + self.client.wait_for_negotiation_needed(action.structure["generation"]) + return GstValidate.ActionReturn.OK + elif atype == Actions.SET_WEBRTC_OPTIONS: + self.client.set_options (action.structure) + self.remote_client.set_options (action.structure) + return GstValidate.ActionReturn.OK + else: + assert "Not reached" == "" - def add_stream(pipeline): - self.client.add_stream(pipeline) - return GstValidate.ActionReturn.OK - self.actions.add_stream.connect(add_stream) + actions.action.connect (on_action) def _connect_client_observer(self): def on_offer_created(offer): @@ -87,6 +149,12 @@ class WebRTCApplication(object): self.signalling.send(msg) self.client.on_offer_created.connect(on_offer_created) + def on_answer_created(answer): + text = answer.sdp.as_text() + msg = json.dumps({'sdp': {'type': 'answer', 'sdp': text}}) + self.signalling.send(msg) + self.client.on_answer_created.connect(on_answer_created) + def on_ice_candidate(mline, candidate): msg = json.dumps({'ice': {'sdpMLineIndex': str(mline), 'candidate' : candidate}}) self.signalling.send(msg) @@ -116,6 +184,7 @@ class WebRTCApplication(object): def error(msg): # errors are unexpected + l.error ('Unexpected error: ' + msg) GLib.idle_add(self.quit) GLib.idle_add(sys.exit, -20) self.signalling.error.connect(error) @@ -126,37 +195,50 @@ class WebRTCApplication(object): self.client = WebRTCClient() self._connect_client_observer() - self.actions = ActionObserver() - register_action_types(self.actions) - self._connect_actions() - self.signalling = WebRTCSignallingClient(self.server, self.id) + self.remote_client = RemoteWebRTCObserver (self.signalling) self._connect_signalling_observer() + + actions = ActionObserver() + actions.register_action_types() + self._connect_actions(actions) + + # wait for the signalling server to start up before creating the browser self.signalling.wait_for_states([SignallingState.OPEN]) self.signalling.hello() - self.browser = Browser(create_driver(self.browser_name), self.html_source) + self.browser = Browser(create_driver(self.browser_name)) + self.browser.open(self.html_source) browser_id = self.browser.get_peer_id () assert browser_id == self.peerid self.signalling.create_session(self.peerid) + test_name = self.test_name if self.test_name else self.scenario_name + self.remote_client.set_title (test_name) self._init_validate(self.scenario_name) - print("app initialized") def quit(self): # Stop signalling first so asyncio doesn't keep us alive on weird failures + l.info('quiting') self.signalling.stop() - self.browser.driver.quit() - self.client.stop() + l.info('signalling stopped') self.main_loop.quit() + l.info('main loop stopped') + self.client.stop() + l.info('client stopped') + self.browser.driver.quit() + l.info('browser exitted') def run(self): try: self._init() + l.info("app initialized") self.main_loop.run() + l.info("loop exited") except: + l.exception("Fatal error") self.quit() raise @@ -168,6 +250,7 @@ def parse_options(): parser.add_argument('--html-source', help='HTML page to open in the browser', default=None) parser.add_argument('--scenario', help='Scenario file to execute', default=None) parser.add_argument('--browser', help='Browser name to use', default=None) + parser.add_argument('--name', help='Name of the test', default=None) return parser.parse_args() def init(): @@ -196,7 +279,7 @@ def init(): def run(): args = init() - w = WebRTCApplication (args.server, args.id, args.peer_id, args.scenario, args.browser, args.html_source) + w = WebRTCApplication (args.server, args.id, args.peer_id, args.scenario, args.browser, args.html_source, test_name=args.name) return w.run() if __name__ == "__main__":