validate: launcher: Add an argument to avoid rerunning flaky tests
[platform/upstream/gstreamer.git] / subprojects / gst-devtools / validate / launcher / baseclasses.py
1 #!/usr/bin/env python3
2 #
3 # Copyright (c) 2013,Thibault Saunier <thibault.saunier@collabora.com>
4 #
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU Lesser General Public
7 # License as published by the Free Software Foundation; either
8 # version 2.1 of the License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13 # Lesser General Public License for more details.
14 #
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with this program; if not, write to the
17 # Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
18 # Boston, MA 02110-1301, USA.
19
20 """ Class representing tests and test managers. """
21
22 from enum import Enum
23 import importlib.util
24 import json
25 import os
26 import sys
27 import re
28 import copy
29 import shlex
30 import socketserver
31 import struct
32 import time
33 from . import utils
34 import signal
35 import urllib.parse
36 import subprocess
37 import threading
38 import queue
39 import configparser
40 import xml
41 import random
42 import shutil
43 import uuid
44 from itertools import cycle
45 from fractions import Fraction
46
47 from .utils import GstCaps, which
48 from . import reporters
49 from . import loggable
50 from .loggable import Loggable
51
52 from collections import defaultdict
53 try:
54     from lxml import etree as ET
55 except ImportError:
56     import xml.etree.cElementTree as ET
57
58
59 from .vfb_server import get_virual_frame_buffer_server
60 from .httpserver import HTTPServer
61 from .utils import mkdir, Result, Colors, printc, DEFAULT_TIMEOUT, GST_SECOND, \
62     Protocols, look_for_file_in_source_dir, get_data_file, BackTraceGenerator, \
63     check_bugs_resolution, is_tty
64
# Factors by which the timeouts are multiplied when the test runs inside
# Valgrind, gdb or rr
VALGRIND_TIMEOUT_FACTOR = 20
GDB_TIMEOUT_FACTOR = VALGRIND_TIMEOUT_FACTOR
RR_TIMEOUT_FACTOR = 2
# Global multiplier taken from the environment (defaults to 1)
TIMEOUT_FACTOR = float(os.environ.get("TIMEOUT_FACTOR", 1))
# The exit code valgrind is told to use when it detects errors
VALGRIND_ERROR_CODE = 20

VALIDATE_OVERRIDE_EXTENSION = ".override"
# Maps negated signal numbers (the form used by subprocess return codes) to
# signal names, only for the signals available on this platform.
EXITING_SIGNALS = {
    -getattr(signal, signame): signame
    for signame in (
        'SIGQUIT', 'SIGILL', 'SIGABRT', 'SIGFPE', 'SIGSEGV', 'SIGBUS', 'SIGSYS',
        'SIGTRAP', 'SIGXCPU', 'SIGXFSZ', 'SIGIOT')
    if hasattr(signal, signame)
}
EXITING_SIGNALS[139] = "SIGSEGV"
# Also allow looking up the code from the signal name
EXITING_SIGNALS.update({(name, code) for code, name in EXITING_SIGNALS.items()})


CI_ARTIFACTS_URL = os.environ.get('CI_ARTIFACTS_URL')
82
83
84 class Test(Loggable):
85
86     """ A class representing a particular test. """
87
88     def __init__(self, application_name, classname, options,
89                  reporter, duration=0, timeout=DEFAULT_TIMEOUT,
90                  hard_timeout=None, extra_env_variables=None,
91                  expected_issues=None, is_parallel=True,
92                  workdir=None):
93         """
94         @timeout: The timeout during which the value return by get_current_value
95                   keeps being exactly equal
96         @hard_timeout: Max time the test can take in absolute
97         """
98         Loggable.__init__(self)
99         self.timeout = timeout * TIMEOUT_FACTOR * options.timeout_factor
100         if hard_timeout:
101             self.hard_timeout = hard_timeout * TIMEOUT_FACTOR
102             self.hard_timeout *= options.timeout_factor
103         else:
104             self.hard_timeout = hard_timeout
105         self.classname = classname
106         self.options = options
107         self.application = application_name
108         self.command = []
109         self.server_command = None
110         self.reporter = reporter
111         self.process = None
112         self.proc_env = None
113         self.thread = None
114         self.queue = None
115         self.duration = duration
116         self.stack_trace = None
117         self._uuid = None
118         if expected_issues is None:
119             self.expected_issues = []
120         elif not isinstance(expected_issues, list):
121             self.expected_issues = [expected_issues]
122         else:
123             self.expected_issues = expected_issues
124
125         extra_env_variables = extra_env_variables or {}
126         self.extra_env_variables = extra_env_variables
127         self.optional = False
128         self.is_parallel = is_parallel
129         self.generator = None
130         self.workdir = workdir
131         self.max_retries = 0
132         self.html_log = None
133         self.rr_logdir = None
134
135         self.clean()
136
137     def _generate_expected_issues(self):
138         return ''
139
140     def generate_expected_issues(self):
141         res = '%s"FIXME \'%s\' issues [REPORT A BUG ' % (" " * 4, self.classname) \
142             + 'in https://gitlab.freedesktop.org/gstreamer/ '\
143             + 'or use a proper bug description]": {'
144         res += """
145         "tests": [
146             "%s"
147         ],
148         "issues": [""" % (self.classname)
149
150         retcode = self.process.returncode if self.process else 0
151         if retcode != 0:
152             signame = EXITING_SIGNALS.get(retcode)
153             val = "'" + signame + "'" if signame else retcode
154             res += """\n            {
155                 '%s': %s,
156                 'sometimes': True,
157             },""" % ("signame" if signame else "returncode", val)
158
159         res += self._generate_expected_issues()
160         res += "\n%s],\n%s},\n" % (" " * 8, " " * 4)
161
162         return res
163
164     def copy(self, nth=None):
165         copied_test = copy.copy(self)
166         if nth:
167             copied_test.classname += '_it' + str(nth)
168             copied_test.options = copy.copy(self.options)
169             copied_test.options.logsdir = os.path.join(copied_test.options.logsdir, str(nth))
170             os.makedirs(copied_test.options.logsdir, exist_ok=True)
171
172         return copied_test
173
174     def clean(self):
175         self.kill_subprocess()
176         self.message = ""
177         self.error_str = ""
178         self.time_taken = 0.0
179         self._starting_time = None
180         self.result = Result.NOT_RUN
181         self.logfile = None
182         self.out = None
183         self.extra_logfiles = set()
184         self.__env_variable = []
185         self.kill_subprocess()
186         self.process = None
187
188     def __str__(self):
189         string = self.classname
190         if self.result != Result.NOT_RUN:
191             string += ": " + self.result
192             if self.result in [Result.FAILED, Result.TIMEOUT]:
193                 string += " '%s'" % self.message
194                 if not self.options.dump_on_failure:
195                     if not self.options.redirect_logs and self.result != Result.PASSED:
196                         string += self.get_logfile_repr()
197                 else:
198                     string = "\n==> %s" % string
199
200         return string
201
202     def add_env_variable(self, variable, value=None):
203         """
204         Only useful so that the gst-validate-launcher can print the exact
205         right command line to reproduce the tests
206         """
207         if value is None:
208             value = os.environ.get(variable, None)
209
210         if value is None:
211             return
212
213         self.__env_variable.append(variable)
214
215     @property
216     def _env_variable(self):
217         res = ""
218         if not self.options.verbose or self.options.verbose > 1:
219             for var in set(self.__env_variable):
220                 if res:
221                     res += " "
222                 value = self.proc_env.get(var, None)
223                 if value is not None:
224                     res += "%s='%s'" % (var, value)
225         else:
226             res += "[Not displaying environment variables, rerun with -vv for the full command]"
227
228         return res
229
230     def open_logfile(self):
231         if self.out:
232             return
233
234         path = os.path.join(self.options.logsdir,
235                             self.classname.replace(".", os.sep) + '.md')
236         mkdir(os.path.dirname(path))
237         self.logfile = path
238
239         if self.options.redirect_logs == 'stdout':
240             self.out = sys.stdout
241         elif self.options.redirect_logs == 'stderr':
242             self.out = sys.stderr
243         else:
244             self.out = open(path, 'w+')
245
246     def finalize_logfiles(self):
247         self.out.write("\n**Duration**: %s" % self.time_taken)
248         if not self.options.redirect_logs:
249             self.out.flush()
250             for logfile in self.extra_logfiles:
251                 # Only copy over extra logfile content if it's below a certain threshold
252                 # Avoid copying gigabytes of data if a lot of debugging is activated
253                 if os.path.getsize(logfile) < 500 * 1024:
254                     self.out.write('\n\n## %s:\n\n```\n%s\n```\n' % (
255                         os.path.basename(logfile), self.get_extra_log_content(logfile))
256                     )
257                 else:
258                     self.out.write('\n\n## %s:\n\n**Log file too big.**\n  %s\n\n Check file content directly\n\n' % (
259                         os.path.basename(logfile), logfile)
260                     )
261
262             if self.rr_logdir:
263                 self.out.write('\n\n## rr trace:\n\n```\nrr replay %s/latest-trace\n```\n' % (
264                     self.rr_logdir))
265
266             self.out.flush()
267             self.out.close()
268
269         if self.options.html:
270             self.html_log = os.path.splitext(self.logfile)[0] + '.html'
271             import commonmark
272             parser = commonmark.Parser()
273             with open(self.logfile) as f:
274                 ast = parser.parse(f.read())
275
276             renderer = commonmark.HtmlRenderer()
277             html = renderer.render(ast)
278             with open(self.html_log, 'w') as f:
279                 f.write(html)
280
281         self.out = None
282
283     def _get_file_content(self, file_name):
284         f = open(file_name, 'r+')
285         value = f.read()
286         f.close()
287
288         return value
289
290     def get_log_content(self):
291         return self._get_file_content(self.logfile)
292
293     def get_extra_log_content(self, extralog):
294         if extralog not in self.extra_logfiles:
295             return ""
296
297         return self._get_file_content(extralog)
298
299     def get_classname(self):
300         name = self.classname.split('.')[-1]
301         classname = self.classname.replace('.%s' % name, '')
302
303         return classname
304
305     def get_name(self):
306         return self.classname.split('.')[-1]
307
308     def get_uuid(self):
309         if self._uuid is None:
310             self._uuid = self.classname + str(uuid.uuid4())
311         return self._uuid
312
313     def add_arguments(self, *args):
314         self.command += args
315
316     def build_arguments(self):
317         self.add_env_variable("LD_PRELOAD")
318         self.add_env_variable("DISPLAY")
319
320     def add_stack_trace_to_logfile(self):
321         self.debug("Adding stack trace")
322         if self.options.rr:
323             return
324
325         trace_gatherer = BackTraceGenerator.get_default()
326         stack_trace = trace_gatherer.get_trace(self)
327
328         if not stack_trace:
329             return
330
331         info = "\n\n## Stack trace\n\n```\n%s\n```" % stack_trace
332         if self.options.redirect_logs:
333             print(info)
334             return
335
336         if self.options.xunit_file:
337             self.stack_trace = stack_trace
338
339         self.out.write(info)
340         self.out.flush()
341
342     def add_known_issue_information(self):
343         if self.expected_issues:
344             info = "\n\n## Already known issues\n\n``` python\n%s\n```\n\n" % (
345                 json.dumps(self.expected_issues, indent=4)
346             )
347         else:
348             info = ""
349
350         info += "\n\n**You can mark the issues as 'known' by adding the " \
351             + " following lines to the list of known issues**\n" \
352             + "\n\n``` python\n%s\n```" % (self.generate_expected_issues())
353
354         if self.options.redirect_logs:
355             print(info)
356             return
357
358         self.out.write(info)
359
360     def set_result(self, result, message="", error=""):
361
362         if not self.options.redirect_logs:
363             self.out.write("\n```\n")
364             self.out.flush()
365
366         self.debug("Setting result: %s (message: %s, error: %s)" % (result,
367                                                                     message, error))
368
369         if result is Result.TIMEOUT:
370             if self.options.debug is True:
371                 if self.options.gdb:
372                     printc("Timeout, you should process <ctrl>c to get into gdb",
373                            Colors.FAIL)
374                     # and wait here until gdb exits
375                     self.process.communicate()
376                 else:
377                     pname = self.command[0]
378                     input("%sTimeout happened  on %s you can attach gdb doing:\n $gdb %s %d%s\n"
379                           "Press enter to continue" % (Colors.FAIL, self.classname,
380                           pname, self.process.pid, Colors.ENDC))
381             else:
382                 self.add_stack_trace_to_logfile()
383
384         self.result = result
385         self.message = message
386         self.error_str = error
387
388         if result not in [Result.PASSED, Result.NOT_RUN, Result.SKIPPED]:
389             self.add_known_issue_information()
390
391     def check_results(self):
392         if self.result is Result.FAILED or self.result is Result.TIMEOUT:
393             return
394
395         self.debug("%s returncode: %s", self, self.process.returncode)
396         if self.options.rr and self.process.returncode == -signal.SIGPIPE:
397             self.set_result(Result.SKIPPED, "SIGPIPE received under `rr`, known issue.")
398         elif self.process.returncode == 0:
399             self.set_result(Result.PASSED)
400         elif self.process.returncode in EXITING_SIGNALS:
401             self.add_stack_trace_to_logfile()
402             self.set_result(Result.FAILED,
403                             "Application exited with signal %s" % (
404                                 EXITING_SIGNALS[self.process.returncode]))
405         elif self.process.returncode == VALGRIND_ERROR_CODE:
406             self.set_result(Result.FAILED, "Valgrind reported errors")
407         else:
408             self.set_result(Result.FAILED,
409                             "Application returned %d" % (self.process.returncode))
410
411     def get_current_value(self):
412         """
413         Lets subclasses implement a nicer timeout measurement method
414         They should return some value with which we will compare
415         the previous and timeout if they are egual during self.timeout
416         seconds
417         """
418         return Result.NOT_RUN
419
    def process_update(self):
        """
        Poll the test process and check its timeouts.

        Returns True when process has finished running or has timed out
        (or when get_current_value reports FAILED or KNOWN_ERROR), False
        while the test should keep running.
        """

        if self.process is None:
            # Process has not started running yet
            return False

        # Refresh returncode without blocking
        self.process.poll()
        if self.process.returncode is not None:
            return True

        val = self.get_current_value()

        self.debug("Got value: %s" % val)
        if val is Result.NOT_RUN:
            # The get_current_value logic is not implemented... dumb
            # timeout
            if time.time() - self.last_change_ts > self.timeout:
                self.set_result(Result.TIMEOUT,
                                "Application timed out: %s secs" %
                                self.timeout,
                                "timeout")
                return True
            return False
        elif val is Result.FAILED:
            return True
        elif val is Result.KNOWN_ERROR:
            return True

        self.log("New val %s" % val)

        if val == self.last_val:
            # No progress since the last check: time out once the value has
            # been stuck for longer than self.timeout
            delta = time.time() - self.last_change_ts
            self.debug("%s: Same value for %d/%d seconds" %
                       (self, delta, self.timeout))
            if delta > self.timeout:
                self.set_result(Result.TIMEOUT,
                                "Application timed out: %s secs" %
                                self.timeout,
                                "timeout")
                return True
        # NOTE(review): the hard timeout is only checked on this branch, i.e.
        # when the value changed since the previous call - confirm that this
        # is intended.
        elif self.hard_timeout and time.time() - self.start_ts > self.hard_timeout:
            self.set_result(
                Result.TIMEOUT, "Hard timeout reached: %d secs" % self.hard_timeout)
            return True
        else:
            # Progress was made: remember the new value and when it was seen
            self.last_change_ts = time.time()
            self.last_val = val

        return False
472
473     def get_subproc_env(self):
474         return os.environ.copy()
475
476     def kill_subprocess(self):
477         subprocs_id = None
478         if self.options.rr and self.process and self.process.returncode is None:
479             cmd = ["ps", "-o", "pid", "--ppid", str(self.process.pid), "--noheaders"]
480             try:
481                 subprocs_id = [int(pid.strip('\n')) for
482                     pid in subprocess.check_output(cmd).decode().split(' ') if pid]
483             except FileNotFoundError:
484                 self.error("Ps not found, will probably not be able to get rr "
485                     "working properly after we kill the process")
486             except subprocess.CalledProcessError as e:
487                 self.error("Couldn't get rr subprocess pid: %s" % (e))
488
489         utils.kill_subprocess(self, self.process, DEFAULT_TIMEOUT, subprocs_id)
490
491     def run_external_checks(self):
492         pass
493
    def thread_wrapper(self):
        """
        Run the command of the test and wait for it to finish.

        This is the target of the thread created in test_start. The output
        of the process goes to self.out. Unless the test was marked as timed
        out in the meantime, the external checks are run when the process
        returned 0 and None is put on self.queue.
        """
        def enable_sigint():
            # Restore the SIGINT handler for the child process (gdb) to ensure
            # it can handle it.
            signal.signal(signal.SIGINT, signal.SIG_DFL)

        # The pre-exec hook is only installed when running under gdb, and
        # never when os.name is "nt"
        if self.options.gdb and os.name != "nt":
            preexec_fn = enable_sigint
        else:
            preexec_fn = None

        self.process = subprocess.Popen(self.command,
                                        stderr=self.out,
                                        stdout=self.out,
                                        env=self.proc_env,
                                        cwd=self.workdir,
                                        preexec_fn=preexec_fn)
        self.process.wait()
        if self.result is not Result.TIMEOUT:
            if self.process.returncode == 0:
                self.run_external_checks()
            # presumably signals the consumer of the queue that the test is
            # done - verify against the caller of test_start
            self.queue.put(None)
516
517     def get_valgrind_suppression_file(self, subdir, name):
518         p = get_data_file(subdir, name)
519         if p:
520             return p
521
522         self.error("Could not find any %s file" % name)
523
524     def get_valgrind_suppressions(self):
525         return [self.get_valgrind_suppression_file('data', 'gstvalidate.supp')]
526
527     def use_gdb(self, command):
528         if self.hard_timeout is not None:
529             self.hard_timeout *= GDB_TIMEOUT_FACTOR
530         self.timeout *= GDB_TIMEOUT_FACTOR
531
532         if not self.options.gdb_non_stop:
533             self.timeout = sys.maxsize
534             self.hard_timeout = sys.maxsize
535
536         args = ["gdb"]
537         if self.options.gdb_non_stop:
538             args += ["-ex", "run", "-ex", "backtrace", "-ex", "quit"]
539         args += ["--args"] + command
540         return args
541
542     def use_rr(self, command, subenv):
543         command = ["rr", 'record', '-h'] + command
544
545         self.timeout *= RR_TIMEOUT_FACTOR
546         self.rr_logdir = os.path.join(self.options.logsdir, self.classname.replace(".", os.sep), 'rr-logs')
547         subenv['_RR_TRACE_DIR'] = self.rr_logdir
548         try:
549             shutil.rmtree(self.rr_logdir, ignore_errors=False, onerror=None)
550         except FileNotFoundError:
551             pass
552         self.add_env_variable('_RR_TRACE_DIR', self.rr_logdir)
553
554         return command
555
556     def use_valgrind(self, command, subenv):
557         vglogsfile = os.path.splitext(self.logfile)[0] + '.valgrind'
558         self.extra_logfiles.add(vglogsfile)
559
560         vg_args = []
561
562         for o, v in [('trace-children', 'yes'),
563                      ('tool', 'memcheck'),
564                      ('leak-check', 'full'),
565                      ('leak-resolution', 'high'),
566                      # TODO: errors-for-leak-kinds should be set to all instead of definite
567                      # and all false positives should be added to suppression
568                      # files.
569                      ('errors-for-leak-kinds', 'definite,indirect'),
570                      ('show-leak-kinds', 'definite,indirect'),
571                      ('show-possibly-lost', 'no'),
572                      ('num-callers', '20'),
573                      ('error-exitcode', str(VALGRIND_ERROR_CODE)),
574                      ('gen-suppressions', 'all')]:
575             vg_args.append("--%s=%s" % (o, v))
576
577         if not self.options.redirect_logs:
578             vglogsfile = os.path.splitext(self.logfile)[0] + '.valgrind'
579             self.extra_logfiles.add(vglogsfile)
580             vg_args.append("--%s=%s" % ('log-file', vglogsfile))
581
582         for supp in self.get_valgrind_suppressions():
583             vg_args.append("--suppressions=%s" % supp)
584
585         command = ["valgrind"] + vg_args + command
586
587         # Tune GLib's memory allocator to be more valgrind friendly
588         subenv['G_DEBUG'] = 'gc-friendly'
589         subenv['G_SLICE'] = 'always-malloc'
590
591         if self.hard_timeout is not None:
592             self.hard_timeout *= VALGRIND_TIMEOUT_FACTOR
593         self.timeout *= VALGRIND_TIMEOUT_FACTOR
594
595         # Enable 'valgrind.config'
596         self.add_validate_config(get_data_file(
597             'data', 'valgrind.config'), subenv)
598         if subenv == self.proc_env:
599             self.add_env_variable('G_DEBUG', 'gc-friendly')
600             self.add_env_variable('G_SLICE', 'always-malloc')
601             self.add_env_variable('GST_VALIDATE_CONFIG',
602                                   self.proc_env['GST_VALIDATE_CONFIG'])
603
604         return command
605
606     def add_validate_config(self, config, subenv=None):
607         if not subenv:
608             subenv = self.extra_env_variables
609
610         cconf = subenv.get('GST_VALIDATE_CONFIG', "")
611         paths = [c for c in cconf.split(os.pathsep) if c] + [config]
612         subenv['GST_VALIDATE_CONFIG'] = os.pathsep.join(paths)
613
614     def launch_server(self):
615         return None
616
617     def get_logfile_repr(self):
618         if not self.options.redirect_logs:
619             if self.html_log:
620                 log = self.html_log
621             else:
622                 log = self.logfile
623
624             if CI_ARTIFACTS_URL:
625                 log = CI_ARTIFACTS_URL + os.path.relpath(log, self.options.logsdir)
626
627             return "\n    Log: %s" % (log)
628
629         return ""
630
631     def get_command_repr(self):
632         message = "%s %s" % (self._env_variable, ' '.join(
633             shlex.quote(arg) for arg in self.command))
634         if self.server_command:
635             message = "%s & %s" % (self.server_command, message)
636
637         return message
638
    def test_start(self, queue):
        """
        Start running the test in a new thread.

        Opens the logs, builds the command line and the environment of the
        process, wraps the command for gdb/valgrind/rr when requested, logs
        the command and starts the thread running thread_wrapper.

        @queue: Queue on which thread_wrapper puts None once the process is
                done (unless the test timed out)
        """
        self.open_logfile()

        self.server_command = self.launch_server()
        self.queue = queue
        self.command = [self.application]
        self._starting_time = time.time()
        self.build_arguments()
        self.proc_env = self.get_subproc_env()

        # Extra variables are appended, os.pathsep separated, to the value
        # already present in the environment (if any)
        for var, value in list(self.extra_env_variables.items()):
            value = self.proc_env.get(var, '') + os.pathsep + value
            self.proc_env[var] = value.strip(os.pathsep)
            self.add_env_variable(var, self.proc_env[var])

        if self.options.gdb:
            self.command = self.use_gdb(self.command)

            # Saved so that test_end can restore it
            self.previous_sigint_handler = signal.getsignal(signal.SIGINT)
            # Make the gst-validate executable ignore SIGINT while gdb is
            # running.
            signal.signal(signal.SIGINT, signal.SIG_IGN)

        if self.options.valgrind:
            self.command = self.use_valgrind(self.command, self.proc_env)

        if self.options.rr:
            self.command = self.use_rr(self.command, self.proc_env)

        if not self.options.redirect_logs:
            # Markdown header of the log file; the code fence opened last is
            # where the output of the process lands
            self.out.write("# `%s`\n\n"
                           "## Command\n\n``` bash\n%s\n```\n\n" % (
                               self.classname, self.get_command_repr()))
            self.out.write("## %s output\n\n``` \n\n" % os.path.basename(self.application))
            self.out.flush()
        else:
            message = "Launching: %s%s\n" \
                "    Command: %s\n" % (Colors.ENDC, self.classname,
                                       self.get_command_repr())
            printc(message, Colors.OKBLUE)

        self.thread = threading.Thread(target=self.thread_wrapper)
        self.thread.start()

        # State used by process_update to detect timeouts
        self.last_val = 0
        self.last_change_ts = time.time()
        self.start_ts = time.time()
686
687     def _dump_log_file(self, logfile):
688         if which('bat'):
689             try:
690                 subprocess.check_call(['bat', '-H', '1', '--paging=never', logfile])
691                 return
692             except (subprocess.CalledProcessError, FileNotFoundError):
693                 pass
694
695         with open(logfile, 'r') as fin:
696             for line in fin.readlines():
697                 print('> ' + line, end='')
698
699     def _dump_log_files(self):
700         self._dump_log_file(self.logfile)
701
702     def copy_logfiles(self, extra_folder="flaky_tests"):
703         path = os.path.dirname(os.path.join(self.options.logsdir, extra_folder,
704                             self.classname.replace(".", os.sep)))
705         mkdir(path)
706         self.logfile = shutil.copy(self.logfile, path)
707         extra_logs = []
708         for logfile in self.extra_logfiles:
709             extra_logs.append(shutil.copy(logfile, path))
710         self.extra_logfiles = extra_logs
711
    def test_end(self, retry_on_failures=False):
        """
        Finish the test: kill the process, wait for the thread, finalize
        the logs and drop the data that is no longer needed.

        @retry_on_failures: When set (or when the test has retries), the logs
                            are not dumped on failure
        Returns the result of the test.
        """
        self.kill_subprocess()
        self.thread.join()
        self.time_taken = time.time() - self._starting_time

        if self.options.gdb:
            # Restore the handler replaced in test_start
            signal.signal(signal.SIGINT, self.previous_sigint_handler)

        self.finalize_logfiles()
        if self.options.dump_on_failure and not retry_on_failures and not self.max_retries:
            if self.result not in [Result.PASSED, Result.KNOWN_ERROR, Result.NOT_RUN]:
                self._dump_log_files()

        # Only keep around env variables we need later
        clean_env = {}
        for n in self.__env_variable:
            clean_env[n] = self.proc_env.get(n, None)
        self.proc_env = clean_env

        # Don't keep around JSON report objects, they were processed
        # in check_results already
        self.reports = []

        return self.result
736
737
class GstValidateTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
    """ TCP server handling each incoming connection in its own thread. """
740
741
class GstValidateListener(socketserver.BaseRequestHandler, Loggable):

    """
    Request handler receiving the messages sent by a running test.

    Each message is a 4 bytes big-endian unsigned length followed by that
    many bytes of UTF-8 encoded JSON. The first message of a connection must
    contain the uuid of the test, which is looked up in the launcher of the
    server; the following messages update that test.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        Loggable.__init__(self, "GstValidateListener")

    def _recv_exact(self, size):
        """
        Read exactly @size bytes from the connection.

        Returns None when the peer closed the connection before @size bytes
        could be read.
        """
        data = bytes()
        while len(data) < size:
            chunk = self.request.recv(size - len(data))
            if chunk == b'':
                # Connection closed: retrying would only spin forever
                return None
            data += chunk

        return data

    def handle(self):
        """Implements BaseRequestHandler handle method"""
        test = None
        # NOTE(review): presumably set here because BaseRequestHandler's
        # constructor calls handle() before Loggable.__init__ runs - confirm.
        self.logCategory = "GstValidateListener"
        while True:
            # The length header may arrive in several pieces
            raw_len = self._recv_exact(4)
            if raw_len is None:
                return
            msglen = struct.unpack('>I', raw_len)[0]
            raw_msg = self._recv_exact(msglen)
            if raw_msg is None:
                return

            # Invalid bytes are dropped, so decoding cannot fail
            msg = raw_msg.decode('utf-8', 'ignore')
            if msg == '':
                return

            try:
                obj = json.loads(msg)
            except json.decoder.JSONDecodeError as e:
                self.error("%s Could not decode message: %s - %s" % (test.classname if test else "unknown", msg, e))
                continue

            if test is None:
                # First message must contain the uuid
                test_uuid = obj.get("uuid", None)
                if test_uuid is None:
                    return
                # Find test from launcher
                for t in self.server.launcher.tests:
                    if test_uuid == t.get_uuid():
                        test = t
                        break
                if test is None:
                    self.server.launcher.error(
                        "Could not find test for UUID %s" % test_uuid)
                    return

            obj_type = obj.get("type", '')
            if obj_type == 'position':
                test.set_position(obj['position'], obj['duration'],
                                  obj['speed'])
            elif obj_type == 'buffering':
                test.set_position(obj['position'], 100)
            elif obj_type == 'action':
                test.add_action_execution(obj)
                # Make sure that action is taken into account when checking if process
                # is updating
                test.position += 1
            elif obj_type == 'action-done':
                # Make sure that action end is taken into account when checking if process
                # is updating
                test.position += 1
                if test.actions_infos:
                    test.actions_infos[-1]['execution-duration'] = obj['execution-duration']
            elif obj_type == 'report':
                test.add_report(obj)
            elif obj_type == 'skip-test':
                test.set_result(Result.SKIPPED)
814
815
816 class GstValidateTest(Test):
817
818     """ A class representing a particular test. """
819     HARD_TIMEOUT_FACTOR = 5
820     fault_sig_regex = re.compile("<Caught SIGNAL: .*>")
821     needs_gst_inspect = set()
822
823     def __init__(self, application_name, classname,
824                  options, reporter, duration=0,
825                  timeout=DEFAULT_TIMEOUT, scenario=None, hard_timeout=None,
826                  media_descriptor=None, extra_env_variables=None,
827                  expected_issues=None, workdir=None, **kwargs):
828
829         extra_env_variables = extra_env_variables or {}
830
831         if not hard_timeout and self.HARD_TIMEOUT_FACTOR:
832             if timeout:
833                 hard_timeout = timeout * self.HARD_TIMEOUT_FACTOR
834             elif duration:
835                 hard_timeout = duration * self.HARD_TIMEOUT_FACTOR
836             else:
837                 hard_timeout = None
838
839         # If we are running from source, use the -debug version of the
840         # application which is using rpath instead of libtool's wrappers. It's
841         # slightly faster to start and will not confuse valgrind.
842         debug = '%s-debug' % application_name
843         p = look_for_file_in_source_dir('tools', debug)
844         if p:
845             application_name = p
846
847         self.reports = []
848         self.position = -1
849         self.media_duration = -1
850         self.speed = 1.0
851         self.actions_infos = []
852         self.media_descriptor = media_descriptor
853         self.server = None
854         self.criticals = []
855
856         override_path = self.get_override_file(media_descriptor)
857         if override_path:
858             if extra_env_variables:
859                 if extra_env_variables.get("GST_VALIDATE_OVERRIDE", ""):
860                     extra_env_variables[
861                         "GST_VALIDATE_OVERRIDE"] += os.path.pathsep
862
863             extra_env_variables["GST_VALIDATE_OVERRIDE"] = override_path
864
865         super().__init__(application_name,
866                          classname,
867                          options, reporter,
868                          duration=duration,
869                          timeout=timeout,
870                          hard_timeout=hard_timeout,
871                          extra_env_variables=extra_env_variables,
872                          expected_issues=expected_issues,
873                          workdir=workdir,
874                          **kwargs)
875         if media_descriptor and media_descriptor.get_media_filepath():
876             config_file = os.path.join(media_descriptor.get_media_filepath() + '.config')
877             if os.path.isfile(config_file):
878                 self.add_validate_config(config_file, extra_env_variables)
879
880         if scenario is None or scenario.name.lower() == "none":
881             self.scenario = None
882         else:
883             self.scenario = scenario
884
885     def needs_http_server(self):
886         if self.media_descriptor is None:
887             return False
888
889         protocol = self.media_descriptor.get_protocol()
890         uri = self.media_descriptor.get_uri()
891         uri_requires_http_server = False
892         if uri:
893             if 'http-server-port' in uri:
894                 expanded_uri = uri % {
895                     'http-server-port': self.options.http_server_port}
896                 uri_requires_http_server = expanded_uri.find(
897                     "127.0.0.1:%s" % self.options.http_server_port) != -1
898         if protocol in [Protocols.HTTP, Protocols.HLS, Protocols.DASH] or uri_requires_http_server:
899             return True
900
901         return False
902
    def kill_subprocess(self):
        """ Delegate to Test.kill_subprocess(), nothing is added here. """
        Test.kill_subprocess(self)
905
906     def add_report(self, report):
907         self.reports.append(report)
908
909     def set_position(self, position, duration, speed=None):
910         self.position = position
911         self.media_duration = duration
912         if speed:
913             self.speed = speed
914
915     def add_action_execution(self, action_infos):
916         self.actions_infos.append(action_infos)
917
918     def get_override_file(self, media_descriptor):
919         if media_descriptor:
920             if media_descriptor.get_path():
921                 override_path = os.path.splitext(media_descriptor.get_path())[
922                     0] + VALIDATE_OVERRIDE_EXTENSION
923                 if os.path.exists(override_path):
924                     return override_path
925
926         return None
927
928     def get_current_position(self):
929         return self.position
930
931     def get_current_value(self):
932         return self.position
933
934     def get_subproc_env(self):
935         subproc_env = os.environ.copy()
936
937         if self.options.validate_default_config:
938             self.add_validate_config(self.options.validate_default_config,
939                 subproc_env, )
940
941         subproc_env["GST_VALIDATE_UUID"] = self.get_uuid()
942         subproc_env["GST_VALIDATE_LOGSDIR"] = self.options.logsdir
943
944         if 'GST_DEBUG' in os.environ and not self.options.redirect_logs:
945             gstlogsfile = os.path.splitext(self.logfile)[0] + '.gstdebug'
946             self.extra_logfiles.add(gstlogsfile)
947             subproc_env["GST_DEBUG_FILE"] = gstlogsfile
948
949         if self.options.no_color:
950             subproc_env["GST_DEBUG_NO_COLOR"] = '1'
951
952         # Ensure XInitThreads is called, see bgo#731525
953         subproc_env['GST_GL_XINITTHREADS'] = '1'
954         self.add_env_variable('GST_GL_XINITTHREADS', '1')
955         subproc_env['GST_XINITTHREADS'] = '1'
956         self.add_env_variable('GST_XINITTHREADS', '1')
957
958         if self.scenario is not None:
959             scenario = self.scenario.get_execution_name()
960             subproc_env["GST_VALIDATE_SCENARIO"] = scenario
961             self.add_env_variable("GST_VALIDATE_SCENARIO",
962                                   subproc_env["GST_VALIDATE_SCENARIO"])
963         else:
964             try:
965                 del subproc_env["GST_VALIDATE_SCENARIO"]
966             except KeyError:
967                 pass
968
969         if not subproc_env.get('GST_DEBUG_DUMP_DOT_DIR'):
970             dotfilesdir = os.path.join(self.options.logsdir,
971                                 self.classname.replace(".", os.sep) + '.pipelines_dot_files')
972             mkdir(dotfilesdir)
973             subproc_env['GST_DEBUG_DUMP_DOT_DIR'] = dotfilesdir
974             if CI_ARTIFACTS_URL:
975                 dotfilesurl = CI_ARTIFACTS_URL + os.path.relpath(dotfilesdir,
976                                                                  self.options.logsdir)
977                 subproc_env['GST_VALIDATE_DEBUG_DUMP_DOT_URL'] = dotfilesurl
978
979         return subproc_env
980
981     def clean(self):
982         Test.clean(self)
983         self.reports = []
984         self.position = -1
985         self.media_duration = -1
986         self.speed = 1.0
987         self.actions_infos = []
988
989     def build_arguments(self):
990         super(GstValidateTest, self).build_arguments()
991         if "GST_VALIDATE" in os.environ:
992             self.add_env_variable("GST_VALIDATE", os.environ["GST_VALIDATE"])
993
994         if "GST_VALIDATE_SCENARIOS_PATH" in os.environ:
995             self.add_env_variable("GST_VALIDATE_SCENARIOS_PATH",
996                                   os.environ["GST_VALIDATE_SCENARIOS_PATH"])
997
998         self.add_env_variable("GST_VALIDATE_CONFIG")
999         self.add_env_variable("GST_VALIDATE_OVERRIDE")
1000
1001     def get_extra_log_content(self, extralog):
1002         value = Test.get_extra_log_content(self, extralog)
1003
1004         return value
1005
1006     def report_matches_expected_issues(self, report, expected_issue):
1007         for key in ['bug', 'bugs', 'sometimes']:
1008             if key in expected_issue:
1009                 del expected_issue[key]
1010         for key, value in list(report.items()):
1011             if key in expected_issue:
1012                 if not re.findall(expected_issue[key], str(value)):
1013                     return False
1014                 expected_issue.pop(key)
1015
1016         if "can-happen-several-times" in expected_issue:
1017             expected_issue.pop("can-happen-several-times")
1018         return not bool(expected_issue)
1019
1020     def check_reported_issues(self, expected_issues):
1021         ret = []
1022         expected_retcode = [0]
1023         for report in self.reports:
1024             found = None
1025             for expected_issue in expected_issues:
1026                 if self.report_matches_expected_issues(report,
1027                                                        expected_issue.copy()):
1028                     found = expected_issue
1029                     break
1030
1031             if found is not None:
1032                 if not found.get('can-happen-several-times', False):
1033                     expected_issues.remove(found)
1034                 if report['level'] == 'critical':
1035                     if found.get('sometimes', True) and isinstance(expected_retcode, list):
1036                         expected_retcode.append(18)
1037                     else:
1038                         expected_retcode = [18]
1039             elif report['level'] == 'critical':
1040                 ret.append(report)
1041
1042         if not ret:
1043             return None, expected_issues, expected_retcode
1044
1045         return ret, expected_issues, expected_retcode
1046
1047     def check_expected_issue(self, expected_issue):
1048         res = True
1049         msg = ''
1050         expected_symbols = expected_issue.get('stacktrace_symbols')
1051         if expected_symbols:
1052             trace_gatherer = BackTraceGenerator.get_default()
1053             stack_trace = trace_gatherer.get_trace(self)
1054
1055             if stack_trace:
1056                 if not isinstance(expected_symbols, list):
1057                     expected_symbols = [expected_symbols]
1058
1059                 not_found_symbols = [s for s in expected_symbols
1060                                      if s not in stack_trace]
1061                 if not_found_symbols:
1062                     msg = " Expected symbols '%s' not found in stack trace " % (
1063                         not_found_symbols)
1064                     res = False
1065             else:
1066                 msg += " No stack trace available, could not verify symbols "
1067
1068         _, not_found_expected_issues, _ = self.check_reported_issues(expected_issue.get('issues', []))
1069         if not_found_expected_issues:
1070             mandatory_failures = [f for f in not_found_expected_issues
1071                                   if not f.get('sometimes', True)]
1072             if mandatory_failures:
1073                 msg = " (Expected issues not found: %s) " % mandatory_failures
1074                 res = False
1075
1076         return msg, res
1077
1078     def check_expected_timeout(self, expected_timeout):
1079         msg = "Expected timeout happened. "
1080         result = Result.PASSED
1081         message = expected_timeout.get('message')
1082         if message:
1083             if not re.findall(message, self.message):
1084                 result = Result.FAILED
1085                 msg = "Expected timeout message: %s got %s " % (
1086                     message, self.message)
1087
1088         stack_msg, stack_res = self.check_expected_issue(expected_timeout)
1089         if not stack_res:
1090             result = Result.TIMEOUT
1091             msg += stack_msg
1092
1093         return result, msg
1094
    def check_results(self):
        """
        Compute the result of the test once its process is done and set it
        with set_result().

        The result is computed from the reports sent by the test, the return
        code of the process and the expected issues of the test. Nothing is
        done when the result is already FAILED, PASSED or SKIPPED, nor when
        the test timed out without any signal found in its logfile and
        without any expected timeout.
        """
        if self.result in [Result.FAILED, Result.PASSED, Result.SKIPPED]:
            return

        self.debug("%s returncode: %s", self, self.process.returncode)
        # Work on a copy: the expected issues get modified while checking.
        expected_issues = copy.deepcopy(self.expected_issues)
        if self.options.rr:
            # signal.SIGPIPE is 13 but it sometimes isn't present in python for some reason.
            expected_issues.append({"returncode": -13, "sometimes": True})
        self.criticals, not_found_expected_issues, expected_returncode = self.check_reported_issues(expected_issues)
        expected_timeout = None
        expected_signal = None
        # The expected issues that were not reported can describe the way
        # the process is expected to end: a return code, a signal or a
        # timeout.
        for i, f in enumerate(not_found_expected_issues):
            returncode = f.get('returncode', [])
            if not isinstance(returncode, list):
                returncode = [returncode]

            if f.get('signame'):
                signames = f['signame']
                if not isinstance(signames, list):
                    signames = [signames]

                # NOTE(review): EXITING_SIGNALS is indexed by signal name here
                # and by return code below, presumably it maps both ways.
                returncode = [EXITING_SIGNALS[signame] for signame in signames]

            if returncode:
                if 'sometimes' in f:
                    # The issue does not always happen: exiting cleanly is
                    # fine too.
                    returncode.append(0)
                expected_returncode = returncode
                expected_signal = f
            elif f.get("timeout"):
                expected_timeout = f

        # Return codes and signals are checked against the return code of
        # the process below, they are not missing issues.
        not_found_expected_issues = [f for f in not_found_expected_issues
                                     if not f.get('returncode') and not f.get('signame')]

        msg = ""
        result = Result.PASSED
        if self.result == Result.TIMEOUT:
            with open(self.logfile) as f:
                # A signal caught by the application is written in its logs.
                signal_fault_info = self.fault_sig_regex.findall(f.read())
                if signal_fault_info:
                    result = Result.FAILED
                    msg = signal_fault_info[0]
                elif expected_timeout:
                    not_found_expected_issues.remove(expected_timeout)
                    result, msg = self.check_expected_timeout(expected_timeout)
                else:
                    # Unexpected timeout: the TIMEOUT result is kept as is.
                    return
        elif self.process.returncode in EXITING_SIGNALS:
            msg = "Application exited with signal %s" % (
                EXITING_SIGNALS[self.process.returncode])
            if self.process.returncode not in expected_returncode:
                result = Result.FAILED
            else:
                if expected_signal:
                    stack_msg, stack_res = self.check_expected_issue(
                        expected_signal)
                    if not stack_res:
                        msg += stack_msg
                        result = Result.FAILED
            self.add_stack_trace_to_logfile()
        elif self.process.returncode == VALGRIND_ERROR_CODE:
            msg = "Valgrind reported errors "
            result = Result.FAILED
        elif self.process.returncode not in expected_returncode:
            msg = "Application returned %s " % self.process.returncode
            if expected_returncode != [0]:
                msg += "(expected %s) " % expected_returncode
            result = Result.FAILED

        if self.criticals:
            msg += "(critical errors: [%s]) " % ', '.join(set([c['summary']
                                                           for c in self.criticals]))
            result = Result.FAILED

        if not_found_expected_issues:
            # Issues happen 'sometimes' by default, only the ones explicitly
            # flagged otherwise have to be reported.
            mandatory_failures = [f for f in not_found_expected_issues
                                  if not f.get('sometimes', True)]

            if mandatory_failures:
                msg += " (Expected errors not found: %s) " % mandatory_failures
                result = Result.FAILED
        elif self.expected_issues:
            # NOTE(review): this also replaces a FAILED result computed
            # above (unexpected return code, critical errors...) by
            # KNOWN_ERROR - confirm that it is intended.
            msg += ' %s(Expected errors occurred: %s)%s' % (Colors.OKBLUE,
                                                            self.expected_issues,
                                                            Colors.ENDC)
            result = Result.KNOWN_ERROR

        if result == Result.PASSED:
            # A report with the 'expected' level turns a success into a
            # known error.
            for report in self.reports:
                if report["level"] == "expected":
                    result = Result.KNOWN_ERROR
                    break

        self.set_result(result, msg.strip())
1190
1191     def _generate_expected_issues(self):
1192         res = ""
1193         self.criticals = self.criticals or []
1194         if self.result == Result.TIMEOUT:
1195             res += """            {
1196                 'timeout': True,
1197                 'sometimes': True,
1198             },"""
1199
1200         for report in self.criticals:
1201             res += "\n%s{" % (" " * 12)
1202
1203             for key, value in report.items():
1204                 if key == "type":
1205                     continue
1206                 if value is None:
1207                     continue
1208                 res += '\n%s%s"%s": "%s",' % (
1209                     " " * 16, "# " if key == "details" else "",
1210                     key, value.replace('\n', '\\n'))
1211
1212             res += "\n%s}," % (" " * 12)
1213
1214         return res
1215
1216     def get_valgrind_suppressions(self):
1217         result = super(GstValidateTest, self).get_valgrind_suppressions()
1218         result.extend(utils.get_gst_build_valgrind_suppressions())
1219         return result
1220
1221
class VariableFramerateMode(Enum):
    """ The ways variable framerate can be requested in an encoding profile. """
    # The 'variable-framerate' property is not set
    DISABLED = 1
    # 'variable-framerate=true' is set on the video profile
    ENABLED = 2
    # DISABLED when the video restriction sets a framerate, ENABLED otherwise
    AUTO = 3
1226
1227
1228 class GstValidateEncodingTestInterface(object):
1229     DURATION_TOLERANCE = GST_SECOND / 4
1230
1231     def __init__(self, combination, media_descriptor, duration_tolerance=None):
1232         super(GstValidateEncodingTestInterface, self).__init__()
1233
1234         self.media_descriptor = media_descriptor
1235         self.combination = combination
1236         self.dest_file = ""
1237
1238         self._duration_tolerance = duration_tolerance
1239         if duration_tolerance is None:
1240             self._duration_tolerance = self.DURATION_TOLERANCE
1241
1242     def get_current_size(self):
1243         try:
1244             size = os.stat(urllib.parse.urlparse(self.dest_file).path).st_size
1245         except OSError:
1246             return None
1247
1248         self.debug("Size: %s" % size)
1249         return size
1250
1251     def _get_profile_full(self, muxer, venc, aenc, video_restriction=None,
1252                           audio_restriction=None, audio_presence=0,
1253                           video_presence=0,
1254                           variable_framerate=VariableFramerateMode.DISABLED):
1255
1256         ret = ""
1257         if muxer:
1258             ret += muxer
1259         ret += ":"
1260         if venc:
1261             if video_restriction is not None:
1262                 ret = ret + video_restriction + '->'
1263             ret += venc
1264             props = ""
1265             if video_presence:
1266                 props += 'presence=%s|' % str(video_presence)
1267             if variable_framerate == VariableFramerateMode.AUTO:
1268                 if video_restriction and "framerate" in video_restriction:
1269                     variable_framerate = VariableFramerateMode.DISABLED
1270                 else:
1271                     variable_framerate = VariableFramerateMode.ENABLED
1272             if variable_framerate == VariableFramerateMode.ENABLED:
1273                 props += 'variable-framerate=true|'
1274             if props:
1275                 ret = ret + '|' + props[:-1]
1276         if aenc:
1277             ret += ":"
1278             if audio_restriction is not None:
1279                 ret = ret + audio_restriction + '->'
1280             ret += aenc
1281             if audio_presence:
1282                 ret = ret + '|' + str(audio_presence)
1283
1284         return ret.replace("::", ":")
1285
1286     def get_profile(self, video_restriction=None, audio_restriction=None,
1287             variable_framerate=VariableFramerateMode.DISABLED):
1288         vcaps = self.combination.get_video_caps()
1289         acaps = self.combination.get_audio_caps()
1290         if video_restriction is None:
1291             video_restriction = self.combination.video_restriction
1292         if audio_restriction is None:
1293             audio_restriction = self.combination.audio_restriction
1294         if self.media_descriptor is not None:
1295             if self.combination.video == "theora":
1296                 # Theoraenc doesn't support variable framerate, make sure to avoid them
1297                 framerate = self.media_descriptor.get_framerate()
1298                 if framerate == Fraction(0, 1):
1299                     framerate = Fraction(30, 1)
1300                     restriction = utils.GstCaps.new_from_str(video_restriction or "video/x-raw")
1301                     for struct, _ in restriction:
1302                         if struct.get("framerate") is None:
1303                             struct.set("framerate", struct.FRACTION_TYPE, framerate)
1304                     video_restriction = str(restriction)
1305
1306             video_presence = self.media_descriptor.get_num_tracks("video")
1307             if video_presence == 0:
1308                 vcaps = None
1309
1310             audio_presence = self.media_descriptor.get_num_tracks("audio")
1311             if audio_presence == 0:
1312                 acaps = None
1313
1314         return self._get_profile_full(self.combination.get_muxer_caps(),
1315                                       vcaps, acaps,
1316                                       audio_presence=audio_presence,
1317                                       video_presence=video_presence,
1318                                       video_restriction=video_restriction,
1319                                       audio_restriction=audio_restriction,
1320                                       variable_framerate=variable_framerate)
1321
1322     def _clean_caps(self, caps):
1323         """
1324         Returns a list of key=value or structure name, without "(types)" or ";" or ","
1325         """
1326         return re.sub(r"\(.+?\)\s*| |;", '', caps).split(',')
1327
1328     # pylint: disable=E1101
1329     def _has_caps_type_variant(self, c, ccaps):
1330         """
1331         Handle situations where we can have application/ogg or video/ogg or
1332         audio/ogg
1333         """
1334         has_variant = False
1335         media_type = re.findall("application/|video/|audio/", c)
1336         if media_type:
1337             media_type = media_type[0].replace('/', '')
1338             possible_mtypes = ["application", "video", "audio"]
1339             possible_mtypes.remove(media_type)
1340             for tmptype in possible_mtypes:
1341                 possible_c_variant = c.replace(media_type, tmptype)
1342                 if possible_c_variant in ccaps:
1343                     self.info(
1344                         "Found %s in %s, good enough!", possible_c_variant, ccaps)
1345                     has_variant = True
1346
1347         return has_variant
1348
1349     # pylint: disable=E1101
1350     def run_iqa_test(self, reference_file_uri):
1351         """
1352         Runs IQA test if @reference_file_path exists
1353         @test: The test to run tests on
1354         """
1355         if not GstValidateBaseTestManager.has_feature('iqa'):
1356             self.debug('Iqa element not present, not running extra test.')
1357             return
1358
1359         pipeline_desc = """
1360             uridecodebin uri=%s !
1361                 iqa name=iqa do-dssim=true dssim-error-threshold=1.0 ! fakesink
1362             uridecodebin uri=%s ! iqa.
1363         """ % (reference_file_uri, self.dest_file)
1364         pipeline_desc = pipeline_desc.replace("\n", "")
1365
1366         command = [GstValidateBaseTestManager.COMMAND] + \
1367             shlex.split(pipeline_desc)
1368         msg = "## Running IQA tests on results of: " \
1369             + "%s\n### Command: \n```\n%s\n```\n" % (
1370                 self.classname, ' '.join(command))
1371         if not self.options.redirect_logs:
1372             self.out.write(msg)
1373             self.out.flush()
1374         else:
1375             printc(msg, Colors.OKBLUE)
1376
1377         self.process = subprocess.Popen(command,
1378                                         stderr=self.out,
1379                                         stdout=self.out,
1380                                         env=self.proc_env,
1381                                         cwd=self.workdir)
1382         self.process.wait()
1383
1384     def check_encoded_file(self):
1385         result_descriptor = GstValidateMediaDescriptor.new_from_uri(
1386             self.dest_file)
1387         if result_descriptor is None:
1388             return (Result.FAILED, "Could not discover encoded file %s"
1389                     % self.dest_file)
1390
1391         duration = result_descriptor.get_duration()
1392         orig_duration = self.media_descriptor.get_duration()
1393         tolerance = self._duration_tolerance
1394
1395         if orig_duration - tolerance >= duration <= orig_duration + tolerance:
1396             os.remove(result_descriptor.get_path())
1397             self.add_report(
1398                 {
1399                     'type': 'report',
1400                     'issue-id': 'transcoded-file-wrong-duration',
1401                     'summary': 'The duration of a transcoded file doesn\'t match the duration of the original file',
1402                     'level': 'critical',
1403                     'detected-on': 'pipeline',
1404                     'details': "Duration of encoded file is " " wrong (%s instead of %s)" % (
1405                         utils.TIME_ARGS(duration), utils.TIME_ARGS(orig_duration))
1406                 }
1407             )
1408         else:
1409             all_tracks_caps = result_descriptor.get_tracks_caps()
1410             container_caps = result_descriptor.get_caps()
1411             if container_caps:
1412                 all_tracks_caps.insert(0, ("container", container_caps))
1413
1414             for track_type, caps in all_tracks_caps:
1415                 ccaps = self._clean_caps(caps)
1416                 wanted_caps = self.combination.get_caps(track_type)
1417                 cwanted_caps = self._clean_caps(wanted_caps)
1418
1419                 if wanted_caps is None:
1420                     os.remove(result_descriptor.get_path())
1421                     self.add_report(
1422                         {
1423                             'type': 'report',
1424                             'issue-id': 'transcoded-file-wrong-stream-type',
1425                             'summary': 'Expected stream types during transcoding do not match expectations',
1426                             'level': 'critical',
1427                             'detected-on': 'pipeline',
1428                             'details': "Found a track of type %s in the encoded files"
1429                                     " but none where wanted in the encoded profile: %s" % (
1430                                         track_type, self.combination)
1431                         }
1432                     )
1433                     return
1434
1435                 for c in cwanted_caps:
1436                     if c not in ccaps:
1437                         if not self._has_caps_type_variant(c, ccaps):
1438                             os.remove(result_descriptor.get_path())
1439                             self.add_report(
1440                                 {
1441                                     'type': 'report',
1442                                     'issue-id': 'transcoded-file-wrong-caps',
1443                                     'summary': 'Expected stream caps during transcoding do not match expectations',
1444                                     'level': 'critical',
1445                                     'detected-on': 'pipeline',
1446                                     'details': "Field: %s  (from %s) not in caps of the outputted file %s" % (
1447                                         wanted_caps, c, ccaps)
1448                                 }
1449                             )
1450                             return
1451
1452             os.remove(result_descriptor.get_path())
1453
1454
1455 class TestsManager(Loggable):
1456
1457     """ A class responsible for managing tests. """
1458
1459     name = "base"
1460     loading_testsuite = None
1461
1462     def __init__(self):
1463
1464         Loggable.__init__(self)
1465
1466         self.tests = []
1467         self.unwanted_tests = []
1468         self.options = None
1469         self.args = None
1470         self.reporter = None
1471         self.wanted_tests_patterns = []
1472         self.blacklisted_tests_patterns = []
1473         self._generators = []
1474         self.check_testslist = True
1475         self.all_tests = None
1476         self.expected_issues = {}
1477         self.blacklisted_tests = []
1478
1479     def init(self):
1480         return True
1481
1482     def list_tests(self):
1483         return sorted(list(self.tests), key=lambda x: x.classname)
1484
1485     def find_tests(self, classname):
1486         regex = re.compile(classname)
1487         return [test for test in self.list_tests() if regex.findall(test.classname)]
1488
1489     def add_expected_issues(self, expected_issues):
1490         for bugid, failure_def in list(expected_issues.items()):
1491             tests_regexes = []
1492             for test_name_regex in failure_def['tests']:
1493                 regex = re.compile(test_name_regex)
1494                 tests_regexes.append(regex)
1495                 for test in self.tests:
1496                     if regex.findall(test.classname):
1497                         max_retries = failure_def.get('allow_flakiness', failure_def.get('max_retries'))
1498                         if max_retries:
1499                             test.max_retries = int(max_retries)
1500                             self.debug(f"{test.classname} allow {test.max_retries}")
1501                         else:
1502                             for issue in failure_def['issues']:
1503                                 issue['bug'] = bugid
1504                             test.expected_issues.extend(failure_def['issues'])
1505                             self.debug("%s added expected issues from %s" % (
1506                                 test.classname, bugid))
1507             failure_def['tests'] = tests_regexes
1508
1509         self.expected_issues.update(expected_issues)
1510
1511     def add_test(self, test):
1512         if test.generator is None:
1513             test.classname = self.loading_testsuite + '.' + test.classname
1514
1515         for bugid, failure_def in list(self.expected_issues.items()):
1516             failure_def['bug'] = bugid
1517             for regex in failure_def['tests']:
1518                 if regex.findall(test.classname):
1519                     max_retries = failure_def.get('allow_flakiness', failure_def.get('max_retries'))
1520                     if max_retries:
1521                         test.max_retries = int(max_retries)
1522                         self.debug(f"{test.classname} allow {test.max_retries} retries.")
1523                     else:
1524                         for issue in failure_def['issues']:
1525                             issue['bug'] = bugid
1526                         test.expected_issues.extend(failure_def['issues'])
1527                         self.debug("%s added expected issues from %s" % (
1528                             test.classname, bugid))
1529
1530         if self._is_test_wanted(test):
1531             if test not in self.tests:
1532                 self.tests.append(test)
1533         else:
1534             if test not in self.tests:
1535                 self.unwanted_tests.append(test)
1536
1537     def get_tests(self):
1538         return self.tests
1539
1540     def populate_testsuite(self):
1541         pass
1542
1543     def add_generators(self, generators):
1544         """
1545         @generators: A list of, or one single #TestsGenerator to be used to generate tests
1546         """
1547         if not isinstance(generators, list):
1548             generators = [generators]
1549         self._generators.extend(generators)
1550         for generator in generators:
1551             generator.testsuite = self.loading_testsuite
1552
1553         self._generators = list(set(self._generators))
1554
1555     def get_generators(self):
1556         return self._generators
1557
1558     def _add_blacklist(self, blacklisted_tests):
1559         if not isinstance(blacklisted_tests, list):
1560             blacklisted_tests = [blacklisted_tests]
1561
1562         for patterns in blacklisted_tests:
1563             for pattern in patterns.split(","):
1564                 self.blacklisted_tests_patterns.append(re.compile(pattern))
1565
1566     def set_default_blacklist(self, default_blacklist):
1567         for test_regex, reason in default_blacklist:
1568             if not test_regex.startswith(self.loading_testsuite + '.'):
1569                 test_regex = self.loading_testsuite + '.' + test_regex
1570             self.blacklisted_tests.append((test_regex, reason))
1571             self._add_blacklist(test_regex)
1572
1573     def add_options(self, parser):
1574         """ Add more arguments. """
1575         pass
1576
1577     def set_settings(self, options, args, reporter):
1578         """ Set properties after options parsing. """
1579         self.options = options
1580         self.args = args
1581         self.reporter = reporter
1582
1583         self.populate_testsuite()
1584
1585         if self.options.valgrind:
1586             self.print_valgrind_bugs()
1587
1588         if options.wanted_tests:
1589             for patterns in options.wanted_tests:
1590                 for pattern in patterns.split(","):
1591                     self.wanted_tests_patterns.append(re.compile(pattern))
1592
1593         if options.blacklisted_tests:
1594             for patterns in options.blacklisted_tests:
1595                 self._add_blacklist(patterns)
1596
1597     def check_blacklists(self):
1598         if self.options.check_bugs_status:
1599             if not check_bugs_resolution(self.blacklisted_tests):
1600                 return False
1601
1602         return True
1603
1604     def log_blacklists(self):
1605         if self.blacklisted_tests:
1606             self.info("Currently 'hardcoded' %s blacklisted tests:" %
1607                       self.name)
1608
1609         for name, bug in self.blacklisted_tests:
1610             if not self.options.check_bugs_status:
1611                 self.info("  + %s --> bug: %s" % (name, bug))
1612
1613     def check_expected_issues(self):
1614         if not self.expected_issues or not self.options.check_bugs_status:
1615             return True
1616
1617         bugs_definitions = defaultdict(list)
1618         for bug, failure_def in list(self.expected_issues.items()):
1619             tests_names = '|'.join(
1620                 [regex.pattern for regex in failure_def['tests']])
1621             bugs_definitions[tests_names].extend([bug])
1622
1623         return check_bugs_resolution(bugs_definitions.items())
1624
1625     def _check_blacklisted(self, test):
1626         for pattern in self.blacklisted_tests_patterns:
1627             if pattern.findall(test.classname):
1628                 self.info("%s is blacklisted by %s", test.classname, pattern)
1629                 return True
1630
1631         return False
1632
1633     def _check_whitelisted(self, test):
1634         for pattern in self.wanted_tests_patterns:
1635             if pattern.findall(test.classname):
1636                 if self._check_blacklisted(test):
1637                     # If explicitly white listed that specific test
1638                     # bypass the blacklisting
1639                     if pattern.pattern != test.classname:
1640                         return False
1641                 return True
1642         return False
1643
1644     def _check_duration(self, test):
1645         if test.duration > 0 and int(self.options.long_limit) < int(test.duration):
1646             self.info("Not activating %s as its duration (%d) is superior"
1647                       " than the long limit (%d)" % (test, test.duration,
1648                                                      int(self.options.long_limit)))
1649             return False
1650
1651         return True
1652
1653     def _is_test_wanted(self, test):
1654         if self._check_whitelisted(test):
1655             if not self._check_duration(test):
1656                 return False
1657             return True
1658
1659         if self._check_blacklisted(test):
1660             return False
1661
1662         if not self._check_duration(test):
1663             return False
1664
1665         if not self.wanted_tests_patterns:
1666             return True
1667
1668         return False
1669
1670     def needs_http_server(self):
1671         return False
1672
1673     def print_valgrind_bugs(self):
1674         pass
1675
1676
class TestsGenerator(Loggable):
    """
    Holds a set of tests, indexed by classname, on behalf of a test manager.
    """

    def __init__(self, name, test_manager, tests=None):
        """
        @name: Name of the generator
        @test_manager: The manager this generator belongs to
        @tests: Optional iterable of tests to start with
        """
        Loggable.__init__(self)
        self.name = name
        self.test_manager = test_manager
        # Set by TestsManager.add_generators()
        self.testsuite = None
        # `tests` defaults to None rather than a shared mutable list.
        self._tests = {test.classname: test for test in tests or []}

    def generate_tests(self, *args):
        """
        Method that generates tests

        Positional arguments are accepted and ignored here.
        """
        return list(self._tests.values())

    def add_test(self, test):
        """
        Attach @test to this generator.

        The classname of @test gets prefixed with `self.testsuite`, so the
        generator must have been given a testsuite beforehand.
        """
        test.generator = self
        test.classname = self.testsuite + '.' + test.classname
        self._tests[test.classname] = test
1698
1699
class GstValidateTestsGenerator(TestsGenerator):
    """ Generator whose tests are populated right before being listed. """

    def populate_tests(self, uri_minfo_special_scenarios, scenarios):
        """ Hook run by generate_tests() before listing; does nothing here. """

    def generate_tests(self, uri_minfo_special_scenarios, scenarios):
        """ Populate the tests, then return them as a list. """
        self.populate_tests(uri_minfo_special_scenarios, scenarios)
        return super().generate_tests()
1708
1709
1710 class _TestsLauncher(Loggable):
1711
1712     def __init__(self):
1713
1714         Loggable.__init__(self)
1715
1716         self.options = None
1717         self.testers = []
1718         self.tests = []
1719         self.reporter = None
1720         self._list_testers()
1721         self.all_tests = None
1722         self.wanted_tests_patterns = []
1723
1724         self.queue = queue.Queue()
1725         self.jobs = []
1726         self.total_num_tests = 0
1727         self.current_progress = -1
1728         self.server = None
1729         self.httpsrv = None
1730         self.vfb_server = None
1731
1732     def _list_app_dirs(self):
1733         app_dirs = []
1734         env_dirs = os.environ["GST_VALIDATE_APPS_DIR"]
1735         if env_dirs is not None:
1736             for dir_ in env_dirs.split(os.pathsep):
1737                 app_dirs.append(dir_)
1738
1739         return app_dirs
1740
1741     def _exec_app(self, app_dir, env):
1742         try:
1743             files = os.listdir(app_dir)
1744         except OSError as e:
1745             self.debug("Could not list %s: %s" % (app_dir, e))
1746             files = []
1747         for f in files:
1748             if f.endswith(".py"):
1749                 exec(compile(open(os.path.join(app_dir, f)).read(),
1750                              os.path.join(app_dir, f), 'exec'), env)
1751
1752     def _exec_apps(self, env):
1753         app_dirs = self._list_app_dirs()
1754         for app_dir in app_dirs:
1755             self._exec_app(app_dir, env)
1756
1757     def _list_testers(self):
1758         env = globals().copy()
1759         self._exec_apps(env)
1760
1761         testers = [i() for i in utils.get_subclasses(TestsManager, env)]
1762         for tester in testers:
1763             if tester.init() is True:
1764                 self.testers.append(tester)
1765             else:
1766                 self.warning("Can not init tester: %s -- PATH is %s"
1767                              % (tester.name, os.environ["PATH"]))
1768
1769     def add_options(self, parser):
1770         for tester in self.testers:
1771             tester.add_options(parser)
1772
1773     def _load_testsuite(self, testsuites):
1774         exceptions = []
1775         for testsuite in testsuites:
1776             try:
1777                 sys.path.insert(0, os.path.dirname(testsuite))
1778                 spec = importlib.util.spec_from_file_location(os.path.basename(testsuite).replace(".py", ""), testsuite)
1779                 module = importlib.util.module_from_spec(spec)
1780                 spec.loader.exec_module(module)
1781                 return (module, None)
1782             except Exception as e:
1783                 exceptions.append("Could not load %s: %s" % (testsuite, e))
1784                 continue
1785             finally:
1786                 sys.path.remove(os.path.dirname(testsuite))
1787
1788         return (None, exceptions)
1789
    def _load_testsuites(self):
        """
        Import the testsuites listed in `self.options.testsuites`.

        An entry is either the path of an existing `.py` file, or a name
        looked up as `<name>.py` in `self.options.testsuites_dirs`. When done,
        `self.options.testsuites` is replaced by the list of loaded modules,
        each of them having a `TEST_MANAGER` attribute that is a list.
        """
        # Loaded modules, indexed by module name.
        testsuites = {}
        # NOTE: the list being iterated may grow during the loop (see the
        # "looks like a test name" case); appended entries are visited too.
        for testsuite in self.options.testsuites:
            if testsuite.endswith('.py') and os.path.exists(testsuite):
                testsuite = os.path.abspath(os.path.expanduser(testsuite))
                loaded_module = self._load_testsuite([testsuite])
            else:
                possible_testsuites_paths = [os.path.join(d, testsuite + ".py")
                                             for d in self.options.testsuites_dirs]
                loaded_module = self._load_testsuite(possible_testsuites_paths)

            # loaded_module is (module, None) on success, (None, errors) on failure.
            module = loaded_module[0]
            if not loaded_module[0]:
                if "." in testsuite:
                    # Retry with what precedes the first dot as testsuite
                    # name, and keep the full string as a wanted test regex.
                    self.options.testsuites.append(testsuite.split('.')[0])
                    self.info("%s looks like a test name, trying that" %
                              testsuite)
                    self.options.wanted_tests.append(testsuite)
                else:
                    if testsuite in testsuites:
                        self.info('Testuite %s was loaded previously', testsuite)
                        continue
                    printc("Could not load testsuite: %s, reasons: %s" % (
                        testsuite, loaded_module[1]), Colors.FAIL)
                continue

            if module.__name__ in testsuites:
                self.info("Trying to load testsuite '%s' a second time?", module.__name__)
                continue

            testsuites[module.__name__] = module
            # Normalize TEST_MANAGER to a list; a testsuite that does not
            # define it is associated to every known tester.
            if not hasattr(module, "TEST_MANAGER"):
                module.TEST_MANAGER = [tester.name for tester in self.testers]
            elif not isinstance(module.TEST_MANAGER, list):
                module.TEST_MANAGER = [module.TEST_MANAGER]

        self.options.testsuites = list(testsuites.values())
1827
1828     def _setup_testsuites(self):
1829         for testsuite in self.options.testsuites:
1830             loaded = False
1831             wanted_test_manager = None
1832             # TEST_MANAGER has been set in _load_testsuites()
1833             assert hasattr(testsuite, "TEST_MANAGER")
1834             wanted_test_manager = testsuite.TEST_MANAGER
1835             if not isinstance(wanted_test_manager, list):
1836                 wanted_test_manager = [wanted_test_manager]
1837
1838             for tester in self.testers:
1839                 if wanted_test_manager is not None and \
1840                         tester.name not in wanted_test_manager:
1841                     continue
1842
1843                 prev_testsuite_name = TestsManager.loading_testsuite
1844                 if self.options.user_paths:
1845                     TestsManager.loading_testsuite = tester.name
1846                     tester.register_defaults()
1847                     loaded = True
1848                 else:
1849                     TestsManager.loading_testsuite = testsuite.__name__
1850                     if testsuite.setup_tests(tester, self.options):
1851                         loaded = True
1852                 if prev_testsuite_name:
1853                     TestsManager.loading_testsuite = prev_testsuite_name
1854
1855             if not loaded:
1856                 printc("Could not load testsuite: %s"
1857                        " maybe because of missing TestManager"
1858                        % (testsuite), Colors.FAIL)
1859                 return False
1860
1861     def _load_config(self, options):
1862         printc("Loading config files is DEPRECATED"
1863                " you should use the new testsuite format now",)
1864
1865         for tester in self.testers:
1866             tester.options = options
1867             globals()[tester.name] = tester
1868         globals()["options"] = options
1869         c__file__ = __file__
1870         globals()["__file__"] = self.options.config
1871         exec(compile(open(self.options.config).read(),
1872                      self.options.config, 'exec'), globals())
1873         globals()["__file__"] = c__file__
1874
    def set_settings(self, options, args):
        """
        Configure the launcher and its testers from the parsed command line.

        @options: The parsed options
        @args: Remaining positional arguments; names of testers found in it
               restrict the testers in use and are removed from the list

        Returns False if no testsuite could be loaded, if setting up the
        testsuites failed, if a bug status check failed or if the virtual
        frame buffer server could not be started; True otherwise.
        """
        if options.xunit_file:
            self.reporter = reporters.XunitReporter(options)
        else:
            self.reporter = reporters.Reporter(options)

        self.options = options
        # Only used as a flag: set as soon as one tester is named in @args.
        wanted_testers = None
        for tester in self.testers:
            if tester.name in args:
                wanted_testers = tester.name

        if wanted_testers:
            # Keep only the testers explicitly named in @args.
            testers = self.testers
            self.testers = []
            for tester in testers:
                if tester.name in args:
                    self.testers.append(tester)
                    args.remove(tester.name)

        if options.config:
            self._load_config(options)

        # Replaces self.options.testsuites by the list of loaded modules.
        self._load_testsuites()
        if not self.options.testsuites:
            printc("Not testsuite loaded!", Colors.FAIL)
            return False

        for tester in self.testers:
            tester.set_settings(options, args, self.reporter)

        if not options.config and options.testsuites:
            # _setup_testsuites() only returns False on failure, hence `is False`.
            if self._setup_testsuites() is False:
                return False

        if self.options.check_bugs_status:
            printc("-> Checking bugs resolution... ", end='')

        for tester in self.testers:
            if not tester.check_blacklists():
                return False

            tester.log_blacklists()

            if not tester.check_expected_issues():
                return False

        if self.options.check_bugs_status:
            printc("OK", Colors.OKGREEN)

        if self.needs_http_server() or options.httponly is True:
            self.httpsrv = HTTPServer(options)
            self.httpsrv.start()

        if options.no_display:
            self.vfb_server = get_virual_frame_buffer_server(options)
            # res is indexed as (success, error message) below.
            res = self.vfb_server.start()
            if res[0] is False:
                printc("Could not start virtual frame server: %s" % res[1],
                       Colors.FAIL)
                return False
            os.environ["DISPLAY"] = self.vfb_server.display_id

        return True
1939
1940     def _check_tester_has_other_testsuite(self, testsuite, tester):
1941         if tester.name != testsuite.TEST_MANAGER[0]:
1942             return True
1943
1944         for t in self.options.testsuites:
1945             if t != testsuite:
1946                 for other_testmanager in t.TEST_MANAGER:
1947                     if other_testmanager == tester.name:
1948                         return True
1949
1950         return False
1951
    def _check_defined_tests(self, tester, tests):
        """
        Compare @tests with the `.testslist` file of the matching testsuite.

        @tester: The tester that listed @tests
        @tests: The tests listed by @tester

        The `.testslist` file sits next to the testsuite module; it is read
        and then rewritten with the current test names, one per line, the
        optional ones being prefixed with '~'. Disappeared and new tests are
        printed.

        Returns None when tests are blacklisted or explicitly wanted through
        the options (no check is done), otherwise whether the list changed.
        """
        if self.options.blacklisted_tests or self.options.wanted_tests:
            return

        tests_names = [test.classname for test in tests]
        testlist_changed = False
        for testsuite in self.options.testsuites:
            if not self._check_tester_has_other_testsuite(testsuite, tester) \
                    and tester.check_testslist:
                try:
                    testlist_file = open(os.path.splitext(testsuite.__file__)[0] + ".testslist",
                                         'r+')

                    know_tests = testlist_file.read().split("\n")
                    testlist_file.close()

                    # Reopening in 'w' mode truncates the file; it is
                    # entirely rewritten below.
                    testlist_file = open(os.path.splitext(testsuite.__file__)[0] + ".testslist",
                                         'w')
                except IOError:
                    # Testsuites whose testslist cannot be opened are skipped.
                    continue

                # Known optional ('~') tests that are not defined right now:
                # kept in the file, not reported as removed.
                optional_out = []
                for test in know_tests:
                    if test and test.strip('~') not in tests_names:
                        if not test.startswith('~'):
                            testlist_changed = True
                            printc("Test %s Not in testsuite %s anymore"
                                   % (test, testsuite.__file__), Colors.FAIL)
                        else:
                            optional_out.append((test, None))

                # From here on tests_names holds (name, test or None) pairs,
                # sorted by name ignoring '~'. The loop is left right after
                # (see `break`), so the list of plain names is not needed again.
                tests_names = sorted([(test.classname, test) for test in tests] + optional_out,
                                     key=lambda x: x[0].strip('~'))

                for tname, test in tests_names:
                    if test and test.optional:
                        tname = '~' + tname
                    testlist_file.write("%s\n" % (tname))
                    if tname and tname not in know_tests:
                        printc("Test %s is NEW in testsuite %s"
                               % (tname, testsuite.__file__),
                               Colors.FAIL if self.options.fail_on_testlist_change else Colors.OKGREEN)
                        testlist_changed = True

                testlist_file.close()
                # Only the first testsuite passing the condition is handled.
                break

        return testlist_changed
2000
2001     def _split_tests(self, num_groups):
2002         groups = [[] for x in range(num_groups)]
2003         group = cycle(groups)
2004         for test in self.tests:
2005             next(group).append(test)
2006         return groups
2007
2008     def list_tests(self):
2009         for tester in self.testers:
2010             if not self._tester_needed(tester):
2011                 continue
2012
2013             tests = tester.list_tests()
2014             if self._check_defined_tests(tester, tests) and \
2015                     self.options.fail_on_testlist_change:
2016                 raise RuntimeError("Unexpected new test in testsuite.")
2017
2018             self.tests.extend(tests)
2019         self.tests.sort(key=lambda test: test.classname)
2020
2021         if self.options.num_parts < 1:
2022             raise RuntimeError("Tests must be split in positive number of parts.")
2023         if self.options.num_parts > len(self.tests):
2024             raise RuntimeError("Cannot have more parts then there exist tests.")
2025         if self.options.part_index < 1 or self.options.part_index > self.options.num_parts:
2026             raise RuntimeError("Part index is out of range")
2027
2028         self.tests = self._split_tests(self.options.num_parts)[self.options.part_index - 1]
2029         return self.tests
2030
2031     def _tester_needed(self, tester):
2032         for testsuite in self.options.testsuites:
2033             if tester.name in testsuite.TEST_MANAGER:
2034                 return True
2035         return False
2036
2037     def server_wrapper(self, ready):
2038         self.server = GstValidateTCPServer(
2039             ('localhost', 0), GstValidateListener)
2040         self.server.socket.settimeout(None)
2041         self.server.launcher = self
2042         self.serverport = self.server.socket.getsockname()[1]
2043         self.info("%s server port: %s" % (self, self.serverport))
2044         ready.set()
2045
2046         self.server.serve_forever(poll_interval=0.05)
2047
2048     def _start_server(self):
2049         self.info("Starting TCP Server")
2050         ready = threading.Event()
2051         self.server_thread = threading.Thread(target=self.server_wrapper,
2052                                               kwargs={'ready': ready})
2053         self.server_thread.start()
2054         ready.wait()
2055         os.environ["GST_VALIDATE_SERVER"] = "tcp://localhost:%s" % self.serverport
2056
2057     def _stop_server(self):
2058         if self.server:
2059             self.server.shutdown()
2060             self.server_thread.join()
2061             self.server.server_close()
2062             self.server = None
2063
2064     def test_wait(self):
2065         while True:
2066             # Check process every second for timeout
2067             try:
2068                 self.queue.get(timeout=1)
2069             except queue.Empty:
2070                 pass
2071
2072             for test in self.jobs:
2073                 if test.process_update():
2074                     self.jobs.remove(test)
2075                     return test
2076
2077     def tests_wait(self):
2078         try:
2079             test = self.test_wait()
2080             test.check_results()
2081         except KeyboardInterrupt:
2082             for test in self.jobs:
2083                 test.kill_subprocess()
2084             raise
2085
2086         return test
2087
2088     def start_new_job(self, tests_left):
2089         try:
2090             test = tests_left.pop(0)
2091         except IndexError:
2092             return False
2093
2094         test.test_start(self.queue)
2095
2096         self.jobs.append(test)
2097
2098         return True
2099
2100     def print_result(self, current_test_num, test, total_num_tests, retry_on_failures=False):
2101         if test.result not in [Result.PASSED, Result.KNOWN_ERROR] and (not retry_on_failures or test.max_retries):
2102             printc(str(test), color=utils.get_color_for_result(test.result))
2103
2104         length = 80
2105         progress = int(length * current_test_num // total_num_tests)
2106         bar = 'â–ˆ' * progress + '-' * (length - progress)
2107         if is_tty():
2108             printc('\r|%s| [%s/%s]' % (bar, current_test_num, total_num_tests), end='\r')
2109         else:
2110             if progress > self.current_progress:
2111                 self.current_progress = progress
2112                 printc('|%s| [%s/%s]' % (bar, current_test_num, total_num_tests))
2113
    def _run_tests(self, running_tests=None, all_alone=False, retry_on_failures=False, total_num_tests=None):
        """
        Run tests, in parallel where possible, and rerun the failing ones.

        @running_tests: Tests to run, `self.tests` when not set
        @all_alone: Run every test on its own, even those that are parallel
        @retry_on_failures: Schedule any failing test for a rerun
        @total_num_tests: Number of tests to display; only passed for reruns

        Returns False as soon as a test fails with the `forever` or
        `fatal_error` options set. When tests were scheduled for a rerun the
        result of the rerun is returned, True otherwise.
        """
        if not self.all_tests:
            self.all_tests = self.list_tests()

        if not running_tests:
            running_tests = self.tests

        self.reporter.init_timer()
        # Tests run in parallel (`tests`) vs. one at a time (`alone_tests`).
        alone_tests = []
        tests = []
        for test in running_tests:
            if test.is_parallel and not all_alone:
                tests.append(test)
            else:
                alone_tests.append(test)

        # use max to defend against the case where all tests are alone_tests
        max_num_jobs = max(min(self.options.num_jobs, len(tests)), 1)
        jobs_running = 0

        # In `forever` mode with fewer parallel tests than jobs, tests are
        # copied (cycling over the originals) until every job slot is used.
        if self.options.forever and len(tests) < self.options.num_jobs and len(tests):
            max_num_jobs = self.options.num_jobs
            copied = []
            i = 0
            while (len(tests) + len(copied)) < max_num_jobs:
                copied.append(tests[i].copy(len(copied) + 1))

                i += 1
                if i >= len(tests):
                    i = 0
            tests += copied
            self.tests += copied

        self.total_num_tests = len(self.all_tests)
        # A @total_num_tests is only passed by the rerun below.
        prefix = "=> Re-r" if total_num_tests else "R"
        total_num_tests = total_num_tests if total_num_tests else self.total_num_tests
        printc(f"\n{prefix}unning {total_num_tests} tests...", color=Colors.HEADER)
        # if order of test execution doesn't matter, shuffle
        # the order to optimize cpu usage
        if self.options.shuffle:
            random.shuffle(tests)
            random.shuffle(alone_tests)

        current_test_num = 1
        to_retry = []
        # First the parallel tests with up to max_num_jobs jobs, then the
        # other ones, one job at a time. NOTE: `tests` is rebound by this loop.
        for num_jobs, tests in [(max_num_jobs, tests), (1, alone_tests)]:
            tests_left = list(tests)
            for i in range(num_jobs):
                if not self.start_new_job(tests_left):
                    break
                jobs_running += 1

            while jobs_running != 0:
                test = self.tests_wait()
                jobs_running -= 1
                current_test_num += 1
                res = test.test_end(retry_on_failures=retry_on_failures)
                to_report = True
                if res not in [Result.PASSED, Result.SKIPPED, Result.KNOWN_ERROR]:
                    if self.options.forever or self.options.fatal_error:
                        self.print_result(current_test_num - 1, test, retry_on_failures=retry_on_failures,
                            total_num_tests=total_num_tests)
                        self.reporter.after_test(test)
                        return False

                    # NOTE(review): this parses as
                    # `retry_on_failures or (test.max_retries and not no_retry_on_failures)`;
                    # confirm `no_retry_on_failures` is not meant to disable
                    # the first operand as well.
                    if retry_on_failures or test.max_retries and not self.options.no_retry_on_failures:
                        if not self.options.redirect_logs:
                            test.copy_logfiles()
                        to_retry.append(test)

                        # Not adding to final report if flakiness is tolerated
                        if test.max_retries:
                            test.max_retries -= 1
                            to_report = False
                self.print_result(current_test_num - 1, test,
                    retry_on_failures=retry_on_failures,
                    total_num_tests=total_num_tests)
                if to_report:
                    self.reporter.after_test(test)
                # Keep the job slots busy while tests are left.
                if self.start_new_job(tests_left):
                    jobs_running += 1

        if to_retry:
            printc("--> Rerunning the following tests to see if they are flaky:", Colors.WARNING)
            for test in to_retry:
                test.clean()
                printc(f'  * {test.classname}')
            printc('')
            self.current_progress = -1
            # Reruns are done one test at a time; with retry_on_failures off
            # only tests having max_retries left can be rerun once more.
            res = self._run_tests(
                to_retry,
                all_alone=True,
                retry_on_failures=False,
                total_num_tests=len(to_retry),
            )

            return res

        return True
2213
2214     def clean_tests(self, stop_server=False):
2215         for test in self.tests:
2216             test.clean()
2217         if stop_server:
2218             self._stop_server()
2219
    def run_tests(self):
        """
        Start the TCP server and run the tests according to the options.

        With `options.forever` the tests are run until an iteration fails
        and False is returned. With `options.n_runs` they are run that many
        times, returning whether every run succeeded. Otherwise they are run
        once and the result of that run is returned.

        Whatever happens, the HTTP and virtual frame buffer servers (if any)
        are stopped, the tests cleaned and the TCP server stopped on exit.
        """
        # Iteration counter, also printed by the `finally` clause in forever mode.
        r = 0
        try:
            self._start_server()
            if self.options.forever:
                r = 1
                while True:
                    self.current_progress = -1
                    printc("-> Iteration %d" % r, end='\r')

                    if not self._run_tests():
                        break
                    r += 1
                    self.clean_tests()
                    # NOTE(review): r was already incremented, so this shows
                    # the number of the next iteration -- confirm it is intended.
                    msg = "-> Iteration %d... %sOK%s" % (r, Colors.OKGREEN, Colors.ENDC)
                    printc(msg, end="\r")

                return False
            elif self.options.n_runs:
                res = True
                for r in range(self.options.n_runs):
                    self.current_progress = -1
                    printc("-> Iteration %d" % r, end='\r')
                    if not self._run_tests(retry_on_failures=self.options.retry_on_failures):
                        res = False
                        printc("ERROR", Colors.FAIL, end="\r")
                    else:
                        printc("OK", Colors.OKGREEN, end="\r")
                    self.clean_tests()

                return res
            else:
                return self._run_tests(retry_on_failures=self.options.retry_on_failures)
        finally:
            if self.options.forever:
                printc("\n-> Ran %d times" % r)
            if self.httpsrv:
                self.httpsrv.stop()
            if self.vfb_server:
                self.vfb_server.stop()
            # True: also stops the TCP server started above.
            self.clean_tests(True)
2261
2262     def final_report(self):
2263         return self.reporter.final_report()
2264
2265     def needs_http_server(self):
2266         for tester in self.testers:
2267             if tester.needs_http_server():
2268                 return True
2269
2270
class NamedDic(object):
    """ Object exposing the items of a dictionary as attributes. """

    def __init__(self, props):
        """
        @props: Mapping of attribute names to values; may be None or empty
        """
        for attribute, value in (props or {}).items():
            setattr(self, attribute, value)
2277
2278
class Scenario(object):
    """
    A named scenario whose properties are exposed as attributes.

    Every (name, value) pair of the properties becomes an attribute, dashes
    in the name being replaced by underscores.
    """

    def __init__(self, name, props, path=None):
        """
        @name: Name of the scenario
        @props: Iterable of (property name, value) pairs
        @path: Optional path of the scenario
        """
        self.name = name
        self.path = path

        for key, value in props:
            setattr(self, key.replace("-", "_"), value)

    def get_execution_name(self):
        """ Return the path when there is one, the name otherwise. """
        return self.name if self.path is None else self.path

    def seeks(self):
        """ Truthiness of the `seek` property, False when absent. """
        return bool(getattr(self, "seek", False))

    def needs_clock_sync(self):
        """ Truthiness of the `need_clock_sync` property, False when absent. """
        return bool(getattr(self, "need_clock_sync", False))

    def needs_live_content(self):
        """ Truthiness of `live_content_required`, False when absent. """
        # Scenarios that can only be used on live content
        return bool(getattr(self, "live_content_required", False))

    def compatible_with_live_content(self):
        """ Tell whether the scenario can be used on live content. """
        # if a live content is required it's implicitly compatible with
        # live content
        if self.needs_live_content():
            return True
        return bool(getattr(self, "live_content_compatible", False))

    def get_min_media_duration(self):
        """ `min_media_duration` as a float, 0 when absent. """
        if not hasattr(self, "min_media_duration"):
            return 0

        return float(self.min_media_duration)

    def does_reverse_playback(self):
        """ Truthiness of the `reverse_playback` property, False when absent. """
        return bool(getattr(self, "reverse_playback", False))

    def get_duration(self):
        """ `duration` as a float, 0 when absent. """
        if hasattr(self, "duration"):
            return float(self.duration)

        return 0

    def get_min_tracks(self, track_type):
        """ `min_<track_type>_track` as an int, 0 when absent. """
        attribute = "min_%s_track" % track_type
        if hasattr(self, attribute):
            return int(getattr(self, attribute))

        return 0

    def __repr__(self):
        return "<Scenario %s>" % self.name
2347
2348
class ScenarioManager(Loggable):
    """
    Singleton in charge of discovering scenarios.

    Discovery runs GstValidateBaseTestManager.COMMAND with
    `--scenarios-defs-output-file` and parses the file it produced with
    configparser: each section becomes a #Scenario.
    """
    # The unique instance, created the first time the class is instantiated.
    _instance = None
    # Scenarios found by a discovery run made without explicit paths.
    system_scenarios = []
    # Scenarios discovered from explicit paths, keyed by scenario path.
    special_scenarios = {}

    FILE_EXTENSION = "scenario"

    def __new__(cls, *args, **kwargs):
        # Every instantiation returns the same object; it is initialized
        # only once, with no config and nothing discovered yet.
        if not cls._instance:
            cls._instance = super(ScenarioManager, cls).__new__(
                cls, *args, **kwargs)
            cls._instance.config = None
            cls._instance.discovered = False
            Loggable.__init__(cls._instance)

        return cls._instance

    def find_special_scenarios(self, mfile):
        """
        Discover the scenarios sitting next to @mfile and named
        `<basename of mfile>.<anything>.scenario`.

        :param mfile: Path of the file to look scenarios for
        :returns: The list of discovered #Scenario (possibly empty)
        """
        scenarios = []
        mfile_bname = os.path.basename(mfile)
        mfile_dir = os.path.dirname(mfile)
        # Raw string so that `\.` reaches the regex engine untouched.
        pattern = re.compile(
            r"%s\..*\.%s$" % (re.escape(mfile_bname), self.FILE_EXTENSION))

        for f in os.listdir(mfile_dir):
            if pattern.search(f):
                scenarios.append(os.path.join(mfile_dir, f))

        if scenarios:
            scenarios = self.discover_scenarios(scenarios, mfile)

        return scenarios

    def discover_scenarios(self, scenario_paths=None, mfile=None):
        """
        Discover scenarios specified in scenario_paths or the default ones
        if nothing specified there

        :param scenario_paths: Optional list of scenario file paths
        :param mfile: When set, the file the scenarios are attached to; the
                      scenario name is then what remains of the path once
                      `<mfile>.` and the `.scenario` extension are removed
        :returns: The list of discovered #Scenario
        """
        scenario_paths = scenario_paths or []
        scenarios = []
        scenario_defs = os.path.join(self.config.main_dir, "scenarios.def")
        log_path = os.path.join(self.config.logsdir, "scenarios_discovery.log")

        command = [GstValidateBaseTestManager.COMMAND,
                   "--scenarios-defs-output-file", scenario_defs]
        command.extend(scenario_paths)
        # The log file is closed as soon as the subprocess is done.
        with open(log_path, 'w') as logs:
            try:
                subprocess.check_call(command, stdout=logs, stderr=logs)
            except subprocess.CalledProcessError as e:
                # Best effort: report the failure and still try to read
                # whatever definition file is available.
                self.error(e)
                self.error('See %s' % log_path)

        config = configparser.RawConfigParser()
        # read_file() replaces readfp(), which was removed in Python 3.12.
        with open(scenario_defs) as f:
            config.read_file(f)

        for section in config.sections():
            name = None
            path = None
            if scenario_paths:
                for scenario_path in scenario_paths:
                    if section == scenario_path:
                        if mfile is None:
                            name = os.path.basename(section).replace("." + self.FILE_EXTENSION, "")
                        else:
                            # The real name of the scenario is:
                            # filename.REALNAME.scenario
                            name = scenario_path.replace(mfile + ".", "").replace(
                                "." + self.FILE_EXTENSION, "")
                        path = scenario_path
                        break
            else:
                name = os.path.basename(section).replace("." + self.FILE_EXTENSION, "")

            assert name

            props = config.items(section)
            scenario = Scenario(name, props, path)
            if scenario_paths:
                self.special_scenarios[path] = scenario
            scenarios.append(scenario)

        if not scenario_paths:
            self.discovered = True
            self.system_scenarios.extend(scenarios)

        return scenarios

    def get_scenario(self, name):
        """
        Look a scenario up.

        :param name: None to get the list of all system scenarios, the
                     absolute path of a `.scenario` file, or the name of a
                     system scenario
        :returns: A #Scenario, the list of system scenarios when @name is
                  None, or None (after a warning) when nothing matches
        """
        if name is not None and os.path.isabs(name) and name.endswith(self.FILE_EXTENSION):
            scenario = self.special_scenarios.get(name)
            if scenario:
                return scenario

            scenarios = self.discover_scenarios([name])
            if scenarios:
                # Cache the scenario itself (not the list) so that later
                # lookups of the same path return the same kind of object.
                self.special_scenarios[name] = scenarios[0]
                return scenarios[0]

        if self.discovered is False:
            self.discover_scenarios()

        if name is None:
            return self.system_scenarios

        try:
            return [scenario for scenario in self.system_scenarios if scenario.name == name][0]
        except IndexError:
            self.warning("Scenario: %s not found" % name)
            return None
2459
2460
class GstValidateBaseTestManager(TestsManager):
    """
    Base tests manager holding the default scenarios and encoding formats,
    the paths of the gst-validate tools and a cache of feature lookups.
    """
    scenarios_manager = ScenarioManager()
    # featurename -> bool, filled by has_feature()
    features_cache = {}

    def __init__(self):
        super(GstValidateBaseTestManager, self).__init__()
        self._scenarios = []
        self._encoding_formats = []

    @classmethod
    def update_commands(cls, extra_paths=None):
        """
        Set the `COMMAND`, `TRANSCODING_COMMAND`, `MEDIA_CHECK_COMMAND`,
        `RTSP_SERVER_COMMAND` and `INSPECT_COMMAND` class attributes to
        what which() returns for the matching `-1.0` tool.

        :param extra_paths: Forwarded as is to which()
        """
        for varname, cmd in {'': 'gst-validate',
                             'TRANSCODING_': 'gst-validate-transcoding',
                             'MEDIA_CHECK_': 'gst-validate-media-check',
                             'RTSP_SERVER_': 'gst-validate-rtsp-server',
                             'INSPECT_': 'gst-inspect'}.items():
            setattr(cls, varname + 'COMMAND', which(cmd + '-1.0', extra_paths))

    @classmethod
    def has_feature(cls, featurename):
        """
        Tell whether running INSPECT_COMMAND on @featurename succeeds.

        The answer is cached per feature name in features_cache, so the
        command is run at most once for a given name.
        """
        try:
            return cls.features_cache[featurename]
        except KeyError:
            pass

        try:
            subprocess.check_output([cls.INSPECT_COMMAND, featurename])
            res = True
        except subprocess.CalledProcessError:
            res = False

        cls.features_cache[featurename] = res
        return res

    def add_scenarios(self, scenarios):
        """
        @scenarios A list of scenario names, or a single one, to be run on
                    the tests.
                    They are just the default scenarios, and then depending on
                    the TestsGenerator to be used you can have more fine grained
                    control on what to be run on each series of tests.
        """
        if isinstance(scenarios, list):
            self._scenarios.extend(scenarios)
        else:
            self._scenarios.append(scenarios)

        # Drop duplicates while keeping the order in which scenarios were
        # added, so the resulting list is the same from one run to the next.
        self._scenarios = list(dict.fromkeys(self._scenarios))

    def set_scenarios(self, scenarios):
        """
        Override the scenarios
        """
        self._scenarios = []
        self.add_scenarios(scenarios)

    def get_scenarios(self):
        """Return the list of default scenarios."""
        return self._scenarios

    def add_encoding_formats(self, encoding_formats):
        """
        :param encoding_formats: A list or one single #MediaFormatCombinations describing wanted output
                           formats for transcoding test.
                           They are just the default encoding formats, and then depending on
                           the TestsGenerator to be used you can have more fine grained
                           control on what to be run on each series of tests.
        """
        if isinstance(encoding_formats, list):
            self._encoding_formats.extend(encoding_formats)
        else:
            self._encoding_formats.append(encoding_formats)

        # Order preserving de-duplication, as for the scenarios.
        self._encoding_formats = list(dict.fromkeys(self._encoding_formats))

    def get_encoding_formats(self):
        """Return the list of default encoding formats."""
        return self._encoding_formats


# Resolve the tool paths once, as soon as the class exists.
GstValidateBaseTestManager.update_commands()
2539
2540
class MediaDescriptor(Loggable):
    """
    Base description of a media.

    Subclasses must implement the accessors raising NotImplementedError
    here; is_compatible() relies on them to decide whether a scenario can
    be run on the described media.
    """

    def __init__(self):
        Loggable.__init__(self)

    def get_path(self):
        raise NotImplementedError

    def has_frames(self):
        return False

    def get_framerate(self):
        """
        Return the "framerate" of the first video track whose caps can be
        parsed and carry one, Fraction(0, 1) when there is none.
        """
        for ttype, caps_str in self.get_tracks_caps():
            if ttype != "video":
                continue

            caps = utils.GstCaps.new_from_str(caps_str)
            if not caps:
                self.warning("Could not create caps for %s" % caps_str)
                continue

            framerate = caps[0].get("framerate")
            if framerate:
                return framerate

        return Fraction(0, 1)

    def get_media_filepath(self):
        raise NotImplementedError

    def skip_parsers(self):
        return False

    def get_caps(self):
        raise NotImplementedError

    def get_uri(self):
        raise NotImplementedError

    def get_duration(self):
        raise NotImplementedError

    def get_protocol(self):
        raise NotImplementedError

    def is_seekable(self):
        raise NotImplementedError

    def is_live(self):
        raise NotImplementedError

    def is_image(self):
        raise NotImplementedError

    def get_num_tracks(self, track_type):
        raise NotImplementedError

    def get_tracks_caps(self):
        return []

    def can_play_reverse(self):
        raise NotImplementedError

    def prerrols(self):
        return True

    def is_compatible(self, scenario):
        """
        Tell whether @scenario can be run on this media.

        :param scenario: A scenario object, or None (always compatible)
        :returns: False as soon as one requirement of the scenario
                  (seeking, clock sync, reverse playback, live content,
                  preroll, minimum duration, minimum number of tracks)
                  is not fulfilled by the media, True otherwise
        """
        if scenario is None:
            return True

        if scenario.seeks() and (not self.is_seekable() or self.is_image()):
            self.debug("Do not run %s as %s does not support seeking",
                       scenario, self.get_uri())
            return False

        if self.is_image() and scenario.needs_clock_sync():
            self.debug("Do not run %s as %s is an image",
                       scenario, self.get_uri())
            return False

        if not self.can_play_reverse() and scenario.does_reverse_playback():
            return False

        if not self.is_live() and scenario.needs_live_content():
            self.debug("Do not run %s as %s is not a live content",
                       scenario, self.get_uri())
            return False

        if self.is_live() and not scenario.compatible_with_live_content():
            self.debug("Do not run %s as %s is a live content",
                       scenario, self.get_uri())
            return False

        if not self.prerrols() and getattr(scenario, 'needs_preroll', False):
            return False

        # A duration of 0 is not compared against the minimum duration.
        if self.get_duration() and self.get_duration() / GST_SECOND < scenario.get_min_media_duration():
            self.debug(
                "Do not run %s as %s is too short (%i < min media duation : %i",
                scenario, self.get_uri(),
                self.get_duration() / GST_SECOND,
                scenario.get_min_media_duration())
            return False

        for track_type in ['audio', 'subtitle', 'video']:
            if self.get_num_tracks(track_type) < scenario.get_min_tracks(track_type):
                # Required count first, then the track type, then the
                # number of tracks the media actually has.
                self.debug("%s -- %s | At least %s %s track needed  < %s"
                           % (scenario, self.get_uri(),
                              scenario.get_min_tracks(track_type),
                              track_type,
                              self.get_num_tracks(track_type)))
                return False

        return True
2654
2655
class GstValidateMediaDescriptor(MediaDescriptor):
    """
    Media descriptor backed by an XML media info file.

    Successfully parsed descriptors are cached per XML path: building a
    descriptor for an already known path copies the data of the cached
    one instead of parsing the file again.
    """
    # Some extension file for discovering results
    SKIPPED_MEDIA_INFO_EXT = "media_info.skipped"
    MEDIA_INFO_EXT = "media_info"
    PUSH_MEDIA_INFO_EXT = "media_info.push"
    STREAM_INFO_EXT = "stream_info"

    # xml_path -> descriptor, shared by all instances
    __all_descriptors = {}

    @classmethod
    def get(cls, xml_path):
        """Return the cached descriptor of @xml_path, or build a new one."""
        if xml_path in cls.__all_descriptors:
            return cls.__all_descriptors[xml_path]
        return GstValidateMediaDescriptor(xml_path)

    def __init__(self, xml_path):
        """
        :param xml_path: Path of the XML media info file

        Exceptions raised while parsing the file or extracting its data
        are propagated (after printing a message for parse errors).
        """
        super(GstValidateMediaDescriptor, self).__init__()

        self._media_file_path = None
        main_descriptor = self.__all_descriptors.get(xml_path)
        if main_descriptor:
            self._copy_data_from_main(main_descriptor)
        else:
            self._xml_path = xml_path
            try:
                media_xml = ET.parse(xml_path).getroot()
            except xml.etree.ElementTree.ParseError:
                printc("Could not parse %s" % xml_path,
                       Colors.FAIL)
                raise
            self._extract_data(media_xml)
            # Only cache the descriptor once it is fully populated, so a
            # file that failed to load never leaves a broken entry behind.
            self.__all_descriptors[xml_path] = self

        self.set_protocol(urllib.parse.urlparse(self.get_uri()).scheme)

    def skip_parsers(self):
        return self._skip_parsers

    def has_frames(self):
        return self._has_frames

    def _copy_data_from_main(self, main_descriptor):
        # Mirror every instance attribute of the cached descriptor.
        for attr in main_descriptor.__dict__.keys():
            setattr(self, attr, getattr(main_descriptor, attr))

    def _extract_data(self, media_xml):
        """Fill the descriptor from the root element of the media info."""
        # Extract the information we need from the xml
        streams_node = media_xml.findall("streams")[0]
        streams = streams_node.findall("stream")

        self._caps = streams_node.attrib["caps"]
        self._track_caps = [(stream.attrib["type"], stream.attrib["caps"])
                            for stream in streams]

        self._skip_parsers = bool(int(media_xml.attrib.get('skip-parsers', 0)))
        self._has_frames = bool(int(media_xml.attrib["frame-detection"]))
        self._duration = int(media_xml.attrib["duration"])
        self._uri = media_xml.attrib["uri"]
        parsed_uri = urllib.parse.urlparse(self.get_uri())
        self._protocol = media_xml.get("protocol", parsed_uri.scheme)
        if parsed_uri.scheme == "file":
            # The recorded location does not exist but a media file sits
            # next to the media info: point the URI at that one.
            if not os.path.exists(parsed_uri.path) and os.path.exists(self.get_media_filepath()):
                self._uri = "file://" + self.get_media_filepath()
        elif parsed_uri.scheme == Protocols.IMAGESEQUENCE:
            # Relocate the sequence in the directory of the media info.
            media_dir = os.path.dirname(self.__cleanup_media_info_ext())
            self._media_file_path = os.path.join(media_dir, os.path.basename(parsed_uri.path))
            self._uri = parsed_uri._replace(
                path=os.path.join(media_dir, os.path.basename(self._media_file_path))).geturl()
        self._is_seekable = media_xml.attrib["seekable"].lower() == "true"
        self._is_live = media_xml.get("live", "false").lower() == "true"
        self._track_types = [stream.attrib["type"] for stream in streams]
        self._is_image = "image" in self._track_types

    def __cleanup_media_info_ext(self):
        """Return the XML path without its `.<media info extension>`."""
        for ext in [self.MEDIA_INFO_EXT, self.PUSH_MEDIA_INFO_EXT, self.STREAM_INFO_EXT,
                    self.SKIPPED_MEDIA_INFO_EXT, ]:
            if self._xml_path.endswith(ext):
                return self._xml_path[:len(self._xml_path) - (len(ext) + 1)]

        assert "Not reached" == None  # noqa

    @staticmethod
    def new_from_uri(uri, verbose=False, include_frames=False, is_push=False, is_skipped=False):
        """
        Generate the media info of @uri with MEDIA_CHECK_COMMAND and load it.

            include_frames = 0 # Never
            include_frames = 1 # always
            include_frames = 2 # if previous file included them

        :returns: The new descriptor, or None when the command failed or
                  its output could not be loaded
        """
        media_path = utils.url2path(uri)

        ext = GstValidateMediaDescriptor.MEDIA_INFO_EXT
        if is_push:
            ext = GstValidateMediaDescriptor.PUSH_MEDIA_INFO_EXT
        elif is_skipped:
            ext = GstValidateMediaDescriptor.SKIPPED_MEDIA_INFO_EXT
        descriptor_path = "%s.%s" % (media_path, ext)
        args = GstValidateBaseTestManager.MEDIA_CHECK_COMMAND.split(" ")
        if include_frames == 2:
            try:
                media_xml = ET.parse(descriptor_path).getroot()
                prev_uri = urllib.parse.urlparse(media_xml.attrib['uri'])
                if prev_uri.scheme == Protocols.IMAGESEQUENCE:
                    parsed_uri = urllib.parse.urlparse(uri)
                    uri = prev_uri._replace(path=os.path.join(os.path.dirname(parsed_uri.path), os.path.basename(prev_uri.path))).geturl()
                include_frames = bool(int(media_xml.attrib["frame-detection"]))
                if bool(int(media_xml.attrib.get("skip-parsers", 0))):
                    args.append("--skip-parsers")
            except FileNotFoundError:
                # NOTE(review): include_frames stays 2 (truthy) here, so
                # "--full" is used when there is no previous file - confirm
                # this is the intended behaviour.
                pass
        else:
            include_frames = bool(include_frames)
        args.append(uri)

        args.extend(["--output-file", descriptor_path])
        if include_frames:
            args.extend(["--full"])

        if verbose:
            printc("Generating media info for %s\n"
                   "    Command: '%s'" % (media_path, ' '.join(args)),
                   Colors.OKBLUE)

        try:
            # DEVNULL discards stderr without leaving a file object open.
            subprocess.check_output(args, stderr=subprocess.DEVNULL)
        except subprocess.CalledProcessError as e:
            if verbose:
                printc("Result: Failed", Colors.FAIL)
            else:
                loggable.warning("GstValidateMediaDescriptor",
                                 "Exception: %s" % e)
            return None

        if verbose:
            printc("Result: Passed", Colors.OKGREEN)

        try:
            return GstValidateMediaDescriptor(descriptor_path)
        except (IOError, xml.etree.ElementTree.ParseError):
            return None

    def get_path(self):
        return self._xml_path

    def need_clock_sync(self):
        return Protocols.needs_clock_sync(self.get_protocol())

    def get_media_filepath(self):
        # Computed lazily from the XML path unless already set.
        if self._media_file_path is None:
            self._media_file_path = self.__cleanup_media_info_ext()
        return self._media_file_path

    def get_caps(self):
        return self._caps

    def get_tracks_caps(self):
        return self._track_caps

    def get_uri(self):
        return self._uri

    def get_duration(self):
        return self._duration

    def set_protocol(self, protocol):
        # Media info files with the push extension always use PUSHFILE.
        if self._xml_path.endswith(GstValidateMediaDescriptor.PUSH_MEDIA_INFO_EXT):
            self._protocol = Protocols.PUSHFILE
        else:
            self._protocol = protocol

    def get_protocol(self):
        return self._protocol

    def is_seekable(self):
        return self._is_seekable

    def is_live(self):
        return self._is_live

    def can_play_reverse(self):
        return True

    def is_image(self):
        return self._is_image

    def get_num_tracks(self, track_type):
        """Count the streams whose type is @track_type."""
        n = 0
        for t in self._track_types:
            if t == track_type:
                n += 1

        return n

    def get_clean_name(self):
        """
        Return the basename of the XML path without its media info
        extension and with every "." replaced by "_".
        """
        name = os.path.basename(self.get_path())
        regex = '|'.join(['\\.%s$' % ext for ext in [self.SKIPPED_MEDIA_INFO_EXT, self.MEDIA_INFO_EXT, self.PUSH_MEDIA_INFO_EXT, self.STREAM_INFO_EXT]])
        name = re.sub(regex, "", name)

        return name.replace('.', "_")
2863
2864
class MediaFormatCombination(object):
    """A container / audio / video format triplet used for transcoding."""

    # Short format name -> caps string
    FORMATS = {
        # Audio
        "aac": "audio/mpeg,mpegversion=4",
        "ac3": "audio/x-ac3",
        "vorbis": "audio/x-vorbis",
        "mp3": "audio/mpeg,mpegversion=1,layer=3",
        "opus": "audio/x-opus",
        "rawaudio": "audio/x-raw",

        # Video
        "h264": "video/x-h264",
        "h265": "video/x-h265",
        "vp8": "video/x-vp8",
        "vp9": "video/x-vp9",
        "theora": "video/x-theora",
        "prores": "video/x-prores",
        "jpeg": "image/jpeg",

        # Containers
        "webm": "video/webm",
        "ogg": "application/ogg",
        "mkv": "video/x-matroska",
        "mp4": "video/quicktime,variant=iso;",
        "quicktime": "video/quicktime;",
    }

    def __str__(self):
        return "%s and %s in %s" % (self.audio, self.video, self.container)

    def __init__(self, container, audio, video, duration_factor=1,
                 video_restriction=None, audio_restriction=None):
        """
        Describes a media format to be used for transcoding tests.

        :param container: A string naming the container format, expected to be a key of self.FORMATS
        :param audio: A string naming the audio format, expected to be a key of self.FORMATS
        :param video: A string naming the video format, expected to be a key of self.FORMATS
        :param duration_factor: Accepted but not stored on the instance
        :param video_restriction: Stored as is on the instance
        :param audio_restriction: Stored as is on the instance
        """
        self.container = container
        self.audio = audio
        self.video = video
        self.video_restriction = video_restriction
        self.audio_restriction = audio_restriction

    def get_caps(self, track_type):
        """
        Return the caps string of the format stored in the instance
        attribute named @track_type, or None when the attribute does not
        exist or its value is not a key of FORMATS.
        """
        format_name = vars(self).get(track_type)
        return self.FORMATS.get(format_name)

    def get_audio_caps(self):
        return self.get_caps("audio")

    def get_video_caps(self):
        return self.get_caps("video")

    def get_muxer_caps(self):
        return self.get_caps("container")