validate: add config file support
[platform/upstream/gstreamer.git] / validate / launcher / baseclasses.py
1 #!/usr/bin/env python3
2 #
3 # Copyright (c) 2013,Thibault Saunier <thibault.saunier@collabora.com>
4 #
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU Lesser General Public
7 # License as published by the Free Software Foundation; either
8 # version 2.1 of the License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13 # Lesser General Public License for more details.
14 #
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with this program; if not, write to the
17 # Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
18 # Boston, MA 02110-1301, USA.
19
20 """ Class representing tests and test managers. """
21
22 import importlib.util
23 import json
24 import os
25 import sys
26 import re
27 import copy
28 import shlex
29 import socketserver
30 import struct
31 import time
32 from . import utils
33 import signal
34 import urllib.parse
35 import subprocess
36 import threading
37 import queue
38 import configparser
39 import xml
40 import random
41 import shutil
42 import uuid
43 from itertools import cycle
44 from fractions import Fraction
45
46 from .utils import which
47 from . import reporters
48 from . import loggable
49 from .loggable import Loggable
50
51 from collections import defaultdict
52 try:
53     from lxml import etree as ET
54 except ImportError:
55     import xml.etree.cElementTree as ET
56
57
58 from .vfb_server import get_virual_frame_buffer_server
59 from .httpserver import HTTPServer
60 from .utils import mkdir, Result, Colors, printc, DEFAULT_TIMEOUT, GST_SECOND, \
61     Protocols, look_for_file_in_source_dir, get_data_file, BackTraceGenerator, \
62     check_bugs_resolution, is_tty
63
# Factors by which the timeouts are multiplied when the test runs inside
# gdb, Valgrind or rr.
GDB_TIMEOUT_FACTOR = VALGRIND_TIMEOUT_FACTOR = 20
RR_TIMEOUT_FACTOR = 2
# Global timeout multiplier, read from the TIMEOUT_FACTOR environment variable
TIMEOUT_FACTOR = float(os.environ.get("TIMEOUT_FACTOR", 1))
# The error reported by valgrind when detecting errors
VALGRIND_ERROR_CODE = 20

VALIDATE_OVERRIDE_EXTENSION = ".override"

# Names of the signals considered as crashes, restricted to the ones
# available on the running platform.
_EXITING_SIGNAL_NAMES = [name for name in (
    'SIGQUIT', 'SIGILL', 'SIGABRT', 'SIGFPE', 'SIGSEGV', 'SIGBUS', 'SIGSYS',
    'SIGTRAP', 'SIGXCPU', 'SIGXFSZ', 'SIGIOT') if hasattr(signal, name)]

# Return code -> signal name. subprocess reports a process killed by a
# signal as the negated signal number.
EXITING_SIGNALS = {-getattr(signal, name): name
                   for name in _EXITING_SIGNAL_NAMES}
# 139 == 128 + 11, the exit status conventionally used for a SIGSEGV
EXITING_SIGNALS.update({139: "SIGSEGV"})
# Signal name -> return code. Built from the names rather than by reversing
# the mapping above so that the result is deterministic: 'SIGSEGV' always
# resolves to the negated signal number (never 139) and aliased signals
# (e.g. SIGABRT/SIGIOT) are all resolvable.
EXITING_SIGNALS.update({name: -getattr(signal, name)
                        for name in _EXITING_SIGNAL_NAMES})


CI_ARTIFACTS_URL = os.environ.get('CI_ARTIFACTS_URL')
81
82
83 class Test(Loggable):
84
85     """ A class representing a particular test. """
86
87     def __init__(self, application_name, classname, options,
88                  reporter, duration=0, timeout=DEFAULT_TIMEOUT,
89                  hard_timeout=None, extra_env_variables=None,
90                  expected_issues=None, is_parallel=True,
91                  workdir=None):
92         """
93         @timeout: The timeout during which the value return by get_current_value
94                   keeps being exactly equal
95         @hard_timeout: Max time the test can take in absolute
96         """
97         Loggable.__init__(self)
98         self.timeout = timeout * TIMEOUT_FACTOR * options.timeout_factor
99         if hard_timeout:
100             self.hard_timeout = hard_timeout * TIMEOUT_FACTOR
101             self.hard_timeout *= options.timeout_factor
102         else:
103             self.hard_timeout = hard_timeout
104         self.classname = classname
105         self.options = options
106         self.application = application_name
107         self.command = []
108         self.server_command = None
109         self.reporter = reporter
110         self.process = None
111         self.proc_env = None
112         self.thread = None
113         self.queue = None
114         self.duration = duration
115         self.stack_trace = None
116         self._uuid = None
117         if expected_issues is None:
118             self.expected_issues = []
119         elif not isinstance(expected_issues, list):
120             self.expected_issues = [expected_issues]
121         else:
122             self.expected_issues = expected_issues
123
124         extra_env_variables = extra_env_variables or {}
125         self.extra_env_variables = extra_env_variables
126         self.optional = False
127         self.is_parallel = is_parallel
128         self.generator = None
129         self.workdir = workdir
130         self.allow_flakiness = False
131         self.html_log = None
132         self.rr_logdir = None
133
134         self.clean()
135
136     def _generate_expected_issues(self):
137         return ''
138
139     def generate_expected_issues(self):
140         res = '%s"FIXME \'%s\' issues [REPORT A BUG ' % (" " * 4, self.classname) \
141             + 'in https://gitlab.freedesktop.org/gstreamer/ '\
142             + 'or use a proper bug description]": {'
143         res += """
144         "tests": [
145             "%s"
146         ],
147         "issues": [""" % (self.classname)
148
149         retcode = self.process.returncode if self.process else 0
150         if retcode != 0:
151             signame = EXITING_SIGNALS.get(retcode)
152             val = "'" + signame + "'" if signame else retcode
153             res += """\n            {
154                 '%s': %s,
155                 'sometimes': True,
156             },""" % ("signame" if signame else "returncode", val)
157
158         res += self._generate_expected_issues()
159         res += "\n%s],\n%s},\n" % (" " * 8, " " * 4)
160
161         return res
162
163     def copy(self, nth=None):
164         copied_test = copy.copy(self)
165         if nth:
166             copied_test.classname += '_it' + str(nth)
167             copied_test.options = copy.copy(self.options)
168             copied_test.options.logsdir = os.path.join(copied_test.options.logsdir, str(nth))
169             os.makedirs(copied_test.options.logsdir, exist_ok=True)
170
171         return copied_test
172
173     def clean(self):
174         self.kill_subprocess()
175         self.message = ""
176         self.error_str = ""
177         self.time_taken = 0.0
178         self._starting_time = None
179         self.result = Result.NOT_RUN
180         self.logfile = None
181         self.out = None
182         self.extra_logfiles = set()
183         self.__env_variable = []
184         self.kill_subprocess()
185         self.process = None
186
187     def __str__(self):
188         string = self.classname
189         if self.result != Result.NOT_RUN:
190             string += ": " + self.result
191             if self.result in [Result.FAILED, Result.TIMEOUT]:
192                 string += " '%s'" % self.message
193                 if not self.options.dump_on_failure:
194                     if not self.options.redirect_logs and self.result != Result.PASSED:
195                         string += self.get_logfile_repr()
196                 else:
197                     string = "\n==> %s" % string
198
199         return string
200
201     def add_env_variable(self, variable, value=None):
202         """
203         Only useful so that the gst-validate-launcher can print the exact
204         right command line to reproduce the tests
205         """
206         if value is None:
207             value = os.environ.get(variable, None)
208
209         if value is None:
210             return
211
212         self.__env_variable.append(variable)
213
214     @property
215     def _env_variable(self):
216         res = ""
217         if not self.options.verbose or self.options.verbose > 1:
218             for var in set(self.__env_variable):
219                 if res:
220                     res += " "
221                 value = self.proc_env.get(var, None)
222                 if value is not None:
223                     res += "%s='%s'" % (var, value)
224         else:
225             res += "[Not displaying environment variables, rerun with -vv for the full command]"
226
227         return res
228
229     def open_logfile(self):
230         if self.out:
231             return
232
233         path = os.path.join(self.options.logsdir,
234                             self.classname.replace(".", os.sep) + '.md')
235         mkdir(os.path.dirname(path))
236         self.logfile = path
237
238         if self.options.redirect_logs == 'stdout':
239             self.out = sys.stdout
240         elif self.options.redirect_logs == 'stderr':
241             self.out = sys.stderr
242         else:
243             self.out = open(path, 'w+')
244
245     def finalize_logfiles(self):
246         self.out.write("\n**Duration**: %s" % self.time_taken)
247         if not self.options.redirect_logs:
248             self.out.flush()
249             for logfile in self.extra_logfiles:
250                 # Only copy over extra logfile content if it's below a certain threshold
251                 # Avoid copying gigabytes of data if a lot of debugging is activated
252                 if os.path.getsize(logfile) < 500 * 1024:
253                     self.out.write('\n\n## %s:\n\n```\n%s\n```\n' % (
254                         os.path.basename(logfile), self.get_extra_log_content(logfile))
255                     )
256                 else:
257                     self.out.write('\n\n## %s:\n\n**Log file too big.**\n  %s\n\n Check file content directly\n\n' % (
258                         os.path.basename(logfile), logfile)
259                     )
260
261             if self.rr_logdir:
262                 self.out.write('\n\n## rr trace:\n\n```\nrr replay %s/latest-trace\n```\n' % (
263                     self.rr_logdir))
264
265             self.out.flush()
266             self.out.close()
267
268         if self.options.html:
269             self.html_log = os.path.splitext(self.logfile)[0] + '.html'
270             import commonmark
271             parser = commonmark.Parser()
272             with open(self.logfile) as f:
273                 ast = parser.parse(f.read())
274
275             renderer = commonmark.HtmlRenderer()
276             html = renderer.render(ast)
277             with open(self.html_log, 'w') as f:
278                 f.write(html)
279
280         self.out = None
281
282     def _get_file_content(self, file_name):
283         f = open(file_name, 'r+')
284         value = f.read()
285         f.close()
286
287         return value
288
289     def get_log_content(self):
290         return self._get_file_content(self.logfile)
291
292     def get_extra_log_content(self, extralog):
293         if extralog not in self.extra_logfiles:
294             return ""
295
296         return self._get_file_content(extralog)
297
298     def get_classname(self):
299         name = self.classname.split('.')[-1]
300         classname = self.classname.replace('.%s' % name, '')
301
302         return classname
303
304     def get_name(self):
305         return self.classname.split('.')[-1]
306
307     def get_uuid(self):
308         if self._uuid is None:
309             self._uuid = self.classname + str(uuid.uuid4())
310         return self._uuid
311
312     def add_arguments(self, *args):
313         self.command += args
314
315     def build_arguments(self):
316         self.add_env_variable("LD_PRELOAD")
317         self.add_env_variable("DISPLAY")
318
319     def add_stack_trace_to_logfile(self):
320         self.debug("Adding stack trace")
321         if self.options.rr:
322             return
323
324         trace_gatherer = BackTraceGenerator.get_default()
325         stack_trace = trace_gatherer.get_trace(self)
326
327         if not stack_trace:
328             return
329
330         info = "\n\n## Stack trace\n\n```\n%s\n```" % stack_trace
331         if self.options.redirect_logs:
332             print(info)
333             return
334
335         if self.options.xunit_file:
336             self.stack_trace = stack_trace
337
338         self.out.write(info)
339         self.out.flush()
340
341     def add_known_issue_information(self):
342         if self.expected_issues:
343             info = "\n\n## Already known issues\n\n``` python\n%s\n```\n\n" % (
344                 json.dumps(self.expected_issues, indent=4)
345             )
346         else:
347             info = ""
348
349         info += "\n\n**You can mark the issues as 'known' by adding the " \
350             + " following lines to the list of known issues**\n" \
351             + "\n\n``` python\n%s\n```" % (self.generate_expected_issues())
352
353         if self.options.redirect_logs:
354             print(info)
355             return
356
357         self.out.write(info)
358
359     def set_result(self, result, message="", error=""):
360
361         if not self.options.redirect_logs:
362             self.out.write("\n```\n")
363             self.out.flush()
364
365         self.debug("Setting result: %s (message: %s, error: %s)" % (result,
366                                                                     message, error))
367
368         if result is Result.TIMEOUT:
369             if self.options.debug is True:
370                 if self.options.gdb:
371                     printc("Timeout, you should process <ctrl>c to get into gdb",
372                            Colors.FAIL)
373                     # and wait here until gdb exits
374                     self.process.communicate()
375                 else:
376                     pname = self.command[0]
377                     input("%sTimeout happened  on %s you can attach gdb doing:\n $gdb %s %d%s\n"
378                           "Press enter to continue" % (Colors.FAIL, self.classname,
379                           pname, self.process.pid, Colors.ENDC))
380             else:
381                 self.add_stack_trace_to_logfile()
382
383         self.result = result
384         self.message = message
385         self.error_str = error
386
387         if result not in [Result.PASSED, Result.NOT_RUN, Result.SKIPPED]:
388             self.add_known_issue_information()
389
390     def check_results(self):
391         if self.result is Result.FAILED or self.result is Result.TIMEOUT:
392             return
393
394         self.debug("%s returncode: %s", self, self.process.returncode)
395         if self.options.rr and self.process.returncode == -signal.SIGPIPE:
396             self.set_result(Result.SKIPPED, "SIGPIPE received under `rr`, known issue.")
397         elif self.process.returncode == 0:
398             self.set_result(Result.PASSED)
399         elif self.process.returncode in EXITING_SIGNALS:
400             self.add_stack_trace_to_logfile()
401             self.set_result(Result.FAILED,
402                             "Application exited with signal %s" % (
403                                 EXITING_SIGNALS[self.process.returncode]))
404         elif self.process.returncode == VALGRIND_ERROR_CODE:
405             self.set_result(Result.FAILED, "Valgrind reported errors")
406         else:
407             self.set_result(Result.FAILED,
408                             "Application returned %d" % (self.process.returncode))
409
410     def get_current_value(self):
411         """
412         Lets subclasses implement a nicer timeout measurement method
413         They should return some value with which we will compare
414         the previous and timeout if they are egual during self.timeout
415         seconds
416         """
417         return Result.NOT_RUN
418
    def process_update(self):
        """
        Poll the test process and check whether it timed out.

        Called repeatedly while the test runs. The result is set to
        Result.TIMEOUT when a timeout is detected.

        Returns True when process has finished running or has timed out.
        """

        if self.process is None:
            # Process has not started running yet
            return False

        # Refresh the return code without blocking
        self.process.poll()
        if self.process.returncode is not None:
            return True

        val = self.get_current_value()

        self.debug("Got value: %s" % val)
        if val is Result.NOT_RUN:
            # The get_current_value logic is not implemented... dumb
            # timeout
            if time.time() - self.last_change_ts > self.timeout:
                self.set_result(Result.TIMEOUT,
                                "Application timed out: %s secs" %
                                self.timeout,
                                "timeout")
                return True
            return False
        elif val is Result.FAILED:
            return True
        elif val is Result.KNOWN_ERROR:
            return True

        self.log("New val %s" % val)

        if val == self.last_val:
            # No progress since the last update: time out once the value
            # stayed the same for more than self.timeout seconds.
            delta = time.time() - self.last_change_ts
            self.debug("%s: Same value for %d/%d seconds" %
                       (self, delta, self.timeout))
            if delta > self.timeout:
                self.set_result(Result.TIMEOUT,
                                "Application timed out: %s secs" %
                                self.timeout,
                                "timeout")
                return True
        # NOTE: the hard timeout is only evaluated on updates where the
        # value changed.
        elif self.hard_timeout and time.time() - self.start_ts > self.hard_timeout:
            self.set_result(
                Result.TIMEOUT, "Hard timeout reached: %d secs" % self.hard_timeout)
            return True
        else:
            # The test progressed: restart the timeout measurement
            self.last_change_ts = time.time()
            self.last_val = val

        return False
471
472     def get_subproc_env(self):
473         return os.environ.copy()
474
475     def kill_subprocess(self):
476         subprocs_id = None
477         if self.options.rr and self.process and self.process.returncode is None:
478             cmd = ["ps", "-o", "pid", "--ppid", str(self.process.pid), "--noheaders"]
479             try:
480                 subprocs_id = [int(pid.strip('\n')) for
481                     pid in subprocess.check_output(cmd).decode().split(' ') if pid]
482             except FileNotFoundError:
483                 self.error("Ps not found, will probably not be able to get rr "
484                     "working properly after we kill the process")
485             except subprocess.CalledProcessError as e:
486                 self.error("Couldn't get rr subprocess pid: %s" % (e))
487
488         utils.kill_subprocess(self, self.process, DEFAULT_TIMEOUT, subprocs_id)
489
490     def run_external_checks(self):
491         pass
492
493     def thread_wrapper(self):
494         def enable_sigint():
495             # Restore the SIGINT handler for the child process (gdb) to ensure
496             # it can handle it.
497             signal.signal(signal.SIGINT, signal.SIG_DFL)
498
499         if self.options.gdb and os.name != "nt":
500             preexec_fn = enable_sigint
501         else:
502             preexec_fn = None
503
504         self.process = subprocess.Popen(self.command,
505                                         stderr=self.out,
506                                         stdout=self.out,
507                                         env=self.proc_env,
508                                         cwd=self.workdir,
509                                         preexec_fn=preexec_fn)
510         self.process.wait()
511         if self.result is not Result.TIMEOUT:
512             if self.process.returncode == 0:
513                 self.run_external_checks()
514             self.queue.put(None)
515
516     def get_valgrind_suppression_file(self, subdir, name):
517         p = get_data_file(subdir, name)
518         if p:
519             return p
520
521         self.error("Could not find any %s file" % name)
522
523     def get_valgrind_suppressions(self):
524         return [self.get_valgrind_suppression_file('data', 'gstvalidate.supp')]
525
526     def use_gdb(self, command):
527         if self.hard_timeout is not None:
528             self.hard_timeout *= GDB_TIMEOUT_FACTOR
529         self.timeout *= GDB_TIMEOUT_FACTOR
530
531         if not self.options.gdb_non_stop:
532             self.timeout = sys.maxsize
533             self.hard_timeout = sys.maxsize
534
535         args = ["gdb"]
536         if self.options.gdb_non_stop:
537             args += ["-ex", "run", "-ex", "backtrace", "-ex", "quit"]
538         args += ["--args"] + command
539         return args
540
541     def use_rr(self, command, subenv):
542         command = ["rr", 'record', '-h'] + command
543
544         self.timeout *= RR_TIMEOUT_FACTOR
545         self.rr_logdir = os.path.join(self.options.logsdir, self.classname.replace(".", os.sep), 'rr-logs')
546         subenv['_RR_TRACE_DIR'] = self.rr_logdir
547         try:
548             shutil.rmtree(self.rr_logdir, ignore_errors=False, onerror=None)
549         except FileNotFoundError:
550             pass
551         self.add_env_variable('_RR_TRACE_DIR', self.rr_logdir)
552
553         return command
554
555     def use_valgrind(self, command, subenv):
556         vglogsfile = os.path.splitext(self.logfile)[0] + '.valgrind'
557         self.extra_logfiles.add(vglogsfile)
558
559         vg_args = []
560
561         for o, v in [('trace-children', 'yes'),
562                      ('tool', 'memcheck'),
563                      ('leak-check', 'full'),
564                      ('leak-resolution', 'high'),
565                      # TODO: errors-for-leak-kinds should be set to all instead of definite
566                      # and all false positives should be added to suppression
567                      # files.
568                      ('errors-for-leak-kinds', 'definite,indirect'),
569                      ('show-leak-kinds', 'definite,indirect'),
570                      ('show-possibly-lost', 'no'),
571                      ('num-callers', '20'),
572                      ('error-exitcode', str(VALGRIND_ERROR_CODE)),
573                      ('gen-suppressions', 'all')]:
574             vg_args.append("--%s=%s" % (o, v))
575
576         if not self.options.redirect_logs:
577             vglogsfile = os.path.splitext(self.logfile)[0] + '.valgrind'
578             self.extra_logfiles.add(vglogsfile)
579             vg_args.append("--%s=%s" % ('log-file', vglogsfile))
580
581         for supp in self.get_valgrind_suppressions():
582             vg_args.append("--suppressions=%s" % supp)
583
584         command = ["valgrind"] + vg_args + command
585
586         # Tune GLib's memory allocator to be more valgrind friendly
587         subenv['G_DEBUG'] = 'gc-friendly'
588         subenv['G_SLICE'] = 'always-malloc'
589
590         if self.hard_timeout is not None:
591             self.hard_timeout *= VALGRIND_TIMEOUT_FACTOR
592         self.timeout *= VALGRIND_TIMEOUT_FACTOR
593
594         # Enable 'valgrind.config'
595         self.add_validate_config(get_data_file(
596             'data', 'valgrind.config'), subenv)
597         if subenv == self.proc_env:
598             self.add_env_variable('G_DEBUG', 'gc-friendly')
599             self.add_env_variable('G_SLICE', 'always-malloc')
600             self.add_env_variable('GST_VALIDATE_CONFIG',
601                                   self.proc_env['GST_VALIDATE_CONFIG'])
602
603         return command
604
605     def add_validate_config(self, config, subenv=None):
606         if not subenv:
607             subenv = self.extra_env_variables
608
609         cconf = subenv.get('GST_VALIDATE_CONFIG', "")
610         paths = [c for c in cconf.split(os.pathsep) if c] + [config]
611         subenv['GST_VALIDATE_CONFIG'] = os.pathsep.join(paths)
612
613     def launch_server(self):
614         return None
615
616     def get_logfile_repr(self):
617         if not self.options.redirect_logs:
618             if self.html_log:
619                 log = self.html_log
620             else:
621                 log = self.logfile
622
623             if CI_ARTIFACTS_URL:
624                 log = CI_ARTIFACTS_URL + os.path.relpath(log, self.options.logsdir)
625
626             return "\n    Log: %s" % (log)
627
628         return ""
629
630     def get_command_repr(self):
631         message = "%s %s" % (self._env_variable, ' '.join(
632             shlex.quote(arg) for arg in self.command))
633         if self.server_command:
634             message = "%s & %s" % (self.server_command, message)
635
636         return message
637
    def test_start(self, queue):
        """
        Start the test: open its log, build its command line and
        environment, and launch the process in a dedicated thread.

        @queue: Queue on which None is put by the test thread once the
                process exited (see thread_wrapper)
        """
        self.open_logfile()

        self.server_command = self.launch_server()
        self.queue = queue
        self.command = [self.application]
        self._starting_time = time.time()
        self.build_arguments()
        self.proc_env = self.get_subproc_env()

        # Extra variables are appended to the value already present in the
        # environment (if any), separated by os.pathsep.
        for var, value in list(self.extra_env_variables.items()):
            value = self.proc_env.get(var, '') + os.pathsep + value
            self.proc_env[var] = value.strip(os.pathsep)
            self.add_env_variable(var, self.proc_env[var])

        if self.options.gdb:
            self.command = self.use_gdb(self.command)

            # Saved so that test_end can restore it
            self.previous_sigint_handler = signal.getsignal(signal.SIGINT)
            # Make the gst-validate executable ignore SIGINT while gdb is
            # running.
            signal.signal(signal.SIGINT, signal.SIG_IGN)

        if self.options.valgrind:
            self.command = self.use_valgrind(self.command, self.proc_env)

        if self.options.rr:
            self.command = self.use_rr(self.command, self.proc_env)

        if not self.options.redirect_logs:
            # Log header; the code block opened last receives the process
            # output and is closed by set_result.
            self.out.write("# `%s`\n\n"
                           "## Command\n\n``` bash\n%s\n```\n\n" % (
                               self.classname, self.get_command_repr()))
            self.out.write("## %s output\n\n``` \n\n" % os.path.basename(self.application))
            self.out.flush()
        else:
            message = "Launching: %s%s\n" \
                "    Command: %s\n" % (Colors.ENDC, self.classname,
                                       self.get_command_repr())
            printc(message, Colors.OKBLUE)

        self.thread = threading.Thread(target=self.thread_wrapper)
        self.thread.start()

        # Initial state of the timeout tracking done in process_update
        self.last_val = 0
        self.last_change_ts = time.time()
        self.start_ts = time.time()
685
686     def _dump_log_file(self, logfile):
687         if which('bat'):
688             try:
689                 subprocess.check_call(['bat', '-H', '1', '--paging=never', logfile])
690                 return
691             except (subprocess.CalledProcessError, FileNotFoundError):
692                 pass
693
694         with open(logfile, 'r') as fin:
695             for line in fin.readlines():
696                 print('> ' + line, end='')
697
698     def _dump_log_files(self):
699         self._dump_log_file(self.logfile)
700
701     def copy_logfiles(self, extra_folder="flaky_tests"):
702         path = os.path.dirname(os.path.join(self.options.logsdir, extra_folder,
703                             self.classname.replace(".", os.sep)))
704         mkdir(path)
705         self.logfile = shutil.copy(self.logfile, path)
706         extra_logs = []
707         for logfile in self.extra_logfiles:
708             extra_logs.append(shutil.copy(logfile, path))
709         self.extra_logfiles = extra_logs
710
711     def test_end(self, retry_on_failure=False):
712         self.kill_subprocess()
713         self.thread.join()
714         self.time_taken = time.time() - self._starting_time
715
716         if self.options.gdb:
717             signal.signal(signal.SIGINT, self.previous_sigint_handler)
718
719         self.finalize_logfiles()
720         message = None
721         end = "\n"
722
723         if self.options.dump_on_failure:
724             if self.result not in [Result.PASSED, Result.KNOWN_ERROR, Result.NOT_RUN]:
725                 self._dump_log_files()
726
727         # Only keep around env variables we need later
728         clean_env = {}
729         for n in self.__env_variable:
730             clean_env[n] = self.proc_env.get(n, None)
731         self.proc_env = clean_env
732
733         # Don't keep around JSON report objects, they were processed
734         # in check_results already
735         self.reports = []
736
737         return self.result
738
739
class GstValidateTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
    """ TCP server handling each connection in its own thread. """
742
743
class GstValidateListener(socketserver.BaseRequestHandler, Loggable):

    """
    Request handler receiving the messages sent by a test process.

    Messages are JSON documents encoded in UTF-8, each of them prefixed with
    its length as a 4 bytes big-endian unsigned integer.
    """

    def __init__(self, *args, **kwargs):
        # NOTE: socketserver.BaseRequestHandler.__init__() runs handle()
        # itself, so Loggable only gets initialized once the connection has
        # been served; presumably why handle() sets logCategory on its own.
        super().__init__(*args, **kwargs)
        Loggable.__init__(self, "GstValidateListener")

    def _recv_exact(self, size):
        """
        Read exactly @size bytes from the connection.

        Returns the bytes read, or None if the peer closed the connection
        before @size bytes could be received.
        """
        chunks = []
        remaining = size
        while remaining > 0:
            chunk = self.request.recv(remaining)
            if not chunk:
                # recv() returning nothing means the peer is gone
                return None
            chunks.append(chunk)
            remaining -= len(chunk)

        return b''.join(chunks)

    def handle(self):
        """Implements BaseRequestHandler handle method"""
        test = None
        self.logCategory = "GstValidateListener"
        while True:
            raw_len = self._recv_exact(4)
            if raw_len is None:
                return
            msglen = struct.unpack('>I', raw_len)[0]

            raw_msg = self._recv_exact(msglen)
            if raw_msg is None:
                # Connection closed in the middle of a message
                return

            # Invalid bytes are dropped, so decoding can not fail
            msg = raw_msg.decode('utf-8', 'ignore')
            if msg == '':
                return

            try:
                obj = json.loads(msg)
            except json.decoder.JSONDecodeError as e:
                self.error("%s Could not decode message: %s - %s" % (test.classname if test else "unknown", msg, e))
                continue

            if test is None:
                # First message must contain the uuid
                test_uuid = obj.get("uuid", None)
                if test_uuid is None:
                    return
                # Find test from launcher
                for t in self.server.launcher.tests:
                    if test_uuid == t.get_uuid():
                        test = t
                        break
                if test is None:
                    self.server.launcher.error(
                        "Could not find test for UUID %s" % test_uuid)
                    return

            obj_type = obj.get("type", '')
            if obj_type == 'position':
                test.set_position(obj['position'], obj['duration'],
                                  obj['speed'])
            elif obj_type == 'buffering':
                test.set_position(obj['position'], 100)
            elif obj_type == 'action':
                test.add_action_execution(obj)
                # Make sure that action is taken into account when checking if process
                # is updating
                test.position += 1
            elif obj_type == 'action-done':
                # Make sure that action end is taken into account when checking if process
                # is updating
                test.position += 1
                test.actions_infos[-1]['execution-duration'] = obj['execution-duration']
            elif obj_type == 'report':
                test.add_report(obj)
            elif obj_type == 'skip-test':
                test.set_result(Result.SKIPPED)
815
816
class GstValidateTest(Test):

    """ A class representing a particular test. """
    # Multiplier applied to the timeout (or, failing that, the duration) in
    # __init__ to derive a hard timeout when the caller does not pass one.
    HARD_TIMEOUT_FACTOR = 5
    # Pattern searched for in the test log file by check_results() when the
    # test timed out.
    fault_sig_regex = re.compile("<Caught SIGNAL: .*>")
    # Class-level set shared by all instances.
    # NOTE(review): not used in this part of the file - confirm its purpose.
    needs_gst_inspect = set()
823
824     def __init__(self, application_name, classname,
825                  options, reporter, duration=0,
826                  timeout=DEFAULT_TIMEOUT, scenario=None, hard_timeout=None,
827                  media_descriptor=None, extra_env_variables=None,
828                  expected_issues=None, workdir=None):
829
830         extra_env_variables = extra_env_variables or {}
831
832         if not hard_timeout and self.HARD_TIMEOUT_FACTOR:
833             if timeout:
834                 hard_timeout = timeout * self.HARD_TIMEOUT_FACTOR
835             elif duration:
836                 hard_timeout = duration * self.HARD_TIMEOUT_FACTOR
837             else:
838                 hard_timeout = None
839
840         # If we are running from source, use the -debug version of the
841         # application which is using rpath instead of libtool's wrappers. It's
842         # slightly faster to start and will not confuse valgrind.
843         debug = '%s-debug' % application_name
844         p = look_for_file_in_source_dir('tools', debug)
845         if p:
846             application_name = p
847
848         self.reports = []
849         self.position = -1
850         self.media_duration = -1
851         self.speed = 1.0
852         self.actions_infos = []
853         self.media_descriptor = media_descriptor
854         self.server = None
855         self.criticals = []
856
857         override_path = self.get_override_file(media_descriptor)
858         if override_path:
859             if extra_env_variables:
860                 if extra_env_variables.get("GST_VALIDATE_OVERRIDE", ""):
861                     extra_env_variables[
862                         "GST_VALIDATE_OVERRIDE"] += os.path.pathsep
863
864             extra_env_variables["GST_VALIDATE_OVERRIDE"] = override_path
865
866         super(GstValidateTest, self).__init__(application_name, classname,
867                                               options, reporter,
868                                               duration=duration,
869                                               timeout=timeout,
870                                               hard_timeout=hard_timeout,
871                                               extra_env_variables=extra_env_variables,
872                                               expected_issues=expected_issues,
873                                               workdir=workdir)
874         if media_descriptor and media_descriptor.get_media_filepath():
875             config_file = os.path.join(media_descriptor.get_media_filepath() + '.config')
876             if os.path.isfile(config_file):
877                 self.add_validate_config(config_file, extra_env_variables)
878
879         if scenario is None or scenario.name.lower() == "none":
880             self.scenario = None
881         else:
882             self.scenario = scenario
883
    def kill_subprocess(self):
        """ Kill the test subprocess by delegating to Test.kill_subprocess. """
        Test.kill_subprocess(self)
886
    def add_report(self, report):
        """ Record @report in the list of reports of this test. """
        self.reports.append(report)
889
890     def set_position(self, position, duration, speed=None):
891         self.position = position
892         self.media_duration = duration
893         if speed:
894             self.speed = speed
895
    def add_action_execution(self, action_infos):
        """ Record @action_infos in the list of executed actions. """
        self.actions_infos.append(action_infos)
898
899     def get_override_file(self, media_descriptor):
900         if media_descriptor:
901             if media_descriptor.get_path():
902                 override_path = os.path.splitext(media_descriptor.get_path())[
903                     0] + VALIDATE_OVERRIDE_EXTENSION
904                 if os.path.exists(override_path):
905                     return override_path
906
907         return None
908
    def get_current_position(self):
        """ Return the last position stored on this test. """
        return self.position
911
    def get_current_value(self):
        """ Return the last position stored on this test. """
        return self.position
914
915     def get_subproc_env(self):
916         subproc_env = os.environ.copy()
917
918         if self.options.validate_default_config:
919             self.add_validate_config(self.options.validate_default_config,
920                 subproc_env, )
921
922         subproc_env["GST_VALIDATE_UUID"] = self.get_uuid()
923         subproc_env["GST_VALIDATE_LOGSDIR"] = self.options.logsdir
924
925         if 'GST_DEBUG' in os.environ and not self.options.redirect_logs:
926             gstlogsfile = os.path.splitext(self.logfile)[0] + '.gstdebug'
927             self.extra_logfiles.add(gstlogsfile)
928             subproc_env["GST_DEBUG_FILE"] = gstlogsfile
929
930         if self.options.no_color:
931             subproc_env["GST_DEBUG_NO_COLOR"] = '1'
932
933         # Ensure XInitThreads is called, see bgo#731525
934         subproc_env['GST_GL_XINITTHREADS'] = '1'
935         self.add_env_variable('GST_GL_XINITTHREADS', '1')
936
937         if self.scenario is not None:
938             scenario = self.scenario.get_execution_name()
939             subproc_env["GST_VALIDATE_SCENARIO"] = scenario
940             self.add_env_variable("GST_VALIDATE_SCENARIO",
941                                   subproc_env["GST_VALIDATE_SCENARIO"])
942         else:
943             try:
944                 del subproc_env["GST_VALIDATE_SCENARIO"]
945             except KeyError:
946                 pass
947
948         if not subproc_env.get('GST_DEBUG_DUMP_DOT_DIR'):
949             dotfilesdir = os.path.join(self.options.logsdir,
950                                 self.classname.replace(".", os.sep) + '.pipelines_dot_files')
951             mkdir(dotfilesdir)
952             subproc_env['GST_DEBUG_DUMP_DOT_DIR'] = dotfilesdir
953             if CI_ARTIFACTS_URL:
954                 dotfilesurl = CI_ARTIFACTS_URL + os.path.relpath(dotfilesdir,
955                                                                  self.options.logsdir)
956                 subproc_env['GST_VALIDATE_DEBUG_DUMP_DOT_URL'] = dotfilesurl
957
958         return subproc_env
959
960     def clean(self):
961         Test.clean(self)
962         self.reports = []
963         self.position = -1
964         self.media_duration = -1
965         self.speed = 1.0
966         self.actions_infos = []
967
968     def build_arguments(self):
969         super(GstValidateTest, self).build_arguments()
970         if "GST_VALIDATE" in os.environ:
971             self.add_env_variable("GST_VALIDATE", os.environ["GST_VALIDATE"])
972
973         if "GST_VALIDATE_SCENARIOS_PATH" in os.environ:
974             self.add_env_variable("GST_VALIDATE_SCENARIOS_PATH",
975                                   os.environ["GST_VALIDATE_SCENARIOS_PATH"])
976
977         self.add_env_variable("GST_VALIDATE_CONFIG")
978         self.add_env_variable("GST_VALIDATE_OVERRIDE")
979
980     def get_extra_log_content(self, extralog):
981         value = Test.get_extra_log_content(self, extralog)
982
983         return value
984
985     def report_matches_expected_issues(self, report, expected_issue):
986         for key in ['bug', 'bugs', 'sometimes']:
987             if key in expected_issue:
988                 del expected_issue[key]
989         for key, value in list(report.items()):
990             if key in expected_issue:
991                 if not re.findall(expected_issue[key], str(value)):
992                     return False
993                 expected_issue.pop(key)
994
995         if "can-happen-several-times" in expected_issue:
996             expected_issue.pop("can-happen-several-times")
997         return not bool(expected_issue)
998
999     def check_reported_issues(self, expected_issues):
1000         ret = []
1001         expected_retcode = [0]
1002         for report in self.reports:
1003             found = None
1004             for expected_issue in expected_issues:
1005                 if self.report_matches_expected_issues(report,
1006                                                        expected_issue.copy()):
1007                     found = expected_issue
1008                     break
1009
1010             if found is not None:
1011                 if not found.get('can-happen-several-times', False):
1012                     expected_issues.remove(found)
1013                 if report['level'] == 'critical':
1014                     if found.get('sometimes', True) and isinstance(expected_retcode, list):
1015                         expected_retcode.append(18)
1016                     else:
1017                         expected_retcode = [18]
1018             elif report['level'] == 'critical':
1019                 ret.append(report)
1020
1021         if not ret:
1022             return None, expected_issues, expected_retcode
1023
1024         return ret, expected_issues, expected_retcode
1025
1026     def check_expected_issue(self, expected_issue):
1027         res = True
1028         msg = ''
1029         expected_symbols = expected_issue.get('stacktrace_symbols')
1030         if expected_symbols:
1031             trace_gatherer = BackTraceGenerator.get_default()
1032             stack_trace = trace_gatherer.get_trace(self)
1033
1034             if stack_trace:
1035                 if not isinstance(expected_symbols, list):
1036                     expected_symbols = [expected_symbols]
1037
1038                 not_found_symbols = [s for s in expected_symbols
1039                                      if s not in stack_trace]
1040                 if not_found_symbols:
1041                     msg = " Expected symbols '%s' not found in stack trace " % (
1042                         not_found_symbols)
1043                     res = False
1044             else:
1045                 msg += " No stack trace available, could not verify symbols "
1046
1047         _, not_found_expected_issues, _ = self.check_reported_issues(expected_issue.get('issues', []))
1048         if not_found_expected_issues:
1049             mandatory_failures = [f for f in not_found_expected_issues
1050                                   if not f.get('sometimes', True)]
1051             if mandatory_failures:
1052                 msg = " (Expected issues not found: %s) " % mandatory_failures
1053                 res = False
1054
1055         return msg, res
1056
1057     def check_expected_timeout(self, expected_timeout):
1058         msg = "Expected timeout happened. "
1059         result = Result.PASSED
1060         message = expected_timeout.get('message')
1061         if message:
1062             if not re.findall(message, self.message):
1063                 result = Result.FAILED
1064                 msg = "Expected timeout message: %s got %s " % (
1065                     message, self.message)
1066
1067         stack_msg, stack_res = self.check_expected_issue(expected_timeout)
1068         if not stack_res:
1069             result = Result.TIMEOUT
1070             msg += stack_msg
1071
1072         return result, msg
1073
    def check_results(self):
        """ Compute the final result of the test and set it with set_result().

        Nothing is done when the result is already FAILED, PASSED or
        SKIPPED. Otherwise the reports, the process return code and the
        timeout state are compared with the expected issues of the test.
        """
        if self.result in [Result.FAILED, Result.PASSED, Result.SKIPPED]:
            return

        self.debug("%s returncode: %s", self, self.process.returncode)
        # Work on a copy: check_reported_issues() removes the matched entries.
        expected_issues = copy.deepcopy(self.expected_issues)
        if self.options.rr:
            # signal.SIGPIPE is 13 but it sometimes isn't present in python for some reason.
            expected_issues.append({"returncode": -13, "sometimes": True})
        self.criticals, not_found_expected_issues, expected_returncode = self.check_reported_issues(expected_issues)
        expected_timeout = None
        expected_signal = None
        # The expected issues that matched no report may still describe an
        # expected return code / signal or an expected timeout.
        for i, f in enumerate(not_found_expected_issues):
            returncode = f.get('returncode', [])
            if not isinstance(returncode, list):
                returncode = [returncode]

            if f.get('signame'):
                signames = f['signame']
                if not isinstance(signames, list):
                    signames = [signames]

                # NOTE(review): EXITING_SIGNALS is indexed by signal name here
                # and by return code further down - confirm it maps both ways.
                returncode = [EXITING_SIGNALS[signame] for signame in signames]

            if returncode:
                if 'sometimes' in f:
                    # The issue is optional: a clean exit is acceptable too.
                    returncode.append(0)
                expected_returncode = returncode
                expected_signal = f
            elif f.get("timeout"):
                expected_timeout = f

        # Return code / signal expectations are checked against the process
        # return code below, not against the reports.
        not_found_expected_issues = [f for f in not_found_expected_issues
                                     if not f.get('returncode') and not f.get('signame')]

        msg = ""
        result = Result.PASSED
        if self.result == Result.TIMEOUT:
            with open(self.logfile) as f:
                signal_fault_info = self.fault_sig_regex.findall(f.read())
                if signal_fault_info:
                    result = Result.FAILED
                    msg = signal_fault_info[0]
                elif expected_timeout:
                    not_found_expected_issues.remove(expected_timeout)
                    result, msg = self.check_expected_timeout(expected_timeout)
                else:
                    # Unexpected timeout: the TIMEOUT result is left untouched.
                    return
        elif self.process.returncode in EXITING_SIGNALS:
            msg = "Application exited with signal %s" % (
                EXITING_SIGNALS[self.process.returncode])
            if self.process.returncode not in expected_returncode:
                result = Result.FAILED
            else:
                if expected_signal:
                    stack_msg, stack_res = self.check_expected_issue(
                        expected_signal)
                    if not stack_res:
                        msg += stack_msg
                        result = Result.FAILED
            self.add_stack_trace_to_logfile()
        elif self.process.returncode == VALGRIND_ERROR_CODE:
            msg = "Valgrind reported errors "
            result = Result.FAILED
        elif self.process.returncode not in expected_returncode:
            msg = "Application returned %s " % self.process.returncode
            if expected_returncode != [0]:
                msg += "(expected %s) " % expected_returncode
            result = Result.FAILED

        # Unexpected critical reports always fail the test.
        if self.criticals:
            msg += "(critical errors: [%s]) " % ', '.join(set([c['summary']
                                                           for c in self.criticals]))
            result = Result.FAILED

        if not_found_expected_issues:
            # Issues default to 'sometimes': only the ones explicitly marked
            # otherwise are mandatory.
            mandatory_failures = [f for f in not_found_expected_issues
                                  if not f.get('sometimes', True)]

            if mandatory_failures:
                msg += " (Expected errors not found: %s) " % mandatory_failures
                result = Result.FAILED
        elif self.expected_issues:
            msg += ' %s(Expected errors occurred: %s)%s' % (Colors.OKBLUE,
                                                            self.expected_issues,
                                                            Colors.ENDC)
            result = Result.KNOWN_ERROR

        if result == Result.PASSED:
            # A report with the 'expected' level downgrades a pass to a
            # known error.
            for report in self.reports:
                if report["level"] == "expected":
                    result = Result.KNOWN_ERROR
                    break

        self.set_result(result, msg.strip())
1169
1170     def _generate_expected_issues(self):
1171         res = ""
1172         self.criticals = self.criticals or []
1173         if self.result == Result.TIMEOUT:
1174             res += """            {
1175                 'timeout': True,
1176                 'sometimes': True,
1177             },"""
1178
1179         for report in self.criticals:
1180             res += "\n%s{" % (" " * 12)
1181
1182             for key, value in report.items():
1183                 if key == "type":
1184                     continue
1185                 if value is None:
1186                     continue
1187                 res += '\n%s%s"%s": "%s",' % (
1188                     " " * 16, "# " if key == "details" else "",
1189                     key, value.replace('\n', '\\n'))
1190
1191             res += "\n%s}," % (" " * 12)
1192
1193         return res
1194
1195     def get_valgrind_suppressions(self):
1196         result = super(GstValidateTest, self).get_valgrind_suppressions()
1197         result.extend(utils.get_gst_build_valgrind_suppressions())
1198         gst_sup = self.get_valgrind_suppression_file('common', 'gst.supp')
1199         if gst_sup:
1200             result.append(gst_sup)
1201
1202         return result
1203
1204
class GstValidateEncodingTestInterface(object):
    """ Helpers shared by the tests producing an encoded file.

    The methods rely on attributes they do not define themselves (debug,
    info, add_report, out, options...): they are expected to be provided by
    the class this one is combined with.
    """
    # Default tolerance used by check_encoded_file() when comparing the
    # duration of the encoded file with the duration of the original media.
    DURATION_TOLERANCE = GST_SECOND / 4
1207
1208     def __init__(self, combination, media_descriptor, duration_tolerance=None):
1209         super(GstValidateEncodingTestInterface, self).__init__()
1210
1211         self.media_descriptor = media_descriptor
1212         self.combination = combination
1213         self.dest_file = ""
1214
1215         self._duration_tolerance = duration_tolerance
1216         if duration_tolerance is None:
1217             self._duration_tolerance = self.DURATION_TOLERANCE
1218
1219     def get_current_size(self):
1220         try:
1221             size = os.stat(urllib.parse.urlparse(self.dest_file).path).st_size
1222         except OSError:
1223             return None
1224
1225         self.debug("Size: %s" % size)
1226         return size
1227
1228     def _get_profile_full(self, muxer, venc, aenc, video_restriction=None,
1229                           audio_restriction=None, audio_presence=0,
1230                           video_presence=0, variable_framerate=False):
1231         ret = ""
1232         if muxer:
1233             ret += muxer
1234         ret += ":"
1235         if venc:
1236             if video_restriction is not None:
1237                 ret = ret + video_restriction + '->'
1238             ret += venc
1239             props = ""
1240             if video_presence:
1241                 props += 'presence=%s,' % str(video_presence)
1242             if variable_framerate:
1243                 props += 'variable-framerate=true,'
1244             if props:
1245                 ret = ret + '|' + props[:-1]
1246         if aenc:
1247             ret += ":"
1248             if audio_restriction is not None:
1249                 ret = ret + audio_restriction + '->'
1250             ret += aenc
1251             if audio_presence:
1252                 ret = ret + '|' + str(audio_presence)
1253
1254         return ret.replace("::", ":")
1255
1256     def get_profile(self, video_restriction=None, audio_restriction=None,
1257             variable_framerate=False):
1258         vcaps = self.combination.get_video_caps()
1259         acaps = self.combination.get_audio_caps()
1260         if video_restriction is None:
1261             video_restriction = self.combination.video_restriction
1262         if audio_restriction is None:
1263             audio_restriction = self.combination.audio_restriction
1264         if self.media_descriptor is not None:
1265             if self.combination.video == "theora":
1266                 # Theoraenc doesn't support variable framerate, make sure to avoid them
1267                 framerate = self.media_descriptor.get_framerate()
1268                 if framerate == Fraction(0, 1):
1269                     framerate = Fraction(30, 1)
1270                     restriction = utils.GstCaps.new_from_str(video_restriction or "video/x-raw")
1271                     for struct, _ in restriction:
1272                         if struct.get("framerate") is None:
1273                             struct.set("framerate", struct.FRACTION_TYPE, framerate)
1274                     video_restriction = str(restriction)
1275
1276             video_presence = self.media_descriptor.get_num_tracks("video")
1277             if video_presence == 0:
1278                 vcaps = None
1279
1280             audio_presence = self.media_descriptor.get_num_tracks("audio")
1281             if audio_presence == 0:
1282                 acaps = None
1283
1284         return self._get_profile_full(self.combination.get_muxer_caps(),
1285                                       vcaps, acaps,
1286                                       audio_presence=audio_presence,
1287                                       video_presence=video_presence,
1288                                       video_restriction=video_restriction,
1289                                       audio_restriction=audio_restriction,
1290                                       variable_framerate=variable_framerate)
1291
1292     def _clean_caps(self, caps):
1293         """
1294         Returns a list of key=value or structure name, without "(types)" or ";" or ","
1295         """
1296         return re.sub(r"\(.+?\)\s*| |;", '', caps).split(',')
1297
1298     # pylint: disable=E1101
1299     def _has_caps_type_variant(self, c, ccaps):
1300         """
1301         Handle situations where we can have application/ogg or video/ogg or
1302         audio/ogg
1303         """
1304         has_variant = False
1305         media_type = re.findall("application/|video/|audio/", c)
1306         if media_type:
1307             media_type = media_type[0].replace('/', '')
1308             possible_mtypes = ["application", "video", "audio"]
1309             possible_mtypes.remove(media_type)
1310             for tmptype in possible_mtypes:
1311                 possible_c_variant = c.replace(media_type, tmptype)
1312                 if possible_c_variant in ccaps:
1313                     self.info(
1314                         "Found %s in %s, good enough!", possible_c_variant, ccaps)
1315                     has_variant = True
1316
1317         return has_variant
1318
1319     # pylint: disable=E1101
1320     def run_iqa_test(self, reference_file_uri):
1321         """
1322         Runs IQA test if @reference_file_path exists
1323         @test: The test to run tests on
1324         """
1325         if not GstValidateBaseTestManager.has_feature('iqa'):
1326             self.debug('Iqa element not present, not running extra test.')
1327             return
1328
1329         pipeline_desc = """
1330             uridecodebin uri=%s !
1331                 iqa name=iqa do-dssim=true dssim-error-threshold=1.0 ! fakesink
1332             uridecodebin uri=%s ! iqa.
1333         """ % (reference_file_uri, self.dest_file)
1334         pipeline_desc = pipeline_desc.replace("\n", "")
1335
1336         command = [GstValidateBaseTestManager.COMMAND] + \
1337             shlex.split(pipeline_desc)
1338         msg = "## Running IQA tests on results of: " \
1339             + "%s\n### Command: \n```\n%s\n```\n" % (
1340                 self.classname, ' '.join(command))
1341         if not self.options.redirect_logs:
1342             self.out.write(msg)
1343             self.out.flush()
1344         else:
1345             printc(msg, Colors.OKBLUE)
1346
1347         self.process = subprocess.Popen(command,
1348                                         stderr=self.out,
1349                                         stdout=self.out,
1350                                         env=self.proc_env,
1351                                         cwd=self.workdir)
1352         self.process.wait()
1353
1354     def check_encoded_file(self):
1355         result_descriptor = GstValidateMediaDescriptor.new_from_uri(
1356             self.dest_file)
1357         if result_descriptor is None:
1358             return (Result.FAILED, "Could not discover encoded file %s"
1359                     % self.dest_file)
1360
1361         duration = result_descriptor.get_duration()
1362         orig_duration = self.media_descriptor.get_duration()
1363         tolerance = self._duration_tolerance
1364
1365         if orig_duration - tolerance >= duration <= orig_duration + tolerance:
1366             os.remove(result_descriptor.get_path())
1367             self.add_report(
1368                 {
1369                     'type': 'report',
1370                     'issue-id': 'transcoded-file-wrong-duration',
1371                     'summary': 'The duration of a transcoded file doesn\'t match the duration of the original file',
1372                     'level': 'critical',
1373                     'detected-on': 'pipeline',
1374                     'details': "Duration of encoded file is " " wrong (%s instead of %s)" % (
1375                         utils.TIME_ARGS(duration), utils.TIME_ARGS(orig_duration))
1376                 }
1377             )
1378         else:
1379             all_tracks_caps = result_descriptor.get_tracks_caps()
1380             container_caps = result_descriptor.get_caps()
1381             if container_caps:
1382                 all_tracks_caps.insert(0, ("container", container_caps))
1383
1384             for track_type, caps in all_tracks_caps:
1385                 ccaps = self._clean_caps(caps)
1386                 wanted_caps = self.combination.get_caps(track_type)
1387                 cwanted_caps = self._clean_caps(wanted_caps)
1388
1389                 if wanted_caps is None:
1390                     os.remove(result_descriptor.get_path())
1391                     self.add_report(
1392                         {
1393                             'type': 'report',
1394                             'issue-id': 'transcoded-file-wrong-stream-type',
1395                             'summary': 'Expected stream types during transcoding do not match expectations',
1396                             'level': 'critical',
1397                             'detected-on': 'pipeline',
1398                             'details': "Found a track of type %s in the encoded files"
1399                                     " but none where wanted in the encoded profile: %s" % (
1400                                         track_type, self.combination)
1401                         }
1402                     )
1403                     return
1404
1405                 for c in cwanted_caps:
1406                     if c not in ccaps:
1407                         if not self._has_caps_type_variant(c, ccaps):
1408                             os.remove(result_descriptor.get_path())
1409                             self.add_report(
1410                                 {
1411                                     'type': 'report',
1412                                     'issue-id': 'transcoded-file-wrong-caps',
1413                                     'summary': 'Expected stream caps during transcoding do not match expectations',
1414                                     'level': 'critical',
1415                                     'detected-on': 'pipeline',
1416                                     'details': "Field: %s  (from %s) not in caps of the outputted file %s" % (
1417                                         wanted_caps, c, ccaps)
1418                                 }
1419                             )
1420                             return
1421
1422             os.remove(result_descriptor.get_path())
1423
1424
class TestsManager(Loggable):

    """ A class responsible for managing tests. """

    # Name of the manager, used in the messages logged by log_blacklists().
    name = "base"
    # Name of the testsuite being loaded: used as prefix of test class names
    # and blacklist patterns, and set on the generators that get added.
    loading_testsuite = None
1431
1432     def __init__(self):
1433
1434         Loggable.__init__(self)
1435
1436         self.tests = []
1437         self.unwanted_tests = []
1438         self.options = None
1439         self.args = None
1440         self.reporter = None
1441         self.wanted_tests_patterns = []
1442         self.blacklisted_tests_patterns = []
1443         self._generators = []
1444         self.check_testslist = True
1445         self.all_tests = None
1446         self.expected_issues = {}
1447         self.blacklisted_tests = []
1448
    def init(self):
        """ Initialization hook; this base implementation always returns
        True. """
        return True
1451
1452     def list_tests(self):
1453         return sorted(list(self.tests), key=lambda x: x.classname)
1454
1455     def find_tests(self, classname):
1456         regex = re.compile(classname)
1457         return [test for test in self.list_tests() if regex.findall(test.classname)]
1458
1459     def add_expected_issues(self, expected_issues):
1460         for bugid, failure_def in list(expected_issues.items()):
1461             tests_regexes = []
1462             for test_name_regex in failure_def['tests']:
1463                 regex = re.compile(test_name_regex)
1464                 tests_regexes.append(regex)
1465                 for test in self.tests:
1466                     if regex.findall(test.classname):
1467                         if failure_def.get('allow_flakiness'):
1468                             test.allow_flakiness = True
1469                             self.debug("%s allow flakiness" % (test.classname))
1470                         else:
1471                             for issue in failure_def['issues']:
1472                                 issue['bug'] = bugid
1473                             test.expected_issues.extend(failure_def['issues'])
1474                             self.debug("%s added expected issues from %s" % (
1475                                 test.classname, bugid))
1476             failure_def['tests'] = tests_regexes
1477
1478         self.expected_issues.update(expected_issues)
1479
1480     def add_test(self, test):
1481         if test.generator is None:
1482             test.classname = self.loading_testsuite + '.' + test.classname
1483
1484         for bugid, failure_def in list(self.expected_issues.items()):
1485             failure_def['bug'] = bugid
1486             for regex in failure_def['tests']:
1487                 if regex.findall(test.classname):
1488                     if failure_def.get('allow_flakiness'):
1489                         test.allow_flakiness = True
1490                         self.debug("%s allow flakiness" % (test.classname))
1491                     else:
1492                         for issue in failure_def['issues']:
1493                             issue['bug'] = bugid
1494                         test.expected_issues.extend(failure_def['issues'])
1495                         self.debug("%s added expected issues from %s" % (
1496                             test.classname, bugid))
1497
1498         if self._is_test_wanted(test):
1499             if test not in self.tests:
1500                 self.tests.append(test)
1501         else:
1502             if test not in self.tests:
1503                 self.unwanted_tests.append(test)
1504
    def get_tests(self):
        """ Return the list of tests of this manager (the list itself, not
        a copy). """
        return self.tests
1507
    def populate_testsuite(self):
        """ Hook called by set_settings(); does nothing in this base
        implementation. """
        pass
1510
1511     def add_generators(self, generators):
1512         """
1513         @generators: A list of, or one single #TestsGenerator to be used to generate tests
1514         """
1515         if not isinstance(generators, list):
1516             generators = [generators]
1517         self._generators.extend(generators)
1518         for generator in generators:
1519             generator.testsuite = self.loading_testsuite
1520
1521         self._generators = list(set(self._generators))
1522
    def get_generators(self):
        """ Return the list of registered tests generators. """
        return self._generators
1525
1526     def _add_blacklist(self, blacklisted_tests):
1527         if not isinstance(blacklisted_tests, list):
1528             blacklisted_tests = [blacklisted_tests]
1529
1530         for patterns in blacklisted_tests:
1531             for pattern in patterns.split(","):
1532                 self.blacklisted_tests_patterns.append(re.compile(pattern))
1533
1534     def set_default_blacklist(self, default_blacklist):
1535         for test_regex, reason in default_blacklist:
1536             if not test_regex.startswith(self.loading_testsuite + '.'):
1537                 test_regex = self.loading_testsuite + '.' + test_regex
1538             self.blacklisted_tests.append((test_regex, reason))
1539             self._add_blacklist(test_regex)
1540
1541     def add_options(self, parser):
1542         """ Add more arguments. """
1543         pass
1544
1545     def set_settings(self, options, args, reporter):
1546         """ Set properties after options parsing. """
1547         self.options = options
1548         self.args = args
1549         self.reporter = reporter
1550
1551         self.populate_testsuite()
1552
1553         if self.options.valgrind:
1554             self.print_valgrind_bugs()
1555
1556         if options.wanted_tests:
1557             for patterns in options.wanted_tests:
1558                 for pattern in patterns.split(","):
1559                     self.wanted_tests_patterns.append(re.compile(pattern))
1560
1561         if options.blacklisted_tests:
1562             for patterns in options.blacklisted_tests:
1563                 self._add_blacklist(patterns)
1564
1565     def check_blacklists(self):
1566         if self.options.check_bugs_status:
1567             if not check_bugs_resolution(self.blacklisted_tests):
1568                 return False
1569
1570         return True
1571
1572     def log_blacklists(self):
1573         if self.blacklisted_tests:
1574             self.info("Currently 'hardcoded' %s blacklisted tests:" %
1575                       self.name)
1576
1577         for name, bug in self.blacklisted_tests:
1578             if not self.options.check_bugs_status:
1579                 self.info("  + %s --> bug: %s" % (name, bug))
1580
1581     def check_expected_issues(self):
1582         if not self.expected_issues or not self.options.check_bugs_status:
1583             return True
1584
1585         bugs_definitions = defaultdict(list)
1586         for bug, failure_def in list(self.expected_issues.items()):
1587             tests_names = '|'.join(
1588                 [regex.pattern for regex in failure_def['tests']])
1589             bugs_definitions[tests_names].extend([bug])
1590
1591         return check_bugs_resolution(bugs_definitions.items())
1592
1593     def _check_blacklisted(self, test):
1594         for pattern in self.blacklisted_tests_patterns:
1595             if pattern.findall(test.classname):
1596                 self.info("%s is blacklisted by %s", test.classname, pattern)
1597                 return True
1598
1599         return False
1600
1601     def _check_whitelisted(self, test):
1602         for pattern in self.wanted_tests_patterns:
1603             if pattern.findall(test.classname):
1604                 if self._check_blacklisted(test):
1605                     # If explicitly white listed that specific test
1606                     # bypass the blacklisting
1607                     if pattern.pattern != test.classname:
1608                         return False
1609                 return True
1610         return False
1611
1612     def _check_duration(self, test):
1613         if test.duration > 0 and int(self.options.long_limit) < int(test.duration):
1614             self.info("Not activating %s as its duration (%d) is superior"
1615                       " than the long limit (%d)" % (test, test.duration,
1616                                                      int(self.options.long_limit)))
1617             return False
1618
1619         return True
1620
1621     def _is_test_wanted(self, test):
1622         if self._check_whitelisted(test):
1623             if not self._check_duration(test):
1624                 return False
1625             return True
1626
1627         if self._check_blacklisted(test):
1628             return False
1629
1630         if not self._check_duration(test):
1631             return False
1632
1633         if not self.wanted_tests_patterns:
1634             return True
1635
1636         return False
1637
1638     def needs_http_server(self):
1639         return False
1640
1641     def print_valgrind_bugs(self):
1642         pass
1643
1644
class TestsGenerator(Loggable):
    """
    Named collection of tests attached to a test manager.

    Tests are stored by classname and handed out as a list by
    `generate_tests()`.
    """

    def __init__(self, name, test_manager, tests=None):
        """
        @name: Name of the generator
        @test_manager: The test manager the generator is created for
        @tests: Optional iterable of tests to register right away, stored
                under their current `classname`
        """
        Loggable.__init__(self)
        self.name = name
        self.test_manager = test_manager
        self.testsuite = None
        # `tests` defaults to None rather than to a list literal, which
        # would be one single object shared by every call.
        self._tests = {test.classname: test for test in tests or ()}

    def generate_tests(self, *kwargs):
        """
        Method that generates tests

        Extra positional arguments are accepted and ignored here.
        Returns the registered tests as a new list.
        """
        return list(self._tests.values())

    def add_test(self, test):
        """
        Register @test on this generator.

        The test gets this generator as `generator` and its classname is
        prefixed with `self.testsuite` and a dot before being stored.
        """
        test.generator = self
        test.classname = self.testsuite + '.' + test.classname
        self._tests[test.classname] = test
1666
1667
class GstValidateTestsGenerator(TestsGenerator):
    """ Generator that calls `populate_tests()` before handing out its tests. """

    def populate_tests(self, uri_minfo_special_scenarios, scenarios):
        """ Hook called by `generate_tests()`; does nothing in this class. """
        return None

    def generate_tests(self, uri_minfo_special_scenarios, scenarios):
        """ Populate the tests with the given arguments, then return them as a list. """
        self.populate_tests(uri_minfo_special_scenarios, scenarios)
        generated = super().generate_tests()
        return generated
1676
1677
1678 class _TestsLauncher(Loggable):
1679
1680     def __init__(self):
1681
1682         Loggable.__init__(self)
1683
1684         self.options = None
1685         self.testers = []
1686         self.tests = []
1687         self.reporter = None
1688         self._list_testers()
1689         self.all_tests = None
1690         self.wanted_tests_patterns = []
1691
1692         self.queue = queue.Queue()
1693         self.jobs = []
1694         self.total_num_tests = 0
1695         self.current_progress = -1
1696         self.server = None
1697         self.httpsrv = None
1698         self.vfb_server = None
1699
1700     def _list_app_dirs(self):
1701         app_dirs = []
1702         env_dirs = os.environ["GST_VALIDATE_APPS_DIR"]
1703         if env_dirs is not None:
1704             for dir_ in env_dirs.split(os.pathsep):
1705                 app_dirs.append(dir_)
1706
1707         return app_dirs
1708
1709     def _exec_app(self, app_dir, env):
1710         try:
1711             files = os.listdir(app_dir)
1712         except OSError as e:
1713             self.debug("Could not list %s: %s" % (app_dir, e))
1714             files = []
1715         for f in files:
1716             if f.endswith(".py"):
1717                 exec(compile(open(os.path.join(app_dir, f)).read(),
1718                              os.path.join(app_dir, f), 'exec'), env)
1719
1720     def _exec_apps(self, env):
1721         app_dirs = self._list_app_dirs()
1722         for app_dir in app_dirs:
1723             self._exec_app(app_dir, env)
1724
1725     def _list_testers(self):
1726         env = globals().copy()
1727         self._exec_apps(env)
1728
1729         testers = [i() for i in utils.get_subclasses(TestsManager, env)]
1730         for tester in testers:
1731             if tester.init() is True:
1732                 self.testers.append(tester)
1733             else:
1734                 self.warning("Can not init tester: %s -- PATH is %s"
1735                              % (tester.name, os.environ["PATH"]))
1736
1737     def add_options(self, parser):
1738         for tester in self.testers:
1739             tester.add_options(parser)
1740
1741     def _load_testsuite(self, testsuites):
1742         exceptions = []
1743         for testsuite in testsuites:
1744             try:
1745                 sys.path.insert(0, os.path.dirname(testsuite))
1746                 spec = importlib.util.spec_from_file_location(os.path.basename(testsuite).replace(".py", ""), testsuite)
1747                 module = importlib.util.module_from_spec(spec)
1748                 spec.loader.exec_module(module)
1749                 return (module, None)
1750             except Exception as e:
1751                 exceptions.append("Could not load %s: %s" % (testsuite, e))
1752                 continue
1753             finally:
1754                 sys.path.remove(os.path.dirname(testsuite))
1755
1756         return (None, exceptions)
1757
    def _load_testsuites(self):
        """
        Turn the testsuite names/paths of `self.options.testsuites` into
        loaded modules.

        On return `self.options.testsuites` holds the loaded modules, each
        of them carrying a `TEST_MANAGER` list.
        """
        # Loaded modules, by module name
        testsuites = {}
        # NOTE: entries may get appended to self.options.testsuites from
        # inside this loop; they are visited by later iterations.
        for testsuite in self.options.testsuites:
            if testsuite.endswith('.py') and os.path.exists(testsuite):
                # Path to an existing python file
                testsuite = os.path.abspath(os.path.expanduser(testsuite))
                loaded_module = self._load_testsuite([testsuite])
            else:
                # A name to look for in the testsuites directories
                possible_testsuites_paths = [os.path.join(d, testsuite + ".py")
                                             for d in self.options.testsuites_dirs]
                loaded_module = self._load_testsuite(possible_testsuites_paths)

            module = loaded_module[0]
            if not loaded_module[0]:
                if "." in testsuite:
                    # Retry with what precedes the first dot as testsuite
                    # and keep the full string as a wanted test
                    self.options.testsuites.append(testsuite.split('.')[0])
                    self.info("%s looks like a test name, trying that" %
                              testsuite)
                    self.options.wanted_tests.append(testsuite)
                else:
                    if testsuite in testsuites:
                        self.info('Testuite %s was loaded previously', testsuite)
                        continue
                    printc("Could not load testsuite: %s, reasons: %s" % (
                        testsuite, loaded_module[1]), Colors.FAIL)
                continue

            if module.__name__ in testsuites:
                self.info("Trying to load testsuite '%s' a second time?", module.__name__)
                continue

            testsuites[module.__name__] = module
            # Make sure TEST_MANAGER is always a list: every known tester
            # when the module does not define it
            if not hasattr(module, "TEST_MANAGER"):
                module.TEST_MANAGER = [tester.name for tester in self.testers]
            elif not isinstance(module.TEST_MANAGER, list):
                module.TEST_MANAGER = [module.TEST_MANAGER]

        self.options.testsuites = list(testsuites.values())
1795
1796     def _setup_testsuites(self):
1797         for testsuite in self.options.testsuites:
1798             loaded = False
1799             wanted_test_manager = None
1800             # TEST_MANAGER has been set in _load_testsuites()
1801             assert hasattr(testsuite, "TEST_MANAGER")
1802             wanted_test_manager = testsuite.TEST_MANAGER
1803             if not isinstance(wanted_test_manager, list):
1804                 wanted_test_manager = [wanted_test_manager]
1805
1806             for tester in self.testers:
1807                 if wanted_test_manager is not None and \
1808                         tester.name not in wanted_test_manager:
1809                     continue
1810
1811                 prev_testsuite_name = TestsManager.loading_testsuite
1812                 if self.options.user_paths:
1813                     TestsManager.loading_testsuite = tester.name
1814                     tester.register_defaults()
1815                     loaded = True
1816                 else:
1817                     TestsManager.loading_testsuite = testsuite.__name__
1818                     if testsuite.setup_tests(tester, self.options):
1819                         loaded = True
1820                 if prev_testsuite_name:
1821                     TestsManager.loading_testsuite = prev_testsuite_name
1822
1823             if not loaded:
1824                 printc("Could not load testsuite: %s"
1825                        " maybe because of missing TestManager"
1826                        % (testsuite), Colors.FAIL)
1827                 return False
1828
1829     def _load_config(self, options):
1830         printc("Loading config files is DEPRECATED"
1831                " you should use the new testsuite format now",)
1832
1833         for tester in self.testers:
1834             tester.options = options
1835             globals()[tester.name] = tester
1836         globals()["options"] = options
1837         c__file__ = __file__
1838         globals()["__file__"] = self.options.config
1839         exec(compile(open(self.options.config).read(),
1840                      self.options.config, 'exec'), globals())
1841         globals()["__file__"] = c__file__
1842
    def set_settings(self, options, args):
        """
        Configure the launcher and its testers from the parsed options.

        @options: Parsed options
        @args: Remaining command line arguments; names of testers found in
               there are removed from the list

        Returns True on success, False when something failed (no testsuite
        loaded, testsuite setup, bug checks or virtual frame buffer).
        """
        if options.xunit_file:
            self.reporter = reporters.XunitReporter(options)
        else:
            self.reporter = reporters.Reporter(options)

        self.options = options
        # Only used as a flag: set when at least one tester is named in args
        wanted_testers = None
        for tester in self.testers:
            if tester.name in args:
                wanted_testers = tester.name

        if wanted_testers:
            # Keep only the testers named in args
            testers = self.testers
            self.testers = []
            for tester in testers:
                if tester.name in args:
                    self.testers.append(tester)
                    args.remove(tester.name)

        if options.config:
            self._load_config(options)

        self._load_testsuites()
        if not self.options.testsuites:
            printc("Not testsuite loaded!", Colors.FAIL)
            return False

        for tester in self.testers:
            tester.set_settings(options, args, self.reporter)

        # Testsuites are only set up when no config file is in use
        if not options.config and options.testsuites:
            if self._setup_testsuites() is False:
                return False

        if self.options.check_bugs_status:
            printc("-> Checking bugs resolution... ", end='')

        for tester in self.testers:
            if not tester.check_blacklists():
                return False

            tester.log_blacklists()

            if not tester.check_expected_issues():
                return False

        if self.options.check_bugs_status:
            printc("OK", Colors.OKGREEN)

        if self.needs_http_server() or options.httponly is True:
            self.httpsrv = HTTPServer(options)
            self.httpsrv.start()

        if options.no_display:
            self.vfb_server = get_virual_frame_buffer_server(options)
            # res[0] is False on failure, res[1] is then printed as reason
            res = self.vfb_server.start()
            if res[0] is False:
                printc("Could not start virtual frame server: %s" % res[1],
                       Colors.FAIL)
                return False
            os.environ["DISPLAY"] = self.vfb_server.display_id

        return True
1907
1908     def _check_tester_has_other_testsuite(self, testsuite, tester):
1909         if tester.name != testsuite.TEST_MANAGER[0]:
1910             return True
1911
1912         for t in self.options.testsuites:
1913             if t != testsuite:
1914                 for other_testmanager in t.TEST_MANAGER:
1915                     if other_testmanager == tester.name:
1916                         return True
1917
1918         return False
1919
    def _check_defined_tests(self, tester, tests):
        """
        Compare @tests with the `.testslist` file next to the testsuite and
        rewrite that file.

        @tester: The tester the tests come from
        @tests: The tests listed by @tester

        Returns None when tests were blacklisted or wanted through the
        options (nothing is checked then), otherwise whether the list of
        tests changed compared to the file.
        """
        if self.options.blacklisted_tests or self.options.wanted_tests:
            return

        tests_names = [test.classname for test in tests]
        testlist_changed = False
        for testsuite in self.options.testsuites:
            if not self._check_tester_has_other_testsuite(testsuite, tester) \
                    and tester.check_testslist:
                try:
                    testlist_file = open(os.path.splitext(testsuite.__file__)[0] + ".testslist",
                                         'r+')

                    know_tests = testlist_file.read().split("\n")
                    testlist_file.close()

                    # Reopened for writing: the file content is replaced below
                    testlist_file = open(os.path.splitext(testsuite.__file__)[0] + ".testslist",
                                         'w')
                except IOError:
                    # No usable .testslist file for this testsuite
                    continue

                # Known tests prefixed with '~' that are not defined now
                optional_out = []
                for test in know_tests:
                    if test and test.strip('~') not in tests_names:
                        if not test.startswith('~'):
                            testlist_changed = True
                            printc("Test %s Not in testsuite %s anymore"
                                   % (test, testsuite.__file__), Colors.FAIL)
                        else:
                            optional_out.append((test, None))

                # From here on tests_names holds (name, test or None) pairs,
                # sorted by name without taking the '~' prefix into account
                tests_names = sorted([(test.classname, test) for test in tests] + optional_out,
                                     key=lambda x: x[0].strip('~'))

                for tname, test in tests_names:
                    if test and test.optional:
                        tname = '~' + tname
                    testlist_file.write("%s\n" % (tname))
                    if tname and tname not in know_tests:
                        printc("Test %s is NEW in testsuite %s"
                               % (tname, testsuite.__file__),
                               Colors.FAIL if self.options.fail_on_testlist_change else Colors.OKGREEN)
                        testlist_changed = True

                testlist_file.close()
                # Only the first testsuite with a usable file is handled
                break

        return testlist_changed
1968
1969     def _split_tests(self, num_groups):
1970         groups = [[] for x in range(num_groups)]
1971         group = cycle(groups)
1972         for test in self.tests:
1973             next(group).append(test)
1974         return groups
1975
1976     def list_tests(self):
1977         for tester in self.testers:
1978             if not self._tester_needed(tester):
1979                 continue
1980
1981             tests = tester.list_tests()
1982             if self._check_defined_tests(tester, tests) and \
1983                     self.options.fail_on_testlist_change:
1984                 raise RuntimeError("Unexpected new test in testsuite.")
1985
1986             self.tests.extend(tests)
1987         self.tests.sort(key=lambda test: test.classname)
1988
1989         if self.options.num_parts < 1:
1990             raise RuntimeError("Tests must be split in positive number of parts.")
1991         if self.options.num_parts > len(self.tests):
1992             raise RuntimeError("Cannot have more parts then there exist tests.")
1993         if self.options.part_index < 1 or self.options.part_index > self.options.num_parts:
1994             raise RuntimeError("Part index is out of range")
1995
1996         self.tests = self._split_tests(self.options.num_parts)[self.options.part_index - 1]
1997         return self.tests
1998
1999     def _tester_needed(self, tester):
2000         for testsuite in self.options.testsuites:
2001             if tester.name in testsuite.TEST_MANAGER:
2002                 return True
2003         return False
2004
2005     def server_wrapper(self, ready):
2006         self.server = GstValidateTCPServer(
2007             ('localhost', 0), GstValidateListener)
2008         self.server.socket.settimeout(None)
2009         self.server.launcher = self
2010         self.serverport = self.server.socket.getsockname()[1]
2011         self.info("%s server port: %s" % (self, self.serverport))
2012         ready.set()
2013
2014         self.server.serve_forever(poll_interval=0.05)
2015
2016     def _start_server(self):
2017         self.info("Starting TCP Server")
2018         ready = threading.Event()
2019         self.server_thread = threading.Thread(target=self.server_wrapper,
2020                                               kwargs={'ready': ready})
2021         self.server_thread.start()
2022         ready.wait()
2023         os.environ["GST_VALIDATE_SERVER"] = "tcp://localhost:%s" % self.serverport
2024
2025     def _stop_server(self):
2026         if self.server:
2027             self.server.shutdown()
2028             self.server_thread.join()
2029             self.server.server_close()
2030             self.server = None
2031
2032     def test_wait(self):
2033         while True:
2034             # Check process every second for timeout
2035             try:
2036                 self.queue.get(timeout=1)
2037             except queue.Empty:
2038                 pass
2039
2040             for test in self.jobs:
2041                 if test.process_update():
2042                     self.jobs.remove(test)
2043                     return test
2044
2045     def tests_wait(self):
2046         try:
2047             test = self.test_wait()
2048             test.check_results()
2049         except KeyboardInterrupt:
2050             for test in self.jobs:
2051                 test.kill_subprocess()
2052             raise
2053
2054         return test
2055
2056     def start_new_job(self, tests_left):
2057         try:
2058             test = tests_left.pop(0)
2059         except IndexError:
2060             return False
2061
2062         test.test_start(self.queue)
2063
2064         self.jobs.append(test)
2065
2066         return True
2067
2068     def print_result(self, current_test_num, test, retry_on_failure=False):
2069         if test.result != Result.PASSED and not retry_on_failure:
2070             printc(str(test), color=utils.get_color_for_result(test.result))
2071
2072         length = 80
2073         progress = int(length * current_test_num // self.total_num_tests)
2074         bar = 'â–ˆ' * progress + '-' * (length - progress)
2075         if is_tty():
2076             printc('\r|%s| [%s/%s]' % (bar, current_test_num, self.total_num_tests), end='\r')
2077         else:
2078             if progress > self.current_progress:
2079                 self.current_progress = progress
2080                 printc('|%s| [%s/%s]' % (bar, current_test_num, self.total_num_tests))
2081
    def _run_tests(self, running_tests=None, all_alone=False, retry_on_failures=False):
        """
        Run tests, the parallel ones concurrently and then the others one
        at a time.

        @running_tests: Tests to run, `self.tests` when empty or None
        @all_alone: Run every test one at a time, even parallel ones
        @retry_on_failures: Run the failed tests a second time, one at a
                            time, once everything else is over

        Returns False as soon as a test fails while the `forever` or
        `fatal_error` option is set. Otherwise returns the result of the
        second run when tests had to be retried, True when there was
        nothing to retry.
        """
        if not self.all_tests:
            self.all_tests = self.list_tests()

        if not running_tests:
            running_tests = self.tests

        self.reporter.init_timer()
        # Tests to run one at a time / tests that can run concurrently
        alone_tests = []
        tests = []
        for test in running_tests:
            if test.is_parallel and not all_alone:
                tests.append(test)
            else:
                alone_tests.append(test)

        # use max to defend against the case where all tests are alone_tests
        max_num_jobs = max(min(self.options.num_jobs, len(tests)), 1)
        jobs_running = 0

        # In forever mode with fewer parallel tests than jobs, add copies
        # of the tests until every job has something to run
        if self.options.forever and len(tests) < self.options.num_jobs and len(tests):
            max_num_jobs = self.options.num_jobs
            copied = []
            i = 0
            while (len(tests) + len(copied)) < max_num_jobs:
                copied.append(tests[i].copy(len(copied) + 1))

                i += 1
                if i >= len(tests):
                    i = 0
            tests += copied
            self.tests += copied

        self.total_num_tests = len(self.all_tests)
        printc("\nRunning %d tests..." % self.total_num_tests, color=Colors.HEADER)
        # if order of test execution doesn't matter, shuffle
        # the order to optimize cpu usage
        if self.options.shuffle:
            random.shuffle(tests)
            random.shuffle(alone_tests)

        current_test_num = 1
        to_retry = []
        # First the parallel tests with up to max_num_jobs jobs at once,
        # then the remaining tests with a single job
        for num_jobs, tests in [(max_num_jobs, tests), (1, alone_tests)]:
            tests_left = list(tests)
            for i in range(num_jobs):
                if not self.start_new_job(tests_left):
                    break
                jobs_running += 1

            while jobs_running != 0:
                test = self.tests_wait()
                jobs_running -= 1
                current_test_num += 1
                res = test.test_end(retry_on_failure=retry_on_failures)
                to_report = True
                if res not in [Result.PASSED, Result.SKIPPED, Result.KNOWN_ERROR]:
                    if self.options.forever or self.options.fatal_error:
                        self.print_result(current_test_num - 1, test, retry_on_failure=retry_on_failures)
                        self.reporter.after_test(test)
                        return False

                    if retry_on_failures:
                        if not self.options.redirect_logs and test.allow_flakiness:
                            test.copy_logfiles()
                        printc(test)
                        to_retry.append(test)

                        # Not adding to final report if flakiness is tolerated
                        to_report = not test.allow_flakiness
                self.print_result(current_test_num - 1, test, retry_on_failure=retry_on_failures)
                if to_report:
                    self.reporter.after_test(test)
                if retry_on_failures:
                    test.clean()
                # A job slot just got free, start the next test if any
                if self.start_new_job(tests_left):
                    jobs_running += 1

        if to_retry:
            printc("--> Rerunning the following tests to see if they are flaky:", Colors.WARNING)
            for test in to_retry:
                printc('  * %s' % test.classname)
            printc('')
            # The second run is done one test at a time and without retry
            return self._run_tests(to_retry, all_alone=True, retry_on_failures=False)

        return True
2168
2169     def clean_tests(self, stop_server=False):
2170         for test in self.tests:
2171             test.clean()
2172         if stop_server:
2173             self._stop_server()
2174
    def run_tests(self):
        """
        Start the TCP server and run the tests according to the options.

        With the `forever` option the tests are run again and again until
        a run fails and False is returned. With `n_runs` they are run that
        many times and the result tells whether every run passed. Otherwise
        they are run once, honouring the `retry_on_failures` option.

        The HTTP and virtual frame buffer servers, when set, are stopped in
        any case at the end, the tests cleaned and the TCP server stopped.
        """
        r = 0
        try:
            self._start_server()
            if self.options.forever:
                r = 1
                while True:
                    printc("-> Iteration %d" % r, end='\r')

                    if not self._run_tests():
                        break
                    r += 1
                    self.clean_tests()
                    # NOTE(review): r has already been incremented here, so
                    # this prints the number of the next iteration -- confirm
                    # this is intended
                    msg = "-> Iteration %d... %sOK%s" % (r, Colors.OKGREEN, Colors.ENDC)
                    printc(msg, end="\r")

                return False
            elif self.options.n_runs:
                res = True
                for r in range(self.options.n_runs):
                    printc("-> Iteration %d" % r, end='\r')
                    if not self._run_tests():
                        res = False
                        printc("ERROR", Colors.FAIL, end="\r")
                    else:
                        printc("OK", Colors.OKGREEN, end="\r")
                    self.clean_tests()

                return res
            else:
                return self._run_tests(retry_on_failures=self.options.retry_on_failures)
        finally:
            if self.options.forever:
                printc("\n-> Ran %d times" % r)
            if self.httpsrv:
                self.httpsrv.stop()
            if self.vfb_server:
                self.vfb_server.stop()
            # True: also stop the TCP server
            self.clean_tests(True)
2214
2215     def final_report(self):
2216         return self.reporter.final_report()
2217
2218     def needs_http_server(self):
2219         for tester in self.testers:
2220             if tester.needs_http_server():
2221                 return True
2222
2223
class NamedDic(object):
    """ Object exposing the entries of a dictionary as attributes. """

    def __init__(self, props):
        """ @props: Dictionary of attribute names and values; nothing is set when it is empty or None. """
        entries = props.items() if props else ()
        for key, val in entries:
            setattr(self, key, val)
2230
2231
class Scenario(object):
    """
    A scenario: a name, an optional path and a set of properties that are
    exposed as attributes.

    NOTE(review): the boolean getters apply bool() to the property value;
    if the values are strings, any non-empty string, "false" included,
    counts as true -- confirm against what the callers pass.
    """

    def __init__(self, name, props, path=None):
        """
        @name: Name of the scenario
        @props: Iterable of (property name, value) pairs; dashes in the
                names are replaced by underscores
        @path: Path of the scenario, if any
        """
        self.name = name
        self.path = path

        for key, val in props:
            setattr(self, key.replace("-", "_"), val)

    def get_execution_name(self):
        """ Return the path when there is one, the name otherwise. """
        return self.name if self.path is None else self.path

    def seeks(self):
        """ Truth value of the `seek` property, False when it is not set. """
        return bool(getattr(self, "seek", False))

    def needs_clock_sync(self):
        """ Truth value of the `need_clock_sync` property, False when it is not set. """
        return bool(getattr(self, "need_clock_sync", False))

    def needs_live_content(self):
        """ Truth value of the `live_content_required` property, False when it is not set. """
        # Scenarios that can only be used on live content
        return bool(getattr(self, "live_content_required", False))

    def compatible_with_live_content(self):
        """ Tell whether the scenario can be used on live content. """
        # if a live content is required it's implicitly compatible with
        # live content
        if self.needs_live_content():
            return True
        return bool(getattr(self, "live_content_compatible", False))

    def get_min_media_duration(self):
        """ Return the `min_media_duration` property as a float, 0 when it is not set. """
        try:
            raw = self.min_media_duration
        except AttributeError:
            return 0

        return float(raw)

    def does_reverse_playback(self):
        """ Truth value of the `reverse_playback` property, False when it is not set. """
        return bool(getattr(self, "reverse_playback", False))

    def get_duration(self):
        """ Return the `duration` property as a float, 0 when it is not set. """
        if not hasattr(self, "duration"):
            return 0

        return float(self.duration)

    def get_min_tracks(self, track_type):
        """ Return the `min_<track_type>_track` property as an int, 0 when it is not set. """
        attribute = "min_%s_track" % track_type
        if not hasattr(self, attribute):
            return 0

        return int(getattr(self, attribute))

    def __repr__(self):
        return "<Scenario %s>" % self.name
2300
2301
class ScenarioManager(Loggable):
    """Singleton keeping track of the scenarios reported by gst-validate.

    Every ``ScenarioManager()`` call returns the same instance.  Its
    ``config`` attribute starts as None and is read by discover_scenarios()
    (``main_dir`` and ``logsdir``), so it has to be assigned before any
    discovery is triggered.
    """
    _instance = None
    # Scenarios found by a discovery run without explicit scenario files.
    system_scenarios = []
    # Scenarios found from explicit scenario files, keyed by file path.
    special_scenarios = {}

    FILE_EXTENSION = "scenario"

    def __new__(cls, *args, **kwargs):
        """Create the shared instance on first call, then keep returning it."""
        if not cls._instance:
            cls._instance = super(ScenarioManager, cls).__new__(
                cls, *args, **kwargs)
            cls._instance.config = None
            cls._instance.discovered = False
            Loggable.__init__(cls._instance)

        return cls._instance

    def find_special_scenarios(self, mfile):
        """Discover the scenario files stored next to a media file.

        A file from the directory of ``mfile`` is used when its name matches
        ``<basename of mfile>.<anything>.scenario``.

        :param mfile: Path of the media file.
        :returns: What discover_scenarios() returns for the matching files,
            or an empty list when no file matches.
        """
        media_dir = os.path.dirname(mfile)
        # Raw string: "\." must reach the regex engine untouched.
        pattern = re.compile(r"%s\..*\.%s$" % (
            re.escape(os.path.basename(mfile)), self.FILE_EXTENSION))

        scenarios = [os.path.join(media_dir, fname)
                     for fname in os.listdir(media_dir)
                     if pattern.search(fname)]

        if scenarios:
            scenarios = self.discover_scenarios(scenarios, mfile)

        return scenarios

    def discover_scenarios(self, scenario_paths=None, mfile=None):
        """
        Discover scenarios specified in scenario_paths or the default ones
        if nothing specified there

        The gst-validate command is run with
        ``--scenarios-defs-output-file`` and the file it writes is parsed
        as an ini file, one section per scenario.

        :param scenario_paths: Optional list of scenario file paths.  When
            empty or None the discovered scenarios are also added to
            ``system_scenarios``.
        :param mfile: Optional media file path; when set, scenario names are
            computed by stripping ``mfile + "."`` and the scenario extension
            from the scenario path.
        :returns: The list of Scenario objects built from the sections.
        """
        scenario_paths = scenario_paths or []
        scenarios = []
        scenario_defs = os.path.join(self.config.main_dir, "scenarios.def")
        log_path = os.path.join(self.config.logsdir, "scenarios_discovery.log")

        # The log file is closed as soon as the subprocess is done.
        with open(log_path, 'w') as logs:
            try:
                command = [GstValidateBaseTestManager.COMMAND,
                           "--scenarios-defs-output-file", scenario_defs]
                command.extend(scenario_paths)
                subprocess.check_call(command, stdout=logs, stderr=logs)
            except subprocess.CalledProcessError as e:
                # Best effort: report the failure and still try to read
                # the definitions file below.
                self.error(e)
                self.error('See %s' % log_path)

        config = configparser.RawConfigParser()
        with open(scenario_defs) as f:
            # readfp() was removed in Python 3.12; read_file() replaces it.
            config.read_file(f)

        suffix = "." + self.FILE_EXTENSION
        for section in config.sections():
            name = None
            path = None
            if scenario_paths:
                # Sections are matched against the requested scenario files.
                if section in scenario_paths:
                    path = section
                    if mfile is None:
                        name = os.path.basename(section).replace(suffix, "")
                    else:
                        # The real name of the scenario is:
                        # filename.REALNAME.scenario
                        name = section.replace(mfile + ".", "").replace(
                            suffix, "")
            else:
                name = os.path.basename(section).replace(suffix, "")

            assert name, "No scenario name for section %s" % section

            props = config.items(section)
            scenario = Scenario(name, props, path)
            if scenario_paths:
                self.special_scenarios[path] = scenario
            scenarios.append(scenario)

        if not scenario_paths:
            self.discovered = True
            self.system_scenarios.extend(scenarios)

        return scenarios

    def get_scenario(self, name):
        """Look up a scenario by name or by absolute scenario file path.

        :param name: A scenario name, an absolute path ending with the
            scenario extension, or None.
        :returns: The matching Scenario, the whole ``system_scenarios`` list
            when ``name`` is None, or None when nothing matches.
        """
        if name is not None and os.path.isabs(name) and name.endswith(self.FILE_EXTENSION):
            scenario = self.special_scenarios.get(name)
            if scenario:
                return scenario

            scenarios = self.discover_scenarios([name])
            if scenarios:
                # Cache the Scenario itself (not the list) so that later
                # lookups return the same type as the first one.
                self.special_scenarios[name] = scenarios[0]
                return scenarios[0]

        if self.discovered is False:
            self.discover_scenarios()

        if name is None:
            return self.system_scenarios

        for scenario in self.system_scenarios:
            if scenario.name == name:
                return scenario

        self.warning("Scenario: %s not found" % name)
        return None
2412
2413
class GstValidateBaseTestManager(TestsManager):
    """Tests manager base holding default scenarios and encoding formats.

    The ``*COMMAND`` class attributes are filled by update_commands().
    """
    scenarios_manager = ScenarioManager()
    # Results of has_feature(), keyed by feature name.
    features_cache = {}

    def __init__(self):
        super(GstValidateBaseTestManager, self).__init__()
        self._scenarios = []
        self._encoding_formats = []

    @classmethod
    def update_commands(cls, extra_paths=None):
        """Set the ``*COMMAND`` class attributes from ``which()``.

        :param extra_paths: Passed through to ``which()`` as second argument.
        """
        for varname, cmd in {'': 'gst-validate',
                             'TRANSCODING_': 'gst-validate-transcoding',
                             'MEDIA_CHECK_': 'gst-validate-media-check',
                             'RTSP_SERVER_': 'gst-validate-rtsp-server',
                             'INSPECT_': 'gst-inspect'}.items():
            setattr(cls, varname + 'COMMAND', which(cmd + '-1.0', extra_paths))

    @classmethod
    def has_feature(cls, featurename):
        """Tell whether ``INSPECT_COMMAND featurename`` exits successfully.

        The answer is cached per feature name in ``features_cache``.
        """
        try:
            return cls.features_cache[featurename]
        except KeyError:
            pass

        try:
            subprocess.check_output([cls.INSPECT_COMMAND, featurename])
            res = True
        except subprocess.CalledProcessError:
            res = False

        cls.features_cache[featurename] = res
        return res

    def add_scenarios(self, scenarios):
        """
        @scenarios A list or a single scenario name(s) to be run on the tests.
                    They are just the default scenarios, and then depending on
                    the TestsGenerator to be used you can have more fine grained
                    control on what to be run on each series of tests.
        """
        if isinstance(scenarios, list):
            self._scenarios.extend(scenarios)
        else:
            self._scenarios.append(scenarios)

        # Drop duplicates while keeping the order in which scenarios were
        # added; a set() would leave them in an arbitrary order.
        self._scenarios = list(dict.fromkeys(self._scenarios))

    def set_scenarios(self, scenarios):
        """
        Override the scenarios
        """
        self._scenarios = []
        self.add_scenarios(scenarios)

    def get_scenarios(self):
        """Return the list of default scenarios."""
        return self._scenarios

    def add_encoding_formats(self, encoding_formats):
        """
        :param encoding_formats: A list or one single #MediaFormatCombinations describing wanted output
                           formats for transcoding test.
                           They are just the default encoding formats, and then depending on
                           the TestsGenerator to be used you can have more fine grained
                           control on what to be run on each series of tests.
        """
        if isinstance(encoding_formats, list):
            self._encoding_formats.extend(encoding_formats)
        else:
            self._encoding_formats.append(encoding_formats)

        # Order-preserving de-duplication, see add_scenarios().
        self._encoding_formats = list(dict.fromkeys(self._encoding_formats))

    def get_encoding_formats(self):
        """Return the list of default encoding formats."""
        return self._encoding_formats
2489
2490
# Fill the *COMMAND class attributes once, when this module is imported.
GstValidateBaseTestManager.update_commands()
2492
2493
class MediaDescriptor(Loggable):
    """Base class describing a media that tests can be run on.

    Accessors raising NotImplementedError have to be overridden by
    subclasses; is_compatible() uses them to decide whether a scenario can
    be run on the described media.
    """

    def __init__(self):
        Loggable.__init__(self)

    def get_path(self):
        raise NotImplementedError

    def has_frames(self):
        return False

    def get_framerate(self):
        """Return the framerate of the first video track exposing one.

        :returns: The "framerate" value of the first video track caps that
            has one, ``Fraction(0, 1)`` otherwise.
        """
        for ttype, caps_str in self.get_tracks_caps():
            if ttype != "video":
                continue

            caps = utils.GstCaps.new_from_str(caps_str)
            if not caps:
                self.warning("Could not create caps for %s" % caps_str)
                continue

            framerate = caps[0].get("framerate")
            if framerate:
                return framerate

        return Fraction(0, 1)

    def get_media_filepath(self):
        raise NotImplementedError

    def skip_parsers(self):
        return False

    def get_caps(self):
        raise NotImplementedError

    def get_uri(self):
        raise NotImplementedError

    def get_duration(self):
        raise NotImplementedError

    def get_protocol(self):
        raise NotImplementedError

    def is_seekable(self):
        raise NotImplementedError

    def is_live(self):
        raise NotImplementedError

    def is_image(self):
        raise NotImplementedError

    def get_num_tracks(self, track_type):
        raise NotImplementedError

    def get_tracks_caps(self):
        return []

    def can_play_reverse(self):
        raise NotImplementedError

    def prerrols(self):
        return True

    def is_compatible(self, scenario):
        """Tell whether ``scenario`` can be run on this media.

        :param scenario: The scenario to check, or None.
        :returns: True when ``scenario`` is None or when none of the checks
            below rules it out, False otherwise.
        """
        if scenario is None:
            return True

        if scenario.seeks() and (not self.is_seekable() or self.is_image()):
            self.debug("Do not run %s as %s does not support seeking",
                       scenario, self.get_uri())
            return False

        if self.is_image() and scenario.needs_clock_sync():
            self.debug("Do not run %s as %s is an image",
                       scenario, self.get_uri())
            return False

        if not self.can_play_reverse() and scenario.does_reverse_playback():
            return False

        if not self.is_live() and scenario.needs_live_content():
            self.debug("Do not run %s as %s is not a live content",
                       scenario, self.get_uri())
            return False

        if self.is_live() and not scenario.compatible_with_live_content():
            self.debug("Do not run %s as %s is a live content",
                       scenario, self.get_uri())
            return False

        if not self.prerrols() and getattr(scenario, 'needs_preroll', False):
            return False

        # A falsy duration (0/None) disables the minimum duration check.
        duration = self.get_duration()
        if duration and duration / GST_SECOND < scenario.get_min_media_duration():
            self.debug(
                "Do not run %s as %s is too short (%i < min media duation : %i",
                scenario, self.get_uri(),
                duration / GST_SECOND,
                scenario.get_min_media_duration())
            return False

        for track_type in ['audio', 'subtitle', 'video']:
            available = self.get_num_tracks(track_type)
            needed = scenario.get_min_tracks(track_type)
            if available < needed:
                # Arguments follow the message: needed count, then type.
                self.debug("%s -- %s | At least %s %s track needed  < %s"
                           % (scenario, self.get_uri(), needed,
                              track_type, available))
                return False

        return True
2607
2608
class GstValidateMediaDescriptor(MediaDescriptor):
    """Media descriptor loaded from an XML media info file.

    Parsed descriptors are cached per XML path; building a second
    descriptor for the same path copies the data of the cached one instead
    of parsing the file again.
    """
    # Some extension file for discovering results
    SKIPPED_MEDIA_INFO_EXT = "media_info.skipped"
    MEDIA_INFO_EXT = "media_info"
    PUSH_MEDIA_INFO_EXT = "media_info.push"
    STREAM_INFO_EXT = "stream_info"

    # Successfully loaded descriptors, keyed by XML path.
    __all_descriptors = {}

    @classmethod
    def get(cls, xml_path):
        """Return the cached descriptor for ``xml_path`` or build a new one."""
        if xml_path in cls.__all_descriptors:
            return cls.__all_descriptors[xml_path]
        return GstValidateMediaDescriptor(xml_path)

    def __init__(self, xml_path):
        """Load the descriptor stored in ``xml_path``.

        :param xml_path: Path of the XML media info file.
        :raises: The XML parse error is re-raised after being printed.
        """
        super(GstValidateMediaDescriptor, self).__init__()

        self._media_file_path = None
        main_descriptor = self.__all_descriptors.get(xml_path)
        if main_descriptor:
            self._copy_data_from_main(main_descriptor)
        else:
            self._xml_path = xml_path
            try:
                media_xml = ET.parse(xml_path).getroot()
            except ET.ParseError:
                # ET.ParseError exists both in lxml.etree and in the
                # standard library ElementTree implementation.
                printc("Could not parse %s" % xml_path,
                       Colors.FAIL)
                raise
            self._extract_data(media_xml)
            # Only cache once everything loaded properly, so a failure above
            # never leaves a half initialized descriptor in the cache.
            self.__all_descriptors[xml_path] = self

        self.set_protocol(urllib.parse.urlparse(self.get_uri()).scheme)

    def skip_parsers(self):
        return self._skip_parsers

    def has_frames(self):
        return self._has_frames

    def _copy_data_from_main(self, main_descriptor):
        """Copy every instance attribute of ``main_descriptor`` onto self."""
        for attr, value in main_descriptor.__dict__.items():
            setattr(self, attr, value)

    def _extract_data(self, media_xml):
        """Fill the descriptor fields from the parsed XML root element.

        :raises IndexError: When the document has no "streams" element.
        :raises KeyError: When a mandatory attribute is missing.
        """
        # Extract the information we need from the xml
        streams_node = media_xml.findall("streams")[0]
        streams = streams_node.findall("stream")

        self._caps = streams_node.attrib["caps"]
        self._track_caps = [(stream.attrib["type"], stream.attrib["caps"])
                            for stream in streams]

        self._skip_parsers = bool(int(media_xml.attrib.get('skip-parsers', 0)))
        self._has_frames = bool(int(media_xml.attrib["frame-detection"]))
        self._duration = int(media_xml.attrib["duration"])
        self._uri = media_xml.attrib["uri"]
        parsed_uri = urllib.parse.urlparse(self.get_uri())
        self._protocol = media_xml.get("protocol", parsed_uri.scheme)
        if parsed_uri.scheme == "file":
            # The recorded path does not exist but the file sitting next to
            # the XML does: point the URI at that file instead.
            if not os.path.exists(parsed_uri.path) and os.path.exists(self.get_media_filepath()):
                self._uri = "file://" + self.get_media_filepath()
        elif parsed_uri.scheme == Protocols.IMAGESEQUENCE:
            # Relocate the file name of the URI into the XML's directory.
            self._media_file_path = os.path.join(
                os.path.dirname(self.__cleanup_media_info_ext()),
                os.path.basename(parsed_uri.path))
            self._uri = parsed_uri._replace(
                path=self._media_file_path).geturl()
        self._is_seekable = media_xml.attrib["seekable"].lower() == "true"
        self._is_live = media_xml.get("live", "false").lower() == "true"
        self._track_types = [stream.attrib["type"] for stream in streams]
        self._is_image = "image" in self._track_types

    def __cleanup_media_info_ext(self):
        """Return the XML path without its ``.<media info extension>``."""
        for ext in [self.MEDIA_INFO_EXT, self.PUSH_MEDIA_INFO_EXT, self.STREAM_INFO_EXT,
                    self.SKIPPED_MEDIA_INFO_EXT, ]:
            if self._xml_path.endswith(ext):
                # Also drop the dot in front of the extension.
                return self._xml_path[:-(len(ext) + 1)]

        # Explicit raise so the check is not stripped when running with -O.
        raise AssertionError("Not reached")

    @staticmethod
    def new_from_uri(uri, verbose=False, include_frames=False, is_push=False, is_skipped=False):
        """
            Run the media check command on `uri` and load the descriptor
            it writes next to the media file.

            include_frames = 0 # Never
            include_frames = 1 # always
            include_frames = 2 # if previous file included them

            Returns the new descriptor, or None when the command fails or
            when its output can not be loaded.
        """
        media_path = utils.url2path(uri)

        ext = GstValidateMediaDescriptor.MEDIA_INFO_EXT
        if is_push:
            ext = GstValidateMediaDescriptor.PUSH_MEDIA_INFO_EXT
        elif is_skipped:
            ext = GstValidateMediaDescriptor.SKIPPED_MEDIA_INFO_EXT
        descriptor_path = "%s.%s" % (media_path, ext)
        args = GstValidateBaseTestManager.MEDIA_CHECK_COMMAND.split(" ")
        if include_frames == 2:
            try:
                media_xml = ET.parse(descriptor_path).getroot()
                prev_uri = urllib.parse.urlparse(media_xml.attrib['uri'])
                if prev_uri.scheme == Protocols.IMAGESEQUENCE:
                    parsed_uri = urllib.parse.urlparse(uri)
                    uri = prev_uri._replace(path=os.path.join(os.path.dirname(parsed_uri.path), os.path.basename(prev_uri.path))).geturl()
                include_frames = bool(int(media_xml.attrib["frame-detection"]))
                if bool(int(media_xml.attrib.get("skip-parsers", 0))):
                    args.append("--skip-parsers")
            except FileNotFoundError:
                pass
        else:
            include_frames = bool(include_frames)
        args.append(uri)

        args.extend(["--output-file", descriptor_path])
        if include_frames:
            args.extend(["--full"])

        if verbose:
            printc("Generating media info for %s\n"
                   "    Command: '%s'" % (media_path, ' '.join(args)),
                   Colors.OKBLUE)

        try:
            # DEVNULL discards stderr without leaking a file object.
            subprocess.check_output(args, stderr=subprocess.DEVNULL)
        except subprocess.CalledProcessError as e:
            if verbose:
                printc("Result: Failed", Colors.FAIL)
            else:
                loggable.warning("GstValidateMediaDescriptor",
                                 "Exception: %s" % e)
            return None

        if verbose:
            printc("Result: Passed", Colors.OKGREEN)

        try:
            return GstValidateMediaDescriptor(descriptor_path)
        except (IOError, ET.ParseError):
            return None

    def get_path(self):
        return self._xml_path

    def need_clock_sync(self):
        return Protocols.needs_clock_sync(self.get_protocol())

    def get_media_filepath(self):
        """Return the media file path, derived from the XML path if unset."""
        if self._media_file_path is None:
            self._media_file_path = self.__cleanup_media_info_ext()
        return self._media_file_path

    def get_caps(self):
        return self._caps

    def get_tracks_caps(self):
        return self._track_caps

    def get_uri(self):
        return self._uri

    def get_duration(self):
        return self._duration

    def set_protocol(self, protocol):
        """Set the protocol; push media info files always use PUSHFILE."""
        if self._xml_path.endswith(GstValidateMediaDescriptor.PUSH_MEDIA_INFO_EXT):
            self._protocol = Protocols.PUSHFILE
        else:
            self._protocol = protocol

    def get_protocol(self):
        return self._protocol

    def is_seekable(self):
        return self._is_seekable

    def is_live(self):
        return self._is_live

    def can_play_reverse(self):
        return True

    def is_image(self):
        return self._is_image

    def get_num_tracks(self, track_type):
        """Return how many streams of type ``track_type`` the media has."""
        return self._track_types.count(track_type)

    def get_clean_name(self):
        """Return the XML file name without extension, dots turned to "_"."""
        name = os.path.basename(self.get_path())
        # Escape the extensions: the "." they contain must match literally.
        regex = '|'.join([r'\.%s$' % re.escape(ext) for ext in [
            self.SKIPPED_MEDIA_INFO_EXT, self.MEDIA_INFO_EXT,
            self.PUSH_MEDIA_INFO_EXT, self.STREAM_INFO_EXT]])
        name = re.sub(regex, "", name)

        return name.replace('.', "_")
2816
2817
class MediaFormatCombination(object):
    """A container/audio/video format combination for transcoding tests."""

    # Maps the short format names accepted by __init__() to caps strings.
    FORMATS = {"aac": "audio/mpeg,mpegversion=4",  # Audio
               "ac3": "audio/x-ac3",
               "vorbis": "audio/x-vorbis",
               "mp3": "audio/mpeg,mpegversion=1,layer=3",
               "opus": "audio/x-opus",
               "rawaudio": "audio/x-raw",

               # Video
               "h264": "video/x-h264",
               "h265": "video/x-h265",
               "vp8": "video/x-vp8",
               "vp9": "video/x-vp9",
               "theora": "video/x-theora",
               "prores": "video/x-prores",
               "jpeg": "image/jpeg",

               # Containers
               "webm": "video/webm",
               "ogg": "application/ogg",
               "mkv": "video/x-matroska",
               "mp4": "video/quicktime,variant=iso;",
               "quicktime": "video/quicktime;"}

    def __str__(self):
        return "%s and %s in %s" % (self.audio, self.video, self.container)

    def __init__(self, container, audio, video, duration_factor=1,
                 video_restriction=None, audio_restriction=None):
        """
        Describes a media format to be used for transcoding tests.

        :param container: A string defining the container format to be used, must be in self.FORMATS
        :param audio: A string defining the audio format to be used, must be in self.FORMATS
        :param video: A string defining the video format to be used, must be in self.FORMATS
        :param duration_factor: Kept as ``self.duration_factor``; this class
            does not use it itself.
        :param video_restriction: Kept as ``self.video_restriction``.
        :param audio_restriction: Kept as ``self.audio_restriction``.
        """
        self.container = container
        self.audio = audio
        self.video = video
        # Previously accepted but silently dropped.
        self.duration_factor = duration_factor
        self.video_restriction = video_restriction
        self.audio_restriction = audio_restriction

    def get_caps(self, track_type):
        """Return the caps string of the format set for ``track_type``.

        :param track_type: Name of the instance attribute holding the format
            name ("audio", "video" or "container").
        :returns: The FORMATS entry for that format name, or None when the
            attribute does not exist or its value is not a FORMATS key.
        """
        return self.FORMATS.get(vars(self).get(track_type))

    def get_audio_caps(self):
        return self.get_caps("audio")

    def get_video_caps(self):
        return self.get_caps("video")

    def get_muxer_caps(self):
        return self.get_caps("container")