validate:launcher: Launch tests in `_TestsLauncher` not in TestsManagaer
[platform/upstream/gstreamer.git] / validate / launcher / baseclasses.py
1 #!/usr/bin/env python3
2 #
3 # Copyright (c) 2013,Thibault Saunier <thibault.saunier@collabora.com>
4 #
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU Lesser General Public
7 # License as published by the Free Software Foundation; either
8 # version 2.1 of the License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13 # Lesser General Public License for more details.
14 #
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with this program; if not, write to the
17 # Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
18 # Boston, MA 02110-1301, USA.
19
20 """ Class representing tests and test managers. """
21
22 import json
23 import os
24 import sys
25 import re
26 import copy
27 import socketserver
28 import struct
29 import time
30 from . import utils
31 import signal
32 import urllib.parse
33 import subprocess
34 import threading
35 import queue
36 import configparser
37 import xml
38
39 from . import reporters
40 from . import loggable
41 from .loggable import Loggable
42
43 try:
44     from lxml import etree as ET
45 except ImportError:
46     import xml.etree.cElementTree as ET
47
48 from .utils import mkdir, Result, Colors, printc, DEFAULT_TIMEOUT, GST_SECOND, \
49     Protocols, look_for_file_in_source_dir, get_data_file, BackTraceGenerator, \
50     check_bugs_resolution
51
# The factor by which we increase the timeouts when running inside
# GDB or Valgrind
GDB_TIMEOUT_FACTOR = VALGRIND_TIMEOUT_FACTOR = 20
# Multiplier applied to the test timeouts; it can be set through the
# TIMEOUT_FACTOR environment variable and defaults to 1
TIMEOUT_FACTOR = float(os.environ.get("TIMEOUT_FACTOR", 1))
# The error reported by valgrind when detecting errors
VALGRIND_ERROR_CODE = 20

# Suffix that replaces the extension of a media file path to build the path
# of its override file
VALIDATE_OVERRIDE_EXTENSION = ".override"
# Negated numbers of the listed signals that exist on this platform, plus 139
# NOTE(review): 139 is presumably 128 + SIGSEGV as reported by a shell - confirm
COREDUMP_SIGNALS = [-getattr(signal, s) for s in [
    'SIGQUIT', 'SIGILL', 'SIGABRT', 'SIGFPE', 'SIGSEGV', 'SIGBUS', 'SIGSYS',
    'SIGTRAP', 'SIGXCPU', 'SIGXFSZ', 'SIGIOT'] if hasattr(signal, s)] + [139]
63
64
65 class Test(Loggable):
66
67     """ A class representing a particular test. """
68
69     def __init__(self, application_name, classname, options,
70                  reporter, duration=0, timeout=DEFAULT_TIMEOUT,
71                  hard_timeout=None, extra_env_variables=None,
72                  expected_failures=None, is_parallel=True):
73         """
74         @timeout: The timeout during which the value return by get_current_value
75                   keeps being exactly equal
76         @hard_timeout: Max time the test can take in absolute
77         """
78         Loggable.__init__(self)
79         self.timeout = timeout * TIMEOUT_FACTOR * options.timeout_factor
80         if hard_timeout:
81             self.hard_timeout = hard_timeout * TIMEOUT_FACTOR
82             self.hard_timeout *= options.timeout_factor
83         else:
84             self.hard_timeout = hard_timeout
85         self.classname = classname
86         self.options = options
87         self.application = application_name
88         self.command = []
89         self.server_command = None
90         self.reporter = reporter
91         self.process = None
92         self.proc_env = None
93         self.thread = None
94         self.queue = None
95         self.duration = duration
96         self.stack_trace = None
97         if expected_failures is None:
98             self.expected_failures = []
99         elif not isinstance(expected_failures, list):
100             self.expected_failures = [expected_failures]
101         else:
102             self.expected_failures = expected_failures
103
104         extra_env_variables = extra_env_variables or {}
105         self.extra_env_variables = extra_env_variables
106         self.optional = False
107         self.is_parallel = is_parallel
108         self.generator = None
109
110         self.clean()
111
112     def clean(self):
113         self.kill_subprocess()
114         self.message = ""
115         self.error_str = ""
116         self.time_taken = 0.0
117         self._starting_time = None
118         self.result = Result.NOT_RUN
119         self.logfile = None
120         self.out = None
121         self.extra_logfiles = []
122         self.__env_variable = []
123         self.kill_subprocess()
124
125     def __str__(self):
126         string = self.classname
127         if self.result != Result.NOT_RUN:
128             string += ": " + self.result
129             if self.result in [Result.FAILED, Result.TIMEOUT]:
130                 string += " '%s'\n" \
131                           "       You can reproduce with: %s\n" \
132                     % (self.message, self.get_command_repr())
133
134                 string += self.get_logfile_repr()
135
136         return string
137
138     def add_env_variable(self, variable, value=None):
139         """
140         Only usefull so that the gst-validate-launcher can print the exact
141         right command line to reproduce the tests
142         """
143         if value is None:
144             value = os.environ.get(variable, None)
145
146         if value is None:
147             return
148
149         self.__env_variable.append(variable)
150
151     @property
152     def _env_variable(self):
153         res = ""
154         for var in set(self.__env_variable):
155             if res:
156                 res += " "
157             value = self.proc_env.get(var, None)
158             if value is not None:
159                 res += "%s='%s'" % (var, value)
160
161         return res
162
163     def open_logfile(self):
164         if self.out:
165             return
166
167         path = os.path.join(self.options.logsdir,
168                             self.classname.replace(".", os.sep))
169         mkdir(os.path.dirname(path))
170         self.logfile = path
171
172         if self.options.redirect_logs == 'stdout':
173             self.out = sys.stdout
174         elif self.options.redirect_logs == 'stderr':
175             self.out = sys.stderr
176         else:
177             self.out = open(path, 'w+')
178
179     def close_logfile(self):
180         if not self.options.redirect_logs:
181             self.out.close()
182
183         self.out = None
184
185     def _get_file_content(self, file_name):
186         f = open(file_name, 'r+')
187         value = f.read()
188         f.close()
189
190         return value
191
192     def get_log_content(self):
193         return self._get_file_content(self.logfile)
194
195     def get_extra_log_content(self, extralog):
196         if extralog not in self.extra_logfiles:
197             return ""
198
199         return self._get_file_content(extralog)
200
201     def get_classname(self):
202         name = self.classname.split('.')[-1]
203         classname = self.classname.replace('.%s' % name, '')
204
205         return classname
206
207     def get_name(self):
208         return self.classname.split('.')[-1]
209
210     def add_arguments(self, *args):
211         self.command += args
212
213     def build_arguments(self):
214         self.add_env_variable("LD_PRELOAD")
215         self.add_env_variable("DISPLAY")
216
217     def add_stack_trace_to_logfile(self):
218         trace_gatherer = BackTraceGenerator.get_default()
219         stack_trace = trace_gatherer.get_trace(self)
220
221         if not stack_trace:
222             return
223
224         info = "\n\n== Stack trace: == \n%s" % stack_trace
225         if self.options.redirect_logs:
226             print(info)
227         elif self.options.xunit_file:
228             self.stack_trace = stack_trace
229         else:
230             with open(self.logfile, 'a') as f:
231                 f.write(info)
232
    def set_result(self, result, message="", error=""):
        """
        Store the result of the test together with its message and error.

        On timeout, in debug mode, the user gets a chance to inspect the
        process with gdb before the result is stored; outside of debug mode
        a stack trace is gathered instead.

        @result: The new result of the test
        @message: Message describing the result
        @error: Error string stored in self.error_str
        """
        self.debug("Setting result: %s (message: %s, error: %s)" % (result,
                   message, error))

        if result is Result.TIMEOUT:
            if self.options.debug is True:
                if self.options.gdb:
                    printc("Timeout, you should process <ctrl>c to get into gdb",
                        Colors.FAIL)
                    # and wait here until gdb exits
                    self.process.communicate()
                else:
                    # Block until the user presses enter so that gdb can be
                    # attached to the still running process
                    pname = self.command[0]
                    input("%sTimeout happened you can attach gdb doing: $gdb %s %d%s\n"
                          "Press enter to continue" % (Colors.FAIL, pname, self.process.pid,
                                                       Colors.ENDC))
            else:
                self.add_stack_trace_to_logfile()

        self.result = result
        self.message = message
        self.error_str = error
255
256     def check_results(self):
257         if self.result is Result.FAILED or self.result is Result.TIMEOUT:
258             return
259
260         self.debug("%s returncode: %s", self, self.process.returncode)
261         if self.process.returncode == 0:
262             self.set_result(Result.PASSED)
263         elif self.process.returncode in [-signal.SIGSEGV, -signal.SIGABRT, 139]:
264             self.add_stack_trace_to_logfile()
265             self.set_result(Result.FAILED,
266                             "Application segfaulted, returne code: %d" % (
267                                 self.process.returncode))
268         elif self.process.returncode == VALGRIND_ERROR_CODE:
269             self.set_result(Result.FAILED, "Valgrind reported errors")
270         else:
271             self.set_result(Result.FAILED,
272                             "Application returned %d" % (self.process.returncode))
273
    def get_current_value(self):
        """
        Lets subclasses implement a nicer timeout measurement method.

        They should return some value that is compared with the previously
        returned one: the test times out if both stay equal during
        self.timeout seconds.

        This default implementation returns Result.NOT_RUN, which
        process_update treats as "not implemented".
        """
        return Result.NOT_RUN
282
    def process_update(self):
        """
        Returns True when process has finished running or has timed out.

        When the process is still running, the value returned by
        get_current_value is compared with the previous one to detect a
        stalled test; the result is set to Result.TIMEOUT when a timeout
        is detected.
        """

        if self.process is None:
            # Process has not started running yet
            return False

        self.process.poll()
        if self.process.returncode is not None:
            return True

        val = self.get_current_value()

        self.debug("Got value: %s" % val)
        if val is Result.NOT_RUN:
            # The get_current_value logic is not implemented... dumb
            # timeout
            if time.time() - self.last_change_ts > self.timeout:
                self.set_result(Result.TIMEOUT,
                                "Application timed out: %s secs" %
                                self.timeout,
                                "timeout")
                return True
            return False
        elif val is Result.FAILED:
            # get_current_value detected a failure itself
            return True
        elif val is Result.KNOWN_ERROR:
            return True

        self.log("New val %s" % val)

        if val == self.last_val:
            # No progress since the last check: time out once the value has
            # been the same for more than self.timeout seconds
            delta = time.time() - self.last_change_ts
            self.debug("%s: Same value for %d/%d seconds" %
                       (self, delta, self.timeout))
            if delta > self.timeout:
                self.set_result(Result.TIMEOUT,
                                "Application timed out: %s secs" %
                                self.timeout,
                                "timeout")
                return True
        elif self.hard_timeout and time.time() - self.start_ts > self.hard_timeout:
            # The value changed but the test has been running for longer
            # than the absolute limit
            self.set_result(
                Result.TIMEOUT, "Hard timeout reached: %d secs" % self.hard_timeout)
            return True
        else:
            # Progress was made: remember the new value and when it changed
            self.last_change_ts = time.time()
            self.last_val = val

        return False
335
336     def get_subproc_env(self):
337         return os.environ.copy()
338
    def kill_subprocess(self):
        """
        Ask utils.kill_subprocess to stop the process of the test, passing
        DEFAULT_TIMEOUT as its timeout argument.
        """
        utils.kill_subprocess(self, self.process, DEFAULT_TIMEOUT)
341
342     def thread_wrapper(self):
343         self.process = subprocess.Popen(self.command,
344                                         stderr=self.out,
345                                         stdout=self.out,
346                                         env=self.proc_env)
347         self.process.wait()
348         if self.result is not Result.TIMEOUT:
349             self.queue.put(None)
350
351     def get_valgrind_suppression_file(self, subdir, name):
352         p = get_data_file(subdir, name)
353         if p:
354             return p
355
356         self.error("Could not find any %s file" % name)
357
358     def get_valgrind_suppressions(self):
359         return [self.get_valgrind_suppression_file('data', 'gstvalidate.supp')]
360
361     def use_gdb(self, command):
362         if self.hard_timeout is not None:
363             self.hard_timeout *= GDB_TIMEOUT_FACTOR
364         self.timeout *= GDB_TIMEOUT_FACTOR
365         return ["gdb", "-ex", "run", "-ex", "backtrace", "-ex", "quit", "--args"] + command
366
367     def use_valgrind(self, command, subenv):
368         vglogsfile = self.logfile + '.valgrind'
369         self.extra_logfiles.append(vglogsfile)
370
371         vg_args = []
372
373         for o, v in [('trace-children', 'yes'),
374                 ('tool', 'memcheck'),
375                 ('leak-check', 'full'),
376                 ('leak-resolution', 'high'),
377                 # TODO: errors-for-leak-kinds should be set to all instead of definite
378                 #       and all false positives should be added to suppression files.
379                 ('errors-for-leak-kinds', 'definite'),
380                 ('num-callers', '20'),
381                 ('error-exitcode', str(VALGRIND_ERROR_CODE)),
382                 ('gen-suppressions', 'all')]:
383             vg_args.append("--%s=%s" % (o, v))
384
385         if not self.options.redirect_logs:
386             vglogsfile = self.logfile + '.valgrind'
387             self.extra_logfiles.append(vglogsfile)
388             vg_args.append("--%s=%s" % ('log-file', vglogsfile))
389
390         for supp in self.get_valgrind_suppressions():
391             vg_args.append("--suppressions=%s" % supp)
392
393         command = ["valgrind"] + vg_args + command
394
395         # Tune GLib's memory allocator to be more valgrind friendly
396         subenv['G_DEBUG'] = 'gc-friendly'
397         subenv['G_SLICE'] = 'always-malloc'
398
399         if self.hard_timeout is not None:
400             self.hard_timeout *= VALGRIND_TIMEOUT_FACTOR
401         self.timeout *= VALGRIND_TIMEOUT_FACTOR
402
403         # Enable 'valgrind.config'
404         vg_config = get_data_file('data', 'valgrind.config')
405
406         if self.proc_env.get('GST_VALIDATE_CONFIG'):
407             subenv['GST_VALIDATE_CONFIG'] = '%s%s%s' % (self.proc_env['GST_VALIDATE_CONFIG'], os.pathsep, vg_config)
408         else:
409             subenv['GST_VALIDATE_CONFIG'] = vg_config
410
411         if subenv == self.proc_env:
412             self.add_env_variable('G_DEBUG', 'gc-friendly')
413             self.add_env_variable('G_SLICE', 'always-malloc')
414             self.add_env_variable('GST_VALIDATE_CONFIG', self.proc_env['GST_VALIDATE_CONFIG'])
415
416         return command
417
    def launch_server(self):
        """
        Hook called by test_start; its return value is stored as
        self.server_command. This implementation returns None.
        """
        return None
420
421     def get_logfile_repr(self):
422         message = "    Logs:\n"
423         logfiles = self.extra_logfiles.copy()
424
425         if not self.options.redirect_logs:
426             logfiles.insert(0, self.logfile)
427
428         for log in logfiles:
429             message += "\n         - %s" % log
430
431         return message
432
433     def get_command_repr(self):
434         message = "%s %s" % (self._env_variable, ' '.join(self.command))
435         if self.server_command:
436             message = "%s & %s" % (self.server_command, message)
437
438         return "'%s'" % message
439
    def test_start(self, queue):
        """
        Build the command line and the environment, then launch the test.

        The process is started from a thread running thread_wrapper.

        @queue: Queue on which thread_wrapper puts None once the process
                exits, unless the test timed out
        """
        self.open_logfile()

        self.server_command = self.launch_server()
        self.queue = queue
        self.command = [self.application]
        self._starting_time = time.time()
        self.build_arguments()
        self.proc_env = self.get_subproc_env()

        # Extra variables are appended to the values already present in the
        # environment, using os.pathsep as separator
        for var, value in list(self.extra_env_variables.items()):
            value = self.proc_env.get(var, '') + os.pathsep + value
            self.proc_env[var] = value.strip(os.pathsep)
            self.add_env_variable(var, self.proc_env[var])

        if self.options.gdb:
            self.command = self.use_gdb(self.command)
        if self.options.valgrind:
            self.command = self.use_valgrind(self.command, self.proc_env)

        message = "Launching: %s%s\n" \
                  "    Command: %s\n" % (Colors.ENDC, self.classname,
                                         self.get_command_repr())

        if not self.options.redirect_logs:
            message += self.get_logfile_repr()

            # Write a header describing the test at the top of the log file
            self.out.write("=================\n"
                           "Test name: %s\n"
                           "Command: '%s'\n"
                           "=================\n\n"
                           % (self.classname, ' '.join(self.command)))
            self.out.flush()

        printc(message, Colors.OKBLUE)

        self.thread = threading.Thread(target=self.thread_wrapper)
        self.thread.start()

        # Initial state for the timeout detection done in process_update
        self.last_val = 0
        self.last_change_ts = time.time()
        self.start_ts = time.time()
482
483     def _dump_log_file(self, logfile):
484         message = "Dumping contents of %s\n" % logfile
485         printc(message, Colors.FAIL)
486
487         with open(logfile, 'r') as fin:
488             print(fin.read())
489
490     def _dump_log_files(self):
491         printc("Dumping log files on failure\n", Colors.FAIL)
492         self._dump_log_file(self.logfile)
493         for logfile in self.extra_logfiles:
494             self._dump_log_file(logfile)
495
496     def test_end(self):
497         self.kill_subprocess()
498         self.thread.join()
499         self.time_taken = time.time() - self._starting_time
500
501         message = "%s: %s%s\n" % (self.classname, self.result,
502                " (" + self.message + ")" if self.message else "")
503         if not self.options.redirect_logs:
504             message += self.get_logfile_repr()
505
506         printc(message, color=utils.get_color_for_result(self.result))
507
508         self.close_logfile()
509
510         if self.options.dump_on_failure:
511             if self.result is not Result.PASSED:
512                 self._dump_log_files()
513
514         return self.result
515
516
class GstValidateListener(socketserver.BaseRequestHandler):

    """
    Handler reading messages sent to the server of a test.

    Each message is a 4 bytes big-endian length followed by that many
    bytes of JSON. Messages are dispatched to the `test` attribute of the
    server depending on their "type" field.
    """

    def _recv_exact(self, size):
        """
        Read exactly @size bytes from the connection.

        recv() is allowed to return less data than requested, so keep
        reading until everything arrived. Returns None if the connection
        is closed before @size bytes were received.
        """
        chunks = []
        remaining = size
        while remaining > 0:
            chunk = self.request.recv(remaining)
            if not chunk:
                return None
            chunks.append(chunk)
            remaining -= len(chunk)

        return b''.join(chunks)

    def handle(self):
        """Implements BaseRequestHandler handle method"""
        while True:
            raw_len = self._recv_exact(4)
            if raw_len is None:
                return
            msglen = struct.unpack('>I', raw_len)[0]
            raw_msg = self._recv_exact(msglen)
            if not raw_msg:
                # Connection closed, or empty message
                return
            msg = raw_msg.decode()

            obj = json.loads(msg)
            test = getattr(self.server, "test")

            obj_type = obj.get("type", '')
            if obj_type == 'position':
                test.set_position(obj['position'], obj['duration'],
                                  obj['speed'])
            elif obj_type == 'buffering':
                test.set_position(obj['position'], 100)
            elif obj_type == 'action':
                test.add_action_execution(obj)
                # Make sure that action is taken into account when checking if process
                # is updating
                test.position += 1
            elif obj_type == 'action-done':
                # Make sure that action end is taken into account when checking if process
                # is updating
                test.position += 1
                # Ignore the duration if no action was recorded beforehand
                if test.actions_infos:
                    test.actions_infos[-1]['execution-duration'] = obj['execution-duration']
            elif obj_type == 'report':
                test.add_report(obj)
550
551
552 class GstValidateTest(Test):
553
554     """ A class representing a particular test. """
555     findpos_regex = re.compile(
556         '.*position.*(\d+):(\d+):(\d+).(\d+).*duration.*(\d+):(\d+):(\d+).(\d+)')
557     findlastseek_regex = re.compile(
558         'seeking to.*(\d+):(\d+):(\d+).(\d+).*stop.*(\d+):(\d+):(\d+).(\d+).*rate.*(\d+)\.(\d+)')
559
560     HARD_TIMEOUT_FACTOR = 5
561
562     def __init__(self, application_name, classname,
563                  options, reporter, duration=0,
564                  timeout=DEFAULT_TIMEOUT, scenario=None, hard_timeout=None,
565                  media_descriptor=None, extra_env_variables=None,
566                  expected_failures=None):
567
568         extra_env_variables = extra_env_variables or {}
569
570         if not hard_timeout and self.HARD_TIMEOUT_FACTOR:
571             if timeout:
572                 hard_timeout = timeout * self.HARD_TIMEOUT_FACTOR
573             elif duration:
574                 hard_timeout = duration * self.HARD_TIMEOUT_FACTOR
575             else:
576                 hard_timeout = None
577
578         # If we are running from source, use the -debug version of the
579         # application which is using rpath instead of libtool's wrappers. It's
580         # slightly faster to start and will not confuse valgrind.
581         debug = '%s-debug' % application_name
582         p = look_for_file_in_source_dir('tools', debug)
583         if p:
584             application_name = p
585
586         self.reports = []
587         self.position = -1
588         self.media_duration = -1
589         self.speed = 1.0
590         self.actions_infos = []
591         self.media_descriptor = media_descriptor
592         self.server = None
593
594         override_path = self.get_override_file(media_descriptor)
595         if override_path:
596             if extra_env_variables:
597                 if extra_env_variables.get("GST_VALIDATE_OVERRIDE", ""):
598                     extra_env_variables["GST_VALIDATE_OVERRIDE"] += os.path.pathsep
599
600             extra_env_variables["GST_VALIDATE_OVERRIDE"] = override_path
601
602         super(GstValidateTest, self).__init__(application_name, classname,
603                                               options, reporter,
604                                               duration=duration,
605                                               timeout=timeout,
606                                               hard_timeout=hard_timeout,
607                                               extra_env_variables=extra_env_variables,
608                                               expected_failures=expected_failures)
609
610         # defines how much the process can be outside of the configured
611         # segment / seek
612         self._sent_eos_time = None
613
614         if scenario is None or scenario.name.lower() == "none":
615             self.scenario = None
616         else:
617             self.scenario = scenario
618
619     def stop_server(self):
620         if self.server:
621             self.server.shutdown()
622             self.server_thread.join()
623             self.server.server_close()
624             self.server = None
625
626     def kill_subprocess(self):
627         Test.kill_subprocess(self)
628         self.stop_server()
629
    def add_report(self, report):
        """ Record a report; GstValidateListener calls this for 'report' messages. """
        self.reports.append(report)
632
633     def set_position(self, position, duration, speed=None):
634         self.position = position
635         self.media_duration = duration
636         if speed:
637             self.speed = speed
638
639     def add_action_execution(self, action_infos):
640         if action_infos['action-type'] == 'eos':
641             self._sent_eos_time = time.time()
642         self.actions_infos.append(action_infos)
643
    def server_wrapper(self, ready):
        """
        Thread body: create the TCP server of the test and serve requests.

        @ready: Event set once the server exists and its port is stored in
                self.serverport
        """
        # Port 0 is requested; the port actually bound is read back from
        # the socket below
        self.server = socketserver.TCPServer(('localhost', 0), GstValidateListener)
        self.server.socket.settimeout(None)
        # Handlers retrieve the test through this attribute
        self.server.test = self
        self.serverport = self.server.socket.getsockname()[1]
        self.info("%s server port: %s" % (self, self.serverport))
        ready.set()

        self.server.serve_forever()
653
654     def test_start(self, queue):
655         ready = threading.Event()
656         self.server_thread = threading.Thread(target=self.server_wrapper,
657                                               kwargs={'ready': ready})
658         self.server_thread.start()
659         ready.wait()
660
661         Test.test_start(self, queue)
662
    def test_end(self):
        """ End the test, stop the server and return the result. """
        res = Test.test_end(self)
        self.stop_server()

        return res
668
669     def get_override_file(self, media_descriptor):
670         if media_descriptor:
671             if media_descriptor.get_path():
672                 override_path = os.path.splitext(media_descriptor.get_path())[0] + VALIDATE_OVERRIDE_EXTENSION
673                 if os.path.exists(override_path):
674                     return override_path
675
676         return None
677
    def get_current_position(self):
        """ Return the last position stored in self.position. """
        return self.position
680
681     def get_current_value(self):
682         if self.scenario:
683             if self._sent_eos_time is not None:
684                 t = time.time()
685                 if ((t - self._sent_eos_time)) > 30:
686                     if self.media_descriptor.get_protocol() == Protocols.HLS:
687                         self.set_result(Result.PASSED,
688                                         """Got no EOS 30 seconds after sending EOS,
689                                         in HLS known and tolerated issue:
690                                         https://bugzilla.gnome.org/show_bug.cgi?id=723868""")
691                         return Result.KNOWN_ERROR
692
693                     self.set_result(
694                         Result.FAILED, "Pipeline did not stop 30 Seconds after sending EOS")
695
696                     return Result.FAILED
697
698         return self.position
699
700     def get_subproc_env(self):
701         subproc_env = os.environ.copy()
702
703         subproc_env["GST_VALIDATE_SERVER"] = "tcp://localhost:%s" % self.serverport
704
705         if 'GST_DEBUG' in os.environ and not self.options.redirect_logs:
706             gstlogsfile = self.logfile + '.gstdebug'
707             self.extra_logfiles.append(gstlogsfile)
708             subproc_env["GST_DEBUG_FILE"] = gstlogsfile
709
710         if self.options.no_color:
711             subproc_env["GST_DEBUG_NO_COLOR"] = '1'
712
713         # Ensure XInitThreads is called, see bgo#731525
714         subproc_env['GST_GL_XINITTHREADS'] = '1'
715         self.add_env_variable('GST_GL_XINITTHREADS', '1')
716
717         if self.scenario is not None:
718             scenario = self.scenario.get_execution_name()
719             subproc_env["GST_VALIDATE_SCENARIO"] = scenario
720             self.add_env_variable("GST_VALIDATE_SCENARIO",
721                                   subproc_env["GST_VALIDATE_SCENARIO"])
722         else:
723             try:
724                 del subproc_env["GST_VALIDATE_SCENARIO"]
725             except KeyError:
726                 pass
727
728         return subproc_env
729
730     def clean(self):
731         Test.clean(self)
732         self._sent_eos_time = None
733         self.reports = []
734         self.position = -1
735         self.media_duration = -1
736         self.speed = 1.0
737         self.actions_infos = []
738
739     def build_arguments(self):
740         super(GstValidateTest, self).build_arguments()
741         if "GST_VALIDATE" in os.environ:
742             self.add_env_variable("GST_VALIDATE", os.environ["GST_VALIDATE"])
743
744         if "GST_VALIDATE_SCENARIOS_PATH" in os.environ:
745             self.add_env_variable("GST_VALIDATE_SCENARIOS_PATH",
746                                   os.environ["GST_VALIDATE_SCENARIOS_PATH"])
747
748         self.add_env_variable("GST_VALIDATE_CONFIG")
749         self.add_env_variable("GST_VALIDATE_OVERRIDE")
750
751     def get_extra_log_content(self, extralog):
752         value = Test.get_extra_log_content(self, extralog)
753
754         return value
755
756     def report_matches_expected_failure(self, report, expected_failure):
757         for key in ['bug', 'bugs', 'sometimes']:
758             if key in expected_failure:
759                 del expected_failure[key]
760         for key, value in list(report.items()):
761             if key in expected_failure:
762                 if not re.findall(expected_failure[key], str(value)):
763                     return False
764                 expected_failure.pop(key)
765
766         return not bool(expected_failure)
767
768     def check_reported_issues(self):
769         ret = []
770         expected_failures = copy.deepcopy(self.expected_failures)
771         expected_retcode = [0]
772         for report in self.reports:
773             found = None
774             for expected_failure in expected_failures:
775                 if self.report_matches_expected_failure(report,
776                                                         expected_failure.copy()):
777                     found = expected_failure
778                     break
779
780             if found is not None:
781                 expected_failures.remove(found)
782                 if report['level'] == 'critical':
783                     if found.get('sometimes') and isinstance(expected_retcode, list):
784                         expected_retcode.append(18)
785                     else:
786                         expected_retcode = [18]
787             elif report['level'] == 'critical':
788                 ret.append(report['summary'])
789
790         if not ret:
791             return None, expected_failures, expected_retcode
792
793         return ret, expected_failures, expected_retcode
794
795     def check_expected_timeout(self, expected_timeout):
796         msg = "Expected timeout happened. "
797         result = Result.PASSED
798         message = expected_timeout.get('message')
799         if message:
800             if not re.findall(message, self.message):
801                 result = Result.FAILED
802                 msg = "Expected timeout message: %s got %s " % (
803                     message, self.message)
804
805         expected_symbols = expected_timeout.get('stacktrace_symbols')
806         if expected_symbols:
807             trace_gatherer = BackTraceGenerator.get_default()
808             stack_trace = trace_gatherer.get_trace(self)
809
810             if stack_trace:
811                 if not isinstance(expected_symbols, list):
812                     expected_symbols = [expected_symbols]
813
814                 not_found_symbols = [s for s in expected_symbols
815                                      if s not in stack_trace]
816                 if not_found_symbols:
817                     result = Result.TIMEOUT
818                     msg = "Expected symbols '%s' not found in stack trace " % (
819                         not_found_symbols)
820             else:
821                 msg += "No stack trace available, could not verify symbols "
822
823         return result, msg
824
825     def check_results(self):
826         if self.result in [Result.FAILED, self.result is Result.PASSED]:
827             return
828
829         for report in self.reports:
830             if report.get('issue-id') == 'runtime::missing-plugin':
831                 self.set_result(Result.SKIPPED, "%s\n%s" % (report['summary'],
832                                                          report['details']))
833                 return
834
835         self.debug("%s returncode: %s", self, self.process.returncode)
836
837         criticals, not_found_expected_failures, expected_returncode = self.check_reported_issues()
838
839         expected_timeout = None
840         for i, f in enumerate(not_found_expected_failures):
841             if len(f) == 1 and f.get("returncode"):
842                 returncode = f['returncode']
843                 if not isinstance(expected_returncode, list):
844                     returncode = [expected_returncode]
845                 if 'sometimes' in f:
846                     returncode.append(0)
847             elif f.get("timeout"):
848                 expected_timeout = f
849
850         not_found_expected_failures = [f for f in not_found_expected_failures
851                                        if not f.get('returncode')]
852
853         msg = ""
854         result = Result.PASSED
855         if self.result == Result.TIMEOUT:
856             if expected_timeout:
857                 not_found_expected_failures.remove(expected_timeout)
858                 result, msg = self.check_expected_timeout(expected_timeout)
859             else:
860                 return
861         elif self.process.returncode in COREDUMP_SIGNALS:
862             result = Result.FAILED
863             msg = "Application segfaulted "
864             self.add_stack_trace_to_logfile()
865         elif self.process.returncode == VALGRIND_ERROR_CODE:
866             msg = "Valgrind reported errors "
867             result = Result.FAILED
868         elif self.process.returncode not in expected_returncode:
869             msg = "Application returned %s " % self.process.returncode
870             if expected_returncode != [0]:
871                 msg += "(expected %s) " % expected_returncode
872             result = Result.FAILED
873
874         if criticals:
875             msg += "(critical errors: [%s]) " % ', '.join(criticals)
876             result = Result.FAILED
877
878         if not_found_expected_failures:
879             mandatory_failures = [f for f in not_found_expected_failures
880                                   if not f.get('sometimes')]
881
882             if mandatory_failures:
883                 msg += "(Expected errors not found: %s) " % mandatory_failures
884                 result = Result.FAILED
885         elif self.expected_failures:
886                 msg += '%s(Expected errors occured: %s)%s' % (Colors.OKBLUE,
887                                                            self.expected_failures,
888                                                            Colors.ENDC)
889
890         self.set_result(result, msg.strip())
891
892     def get_valgrind_suppressions(self):
893         result = super(GstValidateTest, self).get_valgrind_suppressions()
894         gst_sup = self.get_valgrind_suppression_file('common', 'gst.supp')
895         if gst_sup:
896             result.append(gst_sup)
897         return result
898
899
class GstValidateEncodingTestInterface(object):
    """
    Helpers for tests encoding a media file: building the encoding profile
    and checking the encoded file against what was requested.

    The `debug` and `info` logging methods called here have to be provided
    by the class this interface is mixed into.
    """
    # Tolerance used when comparing durations if none is given
    DURATION_TOLERANCE = GST_SECOND / 4

    def __init__(self, combination, media_descriptor, duration_tolerance=None):
        """
        @combination: Object describing the wanted muxer/audio/video caps
        @media_descriptor: Descriptor of the media being encoded (can be None)
        @duration_tolerance: Tolerance used when comparing the duration of
                             the encoded file with the original one,
                             `DURATION_TOLERANCE` when None
        """
        super(GstValidateEncodingTestInterface, self).__init__()

        self.media_descriptor = media_descriptor
        self.combination = combination
        self.dest_file = ""

        self._duration_tolerance = duration_tolerance
        if duration_tolerance is None:
            self._duration_tolerance = self.DURATION_TOLERANCE

    def get_current_size(self):
        """
        Return the size of the file `dest_file` points to, or None when it
        can not be stat'ed.
        """
        try:
            size = os.stat(urllib.parse.urlparse(self.dest_file).path).st_size
        except OSError:
            return None

        self.debug("Size: %s" % size)
        return size

    def _get_profile_full(self, muxer, venc, aenc, video_restriction=None,
                          audio_restriction=None, audio_presence=0,
                          video_presence=0):
        """
        Build the profile string, in the form:

            muxer:[video_restriction->]venc[|video_presence]:[audio_restriction->]aenc[|audio_presence]

        Missing parts are left out and doubled ':' separators are collapsed.
        """
        ret = ""
        if muxer:
            ret += muxer
        ret += ":"
        if venc:
            if video_restriction is not None:
                ret += video_restriction + '->'
            ret += venc
            if video_presence:
                ret += '|' + str(video_presence)
        if aenc:
            ret += ":"
            if audio_restriction is not None:
                ret += audio_restriction + '->'
            ret += aenc
            if audio_presence:
                ret += '|' + str(audio_presence)

        # No video encoder leaves two consecutive separators behind
        return ret.replace("::", ":")

    def get_profile(self, video_restriction=None, audio_restriction=None):
        """
        Return the profile string for `combination`, without the video
        (resp. audio) part when the media descriptor has no video (resp.
        audio) track.
        """
        vcaps = self.combination.get_video_caps()
        acaps = self.combination.get_audio_caps()
        if self.media_descriptor is not None:
            if self.media_descriptor.get_num_tracks("video") == 0:
                vcaps = None

            if self.media_descriptor.get_num_tracks("audio") == 0:
                acaps = None

        return self._get_profile_full(self.combination.get_muxer_caps(),
                                      vcaps, acaps,
                                      video_restriction=video_restriction,
                                      audio_restriction=audio_restriction)

    def _clean_caps(self, caps):
        """
        Returns a list of key=value or structure name, without "(types)" or ";" or ","
        """
        return re.sub(r"\(.+?\)\s*| |;", '', caps).split(',')

    def _has_caps_type_variant(self, c, ccaps):
        """
        Handle situations where we can have application/ogg or video/ogg or
        audio/ogg
        """
        has_variant = False
        media_type = re.findall("application/|video/|audio/", c)
        if media_type:
            media_type = media_type[0].replace('/', '')
            possible_mtypes = ["application", "video", "audio"]
            possible_mtypes.remove(media_type)
            for tmptype in possible_mtypes:
                possible_c_variant = c.replace(media_type, tmptype)
                if possible_c_variant in ccaps:
                    self.info(
                        "Found %s in %s, good enough!", possible_c_variant, ccaps)
                    has_variant = True

        return has_variant

    def _check_encoded_descriptor(self, result_descriptor):
        """
        Compare @result_descriptor (the descriptor of the encoded file) with
        the original media and the wanted combination.

        Returns a `(result, message)` tuple.
        """
        duration = result_descriptor.get_duration()
        orig_duration = self.media_descriptor.get_duration()
        tolerance = self._duration_tolerance

        # The duration is wrong when it is outside of the tolerance window
        # on either side of the original duration.
        if not (orig_duration - tolerance <= duration <= orig_duration + tolerance):
            return (Result.FAILED, "Duration of encoded file is "
                    " wrong (%s instead of %s)" %
                    (utils.TIME_ARGS(duration),
                     utils.TIME_ARGS(orig_duration)))

        all_tracks_caps = result_descriptor.get_tracks_caps()
        container_caps = result_descriptor.get_caps()
        if container_caps:
            all_tracks_caps.insert(0, ("container", container_caps))

        for track_type, caps in all_tracks_caps:
            wanted_caps = self.combination.get_caps(track_type)
            # Has to be checked before cleaning the caps, which only works
            # on strings.
            if wanted_caps is None:
                return (Result.FAILED,
                        "Found a track of type %s in the encoded files"
                        " but none where wanted in the encoded profile: %s"
                        % (track_type, self.combination))

            ccaps = self._clean_caps(caps)
            for c in self._clean_caps(wanted_caps):
                if c not in ccaps and not self._has_caps_type_variant(c, ccaps):
                    return (Result.FAILED,
                            "Field: %s  (from %s) not in caps of the outputed file %s"
                            % (c, wanted_caps, ccaps))

        return (Result.PASSED, "")

    def check_encoded_file(self):
        """
        Check that the encoded file (`dest_file`) has the expected duration
        and that its container and tracks have the wanted caps.

        The file at `result_descriptor.get_path()` is removed once the
        checks are done.

        Returns a `(result, message)` tuple.
        """
        result_descriptor = GstValidateMediaDescriptor.new_from_uri(
            self.dest_file)
        if result_descriptor is None:
            return (Result.FAILED, "Could not discover encoded file %s"
                    % self.dest_file)

        outcome = self._check_encoded_descriptor(result_descriptor)
        os.remove(result_descriptor.get_path())
        return outcome
1032
1033
class TestsManager(Loggable):

    """ A class responsible for managing tests. """

    name = "base"
    # Name of the testsuite being loaded, set on the class by the launcher
    loading_testsuite = None

    def __init__(self):

        Loggable.__init__(self)

        self.tests = []
        self.unwanted_tests = []
        self.options = None
        self.args = None
        self.reporter = None
        self.wanted_tests_patterns = []
        self.blacklisted_tests_patterns = []
        self._generators = []
        self.check_testslist = True
        self.all_tests = None
        # Compiled test name regex -> list of expected failures
        self.expected_failures = {}
        # List of (test name regex, reason) tuples
        self.blacklisted_tests = []

    def init(self):
        """ Returns True when the manager is usable. """
        return True

    def list_tests(self):
        """ Returns the tests sorted by classname. """
        return sorted(self.tests, key=lambda x: x.classname)

    def add_expected_issues(self, expected_failures):
        """
        @expected_failures: A dict mapping test name regular expressions to
                            the list of failures expected for matching tests

        The failures are added to the already known matching tests and
        remembered for the tests added later on.
        """
        expected_failures_re = {}
        for test_name_regex, failures in expected_failures.items():
            regex = re.compile(test_name_regex)
            expected_failures_re[regex] = failures
            for test in self.tests:
                if regex.findall(test.classname):
                    test.expected_failures.extend(failures)

        self.expected_failures.update(expected_failures_re)

    def add_test(self, test):
        """
        Register @test, either in the list of tests to run or in the list of
        unwanted tests depending on `_is_test_wanted`.
        """
        # Tests coming from a generator are prefixed by the generator itself
        if test.generator is None:
            test.classname = self.loading_testsuite + '.' + test.classname
        for regex, failures in self.expected_failures.items():
            if regex.findall(test.classname):
                test.expected_failures.extend(failures)

        if self._is_test_wanted(test):
            if test not in self.tests:
                self.tests.append(test)
                self.tests.sort(key=lambda test: test.classname)
        else:
            # NOTE(review): this checks `self.tests` and not
            # `self.unwanted_tests` -- confirm it is intended.
            if test not in self.tests:
                self.unwanted_tests.append(test)
                self.unwanted_tests.sort(key=lambda test: test.classname)

    def get_tests(self):
        return self.tests

    def populate_testsuite(self):
        """ Hook called from `set_settings`, does nothing by default. """
        pass

    def add_generators(self, generators):
        """
        @generators: A list of, or one single #TestsGenerator to be used to generate tests
        """
        if not isinstance(generators, list):
            generators = [generators]
        self._generators.extend(generators)
        for generator in generators:
            generator.testsuite = self.loading_testsuite

        # Drop the generators added several times
        self._generators = list(set(self._generators))

    def get_generators(self):
        return self._generators

    def _add_blacklist(self, blacklisted_tests):
        """
        @blacklisted_tests: A list of, or one single string made of comma
                            separated regular expressions
        """
        if not isinstance(blacklisted_tests, list):
            blacklisted_tests = [blacklisted_tests]

        for patterns in blacklisted_tests:
            for pattern in patterns.split(","):
                self.blacklisted_tests_patterns.append(re.compile(pattern))

    def set_default_blacklist(self, default_blacklist):
        """
        @default_blacklist: An iterable of (test name regex, reason) tuples,
                            the regex gets prefixed with the name of the
                            testsuite being loaded when it is not already
        """
        for test_regex, reason in default_blacklist:
            if not test_regex.startswith(self.loading_testsuite + '.'):
                test_regex = self.loading_testsuite + '.' + test_regex
            self.blacklisted_tests.append((test_regex, reason))

    def add_options(self, parser):
        """ Add more arguments. """
        pass

    def set_settings(self, options, args, reporter):
        """ Set properties after options parsing. """
        self.options = options
        self.args = args
        self.reporter = reporter

        self.populate_testsuite()

        if self.options.valgrind:
            self.print_valgrind_bugs()

        if options.wanted_tests:
            for patterns in options.wanted_tests:
                for pattern in patterns.split(","):
                    self.wanted_tests_patterns.append(re.compile(pattern))

        if options.blacklisted_tests:
            for patterns in options.blacklisted_tests:
                self._add_blacklist(patterns)

    def set_blacklists(self):
        """
        Activate the 'hardcoded' blacklist.

        Returns False when checking the status of the bugs was requested and
        `check_bugs_resolution` failed, True otherwise.
        """
        if self.blacklisted_tests:
            printc("\nCurrently 'hardcoded' %s blacklisted tests:" %
                   self.name, Colors.WARNING, title_char='-')

        if self.options.check_bugs_status:
            if not check_bugs_resolution(self.blacklisted_tests):
                return False

        for name, bug in self.blacklisted_tests:
            self._add_blacklist(name)
            if not self.options.check_bugs_status:
                print("  + %s \n   --> bug: %s\n" % (name, bug))

        return True

    def check_expected_failures(self):
        """
        Check the status of the bugs associated with the expected failures.

        Returns True when there is nothing to check (no expected failure or
        checking the bugs status was not requested), the result of
        `check_bugs_resolution` otherwise.
        """
        if not self.expected_failures or not self.options.check_bugs_status:
            return True

        printc("\nCurrently known failures in the %s testsuite:"
               % self.name, Colors.WARNING, title_char='-')

        bugs_definitions = {}
        for regex, failures in self.expected_failures.items():
            for failure in failures:
                bugs = failure.get('bug')
                if not bugs:
                    bugs = failure.get('bugs')
                if not bugs:
                    printc('+ %s:\n  --> no bug reported associated with %s\n' % (
                        regex.pattern, failure), Colors.WARNING)
                    continue

                if not isinstance(bugs, list):
                    bugs = [bugs]
                # Accumulate in a list owned by this method: the bugs of the
                # failure definition must not be modified and the ones
                # already collected for the pattern have to be kept.
                known_bugs = bugs_definitions.setdefault(regex.pattern, [])
                known_bugs.extend([b for b in bugs if b not in known_bugs])

        return check_bugs_resolution(bugs_definitions.items())

    def _check_blacklisted(self, test):
        """ Returns True when a blacklist pattern matches @test. """
        for pattern in self.blacklisted_tests_patterns:
            if pattern.findall(test.classname):
                self.info("%s is blacklisted by %s", test.classname, pattern)
                return True

        return False

    def _check_whitelisted(self, test):
        """
        Returns True when @test matches a wanted pattern and is either not
        blacklisted or wanted by its exact name. Only the first wanted
        pattern matching the test is considered.
        """
        for pattern in self.wanted_tests_patterns:
            if pattern.findall(test.classname):
                if self._check_blacklisted(test):
                    # If explicitly white listed that specific test
                    # bypass the blacklisting
                    if pattern.pattern != test.classname:
                        return False
                return True
        return False

    def _check_duration(self, test):
        """
        Returns False when @test has a known duration which is over the
        `long_limit` option.
        """
        if test.duration > 0 and int(self.options.long_limit) < int(test.duration):
            self.info("Not activating %s as its duration (%d) is superior"
                      " than the long limit (%d)" % (test, test.duration,
                                                     int(self.options.long_limit)))
            return False

        return True

    def _is_test_wanted(self, test):
        """
        Returns True when @test should be run according to the wanted
        patterns, the blacklist and its duration.
        """
        if self._check_whitelisted(test):
            if not self._check_duration(test):
                return False
            return True

        if self._check_blacklisted(test):
            return False

        if not self._check_duration(test):
            return False

        # Without any wanted pattern, everything left is wanted
        if not self.wanted_tests_patterns:
            return True

        return False

    def needs_http_server(self):
        return False

    def print_valgrind_bugs(self):
        """ Hook called when running in valgrind, does nothing by default. """
        pass
1243
1244
class TestsGenerator(Loggable):
    """ Holds a set of tests, indexed by classname, for a tests manager. """

    def __init__(self, name, test_manager, tests=None):
        """
        @name: The name of the generator
        @test_manager: The manager the generator is used with
        @tests: An optional iterable of tests to start with
        """
        Loggable.__init__(self)
        self.name = name
        self.test_manager = test_manager
        self.testsuite = None
        self._tests = {}
        # None replaces a mutable default argument that would be shared
        # between all the calls.
        for test in () if tests is None else tests:
            self._tests[test.classname] = test

    def generate_tests(self, *kwargs):
        """
        Method that generates tests
        """
        return list(self._tests.values())

    def add_test(self, test):
        """
        Add @test to the generator, prefixing its classname with the name of
        the testsuite of the generator.
        """
        test.generator = self
        test.classname = self.testsuite + '.' + test.classname
        self._tests[test.classname] = test
1266
1267
class GstValidateTestsGenerator(TestsGenerator):
    """
    Generator whose tests get registered by `populate_tests` right before
    they are listed.
    """

    def populate_tests(self, uri_minfo_special_scenarios, scenarios):
        """ Does nothing here. """
        return None

    def generate_tests(self, uri_minfo_special_scenarios, scenarios):
        """
        Call `populate_tests` with the given arguments, then return the
        tests as listed by the parent class.
        """
        self.populate_tests(uri_minfo_special_scenarios, scenarios)
        generated = super().generate_tests()
        return generated
1276
1277
1278 class _TestsLauncher(Loggable):
1279
1280     def __init__(self, libsdir):
1281
1282         Loggable.__init__(self)
1283
1284         self.libsdir = libsdir
1285         self.options = None
1286         self.testers = []
1287         self.tests = []
1288         self.reporter = None
1289         self._list_testers()
1290         self.all_tests = None
1291         self.wanted_tests_patterns = []
1292
1293         self.queue = queue.Queue()
1294         self.jobs = []
1295         self.total_num_tests = 0
1296
1297     def _list_app_dirs(self):
1298         app_dirs = []
1299         app_dirs.append(os.path.join(self.libsdir, "apps"))
1300         env_dirs = os.environ.get("GST_VALIDATE_APPS_DIR")
1301         if env_dirs is not None:
1302             for dir_ in env_dirs.split(":"):
1303                 app_dirs.append(dir_)
1304                 sys.path.append(dir_)
1305
1306         return app_dirs
1307
1308     def _exec_app(self, app_dir, env):
1309         try:
1310             files = os.listdir(app_dir)
1311         except OSError as e:
1312             self.debug("Could not list %s: %s" % (app_dir, e))
1313             files = []
1314         for f in files:
1315             if f.endswith(".py"):
1316                 exec(compile(open(os.path.join(app_dir, f)).read(), os.path.join(app_dir, f), 'exec'), env)
1317
1318     def _exec_apps(self, env):
1319         app_dirs = self._list_app_dirs()
1320         for app_dir in app_dirs:
1321             self._exec_app(app_dir, env)
1322
1323     def _list_testers(self):
1324         env = globals().copy()
1325         self._exec_apps(env)
1326
1327         testers = [i() for i in utils.get_subclasses(TestsManager, env)]
1328         for tester in testers:
1329             if tester.init() is True:
1330                 self.testers.append(tester)
1331             else:
1332                 self.warning("Can not init tester: %s -- PATH is %s"
1333                              % (tester.name, os.environ["PATH"]))
1334
1335     def add_options(self, parser):
1336         for tester in self.testers:
1337             tester.add_options(parser)
1338
1339     def _load_testsuite(self, testsuites):
1340         exceptions = []
1341         for testsuite in testsuites:
1342             try:
1343                 sys.path.insert(0, os.path.dirname(testsuite))
1344                 return (__import__(os.path.basename(testsuite).replace(".py", "")), None)
1345             except Exception as e:
1346                 exceptions.append("Could not load %s: %s" % (testsuite, e))
1347                 continue
1348             finally:
1349                 sys.path.remove(os.path.dirname(testsuite))
1350
1351         return (None, exceptions)
1352
    def _load_testsuites(self):
        """
        Load the modules of the testsuites listed in `self.options.testsuites`
        and replace that list with the list of loaded modules.

        Every loaded module ends up with a `TEST_MANAGER` attribute which is
        a list: the names of all the testers when the module does not define
        it.
        """
        testsuites = []
        for testsuite in self.options.testsuites:
            if os.path.exists(testsuite):
                # An existing path is loaded directly
                testsuite = os.path.abspath(os.path.expanduser(testsuite))
                loaded_module = self._load_testsuite([testsuite])
            else:
                # Otherwise it is a name looked for in the testsuites dirs
                possible_testsuites_paths = [os.path.join(d, testsuite + ".py")
                              for d in self.options.testsuites_dirs]
                loaded_module = self._load_testsuite(possible_testsuites_paths)

            module = loaded_module[0]
            if not loaded_module[0]:
                if "." in testsuite:
                    # The part before the first '.' is appended to the list
                    # being iterated, so that it is tried as a testsuite
                    # later in this same loop; the full name becomes a
                    # wanted test.
                    self.options.testsuites.append(testsuite.split('.')[0])
                    self.info("%s looks like a test name, trying that" % testsuite)
                    self.options.wanted_tests.append(testsuite)
                else:
                    printc("Could not load testsuite: %s, reasons: %s" % (
                        testsuite, loaded_module[1]), Colors.FAIL)
                continue

            testsuites.append(module)
            if not hasattr(module, "TEST_MANAGER"):
                module.TEST_MANAGER = [tester.name for tester in self.testers]
            elif not isinstance(module.TEST_MANAGER, list):
                module.TEST_MANAGER = [module.TEST_MANAGER]

        self.options.testsuites = testsuites
1382
1383     def _setup_testsuites(self):
1384         for testsuite in self.options.testsuites:
1385             loaded = False
1386             wanted_test_manager = None
1387             if hasattr(testsuite, "TEST_MANAGER"):
1388                 wanted_test_manager = testsuite.TEST_MANAGER
1389                 if not isinstance(wanted_test_manager, list):
1390                     wanted_test_manager = [wanted_test_manager]
1391
1392             for tester in self.testers:
1393                 if wanted_test_manager is not None and \
1394                         tester.name not in wanted_test_manager:
1395                     continue
1396
1397                 if self.options.user_paths:
1398                     TestsManager.loading_testsuite = tester.name
1399                     tester.register_defaults()
1400                     loaded = True
1401                 else:
1402                     TestsManager.loading_testsuite = testsuite.__name__
1403                     if testsuite.setup_tests(tester, self.options):
1404                         loaded = True
1405                     TestsManager.loading_testsuite = None
1406
1407             if not loaded:
1408                 printc("Could not load testsuite: %s"
1409                        " maybe because of missing TestManager"
1410                        % (testsuite), Colors.FAIL)
1411                 return False
1412
1413     def _load_config(self, options):
1414         printc("Loading config files is DEPRECATED"
1415                " you should use the new testsuite format now",)
1416
1417         for tester in self.testers:
1418             tester.options = options
1419             globals()[tester.name] = tester
1420         globals()["options"] = options
1421         c__file__ = __file__
1422         globals()["__file__"] = self.options.config
1423         exec(compile(open(self.options.config).read(), self.options.config, 'exec'), globals())
1424         globals()["__file__"] = c__file__
1425
1426     def set_settings(self, options, args):
1427         if options.xunit_file:
1428             self.reporter = reporters.XunitReporter(options)
1429         else:
1430             self.reporter = reporters.Reporter(options)
1431
1432         self.options = options
1433         wanted_testers = None
1434         for tester in self.testers:
1435             if tester.name in args:
1436                 wanted_testers = tester.name
1437
1438         if wanted_testers:
1439             testers = self.testers
1440             self.testers = []
1441             for tester in testers:
1442                 if tester.name in args:
1443                     self.testers.append(tester)
1444                     args.remove(tester.name)
1445
1446         if options.config:
1447             self._load_config(options)
1448
1449         self._load_testsuites()
1450         if not self.options.testsuites:
1451             printc("Not testsuite loaded!", Colors.FAIL)
1452             return False
1453
1454         for tester in self.testers:
1455             tester.set_settings(options, args, self.reporter)
1456
1457         if not options.config and options.testsuites:
1458             if self._setup_testsuites() is False:
1459                 return False
1460
1461         for tester in self.testers:
1462             if not tester.set_blacklists():
1463                 return False
1464
1465             if not tester.check_expected_failures():
1466                 return False
1467
1468         return True
1469
1470     def _check_tester_has_other_testsuite(self, testsuite, tester):
1471         if tester.name != testsuite.TEST_MANAGER[0]:
1472             return True
1473
1474         for t in self.options.testsuites:
1475             if t != testsuite:
1476                 for other_testmanager in t.TEST_MANAGER:
1477                     if other_testmanager == tester.name:
1478                         return True
1479
1480         return False
1481
    def _check_defined_tests(self, tester, tests):
        """
        Compare @tests with the '.testslist' file living next to the
        testsuite and rewrite that file with the current tests.

        In the file, optional tests are prefixed with '~'.

        Returns nothing (None) when tests are blacklisted or explicitly
        wanted through the options, otherwise whether the list of tests
        changed.
        """
        if self.options.blacklisted_tests or self.options.wanted_tests:
            return

        tests_names = [test.classname for test in tests]
        testlist_changed = False
        for testsuite in self.options.testsuites:
            if not self._check_tester_has_other_testsuite(testsuite, tester) \
                    and tester.check_testslist:
                try:
                    testlist_file = open(os.path.splitext(testsuite.__file__)[0] + ".testslist",
                                         'r+')

                    know_tests = testlist_file.read().split("\n")
                    testlist_file.close()

                    # Reopening in 'w' mode empties the file, it is written
                    # again below.
                    testlist_file = open(os.path.splitext(testsuite.__file__)[0] + ".testslist",
                                         'w')
                except IOError:
                    # Testsuites whose list can not be opened are skipped
                    continue

                optional_out = []
                for test in know_tests:
                    if test and test.strip('~') not in tests_names:
                        if not test.startswith('~'):
                            testlist_changed = True
                            printc("Test %s Not in testsuite %s anymore"
                                % (test, testsuite.__file__), Colors.FAIL)
                        else:
                            # Optional tests that disappeared are kept in
                            # the file, without being a change.
                            optional_out.append((test, None))

                # From here on `tests_names` holds (name, test) tuples, the
                # test being None for the optional tests kept above.
                tests_names = sorted([(test.classname, test) for test in tests] + optional_out,
                                     key=lambda x: x[0].strip('~'))

                for tname, test in tests_names:
                    if test and test.optional:
                        tname = '~' + tname
                    testlist_file.write("%s\n" % (tname))
                    if tname and tname not in know_tests:
                        printc("Test %s is NEW in testsuite %s"
                               % (tname, testsuite.__file__), Colors.OKGREEN)
                        testlist_changed = True

                testlist_file.close()
                # Only the first testsuite whose list could be opened is
                # handled.
                break

        return testlist_changed
1529
1530     def list_tests(self):
1531         for tester in self.testers:
1532             if not self._tester_needed(tester):
1533                 continue
1534
1535             tests = tester.list_tests()
1536             if self._check_defined_tests(tester, tests) and \
1537                     self.options.fail_on_testlist_change:
1538                 return -1
1539
1540             self.tests.extend(tests)
1541         return sorted(list(self.tests), key=lambda t: t.classname)
1542
1543     def _tester_needed(self, tester):
1544         for testsuite in self.options.testsuites:
1545             if tester.name in testsuite.TEST_MANAGER:
1546                 return True
1547         return False
1548
1549     def print_test_num(self, test):
1550         cur_test_num = self.tests.index(test) + 1
1551         sys.stdout.write("[%d / %d] " % (cur_test_num, self.total_num_tests))
1552
1553     def test_wait(self):
1554         while True:
1555             # Check process every second for timeout
1556             try:
1557                 self.queue.get(timeout=1)
1558             except queue.Empty:
1559                 pass
1560
1561             for test in self.jobs:
1562                 if test.process_update():
1563                     self.jobs.remove(test)
1564                     return test
1565
1566     def tests_wait(self):
1567         try:
1568             test = self.test_wait()
1569             test.check_results()
1570         except KeyboardInterrupt:
1571             for test in self.jobs:
1572                 test.kill_subprocess()
1573             raise
1574
1575         return test
1576
1577     def start_new_job(self, tests_left):
1578         try:
1579             test = tests_left.pop(0)
1580         except IndexError:
1581             return False
1582
1583         self.print_test_num(test)
1584         test.test_start(self.queue)
1585
1586         self.jobs.append(test)
1587
1588         return True
1589
1590     def _run_tests(self):
1591         cur_test_num = 0
1592
1593         if not self.all_tests:
1594             all_tests = self.list_tests()
1595             if all_tests == -1:
1596                 return False
1597             self.all_tests = all_tests
1598         self.total_num_tests = len(self.all_tests)
1599
1600         self.reporter.init_timer()
1601         alone_tests = []
1602         tests = []
1603         for test in self.tests:
1604             if test.is_parallel:
1605                 tests.append(test)
1606             else:
1607                 alone_tests.append(test)
1608
1609         max_num_jobs = min(self.options.num_jobs, len(tests))
1610         jobs_running = 0
1611
1612         for num_jobs, tests in [(max_num_jobs, tests), (1, alone_tests)]:
1613             tests_left = list(tests)
1614             for i in range(num_jobs):
1615                 if not self.start_new_job(tests_left):
1616                     break
1617                 jobs_running += 1
1618
1619             while jobs_running != 0:
1620                 test = self.tests_wait()
1621                 jobs_running -= 1
1622                 self.print_test_num(test)
1623                 res = test.test_end()
1624                 self.reporter.after_test(test)
1625                 if res != Result.PASSED and (self.options.forever or
1626                                             self.options.fatal_error):
1627                     return test.result
1628                 if self.start_new_job(tests_left):
1629                     jobs_running += 1
1630
1631         return True
1632
1633     def clean_tests(self):
1634         for test in self.tests:
1635             test.clean()
1636
1637     def run_tests(self):
1638         if self.options.forever:
1639             r = 1
1640             while True:
1641                 t = "Running iteration %d" % r
1642                 print("%s\n%s\n%s\n" % ("=" * len(t), t, "=" * len(t)))
1643
1644                 if not self._run_tests():
1645                     break
1646                 r += 1
1647                 self.clean_tests()
1648
1649             return False
1650         elif self.options.n_runs:
1651             res = True
1652             for r in range(self.options.n_runs):
1653                 t = "Running iteration %d" % r
1654                 print("%s\n%s\n%s\n" % ("=" * len(t), t, "=" * len(t)))
1655                 if not self._run_tests():
1656                     res = False
1657                 self.clean_tests()
1658
1659             return res
1660         else:
1661             return self._run_tests()
1662
1663     def final_report(self):
1664         return self.reporter.final_report()
1665
1666     def needs_http_server(self):
1667         for tester in self.testers:
1668             if tester.needs_http_server():
1669                 return True
1670
1671
class NamedDic(object):
    """Object exposing the entries of a mapping as attributes."""

    def __init__(self, props):
        """Set one attribute per ``(name, value)`` item of *props*.

        A falsy *props* (``None``, empty mapping, ...) leaves the instance
        without any attribute.
        """
        if not props:
            return

        for key, val in props.items():
            setattr(self, key, val)
1678
1679
class Scenario(object):
    """One scenario, with the properties it was described with.

    Every ``(key, value)`` pair of *props* becomes an instance attribute whose
    name is the key with dashes turned into underscores. The query methods
    read those attributes and fall back to a default when one is missing.
    """

    def __init__(self, name, props, path=None):
        self.name = name
        self.path = path

        # Set after name/path on purpose: a property called "name" or "path"
        # overrides the constructor argument.
        for key, val in props:
            setattr(self, key.replace("-", "_"), val)

    def _flag(self, attr_name):
        """Return ``bool()`` of attribute *attr_name*, False when missing."""
        # NOTE(review): any non-empty value is truthy here, so a string such
        # as "false" would still count as True -- confirm what values the
        # properties actually carry.
        return bool(getattr(self, attr_name, False))

    def _number(self, attr_name, convert):
        """Return ``convert(attribute)``, or the int 0 when it is missing."""
        if not hasattr(self, attr_name):
            return 0
        return convert(getattr(self, attr_name))

    def get_execution_name(self):
        """Return ``path`` when one was given, ``name`` otherwise."""
        return self.name if self.path is None else self.path

    def seeks(self):
        return self._flag("seek")

    def needs_clock_sync(self):
        return self._flag("need_clock_sync")

    def needs_live_content(self):
        # Scenarios that can only be used on live content
        return self._flag("live_content_required")

    def compatible_with_live_content(self):
        # Requiring live content implies being compatible with it.
        return (self.needs_live_content() or
                self._flag("live_content_compatible"))

    def get_min_media_duration(self):
        return self._number("min_media_duration", float)

    def does_reverse_playback(self):
        return self._flag("reverse_playback")

    def get_duration(self):
        return self._number("duration", float)

    def get_min_tracks(self, track_type):
        """Return the ``min_<track_type>_track`` property as an int, or 0."""
        return self._number("min_%s_track" % track_type, int)

    def __repr__(self):
        return "<Scenario %s>" % self.name
1748
1749
class ScenarioManager(Loggable):
    """Singleton that discovers scenarios by running the validate tool.

    ``ScenarioManager()`` always hands back the same instance. Its ``config``
    attribute starts as None and must be assigned before any discovery,
    because ``discover_scenarios`` reads ``config.main_dir`` and
    ``config.logsdir``. ``GST_VALIDATE_COMMAND`` is the executable that gets
    run; it is an empty string here, presumably overridden elsewhere.
    """
    _instance = None
    # Default scenarios found so far; shared at class level, filled by
    # discover_scenarios() when it is called without explicit paths.
    all_scenarios = []

    FILE_EXTENSION = "scenario"
    GST_VALIDATE_COMMAND = ""

    def __new__(cls, *args, **kwargs):
        if not cls._instance:
            cls._instance = super(ScenarioManager, cls).__new__(
                cls, *args, **kwargs)
            cls._instance.config = None
            cls._instance.discovered = False
            Loggable.__init__(cls._instance)

        return cls._instance

    def find_special_scenarios(self, mfile):
        """Discover the scenarios that sit next to media file *mfile*.

        A file of the same directory qualifies when its name contains
        ``<basename of mfile>.<anything>.scenario`` at its end.

        Returns:
            The list returned by ``discover_scenarios`` for the matching
            files, or an empty list when no file matches.
        """
        mfile_dir = os.path.dirname(mfile)
        # Raw string: "\." is an invalid escape sequence in a plain literal.
        matcher = re.compile(r"%s\..*\.%s$" % (
            re.escape(os.path.basename(mfile)), self.FILE_EXTENSION))

        scenarios = [os.path.join(mfile_dir, fname)
                     for fname in os.listdir(mfile_dir)
                     if matcher.search(fname)]

        if scenarios:
            scenarios = self.discover_scenarios(scenarios, mfile)

        return scenarios

    def discover_scenarios(self, scenario_paths=None, mfile=None):
        """
        Discover scenarios specified in scenario_paths or the default ones
        if nothing specified there

        The validate command is run with ``--scenarios-defs-output-file``
        pointing to ``<config.main_dir>/scenarios.def`` followed by
        *scenario_paths*; its output goes to
        ``<config.logsdir>/scenarios_discovery.log``. The definitions file is
        then parsed and one ``Scenario`` is built per section.

        :param scenario_paths: paths of scenario files to describe; None or
                               empty means the default scenarios, which are
                               also added to ``all_scenarios``.
        :param mfile: media file the scenarios belong to. When given, the
                      scenario name is the path stripped of ``mfile + "."``
                      and of the ``.scenario`` extension.
        :return: the list of ``Scenario`` built by this call.
        """
        if scenario_paths is None:
            scenario_paths = []

        scenarios = []
        scenario_defs = os.path.join(self.config.main_dir, "scenarios.def")
        logs_path = os.path.join(self.config.logsdir,
                                 "scenarios_discovery.log")

        command = [self.GST_VALIDATE_COMMAND,
                   "--scenarios-defs-output-file", scenario_defs]
        command.extend(scenario_paths)
        with open(logs_path, 'w') as logs:
            try:
                subprocess.check_call(command, stdout=logs, stderr=logs)
            except subprocess.CalledProcessError:
                # Best effort, as before: a non-zero exit status is ignored
                # and whatever definitions file exists is parsed below.
                pass

        config = configparser.RawConfigParser()
        # read_file() replaces readfp(), which was deprecated and is gone
        # from Python 3.12.
        with open(scenario_defs) as defs_file:
            config.read_file(defs_file)

        for section in config.sections():
            # Reset for every section so that a section matching no path can
            # neither hit an unbound name nor reuse the previous one's values.
            name = None
            path = None

            if scenario_paths:
                for scenario_path in scenario_paths:
                    if mfile is None:
                        name = section
                        path = scenario_path
                    elif section in scenario_path:
                        # The real name of the scenario is:
                        # filename.REALNAME.scenario
                        name = scenario_path.replace(mfile + ".", "").replace(
                            "." + self.FILE_EXTENSION, "")
                        path = scenario_path

                if name is None:
                    self.warning("No scenario file found for section: %s"
                                 % section)
                    continue
            else:
                name = section

            props = config.items(section)
            scenarios.append(Scenario(name, props, path))

        if not scenario_paths:
            self.discovered = True
            self.all_scenarios.extend(scenarios)

        return scenarios

    def get_scenario(self, name):
        """Look a scenario up by name or by absolute file path.

        :param name: None, a scenario name, or an absolute path ending with
                     ``scenario``.
        :return: the whole list of default scenarios when *name* is None, the
                 matching ``Scenario`` otherwise, or None (after a warning)
                 when nothing matches.
        """
        if name is not None and os.path.isabs(name) and \
                name.endswith(self.FILE_EXTENSION):
            scenarios = self.discover_scenarios([name])

            if scenarios:
                return scenarios[0]

        if self.discovered is False:
            self.discover_scenarios()

        if name is None:
            return self.all_scenarios

        for scenario in self.all_scenarios:
            if scenario.name == name:
                return scenario

        self.warning("Scenario: %s not found" % name)
        return None
1845
1846
class GstValidateBaseTestManager(TestsManager):
    """Tests manager keeping default scenarios and encoding formats."""
    scenarios_manager = ScenarioManager()

    def __init__(self):
        super(GstValidateBaseTestManager, self).__init__()
        self._scenarios = []
        self._encoding_formats = []

    @staticmethod
    def _merged_unique(current, added):
        """Return *current* plus *added* without duplicates, order kept.

        *added* is either a list of items or one single item. Deduplicating
        through ``set()`` gave an arbitrary order (for strings it changes from
        one interpreter run to the next under hash randomization);
        ``dict.fromkeys`` keeps the first occurrence of each item in insertion
        order on interpreters with ordered dicts, and is no worse than the
        set elsewhere. Items must be hashable, as before.
        """
        extra = added if isinstance(added, list) else [added]
        return list(dict.fromkeys(current + extra))

    def add_scenarios(self, scenarios):
        """
        @scenarios A list or a unic scenario name(s) to be run on the tests.
                    They are just the default scenarios, and then depending on
                    the TestsGenerator to be used you can have more fine grained
                    control on what to be run on each serie of tests.
        """
        self._scenarios = self._merged_unique(self._scenarios, scenarios)

    def set_scenarios(self, scenarios):
        """
        Override the scenarios
        """
        self._scenarios = []
        self.add_scenarios(scenarios)

    def get_scenarios(self):
        return self._scenarios

    def add_encoding_formats(self, encoding_formats):
        """
        :param encoding_formats: A list or one single #MediaFormatCombinations describing wanted output
                           formats for transcoding test.
                           They are just the default encoding formats, and then depending on
                           the TestsGenerator to be used you can have more fine grained
                           control on what to be run on each serie of tests.
        """
        self._encoding_formats = self._merged_unique(self._encoding_formats,
                                                     encoding_formats)

    def get_encoding_formats(self):
        return self._encoding_formats
1896
1897
class MediaDescriptor(Loggable):
    """Base description of a media, used to match it against scenarios.

    Subclasses provide the accessors; ``is_compatible`` combines them to
    decide whether a scenario can be run on the media.
    """

    def __init__(self):
        Loggable.__init__(self)

    # Every accessor below is abstract and raises NotImplementedError until a
    # subclass overrides it. (They used to `raise NotImplemented`, which is
    # not an exception and therefore ended up as a TypeError.)

    def get_path(self):
        raise NotImplementedError

    def get_media_filepath(self):
        raise NotImplementedError

    def get_caps(self):
        raise NotImplementedError

    def get_uri(self):
        raise NotImplementedError

    def get_duration(self):
        raise NotImplementedError

    def get_protocol(self):
        raise NotImplementedError

    def is_seekable(self):
        raise NotImplementedError

    def is_live(self):
        raise NotImplementedError

    def is_image(self):
        raise NotImplementedError

    def get_num_tracks(self, track_type):
        raise NotImplementedError

    def can_play_reverse(self):
        raise NotImplementedError

    def prerrols(self):
        # Default answer; when this is False, is_compatible() rejects the
        # scenarios that have a truthy `needs_preroll` attribute.
        return True

    def is_compatible(self, scenario):
        """Tell whether *scenario* can be run on this media.

        :param scenario: the scenario to check, or None (always compatible).
        :return: False as soon as one requirement of the scenario is not met
                 by the media (seeking, clock sync on images, reverse
                 playback, live content, preroll, minimum duration, minimum
                 number of audio/subtitle/video tracks), True otherwise.
        """
        if scenario is None:
            return True

        if scenario.seeks() and (not self.is_seekable() or self.is_image()):
            self.debug("Do not run %s as %s does not support seeking",
                       scenario, self.get_uri())
            return False

        if self.is_image() and scenario.needs_clock_sync():
            self.debug("Do not run %s as %s is an image",
                       scenario, self.get_uri())
            return False

        if not self.can_play_reverse() and scenario.does_reverse_playback():
            return False

        if not self.is_live() and scenario.needs_live_content():
            self.debug("Do not run %s as %s is not a live content",
                       scenario, self.get_uri())
            return False

        if self.is_live() and not scenario.compatible_with_live_content():
            self.debug("Do not run %s as %s is a live content",
                       scenario, self.get_uri())
            return False

        if not self.prerrols() and getattr(scenario, 'needs_preroll', False):
            return False

        # A falsy duration skips the check; otherwise the duration is scaled
        # by GST_SECOND before being compared with the scenario's minimum.
        duration = self.get_duration()
        if duration and duration / GST_SECOND < scenario.get_min_media_duration():
            self.debug(
                "Do not run %s as %s is too short (%i < min media duation : %i",
                scenario, self.get_uri(),
                duration / GST_SECOND,
                scenario.get_min_media_duration())
            return False

        for track_type in ['audio', 'subtitle', 'video']:
            num_tracks = self.get_num_tracks(track_type)
            min_tracks = scenario.get_min_tracks(track_type)
            if num_tracks < min_tracks:
                # The count goes before the track type so that the message
                # reads "At least <n> <type> track needed  < <available>".
                self.debug("%s -- %s | At least %s %s track needed  < %s",
                           scenario, self.get_uri(), min_tracks, track_type,
                           num_tracks)
                return False

        return True
1985
1986
class GstValidateMediaDescriptor(MediaDescriptor):
    """Media descriptor backed by an XML description file.

    The XML root must carry ``duration`` and ``seekable`` attributes; the
    other accessors read the root attributes and its ``streams/stream``
    children.
    """
    # Some extension file for discovering results
    MEDIA_INFO_EXT = "media_info"
    STREAM_INFO_EXT = "stream_info"

    DISCOVERER_COMMAND = "gst-validate-media-check-1.0"
    if "win32" in sys.platform:
        DISCOVERER_COMMAND += ".exe"

    def __init__(self, xml_path):
        """Parse *xml_path* and derive the protocol from the media URI.

        Raises the parser's error when the file cannot be parsed (after
        printing a message for parse errors) and ``KeyError`` when the
        ``duration`` or ``seekable`` attribute is missing.
        """
        super(GstValidateMediaDescriptor, self).__init__()

        self._xml_path = xml_path
        try:
            self.media_xml = ET.parse(xml_path).getroot()
        except ET.ParseError:
            # ET is whichever backend the module imported (lxml or the
            # standard library); both expose ParseError, whereas
            # xml.etree.ElementTree.ParseError only covers the latter.
            printc("Could not parse %s" % xml_path,
                   Colors.FAIL)
            raise

        # Sanity checks
        self.media_xml.attrib["duration"]
        self.media_xml.attrib["seekable"]

        # The protocol is the scheme of the media URI. Feeding that scheme to
        # urlparse() a second time, as was done before, always gave an empty
        # string because a bare scheme contains no ':'.
        self.set_protocol(urllib.parse.urlparse(self.get_uri()).scheme)

    @staticmethod
    def new_from_uri(uri, verbose=False, include_frames=False):
        """
            Run the discoverer command on *uri*, writing the description to
            ``<media path>.media_info``, and load the result.

            include_frames = 0 # Never
            include_frames = 1 # always
            include_frames = 2 # if previous file included them

            Returns a GstValidateMediaDescriptor, or None when the command
            fails or its output cannot be read or parsed.
        """
        media_path = utils.url2path(uri)

        descriptor_path = "%s.%s" % (
            media_path, GstValidateMediaDescriptor.MEDIA_INFO_EXT)
        if include_frames == 2:
            try:
                media_xml = ET.parse(descriptor_path).getroot()
                frames = media_xml.findall('streams/stream/frame')
                include_frames = bool(frames)
            except FileNotFoundError:
                # NOTE(review): include_frames stays 2 here, which is truthy,
                # so "--full" is passed when there is no previous file --
                # confirm this is the intended default.
                pass
        else:
            include_frames = bool(include_frames)

        args = GstValidateMediaDescriptor.DISCOVERER_COMMAND.split(" ")
        args.append(uri)

        args.extend(["--output-file", descriptor_path])
        if include_frames:
            args.extend(["--full"])

        if verbose:
            printc("Generating media info for %s\n"
                   "    Command: '%s'" % (media_path, ' '.join(args)),
                   Colors.OKBLUE)

        try:
            # subprocess.DEVNULL discards stderr without leaving an open file
            # object behind (the previous open(os.devnull) was read-only and
            # never closed).
            subprocess.check_output(args, stderr=subprocess.DEVNULL)
        except subprocess.CalledProcessError as e:
            if verbose:
                printc("Result: Failed", Colors.FAIL)
            else:
                loggable.warning("GstValidateMediaDescriptor",
                                 "Exception: %s" % e)
            return None

        if verbose:
            printc("Result: Passed", Colors.OKGREEN)

        try:
            return GstValidateMediaDescriptor(descriptor_path)
        except (IOError, ET.ParseError):
            return None

    def get_path(self):
        return self._xml_path

    def need_clock_sync(self):
        return Protocols.needs_clock_sync(self.get_protocol())

    def get_media_filepath(self):
        """Return the XML path without its descriptor extension.

        ``.media_info`` is removed when the protocol is ``Protocols.FILE``,
        ``.stream_info`` otherwise.
        """
        if self.get_protocol() == Protocols.FILE:
            return self._xml_path.replace("." + self.MEDIA_INFO_EXT, "")
        else:
            return self._xml_path.replace("." + self.STREAM_INFO_EXT, "")

    def get_caps(self):
        return self.media_xml.findall("streams")[0].attrib["caps"]

    def get_tracks_caps(self):
        """Return a list of ``(type, caps)`` tuples, one per stream.

        The list is empty when the XML has no ``streams`` element.
        """
        res = []
        try:
            streams = self.media_xml.findall("streams")[0].findall("stream")
        except IndexError:
            return res

        for stream in streams:
            res.append((stream.attrib["type"], stream.attrib["caps"]))

        return res

    def get_uri(self):
        return self.media_xml.attrib["uri"]

    def get_duration(self):
        return int(self.media_xml.attrib["duration"])

    def set_protocol(self, protocol):
        self.media_xml.attrib["protocol"] = protocol

    def get_protocol(self):
        return self.media_xml.attrib["protocol"]

    def is_seekable(self):
        return self.media_xml.attrib["seekable"].lower() == "true"

    def is_live(self):
        # A missing "live" attribute means not live.
        return self.media_xml.get("live", "false").lower() == "true"

    def can_play_reverse(self):
        return True

    def is_image(self):
        """Tell whether one of the streams has the type ``image``."""
        for stream in self.media_xml.findall("streams")[0].findall("stream"):
            if stream.attrib["type"] == "image":
                return True
        return False

    def get_num_tracks(self, track_type):
        """Return the number of streams whose type is *track_type*."""
        n = 0
        for stream in self.media_xml.findall("streams")[0].findall("stream"):
            if stream.attrib["type"] == track_type:
                n += 1

        return n

    def get_clean_name(self):
        """Return the XML file name without descriptor extension, dots
        replaced by underscores."""
        name = os.path.basename(self.get_path())
        # Raw string: "\." is an invalid escape sequence in a plain literal.
        name = re.sub(r"\.stream_info|\.media_info", "", name)

        return name.replace('.', "_")
2131
2132
class MediaFormatCombination(object):
    """A container/audio/video format triplet used for transcoding tests."""

    # Short format name -> caps string.
    FORMATS = {"aac": "audio/mpeg,mpegversion=4",  # Audio
               "ac3": "audio/x-ac3",
               "vorbis": "audio/x-vorbis",
               "mp3": "audio/mpeg,mpegversion=1,layer=3",
               "opus": "audio/x-opus",
               "rawaudio": "audio/x-raw",

               # Video
               "h264": "video/x-h264",
               "h265": "video/x-h265",
               "vp8": "video/x-vp8",
               "vp9": "video/x-vp9",
               "theora": "video/x-theora",
               "prores": "video/x-prores",
               "jpeg": "image/jpeg",

               # Containers
               "webm": "video/webm",
               "ogg": "application/ogg",
               "mkv": "video/x-matroska",
               "mp4": "video/quicktime,variant=iso;",
               "quicktime": "video/quicktime;"}

    def __init__(self, container, audio, video):
        """
        Describes a media format to be used for transcoding tests.

        :param container: A string defining the container format to be used, must be in self.FORMATS
        :param audio: A string defining the audio format to be used, must be in self.FORMATS
        :param video: A string defining the video format to be used, must be in self.FORMATS
        """
        self.container = container
        self.audio = audio
        self.video = video

    def __str__(self):
        return "%s and %s in %s" % (self.audio, self.video, self.container)

    def get_caps(self, track_type):
        """Return the caps of the format stored under *track_type*.

        *track_type* names an instance attribute ("audio", "video" or
        "container"). None is returned when there is no such attribute or
        when its value is not a key of FORMATS.
        """
        format_name = vars(self).get(track_type)
        return self.FORMATS.get(format_name)

    def get_audio_caps(self):
        return self.get_caps("audio")

    def get_video_caps(self):
        return self.get_caps("video")

    def get_muxer_caps(self):
        return self.get_caps("container")