Use gst_element_request_pad_simple...
[platform/upstream/gstreamer.git] / validate / launcher / baseclasses.py
1 #!/usr/bin/env python3
2 #
3 # Copyright (c) 2013,Thibault Saunier <thibault.saunier@collabora.com>
4 #
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU Lesser General Public
7 # License as published by the Free Software Foundation; either
8 # version 2.1 of the License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13 # Lesser General Public License for more details.
14 #
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with this program; if not, write to the
17 # Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
18 # Boston, MA 02110-1301, USA.
19
20 """ Class representing tests and test managers. """
21
22 import importlib.util
23 import json
24 import os
25 import sys
26 import re
27 import copy
28 import shlex
29 import socketserver
30 import struct
31 import time
32 from . import utils
33 import signal
34 import urllib.parse
35 import subprocess
36 import threading
37 import queue
38 import configparser
39 import xml
40 import random
41 import shutil
42 import uuid
43 from itertools import cycle
44 from fractions import Fraction
45
46 from .utils import which
47 from . import reporters
48 from . import loggable
49 from .loggable import Loggable
50
51 from collections import defaultdict
52 try:
53     from lxml import etree as ET
54 except ImportError:
55     import xml.etree.cElementTree as ET
56
57
58 from .vfb_server import get_virual_frame_buffer_server
59 from .httpserver import HTTPServer
60 from .utils import mkdir, Result, Colors, printc, DEFAULT_TIMEOUT, GST_SECOND, \
61     Protocols, look_for_file_in_source_dir, get_data_file, BackTraceGenerator, \
62     check_bugs_resolution, is_tty
63
# Factors applied to the test timeouts when the test runs under the
# corresponding tool (gdb, valgrind or rr).
VALGRIND_TIMEOUT_FACTOR = 20
GDB_TIMEOUT_FACTOR = VALGRIND_TIMEOUT_FACTOR
RR_TIMEOUT_FACTOR = 2
# Global multiplier read from the TIMEOUT_FACTOR environment variable
TIMEOUT_FACTOR = float(os.environ.get("TIMEOUT_FACTOR", 1))
# The error reported by valgrind when detecting errors
VALGRIND_ERROR_CODE = 20

VALIDATE_OVERRIDE_EXTENSION = ".override"
# Maps (negated) signal numbers to signal names; only the signals available
# on the current platform are registered.
EXITING_SIGNALS = {
    -getattr(signal, signame): signame
    for signame in [
        'SIGQUIT', 'SIGILL', 'SIGABRT', 'SIGFPE', 'SIGSEGV', 'SIGBUS', 'SIGSYS',
        'SIGTRAP', 'SIGXCPU', 'SIGXFSZ', 'SIGIOT']
    if hasattr(signal, signame)
}
EXITING_SIGNALS[139] = "SIGSEGV"
# Also allow looking entries up by signal name
EXITING_SIGNALS.update({(signame, code) for code, signame in EXITING_SIGNALS.items()})


CI_ARTIFACTS_URL = os.environ.get('CI_ARTIFACTS_URL')
81
82
83 class Test(Loggable):
84
85     """ A class representing a particular test. """
86
87     def __init__(self, application_name, classname, options,
88                  reporter, duration=0, timeout=DEFAULT_TIMEOUT,
89                  hard_timeout=None, extra_env_variables=None,
90                  expected_issues=None, is_parallel=True,
91                  workdir=None):
92         """
93         @timeout: The timeout during which the value return by get_current_value
94                   keeps being exactly equal
95         @hard_timeout: Max time the test can take in absolute
96         """
97         Loggable.__init__(self)
98         self.timeout = timeout * TIMEOUT_FACTOR * options.timeout_factor
99         if hard_timeout:
100             self.hard_timeout = hard_timeout * TIMEOUT_FACTOR
101             self.hard_timeout *= options.timeout_factor
102         else:
103             self.hard_timeout = hard_timeout
104         self.classname = classname
105         self.options = options
106         self.application = application_name
107         self.command = []
108         self.server_command = None
109         self.reporter = reporter
110         self.process = None
111         self.proc_env = None
112         self.thread = None
113         self.queue = None
114         self.duration = duration
115         self.stack_trace = None
116         self._uuid = None
117         if expected_issues is None:
118             self.expected_issues = []
119         elif not isinstance(expected_issues, list):
120             self.expected_issues = [expected_issues]
121         else:
122             self.expected_issues = expected_issues
123
124         extra_env_variables = extra_env_variables or {}
125         self.extra_env_variables = extra_env_variables
126         self.optional = False
127         self.is_parallel = is_parallel
128         self.generator = None
129         self.workdir = workdir
130         self.allow_flakiness = False
131         self.html_log = None
132         self.rr_logdir = None
133
134         self.clean()
135
136     def _generate_expected_issues(self):
137         return ''
138
139     def generate_expected_issues(self):
140         res = '%s"FIXME \'%s\' issues [REPORT A BUG ' % (" " * 4, self.classname) \
141             + 'in https://gitlab.freedesktop.org/gstreamer/ '\
142             + 'or use a proper bug description]": {'
143         res += """
144         "tests": [
145             "%s"
146         ],
147         "issues": [""" % (self.classname)
148
149         retcode = self.process.returncode if self.process else 0
150         if retcode != 0:
151             signame = EXITING_SIGNALS.get(retcode)
152             val = "'" + signame + "'" if signame else retcode
153             res += """\n            {
154                 '%s': %s,
155                 'sometimes': True,
156             },""" % ("signame" if signame else "returncode", val)
157
158         res += self._generate_expected_issues()
159         res += "\n%s],\n%s},\n" % (" " * 8, " " * 4)
160
161         return res
162
163     def copy(self, nth=None):
164         copied_test = copy.copy(self)
165         if nth:
166             copied_test.classname += '_it' + str(nth)
167             copied_test.options = copy.copy(self.options)
168             copied_test.options.logsdir = os.path.join(copied_test.options.logsdir, str(nth))
169             os.makedirs(copied_test.options.logsdir, exist_ok=True)
170
171         return copied_test
172
173     def clean(self):
174         self.kill_subprocess()
175         self.message = ""
176         self.error_str = ""
177         self.time_taken = 0.0
178         self._starting_time = None
179         self.result = Result.NOT_RUN
180         self.logfile = None
181         self.out = None
182         self.extra_logfiles = set()
183         self.__env_variable = []
184         self.kill_subprocess()
185         self.process = None
186
187     def __str__(self):
188         string = self.classname
189         if self.result != Result.NOT_RUN:
190             string += ": " + self.result
191             if self.result in [Result.FAILED, Result.TIMEOUT]:
192                 string += " '%s'" % self.message
193                 if not self.options.dump_on_failure:
194                     if not self.options.redirect_logs and self.result != Result.PASSED:
195                         string += self.get_logfile_repr()
196                 else:
197                     string = "\n==> %s" % string
198
199         return string
200
201     def add_env_variable(self, variable, value=None):
202         """
203         Only useful so that the gst-validate-launcher can print the exact
204         right command line to reproduce the tests
205         """
206         if value is None:
207             value = os.environ.get(variable, None)
208
209         if value is None:
210             return
211
212         self.__env_variable.append(variable)
213
214     @property
215     def _env_variable(self):
216         res = ""
217         if not self.options.verbose or self.options.verbose > 1:
218             for var in set(self.__env_variable):
219                 if res:
220                     res += " "
221                 value = self.proc_env.get(var, None)
222                 if value is not None:
223                     res += "%s='%s'" % (var, value)
224         else:
225             res += "[Not displaying environment variables, rerun with -vv for the full command]"
226
227         return res
228
229     def open_logfile(self):
230         if self.out:
231             return
232
233         path = os.path.join(self.options.logsdir,
234                             self.classname.replace(".", os.sep) + '.md')
235         mkdir(os.path.dirname(path))
236         self.logfile = path
237
238         if self.options.redirect_logs == 'stdout':
239             self.out = sys.stdout
240         elif self.options.redirect_logs == 'stderr':
241             self.out = sys.stderr
242         else:
243             self.out = open(path, 'w+')
244
245     def finalize_logfiles(self):
246         self.out.write("\n**Duration**: %s" % self.time_taken)
247         if not self.options.redirect_logs:
248             self.out.flush()
249             for logfile in self.extra_logfiles:
250                 # Only copy over extra logfile content if it's below a certain threshold
251                 # Avoid copying gigabytes of data if a lot of debugging is activated
252                 if os.path.getsize(logfile) < 500 * 1024:
253                     self.out.write('\n\n## %s:\n\n```\n%s\n```\n' % (
254                         os.path.basename(logfile), self.get_extra_log_content(logfile))
255                     )
256                 else:
257                     self.out.write('\n\n## %s:\n\n**Log file too big.**\n  %s\n\n Check file content directly\n\n' % (
258                         os.path.basename(logfile), logfile)
259                     )
260
261             if self.rr_logdir:
262                 self.out.write('\n\n## rr trace:\n\n```\nrr replay %s/latest-trace\n```\n' % (
263                     self.rr_logdir))
264
265             self.out.flush()
266             self.out.close()
267
268         if self.options.html:
269             self.html_log = os.path.splitext(self.logfile)[0] + '.html'
270             import commonmark
271             parser = commonmark.Parser()
272             with open(self.logfile) as f:
273                 ast = parser.parse(f.read())
274
275             renderer = commonmark.HtmlRenderer()
276             html = renderer.render(ast)
277             with open(self.html_log, 'w') as f:
278                 f.write(html)
279
280         self.out = None
281
282     def _get_file_content(self, file_name):
283         f = open(file_name, 'r+')
284         value = f.read()
285         f.close()
286
287         return value
288
289     def get_log_content(self):
290         return self._get_file_content(self.logfile)
291
292     def get_extra_log_content(self, extralog):
293         if extralog not in self.extra_logfiles:
294             return ""
295
296         return self._get_file_content(extralog)
297
298     def get_classname(self):
299         name = self.classname.split('.')[-1]
300         classname = self.classname.replace('.%s' % name, '')
301
302         return classname
303
304     def get_name(self):
305         return self.classname.split('.')[-1]
306
307     def get_uuid(self):
308         if self._uuid is None:
309             self._uuid = self.classname + str(uuid.uuid4())
310         return self._uuid
311
312     def add_arguments(self, *args):
313         self.command += args
314
315     def build_arguments(self):
316         self.add_env_variable("LD_PRELOAD")
317         self.add_env_variable("DISPLAY")
318
319     def add_stack_trace_to_logfile(self):
320         self.debug("Adding stack trace")
321         if self.options.rr:
322             return
323
324         trace_gatherer = BackTraceGenerator.get_default()
325         stack_trace = trace_gatherer.get_trace(self)
326
327         if not stack_trace:
328             return
329
330         info = "\n\n## Stack trace\n\n```\n%s\n```" % stack_trace
331         if self.options.redirect_logs:
332             print(info)
333             return
334
335         if self.options.xunit_file:
336             self.stack_trace = stack_trace
337
338         self.out.write(info)
339         self.out.flush()
340
341     def add_known_issue_information(self):
342         if self.expected_issues:
343             info = "\n\n## Already known issues\n\n``` python\n%s\n```\n\n" % (
344                 json.dumps(self.expected_issues, indent=4)
345             )
346         else:
347             info = ""
348
349         info += "\n\n**You can mark the issues as 'known' by adding the " \
350             + " following lines to the list of known issues**\n" \
351             + "\n\n``` python\n%s\n```" % (self.generate_expected_issues())
352
353         if self.options.redirect_logs:
354             print(info)
355             return
356
357         self.out.write(info)
358
359     def set_result(self, result, message="", error=""):
360
361         if not self.options.redirect_logs:
362             self.out.write("\n```\n")
363             self.out.flush()
364
365         self.debug("Setting result: %s (message: %s, error: %s)" % (result,
366                                                                     message, error))
367
368         if result is Result.TIMEOUT:
369             if self.options.debug is True:
370                 if self.options.gdb:
371                     printc("Timeout, you should process <ctrl>c to get into gdb",
372                            Colors.FAIL)
373                     # and wait here until gdb exits
374                     self.process.communicate()
375                 else:
376                     pname = self.command[0]
377                     input("%sTimeout happened  on %s you can attach gdb doing:\n $gdb %s %d%s\n"
378                           "Press enter to continue" % (Colors.FAIL, self.classname,
379                           pname, self.process.pid, Colors.ENDC))
380             else:
381                 self.add_stack_trace_to_logfile()
382
383         self.result = result
384         self.message = message
385         self.error_str = error
386
387         if result not in [Result.PASSED, Result.NOT_RUN, Result.SKIPPED]:
388             self.add_known_issue_information()
389
390     def check_results(self):
391         if self.result is Result.FAILED or self.result is Result.TIMEOUT:
392             return
393
394         self.debug("%s returncode: %s", self, self.process.returncode)
395         if self.options.rr and self.process.returncode == -signal.SIGPIPE:
396             self.set_result(Result.SKIPPED, "SIGPIPE received under `rr`, known issue.")
397         elif self.process.returncode == 0:
398             self.set_result(Result.PASSED)
399         elif self.process.returncode in EXITING_SIGNALS:
400             self.add_stack_trace_to_logfile()
401             self.set_result(Result.FAILED,
402                             "Application exited with signal %s" % (
403                                 EXITING_SIGNALS[self.process.returncode]))
404         elif self.process.returncode == VALGRIND_ERROR_CODE:
405             self.set_result(Result.FAILED, "Valgrind reported errors")
406         else:
407             self.set_result(Result.FAILED,
408                             "Application returned %d" % (self.process.returncode))
409
410     def get_current_value(self):
411         """
412         Lets subclasses implement a nicer timeout measurement method
413         They should return some value with which we will compare
414         the previous and timeout if they are egual during self.timeout
415         seconds
416         """
417         return Result.NOT_RUN
418
419     def process_update(self):
420         """
421         Returns True when process has finished running or has timed out.
422         """
423
424         if self.process is None:
425             # Process has not started running yet
426             return False
427
428         self.process.poll()
429         if self.process.returncode is not None:
430             return True
431
432         val = self.get_current_value()
433
434         self.debug("Got value: %s" % val)
435         if val is Result.NOT_RUN:
436             # The get_current_value logic is not implemented... dumb
437             # timeout
438             if time.time() - self.last_change_ts > self.timeout:
439                 self.set_result(Result.TIMEOUT,
440                                 "Application timed out: %s secs" %
441                                 self.timeout,
442                                 "timeout")
443                 return True
444             return False
445         elif val is Result.FAILED:
446             return True
447         elif val is Result.KNOWN_ERROR:
448             return True
449
450         self.log("New val %s" % val)
451
452         if val == self.last_val:
453             delta = time.time() - self.last_change_ts
454             self.debug("%s: Same value for %d/%d seconds" %
455                        (self, delta, self.timeout))
456             if delta > self.timeout:
457                 self.set_result(Result.TIMEOUT,
458                                 "Application timed out: %s secs" %
459                                 self.timeout,
460                                 "timeout")
461                 return True
462         elif self.hard_timeout and time.time() - self.start_ts > self.hard_timeout:
463             self.set_result(
464                 Result.TIMEOUT, "Hard timeout reached: %d secs" % self.hard_timeout)
465             return True
466         else:
467             self.last_change_ts = time.time()
468             self.last_val = val
469
470         return False
471
472     def get_subproc_env(self):
473         return os.environ.copy()
474
475     def kill_subprocess(self):
476         subprocs_id = None
477         if self.options.rr and self.process and self.process.returncode is None:
478             cmd = ["ps", "-o", "pid", "--ppid", str(self.process.pid), "--noheaders"]
479             try:
480                 subprocs_id = [int(pid.strip('\n')) for
481                     pid in subprocess.check_output(cmd).decode().split(' ') if pid]
482             except FileNotFoundError:
483                 self.error("Ps not found, will probably not be able to get rr "
484                     "working properly after we kill the process")
485             except subprocess.CalledProcessError as e:
486                 self.error("Couldn't get rr subprocess pid: %s" % (e))
487
488         utils.kill_subprocess(self, self.process, DEFAULT_TIMEOUT, subprocs_id)
489
490     def run_external_checks(self):
491         pass
492
493     def thread_wrapper(self):
494         def enable_sigint():
495             # Restore the SIGINT handler for the child process (gdb) to ensure
496             # it can handle it.
497             signal.signal(signal.SIGINT, signal.SIG_DFL)
498
499         if self.options.gdb and os.name != "nt":
500             preexec_fn = enable_sigint
501         else:
502             preexec_fn = None
503
504         self.process = subprocess.Popen(self.command,
505                                         stderr=self.out,
506                                         stdout=self.out,
507                                         env=self.proc_env,
508                                         cwd=self.workdir,
509                                         preexec_fn=preexec_fn)
510         self.process.wait()
511         if self.result is not Result.TIMEOUT:
512             if self.process.returncode == 0:
513                 self.run_external_checks()
514             self.queue.put(None)
515
516     def get_valgrind_suppression_file(self, subdir, name):
517         p = get_data_file(subdir, name)
518         if p:
519             return p
520
521         self.error("Could not find any %s file" % name)
522
523     def get_valgrind_suppressions(self):
524         return [self.get_valgrind_suppression_file('data', 'gstvalidate.supp')]
525
526     def use_gdb(self, command):
527         if self.hard_timeout is not None:
528             self.hard_timeout *= GDB_TIMEOUT_FACTOR
529         self.timeout *= GDB_TIMEOUT_FACTOR
530
531         if not self.options.gdb_non_stop:
532             self.timeout = sys.maxsize
533             self.hard_timeout = sys.maxsize
534
535         args = ["gdb"]
536         if self.options.gdb_non_stop:
537             args += ["-ex", "run", "-ex", "backtrace", "-ex", "quit"]
538         args += ["--args"] + command
539         return args
540
541     def use_rr(self, command, subenv):
542         command = ["rr", 'record', '-h'] + command
543
544         self.timeout *= RR_TIMEOUT_FACTOR
545         self.rr_logdir = os.path.join(self.options.logsdir, self.classname.replace(".", os.sep), 'rr-logs')
546         subenv['_RR_TRACE_DIR'] = self.rr_logdir
547         try:
548             shutil.rmtree(self.rr_logdir, ignore_errors=False, onerror=None)
549         except FileNotFoundError:
550             pass
551         self.add_env_variable('_RR_TRACE_DIR', self.rr_logdir)
552
553         return command
554
555     def use_valgrind(self, command, subenv):
556         vglogsfile = os.path.splitext(self.logfile)[0] + '.valgrind'
557         self.extra_logfiles.add(vglogsfile)
558
559         vg_args = []
560
561         for o, v in [('trace-children', 'yes'),
562                      ('tool', 'memcheck'),
563                      ('leak-check', 'full'),
564                      ('leak-resolution', 'high'),
565                      # TODO: errors-for-leak-kinds should be set to all instead of definite
566                      # and all false positives should be added to suppression
567                      # files.
568                      ('errors-for-leak-kinds', 'definite,indirect'),
569                      ('show-leak-kinds', 'definite,indirect'),
570                      ('show-possibly-lost', 'no'),
571                      ('num-callers', '20'),
572                      ('error-exitcode', str(VALGRIND_ERROR_CODE)),
573                      ('gen-suppressions', 'all')]:
574             vg_args.append("--%s=%s" % (o, v))
575
576         if not self.options.redirect_logs:
577             vglogsfile = os.path.splitext(self.logfile)[0] + '.valgrind'
578             self.extra_logfiles.add(vglogsfile)
579             vg_args.append("--%s=%s" % ('log-file', vglogsfile))
580
581         for supp in self.get_valgrind_suppressions():
582             vg_args.append("--suppressions=%s" % supp)
583
584         command = ["valgrind"] + vg_args + command
585
586         # Tune GLib's memory allocator to be more valgrind friendly
587         subenv['G_DEBUG'] = 'gc-friendly'
588         subenv['G_SLICE'] = 'always-malloc'
589
590         if self.hard_timeout is not None:
591             self.hard_timeout *= VALGRIND_TIMEOUT_FACTOR
592         self.timeout *= VALGRIND_TIMEOUT_FACTOR
593
594         # Enable 'valgrind.config'
595         self.add_validate_config(get_data_file(
596             'data', 'valgrind.config'), subenv)
597         if subenv == self.proc_env:
598             self.add_env_variable('G_DEBUG', 'gc-friendly')
599             self.add_env_variable('G_SLICE', 'always-malloc')
600             self.add_env_variable('GST_VALIDATE_CONFIG',
601                                   self.proc_env['GST_VALIDATE_CONFIG'])
602
603         return command
604
605     def add_validate_config(self, config, subenv=None):
606         if not subenv:
607             subenv = self.extra_env_variables
608
609         cconf = subenv.get('GST_VALIDATE_CONFIG', "")
610         paths = [c for c in cconf.split(os.pathsep) if c] + [config]
611         subenv['GST_VALIDATE_CONFIG'] = os.pathsep.join(paths)
612
613     def launch_server(self):
614         return None
615
616     def get_logfile_repr(self):
617         if not self.options.redirect_logs:
618             if self.html_log:
619                 log = self.html_log
620             else:
621                 log = self.logfile
622
623             if CI_ARTIFACTS_URL:
624                 log = CI_ARTIFACTS_URL + os.path.relpath(log, self.options.logsdir)
625
626             return "\n    Log: %s" % (log)
627
628         return ""
629
630     def get_command_repr(self):
631         message = "%s %s" % (self._env_variable, ' '.join(
632             shlex.quote(arg) for arg in self.command))
633         if self.server_command:
634             message = "%s & %s" % (self.server_command, message)
635
636         return message
637
638     def test_start(self, queue):
639         self.open_logfile()
640
641         self.server_command = self.launch_server()
642         self.queue = queue
643         self.command = [self.application]
644         self._starting_time = time.time()
645         self.build_arguments()
646         self.proc_env = self.get_subproc_env()
647
648         for var, value in list(self.extra_env_variables.items()):
649             value = self.proc_env.get(var, '') + os.pathsep + value
650             self.proc_env[var] = value.strip(os.pathsep)
651             self.add_env_variable(var, self.proc_env[var])
652
653         if self.options.gdb:
654             self.command = self.use_gdb(self.command)
655
656             self.previous_sigint_handler = signal.getsignal(signal.SIGINT)
657             # Make the gst-validate executable ignore SIGINT while gdb is
658             # running.
659             signal.signal(signal.SIGINT, signal.SIG_IGN)
660
661         if self.options.valgrind:
662             self.command = self.use_valgrind(self.command, self.proc_env)
663
664         if self.options.rr:
665             self.command = self.use_rr(self.command, self.proc_env)
666
667         if not self.options.redirect_logs:
668             self.out.write("# `%s`\n\n"
669                            "## Command\n\n``` bash\n%s\n```\n\n" % (
670                                self.classname, self.get_command_repr()))
671             self.out.write("## %s output\n\n``` \n\n" % os.path.basename(self.application))
672             self.out.flush()
673         else:
674             message = "Launching: %s%s\n" \
675                 "    Command: %s\n" % (Colors.ENDC, self.classname,
676                                        self.get_command_repr())
677             printc(message, Colors.OKBLUE)
678
679         self.thread = threading.Thread(target=self.thread_wrapper)
680         self.thread.start()
681
682         self.last_val = 0
683         self.last_change_ts = time.time()
684         self.start_ts = time.time()
685
686     def _dump_log_file(self, logfile):
687         if which('bat'):
688             try:
689                 subprocess.check_call(['bat', '-H', '1', '--paging=never', logfile])
690                 return
691             except (subprocess.CalledProcessError, FileNotFoundError):
692                 pass
693
694         with open(logfile, 'r') as fin:
695             for line in fin.readlines():
696                 print('> ' + line, end='')
697
698     def _dump_log_files(self):
699         self._dump_log_file(self.logfile)
700
701     def copy_logfiles(self, extra_folder="flaky_tests"):
702         path = os.path.dirname(os.path.join(self.options.logsdir, extra_folder,
703                             self.classname.replace(".", os.sep)))
704         mkdir(path)
705         self.logfile = shutil.copy(self.logfile, path)
706         extra_logs = []
707         for logfile in self.extra_logfiles:
708             extra_logs.append(shutil.copy(logfile, path))
709         self.extra_logfiles = extra_logs
710
711     def test_end(self, retry_on_failure=False):
712         self.kill_subprocess()
713         self.thread.join()
714         self.time_taken = time.time() - self._starting_time
715
716         if self.options.gdb:
717             signal.signal(signal.SIGINT, self.previous_sigint_handler)
718
719         self.finalize_logfiles()
720         message = None
721         end = "\n"
722
723         if self.options.dump_on_failure:
724             if self.result not in [Result.PASSED, Result.KNOWN_ERROR, Result.NOT_RUN]:
725                 self._dump_log_files()
726
727         # Only keep around env variables we need later
728         clean_env = {}
729         for n in self.__env_variable:
730             clean_env[n] = self.proc_env.get(n, None)
731         self.proc_env = clean_env
732
733         # Don't keep around JSON report objects, they were processed
734         # in check_results already
735         self.reports = []
736
737         return self.result
738
739
class GstValidateTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):

    """ TCP server handling each connection in its own thread. """
742
743
class GstValidateListener(socketserver.BaseRequestHandler, Loggable):

    """
    Request handler reading the messages sent by a test process.

    Each message is made of a 4 bytes big-endian length followed by that
    many bytes of UTF-8 encoded JSON. The first message must contain the
    'uuid' of the test the connection is about.
    """

    def __init__(self, *args, **kwargs):
        # NOTE: the BaseRequestHandler constructor runs handle() itself, the
        # Loggable initialization below thus happens after the connection
        # has been processed (handle() sets the log category on its own).
        super().__init__(*args, **kwargs)
        Loggable.__init__(self, "GstValidateListener")

    def _recv_exact(self, size):
        """
        Read exactly @size bytes from the connection.

        Returns None if the peer closed the connection before @size bytes
        could be read.
        """
        chunks = []
        remaining = size
        while remaining:
            chunk = self.request.recv(remaining)
            if not chunk:
                # Connection closed: recv() would return b'' forever
                return None
            chunks.append(chunk)
            remaining -= len(chunk)

        return b''.join(chunks)

    def handle(self):
        """Implements BaseRequestHandler handle method"""
        test = None
        self.logCategory = "GstValidateListener"
        while True:
            raw_len = self._recv_exact(4)
            if raw_len is None:
                return
            msglen = struct.unpack('>I', raw_len)[0]
            raw_msg = self._recv_exact(msglen)
            if raw_msg is None:
                self.debug("%s Connection closed in the middle of a message" % (
                    test.classname if test else "unknown"))
                return

            # Invalid bytes are dropped, decoding can thus not fail
            msg = raw_msg.decode('utf-8', 'ignore')
            if msg == '':
                return

            try:
                obj = json.loads(msg)
            except json.decoder.JSONDecodeError as e:
                self.error("%s Could not decode message: %s - %s" % (test.classname if test else "unknown", msg, e))
                continue

            if test is None:
                # First message must contain the uuid
                test_uuid = obj.get("uuid", None)
                if test_uuid is None:
                    return
                # Find test from launcher
                for t in self.server.launcher.tests:
                    if test_uuid == t.get_uuid():
                        test = t
                        break
                if test is None:
                    self.server.launcher.error(
                        "Could not find test for UUID %s" % test_uuid)
                    return

            obj_type = obj.get("type", '')
            if obj_type == 'position':
                test.set_position(obj['position'], obj['duration'],
                                  obj['speed'])
            elif obj_type == 'buffering':
                test.set_position(obj['position'], 100)
            elif obj_type == 'action':
                test.add_action_execution(obj)
                # Make sure that action is taken into account when checking if process
                # is updating
                test.position += 1
            elif obj_type == 'action-done':
                # Make sure that action end is taken into account when checking if process
                # is updating
                test.position += 1
                test.actions_infos[-1]['execution-duration'] = obj['execution-duration']
            elif obj_type == 'report':
                test.add_report(obj)
            elif obj_type == 'skip-test':
                test.set_result(Result.SKIPPED)
815
816
817 class GstValidateTest(Test):
818
819     """ A class representing a particular test. """
820     HARD_TIMEOUT_FACTOR = 5
821     fault_sig_regex = re.compile("<Caught SIGNAL: .*>")
822     needs_gst_inspect = set()
823
824     def __init__(self, application_name, classname,
825                  options, reporter, duration=0,
826                  timeout=DEFAULT_TIMEOUT, scenario=None, hard_timeout=None,
827                  media_descriptor=None, extra_env_variables=None,
828                  expected_issues=None, workdir=None):
829
830         extra_env_variables = extra_env_variables or {}
831
832         if not hard_timeout and self.HARD_TIMEOUT_FACTOR:
833             if timeout:
834                 hard_timeout = timeout * self.HARD_TIMEOUT_FACTOR
835             elif duration:
836                 hard_timeout = duration * self.HARD_TIMEOUT_FACTOR
837             else:
838                 hard_timeout = None
839
840         # If we are running from source, use the -debug version of the
841         # application which is using rpath instead of libtool's wrappers. It's
842         # slightly faster to start and will not confuse valgrind.
843         debug = '%s-debug' % application_name
844         p = look_for_file_in_source_dir('tools', debug)
845         if p:
846             application_name = p
847
848         self.reports = []
849         self.position = -1
850         self.media_duration = -1
851         self.speed = 1.0
852         self.actions_infos = []
853         self.media_descriptor = media_descriptor
854         self.server = None
855         self.criticals = []
856
857         override_path = self.get_override_file(media_descriptor)
858         if override_path:
859             if extra_env_variables:
860                 if extra_env_variables.get("GST_VALIDATE_OVERRIDE", ""):
861                     extra_env_variables[
862                         "GST_VALIDATE_OVERRIDE"] += os.path.pathsep
863
864             extra_env_variables["GST_VALIDATE_OVERRIDE"] = override_path
865
866         super(GstValidateTest, self).__init__(application_name, classname,
867                                               options, reporter,
868                                               duration=duration,
869                                               timeout=timeout,
870                                               hard_timeout=hard_timeout,
871                                               extra_env_variables=extra_env_variables,
872                                               expected_issues=expected_issues,
873                                               workdir=workdir)
874
875         if scenario is None or scenario.name.lower() == "none":
876             self.scenario = None
877         else:
878             self.scenario = scenario
879
    def kill_subprocess(self):
        """ Kill the test subprocess by delegating to Test.kill_subprocess(). """
        Test.kill_subprocess(self)
882
    def add_report(self, report):
        """ Record @report in the list of reports gathered for this test. """
        self.reports.append(report)
885
886     def set_position(self, position, duration, speed=None):
887         self.position = position
888         self.media_duration = duration
889         if speed:
890             self.speed = speed
891
    def add_action_execution(self, action_infos):
        """ Record @action_infos at the end of the list of executed actions. """
        self.actions_infos.append(action_infos)
894
895     def get_override_file(self, media_descriptor):
896         if media_descriptor:
897             if media_descriptor.get_path():
898                 override_path = os.path.splitext(media_descriptor.get_path())[
899                     0] + VALIDATE_OVERRIDE_EXTENSION
900                 if os.path.exists(override_path):
901                     return override_path
902
903         return None
904
    def get_current_position(self):
        """ Return the last position stored on this test. """
        return self.position
907
    def get_current_value(self):
        """ Return the last position stored on this test (same value as get_current_position()). """
        return self.position
910
911     def get_subproc_env(self):
912         subproc_env = os.environ.copy()
913
914         if self.options.validate_default_config:
915             self.add_validate_config(self.options.validate_default_config,
916                 subproc_env, )
917
918         subproc_env["GST_VALIDATE_UUID"] = self.get_uuid()
919         subproc_env["GST_VALIDATE_LOGSDIR"] = self.options.logsdir
920
921         if 'GST_DEBUG' in os.environ and not self.options.redirect_logs:
922             gstlogsfile = os.path.splitext(self.logfile)[0] + '.gstdebug'
923             self.extra_logfiles.add(gstlogsfile)
924             subproc_env["GST_DEBUG_FILE"] = gstlogsfile
925
926         if self.options.no_color:
927             subproc_env["GST_DEBUG_NO_COLOR"] = '1'
928
929         # Ensure XInitThreads is called, see bgo#731525
930         subproc_env['GST_GL_XINITTHREADS'] = '1'
931         self.add_env_variable('GST_GL_XINITTHREADS', '1')
932
933         if self.scenario is not None:
934             scenario = self.scenario.get_execution_name()
935             subproc_env["GST_VALIDATE_SCENARIO"] = scenario
936             self.add_env_variable("GST_VALIDATE_SCENARIO",
937                                   subproc_env["GST_VALIDATE_SCENARIO"])
938         else:
939             try:
940                 del subproc_env["GST_VALIDATE_SCENARIO"]
941             except KeyError:
942                 pass
943
944         if not subproc_env.get('GST_DEBUG_DUMP_DOT_DIR'):
945             dotfilesdir = os.path.join(self.options.logsdir,
946                                 self.classname.replace(".", os.sep) + '.pipelines_dot_files')
947             mkdir(dotfilesdir)
948             subproc_env['GST_DEBUG_DUMP_DOT_DIR'] = dotfilesdir
949             if CI_ARTIFACTS_URL:
950                 dotfilesurl = CI_ARTIFACTS_URL + os.path.relpath(dotfilesdir,
951                                                                  self.options.logsdir)
952                 subproc_env['GST_VALIDATE_DEBUG_DUMP_DOT_URL'] = dotfilesurl
953
954         return subproc_env
955
    def clean(self):
        """ Reset the base Test state, then the validate specific tracking fields. """
        Test.clean(self)
        # Same values as the ones assigned in __init__().
        self.reports = []
        self.position = -1
        self.media_duration = -1
        self.speed = 1.0
        self.actions_infos = []
963
964     def build_arguments(self):
965         super(GstValidateTest, self).build_arguments()
966         if "GST_VALIDATE" in os.environ:
967             self.add_env_variable("GST_VALIDATE", os.environ["GST_VALIDATE"])
968
969         if "GST_VALIDATE_SCENARIOS_PATH" in os.environ:
970             self.add_env_variable("GST_VALIDATE_SCENARIOS_PATH",
971                                   os.environ["GST_VALIDATE_SCENARIOS_PATH"])
972
973         self.add_env_variable("GST_VALIDATE_CONFIG")
974         self.add_env_variable("GST_VALIDATE_OVERRIDE")
975
976     def get_extra_log_content(self, extralog):
977         value = Test.get_extra_log_content(self, extralog)
978
979         return value
980
981     def report_matches_expected_issues(self, report, expected_issue):
982         for key in ['bug', 'bugs', 'sometimes']:
983             if key in expected_issue:
984                 del expected_issue[key]
985         for key, value in list(report.items()):
986             if key in expected_issue:
987                 if not re.findall(expected_issue[key], str(value)):
988                     return False
989                 expected_issue.pop(key)
990
991         if "can-happen-several-times" in expected_issue:
992             expected_issue.pop("can-happen-several-times")
993         return not bool(expected_issue)
994
995     def check_reported_issues(self, expected_issues):
996         ret = []
997         expected_retcode = [0]
998         for report in self.reports:
999             found = None
1000             for expected_issue in expected_issues:
1001                 if self.report_matches_expected_issues(report,
1002                                                        expected_issue.copy()):
1003                     found = expected_issue
1004                     break
1005
1006             if found is not None:
1007                 if not found.get('can-happen-several-times', False):
1008                     expected_issues.remove(found)
1009                 if report['level'] == 'critical':
1010                     if found.get('sometimes', True) and isinstance(expected_retcode, list):
1011                         expected_retcode.append(18)
1012                     else:
1013                         expected_retcode = [18]
1014             elif report['level'] == 'critical':
1015                 ret.append(report)
1016
1017         if not ret:
1018             return None, expected_issues, expected_retcode
1019
1020         return ret, expected_issues, expected_retcode
1021
1022     def check_expected_issue(self, expected_issue):
1023         res = True
1024         msg = ''
1025         expected_symbols = expected_issue.get('stacktrace_symbols')
1026         if expected_symbols:
1027             trace_gatherer = BackTraceGenerator.get_default()
1028             stack_trace = trace_gatherer.get_trace(self)
1029
1030             if stack_trace:
1031                 if not isinstance(expected_symbols, list):
1032                     expected_symbols = [expected_symbols]
1033
1034                 not_found_symbols = [s for s in expected_symbols
1035                                      if s not in stack_trace]
1036                 if not_found_symbols:
1037                     msg = " Expected symbols '%s' not found in stack trace " % (
1038                         not_found_symbols)
1039                     res = False
1040             else:
1041                 msg += " No stack trace available, could not verify symbols "
1042
1043         _, not_found_expected_issues, _ = self.check_reported_issues(expected_issue.get('issues', []))
1044         if not_found_expected_issues:
1045             mandatory_failures = [f for f in not_found_expected_issues
1046                                   if not f.get('sometimes', True)]
1047             if mandatory_failures:
1048                 msg = " (Expected issues not found: %s) " % mandatory_failures
1049                 res = False
1050
1051         return msg, res
1052
1053     def check_expected_timeout(self, expected_timeout):
1054         msg = "Expected timeout happened. "
1055         result = Result.PASSED
1056         message = expected_timeout.get('message')
1057         if message:
1058             if not re.findall(message, self.message):
1059                 result = Result.FAILED
1060                 msg = "Expected timeout message: %s got %s " % (
1061                     message, self.message)
1062
1063         stack_msg, stack_res = self.check_expected_issue(expected_timeout)
1064         if not stack_res:
1065             result = Result.TIMEOUT
1066             msg += stack_msg
1067
1068         return result, msg
1069
    def check_results(self):
        """
        Compute the final result of the test and store it with set_result().

        Nothing is done when the result is already FAILED, PASSED or SKIPPED.
        Otherwise the process return code, the gathered reports and the
        expected issues of the test are combined into a result and a message.

        A test in TIMEOUT state is left untouched (set_result() is not
        called) when its log contains no signal information and no timeout
        was expected.
        """
        if self.result in [Result.FAILED, Result.PASSED, Result.SKIPPED]:
            return

        self.debug("%s returncode: %s", self, self.process.returncode)
        # Work on a copy: check_reported_issues() removes matched entries.
        expected_issues = copy.deepcopy(self.expected_issues)
        if self.options.rr:
            # signal.SIGPIPE is 13 but it sometimes isn't present in python for some reason.
            expected_issues.append({"returncode": -13, "sometimes": True})
        self.criticals, not_found_expected_issues, expected_returncode = self.check_reported_issues(expected_issues)
        expected_timeout = None
        expected_signal = None
        # Expected issues that matched no report may instead describe the way
        # the process is expected to end: return code(s), signal(s) or timeout.
        for i, f in enumerate(not_found_expected_issues):
            returncode = f.get('returncode', [])
            if not isinstance(returncode, list):
                returncode = [returncode]

            if f.get('signame'):
                signames = f['signame']
                if not isinstance(signames, list):
                    signames = [signames]

                # Signal names take precedence over an explicit 'returncode'.
                returncode = [EXITING_SIGNALS[signame] for signame in signames]

            if returncode:
                if 'sometimes' in f:
                    # The presence of the key (whatever its value) also makes
                    # a clean exit acceptable.
                    returncode.append(0)
                expected_returncode = returncode
                expected_signal = f
            elif f.get("timeout"):
                expected_timeout = f

        # Return code / signal expectations are checked against the process
        # exit status below, not against the reports.
        not_found_expected_issues = [f for f in not_found_expected_issues
                                     if not f.get('returncode') and not f.get('signame')]

        msg = ""
        result = Result.PASSED
        if self.result == Result.TIMEOUT:
            with open(self.logfile) as f:
                signal_fault_info = self.fault_sig_regex.findall(f.read())
                if signal_fault_info:
                    result = Result.FAILED
                    msg = signal_fault_info[0]
                elif expected_timeout:
                    not_found_expected_issues.remove(expected_timeout)
                    result, msg = self.check_expected_timeout(expected_timeout)
                else:
                    # Plain, unexpected timeout: keep the TIMEOUT result as is.
                    return
        elif self.process.returncode in EXITING_SIGNALS:
            msg = "Application exited with signal %s" % (
                EXITING_SIGNALS[self.process.returncode])
            if self.process.returncode not in expected_returncode:
                result = Result.FAILED
            else:
                if expected_signal:
                    stack_msg, stack_res = self.check_expected_issue(
                        expected_signal)
                    if not stack_res:
                        msg += stack_msg
                        result = Result.FAILED
            self.add_stack_trace_to_logfile()
        elif self.process.returncode == VALGRIND_ERROR_CODE:
            msg = "Valgrind reported errors "
            result = Result.FAILED
        elif self.process.returncode not in expected_returncode:
            msg = "Application returned %s " % self.process.returncode
            if expected_returncode != [0]:
                msg += "(expected %s) " % expected_returncode
            result = Result.FAILED

        # Unexpected critical reports always fail the test.
        if self.criticals:
            msg += "(critical errors: [%s]) " % ', '.join(set([c['summary']
                                                           for c in self.criticals]))
            result = Result.FAILED

        if not_found_expected_issues:
            # Only expected issues explicitly flagged 'sometimes': False are
            # mandatory.
            mandatory_failures = [f for f in not_found_expected_issues
                                  if not f.get('sometimes', True)]

            if mandatory_failures:
                msg += " (Expected errors not found: %s) " % mandatory_failures
                result = Result.FAILED
        elif self.expected_issues:
            # Every expected issue was seen. Note that this replaces any
            # result computed above.
            msg += ' %s(Expected errors occurred: %s)%s' % (Colors.OKBLUE,
                                                            self.expected_issues,
                                                            Colors.ENDC)
            result = Result.KNOWN_ERROR

        if result == Result.PASSED:
            # A report with the "expected" level downgrades a pass to a known
            # error.
            for report in self.reports:
                if report["level"] == "expected":
                    result = Result.KNOWN_ERROR
                    break

        self.set_result(result, msg.strip())
1165
1166     def _generate_expected_issues(self):
1167         res = ""
1168         self.criticals = self.criticals or []
1169         if self.result == Result.TIMEOUT:
1170             res += """            {
1171                 'timeout': True,
1172                 'sometimes': True,
1173             },"""
1174
1175         for report in self.criticals:
1176             res += "\n%s{" % (" " * 12)
1177
1178             for key, value in report.items():
1179                 if key == "type":
1180                     continue
1181                 if value is None:
1182                     continue
1183                 res += '\n%s%s"%s": "%s",' % (
1184                     " " * 16, "# " if key == "details" else "",
1185                     key, value.replace('\n', '\\n'))
1186
1187             res += "\n%s}," % (" " * 12)
1188
1189         return res
1190
1191     def get_valgrind_suppressions(self):
1192         result = super(GstValidateTest, self).get_valgrind_suppressions()
1193         result.extend(utils.get_gst_build_valgrind_suppressions())
1194         gst_sup = self.get_valgrind_suppression_file('common', 'gst.supp')
1195         if gst_sup:
1196             result.append(gst_sup)
1197
1198         return result
1199
1200
1201 class GstValidateEncodingTestInterface(object):
1202     DURATION_TOLERANCE = GST_SECOND / 4
1203
1204     def __init__(self, combination, media_descriptor, duration_tolerance=None):
1205         super(GstValidateEncodingTestInterface, self).__init__()
1206
1207         self.media_descriptor = media_descriptor
1208         self.combination = combination
1209         self.dest_file = ""
1210
1211         self._duration_tolerance = duration_tolerance
1212         if duration_tolerance is None:
1213             self._duration_tolerance = self.DURATION_TOLERANCE
1214
1215     def get_current_size(self):
1216         try:
1217             size = os.stat(urllib.parse.urlparse(self.dest_file).path).st_size
1218         except OSError:
1219             return None
1220
1221         self.debug("Size: %s" % size)
1222         return size
1223
1224     def _get_profile_full(self, muxer, venc, aenc, video_restriction=None,
1225                           audio_restriction=None, audio_presence=0,
1226                           video_presence=0, variable_framerate=False):
1227         ret = ""
1228         if muxer:
1229             ret += muxer
1230         ret += ":"
1231         if venc:
1232             if video_restriction is not None:
1233                 ret = ret + video_restriction + '->'
1234             ret += venc
1235             props = ""
1236             if video_presence:
1237                 props += 'presence=%s,' % str(video_presence)
1238             if variable_framerate:
1239                 props += 'variable-framerate=true,'
1240             if props:
1241                 ret = ret + '|' + props[:-1]
1242         if aenc:
1243             ret += ":"
1244             if audio_restriction is not None:
1245                 ret = ret + audio_restriction + '->'
1246             ret += aenc
1247             if audio_presence:
1248                 ret = ret + '|' + str(audio_presence)
1249
1250         return ret.replace("::", ":")
1251
1252     def get_profile(self, video_restriction=None, audio_restriction=None,
1253             variable_framerate=False):
1254         vcaps = self.combination.get_video_caps()
1255         acaps = self.combination.get_audio_caps()
1256         if video_restriction is None:
1257             video_restriction = self.combination.video_restriction
1258         if audio_restriction is None:
1259             audio_restriction = self.combination.audio_restriction
1260         if self.media_descriptor is not None:
1261             if self.combination.video == "theora":
1262                 # Theoraenc doesn't support variable framerate, make sure to avoid them
1263                 framerate = self.media_descriptor.get_framerate()
1264                 if framerate == Fraction(0, 1):
1265                     framerate = Fraction(30, 1)
1266                     restriction = utils.GstCaps.new_from_str(video_restriction or "video/x-raw")
1267                     for struct, _ in restriction:
1268                         if struct.get("framerate") is None:
1269                             struct.set("framerate", struct.FRACTION_TYPE, framerate)
1270                     video_restriction = str(restriction)
1271
1272             video_presence = self.media_descriptor.get_num_tracks("video")
1273             if video_presence == 0:
1274                 vcaps = None
1275
1276             audio_presence = self.media_descriptor.get_num_tracks("audio")
1277             if audio_presence == 0:
1278                 acaps = None
1279
1280         return self._get_profile_full(self.combination.get_muxer_caps(),
1281                                       vcaps, acaps,
1282                                       audio_presence=audio_presence,
1283                                       video_presence=video_presence,
1284                                       video_restriction=video_restriction,
1285                                       audio_restriction=audio_restriction,
1286                                       variable_framerate=variable_framerate)
1287
1288     def _clean_caps(self, caps):
1289         """
1290         Returns a list of key=value or structure name, without "(types)" or ";" or ","
1291         """
1292         return re.sub(r"\(.+?\)\s*| |;", '', caps).split(',')
1293
1294     # pylint: disable=E1101
1295     def _has_caps_type_variant(self, c, ccaps):
1296         """
1297         Handle situations where we can have application/ogg or video/ogg or
1298         audio/ogg
1299         """
1300         has_variant = False
1301         media_type = re.findall("application/|video/|audio/", c)
1302         if media_type:
1303             media_type = media_type[0].replace('/', '')
1304             possible_mtypes = ["application", "video", "audio"]
1305             possible_mtypes.remove(media_type)
1306             for tmptype in possible_mtypes:
1307                 possible_c_variant = c.replace(media_type, tmptype)
1308                 if possible_c_variant in ccaps:
1309                     self.info(
1310                         "Found %s in %s, good enough!", possible_c_variant, ccaps)
1311                     has_variant = True
1312
1313         return has_variant
1314
1315     # pylint: disable=E1101
    def run_iqa_test(self, reference_file_uri):
        """
        Run an image quality assessment pipeline comparing the encoded file
        (self.dest_file) with the reference file, when the 'iqa' feature is
        available.

        @reference_file_uri: URI of the reference file to compare against.

        The command output goes to self.out and the method waits for the
        process to exit.
        """
        if not GstValidateBaseTestManager.has_feature('iqa'):
            self.debug('Iqa element not present, not running extra test.')
            return

        pipeline_desc = """
            uridecodebin uri=%s !
                iqa name=iqa do-dssim=true dssim-error-threshold=1.0 ! fakesink
            uridecodebin uri=%s ! iqa.
        """ % (reference_file_uri, self.dest_file)
        pipeline_desc = pipeline_desc.replace("\n", "")

        command = [GstValidateBaseTestManager.COMMAND] + \
            shlex.split(pipeline_desc)
        msg = "## Running IQA tests on results of: " \
            + "%s\n### Command: \n```\n%s\n```\n" % (
                self.classname, ' '.join(command))
        if not self.options.redirect_logs:
            self.out.write(msg)
            self.out.flush()
        else:
            printc(msg, Colors.OKBLUE)

        # self.process is replaced by the IQA process.
        self.process = subprocess.Popen(command,
                                        stderr=self.out,
                                        stdout=self.out,
                                        env=self.proc_env,
                                        cwd=self.workdir)
        self.process.wait()
1348         self.process.wait()
1349
1350     def check_encoded_file(self):
1351         result_descriptor = GstValidateMediaDescriptor.new_from_uri(
1352             self.dest_file)
1353         if result_descriptor is None:
1354             return (Result.FAILED, "Could not discover encoded file %s"
1355                     % self.dest_file)
1356
1357         duration = result_descriptor.get_duration()
1358         orig_duration = self.media_descriptor.get_duration()
1359         tolerance = self._duration_tolerance
1360
1361         if orig_duration - tolerance >= duration <= orig_duration + tolerance:
1362             os.remove(result_descriptor.get_path())
1363             self.add_report(
1364                 {
1365                     'type': 'report',
1366                     'issue-id': 'transcoded-file-wrong-duration',
1367                     'summary': 'The duration of a transcoded file doesn\'t match the duration of the original file',
1368                     'level': 'critical',
1369                     'detected-on': 'pipeline',
1370                     'details': "Duration of encoded file is " " wrong (%s instead of %s)" % (
1371                         utils.TIME_ARGS(duration), utils.TIME_ARGS(orig_duration))
1372                 }
1373             )
1374         else:
1375             all_tracks_caps = result_descriptor.get_tracks_caps()
1376             container_caps = result_descriptor.get_caps()
1377             if container_caps:
1378                 all_tracks_caps.insert(0, ("container", container_caps))
1379
1380             for track_type, caps in all_tracks_caps:
1381                 ccaps = self._clean_caps(caps)
1382                 wanted_caps = self.combination.get_caps(track_type)
1383                 cwanted_caps = self._clean_caps(wanted_caps)
1384
1385                 if wanted_caps is None:
1386                     os.remove(result_descriptor.get_path())
1387                     self.add_report(
1388                         {
1389                             'type': 'report',
1390                             'issue-id': 'transcoded-file-wrong-stream-type',
1391                             'summary': 'Expected stream types during transcoding do not match expectations',
1392                             'level': 'critical',
1393                             'detected-on': 'pipeline',
1394                             'details': "Found a track of type %s in the encoded files"
1395                                     " but none where wanted in the encoded profile: %s" % (
1396                                         track_type, self.combination)
1397                         }
1398                     )
1399                     return
1400
1401                 for c in cwanted_caps:
1402                     if c not in ccaps:
1403                         if not self._has_caps_type_variant(c, ccaps):
1404                             os.remove(result_descriptor.get_path())
1405                             self.add_report(
1406                                 {
1407                                     'type': 'report',
1408                                     'issue-id': 'transcoded-file-wrong-caps',
1409                                     'summary': 'Expected stream caps during transcoding do not match expectations',
1410                                     'level': 'critical',
1411                                     'detected-on': 'pipeline',
1412                                     'details': "Field: %s  (from %s) not in caps of the outputted file %s" % (
1413                                         wanted_caps, c, ccaps)
1414                                 }
1415                             )
1416                             return
1417
1418             os.remove(result_descriptor.get_path())
1419
1420
1421 class TestsManager(Loggable):
1422
1423     """ A class responsible for managing tests. """
1424
1425     name = "base"
1426     loading_testsuite = None
1427
1428     def __init__(self):
1429
1430         Loggable.__init__(self)
1431
1432         self.tests = []
1433         self.unwanted_tests = []
1434         self.options = None
1435         self.args = None
1436         self.reporter = None
1437         self.wanted_tests_patterns = []
1438         self.blacklisted_tests_patterns = []
1439         self._generators = []
1440         self.check_testslist = True
1441         self.all_tests = None
1442         self.expected_issues = {}
1443         self.blacklisted_tests = []
1444
    def init(self):
        """ Initialization hook; this base implementation always returns True. """
        return True
1447
1448     def list_tests(self):
1449         return sorted(list(self.tests), key=lambda x: x.classname)
1450
1451     def find_tests(self, classname):
1452         regex = re.compile(classname)
1453         return [test for test in self.list_tests() if regex.findall(test.classname)]
1454
1455     def add_expected_issues(self, expected_issues):
1456         for bugid, failure_def in list(expected_issues.items()):
1457             tests_regexes = []
1458             for test_name_regex in failure_def['tests']:
1459                 regex = re.compile(test_name_regex)
1460                 tests_regexes.append(regex)
1461                 for test in self.tests:
1462                     if regex.findall(test.classname):
1463                         if failure_def.get('allow_flakiness'):
1464                             test.allow_flakiness = True
1465                             self.debug("%s allow flakiness" % (test.classname))
1466                         else:
1467                             for issue in failure_def['issues']:
1468                                 issue['bug'] = bugid
1469                             test.expected_issues.extend(failure_def['issues'])
1470                             self.debug("%s added expected issues from %s" % (
1471                                 test.classname, bugid))
1472             failure_def['tests'] = tests_regexes
1473
1474         self.expected_issues.update(expected_issues)
1475
1476     def add_test(self, test):
1477         if test.generator is None:
1478             test.classname = self.loading_testsuite + '.' + test.classname
1479
1480         for bugid, failure_def in list(self.expected_issues.items()):
1481             failure_def['bug'] = bugid
1482             for regex in failure_def['tests']:
1483                 if regex.findall(test.classname):
1484                     if failure_def.get('allow_flakiness'):
1485                         test.allow_flakiness = True
1486                         self.debug("%s allow flakiness" % (test.classname))
1487                     else:
1488                         for issue in failure_def['issues']:
1489                             issue['bug'] = bugid
1490                         test.expected_issues.extend(failure_def['issues'])
1491                         self.debug("%s added expected issues from %s" % (
1492                             test.classname, bugid))
1493
1494         if self._is_test_wanted(test):
1495             if test not in self.tests:
1496                 self.tests.append(test)
1497         else:
1498             if test not in self.tests:
1499                 self.unwanted_tests.append(test)
1500
    def get_tests(self):
        """ Return the list of tests of the manager (the list itself, not a copy). """
        return self.tests
1503
    def populate_testsuite(self):
        """ Hook called by set_settings(); this base implementation does nothing. """
        pass
1506
1507     def add_generators(self, generators):
1508         """
1509         @generators: A list of, or one single #TestsGenerator to be used to generate tests
1510         """
1511         if not isinstance(generators, list):
1512             generators = [generators]
1513         self._generators.extend(generators)
1514         for generator in generators:
1515             generator.testsuite = self.loading_testsuite
1516
1517         self._generators = list(set(self._generators))
1518
    def get_generators(self):
        """ Return the list of generators registered with add_generators(). """
        return self._generators
1521
1522     def _add_blacklist(self, blacklisted_tests):
1523         if not isinstance(blacklisted_tests, list):
1524             blacklisted_tests = [blacklisted_tests]
1525
1526         for patterns in blacklisted_tests:
1527             for pattern in patterns.split(","):
1528                 self.blacklisted_tests_patterns.append(re.compile(pattern))
1529
1530     def set_default_blacklist(self, default_blacklist):
1531         for test_regex, reason in default_blacklist:
1532             if not test_regex.startswith(self.loading_testsuite + '.'):
1533                 test_regex = self.loading_testsuite + '.' + test_regex
1534             self.blacklisted_tests.append((test_regex, reason))
1535             self._add_blacklist(test_regex)
1536
1537     def add_options(self, parser):
1538         """ Add more arguments. """
1539         pass
1540
1541     def set_settings(self, options, args, reporter):
1542         """ Set properties after options parsing. """
1543         self.options = options
1544         self.args = args
1545         self.reporter = reporter
1546
1547         self.populate_testsuite()
1548
1549         if self.options.valgrind:
1550             self.print_valgrind_bugs()
1551
1552         if options.wanted_tests:
1553             for patterns in options.wanted_tests:
1554                 for pattern in patterns.split(","):
1555                     self.wanted_tests_patterns.append(re.compile(pattern))
1556
1557         if options.blacklisted_tests:
1558             for patterns in options.blacklisted_tests:
1559                 self._add_blacklist(patterns)
1560
1561     def check_blacklists(self):
1562         if self.options.check_bugs_status:
1563             if not check_bugs_resolution(self.blacklisted_tests):
1564                 return False
1565
1566         return True
1567
1568     def log_blacklists(self):
1569         if self.blacklisted_tests:
1570             self.info("Currently 'hardcoded' %s blacklisted tests:" %
1571                       self.name)
1572
1573         for name, bug in self.blacklisted_tests:
1574             if not self.options.check_bugs_status:
1575                 self.info("  + %s --> bug: %s" % (name, bug))
1576
1577     def check_expected_issues(self):
1578         if not self.expected_issues or not self.options.check_bugs_status:
1579             return True
1580
1581         bugs_definitions = defaultdict(list)
1582         for bug, failure_def in list(self.expected_issues.items()):
1583             tests_names = '|'.join(
1584                 [regex.pattern for regex in failure_def['tests']])
1585             bugs_definitions[tests_names].extend([bug])
1586
1587         return check_bugs_resolution(bugs_definitions.items())
1588
1589     def _check_blacklisted(self, test):
1590         for pattern in self.blacklisted_tests_patterns:
1591             if pattern.findall(test.classname):
1592                 self.info("%s is blacklisted by %s", test.classname, pattern)
1593                 return True
1594
1595         return False
1596
1597     def _check_whitelisted(self, test):
1598         for pattern in self.wanted_tests_patterns:
1599             if pattern.findall(test.classname):
1600                 if self._check_blacklisted(test):
1601                     # If explicitly white listed that specific test
1602                     # bypass the blacklisting
1603                     if pattern.pattern != test.classname:
1604                         return False
1605                 return True
1606         return False
1607
1608     def _check_duration(self, test):
1609         if test.duration > 0 and int(self.options.long_limit) < int(test.duration):
1610             self.info("Not activating %s as its duration (%d) is superior"
1611                       " than the long limit (%d)" % (test, test.duration,
1612                                                      int(self.options.long_limit)))
1613             return False
1614
1615         return True
1616
1617     def _is_test_wanted(self, test):
1618         if self._check_whitelisted(test):
1619             if not self._check_duration(test):
1620                 return False
1621             return True
1622
1623         if self._check_blacklisted(test):
1624             return False
1625
1626         if not self._check_duration(test):
1627             return False
1628
1629         if not self.wanted_tests_patterns:
1630             return True
1631
1632         return False
1633
1634     def needs_http_server(self):
1635         return False
1636
1637     def print_valgrind_bugs(self):
1638         pass
1639
1640
class TestsGenerator(Loggable):
    """Holds tests, indexed by their classname, on behalf of a test manager."""

    def __init__(self, name, test_manager, tests=None):
        """
        @name: Name of the generator
        @test_manager: The test manager this generator is attached to
        @tests: Optional iterable of tests to register right away, each one
                being stored under its `classname`
        """
        Loggable.__init__(self)
        self.name = name
        self.test_manager = test_manager
        self.testsuite = None
        # `tests` defaults to None rather than to a shared mutable list.
        self._tests = {test.classname: test for test in tests or ()}

    def generate_tests(self, *kwargs):
        """
        Method that generates tests

        Returns a new list holding the tests registered so far; the
        positional arguments are ignored by this implementation.
        """
        return list(self._tests.values())

    def add_test(self, test):
        """
        Register @test on this generator.

        The test gets this generator as `generator` and its classname is
        prefixed with "<self.testsuite>." before being used as storage key.
        """
        test.generator = self
        test.classname = self.testsuite + '.' + test.classname
        self._tests[test.classname] = test
1662
1663
class GstValidateTestsGenerator(TestsGenerator):
    """Tests generator whose tests are populated on demand by populate_tests()."""

    def populate_tests(self, uri_minfo_special_scenarios, scenarios):
        """Hook called by generate_tests() before listing the tests; does nothing here."""
        return None

    def generate_tests(self, uri_minfo_special_scenarios, scenarios):
        """Call populate_tests() with the given arguments, then return the registered tests."""
        self.populate_tests(uri_minfo_special_scenarios, scenarios)
        generated = super(GstValidateTestsGenerator, self).generate_tests()
        return generated
1672
1673
1674 class _TestsLauncher(Loggable):
1675
1676     def __init__(self):
1677
1678         Loggable.__init__(self)
1679
1680         self.options = None
1681         self.testers = []
1682         self.tests = []
1683         self.reporter = None
1684         self._list_testers()
1685         self.all_tests = None
1686         self.wanted_tests_patterns = []
1687
1688         self.queue = queue.Queue()
1689         self.jobs = []
1690         self.total_num_tests = 0
1691         self.current_progress = -1
1692         self.server = None
1693         self.httpsrv = None
1694         self.vfb_server = None
1695
1696     def _list_app_dirs(self):
1697         app_dirs = []
1698         env_dirs = os.environ["GST_VALIDATE_APPS_DIR"]
1699         if env_dirs is not None:
1700             for dir_ in env_dirs.split(os.pathsep):
1701                 app_dirs.append(dir_)
1702
1703         return app_dirs
1704
1705     def _exec_app(self, app_dir, env):
1706         try:
1707             files = os.listdir(app_dir)
1708         except OSError as e:
1709             self.debug("Could not list %s: %s" % (app_dir, e))
1710             files = []
1711         for f in files:
1712             if f.endswith(".py"):
1713                 exec(compile(open(os.path.join(app_dir, f)).read(),
1714                              os.path.join(app_dir, f), 'exec'), env)
1715
1716     def _exec_apps(self, env):
1717         app_dirs = self._list_app_dirs()
1718         for app_dir in app_dirs:
1719             self._exec_app(app_dir, env)
1720
1721     def _list_testers(self):
1722         env = globals().copy()
1723         self._exec_apps(env)
1724
1725         testers = [i() for i in utils.get_subclasses(TestsManager, env)]
1726         for tester in testers:
1727             if tester.init() is True:
1728                 self.testers.append(tester)
1729             else:
1730                 self.warning("Can not init tester: %s -- PATH is %s"
1731                              % (tester.name, os.environ["PATH"]))
1732
1733     def add_options(self, parser):
1734         for tester in self.testers:
1735             tester.add_options(parser)
1736
1737     def _load_testsuite(self, testsuites):
1738         exceptions = []
1739         for testsuite in testsuites:
1740             try:
1741                 sys.path.insert(0, os.path.dirname(testsuite))
1742                 spec = importlib.util.spec_from_file_location(os.path.basename(testsuite).replace(".py", ""), testsuite)
1743                 module = importlib.util.module_from_spec(spec)
1744                 spec.loader.exec_module(module)
1745                 return (module, None)
1746             except Exception as e:
1747                 exceptions.append("Could not load %s: %s" % (testsuite, e))
1748                 continue
1749             finally:
1750                 sys.path.remove(os.path.dirname(testsuite))
1751
1752         return (None, exceptions)
1753
    def _load_testsuites(self):
        """
        Replace the names/paths in options.testsuites by loaded modules.

        Each entry is either the path of an existing ".py" file, or a name
        looked up as "<name>.py" in options.testsuites_dirs. On return
        options.testsuites holds the loaded modules (once each, keyed by
        module name) and every module has a TEST_MANAGER list attribute.
        """
        # Loaded modules, keyed by module name
        testsuites = {}
        # NOTE: options.testsuites may be appended to inside this loop (see
        # below); the appended entries are visited by this same loop.
        for testsuite in self.options.testsuites:
            if testsuite.endswith('.py') and os.path.exists(testsuite):
                testsuite = os.path.abspath(os.path.expanduser(testsuite))
                loaded_module = self._load_testsuite([testsuite])
            else:
                possible_testsuites_paths = [os.path.join(d, testsuite + ".py")
                                             for d in self.options.testsuites_dirs]
                loaded_module = self._load_testsuite(possible_testsuites_paths)

            # loaded_module is (module, None) on success, (None, errors) otherwise
            module = loaded_module[0]
            if not loaded_module[0]:
                if "." in testsuite:
                    # Treat "suite.rest.of.name" as a test name: queue the
                    # part before the first dot as testsuite to load and
                    # record the whole string as a wanted test.
                    self.options.testsuites.append(testsuite.split('.')[0])
                    self.info("%s looks like a test name, trying that" %
                              testsuite)
                    self.options.wanted_tests.append(testsuite)
                else:
                    if testsuite in testsuites:
                        self.info('Testuite %s was loaded previously', testsuite)
                        continue
                    printc("Could not load testsuite: %s, reasons: %s" % (
                        testsuite, loaded_module[1]), Colors.FAIL)
                continue

            if module.__name__ in testsuites:
                self.info("Trying to load testsuite '%s' a second time?", module.__name__)
                continue

            testsuites[module.__name__] = module
            # Make sure TEST_MANAGER is always a list: default to all known
            # testers when the module does not define it.
            if not hasattr(module, "TEST_MANAGER"):
                module.TEST_MANAGER = [tester.name for tester in self.testers]
            elif not isinstance(module.TEST_MANAGER, list):
                module.TEST_MANAGER = [module.TEST_MANAGER]

        self.options.testsuites = list(testsuites.values())
1791
1792     def _setup_testsuites(self):
1793         for testsuite in self.options.testsuites:
1794             loaded = False
1795             wanted_test_manager = None
1796             # TEST_MANAGER has been set in _load_testsuites()
1797             assert hasattr(testsuite, "TEST_MANAGER")
1798             wanted_test_manager = testsuite.TEST_MANAGER
1799             if not isinstance(wanted_test_manager, list):
1800                 wanted_test_manager = [wanted_test_manager]
1801
1802             for tester in self.testers:
1803                 if wanted_test_manager is not None and \
1804                         tester.name not in wanted_test_manager:
1805                     continue
1806
1807                 prev_testsuite_name = TestsManager.loading_testsuite
1808                 if self.options.user_paths:
1809                     TestsManager.loading_testsuite = tester.name
1810                     tester.register_defaults()
1811                     loaded = True
1812                 else:
1813                     TestsManager.loading_testsuite = testsuite.__name__
1814                     if testsuite.setup_tests(tester, self.options):
1815                         loaded = True
1816                 if prev_testsuite_name:
1817                     TestsManager.loading_testsuite = prev_testsuite_name
1818
1819             if not loaded:
1820                 printc("Could not load testsuite: %s"
1821                        " maybe because of missing TestManager"
1822                        % (testsuite), Colors.FAIL)
1823                 return False
1824
1825     def _load_config(self, options):
1826         printc("Loading config files is DEPRECATED"
1827                " you should use the new testsuite format now",)
1828
1829         for tester in self.testers:
1830             tester.options = options
1831             globals()[tester.name] = tester
1832         globals()["options"] = options
1833         c__file__ = __file__
1834         globals()["__file__"] = self.options.config
1835         exec(compile(open(self.options.config).read(),
1836                      self.options.config, 'exec'), globals())
1837         globals()["__file__"] = c__file__
1838
    def set_settings(self, options, args):
        """
        Configure the launcher and its testers from the parsed options.

        @options: The parsed options
        @args: The remaining command line arguments; names of known testers
               found in it are removed from it

        Returns False if no testsuite could be loaded or set up, if a bug
        status check fails or if the virtual frame buffer server could not
        be started; True otherwise.
        """
        if options.xunit_file:
            self.reporter = reporters.XunitReporter(options)
        else:
            self.reporter = reporters.Reporter(options)

        self.options = options
        # Set (to a tester name) as soon as one tester is named in args
        wanted_testers = None
        for tester in self.testers:
            if tester.name in args:
                wanted_testers = tester.name

        # Keep only the testers named in args, consuming their names
        if wanted_testers:
            testers = self.testers
            self.testers = []
            for tester in testers:
                if tester.name in args:
                    self.testers.append(tester)
                    args.remove(tester.name)

        if options.config:
            self._load_config(options)

        self._load_testsuites()
        if not self.options.testsuites:
            printc("Not testsuite loaded!", Colors.FAIL)
            return False

        for tester in self.testers:
            tester.set_settings(options, args, self.reporter)

        if not options.config and options.testsuites:
            if self._setup_testsuites() is False:
                return False

        if self.options.check_bugs_status:
            printc("-> Checking bugs resolution... ", end='')

        for tester in self.testers:
            if not tester.check_blacklists():
                return False

            tester.log_blacklists()

            if not tester.check_expected_issues():
                return False

        if self.options.check_bugs_status:
            printc("OK", Colors.OKGREEN)

        if self.needs_http_server() or options.httponly is True:
            self.httpsrv = HTTPServer(options)
            self.httpsrv.start()

        if options.no_display:
            self.vfb_server = get_virual_frame_buffer_server(options)
            # start() result is indexed here as (success, message)
            res = self.vfb_server.start()
            if res[0] is False:
                printc("Could not start virtual frame server: %s" % res[1],
                       Colors.FAIL)
                return False
            os.environ["DISPLAY"] = self.vfb_server.display_id

        return True
1903
1904     def _check_tester_has_other_testsuite(self, testsuite, tester):
1905         if tester.name != testsuite.TEST_MANAGER[0]:
1906             return True
1907
1908         for t in self.options.testsuites:
1909             if t != testsuite:
1910                 for other_testmanager in t.TEST_MANAGER:
1911                     if other_testmanager == tester.name:
1912                         return True
1913
1914         return False
1915
1916     def _check_defined_tests(self, tester, tests):
1917         if self.options.blacklisted_tests or self.options.wanted_tests:
1918             return
1919
1920         tests_names = [test.classname for test in tests]
1921         testlist_changed = False
1922         for testsuite in self.options.testsuites:
1923             if not self._check_tester_has_other_testsuite(testsuite, tester) \
1924                     and tester.check_testslist:
1925                 try:
1926                     testlist_file = open(os.path.splitext(testsuite.__file__)[0] + ".testslist",
1927                                          'r+')
1928
1929                     know_tests = testlist_file.read().split("\n")
1930                     testlist_file.close()
1931
1932                     testlist_file = open(os.path.splitext(testsuite.__file__)[0] + ".testslist",
1933                                          'w')
1934                 except IOError:
1935                     continue
1936
1937                 optional_out = []
1938                 for test in know_tests:
1939                     if test and test.strip('~') not in tests_names:
1940                         if not test.startswith('~'):
1941                             testlist_changed = True
1942                             printc("Test %s Not in testsuite %s anymore"
1943                                    % (test, testsuite.__file__), Colors.FAIL)
1944                         else:
1945                             optional_out.append((test, None))
1946
1947                 tests_names = sorted([(test.classname, test) for test in tests] + optional_out,
1948                                      key=lambda x: x[0].strip('~'))
1949
1950                 for tname, test in tests_names:
1951                     if test and test.optional:
1952                         tname = '~' + tname
1953                     testlist_file.write("%s\n" % (tname))
1954                     if tname and tname not in know_tests:
1955                         printc("Test %s is NEW in testsuite %s"
1956                                % (tname, testsuite.__file__),
1957                                Colors.FAIL if self.options.fail_on_testlist_change else Colors.OKGREEN)
1958                         testlist_changed = True
1959
1960                 testlist_file.close()
1961                 break
1962
1963         return testlist_changed
1964
1965     def _split_tests(self, num_groups):
1966         groups = [[] for x in range(num_groups)]
1967         group = cycle(groups)
1968         for test in self.tests:
1969             next(group).append(test)
1970         return groups
1971
1972     def list_tests(self):
1973         for tester in self.testers:
1974             if not self._tester_needed(tester):
1975                 continue
1976
1977             tests = tester.list_tests()
1978             if self._check_defined_tests(tester, tests) and \
1979                     self.options.fail_on_testlist_change:
1980                 raise RuntimeError("Unexpected new test in testsuite.")
1981
1982             self.tests.extend(tests)
1983         self.tests.sort(key=lambda test: test.classname)
1984
1985         if self.options.num_parts < 1:
1986             raise RuntimeError("Tests must be split in positive number of parts.")
1987         if self.options.num_parts > len(self.tests):
1988             raise RuntimeError("Cannot have more parts then there exist tests.")
1989         if self.options.part_index < 1 or self.options.part_index > self.options.num_parts:
1990             raise RuntimeError("Part index is out of range")
1991
1992         self.tests = self._split_tests(self.options.num_parts)[self.options.part_index - 1]
1993         return self.tests
1994
1995     def _tester_needed(self, tester):
1996         for testsuite in self.options.testsuites:
1997             if tester.name in testsuite.TEST_MANAGER:
1998                 return True
1999         return False
2000
2001     def server_wrapper(self, ready):
2002         self.server = GstValidateTCPServer(
2003             ('localhost', 0), GstValidateListener)
2004         self.server.socket.settimeout(None)
2005         self.server.launcher = self
2006         self.serverport = self.server.socket.getsockname()[1]
2007         self.info("%s server port: %s" % (self, self.serverport))
2008         ready.set()
2009
2010         self.server.serve_forever(poll_interval=0.05)
2011
2012     def _start_server(self):
2013         self.info("Starting TCP Server")
2014         ready = threading.Event()
2015         self.server_thread = threading.Thread(target=self.server_wrapper,
2016                                               kwargs={'ready': ready})
2017         self.server_thread.start()
2018         ready.wait()
2019         os.environ["GST_VALIDATE_SERVER"] = "tcp://localhost:%s" % self.serverport
2020
2021     def _stop_server(self):
2022         if self.server:
2023             self.server.shutdown()
2024             self.server_thread.join()
2025             self.server.server_close()
2026             self.server = None
2027
2028     def test_wait(self):
2029         while True:
2030             # Check process every second for timeout
2031             try:
2032                 self.queue.get(timeout=1)
2033             except queue.Empty:
2034                 pass
2035
2036             for test in self.jobs:
2037                 if test.process_update():
2038                     self.jobs.remove(test)
2039                     return test
2040
2041     def tests_wait(self):
2042         try:
2043             test = self.test_wait()
2044             test.check_results()
2045         except KeyboardInterrupt:
2046             for test in self.jobs:
2047                 test.kill_subprocess()
2048             raise
2049
2050         return test
2051
2052     def start_new_job(self, tests_left):
2053         try:
2054             test = tests_left.pop(0)
2055         except IndexError:
2056             return False
2057
2058         test.test_start(self.queue)
2059
2060         self.jobs.append(test)
2061
2062         return True
2063
2064     def print_result(self, current_test_num, test, retry_on_failure=False):
2065         if test.result != Result.PASSED and not retry_on_failure:
2066             printc(str(test), color=utils.get_color_for_result(test.result))
2067
2068         length = 80
2069         progress = int(length * current_test_num // self.total_num_tests)
2070         bar = 'â–ˆ' * progress + '-' * (length - progress)
2071         if is_tty():
2072             printc('\r|%s| [%s/%s]' % (bar, current_test_num, self.total_num_tests), end='\r')
2073         else:
2074             if progress > self.current_progress:
2075                 self.current_progress = progress
2076                 printc('|%s| [%s/%s]' % (bar, current_test_num, self.total_num_tests))
2077
    def _run_tests(self, running_tests=None, all_alone=False, retry_on_failures=False):
        """
        Run tests, the parallel ones first and then the others one by one.

        @running_tests: The tests to run, defaults to `self.tests`
        @all_alone: Run every test alone, even the parallel ones
        @retry_on_failures: Run the failing tests a second time, alone, to
                            find out whether they are flaky

        Returns False as soon as a test fails while options.forever or
        options.fatal_error is set. When tests are retried the result of the
        second run is returned. True is returned otherwise.
        """
        if not self.all_tests:
            self.all_tests = self.list_tests()

        if not running_tests:
            running_tests = self.tests

        self.reporter.init_timer()
        # Tests that must run one at a time / tests that can run in parallel
        alone_tests = []
        tests = []
        for test in running_tests:
            if test.is_parallel and not all_alone:
                tests.append(test)
            else:
                alone_tests.append(test)

        # use max to defend against the case where all tests are alone_tests
        max_num_jobs = max(min(self.options.num_jobs, len(tests)), 1)
        jobs_running = 0

        # In forever mode, copy parallel tests (cycling over them) until
        # there are as many tests as requested jobs.
        if self.options.forever and len(tests) < self.options.num_jobs and len(tests):
            max_num_jobs = self.options.num_jobs
            copied = []
            i = 0
            while (len(tests) + len(copied)) < max_num_jobs:
                copied.append(tests[i].copy(len(copied) + 1))

                i += 1
                if i >= len(tests):
                    i = 0
            tests += copied
            self.tests += copied

        self.total_num_tests = len(self.all_tests)
        printc("\nRunning %d tests..." % self.total_num_tests, color=Colors.HEADER)
        # if order of test execution doesn't matter, shuffle
        # the order to optimize cpu usage
        if self.options.shuffle:
            random.shuffle(tests)
            random.shuffle(alone_tests)

        current_test_num = 1
        to_retry = []
        # First pass: parallel tests with up to max_num_jobs jobs; second
        # pass: the remaining tests with a single job. NOTE: `tests` is
        # rebound by this loop.
        for num_jobs, tests in [(max_num_jobs, tests), (1, alone_tests)]:
            tests_left = list(tests)
            for i in range(num_jobs):
                if not self.start_new_job(tests_left):
                    break
                jobs_running += 1

            while jobs_running != 0:
                test = self.tests_wait()
                jobs_running -= 1
                current_test_num += 1
                res = test.test_end(retry_on_failure=retry_on_failures)
                to_report = True
                if res not in [Result.PASSED, Result.SKIPPED, Result.KNOWN_ERROR]:
                    if self.options.forever or self.options.fatal_error:
                        self.print_result(current_test_num - 1, test, retry_on_failure=retry_on_failures)
                        self.reporter.after_test(test)
                        return False

                    if retry_on_failures:
                        if not self.options.redirect_logs and test.allow_flakiness:
                            test.copy_logfiles()
                        printc(test)
                        to_retry.append(test)

                        # Not adding to final report if flakiness is tolerated
                        to_report = not test.allow_flakiness
                self.print_result(current_test_num - 1, test, retry_on_failure=retry_on_failures)
                if to_report:
                    self.reporter.after_test(test)
                if retry_on_failures:
                    test.clean()
                # A job ended: start the next pending test, if any
                if self.start_new_job(tests_left):
                    jobs_running += 1

        if to_retry:
            printc("--> Rerunning the following tests to see if they are flaky:", Colors.WARNING)
            for test in to_retry:
                printc('  * %s' % test.classname)
            printc('')
            # The second run is done alone and is not retried again
            return self._run_tests(to_retry, all_alone=True, retry_on_failures=False)

        return True
2164
2165     def clean_tests(self, stop_server=False):
2166         for test in self.tests:
2167             test.clean()
2168         if stop_server:
2169             self._stop_server()
2170
2171     def run_tests(self):
2172         r = 0
2173         try:
2174             self._start_server()
2175             if self.options.forever:
2176                 r = 1
2177                 while True:
2178                     printc("-> Iteration %d" % r, end='\r')
2179
2180                     if not self._run_tests():
2181                         break
2182                     r += 1
2183                     self.clean_tests()
2184                     msg = "-> Iteration %d... %sOK%s" % (r, Colors.OKGREEN, Colors.ENDC)
2185                     printc(msg, end="\r")
2186
2187                 return False
2188             elif self.options.n_runs:
2189                 res = True
2190                 for r in range(self.options.n_runs):
2191                     printc("-> Iteration %d" % r, end='\r')
2192                     if not self._run_tests():
2193                         res = False
2194                         printc("ERROR", Colors.FAIL, end="\r")
2195                     else:
2196                         printc("OK", Colors.OKGREEN, end="\r")
2197                     self.clean_tests()
2198
2199                 return res
2200             else:
2201                 return self._run_tests(retry_on_failures=self.options.retry_on_failures)
2202         finally:
2203             if self.options.forever:
2204                 printc("\n-> Ran %d times" % r)
2205             if self.httpsrv:
2206                 self.httpsrv.stop()
2207             if self.vfb_server:
2208                 self.vfb_server.stop()
2209             self.clean_tests(True)
2210
2211     def final_report(self):
2212         return self.reporter.final_report()
2213
2214     def needs_http_server(self):
2215         for tester in self.testers:
2216             if tester.needs_http_server():
2217                 return True
2218
2219
class NamedDic(object):
    """Object exposing the items of a dictionary as attributes."""

    def __init__(self, props):
        """
        @props: Dictionary whose (name, value) items are set as attributes;
                nothing is set when it is empty or None
        """
        for attr_name, attr_value in (props or {}).items():
            setattr(self, attr_name, attr_value)
2226
2227
class Scenario(object):
    """
    A scenario, described by a name, an optional path and properties.

    Every (property, value) pair is set as attribute on the instance, the
    '-' of the property name being replaced by '_'.
    """

    def __init__(self, name, props, path=None):
        """
        @name: Name of the scenario
        @props: Iterable of (property, value) pairs
        @path: Optional path of the scenario
        """
        self.name = name
        self.path = path

        for key, value in props:
            setattr(self, key.replace("-", "_"), value)

    def get_execution_name(self):
        """Return the path of the scenario if it has one, its name otherwise."""
        return self.name if self.path is None else self.path

    def seeks(self):
        """Return the truth value of the `seek` property, False if unset."""
        return bool(getattr(self, "seek", False))

    def needs_clock_sync(self):
        """Return the truth value of the `need_clock_sync` property, False if unset."""
        return bool(getattr(self, "need_clock_sync", False))

    def needs_live_content(self):
        """Return the truth value of the `live_content_required` property, False if unset."""
        # Scenarios that can only be used on live content
        return bool(getattr(self, "live_content_required", False))

    def compatible_with_live_content(self):
        """Return True if live content is required or declared as compatible."""
        # if a live content is required it's implicitly compatible with
        # live content
        if self.needs_live_content():
            return True
        return bool(getattr(self, "live_content_compatible", False))

    def get_min_media_duration(self):
        """Return the `min_media_duration` property as float, 0 if unset."""
        if not hasattr(self, "min_media_duration"):
            return 0

        return float(self.min_media_duration)

    def does_reverse_playback(self):
        """Return the truth value of the `reverse_playback` property, False if unset."""
        return bool(getattr(self, "reverse_playback", False))

    def get_duration(self):
        """Return the `duration` property as float, 0 if unset."""
        if not hasattr(self, "duration"):
            return 0

        return float(self.duration)

    def get_min_tracks(self, track_type):
        """Return the `min_<track_type>_track` property as int, 0 if unset."""
        return int(getattr(self, "min_%s_track" % track_type, 0))

    def __repr__(self):
        return "<Scenario %s>" % self.name
2296
2297
class ScenarioManager(Loggable):
    """
    Registry of the scenarios that the gst-validate tool reports.

    The class is a singleton: every instantiation returns the same object,
    and the scenario caches below live on the class itself.
    """
    _instance = None
    # Scenarios found by a discovery run without explicit paths.
    system_scenarios = []
    # Scenarios discovered from explicit scenario files, indexed by file path.
    special_scenarios = {}

    FILE_EXTENSION = "scenario"

    def __new__(cls, *args, **kwargs):
        """Create the shared instance on first use and return it afterwards."""
        if not cls._instance:
            cls._instance = super(ScenarioManager, cls).__new__(
                cls, *args, **kwargs)
            cls._instance.config = None
            cls._instance.discovered = False
            Loggable.__init__(cls._instance)

        return cls._instance

    def find_special_scenarios(self, mfile):
        """
        Discover the scenario files stored in the same directory as @mfile.

        A file is picked up when its name matches
        "<basename of mfile>.<anything>.<FILE_EXTENSION>".

        Returns the list of discovered #Scenario, or an empty list when no
        file matched.
        """
        media_dir = os.path.dirname(mfile)
        # Raw string: "\." is a regex escape, not a Python string escape.
        pattern = re.compile(r"%s\..*\.%s$" % (
            re.escape(os.path.basename(mfile)), self.FILE_EXTENSION))

        scenarios = [os.path.join(media_dir, fname)
                     for fname in os.listdir(media_dir)
                     if pattern.search(fname)]

        if scenarios:
            scenarios = self.discover_scenarios(scenarios, mfile)

        return scenarios

    def discover_scenarios(self, scenario_paths=None, mfile=None):
        """
        Discover scenarios specified in scenario_paths or the default ones
        if nothing specified there

        :param scenario_paths: Optional list of scenario file paths handed to
                               the gst-validate command.
        :param mfile: Optional path of the media file the scenarios belong
                      to; when set, the scenario name is the scenario path
                      with the "<mfile>." prefix and the extension removed.

        Returns the list of #Scenario built from the sections of the
        "scenarios.def" file written by the command.
        """
        if scenario_paths is None:
            scenario_paths = []

        scenarios = []
        scenario_defs = os.path.join(self.config.main_dir, "scenarios.def")
        log_path = os.path.join(self.config.logsdir, "scenarios_discovery.log")

        with open(log_path, 'w') as logs:
            try:
                command = [GstValidateBaseTestManager.COMMAND,
                           "--scenarios-defs-output-file", scenario_defs]
                command.extend(scenario_paths)
                subprocess.check_call(command, stdout=logs, stderr=logs)
            except subprocess.CalledProcessError as e:
                # A failing tool is reported, then whatever it wrote is
                # still parsed below.
                self.error(e)
                self.error('See %s' % log_path)

        config = configparser.RawConfigParser()
        # read_file() replaces readfp(), which was removed in Python 3.12.
        with open(scenario_defs) as f:
            config.read_file(f)

        suffix = "." + self.FILE_EXTENSION
        for section in config.sections():
            name = None
            path = None
            if not scenario_paths:
                name = os.path.basename(section).replace(suffix, "")
            elif section in scenario_paths:
                path = section
                if mfile is None:
                    name = os.path.basename(section).replace(suffix, "")
                else:
                    # The real name of the scenario is:
                    # filename.REALNAME.scenario
                    name = section.replace(mfile + ".", "").replace(suffix, "")

            # A section that matches none of the requested paths has no name.
            assert name

            scenario = Scenario(name, config.items(section), path)
            if scenario_paths:
                self.special_scenarios[path] = scenario
            scenarios.append(scenario)

        if not scenario_paths:
            self.discovered = True
            self.system_scenarios.extend(scenarios)

        return scenarios

    def get_scenario(self, name):
        """
        Look up a scenario.

        :param name: None to get the list of all system scenarios, an
                     absolute path ending with FILE_EXTENSION to load that
                     scenario file, or the name of a system scenario.

        Returns the matching #Scenario, the list of system scenarios when
        @name is None, or None (after a warning) when nothing matches.
        """
        if name is not None and os.path.isabs(name) and name.endswith(self.FILE_EXTENSION):
            scenario = self.special_scenarios.get(name)
            if scenario:
                return scenario

            scenarios = self.discover_scenarios([name])
            if scenarios:
                # Cache the Scenario itself, never the result list, so that
                # later lookups return the same type as the first one.
                self.special_scenarios[name] = scenarios[0]
                return scenarios[0]

        if self.discovered is False:
            self.discover_scenarios()

        if name is None:
            return self.system_scenarios

        for scenario in self.system_scenarios:
            if scenario.name == name:
                return scenario

        self.warning("Scenario: %s not found" % name)
        return None
2408
2409
class GstValidateBaseTestManager(TestsManager):
    """
    Base for the gst-validate test managers.

    Holds the resolved tool commands (set by update_commands()), a cache of
    gst-inspect feature lookups and the default scenarios and encoding
    formats.
    """
    scenarios_manager = ScenarioManager()
    # featurename -> bool, filled lazily by has_feature().
    features_cache = {}

    def __init__(self):
        super(GstValidateBaseTestManager, self).__init__()
        self._scenarios = []
        self._encoding_formats = []

    @staticmethod
    def _unique(items):
        """Return @items without duplicates, keeping first-seen order."""
        return list(dict.fromkeys(items))

    @classmethod
    def update_commands(cls, extra_paths=None):
        """
        Resolve the "-1.0" tools with which() and store each result in the
        matching <PREFIX>COMMAND class attribute.

        :param extra_paths: Passed through to which() as its second argument.
        """
        for varname, cmd in (('', 'gst-validate'),
                             ('TRANSCODING_', 'gst-validate-transcoding'),
                             ('MEDIA_CHECK_', 'gst-validate-media-check'),
                             ('RTSP_SERVER_', 'gst-validate-rtsp-server'),
                             ('INSPECT_', 'gst-inspect')):
            setattr(cls, varname + 'COMMAND', which(cmd + '-1.0', extra_paths))

    @classmethod
    def has_feature(cls, featurename):
        """
        Tell whether running INSPECT_COMMAND on @featurename succeeds.

        The answer is cached per feature name, so the tool runs at most once
        for each of them.
        """
        try:
            return cls.features_cache[featurename]
        except KeyError:
            pass

        try:
            subprocess.check_output([cls.INSPECT_COMMAND, featurename])
        except subprocess.CalledProcessError:
            res = False
        else:
            res = True

        cls.features_cache[featurename] = res
        return res

    def add_scenarios(self, scenarios):
        """
        @scenarios A list or a unique scenario name(s) to be run on the tests.
                    They are just the default scenarios, and then depending on
                    the TestsGenerator to be used you can have more fine grained
                    control on what to be run on each series of tests.
        """
        if isinstance(scenarios, list):
            self._scenarios.extend(scenarios)
        else:
            self._scenarios.append(scenarios)

        # Drop duplicates but keep insertion order: going through a set
        # would make the order differ from one run to the next.
        self._scenarios = self._unique(self._scenarios)

    def set_scenarios(self, scenarios):
        """
        Override the scenarios
        """
        self._scenarios = []
        self.add_scenarios(scenarios)

    def get_scenarios(self):
        """Return the list of default scenarios."""
        return self._scenarios

    def add_encoding_formats(self, encoding_formats):
        """
        :param encoding_formats: A list or one single #MediaFormatCombinations describing wanted output
                           formats for transcoding test.
                           They are just the default encoding formats, and then depending on
                           the TestsGenerator to be used you can have more fine grained
                           control on what to be run on each series of tests.
        """
        if isinstance(encoding_formats, list):
            self._encoding_formats.extend(encoding_formats)
        else:
            self._encoding_formats.append(encoding_formats)

        # Same order-preserving de-duplication as for the scenarios.
        self._encoding_formats = self._unique(self._encoding_formats)

    def get_encoding_formats(self):
        """Return the list of default encoding formats."""
        return self._encoding_formats
2485
2486
# Resolve the tool paths as soon as the module is loaded, so that the
# COMMAND, TRANSCODING_COMMAND, MEDIA_CHECK_COMMAND, RTSP_SERVER_COMMAND and
# INSPECT_COMMAND class attributes are set without any explicit call.
GstValidateBaseTestManager.update_commands()
2488
2489
class MediaDescriptor(Loggable):
    """
    Abstract description of a media.

    Subclasses implement the getters that raise NotImplementedError here;
    is_compatible() uses them to decide whether a scenario can be run on
    the media.
    """

    def __init__(self):
        Loggable.__init__(self)

    def get_path(self):
        raise NotImplementedError

    def has_frames(self):
        return False

    def get_framerate(self):
        """
        Return the "framerate" field of the first video track whose caps
        carry one, or Fraction(0, 1) when there is none.
        """
        for ttype, caps_str in self.get_tracks_caps():
            if ttype != "video":
                continue

            caps = utils.GstCaps.new_from_str(caps_str)
            if not caps:
                self.warning("Could not create caps for %s" % caps_str)
                continue

            framerate = caps[0].get("framerate")
            if framerate:
                return framerate

        return Fraction(0, 1)

    def get_media_filepath(self):
        raise NotImplementedError

    def skip_parsers(self):
        return False

    def get_caps(self):
        raise NotImplementedError

    def get_uri(self):
        raise NotImplementedError

    def get_duration(self):
        raise NotImplementedError

    def get_protocol(self):
        raise NotImplementedError

    def is_seekable(self):
        raise NotImplementedError

    def is_live(self):
        raise NotImplementedError

    def is_image(self):
        raise NotImplementedError

    def get_num_tracks(self, track_type):
        raise NotImplementedError

    def get_tracks_caps(self):
        return []

    def can_play_reverse(self):
        raise NotImplementedError

    def prerrols(self):
        return True

    def is_compatible(self, scenario):
        """
        Tell whether @scenario can be run on this media.

        :param scenario: The #Scenario to check, None meaning "no scenario".

        Returns True when no scenario is given or when none of the checks
        below rules the scenario out, False otherwise.
        """
        if scenario is None:
            return True

        if scenario.seeks() and (not self.is_seekable() or self.is_image()):
            self.debug("Do not run %s as %s does not support seeking",
                       scenario, self.get_uri())
            return False

        if self.is_image() and scenario.needs_clock_sync():
            self.debug("Do not run %s as %s is an image",
                       scenario, self.get_uri())
            return False

        if not self.can_play_reverse() and scenario.does_reverse_playback():
            return False

        if not self.is_live() and scenario.needs_live_content():
            self.debug("Do not run %s as %s is not a live content",
                       scenario, self.get_uri())
            return False

        if self.is_live() and not scenario.compatible_with_live_content():
            self.debug("Do not run %s as %s is a live content",
                       scenario, self.get_uri())
            return False

        if not self.prerrols() and getattr(scenario, 'needs_preroll', False):
            return False

        # A falsy duration skips the minimum-duration check.
        duration = self.get_duration()
        if duration and duration / GST_SECOND < scenario.get_min_media_duration():
            self.debug(
                "Do not run %s as %s is too short (%i < min media duation : %i",
                scenario, self.get_uri(),
                duration / GST_SECOND,
                scenario.get_min_media_duration())
            return False

        for track_type in ['audio', 'subtitle', 'video']:
            num_tracks = self.get_num_tracks(track_type)
            min_tracks = scenario.get_min_tracks(track_type)
            if num_tracks < min_tracks:
                # Reads "At least <count> <type> track needed < <available>".
                self.debug("%s -- %s | At least %s %s track needed  < %s"
                           % (scenario, self.get_uri(), min_tracks,
                              track_type, num_tracks))
                return False

        return True
2603
2604
class GstValidateMediaDescriptor(MediaDescriptor):
    """
    Media descriptor backed by an XML file (media_info, media_info.push,
    media_info.skipped or stream_info).

    Descriptors are cached per XML path: building a second descriptor for a
    path that was already loaded copies the data of the first one instead of
    parsing the file again.
    """
    # Some extension file for discovering results
    SKIPPED_MEDIA_INFO_EXT = "media_info.skipped"
    MEDIA_INFO_EXT = "media_info"
    PUSH_MEDIA_INFO_EXT = "media_info.push"
    STREAM_INFO_EXT = "stream_info"

    # xml_path -> first descriptor successfully loaded from that path.
    __all_descriptors = {}

    @classmethod
    def get(cls, xml_path):
        """Return the cached descriptor for @xml_path, or load a new one."""
        if xml_path in cls.__all_descriptors:
            return cls.__all_descriptors[xml_path]
        return GstValidateMediaDescriptor(xml_path)

    def __init__(self, xml_path):
        """
        Load the descriptor stored in @xml_path.

        Re-raises the parser's ParseError (after printing a message) when
        the file is not valid XML.
        """
        super(GstValidateMediaDescriptor, self).__init__()

        self._media_file_path = None
        main_descriptor = self.__all_descriptors.get(xml_path)
        if main_descriptor:
            self._copy_data_from_main(main_descriptor)
        else:
            self._xml_path = xml_path
            try:
                media_xml = ET.parse(xml_path).getroot()
            except ET.ParseError:
                # ET may be lxml or the stdlib parser; ET.ParseError is the
                # error class of whichever one is in use.
                printc("Could not parse %s" % xml_path,
                       Colors.FAIL)
                raise
            self._extract_data(media_xml)
            # Register only once fully loaded, so a file that fails to parse
            # never leaves a half-initialised descriptor in the cache.
            self.__all_descriptors[xml_path] = self

        self.set_protocol(urllib.parse.urlparse(self.get_uri()).scheme)

    def skip_parsers(self):
        return self._skip_parsers

    def has_frames(self):
        return self._has_frames

    def _copy_data_from_main(self, main_descriptor):
        """Copy every instance attribute of @main_descriptor onto self."""
        for attr in main_descriptor.__dict__.keys():
            setattr(self, attr, getattr(main_descriptor, attr))

    def _extract_data(self, media_xml):
        """Fill the descriptor fields from the root element @media_xml."""
        # Extract the information we need from the xml
        streams_node = media_xml.findall("streams")[0]
        streams = streams_node.findall("stream")

        self._caps = streams_node.attrib["caps"]
        self._track_caps = [(stream.attrib["type"], stream.attrib["caps"])
                            for stream in streams]

        self._skip_parsers = bool(int(media_xml.attrib.get('skip-parsers', 0)))
        self._has_frames = bool(int(media_xml.attrib["frame-detection"]))
        self._duration = int(media_xml.attrib["duration"])
        self._uri = media_xml.attrib["uri"]
        parsed_uri = urllib.parse.urlparse(self.get_uri())
        self._protocol = media_xml.get("protocol", parsed_uri.scheme)
        if parsed_uri.scheme == "file":
            # The recorded location is gone but a media file sits next to
            # the descriptor: point the URI at that file instead.
            if not os.path.exists(parsed_uri.path) and os.path.exists(self.get_media_filepath()):
                self._uri = "file://" + self.get_media_filepath()
        elif parsed_uri.scheme == Protocols.IMAGESEQUENCE:
            # Relocate the sequence into the directory of the descriptor.
            self._media_file_path = os.path.join(
                os.path.dirname(self.__cleanup_media_info_ext()),
                os.path.basename(parsed_uri.path))
            self._uri = parsed_uri._replace(path=self._media_file_path).geturl()
        self._is_seekable = media_xml.attrib["seekable"].lower() == "true"
        self._is_live = media_xml.get("live", "false").lower() == "true"
        self._track_types = [stream.attrib["type"] for stream in streams]
        self._is_image = "image" in self._track_types

    def __cleanup_media_info_ext(self):
        """
        Return the XML path without its ".<descriptor extension>" suffix.

        Raises AssertionError when the path ends with none of the known
        descriptor extensions.
        """
        for ext in [self.MEDIA_INFO_EXT, self.PUSH_MEDIA_INFO_EXT, self.STREAM_INFO_EXT,
                    self.SKIPPED_MEDIA_INFO_EXT, ]:
            if self._xml_path.endswith(ext):
                return self._xml_path[:len(self._xml_path) - (len(ext) + 1)]

        # Raised explicitly so that the check survives "python -O".
        raise AssertionError(
            "Not reached: %s has no known descriptor extension" % self._xml_path)

    @staticmethod
    def new_from_uri(uri, verbose=False, include_frames=False, is_push=False, is_skipped=False):
        """
        Run the media-check command on @uri and load the descriptor it
        writes next to the media.

            include_frames = 0 # Never
            include_frames = 1 # always
            include_frames = 2 # if previous file included them

        Returns the new #GstValidateMediaDescriptor, or None when the
        command failed or its output could not be read or parsed.
        """
        media_path = utils.url2path(uri)

        ext = GstValidateMediaDescriptor.MEDIA_INFO_EXT
        if is_push:
            ext = GstValidateMediaDescriptor.PUSH_MEDIA_INFO_EXT
        elif is_skipped:
            ext = GstValidateMediaDescriptor.SKIPPED_MEDIA_INFO_EXT
        descriptor_path = "%s.%s" % (media_path, ext)
        args = GstValidateBaseTestManager.MEDIA_CHECK_COMMAND.split(" ")
        if include_frames == 2:
            try:
                media_xml = ET.parse(descriptor_path).getroot()
                prev_uri = urllib.parse.urlparse(media_xml.attrib['uri'])
                if prev_uri.scheme == Protocols.IMAGESEQUENCE:
                    parsed_uri = urllib.parse.urlparse(uri)
                    uri = prev_uri._replace(path=os.path.join(os.path.dirname(parsed_uri.path), os.path.basename(prev_uri.path))).geturl()
                include_frames = bool(int(media_xml.attrib["frame-detection"]))
                if bool(int(media_xml.attrib.get("skip-parsers", 0))):
                    args.append("--skip-parsers")
            except OSError:
                # No readable previous descriptor. OSError covers both the
                # stdlib parser's FileNotFoundError and the plain OSError
                # lxml raises for a missing file. include_frames keeps the
                # (truthy) value 2 in that case.
                pass
        else:
            include_frames = bool(include_frames)
        args.append(uri)

        args.extend(["--output-file", descriptor_path])
        if include_frames:
            args.extend(["--full"])

        if verbose:
            printc("Generating media info for %s\n"
                   "    Command: '%s'" % (media_path, ' '.join(args)),
                   Colors.OKBLUE)

        try:
            # DEVNULL discards stderr without leaving an open file behind.
            subprocess.check_output(args, stderr=subprocess.DEVNULL)
        except subprocess.CalledProcessError as e:
            if verbose:
                printc("Result: Failed", Colors.FAIL)
            else:
                loggable.warning("GstValidateMediaDescriptor",
                                 "Exception: %s" % e)
            return None

        if verbose:
            printc("Result: Passed", Colors.OKGREEN)

        try:
            return GstValidateMediaDescriptor(descriptor_path)
        except (IOError, ET.ParseError):
            return None

    def get_path(self):
        return self._xml_path

    def need_clock_sync(self):
        return Protocols.needs_clock_sync(self.get_protocol())

    def get_media_filepath(self):
        """Return the media path, by default the XML path minus its extension."""
        if self._media_file_path is None:
            self._media_file_path = self.__cleanup_media_info_ext()
        return self._media_file_path

    def get_caps(self):
        return self._caps

    def get_tracks_caps(self):
        return self._track_caps

    def get_uri(self):
        return self._uri

    def get_duration(self):
        return self._duration

    def set_protocol(self, protocol):
        """Set the protocol; a ".media_info.push" descriptor always uses PUSHFILE."""
        if self._xml_path.endswith(GstValidateMediaDescriptor.PUSH_MEDIA_INFO_EXT):
            self._protocol = Protocols.PUSHFILE
        else:
            self._protocol = protocol

    def get_protocol(self):
        return self._protocol

    def is_seekable(self):
        return self._is_seekable

    def is_live(self):
        return self._is_live

    def can_play_reverse(self):
        return True

    def is_image(self):
        return self._is_image

    def get_num_tracks(self, track_type):
        """Return how many streams of type @track_type the media has."""
        return self._track_types.count(track_type)

    def get_clean_name(self):
        """
        Return the descriptor file name without its descriptor extension and
        with every remaining "." replaced by "_".
        """
        name = os.path.basename(self.get_path())
        # Escape the extensions: they contain "." which must match literally.
        regex = '|'.join(['\\.%s$' % re.escape(ext) for ext in [
            self.SKIPPED_MEDIA_INFO_EXT, self.MEDIA_INFO_EXT,
            self.PUSH_MEDIA_INFO_EXT, self.STREAM_INFO_EXT]])
        name = re.sub(regex, "", name)

        return name.replace('.', "_")
2812
2813
class MediaFormatCombination(object):
    """Names the container, audio and video formats of a transcoding target."""

    # Short format name -> caps string.
    FORMATS = {
        # Audio
        "aac": "audio/mpeg,mpegversion=4",
        "ac3": "audio/x-ac3",
        "vorbis": "audio/x-vorbis",
        "mp3": "audio/mpeg,mpegversion=1,layer=3",
        "opus": "audio/x-opus",
        "rawaudio": "audio/x-raw",

        # Video
        "h264": "video/x-h264",
        "h265": "video/x-h265",
        "vp8": "video/x-vp8",
        "vp9": "video/x-vp9",
        "theora": "video/x-theora",
        "prores": "video/x-prores",
        "jpeg": "image/jpeg",

        # Containers
        "webm": "video/webm",
        "ogg": "application/ogg",
        "mkv": "video/x-matroska",
        "mp4": "video/quicktime,variant=iso;",
        "quicktime": "video/quicktime;",
    }

    def __init__(self, container, audio, video, duration_factor=1,
                 video_restriction=None, audio_restriction=None):
        """
        Describes a media format to be used for transcoding tests.

        :param container: A string defining the container format to be used, must be in self.FORMATS
        :param audio: A string defining the audio format to be used, must be in self.FORMATS
        :param video: A string defining the video format to be used, must be in self.FORMATS
        :param duration_factor: Accepted but not stored by this constructor.
        :param video_restriction: Stored as is in self.video_restriction.
        :param audio_restriction: Stored as is in self.audio_restriction.
        """
        self.container, self.audio, self.video = container, audio, video
        self.video_restriction = video_restriction
        self.audio_restriction = audio_restriction

    def __str__(self):
        """Return "<audio> and <video> in <container>"."""
        fields = (self.audio, self.video, self.container)
        return "%s and %s in %s" % fields

    def get_caps(self, track_type):
        """
        Return the caps string of the format stored in the instance
        attribute named @track_type.

        Returns None when there is no such attribute or when its value is
        not a key of FORMATS.
        """
        attributes = vars(self)
        if track_type not in attributes:
            return None
        return self.FORMATS.get(attributes[track_type])

    def get_audio_caps(self):
        """Return the caps string of the audio format, or None."""
        return self.get_caps("audio")

    def get_video_caps(self):
        """Return the caps string of the video format, or None."""
        return self.get_caps("video")

    def get_muxer_caps(self):
        """Return the caps string of the container format, or None."""
        return self.get_caps("container")