21d418d302b95987b5958a2d1f3ccc472adc0cde
[platform/upstream/gstreamer.git] / validate / launcher / baseclasses.py
1 #!/usr/bin/env python3
2 #
3 # Copyright (c) 2013,Thibault Saunier <thibault.saunier@collabora.com>
4 #
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU Lesser General Public
7 # License as published by the Free Software Foundation; either
8 # version 2.1 of the License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13 # Lesser General Public License for more details.
14 #
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with this program; if not, write to the
17 # Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
18 # Boston, MA 02110-1301, USA.
19
20 """ Class representing tests and test managers. """
21
22 import json
23 import os
24 import sys
25 import re
26 import copy
27 import shlex
28 import socketserver
29 import struct
30 import time
31 from . import utils
32 import signal
33 import urllib.parse
34 import subprocess
35 import threading
36 import queue
37 import configparser
38 import xml
39 import random
40 import shutil
41 import uuid
42
43 from .utils import which
44 from . import reporters
45 from . import loggable
46 from .loggable import Loggable
47
48 try:
49     from lxml import etree as ET
50 except ImportError:
51     import xml.etree.cElementTree as ET
52
53 from .vfb_server import get_virual_frame_buffer_server
54 from .httpserver import HTTPServer
55 from .utils import mkdir, Result, Colors, printc, DEFAULT_TIMEOUT, GST_SECOND, \
56     Protocols, look_for_file_in_source_dir, get_data_file, BackTraceGenerator, \
57     check_bugs_resolution
58
# Multiplier applied to the timeouts of tests that run under gdb or
# Valgrind
VALGRIND_TIMEOUT_FACTOR = 20
GDB_TIMEOUT_FACTOR = VALGRIND_TIMEOUT_FACTOR
# Global timeout multiplier, read from the TIMEOUT_FACTOR environment variable
TIMEOUT_FACTOR = float(os.environ.get("TIMEOUT_FACTOR", 1))
# Exit code passed to valgrind as --error-exitcode
VALGRIND_ERROR_CODE = 20

VALIDATE_OVERRIDE_EXTENSION = ".override"

# Maps a subprocess return code to a signal name. Signals that do not exist
# on the current platform are skipped.
EXITING_SIGNALS = {
    -getattr(signal, signame): signame
    for signame in (
        'SIGQUIT', 'SIGILL', 'SIGABRT', 'SIGFPE', 'SIGSEGV', 'SIGBUS',
        'SIGSYS', 'SIGTRAP', 'SIGXCPU', 'SIGXFSZ', 'SIGIOT')
    if hasattr(signal, signame)
}
EXITING_SIGNALS[139] = "SIGSEGV"
71
72
73 class Test(Loggable):
74
75     """ A class representing a particular test. """
76
77     def __init__(self, application_name, classname, options,
78                  reporter, duration=0, timeout=DEFAULT_TIMEOUT,
79                  hard_timeout=None, extra_env_variables=None,
80                  expected_failures=None, is_parallel=True,
81                  workdir=None):
82         """
83         @timeout: The timeout during which the value return by get_current_value
84                   keeps being exactly equal
85         @hard_timeout: Max time the test can take in absolute
86         """
87         Loggable.__init__(self)
88         self.timeout = timeout * TIMEOUT_FACTOR * options.timeout_factor
89         if hard_timeout:
90             self.hard_timeout = hard_timeout * TIMEOUT_FACTOR
91             self.hard_timeout *= options.timeout_factor
92         else:
93             self.hard_timeout = hard_timeout
94         self.classname = classname
95         self.options = options
96         self.application = application_name
97         self.command = []
98         self.server_command = None
99         self.reporter = reporter
100         self.process = None
101         self.proc_env = None
102         self.thread = None
103         self.queue = None
104         self.duration = duration
105         self.stack_trace = None
106         self._uuid = None
107         if expected_failures is None:
108             self.expected_failures = []
109         elif not isinstance(expected_failures, list):
110             self.expected_failures = [expected_failures]
111         else:
112             self.expected_failures = expected_failures
113
114         extra_env_variables = extra_env_variables or {}
115         self.extra_env_variables = extra_env_variables
116         self.optional = False
117         self.is_parallel = is_parallel
118         self.generator = None
119         # String representation of the test number in the testsuite
120         self.number = ""
121         self.workdir = workdir
122
123         self.clean()
124
125     def clean(self):
126         self.kill_subprocess()
127         self.message = ""
128         self.error_str = ""
129         self.time_taken = 0.0
130         self._starting_time = None
131         self.result = Result.NOT_RUN
132         self.logfile = None
133         self.out = None
134         self.extra_logfiles = []
135         self.__env_variable = []
136         self.kill_subprocess()
137
138     def __str__(self):
139         string = self.classname
140         if self.result != Result.NOT_RUN:
141             string += ": " + self.result
142             if self.result in [Result.FAILED, Result.TIMEOUT]:
143                 string += " '%s'\n" \
144                           "       You can reproduce with: %s\n" \
145                     % (self.message, self.get_command_repr())
146
147                 if not self.options.redirect_logs and \
148                         self.result == Result.PASSED or \
149                         not self.options.dump_on_failure:
150                     string += self.get_logfile_repr()
151
152         return string
153
154     def add_env_variable(self, variable, value=None):
155         """
156         Only usefull so that the gst-validate-launcher can print the exact
157         right command line to reproduce the tests
158         """
159         if value is None:
160             value = os.environ.get(variable, None)
161
162         if value is None:
163             return
164
165         self.__env_variable.append(variable)
166
167     @property
168     def _env_variable(self):
169         res = ""
170         for var in set(self.__env_variable):
171             if res:
172                 res += " "
173             value = self.proc_env.get(var, None)
174             if value is not None:
175                 res += "%s='%s'" % (var, value)
176
177         return res
178
179     def open_logfile(self):
180         if self.out:
181             return
182
183         path = os.path.join(self.options.logsdir,
184                             self.classname.replace(".", os.sep))
185         mkdir(os.path.dirname(path))
186         self.logfile = path
187
188         if self.options.redirect_logs == 'stdout':
189             self.out = sys.stdout
190         elif self.options.redirect_logs == 'stderr':
191             self.out = sys.stderr
192         else:
193             self.out = open(path, 'w+')
194
195     def close_logfile(self):
196         if not self.options.redirect_logs:
197             self.out.close()
198
199         self.out = None
200
201     def _get_file_content(self, file_name):
202         f = open(file_name, 'r+')
203         value = f.read()
204         f.close()
205
206         return value
207
208     def get_log_content(self):
209         return self._get_file_content(self.logfile)
210
211     def get_extra_log_content(self, extralog):
212         if extralog not in self.extra_logfiles:
213             return ""
214
215         return self._get_file_content(extralog)
216
217     def get_classname(self):
218         name = self.classname.split('.')[-1]
219         classname = self.classname.replace('.%s' % name, '')
220
221         return classname
222
223     def get_name(self):
224         return self.classname.split('.')[-1]
225
226     def get_uuid(self):
227         if self._uuid is None:
228             self._uuid = self.classname + str(uuid.uuid4())
229         return self._uuid
230
231     def add_arguments(self, *args):
232         self.command += args
233
234     def build_arguments(self):
235         self.add_env_variable("LD_PRELOAD")
236         self.add_env_variable("DISPLAY")
237
238     def add_stack_trace_to_logfile(self):
239         self.debug("Adding stack trace")
240         trace_gatherer = BackTraceGenerator.get_default()
241         stack_trace = trace_gatherer.get_trace(self)
242
243         if not stack_trace:
244             return
245
246         info = "\n\n== Stack trace: == \n%s" % stack_trace
247         if self.options.redirect_logs:
248             print(info)
249             return
250
251         if self.options.xunit_file:
252             self.stack_trace = stack_trace
253
254         with open(self.logfile, 'a') as f:
255             f.write(info)
256
257     def set_result(self, result, message="", error=""):
258         self.debug("Setting result: %s (message: %s, error: %s)" % (result,
259                                                                     message, error))
260
261         if result is Result.TIMEOUT:
262             if self.options.debug is True:
263                 if self.options.gdb:
264                     printc("Timeout, you should process <ctrl>c to get into gdb",
265                            Colors.FAIL)
266                     # and wait here until gdb exits
267                     self.process.communicate()
268                 else:
269                     pname = self.command[0]
270                     input("%sTimeout happened you can attach gdb doing: $gdb %s %d%s\n"
271                           "Press enter to continue" % (Colors.FAIL, pname, self.process.pid,
272                                                        Colors.ENDC))
273             else:
274                 self.add_stack_trace_to_logfile()
275
276         self.result = result
277         self.message = message
278         self.error_str = error
279
280     def check_results(self):
281         if self.result is Result.FAILED or self.result is Result.TIMEOUT:
282             return
283
284         self.debug("%s returncode: %s", self, self.process.returncode)
285         if self.process.returncode == 0:
286             self.set_result(Result.PASSED)
287         elif self.process.returncode in EXITING_SIGNALS:
288             self.add_stack_trace_to_logfile()
289             self.set_result(Result.FAILED,
290                             "Application exited with signal %s" % (
291                                 EXITING_SIGNALS[self.process.returncode]))
292         elif self.process.returncode == VALGRIND_ERROR_CODE:
293             self.set_result(Result.FAILED, "Valgrind reported errors")
294         else:
295             self.set_result(Result.FAILED,
296                             "Application returned %d" % (self.process.returncode))
297
298     def get_current_value(self):
299         """
300         Lets subclasses implement a nicer timeout measurement method
301         They should return some value with which we will compare
302         the previous and timeout if they are egual during self.timeout
303         seconds
304         """
305         return Result.NOT_RUN
306
307     def process_update(self):
308         """
309         Returns True when process has finished running or has timed out.
310         """
311
312         if self.process is None:
313             # Process has not started running yet
314             return False
315
316         self.process.poll()
317         if self.process.returncode is not None:
318             return True
319
320         val = self.get_current_value()
321
322         self.debug("Got value: %s" % val)
323         if val is Result.NOT_RUN:
324             # The get_current_value logic is not implemented... dumb
325             # timeout
326             if time.time() - self.last_change_ts > self.timeout:
327                 self.set_result(Result.TIMEOUT,
328                                 "Application timed out: %s secs" %
329                                 self.timeout,
330                                 "timeout")
331                 return True
332             return False
333         elif val is Result.FAILED:
334             return True
335         elif val is Result.KNOWN_ERROR:
336             return True
337
338         self.log("New val %s" % val)
339
340         if val == self.last_val:
341             delta = time.time() - self.last_change_ts
342             self.debug("%s: Same value for %d/%d seconds" %
343                        (self, delta, self.timeout))
344             if delta > self.timeout:
345                 self.set_result(Result.TIMEOUT,
346                                 "Application timed out: %s secs" %
347                                 self.timeout,
348                                 "timeout")
349                 return True
350         elif self.hard_timeout and time.time() - self.start_ts > self.hard_timeout:
351             self.set_result(
352                 Result.TIMEOUT, "Hard timeout reached: %d secs" % self.hard_timeout)
353             return True
354         else:
355             self.last_change_ts = time.time()
356             self.last_val = val
357
358         return False
359
360     def get_subproc_env(self):
361         return os.environ.copy()
362
363     def kill_subprocess(self):
364         utils.kill_subprocess(self, self.process, DEFAULT_TIMEOUT)
365
366     def run_external_checks(self):
367         pass
368
369     def thread_wrapper(self):
370         def enable_sigint():
371             # Restore the SIGINT handler for the child process (gdb) to ensure
372             # it can handle it.
373             signal.signal(signal.SIGINT, signal.SIG_DFL)
374
375         if self.options.gdb and os.name != "nt":
376             preexec_fn = enable_sigint
377         else:
378             preexec_fn = None
379
380         self.process = subprocess.Popen(self.command,
381                                         stderr=self.out,
382                                         stdout=self.out,
383                                         env=self.proc_env,
384                                         cwd=self.workdir,
385                                         preexec_fn=preexec_fn)
386         self.process.wait()
387         if self.result is not Result.TIMEOUT:
388             if self.process.returncode == 0:
389                 self.run_external_checks()
390             self.queue.put(None)
391
392     def get_valgrind_suppression_file(self, subdir, name):
393         p = get_data_file(subdir, name)
394         if p:
395             return p
396
397         self.error("Could not find any %s file" % name)
398
399     def get_valgrind_suppressions(self):
400         return [self.get_valgrind_suppression_file('data', 'gstvalidate.supp')]
401
402     def use_gdb(self, command):
403         if self.hard_timeout is not None:
404             self.hard_timeout *= GDB_TIMEOUT_FACTOR
405         self.timeout *= GDB_TIMEOUT_FACTOR
406
407         if not self.options.gdb_non_stop:
408             self.timeout = sys.maxsize
409             self.hard_timeout = sys.maxsize
410
411         args = ["gdb"]
412         if self.options.gdb_non_stop:
413             args += ["-ex", "run", "-ex", "backtrace", "-ex", "quit"]
414         args += ["--args"] + command
415         return args
416
417     def use_valgrind(self, command, subenv):
418         vglogsfile = self.logfile + '.valgrind'
419         self.extra_logfiles.append(vglogsfile)
420
421         vg_args = []
422
423         for o, v in [('trace-children', 'yes'),
424                      ('tool', 'memcheck'),
425                      ('leak-check', 'full'),
426                      ('leak-resolution', 'high'),
427                      # TODO: errors-for-leak-kinds should be set to all instead of definite
428                      # and all false positives should be added to suppression
429                      # files.
430                      ('errors-for-leak-kinds', 'definite'),
431                      ('num-callers', '20'),
432                      ('error-exitcode', str(VALGRIND_ERROR_CODE)),
433                      ('gen-suppressions', 'all')]:
434             vg_args.append("--%s=%s" % (o, v))
435
436         if not self.options.redirect_logs:
437             vglogsfile = self.logfile + '.valgrind'
438             self.extra_logfiles.append(vglogsfile)
439             vg_args.append("--%s=%s" % ('log-file', vglogsfile))
440
441         for supp in self.get_valgrind_suppressions():
442             vg_args.append("--suppressions=%s" % supp)
443
444         command = ["valgrind"] + vg_args + command
445
446         # Tune GLib's memory allocator to be more valgrind friendly
447         subenv['G_DEBUG'] = 'gc-friendly'
448         subenv['G_SLICE'] = 'always-malloc'
449
450         if self.hard_timeout is not None:
451             self.hard_timeout *= VALGRIND_TIMEOUT_FACTOR
452         self.timeout *= VALGRIND_TIMEOUT_FACTOR
453
454         # Enable 'valgrind.config'
455         self.add_validate_config(get_data_file(
456             'data', 'valgrind.config'), subenv)
457         if subenv == self.proc_env:
458             self.add_env_variable('G_DEBUG', 'gc-friendly')
459             self.add_env_variable('G_SLICE', 'always-malloc')
460             self.add_env_variable('GST_VALIDATE_CONFIG',
461                                   self.proc_env['GST_VALIDATE_CONFIG'])
462
463         return command
464
465     def add_validate_config(self, config, subenv=None):
466         if not subenv:
467             subenv = self.extra_env_variables
468
469         if "GST_VALIDATE_CONFIG" in subenv:
470             subenv['GST_VALIDATE_CONFIG'] = '%s%s%s' % (
471                 subenv['GST_VALIDATE_CONFIG'], os.pathsep, config)
472         else:
473             subenv['GST_VALIDATE_CONFIG'] = config
474
475     def launch_server(self):
476         return None
477
478     def get_logfile_repr(self):
479         message = "    Logs:\n"
480         logfiles = self.extra_logfiles.copy()
481
482         if not self.options.redirect_logs:
483             logfiles.insert(0, self.logfile)
484
485         for log in logfiles:
486             message += "         - %s\n" % log
487
488         return message
489
490     def get_command_repr(self):
491         message = "%s %s" % (self._env_variable, ' '.join(
492             shlex.quote(arg) for arg in self.command))
493         if self.server_command:
494             message = "%s & %s" % (self.server_command, message)
495
496         return message
497
498     def test_start(self, queue):
499         self.open_logfile()
500
501         self.server_command = self.launch_server()
502         self.queue = queue
503         self.command = [self.application]
504         self._starting_time = time.time()
505         self.build_arguments()
506         self.proc_env = self.get_subproc_env()
507
508         for var, value in list(self.extra_env_variables.items()):
509             value = self.proc_env.get(var, '') + os.pathsep + value
510             self.proc_env[var] = value.strip(os.pathsep)
511             self.add_env_variable(var, self.proc_env[var])
512
513         if self.options.gdb:
514             self.command = self.use_gdb(self.command)
515
516             self.previous_sigint_handler = signal.getsignal(signal.SIGINT)
517             # Make the gst-validate executable ignore SIGINT while gdb is
518             # running.
519             signal.signal(signal.SIGINT, signal.SIG_IGN)
520
521         if self.options.valgrind:
522             self.command = self.use_valgrind(self.command, self.proc_env)
523
524         if not self.options.redirect_logs:
525             self.out.write("=================\n"
526                            "Test name: %s\n"
527                            "Command: '%s'\n"
528                            "=================\n\n"
529                            % (self.classname, ' '.join(self.command)))
530             self.out.flush()
531         else:
532             message = "Launching: %s%s\n" \
533                 "    Command: %s\n" % (Colors.ENDC, self.classname,
534                                        self.get_command_repr())
535             printc(message, Colors.OKBLUE)
536
537         self.thread = threading.Thread(target=self.thread_wrapper)
538         self.thread.start()
539
540         self.last_val = 0
541         self.last_change_ts = time.time()
542         self.start_ts = time.time()
543
544     def _dump_log_file(self, logfile):
545         message = "Dumping contents of %s\n" % logfile
546         printc(message, Colors.FAIL)
547
548         with open(logfile, 'r') as fin:
549             print(fin.read())
550
551     def _dump_log_files(self):
552         printc("Dumping log files on failure\n", Colors.FAIL)
553         self._dump_log_file(self.logfile)
554         for logfile in self.extra_logfiles:
555             self._dump_log_file(logfile)
556
557     def test_end(self):
558         self.kill_subprocess()
559         self.thread.join()
560         self.time_taken = time.time() - self._starting_time
561
562         if self.options.gdb:
563             signal.signal(signal.SIGINT, self.previous_sigint_handler)
564
565         if self.result != Result.PASSED:
566             message = str(self)
567             end = "\n"
568         else:
569             message = "%s %s: %s%s" % (self.number, self.classname, self.result,
570                                        " (" + self.message + ")" if self.message else "")
571             end = "\r"
572             if sys.stdout.isatty():
573                 term_width = shutil.get_terminal_size((80, 20))[0]
574                 if len(message) > term_width:
575                     message = message[0:term_width - 2] + '…'
576             else:
577                 message = None
578
579         if message is not None:
580             printc(message, color=utils.get_color_for_result(self.result), end=end)
581         self.close_logfile()
582
583         if self.options.dump_on_failure:
584             if self.result is not Result.PASSED:
585                 self._dump_log_files()
586
587         # Only keep around env variables we need later
588         clean_env = {}
589         for n in self.__env_variable:
590             clean_env[n] = self.proc_env.get(n, None)
591         self.proc_env = clean_env
592
593         # Don't keep around JSON report objects, they were processed
594         # in check_results already
595         self.reports = []
596
597         return self.result
598
599
class GstValidateTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
    """ TCP server handling each connection in its own thread. """
602
603
class GstValidateListener(socketserver.BaseRequestHandler):

    """
    Request handler reading length prefixed JSON messages sent by a test
    process and forwarding them to the matching test of the launcher.
    """

    def _recv_exact(self, size):
        """
        Read exactly @size bytes from the connection.

        recv() may return fewer bytes than requested, so keep reading until
        everything arrived. Returns None when the peer closes the
        connection before @size bytes were received.
        """
        chunks = []
        remaining = size
        while remaining > 0:
            chunk = self.request.recv(remaining)
            if not chunk:
                return None
            chunks.append(chunk)
            remaining -= len(chunk)

        return b''.join(chunks)

    def handle(self):
        """Implements BaseRequestHandler handle method"""
        test = None
        while True:
            # Each message is a 4 bytes big-endian length followed by the
            # JSON payload
            raw_len = self._recv_exact(4)
            if raw_len is None:
                return
            msglen = struct.unpack('>I', raw_len)[0]
            raw_msg = self._recv_exact(msglen)
            if not raw_msg:
                # Connection closed or empty message
                return

            obj = json.loads(raw_msg.decode())

            if test is None:
                # First message must contain the uuid
                test_uuid = obj.get("uuid", None)
                if test_uuid is None:
                    return
                # Find test from launcher
                for t in self.server.launcher.tests:
                    if test_uuid == t.get_uuid():
                        test = t
                        break
                if test is None:
                    self.server.launcher.error(
                        "Could not find test for UUID %s" % test_uuid)
                    return

            obj_type = obj.get("type", '')
            if obj_type == 'position':
                test.set_position(obj['position'], obj['duration'],
                                  obj['speed'])
            elif obj_type == 'buffering':
                test.set_position(obj['position'], 100)
            elif obj_type == 'action':
                test.add_action_execution(obj)
                # Make sure that action is taken into account when checking if process
                # is updating
                test.position += 1
            elif obj_type == 'action-done':
                # Make sure that action end is taken into account when checking if process
                # is updating
                test.position += 1
                test.actions_infos[-1]['execution-duration'] = obj['execution-duration']
            elif obj_type == 'report':
                test.add_report(obj)
653
654
655 class GstValidateTest(Test):
656
657     """ A class representing a particular test. """
658     HARD_TIMEOUT_FACTOR = 5
659     fault_sig_regex = re.compile("<Caught SIGNAL: .*>")
660
661     def __init__(self, application_name, classname,
662                  options, reporter, duration=0,
663                  timeout=DEFAULT_TIMEOUT, scenario=None, hard_timeout=None,
664                  media_descriptor=None, extra_env_variables=None,
665                  expected_failures=None, workdir=None):
666
667         extra_env_variables = extra_env_variables or {}
668
669         if not hard_timeout and self.HARD_TIMEOUT_FACTOR:
670             if timeout:
671                 hard_timeout = timeout * self.HARD_TIMEOUT_FACTOR
672             elif duration:
673                 hard_timeout = duration * self.HARD_TIMEOUT_FACTOR
674             else:
675                 hard_timeout = None
676
677         # If we are running from source, use the -debug version of the
678         # application which is using rpath instead of libtool's wrappers. It's
679         # slightly faster to start and will not confuse valgrind.
680         debug = '%s-debug' % application_name
681         p = look_for_file_in_source_dir('tools', debug)
682         if p:
683             application_name = p
684
685         self.reports = []
686         self.position = -1
687         self.media_duration = -1
688         self.speed = 1.0
689         self.actions_infos = []
690         self.media_descriptor = media_descriptor
691         self.server = None
692
693         override_path = self.get_override_file(media_descriptor)
694         if override_path:
695             if extra_env_variables:
696                 if extra_env_variables.get("GST_VALIDATE_OVERRIDE", ""):
697                     extra_env_variables[
698                         "GST_VALIDATE_OVERRIDE"] += os.path.pathsep
699
700             extra_env_variables["GST_VALIDATE_OVERRIDE"] = override_path
701
702         super(GstValidateTest, self).__init__(application_name, classname,
703                                               options, reporter,
704                                               duration=duration,
705                                               timeout=timeout,
706                                               hard_timeout=hard_timeout,
707                                               extra_env_variables=extra_env_variables,
708                                               expected_failures=expected_failures,
709                                               workdir=workdir)
710
711         # defines how much the process can be outside of the configured
712         # segment / seek
713         self._sent_eos_time = None
714
715         if scenario is None or scenario.name.lower() == "none":
716             self.scenario = None
717         else:
718             self.scenario = scenario
719
720     def kill_subprocess(self):
721         Test.kill_subprocess(self)
722
723     def add_report(self, report):
724         self.reports.append(report)
725
726     def set_position(self, position, duration, speed=None):
727         self.position = position
728         self.media_duration = duration
729         if speed:
730             self.speed = speed
731
732     def add_action_execution(self, action_infos):
733         if action_infos['action-type'] == 'eos':
734             self._sent_eos_time = time.time()
735         self.actions_infos.append(action_infos)
736
737     def get_override_file(self, media_descriptor):
738         if media_descriptor:
739             if media_descriptor.get_path():
740                 override_path = os.path.splitext(media_descriptor.get_path())[
741                     0] + VALIDATE_OVERRIDE_EXTENSION
742                 if os.path.exists(override_path):
743                     return override_path
744
745         return None
746
747     def get_current_position(self):
748         return self.position
749
750     def get_current_value(self):
751         if self.scenario:
752             if self._sent_eos_time is not None:
753                 t = time.time()
754                 if ((t - self._sent_eos_time)) > 30:
755                     if self.media_descriptor is not None and self.media_descriptor.get_protocol() == Protocols.HLS:
756                         self.set_result(Result.PASSED,
757                                         """Got no EOS 30 seconds after sending EOS,
758                                         in HLS known and tolerated issue:
759                                         https://bugzilla.gnome.org/show_bug.cgi?id=723868""")
760                         return Result.KNOWN_ERROR
761
762                     self.set_result(
763                         Result.FAILED, "Pipeline did not stop 30 Seconds after sending EOS")
764
765                     return Result.FAILED
766
767         return self.position
768
769     def get_subproc_env(self):
770         subproc_env = os.environ.copy()
771
772         subproc_env["GST_VALIDATE_UUID"] = self.get_uuid()
773
774         if 'GST_DEBUG' in os.environ and not self.options.redirect_logs:
775             gstlogsfile = self.logfile + '.gstdebug'
776             self.extra_logfiles.append(gstlogsfile)
777             subproc_env["GST_DEBUG_FILE"] = gstlogsfile
778
779         if self.options.no_color:
780             subproc_env["GST_DEBUG_NO_COLOR"] = '1'
781
782         # Ensure XInitThreads is called, see bgo#731525
783         subproc_env['GST_GL_XINITTHREADS'] = '1'
784         self.add_env_variable('GST_GL_XINITTHREADS', '1')
785
786         if self.scenario is not None:
787             scenario = self.scenario.get_execution_name()
788             subproc_env["GST_VALIDATE_SCENARIO"] = scenario
789             self.add_env_variable("GST_VALIDATE_SCENARIO",
790                                   subproc_env["GST_VALIDATE_SCENARIO"])
791         else:
792             try:
793                 del subproc_env["GST_VALIDATE_SCENARIO"]
794             except KeyError:
795                 pass
796
797         return subproc_env
798
799     def clean(self):
800         Test.clean(self)
801         self._sent_eos_time = None
802         self.reports = []
803         self.position = -1
804         self.media_duration = -1
805         self.speed = 1.0
806         self.actions_infos = []
807
808     def build_arguments(self):
809         super(GstValidateTest, self).build_arguments()
810         if "GST_VALIDATE" in os.environ:
811             self.add_env_variable("GST_VALIDATE", os.environ["GST_VALIDATE"])
812
813         if "GST_VALIDATE_SCENARIOS_PATH" in os.environ:
814             self.add_env_variable("GST_VALIDATE_SCENARIOS_PATH",
815                                   os.environ["GST_VALIDATE_SCENARIOS_PATH"])
816
817         self.add_env_variable("GST_VALIDATE_CONFIG")
818         self.add_env_variable("GST_VALIDATE_OVERRIDE")
819
820     def get_extra_log_content(self, extralog):
821         value = Test.get_extra_log_content(self, extralog)
822
823         return value
824
825     def report_matches_expected_failure(self, report, expected_failure):
826         for key in ['bug', 'bugs', 'sometimes']:
827             if key in expected_failure:
828                 del expected_failure[key]
829         for key, value in list(report.items()):
830             if key in expected_failure:
831                 if not re.findall(expected_failure[key], str(value)):
832                     return False
833                 expected_failure.pop(key)
834
835         return not bool(expected_failure)
836
837     def check_reported_issues(self):
838         ret = []
839         expected_failures = copy.deepcopy(self.expected_failures)
840         expected_retcode = [0]
841         for report in self.reports:
842             found = None
843             for expected_failure in expected_failures:
844                 if self.report_matches_expected_failure(report,
845                                                         expected_failure.copy()):
846                     found = expected_failure
847                     break
848
849             if found is not None:
850                 expected_failures.remove(found)
851                 if report['level'] == 'critical':
852                     if found.get('sometimes') and isinstance(expected_retcode, list):
853                         expected_retcode.append(18)
854                     else:
855                         expected_retcode = [18]
856             elif report['level'] == 'critical':
857                 ret.append(report['summary'])
858
859         if not ret:
860             return None, expected_failures, expected_retcode
861
862         return ret, expected_failures, expected_retcode
863
864     def check_expected_timeout(self, expected_timeout):
865         msg = "Expected timeout happened. "
866         result = Result.PASSED
867         message = expected_timeout.get('message')
868         if message:
869             if not re.findall(message, self.message):
870                 result = Result.FAILED
871                 msg = "Expected timeout message: %s got %s " % (
872                     message, self.message)
873
874         expected_symbols = expected_timeout.get('stacktrace_symbols')
875         if expected_symbols:
876             trace_gatherer = BackTraceGenerator.get_default()
877             stack_trace = trace_gatherer.get_trace(self)
878
879             if stack_trace:
880                 if not isinstance(expected_symbols, list):
881                     expected_symbols = [expected_symbols]
882
883                 not_found_symbols = [s for s in expected_symbols
884                                      if s not in stack_trace]
885                 if not_found_symbols:
886                     result = Result.TIMEOUT
887                     msg = "Expected symbols '%s' not found in stack trace " % (
888                         not_found_symbols)
889             else:
890                 msg += "No stack trace available, could not verify symbols "
891
892         return result, msg
893
894     def check_results(self):
895         if self.result in [Result.FAILED, self.result is Result.PASSED]:
896             return
897
898         for report in self.reports:
899             if report.get('issue-id') == 'runtime::missing-plugin':
900                 self.set_result(Result.SKIPPED, "%s\n%s" % (report['summary'],
901                                                             report['details']))
902                 return
903
904         self.debug("%s returncode: %s", self, self.process.returncode)
905
906         criticals, not_found_expected_failures, expected_returncode = self.check_reported_issues()
907
908         expected_timeout = None
909         for i, f in enumerate(not_found_expected_failures):
910             if len(f) == 1 and f.get("returncode"):
911                 returncode = f['returncode']
912                 if not isinstance(expected_returncode, list):
913                     returncode = [expected_returncode]
914                 if 'sometimes' in f:
915                     returncode.append(0)
916             elif f.get("timeout"):
917                 expected_timeout = f
918
919         not_found_expected_failures = [f for f in not_found_expected_failures
920                                        if not f.get('returncode')]
921
922         msg = ""
923         result = Result.PASSED
924         if self.result == Result.TIMEOUT:
925             with open(self.logfile) as f:
926                 signal_fault_info = self.fault_sig_regex.findall(f.read())
927                 if signal_fault_info:
928                     result = Result.FAILED
929                     msg = signal_fault_info[0]
930                 elif expected_timeout:
931                     not_found_expected_failures.remove(expected_timeout)
932                     result, msg = self.check_expected_timeout(expected_timeout)
933                 else:
934                     return
935         elif self.process.returncode in EXITING_SIGNALS:
936             result = Result.FAILED
937             msg = "Application exited with signal %s" % (
938                 EXITING_SIGNALS[self.process.returncode]
939             )
940             self.add_stack_trace_to_logfile()
941         elif self.process.returncode == VALGRIND_ERROR_CODE:
942             msg = "Valgrind reported errors "
943             result = Result.FAILED
944         elif self.process.returncode not in expected_returncode:
945             msg = "Application returned %s " % self.process.returncode
946             if expected_returncode != [0]:
947                 msg += "(expected %s) " % expected_returncode
948             result = Result.FAILED
949
950         if criticals:
951             msg += "(critical errors: [%s]) " % ', '.join(criticals)
952             result = Result.FAILED
953
954         if not_found_expected_failures:
955             mandatory_failures = [f for f in not_found_expected_failures
956                                   if not f.get('sometimes')]
957
958             if mandatory_failures:
959                 msg += "(Expected errors not found: %s) " % mandatory_failures
960                 result = Result.FAILED
961         elif self.expected_failures:
962             msg += '%s(Expected errors occured: %s)%s' % (Colors.OKBLUE,
963                                                           self.expected_failures,
964                                                           Colors.ENDC)
965
966         self.set_result(result, msg.strip())
967
968     def get_valgrind_suppressions(self):
969         result = super(GstValidateTest, self).get_valgrind_suppressions()
970         gst_sup = self.get_valgrind_suppression_file('common', 'gst.supp')
971         if gst_sup:
972             result.append(gst_sup)
973         return result
974
975
class GstValidateEncodingTestInterface(object):
    """Helpers shared by the tests encoding a media into ``self.dest_file``.

    The class uses attributes and methods it does not define itself
    (``debug``, ``info``, ``out``, ``options``, ``classname``, ``proc_env``
    and ``workdir``): it is meant to be combined with a class providing
    them.
    """

    # Default accepted difference between the duration of the original
    # media and the duration of the encoded file.
    DURATION_TOLERANCE = GST_SECOND / 4

    def __init__(self, combination, media_descriptor, duration_tolerance=None):
        """
        @combination: The object describing the wanted muxer/encoders caps
        @media_descriptor: The descriptor of the media being encoded
        @duration_tolerance: The accepted duration difference, defaults to
                             DURATION_TOLERANCE when None
        """
        super(GstValidateEncodingTestInterface, self).__init__()

        self.media_descriptor = media_descriptor
        self.combination = combination
        self.dest_file = ""

        self._duration_tolerance = duration_tolerance
        if duration_tolerance is None:
            self._duration_tolerance = self.DURATION_TOLERANCE

    def get_current_size(self):
        """Return the size in bytes of the destination file.

        None is returned when the file can not be stat'ed.
        """
        try:
            size = os.stat(urllib.parse.urlparse(self.dest_file).path).st_size
        except OSError:
            return None

        self.debug("Size: %s" % size)
        return size

    def _get_profile_full(self, muxer, venc, aenc, video_restriction=None,
                          audio_restriction=None, audio_presence=0,
                          video_presence=0):
        """Build the serialized encoding profile.

        The result looks like
        ``muxer:[video_restriction->]venc[|video_presence]:[audio_restriction->]aenc[|audio_presence]``
        where the parts that were not provided are left out, and where any
        resulting "::" is collapsed into a single ":".
        """
        ret = ""
        if muxer:
            ret += muxer
        ret += ":"
        if venc:
            if video_restriction is not None:
                ret = ret + video_restriction + '->'
            ret += venc
            if video_presence:
                ret = ret + '|' + str(video_presence)
        if aenc:
            ret += ":"
            if audio_restriction is not None:
                ret = ret + audio_restriction + '->'
            ret += aenc
            if audio_presence:
                ret = ret + '|' + str(audio_presence)

        return ret.replace("::", ":")

    def get_profile(self, video_restriction=None, audio_restriction=None):
        """Return the serialized encoding profile for ``self.combination``.

        When a media descriptor is set, the video (respectively audio) part
        is left out if the media has no video (respectively audio) track.
        """
        vcaps = self.combination.get_video_caps()
        acaps = self.combination.get_audio_caps()
        if self.media_descriptor is not None:
            if self.media_descriptor.get_num_tracks("video") == 0:
                vcaps = None

            if self.media_descriptor.get_num_tracks("audio") == 0:
                acaps = None

        return self._get_profile_full(self.combination.get_muxer_caps(),
                                      vcaps, acaps,
                                      video_restriction=video_restriction,
                                      audio_restriction=audio_restriction)

    def _clean_caps(self, caps):
        """
        Returns a list of key=value or structure name, without "(types)" or ";" or ","
        """
        return re.sub(r"\(.+?\)\s*| |;", '', caps).split(',')

    # pylint: disable=E1101
    def _has_caps_type_variant(self, c, ccaps):
        """
        Handle situations where we can have application/ogg or video/ogg or
        audio/ogg

        Returns True if @c, with its media type swapped for one of the two
        others, is in @ccaps.
        """
        has_variant = False
        media_type = re.findall("application/|video/|audio/", c)
        if media_type:
            media_type = media_type[0].replace('/', '')
            possible_mtypes = ["application", "video", "audio"]
            possible_mtypes.remove(media_type)
            for tmptype in possible_mtypes:
                possible_c_variant = c.replace(media_type, tmptype)
                if possible_c_variant in ccaps:
                    self.info(
                        "Found %s in %s, good enough!", possible_c_variant, ccaps)
                    has_variant = True

        return has_variant

    # pylint: disable=E1101
    def run_iqa_test(self, reference_file_uri):
        """
        Runs a pipeline comparing @reference_file_uri with the encoded file
        through the `iqa` element, if that element is available.

        The output of the process goes to `self.out` and the method waits
        for the process to be done. The process is stored in `self.process`,
        its return code is not checked here.

        @reference_file_uri: The URI of the file to compare the encoded
                             file with
        """
        if not GstValidateBaseTestManager.has_feature('iqa'):
            self.debug('Iqa element not present, not running extra test.')
            return

        pipeline_desc = """
            uridecodebin uri=%s !
                iqa name=iqa do-dssim=true dssim-error-threshold=1.0 ! fakesink
            uridecodebin uri=%s ! iqa.
        """ % (reference_file_uri, self.dest_file)
        pipeline_desc = pipeline_desc.replace("\n", "")

        command = [GstValidateBaseTestManager.COMMAND] + \
            shlex.split(pipeline_desc)
        if not self.options.redirect_logs:
            self.out.write(
                "=================\n"
                "Running IQA tests on results of: %s\n"
                "Command: '%s'\n"
                "=================\n\n" % (
                    self.classname, ' '.join(command)))
            self.out.flush()
        else:
            message = "Running IQA tests on results of:%s %s\n" \
                "    Command: %s\n" % (
                    Colors.ENDC, self.classname, ' '.join(command))
            printc(message, Colors.OKBLUE)

        self.process = subprocess.Popen(command,
                                        stderr=self.out,
                                        stdout=self.out,
                                        env=self.proc_env,
                                        cwd=self.workdir)
        self.process.wait()

    def _check_result_descriptor(self, result_descriptor):
        """Compare the discovered encoded file with what was requested.

        Returns a ``(result, message)`` tuple, see check_encoded_file().
        """
        duration = result_descriptor.get_duration()
        orig_duration = self.media_descriptor.get_duration()
        tolerance = self._duration_tolerance

        # The duration must be within the tolerance on both sides. The
        # previous chained comparison (`a >= duration <= b`) only rejected
        # files that were too short.
        if not (orig_duration - tolerance <= duration
                <= orig_duration + tolerance):
            return (Result.FAILED, "Duration of encoded file is "
                    " wrong (%s instead of %s)" %
                    (utils.TIME_ARGS(duration),
                     utils.TIME_ARGS(orig_duration)))

        all_tracks_caps = result_descriptor.get_tracks_caps()
        container_caps = result_descriptor.get_caps()
        if container_caps:
            all_tracks_caps.insert(0, ("container", container_caps))

        for track_type, caps in all_tracks_caps:
            wanted_caps = self.combination.get_caps(track_type)
            # Must be checked before cleaning the caps: _clean_caps() can
            # not handle None.
            if wanted_caps is None:
                return (Result.FAILED,
                        "Found a track of type %s in the encoded files"
                        " but none where wanted in the encoded profile: %s"
                        % (track_type, self.combination))

            ccaps = self._clean_caps(caps)
            for c in self._clean_caps(wanted_caps):
                if c not in ccaps and \
                        not self._has_caps_type_variant(c, ccaps):
                    return (Result.FAILED,
                            "Field: %s  (from %s) not in caps of the outputed file %s"
                            % (c, wanted_caps, ccaps))

        return (Result.PASSED, "")

    def check_encoded_file(self):
        """Check that the encoded file is what was requested.

        The duration of the file must match the one of the original media
        (within the duration tolerance) and the caps of its container and
        tracks must contain the fields of the wanted caps.

        The encoded file is removed once it has been discovered, whatever
        the result of the checks is.

        Returns a ``(result, message)`` tuple.
        """
        result_descriptor = GstValidateMediaDescriptor.new_from_uri(
            self.dest_file)
        if result_descriptor is None:
            return (Result.FAILED, "Could not discover encoded file %s"
                    % self.dest_file)

        result = self._check_result_descriptor(result_descriptor)
        os.remove(result_descriptor.get_path())
        return result
1149
1150
class TestsManager(Loggable):

    """ A class responsible for managing tests. """

    name = "base"
    # Name of the testsuite being loaded, it is used to prefix the names of
    # the tests and of the default blacklist patterns.
    loading_testsuite = None

    def __init__(self):

        Loggable.__init__(self)

        self.tests = []
        self.unwanted_tests = []
        self.options = None
        self.args = None
        self.reporter = None
        # Compiled regular expressions, see set_settings()/_add_blacklist()
        self.wanted_tests_patterns = []
        self.blacklisted_tests_patterns = []
        self._generators = []
        self.check_testslist = True
        self.all_tests = None
        # Maps compiled test name regexes to lists of expected failures
        self.expected_failures = {}
        # List of (test name regex, reason) tuples
        self.blacklisted_tests = []

    def init(self):
        """ Returns True, meaning that the manager is usable. """
        return True

    def list_tests(self):
        """ Returns the list of tests sorted by classname. """
        return sorted(self.tests, key=lambda x: x.classname)

    def find_tests(self, classname):
        """ Returns the tests whose classname matches the @classname regex. """
        regex = re.compile(classname)
        return [test for test in self.list_tests() if regex.findall(test.classname)]

    def add_expected_issues(self, expected_failures):
        """
        @expected_failures: A dictionary mapping test name regexes to the
                            list of failures expected for the matching tests

        The failures are added to the already registered matching tests and
        remembered for the tests that are added afterward.
        """
        expected_failures_re = {}
        for test_name_regex, failures in list(expected_failures.items()):
            regex = re.compile(test_name_regex)
            expected_failures_re[regex] = failures
            for test in self.tests:
                if regex.findall(test.classname):
                    test.expected_failures.extend(failures)

        self.expected_failures.update(expected_failures_re)

    def add_test(self, test):
        """
        Adds @test to the list of tests to run if it is wanted, and to the
        list of unwanted tests otherwise.

        Tests that do not come from a generator get their classname prefixed
        with the name of the testsuite being loaded.
        """
        if test.generator is None:
            test.classname = self.loading_testsuite + '.' + test.classname
        for regex, failures in list(self.expected_failures.items()):
            if regex.findall(test.classname):
                test.expected_failures.extend(failures)

        if self._is_test_wanted(test):
            if test not in self.tests:
                self.tests.append(test)
                self.tests.sort(key=lambda test: test.classname)
        else:
            # NOTE(review): membership is checked against self.tests, not
            # self.unwanted_tests -- confirm this is what is wanted.
            if test not in self.tests:
                self.unwanted_tests.append(test)
                self.unwanted_tests.sort(key=lambda test: test.classname)

    def get_tests(self):
        return self.tests

    def populate_testsuite(self):
        """ Hook called by set_settings(), does nothing by default. """
        pass

    def add_generators(self, generators):
        """
        @generators: A list of, or one single #TestsGenerator to be used to generate tests
        """
        if not isinstance(generators, list):
            generators = [generators]
        self._generators.extend(generators)
        for generator in generators:
            generator.testsuite = self.loading_testsuite

        # Drop the duplicates (the order of the generators is not kept)
        self._generators = list(set(self._generators))

    def get_generators(self):
        return self._generators

    def _add_blacklist(self, blacklisted_tests):
        """
        @blacklisted_tests: A list of, or one single string made of comma
                            separated test name regexes
        """
        if not isinstance(blacklisted_tests, list):
            blacklisted_tests = [blacklisted_tests]

        for patterns in blacklisted_tests:
            for pattern in patterns.split(","):
                self.blacklisted_tests_patterns.append(re.compile(pattern))

    def set_default_blacklist(self, default_blacklist):
        """
        @default_blacklist: An iterable of (test name regex, reason) tuples,
                            the regexes get prefixed with the name of the
                            testsuite being loaded when they are not already
        """
        for test_regex, reason in default_blacklist:
            if not test_regex.startswith(self.loading_testsuite + '.'):
                test_regex = self.loading_testsuite + '.' + test_regex
            self.blacklisted_tests.append((test_regex, reason))

    def add_options(self, parser):
        """ Add more arguments. """
        pass

    def set_settings(self, options, args, reporter):
        """ Set properties after options parsing. """
        self.options = options
        self.args = args
        self.reporter = reporter

        self.populate_testsuite()

        if self.options.valgrind:
            self.print_valgrind_bugs()

        if options.wanted_tests:
            for patterns in options.wanted_tests:
                for pattern in patterns.split(","):
                    self.wanted_tests_patterns.append(re.compile(pattern))

        if options.blacklisted_tests:
            for patterns in options.blacklisted_tests:
                self._add_blacklist(patterns)

    def set_blacklists(self):
        """
        Registers the patterns of the default blacklist.

        Returns False if the bugs status had to be checked and
        check_bugs_resolution() returned something false, True otherwise.
        """
        if self.blacklisted_tests:
            self.info("Currently 'hardcoded' %s blacklisted tests:" %
                      self.name)

        if self.options.check_bugs_status:
            if not check_bugs_resolution(self.blacklisted_tests):
                return False

        for name, bug in self.blacklisted_tests:
            self._add_blacklist(name)
            if not self.options.check_bugs_status:
                self.info("  + %s --> bug: %s" % (name, bug))

        return True

    def check_expected_failures(self):
        """
        Checks the status of the bugs associated with the expected failures.

        Returns True when there is nothing to check (no expected failure, or
        bugs status checking not requested), and what
        check_bugs_resolution() returns otherwise.
        """
        if not self.expected_failures or not self.options.check_bugs_status:
            return True

        printc("\nCurrently known failures in the %s testsuite:"
               % self.name, Colors.WARNING, title_char='-')

        bugs_definitions = {}
        for regex, failures in list(self.expected_failures.items()):
            for failure in failures:
                bugs = failure.get('bug') or failure.get('bugs')
                if not bugs:
                    printc('+ %s:\n  --> no bug reported associated with %s\n' % (
                        regex.pattern, failure), Colors.WARNING)
                    continue

                if not isinstance(bugs, list):
                    bugs = [bugs]

                # Gather the bugs of all the failures defined for the same
                # pattern, without duplicates and without modifying the
                # failure definition itself. The bugs used to be appended to
                # themselves, losing the ones already known for the pattern.
                known_bugs = bugs_definitions.setdefault(regex.pattern, [])
                for bug in bugs:
                    if bug not in known_bugs:
                        known_bugs.append(bug)

        return check_bugs_resolution(bugs_definitions.items())

    def _check_blacklisted(self, test):
        """ Returns True if @test matches one of the blacklist patterns. """
        for pattern in self.blacklisted_tests_patterns:
            if pattern.findall(test.classname):
                self.info("%s is blacklisted by %s", test.classname, pattern)
                return True

        return False

    def _check_whitelisted(self, test):
        """
        Returns True if @test matches one of the wanted tests patterns, the
        result for the first matching pattern being the one returned.
        """
        for pattern in self.wanted_tests_patterns:
            if pattern.findall(test.classname):
                if self._check_blacklisted(test):
                    # If explicitly white listed that specific test
                    # bypass the blacklisting
                    if pattern.pattern != test.classname:
                        return False
                return True
        return False

    def _check_duration(self, test):
        """
        Returns False if the duration of @test is known (> 0) and exceeds
        the `long_limit` option.
        """
        if test.duration > 0 and int(self.options.long_limit) < int(test.duration):
            self.info("Not activating %s as its duration (%d) is superior"
                      " than the long limit (%d)" % (test, test.duration,
                                                     int(self.options.long_limit)))
            return False

        return True

    def _is_test_wanted(self, test):
        """
        Returns True if @test should be run: it must not be too long, and
        either be whitelisted or, when no wanted pattern was given at all,
        not be blacklisted.
        """
        if self._check_whitelisted(test):
            if not self._check_duration(test):
                return False
            return True

        if self._check_blacklisted(test):
            return False

        if not self._check_duration(test):
            return False

        if not self.wanted_tests_patterns:
            return True

        return False

    def needs_http_server(self):
        return False

    def print_valgrind_bugs(self):
        """ Hook called by set_settings() when valgrind is enabled. """
        pass
1364
1365
class TestsGenerator(Loggable):

    """ A named provider of tests, indexed by their classname. """

    def __init__(self, name, test_manager, tests=None):
        """
        @name: The name of the generator
        @test_manager: The manager the generator is used with
        @tests: An optional iterable of tests to start with (they are
                registered as is: contrary to add_test(), their classname
                and generator are left untouched)
        """
        Loggable.__init__(self)
        self.name = name
        self.test_manager = test_manager
        self.testsuite = None
        self._tests = {}
        # None is used as the default value rather than a shared list
        if tests is not None:
            for test in tests:
                self._tests[test.classname] = test

    def generate_tests(self, *kwargs):
        """
        Method that generates tests

        The arguments are accepted (as positional arguments, in spite of
        the parameter name) but ignored here.
        """
        return list(self._tests.values())

    def add_test(self, test):
        """
        Registers @test, marking the generator as its generator and
        prefixing its classname with the name of the testsuite.
        """
        test.generator = self
        test.classname = self.testsuite + '.' + test.classname
        self._tests[test.classname] = test
1387
1388
class GstValidateTestsGenerator(TestsGenerator):

    """ Generator whose tests are created by populate_tests(). """

    def populate_tests(self, uri_minfo_special_scenarios, scenarios):
        """ Hook called by generate_tests(), does nothing by default. """
        pass

    @staticmethod
    def get_fakesink_for_media_type(media_type, needs_clock=False):
        """
        Returns the description of the fake sink to use for @media_type,
        depending on whether it @needs_clock.
        """
        if media_type == "video":
            return ('fakevideosink qos=true max-lateness=20000000'
                    if needs_clock else "fakevideosink sync=false")

        return "fakesink sync=true" if needs_clock else "fakesink"

    def generate_tests(self, uri_minfo_special_scenarios, scenarios):
        """ Populates the tests and returns the list of all of them. """
        self.populate_tests(uri_minfo_special_scenarios, scenarios)
        generated = super(GstValidateTestsGenerator, self).generate_tests()
        return generated
1410
1411
1412 class _TestsLauncher(Loggable):
1413
1414     def __init__(self):
1415
1416         Loggable.__init__(self)
1417
1418         self.options = None
1419         self.testers = []
1420         self.tests = []
1421         self.reporter = None
1422         self._list_testers()
1423         self.all_tests = None
1424         self.wanted_tests_patterns = []
1425
1426         self.queue = queue.Queue()
1427         self.jobs = []
1428         self.total_num_tests = 0
1429         self.server = None
1430         self.httpsrv = None
1431         self.vfb_server = None
1432
1433     def _list_app_dirs(self):
1434         app_dirs = []
1435         env_dirs = os.environ["GST_VALIDATE_APPS_DIR"]
1436         if env_dirs is not None:
1437             for dir_ in env_dirs.split(":"):
1438                 app_dirs.append(dir_)
1439
1440         return app_dirs
1441
1442     def _exec_app(self, app_dir, env):
1443         try:
1444             files = os.listdir(app_dir)
1445         except OSError as e:
1446             self.debug("Could not list %s: %s" % (app_dir, e))
1447             files = []
1448         for f in files:
1449             if f.endswith(".py"):
1450                 exec(compile(open(os.path.join(app_dir, f)).read(),
1451                              os.path.join(app_dir, f), 'exec'), env)
1452
1453     def _exec_apps(self, env):
1454         app_dirs = self._list_app_dirs()
1455         for app_dir in app_dirs:
1456             self._exec_app(app_dir, env)
1457
1458     def _list_testers(self):
1459         env = globals().copy()
1460         self._exec_apps(env)
1461
1462         testers = [i() for i in utils.get_subclasses(TestsManager, env)]
1463         for tester in testers:
1464             if tester.init() is True:
1465                 self.testers.append(tester)
1466             else:
1467                 self.warning("Can not init tester: %s -- PATH is %s"
1468                              % (tester.name, os.environ["PATH"]))
1469
1470     def add_options(self, parser):
1471         for tester in self.testers:
1472             tester.add_options(parser)
1473
1474     def _load_testsuite(self, testsuites):
1475         exceptions = []
1476         for testsuite in testsuites:
1477             try:
1478                 sys.path.insert(0, os.path.dirname(testsuite))
1479                 return (__import__(os.path.basename(testsuite).replace(".py", "")), None)
1480             except Exception as e:
1481                 exceptions.append("Could not load %s: %s" % (testsuite, e))
1482                 continue
1483             finally:
1484                 sys.path.remove(os.path.dirname(testsuite))
1485
1486         return (None, exceptions)
1487
1488     def _load_testsuites(self):
1489         testsuites = set()
1490         for testsuite in self.options.testsuites:
1491             if os.path.exists(testsuite):
1492                 testsuite = os.path.abspath(os.path.expanduser(testsuite))
1493                 loaded_module = self._load_testsuite([testsuite])
1494             else:
1495                 possible_testsuites_paths = [os.path.join(d, testsuite + ".py")
1496                                              for d in self.options.testsuites_dirs]
1497                 loaded_module = self._load_testsuite(possible_testsuites_paths)
1498
1499             module = loaded_module[0]
1500             if not loaded_module[0]:
1501                 if "." in testsuite:
1502                     self.options.testsuites.append(testsuite.split('.')[0])
1503                     self.info("%s looks like a test name, trying that" %
1504                               testsuite)
1505                     self.options.wanted_tests.append(testsuite)
1506                 else:
1507                     printc("Could not load testsuite: %s, reasons: %s" % (
1508                         testsuite, loaded_module[1]), Colors.FAIL)
1509                 continue
1510
1511             testsuites.add(module)
1512             if not hasattr(module, "TEST_MANAGER"):
1513                 module.TEST_MANAGER = [tester.name for tester in self.testers]
1514             elif not isinstance(module.TEST_MANAGER, list):
1515                 module.TEST_MANAGER = [module.TEST_MANAGER]
1516
1517         self.options.testsuites = list(testsuites)
1518
1519     def _setup_testsuites(self):
1520         for testsuite in self.options.testsuites:
1521             loaded = False
1522             wanted_test_manager = None
1523             if hasattr(testsuite, "TEST_MANAGER"):
1524                 wanted_test_manager = testsuite.TEST_MANAGER
1525                 if not isinstance(wanted_test_manager, list):
1526                     wanted_test_manager = [wanted_test_manager]
1527
1528             for tester in self.testers:
1529                 if wanted_test_manager is not None and \
1530                         tester.name not in wanted_test_manager:
1531                     continue
1532
1533                 prev_testsuite_name = TestsManager.loading_testsuite
1534                 if self.options.user_paths:
1535                     TestsManager.loading_testsuite = tester.name
1536                     tester.register_defaults()
1537                     loaded = True
1538                 else:
1539                     TestsManager.loading_testsuite = testsuite.__name__
1540                     if testsuite.setup_tests(tester, self.options):
1541                         loaded = True
1542                 if prev_testsuite_name:
1543                     TestsManager.loading_testsuite = prev_testsuite_name
1544
1545             if not loaded:
1546                 printc("Could not load testsuite: %s"
1547                        " maybe because of missing TestManager"
1548                        % (testsuite), Colors.FAIL)
1549                 return False
1550
1551     def _load_config(self, options):
1552         printc("Loading config files is DEPRECATED"
1553                " you should use the new testsuite format now",)
1554
1555         for tester in self.testers:
1556             tester.options = options
1557             globals()[tester.name] = tester
1558         globals()["options"] = options
1559         c__file__ = __file__
1560         globals()["__file__"] = self.options.config
1561         exec(compile(open(self.options.config).read(),
1562                      self.options.config, 'exec'), globals())
1563         globals()["__file__"] = c__file__
1564
    def set_settings(self, options, args):
        """
        Configures the launcher and its testers from the parsed @options
        and the remaining command line @args.

        Returns True on success, and False as soon as one of the steps
        fails (no testsuite loaded, testsuite setup, blacklists, expected
        failures or virtual frame buffer server startup).
        """
        if options.xunit_file:
            self.reporter = reporters.XunitReporter(options)
        else:
            self.reporter = reporters.Reporter(options)

        self.options = options
        # Only used as a flag: set if at least one tester is named in @args
        wanted_testers = None
        for tester in self.testers:
            if tester.name in args:
                wanted_testers = tester.name

        if wanted_testers:
            # Only keep the testers named in @args, and remove their names
            # from @args
            testers = self.testers
            self.testers = []
            for tester in testers:
                if tester.name in args:
                    self.testers.append(tester)
                    args.remove(tester.name)

        if options.config:
            self._load_config(options)

        self._load_testsuites()
        if not self.options.testsuites:
            printc("Not testsuite loaded!", Colors.FAIL)
            return False

        for tester in self.testers:
            tester.set_settings(options, args, self.reporter)

        # The testsuites are only set up when no config file is used
        if not options.config and options.testsuites:
            if self._setup_testsuites() is False:
                return False

        for tester in self.testers:
            if not tester.set_blacklists():
                return False

            if not tester.check_expected_failures():
                return False

        if self.needs_http_server() or options.httponly is True:
            self.httpsrv = HTTPServer(options)
            self.httpsrv.start()

        if options.no_display:
            self.vfb_server = get_virual_frame_buffer_server(options)
            # start() returns a sequence: success flag first, then the
            # information printed on failure
            res = self.vfb_server.start()
            if res[0] is False:
                printc("Could not start virtual frame server: %s" % res[1],
                       Colors.FAIL)
                return False
            # Point DISPLAY at the virtual frame buffer server
            os.environ["DISPLAY"] = self.vfb_server.display_id

        return True
1621
1622     def _check_tester_has_other_testsuite(self, testsuite, tester):
1623         if tester.name != testsuite.TEST_MANAGER[0]:
1624             return True
1625
1626         for t in self.options.testsuites:
1627             if t != testsuite:
1628                 for other_testmanager in t.TEST_MANAGER:
1629                     if other_testmanager == tester.name:
1630                         return True
1631
1632         return False
1633
1634     def _check_defined_tests(self, tester, tests):
1635         if self.options.blacklisted_tests or self.options.wanted_tests:
1636             return
1637
1638         tests_names = [test.classname for test in tests]
1639         testlist_changed = False
1640         for testsuite in self.options.testsuites:
1641             if not self._check_tester_has_other_testsuite(testsuite, tester) \
1642                     and tester.check_testslist:
1643                 try:
1644                     testlist_file = open(os.path.splitext(testsuite.__file__)[0] + ".testslist",
1645                                          'r+')
1646
1647                     know_tests = testlist_file.read().split("\n")
1648                     testlist_file.close()
1649
1650                     testlist_file = open(os.path.splitext(testsuite.__file__)[0] + ".testslist",
1651                                          'w')
1652                 except IOError:
1653                     continue
1654
1655                 optional_out = []
1656                 for test in know_tests:
1657                     if test and test.strip('~') not in tests_names:
1658                         if not test.startswith('~'):
1659                             testlist_changed = True
1660                             printc("Test %s Not in testsuite %s anymore"
1661                                    % (test, testsuite.__file__), Colors.FAIL)
1662                         else:
1663                             optional_out.append((test, None))
1664
1665                 tests_names = sorted([(test.classname, test) for test in tests] + optional_out,
1666                                      key=lambda x: x[0].strip('~'))
1667
1668                 for tname, test in tests_names:
1669                     if test and test.optional:
1670                         tname = '~' + tname
1671                     testlist_file.write("%s\n" % (tname))
1672                     if tname and tname not in know_tests:
1673                         printc("Test %s is NEW in testsuite %s"
1674                                % (tname, testsuite.__file__),
1675                                Colors.FAIL if self.options.fail_on_testlist_change else Colors.OKGREEN)
1676                         testlist_changed = True
1677
1678                 testlist_file.close()
1679                 break
1680
1681         return testlist_changed
1682
1683     def list_tests(self):
1684         for tester in self.testers:
1685             if not self._tester_needed(tester):
1686                 continue
1687
1688             tests = tester.list_tests()
1689             if self._check_defined_tests(tester, tests) and \
1690                     self.options.fail_on_testlist_change:
1691                 raise RuntimeError("Unexpected new test in testsuite.")
1692
1693             self.tests.extend(tests)
1694         return sorted(list(self.tests), key=lambda t: t.classname)
1695
1696     def _tester_needed(self, tester):
1697         for testsuite in self.options.testsuites:
1698             if tester.name in testsuite.TEST_MANAGER:
1699                 return True
1700         return False
1701
1702     def server_wrapper(self, ready):
1703         self.server = GstValidateTCPServer(
1704             ('localhost', 0), GstValidateListener)
1705         self.server.socket.settimeout(None)
1706         self.server.launcher = self
1707         self.serverport = self.server.socket.getsockname()[1]
1708         self.info("%s server port: %s" % (self, self.serverport))
1709         ready.set()
1710
1711         self.server.serve_forever(poll_interval=0.05)
1712
1713     def _start_server(self):
1714         self.info("Starting TCP Server")
1715         ready = threading.Event()
1716         self.server_thread = threading.Thread(target=self.server_wrapper,
1717                                               kwargs={'ready': ready})
1718         self.server_thread.start()
1719         ready.wait()
1720         os.environ["GST_VALIDATE_SERVER"] = "tcp://localhost:%s" % self.serverport
1721
1722     def _stop_server(self):
1723         if self.server:
1724             self.server.shutdown()
1725             self.server_thread.join()
1726             self.server.server_close()
1727             self.server = None
1728
1729     def test_wait(self):
1730         while True:
1731             # Check process every second for timeout
1732             try:
1733                 self.queue.get(timeout=1)
1734             except queue.Empty:
1735                 pass
1736
1737             for test in self.jobs:
1738                 if test.process_update():
1739                     self.jobs.remove(test)
1740                     return test
1741
1742     def tests_wait(self):
1743         try:
1744             test = self.test_wait()
1745             test.check_results()
1746         except KeyboardInterrupt:
1747             for test in self.jobs:
1748                 test.kill_subprocess()
1749             raise
1750
1751         return test
1752
1753     def start_new_job(self, tests_left):
1754         try:
1755             test = tests_left.pop(0)
1756         except IndexError:
1757             return False
1758
1759         test.test_start(self.queue)
1760
1761         self.jobs.append(test)
1762
1763         return True
1764
1765     def _run_tests(self):
1766         cur_test_num = 0
1767
1768         if not self.all_tests:
1769             all_tests = self.list_tests()
1770             self.all_tests = all_tests
1771         self.total_num_tests = len(self.all_tests)
1772         if not sys.stdout.isatty():
1773             printc("\nRunning %d tests..." % self.total_num_tests, color=Colors.HEADER)
1774
1775         self.reporter.init_timer()
1776         alone_tests = []
1777         tests = []
1778         for test in self.tests:
1779             if test.is_parallel:
1780                 tests.append(test)
1781             else:
1782                 alone_tests.append(test)
1783
1784         max_num_jobs = min(self.options.num_jobs, len(tests))
1785         jobs_running = 0
1786
1787         # if order of test execution doesn't matter, shuffle
1788         # the order to optimize cpu usage
1789         if self.options.shuffle:
1790             random.shuffle(tests)
1791             random.shuffle(alone_tests)
1792
1793         current_test_num = 1
1794         for num_jobs, tests in [(max_num_jobs, tests), (1, alone_tests)]:
1795             tests_left = list(tests)
1796             for i in range(num_jobs):
1797                 if not self.start_new_job(tests_left):
1798                     break
1799                 jobs_running += 1
1800
1801             while jobs_running != 0:
1802                 test = self.tests_wait()
1803                 jobs_running -= 1
1804                 test.number = "[%d / %d] " % (current_test_num,
1805                                               self.total_num_tests)
1806                 current_test_num += 1
1807                 res = test.test_end()
1808                 self.reporter.after_test(test)
1809                 if res != Result.PASSED and (self.options.forever or
1810                                              self.options.fatal_error):
1811                     return False
1812                 if self.start_new_job(tests_left):
1813                     jobs_running += 1
1814
1815         return True
1816
1817     def clean_tests(self):
1818         for test in self.tests:
1819             test.clean()
1820         self._stop_server()
1821
1822     def run_tests(self):
1823         try:
1824             self._start_server()
1825             if self.options.forever:
1826                 r = 1
1827                 while True:
1828                     printc("Running iteration %d" % r, title=True)
1829
1830                     if not self._run_tests():
1831                         break
1832                     r += 1
1833                     self.clean_tests()
1834
1835                 return False
1836             elif self.options.n_runs:
1837                 res = True
1838                 for r in range(self.options.n_runs):
1839                     t = "Running iteration %d" % r
1840                     print("%s\n%s\n%s\n" % ("=" * len(t), t, "=" * len(t)))
1841                     if not self._run_tests():
1842                         res = False
1843                     self.clean_tests()
1844
1845                 return res
1846             else:
1847                 return self._run_tests()
1848         finally:
1849             if self.httpsrv:
1850                 self.httpsrv.stop()
1851             if self.vfb_server:
1852                 self.vfb_server.stop()
1853             self.clean_tests()
1854
1855     def final_report(self):
1856         return self.reporter.final_report()
1857
1858     def needs_http_server(self):
1859         for tester in self.testers:
1860             if tester.needs_http_server():
1861                 return True
1862
1863
class NamedDic(object):
    """
    Simple object exposing the items of a dictionary as attributes.
    """

    def __init__(self, props):
        """
        :param props: A mapping of attribute name to value; nothing is set
                      when it is None or empty.
        """
        if not props:
            return

        for attr_name, attr_value in props.items():
            setattr(self, attr_name, attr_value)
1870
1871
class Scenario(object):
    """
    A scenario: a name, an optional path and a set of properties which are
    exposed as attributes of the object.
    """

    def __init__(self, name, props, path=None):
        """
        :param name: The name of the scenario.
        :param props: An iterable of (property name, value) pairs. Each one
                      is set as an attribute, with the dashes of its name
                      replaced by underscores.
        :param path: The path of the scenario, if any.
        """
        self.name = name
        self.path = path

        for key, val in props:
            setattr(self, key.replace("-", "_"), val)

    def get_execution_name(self):
        """Return the path of the scenario when it has one, else its name."""
        return self.name if self.path is None else self.path

    def seeks(self):
        """Truthiness of the `seek` property, False when it is not set."""
        return bool(getattr(self, "seek", False))

    def needs_clock_sync(self):
        """Truthiness of the `need_clock_sync` property, False when unset."""
        return bool(getattr(self, "need_clock_sync", False))

    def needs_live_content(self):
        """
        Truthiness of the `live_content_required` property (scenarios that
        can only be used on live content), False when it is not set.
        """
        return bool(getattr(self, "live_content_required", False))

    def compatible_with_live_content(self):
        """
        A scenario requiring live content is implicitly compatible with it;
        otherwise this is the truthiness of the `live_content_compatible`
        property, False when it is not set.
        """
        if self.needs_live_content():
            return True

        return bool(getattr(self, "live_content_compatible", False))

    def get_min_media_duration(self):
        """The `min_media_duration` property as a float, 0 when unset."""
        if not hasattr(self, "min_media_duration"):
            return 0

        return float(self.min_media_duration)

    def does_reverse_playback(self):
        """Truthiness of the `reverse_playback` property, False when unset."""
        return bool(getattr(self, "reverse_playback", False))

    def get_duration(self):
        """The `duration` property as a float, 0 when it is not set."""
        try:
            return float(self.duration)
        except AttributeError:
            return 0

    def get_min_tracks(self, track_type):
        """
        The `min_<track_type>_track` property as an int, 0 when it is not
        set.
        """
        prop_name = "min_%s_track" % track_type
        try:
            return int(getattr(self, prop_name))
        except AttributeError:
            return 0

    def __repr__(self):
        return "<Scenario %s>" % self.name
1940
1941
class ScenarioManager(Loggable):
    """
    Single shared object discovering the available scenarios.

    Discovery is done by running the gst-validate command so that it writes
    the scenario definitions to a "scenarios.def" file, which is then parsed
    into `Scenario` objects. The `config` attribute must be set (it is None
    on creation) before any discovery: its `main_dir` and `logsdir` are
    where the definitions and the discovery logs are written.
    """
    _instance = None
    all_scenarios = []

    FILE_EXTENSION = "scenario"

    def __new__(cls, *args, **kwargs):
        """Return the shared instance, creating it on first use."""
        if not cls._instance:
            cls._instance = super(ScenarioManager, cls).__new__(
                cls, *args, **kwargs)
            cls._instance.config = None
            cls._instance.discovered = False
            Loggable.__init__(cls._instance)

        return cls._instance

    def find_special_scenarios(self, mfile):
        """
        Discover the scenarios that are specific to the @mfile media file,
        ie. the files named "<mfile basename>.<anything>.scenario" located
        in the same directory.

        :param mfile: The path of the media file.
        :return: The list of discovered `Scenario`.
        """
        scenarios = []
        mfile_dir = os.path.dirname(mfile)
        special_scenario_re = re.compile(
            r"%s\..*\.%s$" % (re.escape(os.path.basename(mfile)),
                              self.FILE_EXTENSION))

        for f in os.listdir(mfile_dir):
            if special_scenario_re.search(f):
                scenarios.append(os.path.join(mfile_dir, f))

        if scenarios:
            scenarios = self.discover_scenarios(scenarios, mfile)

        return scenarios

    def discover_scenarios(self, scenario_paths=None, mfile=None):
        """
        Discover scenarios specified in scenario_paths or the default ones
        if nothing specified there

        :param scenario_paths: The paths of the scenario files to inspect.
        :param mfile: The media file the scenarios are specific to, if any;
                      it is used to compute the name of the scenarios.
        :return: The list of discovered `Scenario`. When no path was given
                 they are also added to `all_scenarios`.
        """
        if scenario_paths is None:
            scenario_paths = []

        scenarios = []
        scenario_defs = os.path.join(self.config.main_dir, "scenarios.def")
        logs_path = os.path.join(self.config.logsdir,
                                 "scenarios_discovery.log")

        command = [GstValidateBaseTestManager.COMMAND,
                   "--scenarios-defs-output-file", scenario_defs]
        command.extend(scenario_paths)
        with open(logs_path, 'w') as logs:
            try:
                subprocess.check_call(command, stdout=logs, stderr=logs)
            except subprocess.CalledProcessError as e:
                # Best effort: still try to read whatever definitions are
                # available below.
                self.error(e)

        config = configparser.RawConfigParser()
        with open(scenario_defs) as f:
            config.read_file(f)

        for section in config.sections():
            # Used as is for the default scenarios, and as a fallback for a
            # section that matches none of the requested paths.
            name = section
            path = None
            for scenario_path in scenario_paths:
                if section in os.path.splitext(os.path.basename(scenario_path))[0]:
                    path = scenario_path
                    if mfile is not None:
                        # The real name of the scenario is:
                        # filename.REALNAME.scenario
                        name = scenario_path.replace(mfile + ".", "").replace(
                            "." + self.FILE_EXTENSION, "")

            props = config.items(section)
            scenarios.append(Scenario(name, props, path))

        if not scenario_paths:
            self.discovered = True
            self.all_scenarios.extend(scenarios)

        return scenarios

    def get_scenario(self, name):
        """
        Get a scenario by name or by path.

        :param name: The name of a default scenario, the absolute path of a
                     scenario file, or None.
        :return: The matching `Scenario`, the list of all the default
                 scenarios when @name is None, or None (after logging a
                 warning) when nothing matches.
        """
        if name is not None and os.path.isabs(name) and name.endswith(self.FILE_EXTENSION):
            scenarios = self.discover_scenarios([name])

            if scenarios:
                return scenarios[0]

        if self.discovered is False:
            self.discover_scenarios()

        if name is None:
            return self.all_scenarios

        for scenario in self.all_scenarios:
            if scenario.name == name:
                return scenario

        self.warning("Scenario: %s not found" % name)
        return None
2038
2039
class GstValidateBaseTestManager(TestsManager):
    """
    Base test manager for the gst-validate tools: it holds the paths of the
    tools, the default scenarios and the default encoding formats.
    """
    scenarios_manager = ScenarioManager()
    features_cache = {}

    def __init__(self):
        super(GstValidateBaseTestManager, self).__init__()
        self._scenarios = []
        self._encoding_formats = []

    @classmethod
    def update_commands(cls, extra_paths=None):
        """
        Look the gst-validate tools up and store the result of each lookup
        in the matching `*COMMAND` class attribute.

        :param extra_paths: Passed as is to `which` for each tool.
        """
        tools = (('', 'gst-validate'),
                 ('TRANSCODING_', 'gst-validate-transcoding'),
                 ('MEDIA_CHECK_', 'gst-validate-media-check'),
                 ('RTSP_SERVER_', 'gst-validate-rtsp-server'),
                 ('INSPECT_', 'gst-inspect'))
        for prefix, tool in tools:
            setattr(cls, prefix + 'COMMAND', which(tool + '-1.0', extra_paths))

    @classmethod
    def has_feature(cls, featurename):
        """
        Tell whether the inspect command succeeds for @featurename.

        The answer is cached per feature name, so the command is run at
        most once for a given name.
        """
        if featurename in cls.features_cache:
            return cls.features_cache[featurename]

        try:
            subprocess.check_output([cls.INSPECT_COMMAND, featurename])
        except subprocess.CalledProcessError:
            found = False
        else:
            found = True

        cls.features_cache[featurename] = found
        return found

    def add_scenarios(self, scenarios):
        """
        @scenarios A list or a unique scenario name(s) to be run on the tests.
                    They are just the default scenarios, and then depending on
                    the TestsGenerator to be used you can have more fine grained
                    control on what to be run on each serie of tests.
        """
        if not isinstance(scenarios, list):
            scenarios = [scenarios]
        self._scenarios.extend(scenarios)

        # Drop the duplicates.
        self._scenarios = list(set(self._scenarios))

    def set_scenarios(self, scenarios):
        """
        Override the scenarios
        """
        self._scenarios = []
        self.add_scenarios(scenarios)

    def get_scenarios(self):
        """Return the list of default scenarios."""
        return self._scenarios

    def add_encoding_formats(self, encoding_formats):
        """
        :param encoding_formats: A list or one single #MediaFormatCombinations describing wanted output
                           formats for transcoding test.
                           They are just the default encoding formats, and then depending on
                           the TestsGenerator to be used you can have more fine grained
                           control on what to be run on each serie of tests.
        """
        if not isinstance(encoding_formats, list):
            encoding_formats = [encoding_formats]
        self._encoding_formats.extend(encoding_formats)

        # Drop the duplicates.
        self._encoding_formats = list(set(self._encoding_formats))

    def get_encoding_formats(self):
        """Return the list of default encoding formats."""
        return self._encoding_formats
2115
2116
# Set the default *COMMAND class attributes as soon as the module is loaded
# (looked up without any extra path).
GstValidateBaseTestManager.update_commands()
2118
2119
class MediaDescriptor(Loggable):
    """
    Base class describing a media and telling which scenarios can be run
    on it.

    Subclasses must implement the accessors that raise NotImplementedError
    here.
    """

    def __init__(self):
        Loggable.__init__(self)

    def get_path(self):
        raise NotImplementedError

    def has_frames(self):
        return False

    def get_media_filepath(self):
        raise NotImplementedError

    def skip_parsers(self):
        return False

    def get_caps(self):
        raise NotImplementedError

    def get_uri(self):
        raise NotImplementedError

    def get_duration(self):
        raise NotImplementedError

    def get_protocol(self):
        raise NotImplementedError

    def is_seekable(self):
        raise NotImplementedError

    def is_live(self):
        raise NotImplementedError

    def is_image(self):
        raise NotImplementedError

    def get_num_tracks(self, track_type):
        raise NotImplementedError

    def can_play_reverse(self):
        raise NotImplementedError

    def prerrols(self):
        return True

    def is_compatible(self, scenario):
        """
        Tell whether @scenario can be run on this media.

        :param scenario: The `Scenario` to check, or None.
        :return: True if @scenario is None or if none of its requirements
                 (seeking, clock sync, reverse playback, live content,
                 preroll, minimum duration, minimum number of tracks) rules
                 this media out; False otherwise.
        """
        if scenario is None:
            return True

        if scenario.seeks() and (not self.is_seekable() or self.is_image()):
            self.debug("Do not run %s as %s does not support seeking",
                       scenario, self.get_uri())
            return False

        if self.is_image() and scenario.needs_clock_sync():
            self.debug("Do not run %s as %s is an image",
                       scenario, self.get_uri())
            return False

        if not self.can_play_reverse() and scenario.does_reverse_playback():
            return False

        if not self.is_live() and scenario.needs_live_content():
            self.debug("Do not run %s as %s is not a live content",
                       scenario, self.get_uri())
            return False

        if self.is_live() and not scenario.compatible_with_live_content():
            self.debug("Do not run %s as %s is a live content",
                       scenario, self.get_uri())
            return False

        if not self.prerrols() and getattr(scenario, 'needs_preroll', False):
            return False

        # The duration is divided by GST_SECOND to compare it with the
        # minimum duration of the scenario; a duration of 0 is not checked.
        if self.get_duration() and self.get_duration() / GST_SECOND < scenario.get_min_media_duration():
            self.debug(
                "Do not run %s as %s is too short (%i < min media duation : %i",
                scenario, self.get_uri(),
                self.get_duration() / GST_SECOND,
                scenario.get_min_media_duration())
            return False

        for track_type in ['audio', 'subtitle', 'video']:
            if self.get_num_tracks(track_type) < scenario.get_min_tracks(track_type):
                self.debug("%s -- %s | At least %s %s track needed  < %s"
                           % (scenario, self.get_uri(), track_type,
                              scenario.get_min_tracks(track_type),
                              self.get_num_tracks(track_type)))
                return False

        return True
2214
2215
class GstValidateMediaDescriptor(MediaDescriptor):
    """
    Media descriptor backed by an XML media info file, as generated by the
    media check command (see `new_from_uri`).
    """
    # Some extension file for discovering results
    MEDIA_INFO_EXT = "media_info"
    PUSH_MEDIA_INFO_EXT = "media_info.push"
    STREAM_INFO_EXT = "stream_info"

    def __init__(self, xml_path):
        """
        :param xml_path: The path of the XML media info file to load.
        :raises: Whatever parsing @xml_path raises; parse errors are
                 reported with `printc` before being propagated.
        """
        super(GstValidateMediaDescriptor, self).__init__()

        self._xml_path = xml_path
        try:
            media_xml = ET.parse(xml_path).getroot()
        except ET.ParseError:
            # Use the ParseError of whichever implementation ET is bound to
            # (lxml or the standard library).
            printc("Could not parse %s" % xml_path,
                   Colors.FAIL)
            raise

        self._extract_data(media_xml)

        # The protocol is the scheme of the URI of the media.
        self.set_protocol(urllib.parse.urlparse(self.get_uri()).scheme)

    def skip_parsers(self):
        return self._skip_parsers

    def has_frames(self):
        return self._has_frames

    def _extract_data(self, media_xml):
        """
        Extract the information we need from the root element of the XML.

        The "uri", "frame-detection", "duration" and "seekable" attributes
        and a "streams" child with a "caps" attribute are required;
        "skip-parsers", "protocol" and "live" are optional.
        """
        streams_node = media_xml.findall("streams")[0]
        streams = streams_node.findall("stream")

        self._caps = streams_node.attrib["caps"]
        self._track_caps = [(stream.attrib["type"], stream.attrib["caps"])
                            for stream in streams]
        self._uri = media_xml.attrib["uri"]
        self._skip_parsers = bool(int(media_xml.attrib.get('skip-parsers', 0)))
        self._has_frames = bool(int(media_xml.attrib["frame-detection"]))
        self._duration = int(media_xml.attrib["duration"])
        self._protocol = media_xml.get("protocol", None)
        self._is_seekable = media_xml.attrib["seekable"].lower() == "true"
        self._is_live = media_xml.get("live", "false").lower() == "true"
        self._track_types = [stream.attrib["type"] for stream in streams]
        self._is_image = "image" in self._track_types

    @staticmethod
    def new_from_uri(uri, verbose=False, include_frames=False, is_push=False):
        """
        Generate the media info file of @uri with the media check command
        and load it.

            include_frames = 0 # Never
            include_frames = 1 # always
            include_frames = 2 # if previous file included them

        :param uri: The URI of the media to describe.
        :param verbose: Whether to print the command and its result.
        :param include_frames: See above; when set the command is run with
                               "--full".
        :param is_push: Whether to use the push media info file extension.
        :return: The new descriptor, or None if the command failed or its
                 output could not be loaded.
        """
        media_path = utils.url2path(uri)

        ext = GstValidateMediaDescriptor.PUSH_MEDIA_INFO_EXT if is_push else \
            GstValidateMediaDescriptor.MEDIA_INFO_EXT
        descriptor_path = "%s.%s" % (media_path, ext)
        args = GstValidateBaseTestManager.MEDIA_CHECK_COMMAND.split(" ")
        args.append(uri)
        if include_frames == 2:
            try:
                media_xml = ET.parse(descriptor_path).getroot()
            except OSError:
                # No previous media info file (FileNotFoundError is an
                # OSError, and lxml raises a plain OSError in that case).
                pass
            else:
                include_frames = bool(int(media_xml.attrib["frame-detection"]))
                # "skip-parsers" is optional, as in _extract_data().
                if bool(int(media_xml.attrib.get("skip-parsers", 0))):
                    args.append("--skip-parsers")
        else:
            include_frames = bool(include_frames)

        args.extend(["--output-file", descriptor_path])
        if include_frames:
            args.extend(["--full"])

        if verbose:
            printc("Generating media info for %s\n"
                   "    Command: '%s'" % (media_path, ' '.join(args)),
                   Colors.OKBLUE)

        try:
            subprocess.check_output(args, stderr=subprocess.DEVNULL)
        except subprocess.CalledProcessError as e:
            if verbose:
                printc("Result: Failed", Colors.FAIL)
            else:
                loggable.warning("GstValidateMediaDescriptor",
                                 "Exception: %s" % e)
            return None

        if verbose:
            printc("Result: Passed", Colors.OKGREEN)

        try:
            return GstValidateMediaDescriptor(descriptor_path)
        except (IOError, ET.ParseError):
            return None

    def get_path(self):
        return self._xml_path

    def need_clock_sync(self):
        return Protocols.needs_clock_sync(self.get_protocol())

    def get_media_filepath(self):
        """
        Return the path of the media info file without its extension, the
        extension being chosen according to the protocol.
        """
        if self.get_protocol() == Protocols.FILE:
            return self._xml_path.replace("." + self.MEDIA_INFO_EXT, "")
        elif self.get_protocol() == Protocols.PUSHFILE:
            return self._xml_path.replace("." + self.PUSH_MEDIA_INFO_EXT, "")
        else:
            return self._xml_path.replace("." + self.STREAM_INFO_EXT, "")

    def get_caps(self):
        return self._caps

    def get_tracks_caps(self):
        """Return the list of (type, caps) of the streams."""
        return self._track_caps

    def get_uri(self):
        return self._uri

    def get_duration(self):
        return self._duration

    def set_protocol(self, protocol):
        """
        Set the protocol of the media; it is forced to Protocols.PUSHFILE
        when the media info file has the push extension.
        """
        if self._xml_path.endswith(GstValidateMediaDescriptor.PUSH_MEDIA_INFO_EXT):
            self._protocol = Protocols.PUSHFILE
        else:
            self._protocol = protocol

    def get_protocol(self):
        return self._protocol

    def is_seekable(self):
        return self._is_seekable

    def is_live(self):
        return self._is_live

    def can_play_reverse(self):
        return True

    def is_image(self):
        return self._is_image

    def get_num_tracks(self, track_type):
        """Return the number of streams of type @track_type."""
        return self._track_types.count(track_type)

    def get_clean_name(self):
        """
        Return the name of the media info file without its stream/media
        info extension and with its dots replaced by underscores.
        """
        name = os.path.basename(self.get_path())
        name = re.sub(r"\.stream_info|\.media_info", "", name)

        return name.replace('.', "_")
2385
2386
class MediaFormatCombination(object):
    """
    A container / audio / video format combination, each one being
    identified by a key of FORMATS (which maps it to its caps).
    """
    FORMATS = {"aac": "audio/mpeg,mpegversion=4",  # Audio
               "ac3": "audio/x-ac3",
               "vorbis": "audio/x-vorbis",
               "mp3": "audio/mpeg,mpegversion=1,layer=3",
               "opus": "audio/x-opus",
               "rawaudio": "audio/x-raw",

               # Video
               "h264": "video/x-h264",
               "h265": "video/x-h265",
               "vp8": "video/x-vp8",
               "vp9": "video/x-vp9",
               "theora": "video/x-theora",
               "prores": "video/x-prores",
               "jpeg": "image/jpeg",

               # Containers
               "webm": "video/webm",
               "ogg": "application/ogg",
               "mkv": "video/x-matroska",
               "mp4": "video/quicktime,variant=iso;",
               "quicktime": "video/quicktime;"}

    def __init__(self, container, audio, video):
        """
        Describes a media format to be used for transcoding tests.

        :param container: A string defining the container format to be used, must be in self.FORMATS
        :param audio: A string defining the audio format to be used, must be in self.FORMATS
        :param video: A string defining the video format to be used, must be in self.FORMATS
        """
        self.container = container
        self.audio = audio
        self.video = video

    def __str__(self):
        return "%s and %s in %s" % (self.audio, self.video, self.container)

    def get_caps(self, track_type):
        """
        Return the caps of the format set for @track_type ("audio", "video"
        or "container"), or None if there is no such attribute or if its
        value is not a key of FORMATS.
        """
        format_name = vars(self).get(track_type)
        return self.FORMATS.get(format_name)

    def get_audio_caps(self):
        return self.get_caps("audio")

    def get_video_caps(self):
        return self.get_caps("video")

    def get_muxer_caps(self):
        return self.get_caps("container")