New validate plugin: validateflow
[platform/upstream/gstreamer.git] / validate / launcher / baseclasses.py
1 #!/usr/bin/env python3
2 #
3 # Copyright (c) 2013,Thibault Saunier <thibault.saunier@collabora.com>
4 #
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU Lesser General Public
7 # License as published by the Free Software Foundation; either
8 # version 2.1 of the License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13 # Lesser General Public License for more details.
14 #
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with this program; if not, write to the
17 # Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
18 # Boston, MA 02110-1301, USA.
19
20 """ Class representing tests and test managers. """
21
22 import json
23 import os
24 import sys
25 import re
26 import copy
27 import shlex
28 import socketserver
29 import struct
30 import time
31 from . import utils
32 import signal
33 import urllib.parse
34 import subprocess
35 import threading
36 import queue
37 import configparser
38 import xml
39 import random
40 import shutil
41 import uuid
42
43 from .utils import which
44 from . import reporters
45 from . import loggable
46 from .loggable import Loggable
47
48 try:
49     from lxml import etree as ET
50 except ImportError:
51     import xml.etree.cElementTree as ET
52
53 from .vfb_server import get_virual_frame_buffer_server
54 from .httpserver import HTTPServer
55 from .utils import mkdir, Result, Colors, printc, DEFAULT_TIMEOUT, GST_SECOND, \
56     Protocols, look_for_file_in_source_dir, get_data_file, BackTraceGenerator, \
57     check_bugs_resolution
58
59 # The factor by which we increase the hard timeout when running inside
60 # Valgrind
61 GDB_TIMEOUT_FACTOR = VALGRIND_TIMEOUT_FACTOR = 20
62 TIMEOUT_FACTOR = float(os.environ.get("TIMEOUT_FACTOR", 1))
63 # The error reported by valgrind when detecting errors
64 VALGRIND_ERROR_CODE = 20
65
66 VALIDATE_OVERRIDE_EXTENSION = ".override"
67 EXITING_SIGNALS = dict([(-getattr(signal, s), s) for s in [
68     'SIGQUIT', 'SIGILL', 'SIGABRT', 'SIGFPE', 'SIGSEGV', 'SIGBUS', 'SIGSYS',
69     'SIGTRAP', 'SIGXCPU', 'SIGXFSZ', 'SIGIOT'] if hasattr(signal, s)])
70 EXITING_SIGNALS.update({139: "SIGSEGV"})
71
72
73 class Test(Loggable):
74
75     """ A class representing a particular test. """
76
77     def __init__(self, application_name, classname, options,
78                  reporter, duration=0, timeout=DEFAULT_TIMEOUT,
79                  hard_timeout=None, extra_env_variables=None,
80                  expected_failures=None, is_parallel=True,
81                  workdir=None):
82         """
83         @timeout: The timeout during which the value return by get_current_value
84                   keeps being exactly equal
85         @hard_timeout: Max time the test can take in absolute
86         """
87         Loggable.__init__(self)
88         self.timeout = timeout * TIMEOUT_FACTOR * options.timeout_factor
89         if hard_timeout:
90             self.hard_timeout = hard_timeout * TIMEOUT_FACTOR
91             self.hard_timeout *= options.timeout_factor
92         else:
93             self.hard_timeout = hard_timeout
94         self.classname = classname
95         self.options = options
96         self.application = application_name
97         self.command = []
98         self.server_command = None
99         self.reporter = reporter
100         self.process = None
101         self.proc_env = None
102         self.thread = None
103         self.queue = None
104         self.duration = duration
105         self.stack_trace = None
106         self._uuid = None
107         if expected_failures is None:
108             self.expected_failures = []
109         elif not isinstance(expected_failures, list):
110             self.expected_failures = [expected_failures]
111         else:
112             self.expected_failures = expected_failures
113
114         extra_env_variables = extra_env_variables or {}
115         self.extra_env_variables = extra_env_variables
116         self.optional = False
117         self.is_parallel = is_parallel
118         self.generator = None
119         # String representation of the test number in the testsuite
120         self.number = ""
121         self.workdir = workdir
122
123         self.clean()
124
125     def clean(self):
126         self.kill_subprocess()
127         self.message = ""
128         self.error_str = ""
129         self.time_taken = 0.0
130         self._starting_time = None
131         self.result = Result.NOT_RUN
132         self.logfile = None
133         self.out = None
134         self.extra_logfiles = []
135         self.__env_variable = []
136         self.kill_subprocess()
137
138     def __str__(self):
139         string = self.classname
140         if self.result != Result.NOT_RUN:
141             string += ": " + self.result
142             if self.result in [Result.FAILED, Result.TIMEOUT]:
143                 string += " '%s'\n" \
144                           "       You can reproduce with: %s\n" \
145                     % (self.message, self.get_command_repr())
146
147                 if not self.options.redirect_logs and \
148                         self.result == Result.PASSED or \
149                         not self.options.dump_on_failure:
150                     string += self.get_logfile_repr()
151
152         return string
153
154     def add_env_variable(self, variable, value=None):
155         """
156         Only usefull so that the gst-validate-launcher can print the exact
157         right command line to reproduce the tests
158         """
159         if value is None:
160             value = os.environ.get(variable, None)
161
162         if value is None:
163             return
164
165         self.__env_variable.append(variable)
166
167     @property
168     def _env_variable(self):
169         res = ""
170         for var in set(self.__env_variable):
171             if res:
172                 res += " "
173             value = self.proc_env.get(var, None)
174             if value is not None:
175                 res += "%s='%s'" % (var, value)
176
177         return res
178
179     def open_logfile(self):
180         if self.out:
181             return
182
183         path = os.path.join(self.options.logsdir,
184                             self.classname.replace(".", os.sep))
185         mkdir(os.path.dirname(path))
186         self.logfile = path
187
188         if self.options.redirect_logs == 'stdout':
189             self.out = sys.stdout
190         elif self.options.redirect_logs == 'stderr':
191             self.out = sys.stderr
192         else:
193             self.out = open(path, 'w+')
194
195     def close_logfile(self):
196         if not self.options.redirect_logs:
197             self.out.close()
198
199         self.out = None
200
201     def _get_file_content(self, file_name):
202         f = open(file_name, 'r+')
203         value = f.read()
204         f.close()
205
206         return value
207
208     def get_log_content(self):
209         return self._get_file_content(self.logfile)
210
211     def get_extra_log_content(self, extralog):
212         if extralog not in self.extra_logfiles:
213             return ""
214
215         return self._get_file_content(extralog)
216
217     def get_classname(self):
218         name = self.classname.split('.')[-1]
219         classname = self.classname.replace('.%s' % name, '')
220
221         return classname
222
223     def get_name(self):
224         return self.classname.split('.')[-1]
225
226     def get_uuid(self):
227         if self._uuid is None:
228             self._uuid = self.classname + str(uuid.uuid4())
229         return self._uuid
230
231     def add_arguments(self, *args):
232         self.command += args
233
234     def build_arguments(self):
235         self.add_env_variable("LD_PRELOAD")
236         self.add_env_variable("DISPLAY")
237
238     def add_stack_trace_to_logfile(self):
239         self.debug("Adding stack trace")
240         trace_gatherer = BackTraceGenerator.get_default()
241         stack_trace = trace_gatherer.get_trace(self)
242
243         if not stack_trace:
244             return
245
246         info = "\n\n== Stack trace: == \n%s" % stack_trace
247         if self.options.redirect_logs:
248             print(info)
249             return
250
251         if self.options.xunit_file:
252             self.stack_trace = stack_trace
253
254         with open(self.logfile, 'a') as f:
255             f.write(info)
256
257     def set_result(self, result, message="", error=""):
258         self.debug("Setting result: %s (message: %s, error: %s)" % (result,
259                                                                     message, error))
260
261         if result is Result.TIMEOUT:
262             if self.options.debug is True:
263                 if self.options.gdb:
264                     printc("Timeout, you should process <ctrl>c to get into gdb",
265                            Colors.FAIL)
266                     # and wait here until gdb exits
267                     self.process.communicate()
268                 else:
269                     pname = self.command[0]
270                     input("%sTimeout happened you can attach gdb doing: $gdb %s %d%s\n"
271                           "Press enter to continue" % (Colors.FAIL, pname, self.process.pid,
272                                                        Colors.ENDC))
273             else:
274                 self.add_stack_trace_to_logfile()
275
276         self.result = result
277         self.message = message
278         self.error_str = error
279
280     def check_results(self):
281         if self.result is Result.FAILED or self.result is Result.TIMEOUT:
282             return
283
284         self.debug("%s returncode: %s", self, self.process.returncode)
285         if self.process.returncode == 0:
286             self.set_result(Result.PASSED)
287         elif self.process.returncode in EXITING_SIGNALS:
288             self.add_stack_trace_to_logfile()
289             self.set_result(Result.FAILED,
290                             "Application exited with signal %s" % (
291                                 EXITING_SIGNALS[self.process.returncode]))
292         elif self.process.returncode == VALGRIND_ERROR_CODE:
293             self.set_result(Result.FAILED, "Valgrind reported errors")
294         else:
295             self.set_result(Result.FAILED,
296                             "Application returned %d" % (self.process.returncode))
297
298     def get_current_value(self):
299         """
300         Lets subclasses implement a nicer timeout measurement method
301         They should return some value with which we will compare
302         the previous and timeout if they are egual during self.timeout
303         seconds
304         """
305         return Result.NOT_RUN
306
307     def process_update(self):
308         """
309         Returns True when process has finished running or has timed out.
310         """
311
312         if self.process is None:
313             # Process has not started running yet
314             return False
315
316         self.process.poll()
317         if self.process.returncode is not None:
318             return True
319
320         val = self.get_current_value()
321
322         self.debug("Got value: %s" % val)
323         if val is Result.NOT_RUN:
324             # The get_current_value logic is not implemented... dumb
325             # timeout
326             if time.time() - self.last_change_ts > self.timeout:
327                 self.set_result(Result.TIMEOUT,
328                                 "Application timed out: %s secs" %
329                                 self.timeout,
330                                 "timeout")
331                 return True
332             return False
333         elif val is Result.FAILED:
334             return True
335         elif val is Result.KNOWN_ERROR:
336             return True
337
338         self.log("New val %s" % val)
339
340         if val == self.last_val:
341             delta = time.time() - self.last_change_ts
342             self.debug("%s: Same value for %d/%d seconds" %
343                        (self, delta, self.timeout))
344             if delta > self.timeout:
345                 self.set_result(Result.TIMEOUT,
346                                 "Application timed out: %s secs" %
347                                 self.timeout,
348                                 "timeout")
349                 return True
350         elif self.hard_timeout and time.time() - self.start_ts > self.hard_timeout:
351             self.set_result(
352                 Result.TIMEOUT, "Hard timeout reached: %d secs" % self.hard_timeout)
353             return True
354         else:
355             self.last_change_ts = time.time()
356             self.last_val = val
357
358         return False
359
360     def get_subproc_env(self):
361         return os.environ.copy()
362
363     def kill_subprocess(self):
364         utils.kill_subprocess(self, self.process, DEFAULT_TIMEOUT)
365
366     def run_external_checks(self):
367         pass
368
369     def thread_wrapper(self):
370         def enable_sigint():
371             # Restore the SIGINT handler for the child process (gdb) to ensure
372             # it can handle it.
373             signal.signal(signal.SIGINT, signal.SIG_DFL)
374
375         if self.options.gdb and os.name != "nt":
376             preexec_fn = enable_sigint
377         else:
378             preexec_fn = None
379
380         self.process = subprocess.Popen(self.command,
381                                         stderr=self.out,
382                                         stdout=self.out,
383                                         env=self.proc_env,
384                                         cwd=self.workdir,
385                                         preexec_fn=preexec_fn)
386         self.process.wait()
387         if self.result is not Result.TIMEOUT:
388             if self.process.returncode == 0:
389                 self.run_external_checks()
390             self.queue.put(None)
391
392     def get_valgrind_suppression_file(self, subdir, name):
393         p = get_data_file(subdir, name)
394         if p:
395             return p
396
397         self.error("Could not find any %s file" % name)
398
399     def get_valgrind_suppressions(self):
400         return [self.get_valgrind_suppression_file('data', 'gstvalidate.supp')]
401
402     def use_gdb(self, command):
403         if self.hard_timeout is not None:
404             self.hard_timeout *= GDB_TIMEOUT_FACTOR
405         self.timeout *= GDB_TIMEOUT_FACTOR
406
407         if not self.options.gdb_non_stop:
408             self.timeout = sys.maxsize
409             self.hard_timeout = sys.maxsize
410
411         args = ["gdb"]
412         if self.options.gdb_non_stop:
413             args += ["-ex", "run", "-ex", "backtrace", "-ex", "quit"]
414         args += ["--args"] + command
415         return args
416
417     def use_valgrind(self, command, subenv):
418         vglogsfile = self.logfile + '.valgrind'
419         self.extra_logfiles.append(vglogsfile)
420
421         vg_args = []
422
423         for o, v in [('trace-children', 'yes'),
424                      ('tool', 'memcheck'),
425                      ('leak-check', 'full'),
426                      ('leak-resolution', 'high'),
427                      # TODO: errors-for-leak-kinds should be set to all instead of definite
428                      # and all false positives should be added to suppression
429                      # files.
430                      ('errors-for-leak-kinds', 'definite'),
431                      ('num-callers', '20'),
432                      ('error-exitcode', str(VALGRIND_ERROR_CODE)),
433                      ('gen-suppressions', 'all')]:
434             vg_args.append("--%s=%s" % (o, v))
435
436         if not self.options.redirect_logs:
437             vglogsfile = self.logfile + '.valgrind'
438             self.extra_logfiles.append(vglogsfile)
439             vg_args.append("--%s=%s" % ('log-file', vglogsfile))
440
441         for supp in self.get_valgrind_suppressions():
442             vg_args.append("--suppressions=%s" % supp)
443
444         command = ["valgrind"] + vg_args + command
445
446         # Tune GLib's memory allocator to be more valgrind friendly
447         subenv['G_DEBUG'] = 'gc-friendly'
448         subenv['G_SLICE'] = 'always-malloc'
449
450         if self.hard_timeout is not None:
451             self.hard_timeout *= VALGRIND_TIMEOUT_FACTOR
452         self.timeout *= VALGRIND_TIMEOUT_FACTOR
453
454         # Enable 'valgrind.config'
455         self.add_validate_config(get_data_file(
456             'data', 'valgrind.config'), subenv)
457         if subenv == self.proc_env:
458             self.add_env_variable('G_DEBUG', 'gc-friendly')
459             self.add_env_variable('G_SLICE', 'always-malloc')
460             self.add_env_variable('GST_VALIDATE_CONFIG',
461                                   self.proc_env['GST_VALIDATE_CONFIG'])
462
463         return command
464
465     def add_validate_config(self, config, subenv=None):
466         if not subenv:
467             subenv = self.extra_env_variables
468
469         if "GST_VALIDATE_CONFIG" in subenv:
470             subenv['GST_VALIDATE_CONFIG'] = '%s%s%s' % (
471                 subenv['GST_VALIDATE_CONFIG'], os.pathsep, config)
472         else:
473             subenv['GST_VALIDATE_CONFIG'] = config
474
475     def launch_server(self):
476         return None
477
478     def get_logfile_repr(self):
479         message = "    Logs:\n"
480         logfiles = self.extra_logfiles.copy()
481
482         if not self.options.redirect_logs:
483             logfiles.insert(0, self.logfile)
484
485         for log in logfiles:
486             message += "         - %s\n" % log
487
488         return message
489
490     def get_command_repr(self):
491         message = "%s %s" % (self._env_variable, ' '.join(
492             shlex.quote(arg) for arg in self.command))
493         if self.server_command:
494             message = "%s & %s" % (self.server_command, message)
495
496         return message
497
498     def test_start(self, queue):
499         self.open_logfile()
500
501         self.server_command = self.launch_server()
502         self.queue = queue
503         self.command = [self.application]
504         self._starting_time = time.time()
505         self.build_arguments()
506         self.proc_env = self.get_subproc_env()
507
508         for var, value in list(self.extra_env_variables.items()):
509             value = self.proc_env.get(var, '') + os.pathsep + value
510             self.proc_env[var] = value.strip(os.pathsep)
511             self.add_env_variable(var, self.proc_env[var])
512
513         if self.options.gdb:
514             self.command = self.use_gdb(self.command)
515
516             self.previous_sigint_handler = signal.getsignal(signal.SIGINT)
517             # Make the gst-validate executable ignore SIGINT while gdb is
518             # running.
519             signal.signal(signal.SIGINT, signal.SIG_IGN)
520
521         if self.options.valgrind:
522             self.command = self.use_valgrind(self.command, self.proc_env)
523
524         if not self.options.redirect_logs:
525             self.out.write("=================\n"
526                            "Test name: %s\n"
527                            "Command: '%s'\n"
528                            "=================\n\n"
529                            % (self.classname, ' '.join(self.command)))
530             self.out.flush()
531         else:
532             message = "Launching: %s%s\n" \
533                 "    Command: %s\n" % (Colors.ENDC, self.classname,
534                                        self.get_command_repr())
535             printc(message, Colors.OKBLUE)
536
537         self.thread = threading.Thread(target=self.thread_wrapper)
538         self.thread.start()
539
540         self.last_val = 0
541         self.last_change_ts = time.time()
542         self.start_ts = time.time()
543
544     def _dump_log_file(self, logfile):
545         message = "Dumping contents of %s\n" % logfile
546         printc(message, Colors.FAIL)
547
548         with open(logfile, 'r') as fin:
549             print(fin.read())
550
551     def _dump_log_files(self):
552         printc("Dumping log files on failure\n", Colors.FAIL)
553         self._dump_log_file(self.logfile)
554         for logfile in self.extra_logfiles:
555             self._dump_log_file(logfile)
556
557     def test_end(self):
558         self.kill_subprocess()
559         self.thread.join()
560         self.time_taken = time.time() - self._starting_time
561
562         if self.options.gdb:
563             signal.signal(signal.SIGINT, self.previous_sigint_handler)
564
565         if self.result != Result.PASSED:
566             message = str(self)
567             end = "\n"
568         else:
569             message = "%s %s: %s%s" % (self.number, self.classname, self.result,
570                                        " (" + self.message + ")" if self.message else "")
571             end = "\r"
572             if sys.stdout.isatty():
573                 term_width = shutil.get_terminal_size((80, 20))[0]
574                 if len(message) > term_width:
575                     message = message[0:term_width - 2] + '…'
576             else:
577                 message = None
578
579         if message is not None:
580             printc(message, color=utils.get_color_for_result(self.result), end=end)
581         self.close_logfile()
582
583         if self.options.dump_on_failure:
584             if self.result is not Result.PASSED:
585                 self._dump_log_files()
586
587         # Only keep around env variables we need later
588         clean_env = {}
589         for n in self.__env_variable:
590             clean_env[n] = self.proc_env.get(n, None)
591         self.proc_env = clean_env
592
593         # Don't keep around JSON report objects, they were processed
594         # in check_results already
595         self.reports = []
596
597         return self.result
598
599
600 class GstValidateTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
601     pass
602
603
604 class GstValidateListener(socketserver.BaseRequestHandler):
605
606     def handle(self):
607         """Implements BaseRequestHandler handle method"""
608         test = None
609         while True:
610             raw_len = self.request.recv(4)
611             if raw_len == b'':
612                 return
613             msglen = struct.unpack('>I', raw_len)[0]
614             msg = self.request.recv(msglen).decode()
615             if msg == '':
616                 return
617
618             obj = json.loads(msg)
619
620             if test is None:
621                 # First message must contain the uuid
622                 uuid = obj.get("uuid", None)
623                 if uuid is None:
624                     return
625                 # Find test from launcher
626                 for t in self.server.launcher.tests:
627                     if uuid == t.get_uuid():
628                         test = t
629                         break
630                 if test is None:
631                     self.server.launcher.error(
632                         "Could not find test for UUID %s" % uuid)
633                     return
634
635             obj_type = obj.get("type", '')
636             if obj_type == 'position':
637                 test.set_position(obj['position'], obj['duration'],
638                                   obj['speed'])
639             elif obj_type == 'buffering':
640                 test.set_position(obj['position'], 100)
641             elif obj_type == 'action':
642                 test.add_action_execution(obj)
643                 # Make sure that action is taken into account when checking if process
644                 # is updating
645                 test.position += 1
646             elif obj_type == 'action-done':
647                 # Make sure that action end is taken into account when checking if process
648                 # is updating
649                 test.position += 1
650                 test.actions_infos[-1]['execution-duration'] = obj['execution-duration']
651             elif obj_type == 'report':
652                 test.add_report(obj)
653
654
655 class GstValidateTest(Test):
656
657     """ A class representing a particular test. """
658     HARD_TIMEOUT_FACTOR = 5
659     fault_sig_regex = re.compile("<Caught SIGNAL: .*>")
660
661     def __init__(self, application_name, classname,
662                  options, reporter, duration=0,
663                  timeout=DEFAULT_TIMEOUT, scenario=None, hard_timeout=None,
664                  media_descriptor=None, extra_env_variables=None,
665                  expected_failures=None, workdir=None):
666
667         extra_env_variables = extra_env_variables or {}
668
669         if not hard_timeout and self.HARD_TIMEOUT_FACTOR:
670             if timeout:
671                 hard_timeout = timeout * self.HARD_TIMEOUT_FACTOR
672             elif duration:
673                 hard_timeout = duration * self.HARD_TIMEOUT_FACTOR
674             else:
675                 hard_timeout = None
676
677         # If we are running from source, use the -debug version of the
678         # application which is using rpath instead of libtool's wrappers. It's
679         # slightly faster to start and will not confuse valgrind.
680         debug = '%s-debug' % application_name
681         p = look_for_file_in_source_dir('tools', debug)
682         if p:
683             application_name = p
684
685         self.reports = []
686         self.position = -1
687         self.media_duration = -1
688         self.speed = 1.0
689         self.actions_infos = []
690         self.media_descriptor = media_descriptor
691         self.server = None
692
693         override_path = self.get_override_file(media_descriptor)
694         if override_path:
695             if extra_env_variables:
696                 if extra_env_variables.get("GST_VALIDATE_OVERRIDE", ""):
697                     extra_env_variables[
698                         "GST_VALIDATE_OVERRIDE"] += os.path.pathsep
699
700             extra_env_variables["GST_VALIDATE_OVERRIDE"] = override_path
701
702         super(GstValidateTest, self).__init__(application_name, classname,
703                                               options, reporter,
704                                               duration=duration,
705                                               timeout=timeout,
706                                               hard_timeout=hard_timeout,
707                                               extra_env_variables=extra_env_variables,
708                                               expected_failures=expected_failures,
709                                               workdir=workdir)
710
711         # defines how much the process can be outside of the configured
712         # segment / seek
713         self._sent_eos_time = None
714
715         if scenario is None or scenario.name.lower() == "none":
716             self.scenario = None
717         else:
718             self.scenario = scenario
719
720     def kill_subprocess(self):
721         Test.kill_subprocess(self)
722
723     def add_report(self, report):
724         self.reports.append(report)
725
726     def set_position(self, position, duration, speed=None):
727         self.position = position
728         self.media_duration = duration
729         if speed:
730             self.speed = speed
731
732     def add_action_execution(self, action_infos):
733         if action_infos['action-type'] == 'eos':
734             self._sent_eos_time = time.time()
735         self.actions_infos.append(action_infos)
736
737     def get_override_file(self, media_descriptor):
738         if media_descriptor:
739             if media_descriptor.get_path():
740                 override_path = os.path.splitext(media_descriptor.get_path())[
741                     0] + VALIDATE_OVERRIDE_EXTENSION
742                 if os.path.exists(override_path):
743                     return override_path
744
745         return None
746
747     def get_current_position(self):
748         return self.position
749
750     def get_current_value(self):
751         if self.scenario:
752             if self._sent_eos_time is not None:
753                 t = time.time()
754                 if ((t - self._sent_eos_time)) > 30:
755                     if self.media_descriptor is not None and self.media_descriptor.get_protocol() == Protocols.HLS:
756                         self.set_result(Result.PASSED,
757                                         """Got no EOS 30 seconds after sending EOS,
758                                         in HLS known and tolerated issue:
759                                         https://bugzilla.gnome.org/show_bug.cgi?id=723868""")
760                         return Result.KNOWN_ERROR
761
762                     self.set_result(
763                         Result.FAILED, "Pipeline did not stop 30 Seconds after sending EOS")
764
765                     return Result.FAILED
766
767         return self.position
768
769     def get_subproc_env(self):
770         subproc_env = os.environ.copy()
771
772         subproc_env["GST_VALIDATE_UUID"] = self.get_uuid()
773
774         if 'GST_DEBUG' in os.environ and not self.options.redirect_logs:
775             gstlogsfile = self.logfile + '.gstdebug'
776             self.extra_logfiles.append(gstlogsfile)
777             subproc_env["GST_DEBUG_FILE"] = gstlogsfile
778
779         if self.options.no_color:
780             subproc_env["GST_DEBUG_NO_COLOR"] = '1'
781
782         # Ensure XInitThreads is called, see bgo#731525
783         subproc_env['GST_GL_XINITTHREADS'] = '1'
784         self.add_env_variable('GST_GL_XINITTHREADS', '1')
785
786         if self.scenario is not None:
787             scenario = self.scenario.get_execution_name()
788             subproc_env["GST_VALIDATE_SCENARIO"] = scenario
789             self.add_env_variable("GST_VALIDATE_SCENARIO",
790                                   subproc_env["GST_VALIDATE_SCENARIO"])
791         else:
792             try:
793                 del subproc_env["GST_VALIDATE_SCENARIO"]
794             except KeyError:
795                 pass
796
797         return subproc_env
798
799     def clean(self):
800         Test.clean(self)
801         self._sent_eos_time = None
802         self.reports = []
803         self.position = -1
804         self.media_duration = -1
805         self.speed = 1.0
806         self.actions_infos = []
807
808     def build_arguments(self):
809         super(GstValidateTest, self).build_arguments()
810         if "GST_VALIDATE" in os.environ:
811             self.add_env_variable("GST_VALIDATE", os.environ["GST_VALIDATE"])
812
813         if "GST_VALIDATE_SCENARIOS_PATH" in os.environ:
814             self.add_env_variable("GST_VALIDATE_SCENARIOS_PATH",
815                                   os.environ["GST_VALIDATE_SCENARIOS_PATH"])
816
817         self.add_env_variable("GST_VALIDATE_CONFIG")
818         self.add_env_variable("GST_VALIDATE_OVERRIDE")
819
820     def get_extra_log_content(self, extralog):
821         value = Test.get_extra_log_content(self, extralog)
822
823         return value
824
825     def report_matches_expected_failure(self, report, expected_failure):
826         for key in ['bug', 'bugs', 'sometimes']:
827             if key in expected_failure:
828                 del expected_failure[key]
829         for key, value in list(report.items()):
830             if key in expected_failure:
831                 if not re.findall(expected_failure[key], str(value)):
832                     return False
833                 expected_failure.pop(key)
834
835         return not bool(expected_failure)
836
837     def check_reported_issues(self):
838         ret = []
839         expected_failures = copy.deepcopy(self.expected_failures)
840         expected_retcode = [0]
841         for report in self.reports:
842             found = None
843             for expected_failure in expected_failures:
844                 if self.report_matches_expected_failure(report,
845                                                         expected_failure.copy()):
846                     found = expected_failure
847                     break
848
849             if found is not None:
850                 expected_failures.remove(found)
851                 if report['level'] == 'critical':
852                     if found.get('sometimes') and isinstance(expected_retcode, list):
853                         expected_retcode.append(18)
854                     else:
855                         expected_retcode = [18]
856             elif report['level'] == 'critical':
857                 ret.append(report['summary'])
858
859         if not ret:
860             return None, expected_failures, expected_retcode
861
862         return ret, expected_failures, expected_retcode
863
864     def check_expected_timeout(self, expected_timeout):
865         msg = "Expected timeout happened. "
866         result = Result.PASSED
867         message = expected_timeout.get('message')
868         if message:
869             if not re.findall(message, self.message):
870                 result = Result.FAILED
871                 msg = "Expected timeout message: %s got %s " % (
872                     message, self.message)
873
874         expected_symbols = expected_timeout.get('stacktrace_symbols')
875         if expected_symbols:
876             trace_gatherer = BackTraceGenerator.get_default()
877             stack_trace = trace_gatherer.get_trace(self)
878
879             if stack_trace:
880                 if not isinstance(expected_symbols, list):
881                     expected_symbols = [expected_symbols]
882
883                 not_found_symbols = [s for s in expected_symbols
884                                      if s not in stack_trace]
885                 if not_found_symbols:
886                     result = Result.TIMEOUT
887                     msg = "Expected symbols '%s' not found in stack trace " % (
888                         not_found_symbols)
889             else:
890                 msg += "No stack trace available, could not verify symbols "
891
892         return result, msg
893
894     def check_results(self):
895         if self.result in [Result.FAILED, self.result is Result.PASSED]:
896             return
897
898         for report in self.reports:
899             if report.get('issue-id') == 'runtime::missing-plugin':
900                 self.set_result(Result.SKIPPED, "%s\n%s" % (report['summary'],
901                                                             report['details']))
902                 return
903
904         self.debug("%s returncode: %s", self, self.process.returncode)
905
906         criticals, not_found_expected_failures, expected_returncode = self.check_reported_issues()
907
908         expected_timeout = None
909         for i, f in enumerate(not_found_expected_failures):
910             if len(f) == 1 and f.get("returncode"):
911                 returncode = f['returncode']
912                 if not isinstance(expected_returncode, list):
913                     returncode = [expected_returncode]
914                 if 'sometimes' in f:
915                     returncode.append(0)
916             elif f.get("timeout"):
917                 expected_timeout = f
918
919         not_found_expected_failures = [f for f in not_found_expected_failures
920                                        if not f.get('returncode')]
921
922         msg = ""
923         result = Result.PASSED
924         if self.result == Result.TIMEOUT:
925             with open(self.logfile) as f:
926                 signal_fault_info = self.fault_sig_regex.findall(f.read())
927                 if signal_fault_info:
928                     result = Result.FAILED
929                     msg = signal_fault_info[0]
930                 elif expected_timeout:
931                     not_found_expected_failures.remove(expected_timeout)
932                     result, msg = self.check_expected_timeout(expected_timeout)
933                 else:
934                     return
935         elif self.process.returncode in EXITING_SIGNALS:
936             result = Result.FAILED
937             msg = "Application exited with signal %s" % (
938                 EXITING_SIGNALS[self.process.returncode]
939             )
940             self.add_stack_trace_to_logfile()
941         elif self.process.returncode == VALGRIND_ERROR_CODE:
942             msg = "Valgrind reported errors "
943             result = Result.FAILED
944         elif self.process.returncode not in expected_returncode:
945             msg = "Application returned %s " % self.process.returncode
946             if expected_returncode != [0]:
947                 msg += "(expected %s) " % expected_returncode
948             result = Result.FAILED
949
950         if criticals:
951             msg += "(critical errors: [%s]) " % ', '.join(criticals)
952             result = Result.FAILED
953
954         if not_found_expected_failures:
955             mandatory_failures = [f for f in not_found_expected_failures
956                                   if not f.get('sometimes')]
957
958             if mandatory_failures:
959                 msg += "(Expected errors not found: %s) " % mandatory_failures
960                 result = Result.FAILED
961         elif self.expected_failures:
962             msg += '%s(Expected errors occured: %s)%s' % (Colors.OKBLUE,
963                                                           self.expected_failures,
964                                                           Colors.ENDC)
965
966         self.set_result(result, msg.strip())
967
968     def get_valgrind_suppressions(self):
969         result = super(GstValidateTest, self).get_valgrind_suppressions()
970         gst_sup = self.get_valgrind_suppression_file('common', 'gst.supp')
971         if gst_sup:
972             result.append(gst_sup)
973         return result
974
975
976 class GstValidateEncodingTestInterface(object):
977     DURATION_TOLERANCE = GST_SECOND / 4
978
979     def __init__(self, combination, media_descriptor, duration_tolerance=None):
980         super(GstValidateEncodingTestInterface, self).__init__()
981
982         self.media_descriptor = media_descriptor
983         self.combination = combination
984         self.dest_file = ""
985
986         self._duration_tolerance = duration_tolerance
987         if duration_tolerance is None:
988             self._duration_tolerance = self.DURATION_TOLERANCE
989
990     def get_current_size(self):
991         try:
992             size = os.stat(urllib.parse.urlparse(self.dest_file).path).st_size
993         except OSError:
994             return None
995
996         self.debug("Size: %s" % size)
997         return size
998
999     def _get_profile_full(self, muxer, venc, aenc, video_restriction=None,
1000                           audio_restriction=None, audio_presence=0,
1001                           video_presence=0):
1002         ret = ""
1003         if muxer:
1004             ret += muxer
1005         ret += ":"
1006         if venc:
1007             if video_restriction is not None:
1008                 ret = ret + video_restriction + '->'
1009             ret += venc
1010             if video_presence:
1011                 ret = ret + '|' + str(video_presence)
1012         if aenc:
1013             ret += ":"
1014             if audio_restriction is not None:
1015                 ret = ret + audio_restriction + '->'
1016             ret += aenc
1017             if audio_presence:
1018                 ret = ret + '|' + str(audio_presence)
1019
1020         return ret.replace("::", ":")
1021
1022     def get_profile(self, video_restriction=None, audio_restriction=None):
1023         vcaps = self.combination.get_video_caps()
1024         acaps = self.combination.get_audio_caps()
1025         if self.media_descriptor is not None:
1026             if self.media_descriptor.get_num_tracks("video") == 0:
1027                 vcaps = None
1028
1029             if self.media_descriptor.get_num_tracks("audio") == 0:
1030                 acaps = None
1031
1032         return self._get_profile_full(self.combination.get_muxer_caps(),
1033                                       vcaps, acaps,
1034                                       video_restriction=video_restriction,
1035                                       audio_restriction=audio_restriction)
1036
1037     def _clean_caps(self, caps):
1038         """
1039         Returns a list of key=value or structure name, without "(types)" or ";" or ","
1040         """
1041         return re.sub(r"\(.+?\)\s*| |;", '', caps).split(',')
1042
1043     # pylint: disable=E1101
1044     def _has_caps_type_variant(self, c, ccaps):
1045         """
1046         Handle situations where we can have application/ogg or video/ogg or
1047         audio/ogg
1048         """
1049         has_variant = False
1050         media_type = re.findall("application/|video/|audio/", c)
1051         if media_type:
1052             media_type = media_type[0].replace('/', '')
1053             possible_mtypes = ["application", "video", "audio"]
1054             possible_mtypes.remove(media_type)
1055             for tmptype in possible_mtypes:
1056                 possible_c_variant = c.replace(media_type, tmptype)
1057                 if possible_c_variant in ccaps:
1058                     self.info(
1059                         "Found %s in %s, good enough!", possible_c_variant, ccaps)
1060                     has_variant = True
1061
1062         return has_variant
1063
1064     # pylint: disable=E1101
1065     def run_iqa_test(self, reference_file_uri):
1066         """
1067         Runs IQA test if @reference_file_path exists
1068         @test: The test to run tests on
1069         """
1070         if not GstValidateBaseTestManager.has_feature('iqa'):
1071             self.debug('Iqa element not present, not running extra test.')
1072             return
1073
1074         pipeline_desc = """
1075             uridecodebin uri=%s !
1076                 iqa name=iqa do-dssim=true dssim-error-threshold=1.0 ! fakesink
1077             uridecodebin uri=%s ! iqa.
1078         """ % (reference_file_uri, self.dest_file)
1079         pipeline_desc = pipeline_desc.replace("\n", "")
1080
1081         command = [GstValidateBaseTestManager.COMMAND] + \
1082             shlex.split(pipeline_desc)
1083         if not self.options.redirect_logs:
1084             self.out.write(
1085                 "=================\n"
1086                 "Running IQA tests on results of: %s\n"
1087                 "Command: '%s'\n"
1088                 "=================\n\n" % (
1089                     self.classname, ' '.join(command)))
1090             self.out.flush()
1091         else:
1092             message = "Running IQA tests on results of:%s %s\n" \
1093                 "    Command: %s\n" % (
1094                     Colors.ENDC, self.classname, ' '.join(command))
1095             printc(message, Colors.OKBLUE)
1096
1097         self.process = subprocess.Popen(command,
1098                                         stderr=self.out,
1099                                         stdout=self.out,
1100                                         env=self.proc_env,
1101                                         cwd=self.workdir)
1102         self.process.wait()
1103
1104     def check_encoded_file(self):
1105         result_descriptor = GstValidateMediaDescriptor.new_from_uri(
1106             self.dest_file)
1107         if result_descriptor is None:
1108             return (Result.FAILED, "Could not discover encoded file %s"
1109                     % self.dest_file)
1110
1111         duration = result_descriptor.get_duration()
1112         orig_duration = self.media_descriptor.get_duration()
1113         tolerance = self._duration_tolerance
1114
1115         if orig_duration - tolerance >= duration <= orig_duration + tolerance:
1116             os.remove(result_descriptor.get_path())
1117             return (Result.FAILED, "Duration of encoded file is "
1118                     " wrong (%s instead of %s)" %
1119                     (utils.TIME_ARGS(duration),
1120                      utils.TIME_ARGS(orig_duration)))
1121         else:
1122             all_tracks_caps = result_descriptor.get_tracks_caps()
1123             container_caps = result_descriptor.get_caps()
1124             if container_caps:
1125                 all_tracks_caps.insert(0, ("container", container_caps))
1126
1127             for track_type, caps in all_tracks_caps:
1128                 ccaps = self._clean_caps(caps)
1129                 wanted_caps = self.combination.get_caps(track_type)
1130                 cwanted_caps = self._clean_caps(wanted_caps)
1131
1132                 if wanted_caps is None:
1133                     os.remove(result_descriptor.get_path())
1134                     return (Result.FAILED,
1135                             "Found a track of type %s in the encoded files"
1136                             " but none where wanted in the encoded profile: %s"
1137                             % (track_type, self.combination))
1138
1139                 for c in cwanted_caps:
1140                     if c not in ccaps:
1141                         if not self._has_caps_type_variant(c, ccaps):
1142                             os.remove(result_descriptor.get_path())
1143                             return (Result.FAILED,
1144                                     "Field: %s  (from %s) not in caps of the outputed file %s"
1145                                     % (wanted_caps, c, ccaps))
1146
1147             os.remove(result_descriptor.get_path())
1148             return (Result.PASSED, "")
1149
1150
1151 class TestsManager(Loggable):
1152
1153     """ A class responsible for managing tests. """
1154
1155     name = "base"
1156     loading_testsuite = None
1157
1158     def __init__(self):
1159
1160         Loggable.__init__(self)
1161
1162         self.tests = []
1163         self.unwanted_tests = []
1164         self.options = None
1165         self.args = None
1166         self.reporter = None
1167         self.wanted_tests_patterns = []
1168         self.blacklisted_tests_patterns = []
1169         self._generators = []
1170         self.check_testslist = True
1171         self.all_tests = None
1172         self.expected_failures = {}
1173         self.blacklisted_tests = []
1174
1175     def init(self):
1176         return True
1177
1178     def list_tests(self):
1179         return sorted(list(self.tests), key=lambda x: x.classname)
1180
1181     def find_tests(self, classname):
1182         regex = re.compile(classname)
1183         return [test for test in self.list_tests() if regex.findall(test.classname)]
1184
1185     def add_expected_issues(self, expected_failures):
1186         expected_failures_re = {}
1187         for test_name_regex, failures in list(expected_failures.items()):
1188             regex = re.compile(test_name_regex)
1189             expected_failures_re[regex] = failures
1190             for test in self.tests:
1191                 if regex.findall(test.classname):
1192                     test.expected_failures.extend(failures)
1193
1194         self.expected_failures.update(expected_failures_re)
1195
1196     def add_test(self, test):
1197         if test.generator is None:
1198             test.classname = self.loading_testsuite + '.' + test.classname
1199         for regex, failures in list(self.expected_failures.items()):
1200             if regex.findall(test.classname):
1201                 test.expected_failures.extend(failures)
1202
1203         if self._is_test_wanted(test):
1204             if test not in self.tests:
1205                 self.tests.append(test)
1206                 self.tests.sort(key=lambda test: test.classname)
1207         else:
1208             if test not in self.tests:
1209                 self.unwanted_tests.append(test)
1210                 self.unwanted_tests.sort(key=lambda test: test.classname)
1211
1212     def get_tests(self):
1213         return self.tests
1214
1215     def populate_testsuite(self):
1216         pass
1217
1218     def add_generators(self, generators):
1219         """
1220         @generators: A list of, or one single #TestsGenerator to be used to generate tests
1221         """
1222         if not isinstance(generators, list):
1223             generators = [generators]
1224         self._generators.extend(generators)
1225         for generator in generators:
1226             generator.testsuite = self.loading_testsuite
1227
1228         self._generators = list(set(self._generators))
1229
1230     def get_generators(self):
1231         return self._generators
1232
1233     def _add_blacklist(self, blacklisted_tests):
1234         if not isinstance(blacklisted_tests, list):
1235             blacklisted_tests = [blacklisted_tests]
1236
1237         for patterns in blacklisted_tests:
1238             for pattern in patterns.split(","):
1239                 self.blacklisted_tests_patterns.append(re.compile(pattern))
1240
1241     def set_default_blacklist(self, default_blacklist):
1242         for test_regex, reason in default_blacklist:
1243             if not test_regex.startswith(self.loading_testsuite + '.'):
1244                 test_regex = self.loading_testsuite + '.' + test_regex
1245             self.blacklisted_tests.append((test_regex, reason))
1246
1247     def add_options(self, parser):
1248         """ Add more arguments. """
1249         pass
1250
1251     def set_settings(self, options, args, reporter):
1252         """ Set properties after options parsing. """
1253         self.options = options
1254         self.args = args
1255         self.reporter = reporter
1256
1257         self.populate_testsuite()
1258
1259         if self.options.valgrind:
1260             self.print_valgrind_bugs()
1261
1262         if options.wanted_tests:
1263             for patterns in options.wanted_tests:
1264                 for pattern in patterns.split(","):
1265                     self.wanted_tests_patterns.append(re.compile(pattern))
1266
1267         if options.blacklisted_tests:
1268             for patterns in options.blacklisted_tests:
1269                 self._add_blacklist(patterns)
1270
1271     def set_blacklists(self):
1272         if self.blacklisted_tests:
1273             self.info("Currently 'hardcoded' %s blacklisted tests:" %
1274                       self.name)
1275
1276         if self.options.check_bugs_status:
1277             if not check_bugs_resolution(self.blacklisted_tests):
1278                 return False
1279
1280         for name, bug in self.blacklisted_tests:
1281             self._add_blacklist(name)
1282             if not self.options.check_bugs_status:
1283                 self.info("  + %s --> bug: %s" % (name, bug))
1284
1285         return True
1286
1287     def check_expected_failures(self):
1288         if not self.expected_failures or not self.options.check_bugs_status:
1289             return True
1290
1291         if self.expected_failures:
1292             printc("\nCurrently known failures in the %s testsuite:"
1293                    % self.name, Colors.WARNING, title_char='-')
1294
1295         bugs_definitions = {}
1296         for regex, failures in list(self.expected_failures.items()):
1297             for failure in failures:
1298                 bugs = failure.get('bug')
1299                 if not bugs:
1300                     bugs = failure.get('bugs')
1301                 if not bugs:
1302                     printc('+ %s:\n  --> no bug reported associated with %s\n' % (
1303                         regex.pattern, failure), Colors.WARNING)
1304                     continue
1305
1306                 if not isinstance(bugs, list):
1307                     bugs = [bugs]
1308                 cbugs = bugs_definitions.get(regex.pattern, [])
1309                 bugs.extend([b for b in bugs if b not in cbugs])
1310                 bugs_definitions[regex.pattern] = bugs
1311
1312         return check_bugs_resolution(bugs_definitions.items())
1313
1314     def _check_blacklisted(self, test):
1315         for pattern in self.blacklisted_tests_patterns:
1316             if pattern.findall(test.classname):
1317                 self.info("%s is blacklisted by %s", test.classname, pattern)
1318                 return True
1319
1320         return False
1321
1322     def _check_whitelisted(self, test):
1323         for pattern in self.wanted_tests_patterns:
1324             if pattern.findall(test.classname):
1325                 if self._check_blacklisted(test):
1326                     # If explicitly white listed that specific test
1327                     # bypass the blacklisting
1328                     if pattern.pattern != test.classname:
1329                         return False
1330                 return True
1331         return False
1332
1333     def _check_duration(self, test):
1334         if test.duration > 0 and int(self.options.long_limit) < int(test.duration):
1335             self.info("Not activating %s as its duration (%d) is superior"
1336                       " than the long limit (%d)" % (test, test.duration,
1337                                                      int(self.options.long_limit)))
1338             return False
1339
1340         return True
1341
1342     def _is_test_wanted(self, test):
1343         if self._check_whitelisted(test):
1344             if not self._check_duration(test):
1345                 return False
1346             return True
1347
1348         if self._check_blacklisted(test):
1349             return False
1350
1351         if not self._check_duration(test):
1352             return False
1353
1354         if not self.wanted_tests_patterns:
1355             return True
1356
1357         return False
1358
1359     def needs_http_server(self):
1360         return False
1361
1362     def print_valgrind_bugs(self):
1363         pass
1364
1365
1366 class TestsGenerator(Loggable):
1367
1368     def __init__(self, name, test_manager, tests=[]):
1369         Loggable.__init__(self)
1370         self.name = name
1371         self.test_manager = test_manager
1372         self.testsuite = None
1373         self._tests = {}
1374         for test in tests:
1375             self._tests[test.classname] = test
1376
1377     def generate_tests(self, *kwargs):
1378         """
1379         Method that generates tests
1380         """
1381         return list(self._tests.values())
1382
1383     def add_test(self, test):
1384         test.generator = self
1385         test.classname = self.testsuite + '.' + test.classname
1386         self._tests[test.classname] = test
1387
1388
1389 class GstValidateTestsGenerator(TestsGenerator):
1390
1391     def populate_tests(self, uri_minfo_special_scenarios, scenarios):
1392         pass
1393
1394     @staticmethod
1395     def get_fakesink_for_media_type(media_type, needs_clock=False):
1396         if media_type == "video":
1397             if needs_clock:
1398                 return 'fakevideosink qos=true max-lateness=20000000'
1399
1400             return "fakevideosink sync=false"
1401
1402         if needs_clock:
1403             return "fakesink sync=true"
1404
1405         return "fakesink"
1406
1407     def generate_tests(self, uri_minfo_special_scenarios, scenarios):
1408         self.populate_tests(uri_minfo_special_scenarios, scenarios)
1409         return super(GstValidateTestsGenerator, self).generate_tests()
1410
1411
1412 class _TestsLauncher(Loggable):
1413
1414     def __init__(self):
1415
1416         Loggable.__init__(self)
1417
1418         self.options = None
1419         self.testers = []
1420         self.tests = []
1421         self.reporter = None
1422         self._list_testers()
1423         self.all_tests = None
1424         self.wanted_tests_patterns = []
1425
1426         self.queue = queue.Queue()
1427         self.jobs = []
1428         self.total_num_tests = 0
1429         self.server = None
1430         self.httpsrv = None
1431         self.vfb_server = None
1432
1433     def _list_app_dirs(self):
1434         app_dirs = []
1435         env_dirs = os.environ["GST_VALIDATE_APPS_DIR"]
1436         if env_dirs is not None:
1437             for dir_ in env_dirs.split(":"):
1438                 app_dirs.append(dir_)
1439
1440         return app_dirs
1441
1442     def _exec_app(self, app_dir, env):
1443         try:
1444             files = os.listdir(app_dir)
1445         except OSError as e:
1446             self.debug("Could not list %s: %s" % (app_dir, e))
1447             files = []
1448         for f in files:
1449             if f.endswith(".py"):
1450                 exec(compile(open(os.path.join(app_dir, f)).read(),
1451                              os.path.join(app_dir, f), 'exec'), env)
1452
1453     def _exec_apps(self, env):
1454         app_dirs = self._list_app_dirs()
1455         for app_dir in app_dirs:
1456             self._exec_app(app_dir, env)
1457
1458     def _list_testers(self):
1459         env = globals().copy()
1460         self._exec_apps(env)
1461
1462         testers = [i() for i in utils.get_subclasses(TestsManager, env)]
1463         for tester in testers:
1464             if tester.init() is True:
1465                 self.testers.append(tester)
1466             else:
1467                 self.warning("Can not init tester: %s -- PATH is %s"
1468                              % (tester.name, os.environ["PATH"]))
1469
1470     def add_options(self, parser):
1471         for tester in self.testers:
1472             tester.add_options(parser)
1473
1474     def _load_testsuite(self, testsuites):
1475         exceptions = []
1476         for testsuite in testsuites:
1477             try:
1478                 sys.path.insert(0, os.path.dirname(testsuite))
1479                 return (__import__(os.path.basename(testsuite).replace(".py", "")), None)
1480             except Exception as e:
1481                 exceptions.append("Could not load %s: %s" % (testsuite, e))
1482                 continue
1483             finally:
1484                 sys.path.remove(os.path.dirname(testsuite))
1485
1486         return (None, exceptions)
1487
1488     def _load_testsuites(self):
1489         testsuites = set()
1490         for testsuite in self.options.testsuites:
1491             if os.path.exists(testsuite):
1492                 testsuite = os.path.abspath(os.path.expanduser(testsuite))
1493                 loaded_module = self._load_testsuite([testsuite])
1494             else:
1495                 possible_testsuites_paths = [os.path.join(d, testsuite + ".py")
1496                                              for d in self.options.testsuites_dirs]
1497                 loaded_module = self._load_testsuite(possible_testsuites_paths)
1498
1499             module = loaded_module[0]
1500             if not loaded_module[0]:
1501                 if "." in testsuite:
1502                     self.options.testsuites.append(testsuite.split('.')[0])
1503                     self.info("%s looks like a test name, trying that" %
1504                               testsuite)
1505                     self.options.wanted_tests.append(testsuite)
1506                 else:
1507                     printc("Could not load testsuite: %s, reasons: %s" % (
1508                         testsuite, loaded_module[1]), Colors.FAIL)
1509                 continue
1510
1511             testsuites.add(module)
1512             if not hasattr(module, "TEST_MANAGER"):
1513                 module.TEST_MANAGER = [tester.name for tester in self.testers]
1514             elif not isinstance(module.TEST_MANAGER, list):
1515                 module.TEST_MANAGER = [module.TEST_MANAGER]
1516
1517         self.options.testsuites = list(testsuites)
1518
1519     def _setup_testsuites(self):
1520         for testsuite in self.options.testsuites:
1521             loaded = False
1522             wanted_test_manager = None
1523             # TEST_MANAGER has been set in _load_testsuites()
1524             assert hasattr(testsuite, "TEST_MANAGER")
1525             wanted_test_manager = testsuite.TEST_MANAGER
1526             if not isinstance(wanted_test_manager, list):
1527                 wanted_test_manager = [wanted_test_manager]
1528
1529             for tester in self.testers:
1530                 if wanted_test_manager is not None and \
1531                         tester.name not in wanted_test_manager:
1532                     continue
1533
1534                 prev_testsuite_name = TestsManager.loading_testsuite
1535                 if self.options.user_paths:
1536                     TestsManager.loading_testsuite = tester.name
1537                     tester.register_defaults()
1538                     loaded = True
1539                 else:
1540                     TestsManager.loading_testsuite = testsuite.__name__
1541                     if testsuite.setup_tests(tester, self.options):
1542                         loaded = True
1543                 if prev_testsuite_name:
1544                     TestsManager.loading_testsuite = prev_testsuite_name
1545
1546             if not loaded:
1547                 printc("Could not load testsuite: %s"
1548                        " maybe because of missing TestManager"
1549                        % (testsuite), Colors.FAIL)
1550                 return False
1551
1552     def _load_config(self, options):
1553         printc("Loading config files is DEPRECATED"
1554                " you should use the new testsuite format now",)
1555
1556         for tester in self.testers:
1557             tester.options = options
1558             globals()[tester.name] = tester
1559         globals()["options"] = options
1560         c__file__ = __file__
1561         globals()["__file__"] = self.options.config
1562         exec(compile(open(self.options.config).read(),
1563                      self.options.config, 'exec'), globals())
1564         globals()["__file__"] = c__file__
1565
1566     def set_settings(self, options, args):
1567         if options.xunit_file:
1568             self.reporter = reporters.XunitReporter(options)
1569         else:
1570             self.reporter = reporters.Reporter(options)
1571
1572         self.options = options
1573         wanted_testers = None
1574         for tester in self.testers:
1575             if tester.name in args:
1576                 wanted_testers = tester.name
1577
1578         if wanted_testers:
1579             testers = self.testers
1580             self.testers = []
1581             for tester in testers:
1582                 if tester.name in args:
1583                     self.testers.append(tester)
1584                     args.remove(tester.name)
1585
1586         if options.config:
1587             self._load_config(options)
1588
1589         self._load_testsuites()
1590         if not self.options.testsuites:
1591             printc("Not testsuite loaded!", Colors.FAIL)
1592             return False
1593
1594         for tester in self.testers:
1595             tester.set_settings(options, args, self.reporter)
1596
1597         if not options.config and options.testsuites:
1598             if self._setup_testsuites() is False:
1599                 return False
1600
1601         for tester in self.testers:
1602             if not tester.set_blacklists():
1603                 return False
1604
1605             if not tester.check_expected_failures():
1606                 return False
1607
1608         if self.needs_http_server() or options.httponly is True:
1609             self.httpsrv = HTTPServer(options)
1610             self.httpsrv.start()
1611
1612         if options.no_display:
1613             self.vfb_server = get_virual_frame_buffer_server(options)
1614             res = self.vfb_server.start()
1615             if res[0] is False:
1616                 printc("Could not start virtual frame server: %s" % res[1],
1617                        Colors.FAIL)
1618                 return False
1619             os.environ["DISPLAY"] = self.vfb_server.display_id
1620
1621         return True
1622
1623     def _check_tester_has_other_testsuite(self, testsuite, tester):
1624         if tester.name != testsuite.TEST_MANAGER[0]:
1625             return True
1626
1627         for t in self.options.testsuites:
1628             if t != testsuite:
1629                 for other_testmanager in t.TEST_MANAGER:
1630                     if other_testmanager == tester.name:
1631                         return True
1632
1633         return False
1634
1635     def _check_defined_tests(self, tester, tests):
1636         if self.options.blacklisted_tests or self.options.wanted_tests:
1637             return
1638
1639         tests_names = [test.classname for test in tests]
1640         testlist_changed = False
1641         for testsuite in self.options.testsuites:
1642             if not self._check_tester_has_other_testsuite(testsuite, tester) \
1643                     and tester.check_testslist:
1644                 try:
1645                     testlist_file = open(os.path.splitext(testsuite.__file__)[0] + ".testslist",
1646                                          'r+')
1647
1648                     know_tests = testlist_file.read().split("\n")
1649                     testlist_file.close()
1650
1651                     testlist_file = open(os.path.splitext(testsuite.__file__)[0] + ".testslist",
1652                                          'w')
1653                 except IOError:
1654                     continue
1655
1656                 optional_out = []
1657                 for test in know_tests:
1658                     if test and test.strip('~') not in tests_names:
1659                         if not test.startswith('~'):
1660                             testlist_changed = True
1661                             printc("Test %s Not in testsuite %s anymore"
1662                                    % (test, testsuite.__file__), Colors.FAIL)
1663                         else:
1664                             optional_out.append((test, None))
1665
1666                 tests_names = sorted([(test.classname, test) for test in tests] + optional_out,
1667                                      key=lambda x: x[0].strip('~'))
1668
1669                 for tname, test in tests_names:
1670                     if test and test.optional:
1671                         tname = '~' + tname
1672                     testlist_file.write("%s\n" % (tname))
1673                     if tname and tname not in know_tests:
1674                         printc("Test %s is NEW in testsuite %s"
1675                                % (tname, testsuite.__file__),
1676                                Colors.FAIL if self.options.fail_on_testlist_change else Colors.OKGREEN)
1677                         testlist_changed = True
1678
1679                 testlist_file.close()
1680                 break
1681
1682         return testlist_changed
1683
1684     def list_tests(self):
1685         for tester in self.testers:
1686             if not self._tester_needed(tester):
1687                 continue
1688
1689             tests = tester.list_tests()
1690             if self._check_defined_tests(tester, tests) and \
1691                     self.options.fail_on_testlist_change:
1692                 raise RuntimeError("Unexpected new test in testsuite.")
1693
1694             self.tests.extend(tests)
1695         return sorted(list(self.tests), key=lambda t: t.classname)
1696
1697     def _tester_needed(self, tester):
1698         for testsuite in self.options.testsuites:
1699             if tester.name in testsuite.TEST_MANAGER:
1700                 return True
1701         return False
1702
1703     def server_wrapper(self, ready):
1704         self.server = GstValidateTCPServer(
1705             ('localhost', 0), GstValidateListener)
1706         self.server.socket.settimeout(None)
1707         self.server.launcher = self
1708         self.serverport = self.server.socket.getsockname()[1]
1709         self.info("%s server port: %s" % (self, self.serverport))
1710         ready.set()
1711
1712         self.server.serve_forever(poll_interval=0.05)
1713
1714     def _start_server(self):
1715         self.info("Starting TCP Server")
1716         ready = threading.Event()
1717         self.server_thread = threading.Thread(target=self.server_wrapper,
1718                                               kwargs={'ready': ready})
1719         self.server_thread.start()
1720         ready.wait()
1721         os.environ["GST_VALIDATE_SERVER"] = "tcp://localhost:%s" % self.serverport
1722
1723     def _stop_server(self):
1724         if self.server:
1725             self.server.shutdown()
1726             self.server_thread.join()
1727             self.server.server_close()
1728             self.server = None
1729
1730     def test_wait(self):
1731         while True:
1732             # Check process every second for timeout
1733             try:
1734                 self.queue.get(timeout=1)
1735             except queue.Empty:
1736                 pass
1737
1738             for test in self.jobs:
1739                 if test.process_update():
1740                     self.jobs.remove(test)
1741                     return test
1742
1743     def tests_wait(self):
1744         try:
1745             test = self.test_wait()
1746             test.check_results()
1747         except KeyboardInterrupt:
1748             for test in self.jobs:
1749                 test.kill_subprocess()
1750             raise
1751
1752         return test
1753
1754     def start_new_job(self, tests_left):
1755         try:
1756             test = tests_left.pop(0)
1757         except IndexError:
1758             return False
1759
1760         test.test_start(self.queue)
1761
1762         self.jobs.append(test)
1763
1764         return True
1765
1766     def _run_tests(self):
1767         if not self.all_tests:
1768             self.all_tests = self.list_tests()
1769         self.total_num_tests = len(self.all_tests)
1770         if not sys.stdout.isatty():
1771             printc("\nRunning %d tests..." % self.total_num_tests, color=Colors.HEADER)
1772
1773         self.reporter.init_timer()
1774         alone_tests = []
1775         tests = []
1776         for test in self.tests:
1777             if test.is_parallel:
1778                 tests.append(test)
1779             else:
1780                 alone_tests.append(test)
1781
1782         max_num_jobs = min(self.options.num_jobs, len(tests))
1783         jobs_running = 0
1784
1785         # if order of test execution doesn't matter, shuffle
1786         # the order to optimize cpu usage
1787         if self.options.shuffle:
1788             random.shuffle(tests)
1789             random.shuffle(alone_tests)
1790
1791         current_test_num = 1
1792         for num_jobs, tests in [(max_num_jobs, tests), (1, alone_tests)]:
1793             tests_left = list(tests)
1794             for i in range(num_jobs):
1795                 if not self.start_new_job(tests_left):
1796                     break
1797                 jobs_running += 1
1798
1799             while jobs_running != 0:
1800                 test = self.tests_wait()
1801                 jobs_running -= 1
1802                 test.number = "[%d / %d] " % (current_test_num,
1803                                               self.total_num_tests)
1804                 current_test_num += 1
1805                 res = test.test_end()
1806                 self.reporter.after_test(test)
1807                 if res != Result.PASSED and (self.options.forever
1808                                              or self.options.fatal_error):
1809                     return False
1810                 if self.start_new_job(tests_left):
1811                     jobs_running += 1
1812
1813         return True
1814
1815     def clean_tests(self):
1816         for test in self.tests:
1817             test.clean()
1818         self._stop_server()
1819
1820     def run_tests(self):
1821         try:
1822             self._start_server()
1823             if self.options.forever:
1824                 r = 1
1825                 while True:
1826                     printc("Running iteration %d" % r, title=True)
1827
1828                     if not self._run_tests():
1829                         break
1830                     r += 1
1831                     self.clean_tests()
1832
1833                 return False
1834             elif self.options.n_runs:
1835                 res = True
1836                 for r in range(self.options.n_runs):
1837                     t = "Running iteration %d" % r
1838                     print("%s\n%s\n%s\n" % ("=" * len(t), t, "=" * len(t)))
1839                     if not self._run_tests():
1840                         res = False
1841                     self.clean_tests()
1842
1843                 return res
1844             else:
1845                 return self._run_tests()
1846         finally:
1847             if self.httpsrv:
1848                 self.httpsrv.stop()
1849             if self.vfb_server:
1850                 self.vfb_server.stop()
1851             self.clean_tests()
1852
1853     def final_report(self):
1854         return self.reporter.final_report()
1855
1856     def needs_http_server(self):
1857         for tester in self.testers:
1858             if tester.needs_http_server():
1859                 return True
1860
1861
1862 class NamedDic(object):
1863
1864     def __init__(self, props):
1865         if props:
1866             for name, value in props.items():
1867                 setattr(self, name, value)
1868
1869
1870 class Scenario(object):
1871
1872     def __init__(self, name, props, path=None):
1873         self.name = name
1874         self.path = path
1875
1876         for prop, value in props:
1877             setattr(self, prop.replace("-", "_"), value)
1878
1879     def get_execution_name(self):
1880         if self.path is not None:
1881             return self.path
1882         else:
1883             return self.name
1884
1885     def seeks(self):
1886         if hasattr(self, "seek"):
1887             return bool(self.seek)
1888
1889         return False
1890
1891     def needs_clock_sync(self):
1892         if hasattr(self, "need_clock_sync"):
1893             return bool(self.need_clock_sync)
1894
1895         return False
1896
1897     def needs_live_content(self):
1898         # Scenarios that can only be used on live content
1899         if hasattr(self, "live_content_required"):
1900             return bool(self.live_content_required)
1901         return False
1902
1903     def compatible_with_live_content(self):
1904         # if a live content is required it's implicitely compatible with
1905         # live content
1906         if self.needs_live_content():
1907             return True
1908         if hasattr(self, "live_content_compatible"):
1909             return bool(self.live_content_compatible)
1910         return False
1911
1912     def get_min_media_duration(self):
1913         if hasattr(self, "min_media_duration"):
1914             return float(self.min_media_duration)
1915
1916         return 0
1917
1918     def does_reverse_playback(self):
1919         if hasattr(self, "reverse_playback"):
1920             return bool(self.reverse_playback)
1921
1922         return False
1923
1924     def get_duration(self):
1925         try:
1926             return float(getattr(self, "duration"))
1927         except AttributeError:
1928             return 0
1929
1930     def get_min_tracks(self, track_type):
1931         try:
1932             return int(getattr(self, "min_%s_track" % track_type))
1933         except AttributeError:
1934             return 0
1935
1936     def __repr__(self):
1937         return "<Scenario %s>" % self.name
1938
1939
1940 class ScenarioManager(Loggable):
1941     _instance = None
1942     all_scenarios = []
1943
1944     FILE_EXTENSION = "scenario"
1945
1946     def __new__(cls, *args, **kwargs):
1947         if not cls._instance:
1948             cls._instance = super(ScenarioManager, cls).__new__(
1949                 cls, *args, **kwargs)
1950             cls._instance.config = None
1951             cls._instance.discovered = False
1952             Loggable.__init__(cls._instance)
1953
1954         return cls._instance
1955
1956     def find_special_scenarios(self, mfile):
1957         scenarios = []
1958         mfile_bname = os.path.basename(mfile)
1959
1960         for f in os.listdir(os.path.dirname(mfile)):
1961             if re.findall("%s\..*\.%s$" % (re.escape(mfile_bname), self.FILE_EXTENSION), f):
1962                 scenarios.append(os.path.join(os.path.dirname(mfile), f))
1963
1964         if scenarios:
1965             scenarios = self.discover_scenarios(scenarios, mfile)
1966
1967         return scenarios
1968
1969     def discover_scenarios(self, scenario_paths=[], mfile=None):
1970         """
1971         Discover scenarios specified in scenario_paths or the default ones
1972         if nothing specified there
1973         """
1974         scenarios = []
1975         scenario_defs = os.path.join(self.config.main_dir, "scenarios.def")
1976         logs = open(os.path.join(self.config.logsdir,
1977                                  "scenarios_discovery.log"), 'w')
1978
1979         try:
1980             command = [GstValidateBaseTestManager.COMMAND,
1981                        "--scenarios-defs-output-file", scenario_defs]
1982             command.extend(scenario_paths)
1983             subprocess.check_call(command, stdout=logs, stderr=logs)
1984         except subprocess.CalledProcessError as e:
1985             self.error(e)
1986             pass
1987
1988         config = configparser.RawConfigParser()
1989         f = open(scenario_defs)
1990         config.readfp(f)
1991
1992         for section in config.sections():
1993             if scenario_paths:
1994                 for scenario_path in scenario_paths:
1995                     if section in os.path.splitext(os.path.basename(scenario_path))[0]:
1996                         if mfile is None:
1997                             name = section
1998                             path = scenario_path
1999                         else:
2000                             # The real name of the scenario is:
2001                             # filename.REALNAME.scenario
2002                             name = scenario_path.replace(mfile + ".", "").replace(
2003                                 "." + self.FILE_EXTENSION, "")
2004                             path = scenario_path
2005             else:
2006                 name = section
2007                 path = None
2008
2009             props = config.items(section)
2010             scenarios.append(Scenario(name, props, path))
2011
2012         if not scenario_paths:
2013             self.discovered = True
2014             self.all_scenarios.extend(scenarios)
2015
2016         return scenarios
2017
2018     def get_scenario(self, name):
2019         if name is not None and os.path.isabs(name) and name.endswith(self.FILE_EXTENSION):
2020             scenarios = self.discover_scenarios([name])
2021
2022             if scenarios:
2023                 return scenarios[0]
2024
2025         if self.discovered is False:
2026             self.discover_scenarios()
2027
2028         if name is None:
2029             return self.all_scenarios
2030
2031         try:
2032             return [scenario for scenario in self.all_scenarios if scenario.name == name][0]
2033         except IndexError:
2034             self.warning("Scenario: %s not found" % name)
2035             return None
2036
2037
2038 class GstValidateBaseTestManager(TestsManager):
2039     scenarios_manager = ScenarioManager()
2040     features_cache = {}
2041
2042     def __init__(self):
2043         super(GstValidateBaseTestManager, self).__init__()
2044         self._scenarios = []
2045         self._encoding_formats = []
2046
2047     @classmethod
2048     def update_commands(cls, extra_paths=None):
2049         for varname, cmd in {'': 'gst-validate',
2050                              'TRANSCODING_': 'gst-validate-transcoding',
2051                              'MEDIA_CHECK_': 'gst-validate-media-check',
2052                              'RTSP_SERVER_': 'gst-validate-rtsp-server',
2053                              'INSPECT_': 'gst-inspect'}.items():
2054             setattr(cls, varname + 'COMMAND', which(cmd + '-1.0', extra_paths))
2055
2056     @classmethod
2057     def has_feature(cls, featurename):
2058         try:
2059             return cls.features_cache[featurename]
2060         except KeyError:
2061             pass
2062
2063         try:
2064             subprocess.check_output([cls.INSPECT_COMMAND, featurename])
2065             res = True
2066         except subprocess.CalledProcessError:
2067             res = False
2068
2069         cls.features_cache[featurename] = res
2070         return res
2071
2072     def add_scenarios(self, scenarios):
2073         """
2074         @scenarios A list or a unic scenario name(s) to be run on the tests.
2075                     They are just the default scenarios, and then depending on
2076                     the TestsGenerator to be used you can have more fine grained
2077                     control on what to be run on each serie of tests.
2078         """
2079         if isinstance(scenarios, list):
2080             self._scenarios.extend(scenarios)
2081         else:
2082             self._scenarios.append(scenarios)
2083
2084         self._scenarios = list(set(self._scenarios))
2085
2086     def set_scenarios(self, scenarios):
2087         """
2088         Override the scenarios
2089         """
2090         self._scenarios = []
2091         self.add_scenarios(scenarios)
2092
2093     def get_scenarios(self):
2094         return self._scenarios
2095
2096     def add_encoding_formats(self, encoding_formats):
2097         """
2098         :param encoding_formats: A list or one single #MediaFormatCombinations describing wanted output
2099                            formats for transcoding test.
2100                            They are just the default encoding formats, and then depending on
2101                            the TestsGenerator to be used you can have more fine grained
2102                            control on what to be run on each serie of tests.
2103         """
2104         if isinstance(encoding_formats, list):
2105             self._encoding_formats.extend(encoding_formats)
2106         else:
2107             self._encoding_formats.append(encoding_formats)
2108
2109         self._encoding_formats = list(set(self._encoding_formats))
2110
2111     def get_encoding_formats(self):
2112         return self._encoding_formats
2113
2114
2115 GstValidateBaseTestManager.update_commands()
2116
2117
2118 class MediaDescriptor(Loggable):
2119
2120     def __init__(self):
2121         Loggable.__init__(self)
2122
2123     def get_path(self):
2124         raise NotImplemented
2125
2126     def has_frames(self):
2127         return False
2128
2129     def get_media_filepath(self):
2130         raise NotImplemented
2131
2132     def skip_parsers(self):
2133         return False
2134
2135     def get_caps(self):
2136         raise NotImplemented
2137
2138     def get_uri(self):
2139         raise NotImplemented
2140
2141     def get_duration(self):
2142         raise NotImplemented
2143
2144     def get_protocol(self):
2145         raise NotImplemented
2146
2147     def is_seekable(self):
2148         raise NotImplemented
2149
2150     def is_live(self):
2151         raise NotImplemented
2152
2153     def is_image(self):
2154         raise NotImplemented
2155
2156     def get_num_tracks(self, track_type):
2157         raise NotImplemented
2158
2159     def can_play_reverse(self):
2160         raise NotImplemented
2161
2162     def prerrols(self):
2163         return True
2164
2165     def is_compatible(self, scenario):
2166         if scenario is None:
2167             return True
2168
2169         if scenario.seeks() and (not self.is_seekable() or self.is_image()):
2170             self.debug("Do not run %s as %s does not support seeking",
2171                        scenario, self.get_uri())
2172             return False
2173
2174         if self.is_image() and scenario.needs_clock_sync():
2175             self.debug("Do not run %s as %s is an image",
2176                        scenario, self.get_uri())
2177             return False
2178
2179         if not self.can_play_reverse() and scenario.does_reverse_playback():
2180             return False
2181
2182         if not self.is_live() and scenario.needs_live_content():
2183             self.debug("Do not run %s as %s is not a live content",
2184                        scenario, self.get_uri())
2185             return False
2186
2187         if self.is_live() and not scenario.compatible_with_live_content():
2188             self.debug("Do not run %s as %s is a live content",
2189                        scenario, self.get_uri())
2190             return False
2191
2192         if not self.prerrols() and getattr(scenario, 'needs_preroll', False):
2193             return False
2194
2195         if self.get_duration() and self.get_duration() / GST_SECOND < scenario.get_min_media_duration():
2196             self.debug(
2197                 "Do not run %s as %s is too short (%i < min media duation : %i",
2198                 scenario, self.get_uri(),
2199                 self.get_duration() / GST_SECOND,
2200                 scenario.get_min_media_duration())
2201             return False
2202
2203         for track_type in ['audio', 'subtitle', 'video']:
2204             if self.get_num_tracks(track_type) < scenario.get_min_tracks(track_type):
2205                 self.debug("%s -- %s | At least %s %s track needed  < %s"
2206                            % (scenario, self.get_uri(), track_type,
2207                               scenario.get_min_tracks(track_type),
2208                               self.get_num_tracks(track_type)))
2209                 return False
2210
2211         return True
2212
2213
2214 class GstValidateMediaDescriptor(MediaDescriptor):
2215     # Some extension file for discovering results
2216     MEDIA_INFO_EXT = "media_info"
2217     PUSH_MEDIA_INFO_EXT = "media_info.push"
2218     STREAM_INFO_EXT = "stream_info"
2219
2220     def __init__(self, xml_path):
2221         super(GstValidateMediaDescriptor, self).__init__()
2222
2223         self._xml_path = xml_path
2224         try:
2225             media_xml = ET.parse(xml_path).getroot()
2226         except xml.etree.ElementTree.ParseError:
2227             printc("Could not parse %s" % xml_path,
2228                    Colors.FAIL)
2229             raise
2230
2231         self._extract_data(media_xml)
2232
2233         self.set_protocol(urllib.parse.urlparse(
2234             urllib.parse.urlparse(self.get_uri()).scheme).scheme)
2235
2236     def skip_parsers(self):
2237         return self._skip_parsers
2238
2239     def has_frames(self):
2240         return self._has_frames
2241
2242     def _extract_data(self, media_xml):
2243         # Extract the information we need from the xml
2244         self._caps = media_xml.findall("streams")[0].attrib["caps"]
2245         self._track_caps = []
2246         try:
2247             streams = media_xml.findall("streams")[0].findall("stream")
2248         except IndexError:
2249             pass
2250         else:
2251             for stream in streams:
2252                 self._track_caps.append(
2253                     (stream.attrib["type"], stream.attrib["caps"]))
2254         self._uri = media_xml.attrib["uri"]
2255         self._skip_parsers = bool(int(media_xml.attrib.get('skip-parsers', 0)))
2256         self._has_frames = bool(int(media_xml.attrib["frame-detection"]))
2257         self._duration = int(media_xml.attrib["duration"])
2258         self._protocol = media_xml.get("protocol", None)
2259         self._is_seekable = media_xml.attrib["seekable"].lower() == "true"
2260         self._is_live = media_xml.get("live", "false").lower() == "true"
2261         self._is_image = False
2262         for stream in media_xml.findall("streams")[0].findall("stream"):
2263             if stream.attrib["type"] == "image":
2264                 self._is_image = True
2265         self._track_types = []
2266         for stream in media_xml.findall("streams")[0].findall("stream"):
2267             self._track_types.append(stream.attrib["type"])
2268
2269     @staticmethod
2270     def new_from_uri(uri, verbose=False, include_frames=False, is_push=False):
2271         """
2272             include_frames = 0 # Never
2273             include_frames = 1 # always
2274             include_frames = 2 # if previous file included them
2275
2276         """
2277         media_path = utils.url2path(uri)
2278
2279         ext = GstValidateMediaDescriptor.PUSH_MEDIA_INFO_EXT if is_push else \
2280             GstValidateMediaDescriptor.MEDIA_INFO_EXT
2281         descriptor_path = "%s.%s" % (media_path, ext)
2282         args = GstValidateBaseTestManager.MEDIA_CHECK_COMMAND.split(" ")
2283         args.append(uri)
2284         if include_frames == 2:
2285             try:
2286                 media_xml = ET.parse(descriptor_path).getroot()
2287
2288                 include_frames = bool(int(media_xml.attrib["frame-detection"]))
2289                 if bool(int(media_xml.attrib.get("skip-parsers"))):
2290                     args.append("--skip-parsers")
2291             except FileNotFoundError:
2292                 pass
2293         else:
2294             include_frames = bool(include_frames)
2295
2296         args.extend(["--output-file", descriptor_path])
2297         if include_frames:
2298             args.extend(["--full"])
2299
2300         if verbose:
2301             printc("Generating media info for %s\n"
2302                    "    Command: '%s'" % (media_path, ' '.join(args)),
2303                    Colors.OKBLUE)
2304
2305         try:
2306             subprocess.check_output(args, stderr=open(os.devnull))
2307         except subprocess.CalledProcessError as e:
2308             if verbose:
2309                 printc("Result: Failed", Colors.FAIL)
2310             else:
2311                 loggable.warning("GstValidateMediaDescriptor",
2312                                  "Exception: %s" % e)
2313             return None
2314
2315         if verbose:
2316             printc("Result: Passed", Colors.OKGREEN)
2317
2318         try:
2319             return GstValidateMediaDescriptor(descriptor_path)
2320         except (IOError, xml.etree.ElementTree.ParseError):
2321             return None
2322
2323     def get_path(self):
2324         return self._xml_path
2325
2326     def need_clock_sync(self):
2327         return Protocols.needs_clock_sync(self.get_protocol())
2328
2329     def get_media_filepath(self):
2330         if self.get_protocol() == Protocols.FILE:
2331             return self._xml_path.replace("." + self.MEDIA_INFO_EXT, "")
2332         elif self.get_protocol() == Protocols.PUSHFILE:
2333             return self._xml_path.replace("." + self.PUSH_MEDIA_INFO_EXT, "")
2334         else:
2335             return self._xml_path.replace("." + self.STREAM_INFO_EXT, "")
2336
2337     def get_caps(self):
2338         return self._caps
2339
2340     def get_tracks_caps(self):
2341         return self._track_caps
2342
2343     def get_uri(self):
2344         return self._uri
2345
2346     def get_duration(self):
2347         return self._duration
2348
2349     def set_protocol(self, protocol):
2350         if self._xml_path.endswith(GstValidateMediaDescriptor.PUSH_MEDIA_INFO_EXT):
2351             self._protocol = Protocols.PUSHFILE
2352         else:
2353             self._protocol = protocol
2354
2355     def get_protocol(self):
2356         return self._protocol
2357
2358     def is_seekable(self):
2359         return self._is_seekable
2360
2361     def is_live(self):
2362         return self._is_live
2363
2364     def can_play_reverse(self):
2365         return True
2366
2367     def is_image(self):
2368         return self._is_image
2369
2370     def get_num_tracks(self, track_type):
2371         n = 0
2372         for t in self._track_types:
2373             if t == track_type:
2374                 n += 1
2375
2376         return n
2377
2378     def get_clean_name(self):
2379         name = os.path.basename(self.get_path())
2380         name = re.sub("\.stream_info|\.media_info", "", name)
2381
2382         return name.replace('.', "_")
2383
2384
2385 class MediaFormatCombination(object):
2386     FORMATS = {"aac": "audio/mpeg,mpegversion=4",  # Audio
2387                "ac3": "audio/x-ac3",
2388                "vorbis": "audio/x-vorbis",
2389                "mp3": "audio/mpeg,mpegversion=1,layer=3",
2390                "opus": "audio/x-opus",
2391                "rawaudio": "audio/x-raw",
2392
2393                # Video
2394                "h264": "video/x-h264",
2395                "h265": "video/x-h265",
2396                "vp8": "video/x-vp8",
2397                "vp9": "video/x-vp9",
2398                "theora": "video/x-theora",
2399                "prores": "video/x-prores",
2400                "jpeg": "image/jpeg",
2401
2402                # Containers
2403                "webm": "video/webm",
2404                "ogg": "application/ogg",
2405                "mkv": "video/x-matroska",
2406                "mp4": "video/quicktime,variant=iso;",
2407                "quicktime": "video/quicktime;"}
2408
2409     def __str__(self):
2410         return "%s and %s in %s" % (self.audio, self.video, self.container)
2411
2412     def __init__(self, container, audio, video):
2413         """
2414         Describes a media format to be used for transcoding tests.
2415
2416         :param container: A string defining the container format to be used, must bin in self.FORMATS
2417         :param audio: A string defining the audio format to be used, must bin in self.FORMATS
2418         :param video: A string defining the video format to be used, must bin in self.FORMATS
2419         """
2420         self.container = container
2421         self.audio = audio
2422         self.video = video
2423
2424     def get_caps(self, track_type):
2425         try:
2426             return self.FORMATS[self.__dict__[track_type]]
2427         except KeyError:
2428             return None
2429
2430     def get_audio_caps(self):
2431         return self.get_caps("audio")
2432
2433     def get_video_caps(self):
2434         return self.get_caps("video")
2435
2436     def get_muxer_caps(self):
2437         return self.get_caps("container")