gst-validate-launcher: let gdb handle SIGINT itself
[platform/upstream/gstreamer.git] / validate / launcher / baseclasses.py
1 #!/usr/bin/env python3
2 #
3 # Copyright (c) 2013,Thibault Saunier <thibault.saunier@collabora.com>
4 #
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU Lesser General Public
7 # License as published by the Free Software Foundation; either
8 # version 2.1 of the License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13 # Lesser General Public License for more details.
14 #
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with this program; if not, write to the
17 # Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
18 # Boston, MA 02110-1301, USA.
19
20 """ Class representing tests and test managers. """
21
22 import json
23 import os
24 import sys
25 import re
26 import copy
27 import shlex
28 import socketserver
29 import struct
30 import time
31 from . import utils
32 import signal
33 import urllib.parse
34 import subprocess
35 import threading
36 import queue
37 import configparser
38 import xml
39 import random
40 import uuid
41
42 from .utils import which
43 from . import reporters
44 from . import loggable
45 from .loggable import Loggable
46
47 try:
48     from lxml import etree as ET
49 except ImportError:
50     import xml.etree.cElementTree as ET
51
52 from .utils import mkdir, Result, Colors, printc, DEFAULT_TIMEOUT, GST_SECOND, \
53     Protocols, look_for_file_in_source_dir, get_data_file, BackTraceGenerator, \
54     check_bugs_resolution
55
# Factor by which Test.use_valgrind() and Test.use_gdb() multiply the
# timeouts of a test running inside Valgrind or gdb.
GDB_TIMEOUT_FACTOR = VALGRIND_TIMEOUT_FACTOR = 20
# Global timeout multiplier, read once from the TIMEOUT_FACTOR environment
# variable (1 when unset).
TIMEOUT_FACTOR = float(os.environ.get("TIMEOUT_FACTOR", 1))
# The error reported by valgrind when detecting errors
VALGRIND_ERROR_CODE = 20

# Suffix appended to a media path (minus its extension) to locate its
# override file, see GstValidateTest.get_override_file().
VALIDATE_OVERRIDE_EXTENSION = ".override"
# Negated numbers of the listed signals that exist on this platform, plus 139.
COREDUMP_SIGNALS = [-getattr(signal, s) for s in [
    'SIGQUIT', 'SIGILL', 'SIGABRT', 'SIGFPE', 'SIGSEGV', 'SIGBUS', 'SIGSYS',
    'SIGTRAP', 'SIGXCPU', 'SIGXFSZ', 'SIGIOT'] if hasattr(signal, s)] + [139]
67
68
69 class Test(Loggable):
70
71     """ A class representing a particular test. """
72
73     def __init__(self, application_name, classname, options,
74                  reporter, duration=0, timeout=DEFAULT_TIMEOUT,
75                  hard_timeout=None, extra_env_variables=None,
76                  expected_failures=None, is_parallel=True,
77                  workdir=None):
78         """
79         @timeout: The timeout during which the value return by get_current_value
80                   keeps being exactly equal
81         @hard_timeout: Max time the test can take in absolute
82         """
83         Loggable.__init__(self)
84         self.timeout = timeout * TIMEOUT_FACTOR * options.timeout_factor
85         if hard_timeout:
86             self.hard_timeout = hard_timeout * TIMEOUT_FACTOR
87             self.hard_timeout *= options.timeout_factor
88         else:
89             self.hard_timeout = hard_timeout
90         self.classname = classname
91         self.options = options
92         self.application = application_name
93         self.command = []
94         self.server_command = None
95         self.reporter = reporter
96         self.process = None
97         self.proc_env = None
98         self.thread = None
99         self.queue = None
100         self.duration = duration
101         self.stack_trace = None
102         self._uuid = None
103         if expected_failures is None:
104             self.expected_failures = []
105         elif not isinstance(expected_failures, list):
106             self.expected_failures = [expected_failures]
107         else:
108             self.expected_failures = expected_failures
109
110         extra_env_variables = extra_env_variables or {}
111         self.extra_env_variables = extra_env_variables
112         self.optional = False
113         self.is_parallel = is_parallel
114         self.generator = None
115         # String representation of the test number in the testsuite
116         self.number = ""
117         self.workdir = workdir
118
119         self.clean()
120
121     def clean(self):
122         self.kill_subprocess()
123         self.message = ""
124         self.error_str = ""
125         self.time_taken = 0.0
126         self._starting_time = None
127         self.result = Result.NOT_RUN
128         self.logfile = None
129         self.out = None
130         self.extra_logfiles = []
131         self.__env_variable = []
132         self.kill_subprocess()
133
134     def __str__(self):
135         string = self.classname
136         if self.result != Result.NOT_RUN:
137             string += ": " + self.result
138             if self.result in [Result.FAILED, Result.TIMEOUT]:
139                 string += " '%s'\n" \
140                           "       You can reproduce with: %s\n" \
141                     % (self.message, self.get_command_repr())
142
143                 if not self.options.redirect_logs and \
144                         self.result == Result.PASSED or \
145                         not self.options.dump_on_failure:
146                     string += self.get_logfile_repr()
147
148         return string
149
150     def add_env_variable(self, variable, value=None):
151         """
152         Only usefull so that the gst-validate-launcher can print the exact
153         right command line to reproduce the tests
154         """
155         if value is None:
156             value = os.environ.get(variable, None)
157
158         if value is None:
159             return
160
161         self.__env_variable.append(variable)
162
163     @property
164     def _env_variable(self):
165         res = ""
166         for var in set(self.__env_variable):
167             if res:
168                 res += " "
169             value = self.proc_env.get(var, None)
170             if value is not None:
171                 res += "%s='%s'" % (var, value)
172
173         return res
174
175     def open_logfile(self):
176         if self.out:
177             return
178
179         path = os.path.join(self.options.logsdir,
180                             self.classname.replace(".", os.sep))
181         mkdir(os.path.dirname(path))
182         self.logfile = path
183
184         if self.options.redirect_logs == 'stdout':
185             self.out = sys.stdout
186         elif self.options.redirect_logs == 'stderr':
187             self.out = sys.stderr
188         else:
189             self.out = open(path, 'w+')
190
191     def close_logfile(self):
192         if not self.options.redirect_logs:
193             self.out.close()
194
195         self.out = None
196
197     def _get_file_content(self, file_name):
198         f = open(file_name, 'r+')
199         value = f.read()
200         f.close()
201
202         return value
203
204     def get_log_content(self):
205         return self._get_file_content(self.logfile)
206
207     def get_extra_log_content(self, extralog):
208         if extralog not in self.extra_logfiles:
209             return ""
210
211         return self._get_file_content(extralog)
212
213     def get_classname(self):
214         name = self.classname.split('.')[-1]
215         classname = self.classname.replace('.%s' % name, '')
216
217         return classname
218
219     def get_name(self):
220         return self.classname.split('.')[-1]
221
222     def get_uuid(self):
223         if self._uuid is None:
224             self._uuid = self.classname + str(uuid.uuid4())
225         return self._uuid
226
227     def add_arguments(self, *args):
228         self.command += args
229
230     def build_arguments(self):
231         self.add_env_variable("LD_PRELOAD")
232         self.add_env_variable("DISPLAY")
233
234     def add_stack_trace_to_logfile(self):
235         trace_gatherer = BackTraceGenerator.get_default()
236         stack_trace = trace_gatherer.get_trace(self)
237
238         if not stack_trace:
239             return
240
241         info = "\n\n== Stack trace: == \n%s" % stack_trace
242         if self.options.redirect_logs:
243             print(info)
244         elif self.options.xunit_file:
245             self.stack_trace = stack_trace
246         else:
247             with open(self.logfile, 'a') as f:
248                 f.write(info)
249
250     def set_result(self, result, message="", error=""):
251         self.debug("Setting result: %s (message: %s, error: %s)" % (result,
252                                                                     message, error))
253
254         if result is Result.TIMEOUT:
255             if self.options.debug is True:
256                 if self.options.gdb:
257                     printc("Timeout, you should process <ctrl>c to get into gdb",
258                            Colors.FAIL)
259                     # and wait here until gdb exits
260                     self.process.communicate()
261                 else:
262                     pname = self.command[0]
263                     input("%sTimeout happened you can attach gdb doing: $gdb %s %d%s\n"
264                           "Press enter to continue" % (Colors.FAIL, pname, self.process.pid,
265                                                        Colors.ENDC))
266             else:
267                 self.add_stack_trace_to_logfile()
268
269         self.result = result
270         self.message = message
271         self.error_str = error
272
273     def check_results(self):
274         if self.result is Result.FAILED or self.result is Result.TIMEOUT:
275             return
276
277         self.debug("%s returncode: %s", self, self.process.returncode)
278         if self.process.returncode == 0:
279             self.set_result(Result.PASSED)
280         elif self.process.returncode in [-signal.SIGSEGV, -signal.SIGABRT, 139]:
281             self.add_stack_trace_to_logfile()
282             self.set_result(Result.FAILED,
283                             "Application segfaulted, returne code: %d" % (
284                                 self.process.returncode))
285         elif self.process.returncode == VALGRIND_ERROR_CODE:
286             self.set_result(Result.FAILED, "Valgrind reported errors")
287         else:
288             self.set_result(Result.FAILED,
289                             "Application returned %d" % (self.process.returncode))
290
291     def get_current_value(self):
292         """
293         Lets subclasses implement a nicer timeout measurement method
294         They should return some value with which we will compare
295         the previous and timeout if they are egual during self.timeout
296         seconds
297         """
298         return Result.NOT_RUN
299
300     def process_update(self):
301         """
302         Returns True when process has finished running or has timed out.
303         """
304
305         if self.process is None:
306             # Process has not started running yet
307             return False
308
309         self.process.poll()
310         if self.process.returncode is not None:
311             return True
312
313         val = self.get_current_value()
314
315         self.debug("Got value: %s" % val)
316         if val is Result.NOT_RUN:
317             # The get_current_value logic is not implemented... dumb
318             # timeout
319             if time.time() - self.last_change_ts > self.timeout:
320                 self.set_result(Result.TIMEOUT,
321                                 "Application timed out: %s secs" %
322                                 self.timeout,
323                                 "timeout")
324                 return True
325             return False
326         elif val is Result.FAILED:
327             return True
328         elif val is Result.KNOWN_ERROR:
329             return True
330
331         self.log("New val %s" % val)
332
333         if val == self.last_val:
334             delta = time.time() - self.last_change_ts
335             self.debug("%s: Same value for %d/%d seconds" %
336                        (self, delta, self.timeout))
337             if delta > self.timeout:
338                 self.set_result(Result.TIMEOUT,
339                                 "Application timed out: %s secs" %
340                                 self.timeout,
341                                 "timeout")
342                 return True
343         elif self.hard_timeout and time.time() - self.start_ts > self.hard_timeout:
344             self.set_result(
345                 Result.TIMEOUT, "Hard timeout reached: %d secs" % self.hard_timeout)
346             return True
347         else:
348             self.last_change_ts = time.time()
349             self.last_val = val
350
351         return False
352
353     def get_subproc_env(self):
354         return os.environ.copy()
355
356     def kill_subprocess(self):
357         utils.kill_subprocess(self, self.process, DEFAULT_TIMEOUT)
358
359     def run_external_checks(self):
360         pass
361
362     def thread_wrapper(self):
363         def enable_sigint():
364             # Restore the SIGINT handler for the child process (gdb) to ensure
365             # it can handle it.
366             signal.signal(signal.SIGINT, signal.SIG_DFL)
367
368         if self.options.gdb and os.name != "nt":
369             preexec_fn = enable_sigint
370         else:
371             preexec_fn = None
372
373         self.process = subprocess.Popen(self.command,
374                                         stderr=self.out,
375                                         stdout=self.out,
376                                         env=self.proc_env,
377                                         cwd=self.workdir,
378                                         preexec_fn=preexec_fn)
379         self.process.wait()
380         if self.result is not Result.TIMEOUT:
381             if self.process.returncode == 0:
382                 self.run_external_checks()
383             self.queue.put(None)
384
385     def get_valgrind_suppression_file(self, subdir, name):
386         p = get_data_file(subdir, name)
387         if p:
388             return p
389
390         self.error("Could not find any %s file" % name)
391
392     def get_valgrind_suppressions(self):
393         return [self.get_valgrind_suppression_file('data', 'gstvalidate.supp')]
394
395     def use_gdb(self, command):
396         if self.hard_timeout is not None:
397             self.hard_timeout *= GDB_TIMEOUT_FACTOR
398         self.timeout *= GDB_TIMEOUT_FACTOR
399
400         if not self.options.gdb_non_stop:
401             self.timeout = sys.maxsize
402             self.hard_timeout = sys.maxsize
403
404         args = ["gdb"]
405         if self.options.gdb_non_stop:
406             args += ["-ex", "run", "-ex", "backtrace", "-ex", "quit"]
407         args += ["--args"] + command
408         return args
409
410     def use_valgrind(self, command, subenv):
411         vglogsfile = self.logfile + '.valgrind'
412         self.extra_logfiles.append(vglogsfile)
413
414         vg_args = []
415
416         for o, v in [('trace-children', 'yes'),
417                      ('tool', 'memcheck'),
418                      ('leak-check', 'full'),
419                      ('leak-resolution', 'high'),
420                      # TODO: errors-for-leak-kinds should be set to all instead of definite
421                      #       and all false positives should be added to suppression files.
422                      ('errors-for-leak-kinds', 'definite'),
423                      ('num-callers', '20'),
424                      ('error-exitcode', str(VALGRIND_ERROR_CODE)),
425                      ('gen-suppressions', 'all')]:
426             vg_args.append("--%s=%s" % (o, v))
427
428         if not self.options.redirect_logs:
429             vglogsfile = self.logfile + '.valgrind'
430             self.extra_logfiles.append(vglogsfile)
431             vg_args.append("--%s=%s" % ('log-file', vglogsfile))
432
433         for supp in self.get_valgrind_suppressions():
434             vg_args.append("--suppressions=%s" % supp)
435
436         command = ["valgrind"] + vg_args + command
437
438         # Tune GLib's memory allocator to be more valgrind friendly
439         subenv['G_DEBUG'] = 'gc-friendly'
440         subenv['G_SLICE'] = 'always-malloc'
441
442         if self.hard_timeout is not None:
443             self.hard_timeout *= VALGRIND_TIMEOUT_FACTOR
444         self.timeout *= VALGRIND_TIMEOUT_FACTOR
445
446         # Enable 'valgrind.config'
447         self.add_validate_config(get_data_file(
448             'data', 'valgrind.config'), subenv)
449         if subenv == self.proc_env:
450             self.add_env_variable('G_DEBUG', 'gc-friendly')
451             self.add_env_variable('G_SLICE', 'always-malloc')
452             self.add_env_variable('GST_VALIDATE_CONFIG',
453                                   self.proc_env['GST_VALIDATE_CONFIG'])
454
455         return command
456
457     def add_validate_config(self, config, subenv=None):
458         if not subenv:
459             subenv = self.extra_env_variables
460
461         if subenv.get('GST_VALIDATE_CONFIG'):
462             subenv['GST_VALIDATE_CONFIG'] = '%s%s%s' % (
463                 self.proc_env['GST_VALIDATE_CONFIG'], os.pathsep, config)
464         else:
465             subenv['GST_VALIDATE_CONFIG'] = config
466
467     def launch_server(self):
468         return None
469
470     def get_logfile_repr(self):
471         message = "    Logs:\n"
472         logfiles = self.extra_logfiles.copy()
473
474         if not self.options.redirect_logs:
475             logfiles.insert(0, self.logfile)
476
477         for log in logfiles:
478             message += "         - %s\n" % log
479
480         return message
481
482     def get_command_repr(self):
483         message = "%s %s" % (self._env_variable, ' '.join(self.command))
484         if self.server_command:
485             message = "%s & %s" % (self.server_command, message)
486
487         return "'%s'" % message
488
    def test_start(self, queue):
        """
        Prepare the log output, command line and environment of the test,
        then launch it in a new thread running thread_wrapper.

        @queue: stored as self.queue; thread_wrapper pushes None on it once
                the process is done
        """
        self.open_logfile()

        self.server_command = self.launch_server()
        self.queue = queue
        self.command = [self.application]
        self._starting_time = time.time()
        self.build_arguments()
        self.proc_env = self.get_subproc_env()

        # Extra variables are appended to the value already present in the
        # process environment, separated by os.pathsep, and recorded so they
        # appear in the command printed to reproduce the test.
        for var, value in list(self.extra_env_variables.items()):
            value = self.proc_env.get(var, '') + os.pathsep + value
            self.proc_env[var] = value.strip(os.pathsep)
            self.add_env_variable(var, self.proc_env[var])

        if self.options.gdb:
            self.command = self.use_gdb(self.command)

            # Saved so that test_end can restore it.
            self.previous_sigint_handler = signal.getsignal(signal.SIGINT)
            # Make the gst-validate executable ignore SIGINT while gdb is running.
            signal.signal(signal.SIGINT, signal.SIG_IGN)

        # Applied after gdb, so valgrind wraps the gdb command when both
        # options are set.
        if self.options.valgrind:
            self.command = self.use_valgrind(self.command, self.proc_env)

        if not self.options.redirect_logs:
            # Header of the log file.
            self.out.write("=================\n"
                           "Test name: %s\n"
                           "Command: '%s'\n"
                           "=================\n\n"
                           % (self.classname, ' '.join(self.command)))
            self.out.flush()
        else:
            message = "Launching: %s%s\n" \
                "    Command: %s\n" % (Colors.ENDC, self.classname,
                                       self.get_command_repr())
            printc(message, Colors.OKBLUE)

        self.thread = threading.Thread(target=self.thread_wrapper)
        self.thread.start()

        # Reference points used by process_update to detect timeouts.
        self.last_val = 0
        self.last_change_ts = time.time()
        self.start_ts = time.time()
533
534     def _dump_log_file(self, logfile):
535         message = "Dumping contents of %s\n" % logfile
536         printc(message, Colors.FAIL)
537
538         with open(logfile, 'r') as fin:
539             print(fin.read())
540
541     def _dump_log_files(self):
542         printc("Dumping log files on failure\n", Colors.FAIL)
543         self._dump_log_file(self.logfile)
544         for logfile in self.extra_logfiles:
545             self._dump_log_file(logfile)
546
547     def test_end(self):
548         self.kill_subprocess()
549         self.thread.join()
550         self.time_taken = time.time() - self._starting_time
551
552         if self.options.gdb:
553             signal.signal(signal.SIGINT, self.previous_sigint_handler)
554
555         if self.result != Result.PASSED:
556             message = str(self)
557             end = "\n"
558         else:
559             message = "%s %s: %s%s" % (self.number, self.classname, self.result,
560                                        " (" + self.message + ")" if self.message else "")
561             end = "\r"
562
563         printc(message, color=utils.get_color_for_result(self.result), end=end)
564         self.close_logfile()
565
566         if self.options.dump_on_failure:
567             if self.result is not Result.PASSED:
568                 self._dump_log_files()
569
570         # Only keep around env variables we need later
571         clean_env = {}
572         for n in self.__env_variable:
573             clean_env[n] = self.proc_env.get(n, None)
574         self.proc_env = clean_env
575
576         # Don't keep around JSON report objects, they were processed
577         # in check_results already
578         self.reports = []
579
580         return self.result
581
582
class GstValidateTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
    """TCP server combining socketserver.TCPServer with ThreadingMixIn."""
585
586
class GstValidateListener(socketserver.BaseRequestHandler):
    """
    Request handler reading length-prefixed JSON messages from a connection
    and forwarding them to the matching test of the launcher.

    Each message is a 4 byte big-endian length followed by that many bytes
    of JSON. The first message must carry the "uuid" of the test.
    """

    def _recv_exact(self, size):
        """
        Read exactly @size bytes from the connection.

        recv() is allowed to return less than what was asked for, so keep
        reading until everything arrived. Returns None when the peer closes
        the connection before that.
        """
        chunks = []
        remaining = size
        while remaining > 0:
            chunk = self.request.recv(remaining)
            if not chunk:
                return None
            chunks.append(chunk)
            remaining -= len(chunk)

        return b''.join(chunks)

    def handle(self):
        """Implements BaseRequestHandler handle method"""
        test = None
        while True:
            raw_len = self._recv_exact(4)
            if raw_len is None:
                return
            msglen = struct.unpack('>I', raw_len)[0]
            raw_msg = self._recv_exact(msglen)
            if not raw_msg:
                # Connection closed, or empty message
                return

            obj = json.loads(raw_msg.decode())

            if test is None:
                # First message must contain the uuid
                test_uuid = obj.get("uuid", None)
                if test_uuid is None:
                    return
                # Find test from launcher
                for t in self.server.launcher.tests:
                    if test_uuid == t.get_uuid():
                        test = t
                        break
                if test is None:
                    self.server.launcher.error(
                        "Could not find test for UUID %s" % test_uuid)
                    return

            obj_type = obj.get("type", '')
            if obj_type == 'position':
                test.set_position(obj['position'], obj['duration'],
                                  obj['speed'])
            elif obj_type == 'buffering':
                test.set_position(obj['position'], 100)
            elif obj_type == 'action':
                test.add_action_execution(obj)
                # Make sure that action is taken into account when checking if process
                # is updating
                test.position += 1
            elif obj_type == 'action-done':
                # Make sure that action end is taken into account when checking if process
                # is updating
                test.position += 1
                # An 'action-done' without any recorded action has nothing
                # to attach its duration to.
                if test.actions_infos:
                    test.actions_infos[-1]['execution-duration'] = obj['execution-duration']
            elif obj_type == 'report':
                test.add_report(obj)
634                 test.add_report(obj)
635
636
637 class GstValidateTest(Test):
638
639     """ A class representing a particular test. """
640     findpos_regex = re.compile(
641         '.*position.*(\d+):(\d+):(\d+).(\d+).*duration.*(\d+):(\d+):(\d+).(\d+)')
642     findlastseek_regex = re.compile(
643         'seeking to.*(\d+):(\d+):(\d+).(\d+).*stop.*(\d+):(\d+):(\d+).(\d+).*rate.*(\d+)\.(\d+)')
644
645     HARD_TIMEOUT_FACTOR = 5
646
647     def __init__(self, application_name, classname,
648                  options, reporter, duration=0,
649                  timeout=DEFAULT_TIMEOUT, scenario=None, hard_timeout=None,
650                  media_descriptor=None, extra_env_variables=None,
651                  expected_failures=None, workdir=None):
652
653         extra_env_variables = extra_env_variables or {}
654
655         if not hard_timeout and self.HARD_TIMEOUT_FACTOR:
656             if timeout:
657                 hard_timeout = timeout * self.HARD_TIMEOUT_FACTOR
658             elif duration:
659                 hard_timeout = duration * self.HARD_TIMEOUT_FACTOR
660             else:
661                 hard_timeout = None
662
663         # If we are running from source, use the -debug version of the
664         # application which is using rpath instead of libtool's wrappers. It's
665         # slightly faster to start and will not confuse valgrind.
666         debug = '%s-debug' % application_name
667         p = look_for_file_in_source_dir('tools', debug)
668         if p:
669             application_name = p
670
671         self.reports = []
672         self.position = -1
673         self.media_duration = -1
674         self.speed = 1.0
675         self.actions_infos = []
676         self.media_descriptor = media_descriptor
677         self.server = None
678
679         override_path = self.get_override_file(media_descriptor)
680         if override_path:
681             if extra_env_variables:
682                 if extra_env_variables.get("GST_VALIDATE_OVERRIDE", ""):
683                     extra_env_variables["GST_VALIDATE_OVERRIDE"] += os.path.pathsep
684
685             extra_env_variables["GST_VALIDATE_OVERRIDE"] = override_path
686
687         super(GstValidateTest, self).__init__(application_name, classname,
688                                               options, reporter,
689                                               duration=duration,
690                                               timeout=timeout,
691                                               hard_timeout=hard_timeout,
692                                               extra_env_variables=extra_env_variables,
693                                               expected_failures=expected_failures,
694                                               workdir=workdir)
695
696         # defines how much the process can be outside of the configured
697         # segment / seek
698         self._sent_eos_time = None
699
700         if scenario is None or scenario.name.lower() == "none":
701             self.scenario = None
702         else:
703             self.scenario = scenario
704
    def kill_subprocess(self):
        """Kill the process of the test; plain delegation to Test."""
        Test.kill_subprocess(self)
707
708     def add_report(self, report):
709         self.reports.append(report)
710
711     def set_position(self, position, duration, speed=None):
712         self.position = position
713         self.media_duration = duration
714         if speed:
715             self.speed = speed
716
717     def add_action_execution(self, action_infos):
718         if action_infos['action-type'] == 'eos':
719             self._sent_eos_time = time.time()
720         self.actions_infos.append(action_infos)
721
722     def get_override_file(self, media_descriptor):
723         if media_descriptor:
724             if media_descriptor.get_path():
725                 override_path = os.path.splitext(media_descriptor.get_path())[
726                     0] + VALIDATE_OVERRIDE_EXTENSION
727                 if os.path.exists(override_path):
728                     return override_path
729
730         return None
731
732     def get_current_position(self):
733         return self.position
734
735     def get_current_value(self):
736         if self.scenario:
737             if self._sent_eos_time is not None:
738                 t = time.time()
739                 if ((t - self._sent_eos_time)) > 30:
740                     if self.media_descriptor.get_protocol() == Protocols.HLS:
741                         self.set_result(Result.PASSED,
742                                         """Got no EOS 30 seconds after sending EOS,
743                                         in HLS known and tolerated issue:
744                                         https://bugzilla.gnome.org/show_bug.cgi?id=723868""")
745                         return Result.KNOWN_ERROR
746
747                     self.set_result(
748                         Result.FAILED, "Pipeline did not stop 30 Seconds after sending EOS")
749
750                     return Result.FAILED
751
752         return self.position
753
754     def get_subproc_env(self):
755         subproc_env = os.environ.copy()
756
757         subproc_env["GST_VALIDATE_UUID"] = self.get_uuid()
758
759         if 'GST_DEBUG' in os.environ and not self.options.redirect_logs:
760             gstlogsfile = self.logfile + '.gstdebug'
761             self.extra_logfiles.append(gstlogsfile)
762             subproc_env["GST_DEBUG_FILE"] = gstlogsfile
763
764         if self.options.no_color:
765             subproc_env["GST_DEBUG_NO_COLOR"] = '1'
766
767         # Ensure XInitThreads is called, see bgo#731525
768         subproc_env['GST_GL_XINITTHREADS'] = '1'
769         self.add_env_variable('GST_GL_XINITTHREADS', '1')
770
771         if self.scenario is not None:
772             scenario = self.scenario.get_execution_name()
773             subproc_env["GST_VALIDATE_SCENARIO"] = scenario
774             self.add_env_variable("GST_VALIDATE_SCENARIO",
775                                   subproc_env["GST_VALIDATE_SCENARIO"])
776         else:
777             try:
778                 del subproc_env["GST_VALIDATE_SCENARIO"]
779             except KeyError:
780                 pass
781
782         return subproc_env
783
784     def clean(self):
785         Test.clean(self)
786         self._sent_eos_time = None
787         self.reports = []
788         self.position = -1
789         self.media_duration = -1
790         self.speed = 1.0
791         self.actions_infos = []
792
793     def build_arguments(self):
794         super(GstValidateTest, self).build_arguments()
795         if "GST_VALIDATE" in os.environ:
796             self.add_env_variable("GST_VALIDATE", os.environ["GST_VALIDATE"])
797
798         if "GST_VALIDATE_SCENARIOS_PATH" in os.environ:
799             self.add_env_variable("GST_VALIDATE_SCENARIOS_PATH",
800                                   os.environ["GST_VALIDATE_SCENARIOS_PATH"])
801
802         self.add_env_variable("GST_VALIDATE_CONFIG")
803         self.add_env_variable("GST_VALIDATE_OVERRIDE")
804
805     def get_extra_log_content(self, extralog):
806         value = Test.get_extra_log_content(self, extralog)
807
808         return value
809
810     def report_matches_expected_failure(self, report, expected_failure):
811         for key in ['bug', 'bugs', 'sometimes']:
812             if key in expected_failure:
813                 del expected_failure[key]
814         for key, value in list(report.items()):
815             if key in expected_failure:
816                 if not re.findall(expected_failure[key], str(value)):
817                     return False
818                 expected_failure.pop(key)
819
820         return not bool(expected_failure)
821
822     def check_reported_issues(self):
823         ret = []
824         expected_failures = copy.deepcopy(self.expected_failures)
825         expected_retcode = [0]
826         for report in self.reports:
827             found = None
828             for expected_failure in expected_failures:
829                 if self.report_matches_expected_failure(report,
830                                                         expected_failure.copy()):
831                     found = expected_failure
832                     break
833
834             if found is not None:
835                 expected_failures.remove(found)
836                 if report['level'] == 'critical':
837                     if found.get('sometimes') and isinstance(expected_retcode, list):
838                         expected_retcode.append(18)
839                     else:
840                         expected_retcode = [18]
841             elif report['level'] == 'critical':
842                 ret.append(report['summary'])
843
844         if not ret:
845             return None, expected_failures, expected_retcode
846
847         return ret, expected_failures, expected_retcode
848
849     def check_expected_timeout(self, expected_timeout):
850         msg = "Expected timeout happened. "
851         result = Result.PASSED
852         message = expected_timeout.get('message')
853         if message:
854             if not re.findall(message, self.message):
855                 result = Result.FAILED
856                 msg = "Expected timeout message: %s got %s " % (
857                     message, self.message)
858
859         expected_symbols = expected_timeout.get('stacktrace_symbols')
860         if expected_symbols:
861             trace_gatherer = BackTraceGenerator.get_default()
862             stack_trace = trace_gatherer.get_trace(self)
863
864             if stack_trace:
865                 if not isinstance(expected_symbols, list):
866                     expected_symbols = [expected_symbols]
867
868                 not_found_symbols = [s for s in expected_symbols
869                                      if s not in stack_trace]
870                 if not_found_symbols:
871                     result = Result.TIMEOUT
872                     msg = "Expected symbols '%s' not found in stack trace " % (
873                         not_found_symbols)
874             else:
875                 msg += "No stack trace available, could not verify symbols "
876
877         return result, msg
878
879     def check_results(self):
880         if self.result in [Result.FAILED, self.result is Result.PASSED]:
881             return
882
883         for report in self.reports:
884             if report.get('issue-id') == 'runtime::missing-plugin':
885                 self.set_result(Result.SKIPPED, "%s\n%s" % (report['summary'],
886                                                             report['details']))
887                 return
888
889         self.debug("%s returncode: %s", self, self.process.returncode)
890
891         criticals, not_found_expected_failures, expected_returncode = self.check_reported_issues()
892
893         expected_timeout = None
894         for i, f in enumerate(not_found_expected_failures):
895             if len(f) == 1 and f.get("returncode"):
896                 returncode = f['returncode']
897                 if not isinstance(expected_returncode, list):
898                     returncode = [expected_returncode]
899                 if 'sometimes' in f:
900                     returncode.append(0)
901             elif f.get("timeout"):
902                 expected_timeout = f
903
904         not_found_expected_failures = [f for f in not_found_expected_failures
905                                        if not f.get('returncode')]
906
907         msg = ""
908         result = Result.PASSED
909         if self.result == Result.TIMEOUT:
910             if expected_timeout:
911                 not_found_expected_failures.remove(expected_timeout)
912                 result, msg = self.check_expected_timeout(expected_timeout)
913             else:
914                 return
915         elif self.process.returncode in COREDUMP_SIGNALS:
916             result = Result.FAILED
917             msg = "Application segfaulted "
918             self.add_stack_trace_to_logfile()
919         elif self.process.returncode == VALGRIND_ERROR_CODE:
920             msg = "Valgrind reported errors "
921             result = Result.FAILED
922         elif self.process.returncode not in expected_returncode:
923             msg = "Application returned %s " % self.process.returncode
924             if expected_returncode != [0]:
925                 msg += "(expected %s) " % expected_returncode
926             result = Result.FAILED
927
928         if criticals:
929             msg += "(critical errors: [%s]) " % ', '.join(criticals)
930             result = Result.FAILED
931
932         if not_found_expected_failures:
933             mandatory_failures = [f for f in not_found_expected_failures
934                                   if not f.get('sometimes')]
935
936             if mandatory_failures:
937                 msg += "(Expected errors not found: %s) " % mandatory_failures
938                 result = Result.FAILED
939         elif self.expected_failures:
940             msg += '%s(Expected errors occured: %s)%s' % (Colors.OKBLUE,
941                                                           self.expected_failures,
942                                                           Colors.ENDC)
943
944         self.set_result(result, msg.strip())
945
946     def get_valgrind_suppressions(self):
947         result = super(GstValidateTest, self).get_valgrind_suppressions()
948         gst_sup = self.get_valgrind_suppression_file('common', 'gst.supp')
949         if gst_sup:
950             result.append(gst_sup)
951         return result
952
953
class GstValidateEncodingTestInterface(object):
    """Helpers for tests that encode media into ``self.dest_file``.

    Several methods rely on attributes and methods that are not defined here
    (``debug``, ``info``, ``options``, ``out``, ``classname``, ``proc_env``,
    ``workdir``); they are expected to be provided by the class this
    interface is combined with.
    """
    # Default maximum difference tolerated between the duration of the
    # original media and the duration of the encoded file.
    DURATION_TOLERANCE = GST_SECOND / 4

    def __init__(self, combination, media_descriptor, duration_tolerance=None):
        """
        @combination: Object describing the wanted muxer/audio/video caps
        @media_descriptor: Descriptor of the media being encoded
        @duration_tolerance: Tolerated duration difference, defaults to
                             DURATION_TOLERANCE when None
        """
        super(GstValidateEncodingTestInterface, self).__init__()

        self.media_descriptor = media_descriptor
        self.combination = combination
        self.dest_file = ""

        self._duration_tolerance = duration_tolerance
        if duration_tolerance is None:
            self._duration_tolerance = self.DURATION_TOLERANCE

    def get_current_size(self):
        """Return the size of the destination file, or None if it can't be stat'ed."""
        try:
            size = os.stat(urllib.parse.urlparse(self.dest_file).path).st_size
        except OSError:
            return None

        self.debug("Size: %s" % size)
        return size

    def _get_profile_full(self, muxer, venc, aenc, video_restriction=None,
                          audio_restriction=None, audio_presence=0,
                          video_presence=0):
        """
        Build the "muxer:[restriction->]venc[|presence]:[restriction->]aenc[|presence]"
        string out of the given caps; empty parts are left out.
        """
        ret = ""
        if muxer:
            ret += muxer
        ret += ":"
        if venc:
            if video_restriction is not None:
                ret = ret + video_restriction + '->'
            ret += venc
            if video_presence:
                ret = ret + '|' + str(video_presence)
        if aenc:
            ret += ":"
            if audio_restriction is not None:
                ret = ret + audio_restriction + '->'
            ret += aenc
            if audio_presence:
                ret = ret + '|' + str(audio_presence)

        # Without video encoder the two separators end up side by side.
        return ret.replace("::", ":")

    def get_profile(self, video_restriction=None, audio_restriction=None):
        """
        Return the profile string for the combination, leaving out the
        audio/video part when the media descriptor has no such track.
        """
        vcaps = self.combination.get_video_caps()
        acaps = self.combination.get_audio_caps()
        if self.media_descriptor is not None:
            if self.media_descriptor.get_num_tracks("video") == 0:
                vcaps = None

            if self.media_descriptor.get_num_tracks("audio") == 0:
                acaps = None

        return self._get_profile_full(self.combination.get_muxer_caps(),
                                      vcaps, acaps,
                                      video_restriction=video_restriction,
                                      audio_restriction=audio_restriction)

    def _clean_caps(self, caps):
        """
        Returns a list of key=value or structure name, without "(types)" or ";" or ","
        """
        return re.sub(r"\(.+?\)\s*| |;", '', caps).split(',')

    # pylint: disable=E1101
    def _has_caps_type_variant(self, c, ccaps):
        """
        Handle situations where we can have application/ogg or video/ogg or
        audio/ogg
        """
        has_variant = False
        media_type = re.findall("application/|video/|audio/", c)
        if media_type:
            media_type = media_type[0].replace('/', '')
            possible_mtypes = ["application", "video", "audio"]
            possible_mtypes.remove(media_type)
            for tmptype in possible_mtypes:
                possible_c_variant = c.replace(media_type, tmptype)
                if possible_c_variant in ccaps:
                    self.info(
                        "Found %s in %s, good enough!", possible_c_variant, ccaps)
                    has_variant = True

        return has_variant

    # pylint: disable=E1101
    def run_iqa_test(self, reference_file_uri):
        """
        Runs an IQA (dssim) comparison between @reference_file_uri and the
        destination file, if the 'iqa' feature is available.

        @reference_file_uri: URI of the file to compare self.dest_file with
        """
        if not GstValidateBaseTestManager.has_feature('iqa'):
            self.debug('Iqa element not present, not running extra test.')
            return

        pipeline_desc = """
            uridecodebin uri=%s !
                iqa name=iqa do-dssim=true dssim-error-threshold=1.0 ! fakesink
            uridecodebin uri=%s ! iqa.
        """ % (reference_file_uri, self.dest_file)
        pipeline_desc = pipeline_desc.replace("\n", "")

        command = [GstValidateBaseTestManager.COMMAND] + \
            shlex.split(pipeline_desc)
        if not self.options.redirect_logs:
            self.out.write(
                "=================\n"
                "Running IQA tests on results of: %s\n"
                "Command: '%s'\n"
                "=================\n\n" % (
                    self.classname, ' '.join(command)))
            self.out.flush()
        else:
            message = "Running IQA tests on results of:%s %s\n" \
                "    Command: %s\n" % (
                    Colors.ENDC, self.classname, ' '.join(command))
            printc(message, Colors.OKBLUE)

        self.process = subprocess.Popen(command,
                                        stderr=self.out,
                                        stdout=self.out,
                                        env=self.proc_env,
                                        cwd=self.workdir)
        self.process.wait()

    def _check_descriptor_against_profile(self, result_descriptor):
        """Compare the discovered encoded file with what was asked for.

        Returns a ``(Result, message)`` tuple; does not remove any file.
        """
        duration = result_descriptor.get_duration()
        orig_duration = self.media_descriptor.get_duration()
        tolerance = self._duration_tolerance

        # The encoded file may be neither too short nor too long.
        if abs(duration - orig_duration) >= tolerance:
            return (Result.FAILED, "Duration of encoded file is "
                    " wrong (%s instead of %s)" %
                    (utils.TIME_ARGS(duration),
                     utils.TIME_ARGS(orig_duration)))

        all_tracks_caps = result_descriptor.get_tracks_caps()
        container_caps = result_descriptor.get_caps()
        if container_caps:
            all_tracks_caps.insert(0, ("container", container_caps))

        for track_type, caps in all_tracks_caps:
            ccaps = self._clean_caps(caps)
            wanted_caps = self.combination.get_caps(track_type)

            # Must be checked before cleaning: there is nothing to clean
            # when no caps are wanted for that track type.
            if wanted_caps is None:
                return (Result.FAILED,
                        "Found a track of type %s in the encoded files"
                        " but none where wanted in the encoded profile: %s"
                        % (track_type, self.combination))

            for c in self._clean_caps(wanted_caps):
                if c in ccaps:
                    continue
                if not self._has_caps_type_variant(c, ccaps):
                    return (Result.FAILED,
                            "Field: %s  (from %s) not in caps of the outputed file %s"
                            % (c, wanted_caps, ccaps))

        return (Result.PASSED, "")

    def check_encoded_file(self):
        """Check the duration and caps of the encoded file, then delete it.

        Returns a ``(Result, message)`` tuple. The file is left in place when
        it could not be discovered.
        """
        result_descriptor = GstValidateMediaDescriptor.new_from_uri(
            self.dest_file)
        if result_descriptor is None:
            return (Result.FAILED, "Could not discover encoded file %s"
                    % self.dest_file)

        verdict = self._check_descriptor_against_profile(result_descriptor)
        os.remove(result_descriptor.get_path())
        return verdict
1127
1128
class TestsManager(Loggable):

    """ A class responsible for managing tests. """

    name = "base"
    # Name of the testsuite currently being loaded; used as a prefix for test
    # class names and blacklist patterns.
    loading_testsuite = None

    def __init__(self):

        Loggable.__init__(self)

        self.tests = []
        self.unwanted_tests = []
        self.options = None
        self.args = None
        self.reporter = None
        self.wanted_tests_patterns = []
        self.blacklisted_tests_patterns = []
        self._generators = []
        self.check_testslist = True
        self.all_tests = None
        # Maps compiled test name regexes to their list of expected failures.
        self.expected_failures = {}
        # List of (test name regex, reason) tuples.
        self.blacklisted_tests = []

    def init(self):
        """ Tell whether the manager is usable; always True here. """
        return True

    def list_tests(self):
        """ Return the wanted tests sorted by class name. """
        return sorted(list(self.tests), key=lambda x: x.classname)

    def find_tests(self, classname):
        """ Return the wanted tests whose class name matches the @classname regex. """
        regex = re.compile(classname)
        return [test for test in self.list_tests() if regex.findall(test.classname)]

    def add_expected_issues(self, expected_failures):
        """
        @expected_failures: A dict mapping test name regexes to lists of
                            expected failures; applied to the already known
                            tests and remembered for tests added later.
        """
        expected_failures_re = {}
        for test_name_regex, failures in list(expected_failures.items()):
            regex = re.compile(test_name_regex)
            expected_failures_re[regex] = failures
            for test in self.tests:
                if regex.findall(test.classname):
                    test.expected_failures.extend(failures)

        self.expected_failures.update(expected_failures_re)

    def add_test(self, test):
        """ Register @test as wanted or unwanted, attaching its expected failures. """
        if test.generator is None:
            test.classname = self.loading_testsuite + '.' + test.classname
        for regex, failures in list(self.expected_failures.items()):
            if regex.findall(test.classname):
                test.expected_failures.extend(failures)

        if self._is_test_wanted(test):
            if test not in self.tests:
                self.tests.append(test)
                self.tests.sort(key=lambda test: test.classname)
        else:
            if test not in self.tests:
                self.unwanted_tests.append(test)
                self.unwanted_tests.sort(key=lambda test: test.classname)

    def get_tests(self):
        return self.tests

    def populate_testsuite(self):
        """ Hook called from set_settings(); does nothing here. """
        pass

    def add_generators(self, generators):
        """
        @generators: A list of, or one single #TestsGenerator to be used to generate tests
        """
        if not isinstance(generators, list):
            generators = [generators]
        self._generators.extend(generators)
        for generator in generators:
            generator.testsuite = self.loading_testsuite

        # Drop duplicates (ordering is not preserved).
        self._generators = list(set(self._generators))

    def get_generators(self):
        return self._generators

    def _add_blacklist(self, blacklisted_tests):
        """ Compile and store the comma separated patterns of @blacklisted_tests. """
        if not isinstance(blacklisted_tests, list):
            blacklisted_tests = [blacklisted_tests]

        for patterns in blacklisted_tests:
            for pattern in patterns.split(","):
                self.blacklisted_tests_patterns.append(re.compile(pattern))

    def set_default_blacklist(self, default_blacklist):
        """
        @default_blacklist: An iterable of (test name regex, reason) tuples;
                            regexes are prefixed with the name of the
                            testsuite being loaded when they are not already.
        """
        for test_regex, reason in default_blacklist:
            if not test_regex.startswith(self.loading_testsuite + '.'):
                test_regex = self.loading_testsuite + '.' + test_regex
            self.blacklisted_tests.append((test_regex, reason))

    def add_options(self, parser):
        """ Add more arguments. """
        pass

    def set_settings(self, options, args, reporter):
        """ Set properties after options parsing. """
        self.options = options
        self.args = args
        self.reporter = reporter

        self.populate_testsuite()

        if self.options.valgrind:
            self.print_valgrind_bugs()

        if options.wanted_tests:
            for patterns in options.wanted_tests:
                for pattern in patterns.split(","):
                    self.wanted_tests_patterns.append(re.compile(pattern))

        if options.blacklisted_tests:
            for patterns in options.blacklisted_tests:
                self._add_blacklist(patterns)

    def set_blacklists(self):
        """
        Activate the hardcoded blacklist.

        Returns False when bugs status checking is enabled and
        check_bugs_resolution() fails, True otherwise.
        """
        if self.blacklisted_tests:
            printc("\nCurrently 'hardcoded' %s blacklisted tests:" %
                   self.name, Colors.WARNING, title_char='-')

        if self.options.check_bugs_status:
            if not check_bugs_resolution(self.blacklisted_tests):
                return False

        for name, bug in self.blacklisted_tests:
            self._add_blacklist(name)
            if not self.options.check_bugs_status:
                print("  + %s \n   --> bug: %s\n" % (name, bug))

        return True

    def check_expected_failures(self):
        """
        Check the bugs associated with the expected failures.

        Returns True when there is nothing to check (no expected failure or
        bugs status checking disabled), the result of check_bugs_resolution()
        otherwise.
        """
        if not self.expected_failures or not self.options.check_bugs_status:
            return True

        printc("\nCurrently known failures in the %s testsuite:"
               % self.name, Colors.WARNING, title_char='-')

        bugs_definitions = {}
        for regex, failures in list(self.expected_failures.items()):
            for failure in failures:
                bugs = failure.get('bug')
                if not bugs:
                    bugs = failure.get('bugs')
                if not bugs:
                    printc('+ %s:\n  --> no bug reported associated with %s\n' % (
                        regex.pattern, failure), Colors.WARNING)
                    continue

                if not isinstance(bugs, list):
                    bugs = [bugs]
                # Accumulate the bugs of every failure sharing that pattern,
                # without duplicates and without touching the failure itself.
                known_bugs = bugs_definitions.setdefault(regex.pattern, [])
                for bug in bugs:
                    if bug not in known_bugs:
                        known_bugs.append(bug)

        return check_bugs_resolution(bugs_definitions.items())

    def _check_blacklisted(self, test):
        """ Tell whether a blacklist pattern matches the class name of @test. """
        for pattern in self.blacklisted_tests_patterns:
            if pattern.findall(test.classname):
                self.info("%s is blacklisted by %s", test.classname, pattern)
                return True

        return False

    def _check_whitelisted(self, test):
        """ Tell whether the first wanted pattern matching @test activates it. """
        for pattern in self.wanted_tests_patterns:
            if pattern.findall(test.classname):
                if self._check_blacklisted(test):
                    # If explicitly white listed that specific test
                    # bypass the blacklisting
                    if pattern.pattern != test.classname:
                        return False
                return True
        return False

    def _check_duration(self, test):
        """ Tell whether the duration of @test fits in the 'long_limit' option. """
        if test.duration > 0 and int(self.options.long_limit) < int(test.duration):
            self.info("Not activating %s as its duration (%d) is superior"
                      " than the long limit (%d)" % (test, test.duration,
                                                     int(self.options.long_limit)))
            return False

        return True

    def _is_test_wanted(self, test):
        """
        A test is wanted when it is whitelisted, or when no whitelist is set
        and it is not blacklisted; in both cases its duration must fit.
        """
        if self._check_whitelisted(test):
            if not self._check_duration(test):
                return False
            return True

        if self._check_blacklisted(test):
            return False

        if not self._check_duration(test):
            return False

        if not self.wanted_tests_patterns:
            return True

        return False

    def needs_http_server(self):
        return False

    def print_valgrind_bugs(self):
        """ Hook called from set_settings() when valgrind is enabled; does nothing here. """
        pass
1342
1343
class TestsGenerator(Loggable):
    """ Holds a set of tests, indexed by class name, for a tests manager. """

    def __init__(self, name, test_manager, tests=None):
        """
        @name: The name of the generator
        @test_manager: The manager the generator is attached to
        @tests: Optional iterable of tests to start with
        """
        Loggable.__init__(self)
        self.name = name
        self.test_manager = test_manager
        self.testsuite = None
        self._tests = {}
        # `None` instead of a shared mutable default list.
        for test in tests or []:
            self._tests[test.classname] = test

    def generate_tests(self, *kwargs):
        """
        Method that generates tests

        The (positional) arguments are accepted but ignored here.
        """
        return list(self._tests.values())

    def add_test(self, test):
        """ Attach @test to this generator, prefixing its class name with the testsuite. """
        test.generator = self
        test.classname = self.testsuite + '.' + test.classname
        self._tests[test.classname] = test
1365
1366
class GstValidateTestsGenerator(TestsGenerator):
    """ Generator whose tests are populated right before being generated. """

    def populate_tests(self, uri_minfo_special_scenarios, scenarios):
        """ Hook called by generate_tests(); does nothing here. """

    def generate_tests(self, uri_minfo_special_scenarios, scenarios):
        """ Populate the tests, then return them as the base generator does. """
        self.populate_tests(uri_minfo_special_scenarios, scenarios)
        generated = super().generate_tests()
        return generated
1375
1376
1377 class _TestsLauncher(Loggable):
1378
1379     def __init__(self, libsdir):
1380
1381         Loggable.__init__(self)
1382
1383         self.libsdir = libsdir
1384         self.options = None
1385         self.testers = []
1386         self.tests = []
1387         self.reporter = None
1388         self._list_testers()
1389         self.all_tests = None
1390         self.wanted_tests_patterns = []
1391
1392         self.queue = queue.Queue()
1393         self.jobs = []
1394         self.total_num_tests = 0
1395         self.server = None
1396
1397     def _list_app_dirs(self):
1398         app_dirs = []
1399         app_dirs.append(os.path.join(self.libsdir, "apps"))
1400         env_dirs = os.environ.get("GST_VALIDATE_APPS_DIR")
1401         if env_dirs is not None:
1402             for dir_ in env_dirs.split(":"):
1403                 app_dirs.append(dir_)
1404                 sys.path.append(dir_)
1405
1406         return app_dirs
1407
1408     def _exec_app(self, app_dir, env):
1409         try:
1410             files = os.listdir(app_dir)
1411         except OSError as e:
1412             self.debug("Could not list %s: %s" % (app_dir, e))
1413             files = []
1414         for f in files:
1415             if f.endswith(".py"):
1416                 exec(compile(open(os.path.join(app_dir, f)).read(),
1417                              os.path.join(app_dir, f), 'exec'), env)
1418
1419     def _exec_apps(self, env):
1420         app_dirs = self._list_app_dirs()
1421         for app_dir in app_dirs:
1422             self._exec_app(app_dir, env)
1423
1424     def _list_testers(self):
1425         env = globals().copy()
1426         self._exec_apps(env)
1427
1428         testers = [i() for i in utils.get_subclasses(TestsManager, env)]
1429         for tester in testers:
1430             if tester.init() is True:
1431                 self.testers.append(tester)
1432             else:
1433                 self.warning("Can not init tester: %s -- PATH is %s"
1434                              % (tester.name, os.environ["PATH"]))
1435
1436     def add_options(self, parser):
1437         for tester in self.testers:
1438             tester.add_options(parser)
1439
1440     def _load_testsuite(self, testsuites):
1441         exceptions = []
1442         for testsuite in testsuites:
1443             try:
1444                 sys.path.insert(0, os.path.dirname(testsuite))
1445                 return (__import__(os.path.basename(testsuite).replace(".py", "")), None)
1446             except Exception as e:
1447                 exceptions.append("Could not load %s: %s" % (testsuite, e))
1448                 continue
1449             finally:
1450                 sys.path.remove(os.path.dirname(testsuite))
1451
1452         return (None, exceptions)
1453
1454     def _load_testsuites(self):
1455         testsuites = set()
1456         for testsuite in self.options.testsuites:
1457             if os.path.exists(testsuite):
1458                 testsuite = os.path.abspath(os.path.expanduser(testsuite))
1459                 loaded_module = self._load_testsuite([testsuite])
1460             else:
1461                 possible_testsuites_paths = [os.path.join(d, testsuite + ".py")
1462                                              for d in self.options.testsuites_dirs]
1463                 loaded_module = self._load_testsuite(possible_testsuites_paths)
1464
1465             module = loaded_module[0]
1466             if not loaded_module[0]:
1467                 if "." in testsuite:
1468                     self.options.testsuites.append(testsuite.split('.')[0])
1469                     self.info("%s looks like a test name, trying that" %
1470                               testsuite)
1471                     self.options.wanted_tests.append(testsuite)
1472                 else:
1473                     printc("Could not load testsuite: %s, reasons: %s" % (
1474                         testsuite, loaded_module[1]), Colors.FAIL)
1475                 continue
1476
1477             testsuites.add(module)
1478             if not hasattr(module, "TEST_MANAGER"):
1479                 module.TEST_MANAGER = [tester.name for tester in self.testers]
1480             elif not isinstance(module.TEST_MANAGER, list):
1481                 module.TEST_MANAGER = [module.TEST_MANAGER]
1482
1483         self.options.testsuites = list(testsuites)
1484
1485     def _setup_testsuites(self):
1486         for testsuite in self.options.testsuites:
1487             loaded = False
1488             wanted_test_manager = None
1489             if hasattr(testsuite, "TEST_MANAGER"):
1490                 wanted_test_manager = testsuite.TEST_MANAGER
1491                 if not isinstance(wanted_test_manager, list):
1492                     wanted_test_manager = [wanted_test_manager]
1493
1494             for tester in self.testers:
1495                 if wanted_test_manager is not None and \
1496                         tester.name not in wanted_test_manager:
1497                     continue
1498
1499                 if self.options.user_paths:
1500                     TestsManager.loading_testsuite = tester.name
1501                     tester.register_defaults()
1502                     loaded = True
1503                 else:
1504                     TestsManager.loading_testsuite = testsuite.__name__
1505                     if testsuite.setup_tests(tester, self.options):
1506                         loaded = True
1507                     TestsManager.loading_testsuite = None
1508
1509             if not loaded:
1510                 printc("Could not load testsuite: %s"
1511                        " maybe because of missing TestManager"
1512                        % (testsuite), Colors.FAIL)
1513                 return False
1514
1515     def _load_config(self, options):
1516         printc("Loading config files is DEPRECATED"
1517                " you should use the new testsuite format now",)
1518
1519         for tester in self.testers:
1520             tester.options = options
1521             globals()[tester.name] = tester
1522         globals()["options"] = options
1523         c__file__ = __file__
1524         globals()["__file__"] = self.options.config
1525         exec(compile(open(self.options.config).read(),
1526                      self.options.config, 'exec'), globals())
1527         globals()["__file__"] = c__file__
1528
1529     def set_settings(self, options, args):
1530         if options.xunit_file:
1531             self.reporter = reporters.XunitReporter(options)
1532         else:
1533             self.reporter = reporters.Reporter(options)
1534
1535         self.options = options
1536         wanted_testers = None
1537         for tester in self.testers:
1538             if tester.name in args:
1539                 wanted_testers = tester.name
1540
1541         if wanted_testers:
1542             testers = self.testers
1543             self.testers = []
1544             for tester in testers:
1545                 if tester.name in args:
1546                     self.testers.append(tester)
1547                     args.remove(tester.name)
1548
1549         if options.config:
1550             self._load_config(options)
1551
1552         self._load_testsuites()
1553         if not self.options.testsuites:
1554             printc("Not testsuite loaded!", Colors.FAIL)
1555             return False
1556
1557         for tester in self.testers:
1558             tester.set_settings(options, args, self.reporter)
1559
1560         if not options.config and options.testsuites:
1561             if self._setup_testsuites() is False:
1562                 return False
1563
1564         for tester in self.testers:
1565             if not tester.set_blacklists():
1566                 return False
1567
1568             if not tester.check_expected_failures():
1569                 return False
1570
1571         return True
1572
1573     def _check_tester_has_other_testsuite(self, testsuite, tester):
1574         if tester.name != testsuite.TEST_MANAGER[0]:
1575             return True
1576
1577         for t in self.options.testsuites:
1578             if t != testsuite:
1579                 for other_testmanager in t.TEST_MANAGER:
1580                     if other_testmanager == tester.name:
1581                         return True
1582
1583         return False
1584
1585     def _check_defined_tests(self, tester, tests):
1586         if self.options.blacklisted_tests or self.options.wanted_tests:
1587             return
1588
1589         tests_names = [test.classname for test in tests]
1590         testlist_changed = False
1591         for testsuite in self.options.testsuites:
1592             if not self._check_tester_has_other_testsuite(testsuite, tester) \
1593                     and tester.check_testslist:
1594                 try:
1595                     testlist_file = open(os.path.splitext(testsuite.__file__)[0] + ".testslist",
1596                                          'r+')
1597
1598                     know_tests = testlist_file.read().split("\n")
1599                     testlist_file.close()
1600
1601                     testlist_file = open(os.path.splitext(testsuite.__file__)[0] + ".testslist",
1602                                          'w')
1603                 except IOError:
1604                     continue
1605
1606                 optional_out = []
1607                 for test in know_tests:
1608                     if test and test.strip('~') not in tests_names:
1609                         if not test.startswith('~'):
1610                             testlist_changed = True
1611                             printc("Test %s Not in testsuite %s anymore"
1612                                    % (test, testsuite.__file__), Colors.FAIL)
1613                         else:
1614                             optional_out.append((test, None))
1615
1616                 tests_names = sorted([(test.classname, test) for test in tests] + optional_out,
1617                                      key=lambda x: x[0].strip('~'))
1618
1619                 for tname, test in tests_names:
1620                     if test and test.optional:
1621                         tname = '~' + tname
1622                     testlist_file.write("%s\n" % (tname))
1623                     if tname and tname not in know_tests:
1624                         printc("Test %s is NEW in testsuite %s"
1625                                % (tname, testsuite.__file__),
1626                                Colors.FAIL if self.options.fail_on_testlist_change else Colors.OKGREEN)
1627                         testlist_changed = True
1628
1629                 testlist_file.close()
1630                 break
1631
1632         return testlist_changed
1633
1634     def list_tests(self):
1635         for tester in self.testers:
1636             if not self._tester_needed(tester):
1637                 continue
1638
1639             tests = tester.list_tests()
1640             if self._check_defined_tests(tester, tests) and \
1641                     self.options.fail_on_testlist_change:
1642                 raise RuntimeError("Unexpected new test in testsuite.")
1643
1644             self.tests.extend(tests)
1645         return sorted(list(self.tests), key=lambda t: t.classname)
1646
1647     def _tester_needed(self, tester):
1648         for testsuite in self.options.testsuites:
1649             if tester.name in testsuite.TEST_MANAGER:
1650                 return True
1651         return False
1652
1653     def server_wrapper(self, ready):
1654         self.server = GstValidateTCPServer(
1655             ('localhost', 0), GstValidateListener)
1656         self.server.socket.settimeout(None)
1657         self.server.launcher = self
1658         self.serverport = self.server.socket.getsockname()[1]
1659         self.info("%s server port: %s" % (self, self.serverport))
1660         ready.set()
1661
1662         self.server.serve_forever(poll_interval=0.05)
1663
1664     def _start_server(self):
1665         self.info("Starting TCP Server")
1666         ready = threading.Event()
1667         self.server_thread = threading.Thread(target=self.server_wrapper,
1668                                               kwargs={'ready': ready})
1669         self.server_thread.start()
1670         ready.wait()
1671         os.environ["GST_VALIDATE_SERVER"] = "tcp://localhost:%s" % self.serverport
1672
1673     def _stop_server(self):
1674         if self.server:
1675             self.server.shutdown()
1676             self.server_thread.join()
1677             self.server.server_close()
1678             self.server = None
1679
1680     def test_wait(self):
1681         while True:
1682             # Check process every second for timeout
1683             try:
1684                 self.queue.get(timeout=1)
1685             except queue.Empty:
1686                 pass
1687
1688             for test in self.jobs:
1689                 if test.process_update():
1690                     self.jobs.remove(test)
1691                     return test
1692
1693     def tests_wait(self):
1694         try:
1695             test = self.test_wait()
1696             test.check_results()
1697         except KeyboardInterrupt:
1698             for test in self.jobs:
1699                 test.kill_subprocess()
1700             raise
1701
1702         return test
1703
1704     def start_new_job(self, tests_left):
1705         try:
1706             test = tests_left.pop(0)
1707         except IndexError:
1708             return False
1709
1710         test.test_start(self.queue)
1711
1712         self.jobs.append(test)
1713
1714         return True
1715
1716     def _run_tests(self):
1717         cur_test_num = 0
1718
1719         if not self.all_tests:
1720             all_tests = self.list_tests()
1721             self.all_tests = all_tests
1722         self.total_num_tests = len(self.all_tests)
1723
1724         self.reporter.init_timer()
1725         alone_tests = []
1726         tests = []
1727         for test in self.tests:
1728             if test.is_parallel:
1729                 tests.append(test)
1730             else:
1731                 alone_tests.append(test)
1732
1733         max_num_jobs = min(self.options.num_jobs, len(tests))
1734         jobs_running = 0
1735
1736         # if order of test execution doesn't matter, shuffle
1737         # the order to optimize cpu usage
1738         if self.options.shuffle:
1739             random.shuffle(tests)
1740             random.shuffle(alone_tests)
1741
1742         current_test_num = 1
1743         for num_jobs, tests in [(max_num_jobs, tests), (1, alone_tests)]:
1744             tests_left = list(tests)
1745             for i in range(num_jobs):
1746                 if not self.start_new_job(tests_left):
1747                     break
1748                 jobs_running += 1
1749
1750             while jobs_running != 0:
1751                 test = self.tests_wait()
1752                 jobs_running -= 1
1753                 test.number = "[%d / %d] " % (current_test_num,
1754                                               self.total_num_tests)
1755                 current_test_num += 1
1756                 res = test.test_end()
1757                 self.reporter.after_test(test)
1758                 if res != Result.PASSED and (self.options.forever or
1759                                              self.options.fatal_error):
1760                     return test.result
1761                 if self.start_new_job(tests_left):
1762                     jobs_running += 1
1763
1764         return True
1765
1766     def clean_tests(self):
1767         for test in self.tests:
1768             test.clean()
1769         self._stop_server()
1770
1771     def run_tests(self):
1772         self._start_server()
1773         if self.options.forever:
1774             r = 1
1775             while True:
1776                 printc("Running iteration %d" % r, title=True)
1777
1778                 if not self._run_tests():
1779                     break
1780                 r += 1
1781                 self.clean_tests()
1782
1783             return False
1784         elif self.options.n_runs:
1785             res = True
1786             for r in range(self.options.n_runs):
1787                 t = "Running iteration %d" % r
1788                 print("%s\n%s\n%s\n" % ("=" * len(t), t, "=" * len(t)))
1789                 if not self._run_tests():
1790                     res = False
1791                 self.clean_tests()
1792
1793             return res
1794         else:
1795             return self._run_tests()
1796
1797     def final_report(self):
1798         return self.reporter.final_report()
1799
1800     def needs_http_server(self):
1801         for tester in self.testers:
1802             if tester.needs_http_server():
1803                 return True
1804
1805
class NamedDic(object):
    """Plain object exposing the entries of a mapping as attributes."""

    def __init__(self, props):
        """
        @props Mapping of attribute names to values. A falsy value (None,
               empty mapping) leaves the instance without attributes.
        """
        if not props:
            return

        for attr_name, attr_value in props.items():
            setattr(self, attr_name, attr_value)
1812
1813
class Scenario(object):
    """
    A named scenario and its properties.

    Every (name, value) pair of @props becomes an instance attribute, with
    the dashes of the name turned into underscores. The accessors below fall
    back to a neutral value when the matching property was not provided.

    NOTE(review): the boolean accessors rely on bool(); if the values are
    handed over as strings, any non-empty string (including "false") counts
    as True -- confirm against the producers of @props.
    """

    def __init__(self, name, props, path=None):
        self.name = name
        self.path = path

        for key, value in props:
            setattr(self, key.replace("-", "_"), value)

    def get_execution_name(self):
        """Return the path when the scenario has one, its name otherwise."""
        return self.name if self.path is None else self.path

    def seeks(self):
        """Truthiness of the `seek` property, False when it is missing."""
        return bool(getattr(self, "seek", False))

    def needs_clock_sync(self):
        """Truthiness of `need_clock_sync`, False when it is missing."""
        return bool(getattr(self, "need_clock_sync", False))

    def needs_live_content(self):
        """
        Truthiness of `live_content_required` (scenarios that can only be
        used on live content), False when it is missing.
        """
        return bool(getattr(self, "live_content_required", False))

    def compatible_with_live_content(self):
        """
        Tell whether the scenario can run on live content.

        A scenario requiring live content is implicitly compatible with it;
        otherwise the `live_content_compatible` property decides.
        """
        if self.needs_live_content():
            return True

        return bool(getattr(self, "live_content_compatible", False))

    def get_min_media_duration(self):
        """`min_media_duration` as a float, 0 when it is missing."""
        if not hasattr(self, "min_media_duration"):
            return 0

        return float(self.min_media_duration)

    def does_reverse_playback(self):
        """Truthiness of `reverse_playback`, False when it is missing."""
        return bool(getattr(self, "reverse_playback", False))

    def get_duration(self):
        """`duration` as a float, 0 when it is missing."""
        if hasattr(self, "duration"):
            return float(self.duration)

        return 0

    def get_min_tracks(self, track_type):
        """`min_<track_type>_track` as an int, 0 when it is missing."""
        attr_name = "min_%s_track" % track_type
        if hasattr(self, attr_name):
            return int(getattr(self, attr_name))

        return 0

    def __repr__(self):
        return "<Scenario %s>" % self.name
1882
1883
class ScenarioManager(Loggable):
    """
    Singleton giving access to the scenarios known by gst-validate.

    Scenario definitions are produced by running
    GstValidateBaseTestManager.COMMAND with `--scenarios-defs-output-file`
    and parsing the file it writes. The `config` attribute starts as None
    and must be set before any discovery, since `config.main_dir` and
    `config.logsdir` are read.
    """
    _instance = None
    # Cache of the default scenarios, filled when discovery runs without
    # explicit paths.
    all_scenarios = []

    FILE_EXTENSION = "scenario"

    def __new__(cls, *args, **kwargs):
        """Create the unique instance on first use, return it afterwards."""
        if not cls._instance:
            cls._instance = super(ScenarioManager, cls).__new__(
                cls, *args, **kwargs)
            cls._instance.config = None
            cls._instance.discovered = False
            Loggable.__init__(cls._instance)

        return cls._instance

    def find_special_scenarios(self, mfile):
        """
        Discover the scenarios written specifically for @mfile.

        They live next to @mfile and are named
        `<basename of mfile>.<anything>.<FILE_EXTENSION>`.

        Returns: the list of matching Scenario objects (possibly empty).
        """
        scenarios = []
        mfile_dir = os.path.dirname(mfile)
        pattern = re.compile(
            r"%s\..*\.%s$" % (re.escape(os.path.basename(mfile)),
                              self.FILE_EXTENSION))

        for f in os.listdir(mfile_dir):
            if pattern.search(f):
                scenarios.append(os.path.join(mfile_dir, f))

        if scenarios:
            scenarios = self.discover_scenarios(scenarios, mfile)

        return scenarios

    # NOTE: the mutable default is kept for interface stability; it is only
    # read, never modified.
    def discover_scenarios(self, scenario_paths=[], mfile=None):
        """
        Discover scenarios specified in scenario_paths or the default ones
        if nothing specified there

        @scenario_paths Paths of scenario files to describe.
        @mfile Media file the scenarios are specific to, if any; it is used
               to derive the scenario name from its file name.

        Returns: the list of discovered Scenario objects. When no path was
                 given they are also added to `all_scenarios`.
        """
        scenarios = []
        scenario_defs = os.path.join(self.config.main_dir, "scenarios.def")
        log_path = os.path.join(self.config.logsdir,
                                "scenarios_discovery.log")

        with open(log_path, 'w') as logs:
            try:
                command = [GstValidateBaseTestManager.COMMAND,
                           "--scenarios-defs-output-file", scenario_defs]
                command.extend(scenario_paths)
                subprocess.check_call(command, stdout=logs, stderr=logs)
            except subprocess.CalledProcessError:
                # A failing discovery run is ignored here: whatever is in
                # the definitions file gets parsed below.
                pass

        config = configparser.RawConfigParser()
        # read_file() replaces readfp(), which no longer exists in
        # Python 3.12.
        with open(scenario_defs) as f:
            config.read_file(f)

        for section in config.sections():
            if scenario_paths:
                # NOTE(review): when @mfile is set and no path contains the
                # section name, `name`/`path` keep the values of the previous
                # section (or are unbound for the first one) -- confirm
                # whether such sections should be skipped.
                for scenario_path in scenario_paths:
                    if mfile is None:
                        name = section
                        path = scenario_path
                    elif section in scenario_path:
                        # The real name of the scenario is:
                        # filename.REALNAME.scenario
                        name = scenario_path.replace(mfile + ".", "").replace(
                            "." + self.FILE_EXTENSION, "")
                        path = scenario_path
            else:
                name = section
                path = None

            props = config.items(section)
            scenarios.append(Scenario(name, props, path))

        if not scenario_paths:
            self.discovered = True
            self.all_scenarios.extend(scenarios)

        return scenarios

    def get_scenario(self, name):
        """
        Look a scenario up.

        @name Either None, the name of a default scenario, or the absolute
              path of a scenario file.

        Returns: the matching Scenario, the list of all default scenarios
                 when @name is None, or None (after logging a warning) when
                 nothing matches.
        """
        if name is not None and os.path.isabs(name) and name.endswith(self.FILE_EXTENSION):
            scenarios = self.discover_scenarios([name])

            if scenarios:
                return scenarios[0]

        if self.discovered is False:
            self.discover_scenarios()

        if name is None:
            return self.all_scenarios

        try:
            return [scenario for scenario in self.all_scenarios if scenario.name == name][0]
        except IndexError:
            self.warning("Scenario: %s not found" % name)
            return None
1978
1979
class GstValidateBaseTestManager(TestsManager):
    """
    Base for the test managers driving the gst-validate tools.

    It holds the default scenarios and encoding formats to run and, at
    class level, the resolved tool commands (`*COMMAND` attributes set by
    update_commands()) plus a cache of the features already looked up.
    """
    scenarios_manager = ScenarioManager()
    features_cache = {}

    def __init__(self):
        super(GstValidateBaseTestManager, self).__init__()
        self._scenarios = []
        self._encoding_formats = []

    @classmethod
    def update_commands(cls, extra_paths=None):
        """
        Resolve the `-1.0` tools with which() and store them on the class.

        @extra_paths Forwarded to which() as its second argument.
        """
        tools = (('', 'gst-validate'),
                 ('TRANSCODING_', 'gst-validate-transcoding'),
                 ('MEDIA_CHECK_', 'gst-validate-media-check'),
                 ('RTSP_SERVER_', 'gst-validate-rtsp-server'),
                 ('INSPECT_', 'gst-inspect'))
        for prefix, tool in tools:
            setattr(cls, prefix + 'COMMAND', which(tool + '-1.0', extra_paths))

    @classmethod
    def has_feature(cls, featurename):
        """
        Tell whether running INSPECT_COMMAND on @featurename succeeds.

        The answer is cached per feature name in `features_cache`.
        """
        if featurename in cls.features_cache:
            return cls.features_cache[featurename]

        try:
            subprocess.check_output([cls.INSPECT_COMMAND, featurename])
        except subprocess.CalledProcessError:
            available = False
        else:
            available = True

        cls.features_cache[featurename] = available
        return available

    def add_scenarios(self, scenarios):
        """
        @scenarios A list of, or a single, scenario name(s) to be run on the
                    tests. They are just the default scenarios, and then
                    depending on the TestsGenerator to be used you can have
                    more fine grained control on what to be run on each
                    series of tests.
        """
        extra = scenarios if isinstance(scenarios, list) else [scenarios]
        self._scenarios.extend(extra)

        # Drop duplicates (the resulting order is the one of the set).
        self._scenarios = list(set(self._scenarios))

    def set_scenarios(self, scenarios):
        """
        Override the scenarios
        """
        self._scenarios = []
        self.add_scenarios(scenarios)

    def get_scenarios(self):
        """Return the list of default scenarios."""
        return self._scenarios

    def add_encoding_formats(self, encoding_formats):
        """
        :param encoding_formats: A list or one single #MediaFormatCombinations
                           describing wanted output formats for transcoding
                           test. They are just the default encoding formats,
                           and then depending on the TestsGenerator to be
                           used you can have more fine grained control on
                           what to be run on each series of tests.
        """
        extra = (encoding_formats if isinstance(encoding_formats, list)
                 else [encoding_formats])
        self._encoding_formats.extend(extra)

        # Drop duplicates (the resulting order is the one of the set).
        self._encoding_formats = list(set(self._encoding_formats))

    def get_encoding_formats(self):
        """Return the list of default encoding formats."""
        return self._encoding_formats
2055
2056
# Populate the *COMMAND class attributes (COMMAND, TRANSCODING_COMMAND,
# MEDIA_CHECK_COMMAND, RTSP_SERVER_COMMAND, INSPECT_COMMAND) as soon as the
# module is imported.
GstValidateBaseTestManager.update_commands()
2058
2059
class MediaDescriptor(Loggable):
    """
    Abstract description of a media file or stream.

    Subclasses must implement the accessors raising NotImplementedError;
    is_compatible() builds on them to decide whether a scenario can be run
    on the described media.
    """

    def __init__(self):
        Loggable.__init__(self)

    def get_path(self):
        raise NotImplementedError

    def get_media_filepath(self):
        raise NotImplementedError

    def get_caps(self):
        raise NotImplementedError

    def get_uri(self):
        raise NotImplementedError

    def get_duration(self):
        raise NotImplementedError

    def get_protocol(self):
        raise NotImplementedError

    def is_seekable(self):
        raise NotImplementedError

    def is_live(self):
        raise NotImplementedError

    def is_image(self):
        raise NotImplementedError

    def get_num_tracks(self, track_type):
        raise NotImplementedError

    def can_play_reverse(self):
        raise NotImplementedError

    def prerrols(self):
        """Tell whether the media prerolls; True unless overridden."""
        return True

    def is_compatible(self, scenario):
        """
        Tell whether @scenario can be run on this media.

        @scenario A Scenario, or None (always compatible).

        Returns: False as soon as one requirement of the scenario (seeking,
                 clock sync, reverse playback, live content, preroll,
                 minimum duration, minimum number of tracks) is not met by
                 the media, True otherwise.
        """
        if scenario is None:
            return True

        if scenario.seeks() and (not self.is_seekable() or self.is_image()):
            self.debug("Do not run %s as %s does not support seeking",
                       scenario, self.get_uri())
            return False

        if self.is_image() and scenario.needs_clock_sync():
            self.debug("Do not run %s as %s is an image",
                       scenario, self.get_uri())
            return False

        if not self.can_play_reverse() and scenario.does_reverse_playback():
            return False

        if not self.is_live() and scenario.needs_live_content():
            self.debug("Do not run %s as %s is not a live content",
                       scenario, self.get_uri())
            return False

        if self.is_live() and not scenario.compatible_with_live_content():
            self.debug("Do not run %s as %s is a live content",
                       scenario, self.get_uri())
            return False

        if not self.prerrols() and getattr(scenario, 'needs_preroll', False):
            return False

        # A duration of 0 is not checked against the minimum duration.
        if self.get_duration() and self.get_duration() / GST_SECOND < scenario.get_min_media_duration():
            self.debug(
                "Do not run %s as %s is too short (%i < min media duation : %i",
                scenario, self.get_uri(),
                self.get_duration() / GST_SECOND,
                scenario.get_min_media_duration())
            return False

        for track_type in ['audio', 'subtitle', 'video']:
            if self.get_num_tracks(track_type) < scenario.get_min_tracks(track_type):
                # Arguments follow the wording of the message: minimum
                # number of tracks first, then the track type.
                self.debug("%s -- %s | At least %s %s track needed  < %s"
                           % (scenario, self.get_uri(),
                              scenario.get_min_tracks(track_type), track_type,
                              self.get_num_tracks(track_type)))
                return False

        return True
2148
2149
class GstValidateMediaDescriptor(MediaDescriptor):
    """
    Media descriptor backed by an XML description file, such as the one
    new_from_uri() generates with MEDIA_CHECK_COMMAND.
    """
    # Some extension file for discovering results
    MEDIA_INFO_EXT = "media_info"
    STREAM_INFO_EXT = "stream_info"

    def __init__(self, xml_path):
        """
        @xml_path Path of the XML description to load.

        Raises the parser's ParseError (after printing a message) when the
        file is not valid XML.
        """
        super(GstValidateMediaDescriptor, self).__init__()

        self._xml_path = xml_path
        try:
            media_xml = ET.parse(xml_path).getroot()
        except ET.ParseError:
            # ET.ParseError matches whichever ElementTree implementation
            # was imported as ET.
            printc("Could not parse %s" % xml_path,
                   Colors.FAIL)
            raise

        self._extract_data(media_xml)

        # The protocol is the scheme of the media URI. Parsing the scheme a
        # second time would always give an empty string.
        self.set_protocol(urllib.parse.urlparse(self.get_uri()).scheme)

    def _extract_data(self, media_xml):
        """Extract the information we need from the root XML element."""
        streams_node = media_xml.findall("streams")[0]
        streams = streams_node.findall("stream")

        self._caps = streams_node.attrib["caps"]
        self._track_caps = [(stream.attrib["type"], stream.attrib["caps"])
                            for stream in streams]
        self._uri = media_xml.attrib["uri"]
        self._duration = int(media_xml.attrib["duration"])
        self._protocol = media_xml.get("protocol", None)
        self._is_seekable = media_xml.attrib["seekable"].lower() == "true"
        self._is_live = media_xml.get("live", "false").lower() == "true"
        self._track_types = [stream.attrib["type"] for stream in streams]
        self._is_image = "image" in self._track_types

    @staticmethod
    def new_from_uri(uri, verbose=False, include_frames=False):
        """
        Generate the media info file of @uri and load it.

            include_frames = 0 # Never
            include_frames = 1 # always
            include_frames = 2 # if previous file included them

        Returns: a GstValidateMediaDescriptor, or None when the media check
                 command failed or its output could not be loaded.
        """
        media_path = utils.url2path(uri)

        descriptor_path = "%s.%s" % (
            media_path, GstValidateMediaDescriptor.MEDIA_INFO_EXT)
        if include_frames == 2:
            try:
                media_xml = ET.parse(descriptor_path).getroot()
                frames = media_xml.findall('streams/stream/frame')
                include_frames = bool(frames)
            except FileNotFoundError:
                # NOTE(review): include_frames stays 2 (truthy) here, so
                # frames are included when there is no previous file --
                # confirm this is intended.
                pass
        else:
            include_frames = bool(include_frames)

        args = GstValidateBaseTestManager.MEDIA_CHECK_COMMAND.split(" ")
        args.append(uri)

        args.extend(["--output-file", descriptor_path])
        if include_frames:
            args.extend(["--full"])

        if verbose:
            printc("Generating media info for %s\n"
                   "    Command: '%s'" % (media_path, ' '.join(args)),
                   Colors.OKBLUE)

        try:
            # DEVNULL discards stderr without leaking a file object.
            subprocess.check_output(args, stderr=subprocess.DEVNULL)
        except subprocess.CalledProcessError as e:
            if verbose:
                printc("Result: Failed", Colors.FAIL)
            else:
                loggable.warning("GstValidateMediaDescriptor",
                                 "Exception: %s" % e)
            return None

        if verbose:
            printc("Result: Passed", Colors.OKGREEN)

        try:
            return GstValidateMediaDescriptor(descriptor_path)
        except (IOError, ET.ParseError):
            return None

    def get_path(self):
        return self._xml_path

    def need_clock_sync(self):
        return Protocols.needs_clock_sync(self.get_protocol())

    def get_media_filepath(self):
        """Return the XML path stripped from its info extension."""
        if self.get_protocol() == Protocols.FILE:
            return self._xml_path.replace("." + self.MEDIA_INFO_EXT, "")
        else:
            return self._xml_path.replace("." + self.STREAM_INFO_EXT, "")

    def get_caps(self):
        return self._caps

    def get_tracks_caps(self):
        """Return the (type, caps) pairs of the described streams."""
        return self._track_caps

    def get_uri(self):
        return self._uri

    def get_duration(self):
        return self._duration

    def set_protocol(self, protocol):
        self._protocol = protocol

    def get_protocol(self):
        return self._protocol

    def is_seekable(self):
        return self._is_seekable

    def is_live(self):
        return self._is_live

    def can_play_reverse(self):
        return True

    def is_image(self):
        return self._is_image

    def get_num_tracks(self, track_type):
        """Return the number of streams whose type is @track_type."""
        return self._track_types.count(track_type)

    def get_clean_name(self):
        """
        Return the base name of the XML file without its info extension and
        with the remaining dots replaced by underscores.
        """
        name = os.path.basename(self.get_path())
        name = re.sub(r"\.stream_info|\.media_info", "", name)

        return name.replace('.', "_")
2303
2304
class MediaFormatCombination(object):
    """A container/audio/video format triple used for transcoding tests."""

    # Short format names mapped to the matching caps strings.
    FORMATS = {
        # Audio
        "aac": "audio/mpeg,mpegversion=4",
        "ac3": "audio/x-ac3",
        "vorbis": "audio/x-vorbis",
        "mp3": "audio/mpeg,mpegversion=1,layer=3",
        "opus": "audio/x-opus",
        "rawaudio": "audio/x-raw",

        # Video
        "h264": "video/x-h264",
        "h265": "video/x-h265",
        "vp8": "video/x-vp8",
        "vp9": "video/x-vp9",
        "theora": "video/x-theora",
        "prores": "video/x-prores",
        "jpeg": "image/jpeg",

        # Containers
        "webm": "video/webm",
        "ogg": "application/ogg",
        "mkv": "video/x-matroska",
        "mp4": "video/quicktime,variant=iso;",
        "quicktime": "video/quicktime;",
    }

    def __str__(self):
        return "%s and %s in %s" % (self.audio, self.video, self.container)

    def __init__(self, container, audio, video):
        """
        Describes a media format to be used for transcoding tests.

        :param container: A string defining the container format to be used, must be in self.FORMATS
        :param audio: A string defining the audio format to be used, must be in self.FORMATS
        :param video: A string defining the video format to be used, must be in self.FORMATS
        """
        self.container = container
        self.audio = audio
        self.video = video

    def get_caps(self, track_type):
        """
        Return the caps of the format stored for @track_type.

        @track_type Name of the instance attribute holding the format
                    ("container", "audio" or "video").

        Returns: the caps string, or None when there is no such instance
                 attribute or its format is not in FORMATS.
        """
        format_name = vars(self).get(track_type)
        return self.FORMATS.get(format_name)

    def get_audio_caps(self):
        return self.get_caps("audio")

    def get_video_caps(self):
        return self.get_caps("video")

    def get_muxer_caps(self):
        return self.get_caps("container")