Merging gst-examples
[platform/upstream/gstreamer.git] / subprojects / gst-examples / webrtc / check / basic.py
1 #!/usr/bin/env python3
2
3 import os
4 import unittest
5 from selenium import webdriver
6 from selenium.webdriver.support.wait import WebDriverWait
7 from selenium.webdriver.firefox.firefox_profile import FirefoxProfile
8 from selenium.webdriver.chrome.options import Options as COptions
9 import webrtc_sendrecv as webrtc
10 import simple_server as sserver
11 import asyncio
12 import threading
13 import signal
14
15 import gi
16 gi.require_version('Gst', '1.0')
17 from gi.repository import Gst
18
19 thread = None
20 stop = None
21 server = None
22
23 class AsyncIOThread(threading.Thread):
24     def __init__ (self, loop):
25         threading.Thread.__init__(self)
26         self.loop = loop
27
28     def run(self):
29         asyncio.set_event_loop(self.loop)
30         self.loop.run_forever()
31         self.loop.close()
32         print ("closed loop")
33
34     def stop_thread(self):
35         self.loop.call_soon_threadsafe(self.loop.stop)
36
37 async def run_until(server, stop_token):
38     async with server:
39         await stop_token
40     print ("run_until done")
41
42 def setUpModule():
43     global thread, server
44     Gst.init(None)
45     cacerts_path = os.environ.get('TEST_CA_CERT_PATH')
46     loop = asyncio.new_event_loop()
47
48     thread = AsyncIOThread(loop)
49     thread.start()
50     server = sserver.WebRTCSimpleServer('127.0.0.1', 8443, 20, False, cacerts_path)
51     def f():
52         global stop
53         stop = asyncio.ensure_future(server.run())
54     loop.call_soon_threadsafe(f)
55
56 def tearDownModule():
57     global thread, stop
58     stop.cancel()
59     thread.stop_thread()
60     thread.join()
61     print("thread joined")
62
63 def valid_int(n):
64     if isinstance(n, int):
65         return True
66     if isinstance(n, str):
67         try:
68             num = int(n)
69             return True
70         except:
71             return False
72     return False
73
74 def create_firefox_driver():
75     capabilities = webdriver.DesiredCapabilities().FIREFOX.copy()
76     capabilities['acceptSslCerts'] = True
77     capabilities['acceptInsecureCerts'] = True
78     profile = FirefoxProfile()
79     profile.set_preference ('media.navigator.streams.fake', True)
80     profile.set_preference ('media.navigator.permission.disabled', True)
81
82     return webdriver.Firefox(firefox_profile=profile, capabilities=capabilities)
83
84 def create_chrome_driver():
85     capabilities = webdriver.DesiredCapabilities().CHROME.copy()
86     capabilities['acceptSslCerts'] = True
87     capabilities['acceptInsecureCerts'] = True
88     copts = COptions()
89     copts.add_argument('--allow-file-access-from-files')
90     copts.add_argument('--use-fake-ui-for-media-stream')
91     copts.add_argument('--use-fake-device-for-media-stream')
92     copts.add_argument('--enable-blink-features=RTCUnifiedPlanByDefault')
93
94     return webdriver.Chrome(options=copts, desired_capabilities=capabilities)
95
96 class ServerConnectionTestCase(unittest.TestCase):
97     def setUp(self):
98         self.browser = create_firefox_driver()
99 #        self.browser = create_chrome_driver()
100         self.addCleanup(self.browser.quit)
101         self.html_source = os.environ.get('TEST_HTML_SOURCE')
102         self.assertIsNot(self.html_source, None)
103         self.assertNotEqual(self.html_source, '')
104         self.html_source = 'file://' + self.html_source + '/index.html'
105
106     def get_peer_id(self):
107         self.browser.get(self.html_source)
108         peer_id = WebDriverWait(self.browser, 5).until(
109             lambda x: x.find_element_by_id('peer-id'),
110             message='Peer-id element was never seen'
111         )
112         WebDriverWait (self.browser, 5).until(
113             lambda x: valid_int(peer_id.text),
114             message='Peer-id never became a number'
115         )
116         return int(peer_id.text)
117
118     def testPeerID(self):
119         self.get_peer_id()
120
121     def testPerformCall(self):
122         loop = asyncio.new_event_loop()
123         thread = AsyncIOThread(loop)
124         thread.start()
125         peer_id = self.get_peer_id()
126         client = webrtc.WebRTCClient(peer_id + 1, peer_id, 'wss://127.0.0.1:8443')
127
128         async def do_things():
129             await client.connect()
130             async def stop_after(client, delay):
131                 await asyncio.sleep(delay)
132                 await client.stop()
133             future = asyncio.ensure_future (stop_after (client, 5))
134             res = await client.loop()
135             thread.stop_thread()
136             return res
137
138         res = asyncio.run_coroutine_threadsafe(do_things(), loop).result()
139         thread.join()
140         print ("client thread joined")
141         self.assertEqual(res, 0)
142
143 if __name__ == '__main__':
144     unittest.main(verbosity=2)