3 # Copyright (c) 2013,Thibault Saunier <thibault.saunier@collabora.com>
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU Lesser General Public
7 # License as published by the Free Software Foundation; either
8 # version 2.1 of the License, or (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 # Lesser General Public License for more details.
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with this program; if not, write to the
17 # Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
18 # Boston, MA 02110-1301, USA.
20 """ Class representing tests and test managers. """
43 from itertools import cycle
44 from fractions import Fraction
46 from .utils import which
47 from . import reporters
48 from . import loggable
49 from .loggable import Loggable
51 from collections import defaultdict
53 from lxml import etree as ET
55 import xml.etree.cElementTree as ET
58 from .vfb_server import get_virual_frame_buffer_server
59 from .httpserver import HTTPServer
60 from .utils import mkdir, Result, Colors, printc, DEFAULT_TIMEOUT, GST_SECOND, \
61 Protocols, look_for_file_in_source_dir, get_data_file, BackTraceGenerator, \
62 check_bugs_resolution, is_tty
# The factor by which we increase the hard timeout when running inside
# gdb or valgrind (see use_gdb() / use_valgrind() below).
GDB_TIMEOUT_FACTOR = VALGRIND_TIMEOUT_FACTOR = 20
# Global multiplier applied to test timeouts, overridable through the
# TIMEOUT_FACTOR environment variable.
TIMEOUT_FACTOR = float(os.environ.get("TIMEOUT_FACTOR", 1))
# The exit code valgrind is configured (via --error-exitcode) to return
# when it detected errors.
VALGRIND_ERROR_CODE = 20
# File extension of per-media gst-validate override files, looked up
# next to the media descriptor files.
VALIDATE_OVERRIDE_EXTENSION = ".override"
# Map exit codes produced by fatal signals to the signal names (and,
# after the final update, signal names back to exit codes).
EXITING_SIGNALS = {
    -getattr(signal, name): name
    for name in ('SIGQUIT', 'SIGILL', 'SIGABRT', 'SIGFPE', 'SIGSEGV',
                 'SIGBUS', 'SIGSYS', 'SIGTRAP', 'SIGXCPU', 'SIGXFSZ',
                 'SIGIOT')
    if hasattr(signal, name)
}
# 139 (128 + SIGSEGV) is how a segfault commonly shows up as a plain
# exit status.
EXITING_SIGNALS[139] = "SIGSEGV"
# Add the reverse mapping (signal name -> exit code) as well.
EXITING_SIGNALS.update({(sig_name, code) for code, sig_name in EXITING_SIGNALS.items()})
80 CI_ARTIFACTS_URL = os.environ.get('CI_ARTIFACTS_URL')
85 """ A class representing a particular test. """
87 def __init__(self, application_name, classname, options,
88 reporter, duration=0, timeout=DEFAULT_TIMEOUT,
89 hard_timeout=None, extra_env_variables=None,
90 expected_issues=None, is_parallel=True,
93 @timeout: The timeout during which the value return by get_current_value
94 keeps being exactly equal
95 @hard_timeout: Max time the test can take in absolute
97 Loggable.__init__(self)
98 self.timeout = timeout * TIMEOUT_FACTOR * options.timeout_factor
100 self.hard_timeout = hard_timeout * TIMEOUT_FACTOR
101 self.hard_timeout *= options.timeout_factor
103 self.hard_timeout = hard_timeout
104 self.classname = classname
105 self.options = options
106 self.application = application_name
108 self.server_command = None
109 self.reporter = reporter
114 self.duration = duration
115 self.stack_trace = None
117 if expected_issues is None:
118 self.expected_issues = []
119 elif not isinstance(expected_issues, list):
120 self.expected_issues = [expected_issues]
122 self.expected_issues = expected_issues
124 extra_env_variables = extra_env_variables or {}
125 self.extra_env_variables = extra_env_variables
126 self.optional = False
127 self.is_parallel = is_parallel
128 self.generator = None
129 self.workdir = workdir
130 self.allow_flakiness = False
132 self.rr_logdir = None
136 def _generate_expected_issues(self):
139 def generate_expected_issues(self):
140 res = '%s"FIXME \'%s\' issues [REPORT A BUG ' % (" " * 4, self.classname) \
141 + 'in https://gitlab.freedesktop.org/gstreamer/ '\
142 + 'or use a proper bug description]": {'
147 "issues": [""" % (self.classname)
149 retcode = self.process.returncode if self.process else 0
151 signame = EXITING_SIGNALS.get(retcode)
152 val = "'" + signame + "'" if signame else retcode
156 },""" % ("signame" if signame else "returncode", val)
158 res += self._generate_expected_issues()
159 res += "\n%s],\n%s},\n" % (" " * 8, " " * 4)
163 def copy(self, nth=None):
164 copied_test = copy.copy(self)
166 copied_test.classname += '_it' + str(nth)
167 copied_test.options = copy.copy(self.options)
168 copied_test.options.logsdir = os.path.join(copied_test.options.logsdir, str(nth))
169 os.makedirs(copied_test.options.logsdir, exist_ok=True)
174 self.kill_subprocess()
177 self.time_taken = 0.0
178 self._starting_time = None
179 self.result = Result.NOT_RUN
182 self.extra_logfiles = set()
183 self.__env_variable = []
184 self.kill_subprocess()
188 string = self.classname
189 if self.result != Result.NOT_RUN:
190 string += ": " + self.result
191 if self.result in [Result.FAILED, Result.TIMEOUT]:
192 string += " '%s'" % self.message
193 if not self.options.dump_on_failure:
194 if not self.options.redirect_logs and self.result != Result.PASSED:
195 string += self.get_logfile_repr()
197 string = "\n==> %s" % string
201 def add_env_variable(self, variable, value=None):
203 Only useful so that the gst-validate-launcher can print the exact
204 right command line to reproduce the tests
207 value = os.environ.get(variable, None)
212 self.__env_variable.append(variable)
215 def _env_variable(self):
217 if not self.options.verbose or self.options.verbose > 1:
218 for var in set(self.__env_variable):
221 value = self.proc_env.get(var, None)
222 if value is not None:
223 res += "%s='%s'" % (var, value)
225 res += "[Not displaying environment variables, rerun with -vv for the full command]"
229 def open_logfile(self):
233 path = os.path.join(self.options.logsdir,
234 self.classname.replace(".", os.sep) + '.md')
235 mkdir(os.path.dirname(path))
238 if self.options.redirect_logs == 'stdout':
239 self.out = sys.stdout
240 elif self.options.redirect_logs == 'stderr':
241 self.out = sys.stderr
243 self.out = open(path, 'w+')
245 def finalize_logfiles(self):
246 self.out.write("\n**Duration**: %s" % self.time_taken)
247 if not self.options.redirect_logs:
249 for logfile in self.extra_logfiles:
250 # Only copy over extra logfile content if it's below a certain threshold
251 # Avoid copying gigabytes of data if a lot of debugging is activated
252 if os.path.getsize(logfile) < 500 * 1024:
253 self.out.write('\n\n## %s:\n\n```\n%s\n```\n' % (
254 os.path.basename(logfile), self.get_extra_log_content(logfile))
257 self.out.write('\n\n## %s:\n\n**Log file too big.**\n %s\n\n Check file content directly\n\n' % (
258 os.path.basename(logfile), logfile)
262 self.out.write('\n\n## rr trace:\n\n```\nrr replay %s/latest-trace\n```\n' % (
268 if self.options.html:
269 self.html_log = os.path.splitext(self.logfile)[0] + '.html'
271 parser = commonmark.Parser()
272 with open(self.logfile) as f:
273 ast = parser.parse(f.read())
275 renderer = commonmark.HtmlRenderer()
276 html = renderer.render(ast)
277 with open(self.html_log, 'w') as f:
282 def _get_file_content(self, file_name):
283 f = open(file_name, 'r+')
def get_log_content(self):
    """Return the full content of the test's main log file."""
    logfile = self.logfile
    return self._get_file_content(logfile)
292 def get_extra_log_content(self, extralog):
293 if extralog not in self.extra_logfiles:
296 return self._get_file_content(extralog)
298 def get_classname(self):
299 name = self.classname.split('.')[-1]
300 classname = self.classname.replace('.%s' % name, '')
305 return self.classname.split('.')[-1]
308 if self._uuid is None:
309 self._uuid = self.classname + str(uuid.uuid4())
312 def add_arguments(self, *args):
def build_arguments(self):
    """Record the env variables needed to reproduce the command line."""
    for variable in ("LD_PRELOAD", "DISPLAY"):
        self.add_env_variable(variable)
319 def add_stack_trace_to_logfile(self):
320 self.debug("Adding stack trace")
324 trace_gatherer = BackTraceGenerator.get_default()
325 stack_trace = trace_gatherer.get_trace(self)
330 info = "\n\n## Stack trace\n\n```\n%s\n```" % stack_trace
331 if self.options.redirect_logs:
335 if self.options.xunit_file:
336 self.stack_trace = stack_trace
341 def add_known_issue_information(self):
342 if self.expected_issues:
343 info = "\n\n## Already known issues\n\n``` python\n%s\n```\n\n" % (
344 json.dumps(self.expected_issues, indent=4)
349 info += "\n\n**You can mark the issues as 'known' by adding the " \
350 + " following lines to the list of known issues**\n" \
351 + "\n\n``` python\n%s\n```" % (self.generate_expected_issues())
353 if self.options.redirect_logs:
359 def set_result(self, result, message="", error=""):
361 if not self.options.redirect_logs:
362 self.out.write("\n```\n")
365 self.debug("Setting result: %s (message: %s, error: %s)" % (result,
368 if result is Result.TIMEOUT:
369 if self.options.debug is True:
371 printc("Timeout, you should process <ctrl>c to get into gdb",
373 # and wait here until gdb exits
374 self.process.communicate()
376 pname = self.command[0]
377 input("%sTimeout happened on %s you can attach gdb doing:\n $gdb %s %d%s\n"
378 "Press enter to continue" % (Colors.FAIL, self.classname,
379 pname, self.process.pid, Colors.ENDC))
381 self.add_stack_trace_to_logfile()
384 self.message = message
385 self.error_str = error
387 if result not in [Result.PASSED, Result.NOT_RUN, Result.SKIPPED]:
388 self.add_known_issue_information()
390 def check_results(self):
391 if self.result is Result.FAILED or self.result is Result.TIMEOUT:
394 self.debug("%s returncode: %s", self, self.process.returncode)
395 if self.options.rr and self.process.returncode == -signal.SIGPIPE:
396 self.set_result(Result.SKIPPED, "SIGPIPE received under `rr`, known issue.")
397 elif self.process.returncode == 0:
398 self.set_result(Result.PASSED)
399 elif self.process.returncode in EXITING_SIGNALS:
400 self.add_stack_trace_to_logfile()
401 self.set_result(Result.FAILED,
402 "Application exited with signal %s" % (
403 EXITING_SIGNALS[self.process.returncode]))
404 elif self.process.returncode == VALGRIND_ERROR_CODE:
405 self.set_result(Result.FAILED, "Valgrind reported errors")
407 self.set_result(Result.FAILED,
408 "Application returned %d" % (self.process.returncode))
410 def get_current_value(self):
412 Lets subclasses implement a nicer timeout measurement method
413 They should return some value with which we will compare
414 the previous and timeout if they are egual during self.timeout
417 return Result.NOT_RUN
419 def process_update(self):
421 Returns True when process has finished running or has timed out.
424 if self.process is None:
425 # Process has not started running yet
429 if self.process.returncode is not None:
432 val = self.get_current_value()
434 self.debug("Got value: %s" % val)
435 if val is Result.NOT_RUN:
436 # The get_current_value logic is not implemented... dumb
438 if time.time() - self.last_change_ts > self.timeout:
439 self.set_result(Result.TIMEOUT,
440 "Application timed out: %s secs" %
445 elif val is Result.FAILED:
447 elif val is Result.KNOWN_ERROR:
450 self.log("New val %s" % val)
452 if val == self.last_val:
453 delta = time.time() - self.last_change_ts
454 self.debug("%s: Same value for %d/%d seconds" %
455 (self, delta, self.timeout))
456 if delta > self.timeout:
457 self.set_result(Result.TIMEOUT,
458 "Application timed out: %s secs" %
462 elif self.hard_timeout and time.time() - self.start_ts > self.hard_timeout:
464 Result.TIMEOUT, "Hard timeout reached: %d secs" % self.hard_timeout)
467 self.last_change_ts = time.time()
def get_subproc_env(self):
    """Return the environment for the test subprocess.

    Base implementation: a plain copy of the launcher's environment.
    """
    env = os.environ.copy()
    return env
475 def kill_subprocess(self):
477 if self.options.rr and self.process and self.process.returncode is None:
478 cmd = ["ps", "-o", "pid", "--ppid", str(self.process.pid), "--noheaders"]
480 subprocs_id = [int(pid.strip('\n')) for
481 pid in subprocess.check_output(cmd).decode().split(' ') if pid]
482 except FileNotFoundError:
483 self.error("Ps not found, will probably not be able to get rr "
484 "working properly after we kill the process")
485 except subprocess.CalledProcessError as e:
486 self.error("Couldn't get rr subprocess pid: %s" % (e))
488 utils.kill_subprocess(self, self.process, DEFAULT_TIMEOUT, subprocs_id)
490 def run_external_checks(self):
493 def thread_wrapper(self):
495 # Restore the SIGINT handler for the child process (gdb) to ensure
497 signal.signal(signal.SIGINT, signal.SIG_DFL)
499 if self.options.gdb and os.name != "nt":
500 preexec_fn = enable_sigint
504 self.process = subprocess.Popen(self.command,
509 preexec_fn=preexec_fn)
511 if self.result is not Result.TIMEOUT:
512 if self.process.returncode == 0:
513 self.run_external_checks()
516 def get_valgrind_suppression_file(self, subdir, name):
517 p = get_data_file(subdir, name)
521 self.error("Could not find any %s file" % name)
def get_valgrind_suppressions(self):
    """Return the list of valgrind suppression files to pass to valgrind."""
    gstvalidate_supp = self.get_valgrind_suppression_file('data', 'gstvalidate.supp')
    return [gstvalidate_supp]
526 def use_gdb(self, command):
527 if self.hard_timeout is not None:
528 self.hard_timeout *= GDB_TIMEOUT_FACTOR
529 self.timeout *= GDB_TIMEOUT_FACTOR
531 if not self.options.gdb_non_stop:
532 self.timeout = sys.maxsize
533 self.hard_timeout = sys.maxsize
536 if self.options.gdb_non_stop:
537 args += ["-ex", "run", "-ex", "backtrace", "-ex", "quit"]
538 args += ["--args"] + command
541 def use_rr(self, command, subenv):
542 command = ["rr", 'record', '-h'] + command
544 self.timeout *= RR_TIMEOUT_FACTOR
545 self.rr_logdir = os.path.join(self.options.logsdir, self.classname.replace(".", os.sep), 'rr-logs')
546 subenv['_RR_TRACE_DIR'] = self.rr_logdir
548 shutil.rmtree(self.rr_logdir, ignore_errors=False, onerror=None)
549 except FileNotFoundError:
551 self.add_env_variable('_RR_TRACE_DIR', self.rr_logdir)
555 def use_valgrind(self, command, subenv):
556 vglogsfile = os.path.splitext(self.logfile)[0] + '.valgrind'
557 self.extra_logfiles.add(vglogsfile)
561 for o, v in [('trace-children', 'yes'),
562 ('tool', 'memcheck'),
563 ('leak-check', 'full'),
564 ('leak-resolution', 'high'),
565 # TODO: errors-for-leak-kinds should be set to all instead of definite
566 # and all false positives should be added to suppression
568 ('errors-for-leak-kinds', 'definite,indirect'),
569 ('show-leak-kinds', 'definite,indirect'),
570 ('show-possibly-lost', 'no'),
571 ('num-callers', '20'),
572 ('error-exitcode', str(VALGRIND_ERROR_CODE)),
573 ('gen-suppressions', 'all')]:
574 vg_args.append("--%s=%s" % (o, v))
576 if not self.options.redirect_logs:
577 vglogsfile = os.path.splitext(self.logfile)[0] + '.valgrind'
578 self.extra_logfiles.add(vglogsfile)
579 vg_args.append("--%s=%s" % ('log-file', vglogsfile))
581 for supp in self.get_valgrind_suppressions():
582 vg_args.append("--suppressions=%s" % supp)
584 command = ["valgrind"] + vg_args + command
586 # Tune GLib's memory allocator to be more valgrind friendly
587 subenv['G_DEBUG'] = 'gc-friendly'
588 subenv['G_SLICE'] = 'always-malloc'
590 if self.hard_timeout is not None:
591 self.hard_timeout *= VALGRIND_TIMEOUT_FACTOR
592 self.timeout *= VALGRIND_TIMEOUT_FACTOR
594 # Enable 'valgrind.config'
595 self.add_validate_config(get_data_file(
596 'data', 'valgrind.config'), subenv)
597 if subenv == self.proc_env:
598 self.add_env_variable('G_DEBUG', 'gc-friendly')
599 self.add_env_variable('G_SLICE', 'always-malloc')
600 self.add_env_variable('GST_VALIDATE_CONFIG',
601 self.proc_env['GST_VALIDATE_CONFIG'])
605 def add_validate_config(self, config, subenv=None):
607 subenv = self.extra_env_variables
609 cconf = subenv.get('GST_VALIDATE_CONFIG', "")
610 paths = [c for c in cconf.split(os.pathsep) if c] + [config]
611 subenv['GST_VALIDATE_CONFIG'] = os.pathsep.join(paths)
613 def launch_server(self):
616 def get_logfile_repr(self):
617 if not self.options.redirect_logs:
624 log = CI_ARTIFACTS_URL + os.path.relpath(log, self.options.logsdir)
626 return "\n Log: %s" % (log)
630 def get_command_repr(self):
631 message = "%s %s" % (self._env_variable, ' '.join(
632 shlex.quote(arg) for arg in self.command))
633 if self.server_command:
634 message = "%s & %s" % (self.server_command, message)
638 def test_start(self, queue):
641 self.server_command = self.launch_server()
643 self.command = [self.application]
644 self._starting_time = time.time()
645 self.build_arguments()
646 self.proc_env = self.get_subproc_env()
648 for var, value in list(self.extra_env_variables.items()):
649 value = self.proc_env.get(var, '') + os.pathsep + value
650 self.proc_env[var] = value.strip(os.pathsep)
651 self.add_env_variable(var, self.proc_env[var])
654 self.command = self.use_gdb(self.command)
656 self.previous_sigint_handler = signal.getsignal(signal.SIGINT)
657 # Make the gst-validate executable ignore SIGINT while gdb is
659 signal.signal(signal.SIGINT, signal.SIG_IGN)
661 if self.options.valgrind:
662 self.command = self.use_valgrind(self.command, self.proc_env)
665 self.command = self.use_rr(self.command, self.proc_env)
667 if not self.options.redirect_logs:
668 self.out.write("# `%s`\n\n"
669 "## Command\n\n``` bash\n%s\n```\n\n" % (
670 self.classname, self.get_command_repr()))
671 self.out.write("## %s output\n\n``` \n\n" % os.path.basename(self.application))
674 message = "Launching: %s%s\n" \
675 " Command: %s\n" % (Colors.ENDC, self.classname,
676 self.get_command_repr())
677 printc(message, Colors.OKBLUE)
679 self.thread = threading.Thread(target=self.thread_wrapper)
683 self.last_change_ts = time.time()
684 self.start_ts = time.time()
686 def _dump_log_file(self, logfile):
689 subprocess.check_call(['bat', '-H', '1', '--paging=never', logfile])
691 except (subprocess.CalledProcessError, FileNotFoundError):
694 with open(logfile, 'r') as fin:
695 for line in fin.readlines():
696 print('> ' + line, end='')
698 def _dump_log_files(self):
699 self._dump_log_file(self.logfile)
701 def copy_logfiles(self, extra_folder="flaky_tests"):
702 path = os.path.dirname(os.path.join(self.options.logsdir, extra_folder,
703 self.classname.replace(".", os.sep)))
705 self.logfile = shutil.copy(self.logfile, path)
707 for logfile in self.extra_logfiles:
708 extra_logs.append(shutil.copy(logfile, path))
709 self.extra_logfiles = extra_logs
711 def test_end(self, retry_on_failure=False):
712 self.kill_subprocess()
714 self.time_taken = time.time() - self._starting_time
717 signal.signal(signal.SIGINT, self.previous_sigint_handler)
719 self.finalize_logfiles()
723 if self.options.dump_on_failure:
724 if self.result not in [Result.PASSED, Result.KNOWN_ERROR, Result.NOT_RUN]:
725 self._dump_log_files()
727 # Only keep around env variables we need later
729 for n in self.__env_variable:
730 clean_env[n] = self.proc_env.get(n, None)
731 self.proc_env = clean_env
733 # Don't keep around JSON report objects, they were processed
734 # in check_results already
740 class GstValidateTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
744 class GstValidateListener(socketserver.BaseRequestHandler, Loggable):
def __init__(self, *args, **kwargs):
    # socketserver.BaseRequestHandler.__init__ is what ends up servicing
    # the request (it calls setup()/handle()/finish()), so it runs first;
    # the Loggable category is attached afterwards.
    super(GstValidateListener, self).__init__(*args, **kwargs)
    Loggable.__init__(self, "GstValidateListener")
751 """Implements BaseRequestHandler handle method"""
753 self.logCategory = "GstValidateListener"
755 raw_len = self.request.recv(4)
758 msglen = struct.unpack('>I', raw_len)[0]
761 while msglen != len(raw_msg):
762 raw_msg += self.request.recv(msglen - len(raw_msg))
766 msg = raw_msg.decode('utf-8', 'ignore')
767 except UnicodeDecodeError as e:
768 self.error("%s Could not decode message: %s - %s" % (test.classname if test else "unknown", msg, e))
775 obj = json.loads(msg)
776 except json.decoder.JSONDecodeError as e:
777 self.error("%s Could not decode message: %s - %s" % (test.classname if test else "unknown", msg, e))
781 # First message must contain the uuid
782 uuid = obj.get("uuid", None)
785 # Find test from launcher
786 for t in self.server.launcher.tests:
787 if uuid == t.get_uuid():
791 self.server.launcher.error(
792 "Could not find test for UUID %s" % uuid)
795 obj_type = obj.get("type", '')
796 if obj_type == 'position':
797 test.set_position(obj['position'], obj['duration'],
799 elif obj_type == 'buffering':
800 test.set_position(obj['position'], 100)
801 elif obj_type == 'action':
802 test.add_action_execution(obj)
803 # Make sure that action is taken into account when checking if process
806 elif obj_type == 'action-done':
807 # Make sure that action end is taken into account when checking if process
810 test.actions_infos[-1]['execution-duration'] = obj['execution-duration']
811 elif obj_type == 'report':
813 elif obj_type == 'skip-test':
814 test.set_result(Result.SKIPPED)
817 class GstValidateTest(Test):
819 """ A class representing a particular test. """
820 HARD_TIMEOUT_FACTOR = 5
821 fault_sig_regex = re.compile("<Caught SIGNAL: .*>")
822 needs_gst_inspect = set()
824 def __init__(self, application_name, classname,
825 options, reporter, duration=0,
826 timeout=DEFAULT_TIMEOUT, scenario=None, hard_timeout=None,
827 media_descriptor=None, extra_env_variables=None,
828 expected_issues=None, workdir=None):
830 extra_env_variables = extra_env_variables or {}
832 if not hard_timeout and self.HARD_TIMEOUT_FACTOR:
834 hard_timeout = timeout * self.HARD_TIMEOUT_FACTOR
836 hard_timeout = duration * self.HARD_TIMEOUT_FACTOR
840 # If we are running from source, use the -debug version of the
841 # application which is using rpath instead of libtool's wrappers. It's
842 # slightly faster to start and will not confuse valgrind.
843 debug = '%s-debug' % application_name
844 p = look_for_file_in_source_dir('tools', debug)
850 self.media_duration = -1
852 self.actions_infos = []
853 self.media_descriptor = media_descriptor
857 override_path = self.get_override_file(media_descriptor)
859 if extra_env_variables:
860 if extra_env_variables.get("GST_VALIDATE_OVERRIDE", ""):
862 "GST_VALIDATE_OVERRIDE"] += os.path.pathsep
864 extra_env_variables["GST_VALIDATE_OVERRIDE"] = override_path
866 super(GstValidateTest, self).__init__(application_name, classname,
870 hard_timeout=hard_timeout,
871 extra_env_variables=extra_env_variables,
872 expected_issues=expected_issues,
875 if scenario is None or scenario.name.lower() == "none":
878 self.scenario = scenario
def kill_subprocess(self):
    """Kill the test subprocess, delegating to the base Test implementation."""
    Test.kill_subprocess(self)
def add_report(self, report):
    """Append *report* to the list of collected reports."""
    self.reports += [report]
886 def set_position(self, position, duration, speed=None):
887 self.position = position
888 self.media_duration = duration
def add_action_execution(self, action_infos):
    """Append *action_infos* to the recorded action executions."""
    self.actions_infos += [action_infos]
895 def get_override_file(self, media_descriptor):
897 if media_descriptor.get_path():
898 override_path = os.path.splitext(media_descriptor.get_path())[
899 0] + VALIDATE_OVERRIDE_EXTENSION
900 if os.path.exists(override_path):
905 def get_current_position(self):
908 def get_current_value(self):
911 def get_subproc_env(self):
912 subproc_env = os.environ.copy()
914 if self.options.validate_default_config:
915 self.add_validate_config(self.options.validate_default_config,
918 subproc_env["GST_VALIDATE_UUID"] = self.get_uuid()
919 subproc_env["GST_VALIDATE_LOGSDIR"] = self.options.logsdir
921 if 'GST_DEBUG' in os.environ and not self.options.redirect_logs:
922 gstlogsfile = os.path.splitext(self.logfile)[0] + '.gstdebug'
923 self.extra_logfiles.add(gstlogsfile)
924 subproc_env["GST_DEBUG_FILE"] = gstlogsfile
926 if self.options.no_color:
927 subproc_env["GST_DEBUG_NO_COLOR"] = '1'
929 # Ensure XInitThreads is called, see bgo#731525
930 subproc_env['GST_GL_XINITTHREADS'] = '1'
931 self.add_env_variable('GST_GL_XINITTHREADS', '1')
933 if self.scenario is not None:
934 scenario = self.scenario.get_execution_name()
935 subproc_env["GST_VALIDATE_SCENARIO"] = scenario
936 self.add_env_variable("GST_VALIDATE_SCENARIO",
937 subproc_env["GST_VALIDATE_SCENARIO"])
940 del subproc_env["GST_VALIDATE_SCENARIO"]
944 if not subproc_env.get('GST_DEBUG_DUMP_DOT_DIR'):
945 dotfilesdir = os.path.join(self.options.logsdir,
946 self.classname.replace(".", os.sep) + '.pipelines_dot_files')
948 subproc_env['GST_DEBUG_DUMP_DOT_DIR'] = dotfilesdir
950 dotfilesurl = CI_ARTIFACTS_URL + os.path.relpath(dotfilesdir,
951 self.options.logsdir)
952 subproc_env['GST_VALIDATE_DEBUG_DUMP_DOT_URL'] = dotfilesurl
960 self.media_duration = -1
962 self.actions_infos = []
964 def build_arguments(self):
965 super(GstValidateTest, self).build_arguments()
966 if "GST_VALIDATE" in os.environ:
967 self.add_env_variable("GST_VALIDATE", os.environ["GST_VALIDATE"])
969 if "GST_VALIDATE_SCENARIOS_PATH" in os.environ:
970 self.add_env_variable("GST_VALIDATE_SCENARIOS_PATH",
971 os.environ["GST_VALIDATE_SCENARIOS_PATH"])
973 self.add_env_variable("GST_VALIDATE_CONFIG")
974 self.add_env_variable("GST_VALIDATE_OVERRIDE")
976 def get_extra_log_content(self, extralog):
977 value = Test.get_extra_log_content(self, extralog)
981 def report_matches_expected_issues(self, report, expected_issue):
982 for key in ['bug', 'bugs', 'sometimes']:
983 if key in expected_issue:
984 del expected_issue[key]
985 for key, value in list(report.items()):
986 if key in expected_issue:
987 if not re.findall(expected_issue[key], str(value)):
989 expected_issue.pop(key)
991 if "can-happen-several-times" in expected_issue:
992 expected_issue.pop("can-happen-several-times")
993 return not bool(expected_issue)
995 def check_reported_issues(self, expected_issues):
997 expected_retcode = [0]
998 for report in self.reports:
1000 for expected_issue in expected_issues:
1001 if self.report_matches_expected_issues(report,
1002 expected_issue.copy()):
1003 found = expected_issue
1006 if found is not None:
1007 if not found.get('can-happen-several-times', False):
1008 expected_issues.remove(found)
1009 if report['level'] == 'critical':
1010 if found.get('sometimes', True) and isinstance(expected_retcode, list):
1011 expected_retcode.append(18)
1013 expected_retcode = [18]
1014 elif report['level'] == 'critical':
1018 return None, expected_issues, expected_retcode
1020 return ret, expected_issues, expected_retcode
1022 def check_expected_issue(self, expected_issue):
1025 expected_symbols = expected_issue.get('stacktrace_symbols')
1026 if expected_symbols:
1027 trace_gatherer = BackTraceGenerator.get_default()
1028 stack_trace = trace_gatherer.get_trace(self)
1031 if not isinstance(expected_symbols, list):
1032 expected_symbols = [expected_symbols]
1034 not_found_symbols = [s for s in expected_symbols
1035 if s not in stack_trace]
1036 if not_found_symbols:
1037 msg = " Expected symbols '%s' not found in stack trace " % (
1041 msg += " No stack trace available, could not verify symbols "
1043 _, not_found_expected_issues, _ = self.check_reported_issues(expected_issue.get('issues', []))
1044 if not_found_expected_issues:
1045 mandatory_failures = [f for f in not_found_expected_issues
1046 if not f.get('sometimes', True)]
1047 if mandatory_failures:
1048 msg = " (Expected issues not found: %s) " % mandatory_failures
1053 def check_expected_timeout(self, expected_timeout):
1054 msg = "Expected timeout happened. "
1055 result = Result.PASSED
1056 message = expected_timeout.get('message')
1058 if not re.findall(message, self.message):
1059 result = Result.FAILED
1060 msg = "Expected timeout message: %s got %s " % (
1061 message, self.message)
1063 stack_msg, stack_res = self.check_expected_issue(expected_timeout)
1065 result = Result.TIMEOUT
1070 def check_results(self):
1071 if self.result in [Result.FAILED, Result.PASSED, Result.SKIPPED]:
1074 self.debug("%s returncode: %s", self, self.process.returncode)
1075 expected_issues = copy.deepcopy(self.expected_issues)
1077 # signal.SIGPPIPE is 13 but it sometimes isn't present in python for some reason.
1078 expected_issues.append({"returncode": -13, "sometimes": True})
1079 self.criticals, not_found_expected_issues, expected_returncode = self.check_reported_issues(expected_issues)
1080 expected_timeout = None
1081 expected_signal = None
1082 for i, f in enumerate(not_found_expected_issues):
1083 returncode = f.get('returncode', [])
1084 if not isinstance(returncode, list):
1085 returncode = [returncode]
1087 if f.get('signame'):
1088 signames = f['signame']
1089 if not isinstance(signames, list):
1090 signames = [signames]
1092 returncode = [EXITING_SIGNALS[signame] for signame in signames]
1095 if 'sometimes' in f:
1096 returncode.append(0)
1097 expected_returncode = returncode
1099 elif f.get("timeout"):
1100 expected_timeout = f
1102 not_found_expected_issues = [f for f in not_found_expected_issues
1103 if not f.get('returncode') and not f.get('signame')]
1106 result = Result.PASSED
1107 if self.result == Result.TIMEOUT:
1108 with open(self.logfile) as f:
1109 signal_fault_info = self.fault_sig_regex.findall(f.read())
1110 if signal_fault_info:
1111 result = Result.FAILED
1112 msg = signal_fault_info[0]
1113 elif expected_timeout:
1114 not_found_expected_issues.remove(expected_timeout)
1115 result, msg = self.check_expected_timeout(expected_timeout)
1118 elif self.process.returncode in EXITING_SIGNALS:
1119 msg = "Application exited with signal %s" % (
1120 EXITING_SIGNALS[self.process.returncode])
1121 if self.process.returncode not in expected_returncode:
1122 result = Result.FAILED
1125 stack_msg, stack_res = self.check_expected_issue(
1129 result = Result.FAILED
1130 self.add_stack_trace_to_logfile()
1131 elif self.process.returncode == VALGRIND_ERROR_CODE:
1132 msg = "Valgrind reported errors "
1133 result = Result.FAILED
1134 elif self.process.returncode not in expected_returncode:
1135 msg = "Application returned %s " % self.process.returncode
1136 if expected_returncode != [0]:
1137 msg += "(expected %s) " % expected_returncode
1138 result = Result.FAILED
1141 msg += "(critical errors: [%s]) " % ', '.join(set([c['summary']
1142 for c in self.criticals]))
1143 result = Result.FAILED
1145 if not_found_expected_issues:
1146 mandatory_failures = [f for f in not_found_expected_issues
1147 if not f.get('sometimes', True)]
1149 if mandatory_failures:
1150 msg += " (Expected errors not found: %s) " % mandatory_failures
1151 result = Result.FAILED
1152 elif self.expected_issues:
1153 msg += ' %s(Expected errors occurred: %s)%s' % (Colors.OKBLUE,
1154 self.expected_issues,
1156 result = Result.KNOWN_ERROR
1158 if result == Result.PASSED:
1159 for report in self.reports:
1160 if report["level"] == "expected":
1161 result = Result.KNOWN_ERROR
1164 self.set_result(result, msg.strip())
1166 def _generate_expected_issues(self):
1168 self.criticals = self.criticals or []
1169 if self.result == Result.TIMEOUT:
1175 for report in self.criticals:
1176 res += "\n%s{" % (" " * 12)
1178 for key, value in report.items():
1183 res += '\n%s%s"%s": "%s",' % (
1184 " " * 16, "# " if key == "details" else "",
1185 key, value.replace('\n', '\\n'))
1187 res += "\n%s}," % (" " * 12)
1191 def get_valgrind_suppressions(self):
1192 result = super(GstValidateTest, self).get_valgrind_suppressions()
1193 result.extend(utils.get_gst_build_valgrind_suppressions())
1194 gst_sup = self.get_valgrind_suppression_file('common', 'gst.supp')
1196 result.append(gst_sup)
# Mixin adding encoding/transcoding checks to a test: builds encoding-profile
# strings from a MediaFormatCombination, runs an optional IQA comparison, and
# validates the encoded output (duration and caps) against expectations.
# NOTE(review): excerpt is line-sampled; gaps in the embedded numbering are
# elided source lines.
1201 class GstValidateEncodingTestInterface(object):
# Allowed deviation between source and transcoded duration (1/4 second).
1202 DURATION_TOLERANCE = GST_SECOND / 4
1204 def __init__(self, combination, media_descriptor, duration_tolerance=None):
1205 super(GstValidateEncodingTestInterface, self).__init__()
1207 self.media_descriptor = media_descriptor
1208 self.combination = combination
# Fall back to the class default when no per-test tolerance was given.
1211 self._duration_tolerance = duration_tolerance
1212 if duration_tolerance is None:
1213 self._duration_tolerance = self.DURATION_TOLERANCE
# Current size (bytes) of the file being encoded; used to detect progress.
1215 def get_current_size(self):
1217 size = os.stat(urllib.parse.urlparse(self.dest_file).path).st_size
1221 self.debug("Size: %s" % size)
# Build an encoding-profile description string of the form
# muxer|video-caps|audio-caps with optional restriction caps and presence
# counts, as consumed by gst-validate-transcoding.
1224 def _get_profile_full(self, muxer, venc, aenc, video_restriction=None,
1225 audio_restriction=None, audio_presence=0,
1226 video_presence=0, variable_framerate=False):
1232 if video_restriction is not None:
1233 ret = ret + video_restriction + '->'
1237 props += 'presence=%s,' % str(video_presence)
1238 if variable_framerate:
1239 props += 'variable-framerate=true,'
# props[:-1] drops the trailing comma accumulated above.
1241 ret = ret + '|' + props[:-1]
1244 if audio_restriction is not None:
1245 ret = ret + audio_restriction + '->'
1248 ret = ret + '|' + str(audio_presence)
# Collapse accidental double separators produced by concatenation.
1250 return ret.replace("::", ":")
# Compute the encoding profile for this test's combination, applying
# media-descriptor-derived defaults (track presence, theora framerate fixup).
1252 def get_profile(self, video_restriction=None, audio_restriction=None,
1253 variable_framerate=False):
1254 vcaps = self.combination.get_video_caps()
1255 acaps = self.combination.get_audio_caps()
1256 if video_restriction is None:
1257 video_restriction = self.combination.video_restriction
1258 if audio_restriction is None:
1259 audio_restriction = self.combination.audio_restriction
1260 if self.media_descriptor is not None:
1261 if self.combination.video == "theora":
1262 # Theoraenc doesn't support variable framerate, make sure to avoid them
1263 framerate = self.media_descriptor.get_framerate()
# 0/1 means "unknown framerate"; force a concrete 30/1 for theora.
1264 if framerate == Fraction(0, 1):
1265 framerate = Fraction(30, 1)
1266 restriction = utils.GstCaps.new_from_str(video_restriction or "video/x-raw")
1267 for struct, _ in restriction:
1268 if struct.get("framerate") is None:
1269 struct.set("framerate", struct.FRACTION_TYPE, framerate)
1270 video_restriction = str(restriction)
# Presence values mirror the number of tracks found in the source media.
1272 video_presence = self.media_descriptor.get_num_tracks("video")
1273 if video_presence == 0:
1276 audio_presence = self.media_descriptor.get_num_tracks("audio")
1277 if audio_presence == 0:
1280 return self._get_profile_full(self.combination.get_muxer_caps(),
1282 audio_presence=audio_presence,
1283 video_presence=video_presence,
1284 video_restriction=video_restriction,
1285 audio_restriction=audio_restriction,
1286 variable_framerate=variable_framerate)
1288 def _clean_caps(self, caps):
1290 Returns a list of key=value or structure name, without "(types)" or ";" or ","
1292 return re.sub(r"\(.+?\)\s*| |;", '', caps).split(',')
1294 # pylint: disable=E1101
1295 def _has_caps_type_variant(self, c, ccaps):
1297 Handle situations where we can have application/ogg or video/ogg or
# Try the other media-type prefixes (application/video/audio) and accept the
# caps if any variant is present in the encoded file's caps.
1301 media_type = re.findall("application/|video/|audio/", c)
1303 media_type = media_type[0].replace('/', '')
1304 possible_mtypes = ["application", "video", "audio"]
1305 possible_mtypes.remove(media_type)
1306 for tmptype in possible_mtypes:
1307 possible_c_variant = c.replace(media_type, tmptype)
1308 if possible_c_variant in ccaps:
1310 "Found %s in %s, good enough!", possible_c_variant, ccaps)
1315 # pylint: disable=E1101
# Run an Image Quality Assessment comparison between the encoded result and
# a reference file, using the GStreamer 'iqa' element (skipped when absent).
1316 def run_iqa_test(self, reference_file_uri):
1318 Runs IQA test if @reference_file_path exists
1319 @test: The test to run tests on
1321 if not GstValidateBaseTestManager.has_feature('iqa'):
1322 self.debug('Iqa element not present, not running extra test.')
1326 uridecodebin uri=%s !
1327 iqa name=iqa do-dssim=true dssim-error-threshold=1.0 ! fakesink
1328 uridecodebin uri=%s ! iqa.
1329 """ % (reference_file_uri, self.dest_file)
# Flatten the multi-line pipeline description before tokenizing it.
1330 pipeline_desc = pipeline_desc.replace("\n", "")
1332 command = [GstValidateBaseTestManager.COMMAND] + \
1333 shlex.split(pipeline_desc)
1334 msg = "## Running IQA tests on results of: " \
1335 + "%s\n### Command: \n```\n%s\n```\n" % (
1336 self.classname, ' '.join(command))
1337 if not self.options.redirect_logs:
1341 printc(msg, Colors.OKBLUE)
1343 self.process = subprocess.Popen(command,
# Validate the encoded file: rediscover it, compare duration against the source
# (within tolerance) and compare each track's caps with the wanted profile caps.
# Returns a (Result, message)-style outcome on the elided lines.
1350 def check_encoded_file(self):
1351 result_descriptor = GstValidateMediaDescriptor.new_from_uri(
1353 if result_descriptor is None:
1354 return (Result.FAILED, "Could not discover encoded file %s"
1357 duration = result_descriptor.get_duration()
1358 orig_duration = self.media_descriptor.get_duration()
1359 tolerance = self._duration_tolerance
# NOTE(review): this chained comparison looks inverted — a within-tolerance
# check would read `orig_duration - tolerance <= duration <= orig_duration
# + tolerance`. As written the left-hand test is `>=`. Verify against the
# elided branch bodies before changing.
1361 if orig_duration - tolerance >= duration <= orig_duration + tolerance:
1362 os.remove(result_descriptor.get_path())
1366 'issue-id': 'transcoded-file-wrong-duration',
1367 'summary': 'The duration of a transcoded file doesn\'t match the duration of the original file',
1368 'level': 'critical',
1369 'detected-on': 'pipeline',
1370 'details': "Duration of encoded file is " " wrong (%s instead of %s)" % (
1371 utils.TIME_ARGS(duration), utils.TIME_ARGS(orig_duration))
# Compare container + per-track caps against the wanted combination.
1375 all_tracks_caps = result_descriptor.get_tracks_caps()
1376 container_caps = result_descriptor.get_caps()
1378 all_tracks_caps.insert(0, ("container", container_caps))
1380 for track_type, caps in all_tracks_caps:
1381 ccaps = self._clean_caps(caps)
1382 wanted_caps = self.combination.get_caps(track_type)
1383 cwanted_caps = self._clean_caps(wanted_caps)
# A track type present in the output but absent from the profile is an error.
1385 if wanted_caps is None:
1386 os.remove(result_descriptor.get_path())
1390 'issue-id': 'transcoded-file-wrong-stream-type',
1391 'summary': 'Expected stream types during transcoding do not match expectations',
1392 'level': 'critical',
1393 'detected-on': 'pipeline',
1394 'details': "Found a track of type %s in the encoded files"
1395 " but none where wanted in the encoded profile: %s" % (
1396 track_type, self.combination)
# Every wanted caps field must appear in the output caps (allowing
# application/video/audio media-type variants).
1401 for c in cwanted_caps:
1403 if not self._has_caps_type_variant(c, ccaps):
1404 os.remove(result_descriptor.get_path())
1408 'issue-id': 'transcoded-file-wrong-caps',
1409 'summary': 'Expected stream caps during transcoding do not match expectations',
1410 'level': 'critical',
1411 'detected-on': 'pipeline',
1412 'details': "Field: %s (from %s) not in caps of the outputted file %s" % (
1413 wanted_caps, c, ccaps)
# All checks passed: clean up the rediscovered descriptor file.
1418 os.remove(result_descriptor.get_path())
# NOTE(review): excerpt is line-sampled; gaps in the embedded numbering are
# elided source lines (including this class's __init__ header).
1421 class TestsManager(Loggable):
1423 """ A class responsible for managing tests. """
# Class attribute: name of the testsuite currently being loaded; set by the
# launcher while a testsuite module registers its tests.
1426 loading_testsuite = None
1430 Loggable.__init__(self)
# Tests matched but excluded by black/whitelist or duration filtering.
1433 self.unwanted_tests = []
1436 self.reporter = None
1437 self.wanted_tests_patterns = []
1438 self.blacklisted_tests_patterns = []
1439 self._generators = []
1440 self.check_testslist = True
1441 self.all_tests = None
# Mapping of bug-id -> failure definition (test regexes + expected issues).
1442 self.expected_issues = {}
1443 self.blacklisted_tests = []
# Tests sorted by classname for stable, reproducible listing.
1448 def list_tests(self):
1449 return sorted(list(self.tests), key=lambda x: x.classname)
# Return all tests whose classname matches the given regex string.
1451 def find_tests(self, classname):
1452 regex = re.compile(classname)
1453 return [test for test in self.list_tests() if regex.findall(test.classname)]
# Register expected issues and apply them to already-added tests.
# NOTE(review): this logic is near-duplicated in add_test() below for tests
# added *after* the expected issues — a shared helper would remove the
# duplication, but elided lines make that refactor unsafe from this excerpt.
1455 def add_expected_issues(self, expected_issues):
1456 for bugid, failure_def in list(expected_issues.items()):
1458 for test_name_regex in failure_def['tests']:
1459 regex = re.compile(test_name_regex)
1460 tests_regexes.append(regex)
1461 for test in self.tests:
1462 if regex.findall(test.classname):
1463 if failure_def.get('allow_flakiness'):
1464 test.allow_flakiness = True
1465 self.debug("%s allow flakiness" % (test.classname))
1467 for issue in failure_def['issues']:
1468 issue['bug'] = bugid
1469 test.expected_issues.extend(failure_def['issues'])
1470 self.debug("%s added expected issues from %s" % (
1471 test.classname, bugid))
# Replace the regex strings with their compiled form for later matching.
1472 failure_def['tests'] = tests_regexes
1474 self.expected_issues.update(expected_issues)
# Add a test: prefix its classname with the loading testsuite (unless a
# generator already did), apply matching expected issues, then sort it into
# self.tests or self.unwanted_tests based on the white/blacklist filters.
1476 def add_test(self, test):
1477 if test.generator is None:
1478 test.classname = self.loading_testsuite + '.' + test.classname
1480 for bugid, failure_def in list(self.expected_issues.items()):
1481 failure_def['bug'] = bugid
1482 for regex in failure_def['tests']:
1483 if regex.findall(test.classname):
1484 if failure_def.get('allow_flakiness'):
1485 test.allow_flakiness = True
1486 self.debug("%s allow flakiness" % (test.classname))
1488 for issue in failure_def['issues']:
1489 issue['bug'] = bugid
1490 test.expected_issues.extend(failure_def['issues'])
1491 self.debug("%s added expected issues from %s" % (
1492 test.classname, bugid))
# NOTE(review): the else-branch separating wanted/unwanted is on elided lines;
# indentation here is flattened by the excerpt.
1494 if self._is_test_wanted(test):
1495 if test not in self.tests:
1496 self.tests.append(test)
1498 if test not in self.tests:
1499 self.unwanted_tests.append(test)
1501 def get_tests(self):
# Hook for subclasses: register this manager's tests (body elided/abstract).
1504 def populate_testsuite(self):
1507 def add_generators(self, generators):
1509 @generators: A list of, or one single #TestsGenerator to be used to generate tests
1511 if not isinstance(generators, list):
1512 generators = [generators]
1513 self._generators.extend(generators)
1514 for generator in generators:
1515 generator.testsuite = self.loading_testsuite
# Deduplicate while keeping a list API (order is not preserved by set()).
1517 self._generators = list(set(self._generators))
1519 def get_generators(self):
1520 return self._generators
# Compile comma-separated blacklist patterns into regexes.
1522 def _add_blacklist(self, blacklisted_tests):
1523 if not isinstance(blacklisted_tests, list):
1524 blacklisted_tests = [blacklisted_tests]
1526 for patterns in blacklisted_tests:
1527 for pattern in patterns.split(","):
1528 self.blacklisted_tests_patterns.append(re.compile(pattern))
# Register the testsuite's hardcoded blacklist ((regex, reason) pairs),
# namespacing each regex with the loading testsuite's name.
1530 def set_default_blacklist(self, default_blacklist):
1531 for test_regex, reason in default_blacklist:
1532 if not test_regex.startswith(self.loading_testsuite + '.'):
1533 test_regex = self.loading_testsuite + '.' + test_regex
1534 self.blacklisted_tests.append((test_regex, reason))
1535 self._add_blacklist(test_regex)
1537 def add_options(self, parser):
1538 """ Add more arguments. """
1541 def set_settings(self, options, args, reporter):
1542 """ Set properties after options parsing. """
1543 self.options = options
1545 self.reporter = reporter
1547 self.populate_testsuite()
1549 if self.options.valgrind:
1550 self.print_valgrind_bugs()
# Compile user-supplied wanted/blacklisted test patterns (comma-separated).
1552 if options.wanted_tests:
1553 for patterns in options.wanted_tests:
1554 for pattern in patterns.split(","):
1555 self.wanted_tests_patterns.append(re.compile(pattern))
1557 if options.blacklisted_tests:
1558 for patterns in options.blacklisted_tests:
1559 self._add_blacklist(patterns)
# Verify that bugs referenced by the blacklist are still open (when asked to).
1561 def check_blacklists(self):
1562 if self.options.check_bugs_status:
1563 if not check_bugs_resolution(self.blacklisted_tests):
1568 def log_blacklists(self):
1569 if self.blacklisted_tests:
1570 self.info("Currently 'hardcoded' %s blacklisted tests:" %
1573 for name, bug in self.blacklisted_tests:
1574 if not self.options.check_bugs_status:
1575 self.info(" + %s --> bug: %s" % (name, bug))
# Group expected issues by the union of their test regexes and check whether
# the referenced bugs are still unresolved.
1577 def check_expected_issues(self):
1578 if not self.expected_issues or not self.options.check_bugs_status:
1581 bugs_definitions = defaultdict(list)
1582 for bug, failure_def in list(self.expected_issues.items()):
1583 tests_names = '|'.join(
1584 [regex.pattern for regex in failure_def['tests']])
1585 bugs_definitions[tests_names].extend([bug])
1587 return check_bugs_resolution(bugs_definitions.items())
1589 def _check_blacklisted(self, test):
1590 for pattern in self.blacklisted_tests_patterns:
1591 if pattern.findall(test.classname):
1592 self.info("%s is blacklisted by %s", test.classname, pattern)
# A test is whitelisted when a wanted pattern matches; an exact-classname
# pattern additionally overrides the blacklist.
1597 def _check_whitelisted(self, test):
1598 for pattern in self.wanted_tests_patterns:
1599 if pattern.findall(test.classname):
1600 if self._check_blacklisted(test):
1601 # If explicitly white listed that specific test
1602 # bypass the blacklisting
1603 if pattern.pattern != test.classname:
# Filter out tests longer than the configured --long-limit.
1608 def _check_duration(self, test):
1609 if test.duration > 0 and int(self.options.long_limit) < int(test.duration):
1610 self.info("Not activating %s as its duration (%d) is superior"
1611 " than the long limit (%d)" % (test, test.duration,
1612 int(self.options.long_limit)))
# Combined wanted/blacklist/duration decision (branch returns are on elided
# lines).
1617 def _is_test_wanted(self, test):
1618 if self._check_whitelisted(test):
1619 if not self._check_duration(test):
1623 if self._check_blacklisted(test):
1626 if not self._check_duration(test):
1629 if not self.wanted_tests_patterns:
1634 def needs_http_server(self):
# Hook for subclasses: document known valgrind false-positives (body elided).
1637 def print_valgrind_bugs(self):
# Base class for objects that generate Test instances for a TestsManager,
# keyed by classname. NOTE(review): excerpt is line-sampled; elided lines
# include part of __init__'s body.
1641 class TestsGenerator(Loggable):
# NOTE(review): mutable default argument `tests=[]` is shared across calls —
# the idiomatic fix is `tests=None` + `tests or []`, but the elided loop body
# using it (line 1650's context) makes a safe fix impossible from this excerpt.
1643 def __init__(self, name, test_manager, tests=[]):
1644 Loggable.__init__(self)
1646 self.test_manager = test_manager
1647 self.testsuite = None
1650 self._tests[test.classname] = test
1652 def generate_tests(self, *kwargs):
1654 Method that generates tests
1656 return list(self._tests.values())
# Register a test under this generator, namespacing it with the testsuite.
1658 def add_test(self, test):
1659 test.generator = self
1660 test.classname = self.testsuite + '.' + test.classname
1661 self._tests[test.classname] = test
# gst-validate flavor of TestsGenerator: subclasses implement populate_tests()
# to create tests from (media-info, special-scenarios) pairs plus scenarios.
1664 class GstValidateTestsGenerator(TestsGenerator):
# Hook for subclasses (body elided/abstract in this excerpt).
1666 def populate_tests(self, uri_minfo_special_scenarios, scenarios):
# Populate first, then return the accumulated tests via the base class.
# NOTE(review): overrides the parent's `generate_tests(*kwargs)` with fixed
# positional parameters — callers must pass both arguments here.
1669 def generate_tests(self, uri_minfo_special_scenarios, scenarios):
1670 self.populate_tests(uri_minfo_special_scenarios, scenarios)
1671 return super(GstValidateTestsGenerator, self).generate_tests()
# Orchestrates the whole run: discovers TestsManager subclasses, loads
# testsuite modules, starts helper servers (TCP/HTTP/virtual framebuffer),
# schedules jobs in parallel, prints progress, and retries flaky tests.
# NOTE(review): excerpt is line-sampled; gaps in the embedded numbering are
# elided source lines (including __init__'s header and several branch bodies).
1674 class _TestsLauncher(Loggable):
1678 Loggable.__init__(self)
1683 self.reporter = None
1684 self._list_testers()
1685 self.all_tests = None
1686 self.wanted_tests_patterns = []
# Queue over which running tests report back to the launcher.
1688 self.queue = queue.Queue()
1690 self.total_num_tests = 0
1691 self.current_progress = -1
1694 self.vfb_server = None
# Collect directories containing launcher "apps" (extra TestsManager defs).
1696 def _list_app_dirs(self):
# NOTE(review): bracket access raises KeyError when GST_VALIDATE_APPS_DIR is
# unset, so the `is not None` check below can never see None —
# `os.environ.get(...)` looks intended; an elided line may already guard this.
1698 env_dirs = os.environ["GST_VALIDATE_APPS_DIR"]
1699 if env_dirs is not None:
1700 for dir_ in env_dirs.split(os.pathsep):
1701 app_dirs.append(dir_)
# Execute every .py file of an app dir inside `env` so the TestsManager
# subclasses it defines become discoverable.
1705 def _exec_app(self, app_dir, env):
1707 files = os.listdir(app_dir)
1708 except OSError as e:
1709 self.debug("Could not list %s: %s" % (app_dir, e))
1712 if f.endswith(".py"):
1713 exec(compile(open(os.path.join(app_dir, f)).read(),
1714 os.path.join(app_dir, f), 'exec'), env)
1716 def _exec_apps(self, env):
1717 app_dirs = self._list_app_dirs()
1718 for app_dir in app_dirs:
1719 self._exec_app(app_dir, env)
# Instantiate every TestsManager subclass visible in this module + app dirs;
# keep the ones whose init() succeeds.
1721 def _list_testers(self):
1722 env = globals().copy()
1723 self._exec_apps(env)
1725 testers = [i() for i in utils.get_subclasses(TestsManager, env)]
1726 for tester in testers:
1727 if tester.init() is True:
1728 self.testers.append(tester)
1730 self.warning("Can not init tester: %s -- PATH is %s"
1731 % (tester.name, os.environ["PATH"]))
1733 def add_options(self, parser):
1734 for tester in self.testers:
1735 tester.add_options(parser)
# Try each candidate path until one testsuite module imports successfully.
# Returns (module, None) on success or (None, [error strings]) on failure.
1737 def _load_testsuite(self, testsuites):
1739 for testsuite in testsuites:
# The testsuite's directory is put on sys.path so it can import siblings,
# and removed again on the elided finally/cleanup line (1750).
1741 sys.path.insert(0, os.path.dirname(testsuite))
1742 spec = importlib.util.spec_from_file_location(os.path.basename(testsuite).replace(".py", ""), testsuite)
1743 module = importlib.util.module_from_spec(spec)
1744 spec.loader.exec_module(module)
1745 return (module, None)
1746 except Exception as e:
1747 exceptions.append("Could not load %s: %s" % (testsuite, e))
1750 sys.path.remove(os.path.dirname(testsuite))
1752 return (None, exceptions)
# Resolve each --testsuite argument: explicit .py path, or a name searched in
# the configured testsuite dirs, or — as a fallback — a test name pattern.
1754 def _load_testsuites(self):
1756 for testsuite in self.options.testsuites:
1757 if testsuite.endswith('.py') and os.path.exists(testsuite):
1758 testsuite = os.path.abspath(os.path.expanduser(testsuite))
1759 loaded_module = self._load_testsuite([testsuite])
1761 possible_testsuites_paths = [os.path.join(d, testsuite + ".py")
1762 for d in self.options.testsuites_dirs]
1763 loaded_module = self._load_testsuite(possible_testsuites_paths)
1765 module = loaded_module[0]
1766 if not loaded_module[0]:
# "suite.test" style names: retry with the suite part and keep the full
# string as a wanted-test pattern.
1767 if "." in testsuite:
1768 self.options.testsuites.append(testsuite.split('.')[0])
1769 self.info("%s looks like a test name, trying that" %
1771 self.options.wanted_tests.append(testsuite)
1773 if testsuite in testsuites:
# NOTE(review): "Testuite" is a typo for "Testsuite" in this log string;
# left untouched here since runtime strings must not change in a doc pass.
1774 self.info('Testuite %s was loaded previously', testsuite)
1776 printc("Could not load testsuite: %s, reasons: %s" % (
1777 testsuite, loaded_module[1]), Colors.FAIL)
1780 if module.__name__ in testsuites:
1781 self.info("Trying to load testsuite '%s' a second time?", module.__name__)
1784 testsuites[module.__name__] = module
# Default TEST_MANAGER to all known testers; normalize to a list.
1785 if not hasattr(module, "TEST_MANAGER"):
1786 module.TEST_MANAGER = [tester.name for tester in self.testers]
1787 elif not isinstance(module.TEST_MANAGER, list):
1788 module.TEST_MANAGER = [module.TEST_MANAGER]
1790 self.options.testsuites = list(testsuites.values())
# Let each loaded testsuite module register its tests with the matching
# tester(s), tracking TestsManager.loading_testsuite around the call.
1792 def _setup_testsuites(self):
1793 for testsuite in self.options.testsuites:
1795 wanted_test_manager = None
1796 # TEST_MANAGER has been set in _load_testsuites()
1797 assert hasattr(testsuite, "TEST_MANAGER")
1798 wanted_test_manager = testsuite.TEST_MANAGER
1799 if not isinstance(wanted_test_manager, list):
1800 wanted_test_manager = [wanted_test_manager]
1802 for tester in self.testers:
1803 if wanted_test_manager is not None and \
1804 tester.name not in wanted_test_manager:
1807 prev_testsuite_name = TestsManager.loading_testsuite
1808 if self.options.user_paths:
1809 TestsManager.loading_testsuite = tester.name
1810 tester.register_defaults()
1813 TestsManager.loading_testsuite = testsuite.__name__
1814 if testsuite.setup_tests(tester, self.options):
1816 if prev_testsuite_name:
1817 TestsManager.loading_testsuite = prev_testsuite_name
1820 printc("Could not load testsuite: %s"
1821 " maybe because of missing TestManager"
1822 % (testsuite), Colors.FAIL)
# Legacy config support: exec the config file with testers and options
# injected into globals(). Deliberately deprecated.
1825 def _load_config(self, options):
1826 printc("Loading config files is DEPRECATED"
1827 " you should use the new testsuite format now",)
1829 for tester in self.testers:
1830 tester.options = options
1831 globals()[tester.name] = tester
1832 globals()["options"] = options
# Temporarily make __file__ point at the config so its code sees itself.
1833 c__file__ = __file__
1834 globals()["__file__"] = self.options.config
1835 exec(compile(open(self.options.config).read(),
1836 self.options.config, 'exec'), globals())
1837 globals()["__file__"] = c__file__
# Main configuration entry point after argument parsing: choose reporter,
# filter testers by CLI args, load config/testsuites, check bug status, and
# start the HTTP / virtual-framebuffer servers when needed.
1839 def set_settings(self, options, args):
1840 if options.xunit_file:
1841 self.reporter = reporters.XunitReporter(options)
1843 self.reporter = reporters.Reporter(options)
1845 self.options = options
1846 wanted_testers = None
1847 for tester in self.testers:
1848 if tester.name in args:
1849 wanted_testers = tester.name
# When testers were named on the command line, keep only those.
1852 testers = self.testers
1854 for tester in testers:
1855 if tester.name in args:
1856 self.testers.append(tester)
1857 args.remove(tester.name)
1860 self._load_config(options)
1862 self._load_testsuites()
1863 if not self.options.testsuites:
# NOTE(review): "Not testsuite loaded!" reads like a typo for "No testsuite
# loaded!"; runtime string left as-is in this documentation pass.
1864 printc("Not testsuite loaded!", Colors.FAIL)
1867 for tester in self.testers:
1868 tester.set_settings(options, args, self.reporter)
1870 if not options.config and options.testsuites:
1871 if self._setup_testsuites() is False:
1874 if self.options.check_bugs_status:
1875 printc("-> Checking bugs resolution... ", end='')
1877 for tester in self.testers:
1878 if not tester.check_blacklists():
1881 tester.log_blacklists()
1883 if not tester.check_expected_issues():
1886 if self.options.check_bugs_status:
1887 printc("OK", Colors.OKGREEN)
1889 if self.needs_http_server() or options.httponly is True:
1890 self.httpsrv = HTTPServer(options)
1891 self.httpsrv.start()
# Headless runs: spin up a virtual framebuffer and export its DISPLAY.
1893 if options.no_display:
# NOTE(review): `get_virual_frame_buffer_server` — the "virual" typo comes
# from the imported helper's actual name (see module imports); renaming it
# would require changing vfb_server.py too.
1894 self.vfb_server = get_virual_frame_buffer_server(options)
1895 res = self.vfb_server.start()
1897 printc("Could not start virtual frame server: %s" % res[1],
1900 os.environ["DISPLAY"] = self.vfb_server.display_id
# True when this tester is not the primary manager of `testsuite` but some
# other loaded testsuite does use it (used to skip testslist bookkeeping).
1904 def _check_tester_has_other_testsuite(self, testsuite, tester):
1905 if tester.name != testsuite.TEST_MANAGER[0]:
1908 for t in self.options.testsuites:
1910 for other_testmanager in t.TEST_MANAGER:
1911 if other_testmanager == tester.name:
# Compare the tester's generated tests against the committed .testslist file;
# rewrite the file and report removed ('~'-prefixed = optional) or new tests.
# Returns True when the list changed.
1916 def _check_defined_tests(self, tester, tests):
# Skip the check entirely when the user filtered the test set.
1917 if self.options.blacklisted_tests or self.options.wanted_tests:
1920 tests_names = [test.classname for test in tests]
1921 testlist_changed = False
1922 for testsuite in self.options.testsuites:
1923 if not self._check_tester_has_other_testsuite(testsuite, tester) \
1924 and tester.check_testslist:
1926 testlist_file = open(os.path.splitext(testsuite.__file__)[0] + ".testslist",
1929 know_tests = testlist_file.read().split("\n")
1930 testlist_file.close()
# Reopened for writing (mode argument is on an elided line).
1932 testlist_file = open(os.path.splitext(testsuite.__file__)[0] + ".testslist",
1938 for test in know_tests:
# Leading '~' marks an optional test in the .testslist file.
1939 if test and test.strip('~') not in tests_names:
1940 if not test.startswith('~'):
1941 testlist_changed = True
1942 printc("Test %s Not in testsuite %s anymore"
1943 % (test, testsuite.__file__), Colors.FAIL)
1945 optional_out.append((test, None))
1947 tests_names = sorted([(test.classname, test) for test in tests] + optional_out,
1948 key=lambda x: x[0].strip('~'))
1950 for tname, test in tests_names:
1951 if test and test.optional:
1953 testlist_file.write("%s\n" % (tname))
1954 if tname and tname not in know_tests:
1955 printc("Test %s is NEW in testsuite %s"
1956 % (tname, testsuite.__file__),
1957 Colors.FAIL if self.options.fail_on_testlist_change else Colors.OKGREEN)
1958 testlist_changed = True
1960 testlist_file.close()
1963 return testlist_changed
# Round-robin the tests into num_groups equally sized groups.
1965 def _split_tests(self, num_groups):
1966 groups = [[] for x in range(num_groups)]
1967 group = cycle(groups)
1968 for test in self.tests:
1969 next(group).append(test)
# Gather, validate, sort, and (optionally) partition the full test list.
1972 def list_tests(self):
1973 for tester in self.testers:
1974 if not self._tester_needed(tester):
1977 tests = tester.list_tests()
1978 if self._check_defined_tests(tester, tests) and \
1979 self.options.fail_on_testlist_change:
1980 raise RuntimeError("Unexpected new test in testsuite.")
1982 self.tests.extend(tests)
1983 self.tests.sort(key=lambda test: test.classname)
# --parts/--part-index splitting for sharded CI runs.
1985 if self.options.num_parts < 1:
1986 raise RuntimeError("Tests must be split in positive number of parts.")
1987 if self.options.num_parts > len(self.tests):
# NOTE(review): "then" should read "than" in this message; runtime string
# left unchanged in this documentation pass.
1988 raise RuntimeError("Cannot have more parts then there exist tests.")
1989 if self.options.part_index < 1 or self.options.part_index > self.options.num_parts:
1990 raise RuntimeError("Part index is out of range")
1992 self.tests = self._split_tests(self.options.num_parts)[self.options.part_index - 1]
1995 def _tester_needed(self, tester):
1996 for testsuite in self.options.testsuites:
1997 if tester.name in testsuite.TEST_MANAGER:
# Thread body: run the TCP server tests report their status to. Port 0 lets
# the OS pick a free port, retrieved via getsockname().
2001 def server_wrapper(self, ready):
2002 self.server = GstValidateTCPServer(
2003 ('localhost', 0), GstValidateListener)
2004 self.server.socket.settimeout(None)
2005 self.server.launcher = self
2006 self.serverport = self.server.socket.getsockname()[1]
2007 self.info("%s server port: %s" % (self, self.serverport))
2010 self.server.serve_forever(poll_interval=0.05)
2012 def _start_server(self):
2013 self.info("Starting TCP Server")
2014 ready = threading.Event()
2015 self.server_thread = threading.Thread(target=self.server_wrapper,
2016 kwargs={'ready': ready})
2017 self.server_thread.start()
# Child gst-validate processes find the launcher through this variable.
2019 os.environ["GST_VALIDATE_SERVER"] = "tcp://localhost:%s" % self.serverport
2021 def _stop_server(self):
2023 self.server.shutdown()
2024 self.server_thread.join()
2025 self.server.server_close()
# Wait for any running job to finish; the 1s queue timeout doubles as the
# polling interval for per-test timeout handling via process_update().
2028 def test_wait(self):
2030 # Check process every second for timeout
2032 self.queue.get(timeout=1)
2036 for test in self.jobs:
2037 if test.process_update():
2038 self.jobs.remove(test)
# Like test_wait() but also collects results; Ctrl-C kills all subprocesses.
2041 def tests_wait(self):
2043 test = self.test_wait()
2044 test.check_results()
2045 except KeyboardInterrupt:
2046 for test in self.jobs:
2047 test.kill_subprocess()
# Pop the next pending test and start it; returns falsy when none is left
# (return statements are on elided lines).
2052 def start_new_job(self, tests_left):
2054 test = tests_left.pop(0)
2058 test.test_start(self.queue)
2060 self.jobs.append(test)
# Print a finished test's status plus an ASCII progress bar; on non-TTY
# output the bar line is only printed when progress advanced.
2064 def print_result(self, current_test_num, test, retry_on_failure=False):
2065 if test.result != Result.PASSED and not retry_on_failure:
2066 printc(str(test), color=utils.get_color_for_result(test.result))
2069 progress = int(length * current_test_num // self.total_num_tests)
2070 bar = 'â–ˆ' * progress + '-' * (length - progress)
2072 printc('\r|%s| [%s/%s]' % (bar, current_test_num, self.total_num_tests), end='\r')
2074 if progress > self.current_progress:
2075 self.current_progress = progress
2076 printc('|%s| [%s/%s]' % (bar, current_test_num, self.total_num_tests))
# Core scheduler: run parallel-safe tests with up to num_jobs workers, then
# the "alone" tests serially; optionally re-run failures once to detect flakes.
2078 def _run_tests(self, running_tests=None, all_alone=False, retry_on_failures=False):
2079 if not self.all_tests:
2080 self.all_tests = self.list_tests()
2082 if not running_tests:
2083 running_tests = self.tests
2085 self.reporter.init_timer()
# Partition into parallel-safe `tests` and serial `alone_tests`.
2088 for test in running_tests:
2089 if test.is_parallel and not all_alone:
2092 alone_tests.append(test)
2094 # use max to defend against the case where all tests are alone_tests
2095 max_num_jobs = max(min(self.options.num_jobs, len(tests)), 1)
# --forever mode: duplicate tests so all job slots stay busy.
2098 if self.options.forever and len(tests) < self.options.num_jobs and len(tests):
2099 max_num_jobs = self.options.num_jobs
2102 while (len(tests) + len(copied)) < max_num_jobs:
2103 copied.append(tests[i].copy(len(copied) + 1))
2109 self.tests += copied
2111 self.total_num_tests = len(self.all_tests)
2112 printc("\nRunning %d tests..." % self.total_num_tests, color=Colors.HEADER)
2113 # if order of test execution doesn't matter, shuffle
2114 # the order to optimize cpu usage
2115 if self.options.shuffle:
2116 random.shuffle(tests)
2117 random.shuffle(alone_tests)
2119 current_test_num = 1
# Two passes: parallel tests with max_num_jobs workers, alone tests with 1.
2121 for num_jobs, tests in [(max_num_jobs, tests), (1, alone_tests)]:
2122 tests_left = list(tests)
2123 for i in range(num_jobs):
2124 if not self.start_new_job(tests_left):
2128 while jobs_running != 0:
2129 test = self.tests_wait()
2131 current_test_num += 1
2132 res = test.test_end(retry_on_failure=retry_on_failures)
2134 if res not in [Result.PASSED, Result.SKIPPED, Result.KNOWN_ERROR]:
2135 if self.options.forever or self.options.fatal_error:
2136 self.print_result(current_test_num - 1, test, retry_on_failure=retry_on_failures)
2137 self.reporter.after_test(test)
# Failure path: queue for a single retry pass instead of reporting.
2140 if retry_on_failures:
2141 if not self.options.redirect_logs and test.allow_flakiness:
2142 test.copy_logfiles()
2144 to_retry.append(test)
2146 # Not adding to final report if flakiness is tolerated
2147 to_report = not test.allow_flakiness
2148 self.print_result(current_test_num - 1, test, retry_on_failure=retry_on_failures)
2150 self.reporter.after_test(test)
2151 if retry_on_failures:
2153 if self.start_new_job(tests_left):
# Retry pass: run collected failures serially, no further retries.
2157 printc("--> Rerunning the following tests to see if they are flaky:", Colors.WARNING)
2158 for test in to_retry:
2159 printc(' * %s' % test.classname)
2161 return self._run_tests(to_retry, all_alone=True, retry_on_failures=False)
2165 def clean_tests(self, stop_server=False):
2166 for test in self.tests:
# Top-level run entry: start the status server, run once / n times / forever
# per the CLI options, then stop helper servers and clean up.
2171 def run_tests(self):
2174 self._start_server()
2175 if self.options.forever:
2178 printc("-> Iteration %d" % r, end='\r')
2180 if not self._run_tests():
2184 msg = "-> Iteration %d... %sOK%s" % (r, Colors.OKGREEN, Colors.ENDC)
2185 printc(msg, end="\r")
2188 elif self.options.n_runs:
2190 for r in range(self.options.n_runs):
2191 printc("-> Iteration %d" % r, end='\r')
2192 if not self._run_tests():
2194 printc("ERROR", Colors.FAIL, end="\r")
2196 printc("OK", Colors.OKGREEN, end="\r")
2201 return self._run_tests(retry_on_failures=self.options.retry_on_failures)
2203 if self.options.forever:
2204 printc("\n-> Ran %d times" % r)
2208 self.vfb_server.stop()
2209 self.clean_tests(True)
2211 def final_report(self):
2212 return self.reporter.final_report()
2214 def needs_http_server(self):
2215 for tester in self.testers:
2216 if tester.needs_http_server():
# Simple attribute bag: turns a dict's key/value pairs into instance
# attributes (props["foo"] becomes obj.foo).
# NOTE(review): one guard line (original line 2223, plausibly `if props:`) is
# elided from this excerpt, so props=None handling cannot be confirmed here.
2220 class NamedDic(object):
2222 def __init__(self, props):
2224 for name, value in props.items():
2225 setattr(self, name, value)
# A gst-validate scenario: wraps the (prop, value) pairs parsed from a
# .scenario definition as attributes ('-' mapped to '_'), with typed
# accessors that default sensibly when a property is absent.
# NOTE(review): excerpt is line-sampled; several default-return lines are
# elided.
2228 class Scenario(object):
2230 def __init__(self, name, props, path=None):
2234 for prop, value in props:
# Property names use '-', attribute names use '_' (e.g. need-clock-sync).
2235 setattr(self, prop.replace("-", "_"), value)
# Name used when executing: the on-disk path when the scenario is a special
# per-media file, the plain name otherwise (else-branch elided).
2237 def get_execution_name(self):
2238 if self.path is not None:
2244 if hasattr(self, "seek"):
2245 return bool(self.seek)
2249 def needs_clock_sync(self):
2250 if hasattr(self, "need_clock_sync"):
2251 return bool(self.need_clock_sync)
2255 def needs_live_content(self):
2256 # Scenarios that can only be used on live content
2257 if hasattr(self, "live_content_required"):
2258 return bool(self.live_content_required)
2261 def compatible_with_live_content(self):
2262 # if a live content is required it's implicitly compatible with
2264 if self.needs_live_content():
2266 if hasattr(self, "live_content_compatible"):
2267 return bool(self.live_content_compatible)
2270 def get_min_media_duration(self):
2271 if hasattr(self, "min_media_duration"):
2272 return float(self.min_media_duration)
2276 def does_reverse_playback(self):
2277 if hasattr(self, "reverse_playback"):
2278 return bool(self.reverse_playback)
2282 def get_duration(self):
2284 return float(getattr(self, "duration"))
2285 except AttributeError:
# Minimum number of tracks of `track_type` the media must have, 0 when the
# scenario does not constrain it (default return elided).
2288 def get_min_tracks(self, track_type):
2290 return int(getattr(self, "min_%s_track" % track_type))
2291 except AttributeError:
2295 return "<Scenario %s>" % self.name
# Singleton that discovers available gst-validate scenarios by asking the
# gst-validate binary to dump their definitions, then parses that dump.
# NOTE(review): excerpt is line-sampled; `_instance` class attribute and some
# branch bodies are elided.
2298 class ScenarioManager(Loggable):
2300 system_scenarios = []
# Per-media special scenarios, keyed by scenario file path.
2301 special_scenarios = {}
2303 FILE_EXTENSION = "scenario"
# Classic __new__-based singleton: first call builds and initializes the
# instance, later calls return it unchanged.
2305 def __new__(cls, *args, **kwargs):
2306 if not cls._instance:
2307 cls._instance = super(ScenarioManager, cls).__new__(
2308 cls, *args, **kwargs)
2309 cls._instance.config = None
2310 cls._instance.discovered = False
2311 Loggable.__init__(cls._instance)
2313 return cls._instance
# Find "<media-file>.<NAME>.scenario" files next to the media file and run
# discovery on them.
2315 def find_special_scenarios(self, mfile):
2317 mfile_bname = os.path.basename(mfile)
2319 for f in os.listdir(os.path.dirname(mfile)):
# NOTE(review): the first '.' after the escaped basename is unescaped in
# this pattern ("\..*" intent is visible but written as "%s\..*\.%s"
# with only the literal dots escaped) — behavior should be confirmed
# before tightening the regex.
2320 if re.findall("%s\..*\.%s$" % (re.escape(mfile_bname), self.FILE_EXTENSION), f):
2321 scenarios.append(os.path.join(os.path.dirname(mfile), f))
2324 scenarios = self.discover_scenarios(scenarios, mfile)
# Run gst-validate with --scenarios-defs-output-file and parse the resulting
# INI-style dump into Scenario objects.
2328 def discover_scenarios(self, scenario_paths=[], mfile=None):
2330 Discover scenarios specified in scenario_paths or the default ones
2331 if nothing specified there
2334 scenario_defs = os.path.join(self.config.main_dir, "scenarios.def")
2335 log_path = os.path.join(self.config.logsdir, "scenarios_discovery.log")
# NOTE(review): `logs` and the later `f = open(scenario_defs)` have no
# visible close; a `with` block would be safer, but the elided lines may
# already close them.
2336 logs = open(log_path, 'w')
2339 command = [GstValidateBaseTestManager.COMMAND,
2340 "--scenarios-defs-output-file", scenario_defs]
2341 command.extend(scenario_paths)
2342 subprocess.check_call(command, stdout=logs, stderr=logs)
2343 except subprocess.CalledProcessError as e:
2345 self.error('See %s' % log_path)
2348 config = configparser.RawConfigParser()
2349 f = open(scenario_defs)
2352 for section in config.sections():
# Sections that match a requested path are special per-media scenarios;
# derive their name from the path relative to the media file.
2355 for scenario_path in scenario_paths:
2356 if section == scenario_path:
2358 name = os.path.basename(section).replace("." + self.FILE_EXTENSION, "")
2359 path = scenario_path
2361 # The real name of the scenario is:
2362 # filename.REALNAME.scenario
2363 name = scenario_path.replace(mfile + ".", "").replace(
2364 "." + self.FILE_EXTENSION, "")
2365 path = scenario_path
2368 name = os.path.basename(section).replace("." + self.FILE_EXTENSION, "")
2373 props = config.items(section)
2374 scenario = Scenario(name, props, path)
2376 self.special_scenarios[path] = scenario
2377 scenarios.append(scenario)
# A full (no-path) discovery populates the shared system scenario list once.
2379 if not scenario_paths:
2380 self.discovered = True
2381 self.system_scenarios.extend(scenarios)
# Look up a scenario by name (or by absolute .scenario path), lazily running
# discovery the first time.
2385 def get_scenario(self, name):
2386 if name is not None and os.path.isabs(name) and name.endswith(self.FILE_EXTENSION):
2387 scenario = self.special_scenarios.get(name)
2391 scenarios = self.discover_scenarios([name])
# NOTE(review): `discover_scenarios` appears to return a list; caching the
# list itself under the path key (rather than one Scenario) — elided lines
# probably pick scenarios[0]; confirm before relying on this cache.
2392 self.special_scenarios[name] = scenarios
2397 if self.discovered is False:
2398 self.discover_scenarios()
2401 return self.system_scenarios
2404 return [scenario for scenario in self.system_scenarios if scenario.name == name][0]
2406 self.warning("Scenario: %s not found" % name)
# TestsManager specialization for gst-validate: resolves the gst-validate
# tool commands, caches element-feature lookups, and holds the default
# scenarios / encoding formats used by generators.
# NOTE(review): excerpt is line-sampled; __init__'s header and the
# classmethod decorators are on elided lines.
2410 class GstValidateBaseTestManager(TestsManager):
2411 scenarios_manager = ScenarioManager()
2415 super(GstValidateBaseTestManager, self).__init__()
2416 self._scenarios = []
2417 self._encoding_formats = []
# Resolve the *-1.0 tool binaries once and expose them as class attributes:
# COMMAND, TRANSCODING_COMMAND, MEDIA_CHECK_COMMAND, RTSP_SERVER_COMMAND,
# INSPECT_COMMAND.
2420 def update_commands(cls, extra_paths=None):
2421 for varname, cmd in {'': 'gst-validate',
2422 'TRANSCODING_': 'gst-validate-transcoding',
2423 'MEDIA_CHECK_': 'gst-validate-media-check',
2424 'RTSP_SERVER_': 'gst-validate-rtsp-server',
2425 'INSPECT_': 'gst-inspect'}.items():
2426 setattr(cls, varname + 'COMMAND', which(cmd + '-1.0', extra_paths))
# Check (and cache) whether a GStreamer feature/element is installed, by
# probing it with gst-inspect.
2429 def has_feature(cls, featurename):
2431 return cls.features_cache[featurename]
2436 subprocess.check_output([cls.INSPECT_COMMAND, featurename])
2438 except subprocess.CalledProcessError:
2441 cls.features_cache[featurename] = res
2444 def add_scenarios(self, scenarios):
2446 @scenarios A list or a unic scenario name(s) to be run on the tests.
2447 They are just the default scenarios, and then depending on
2448 the TestsGenerator to be used you can have more fine grained
2449 control on what to be run on each series of tests.
2451 if isinstance(scenarios, list):
2452 self._scenarios.extend(scenarios)
2454 self._scenarios.append(scenarios)
# Deduplicate; note set() does not preserve insertion order.
2456 self._scenarios = list(set(self._scenarios))
2458 def set_scenarios(self, scenarios):
2460 Override the scenarios
2462 self._scenarios = []
2463 self.add_scenarios(scenarios)
2465 def get_scenarios(self):
2466 return self._scenarios
2468 def add_encoding_formats(self, encoding_formats):
2470 :param encoding_formats: A list or one single #MediaFormatCombinations describing wanted output
2471 formats for transcoding test.
2472 They are just the default encoding formats, and then depending on
2473 the TestsGenerator to be used you can have more fine grained
2474 control on what to be run on each series of tests.
2476 if isinstance(encoding_formats, list):
2477 self._encoding_formats.extend(encoding_formats)
2479 self._encoding_formats.append(encoding_formats)
2481 self._encoding_formats = list(set(self._encoding_formats))
2483 def get_encoding_formats(self):
2484 return self._encoding_formats
# Resolve the gst-validate tool paths once at module import time.
2487 GstValidateBaseTestManager.update_commands()
# Abstract description of a media asset (caps, duration, tracks, protocol,
# seekability...) used to decide which scenarios can run against it.
# Concrete parsing lives in subclasses such as GstValidateMediaDescriptor.
# NOTE(review): fragment -- many lines are missing between the fused original
# line numbers, including several `def` headers and `return` statements.
2490 class MediaDescriptor(Loggable):
2493 Loggable.__init__(self)
# NOTE(review): `raise NotImplemented` raises the NotImplemented *constant*
# (a TypeError at runtime in Python 3); upstream should raise
# NotImplementedError instead. Kept as-is in this fragment.
2496 raise NotImplemented
2498 def has_frames(self):
# get_framerate: framerate of the first video track, falling back to
# Fraction(0, 1) when there is no video track or its caps cannot be parsed.
2501 def get_framerate(self):
2502 for ttype, caps_str in self.get_tracks_caps():
2503 if ttype != "video":
2506 caps = utils.GstCaps.new_from_str(caps_str)
2508 self.warning("Could not create caps for %s" % caps_str)
# The framerate field is read from the first structure of the parsed caps.
2511 framerate = caps[0].get("framerate")
2515 return Fraction(0, 1)
2517 def get_media_filepath(self):
2518 raise NotImplemented
2520 def skip_parsers(self):
2524 raise NotImplemented
2527 raise NotImplemented
2529 def get_duration(self):
2530 raise NotImplemented
2532 def get_protocol(self):
2533 raise NotImplemented
2535 def is_seekable(self):
2536 raise NotImplemented
2539 raise NotImplemented
2542 raise NotImplemented
2544 def get_num_tracks(self, track_type):
2545 raise NotImplemented
2547 def get_tracks_caps(self):
2550 def can_play_reverse(self):
2551 raise NotImplemented
# is_compatible: decide whether *scenario* can run on this media. Each check
# below logs the reason; in the full source each one returns False before
# falling through to the next.
2556 def is_compatible(self, scenario):
2557 if scenario is None:
# Seeking scenarios need a seekable, non-image media.
2560 if scenario.seeks() and (not self.is_seekable() or self.is_image()):
2561 self.debug("Do not run %s as %s does not support seeking",
2562 scenario, self.get_uri())
2565 if self.is_image() and scenario.needs_clock_sync():
2566 self.debug("Do not run %s as %s is an image",
2567 scenario, self.get_uri())
2570 if not self.can_play_reverse() and scenario.does_reverse_playback():
2573 if not self.is_live() and scenario.needs_live_content():
2574 self.debug("Do not run %s as %s is not a live content",
2575 scenario, self.get_uri())
2578 if self.is_live() and not scenario.compatible_with_live_content():
2579 self.debug("Do not run %s as %s is a live content",
2580 scenario, self.get_uri())
# NOTE(review): `prerrols` (sic) is the upstream method name -- typo kept
# because the definition lives outside this fragment.
2583 if not self.prerrols() and getattr(scenario, 'needs_preroll', False):
# Media shorter than the scenario's minimum duration is skipped.
2586 if self.get_duration() and self.get_duration() / GST_SECOND < scenario.get_min_media_duration():
2588 "Do not run %s as %s is too short (%i < min media duation : %i",
2589 scenario, self.get_uri(),
2590 self.get_duration() / GST_SECOND,
2591 scenario.get_min_media_duration())
# The scenario may require a minimum number of audio/subtitle/video tracks.
2594 for track_type in ['audio', 'subtitle', 'video']:
2595 if self.get_num_tracks(track_type) < scenario.get_min_tracks(track_type):
2596 self.debug("%s -- %s | At least %s %s track needed < %s"
2597 % (scenario, self.get_uri(), track_type,
2598 scenario.get_min_tracks(track_type),
2599 self.get_num_tracks(track_type)))
# MediaDescriptor backed by a gst-validate "media info" XML file.
2605 class GstValidateMediaDescriptor(MediaDescriptor):
2606 # Some extension file for discovering results
# Extensions used to pair a media file with its generated description file.
2607 SKIPPED_MEDIA_INFO_EXT = "media_info.skipped"
2608 MEDIA_INFO_EXT = "media_info"
2609 PUSH_MEDIA_INFO_EXT = "media_info.push"
2610 STREAM_INFO_EXT = "stream_info"
# Cache of already-loaded descriptors, keyed by media-info xml path.
2612 __all_descriptors = {}
def get(cls, xml_path):
    """Return the cached descriptor for *xml_path*, creating one on a miss.

    GstValidateMediaDescriptor.__init__ registers new instances in the
    cache itself, so the miss path simply constructs a descriptor.
    """
    try:
        return cls.__all_descriptors[xml_path]
    except KeyError:
        return GstValidateMediaDescriptor(xml_path)
# Load a descriptor from a media-info XML file, reusing cached attribute data
# when the same path was parsed before.
# NOTE(review): fragment -- the `if main_descriptor:` guard, its `return`,
# the `else:` and the `try:` around ET.parse (orig lines 2622-2631, 2635-2638)
# are missing from this extract.
2620 def __init__(self, xml_path):
2621 super(GstValidateMediaDescriptor, self).__init__()
2623 self._media_file_path = None
# Cache hit: copy every attribute from the previously-parsed descriptor.
2624 main_descriptor = self.__all_descriptors.get(xml_path)
2626 self._copy_data_from_main(main_descriptor)
# Register ourselves so later lookups for the same path reuse this data.
2628 self.__all_descriptors[xml_path] = self
2630 self._xml_path = xml_path
2632 media_xml = ET.parse(xml_path).getroot()
2633 except xml.etree.ElementTree.ParseError:
2634 printc("Could not parse %s" % xml_path,
2637 self._extract_data(media_xml)
# Protocol is derived from the uri scheme (set_protocol overrides it for
# `.media_info.push` description files).
2639 self.set_protocol(urllib.parse.urlparse(self.get_uri()).scheme)
2641 def skip_parsers(self):
2642 return self._skip_parsers
2644 def has_frames(self):
2645 return self._has_frames
2647 def _copy_data_from_main(self, main_descriptor):
2648 for attr in main_descriptor.__dict__.keys():
2649 setattr(self, attr, getattr(main_descriptor, attr))
# Populate all descriptor attributes from the parsed root element of a
# media-info XML document.
# NOTE(review): fragment -- a few lines (orig 2655, 2657-2659, 2663) are
# missing from this extract.
2651 def _extract_data(self, media_xml):
2652 # Extract the information we need from the xml
2653 self._caps = media_xml.findall("streams")[0].attrib["caps"]
2654 self._track_caps = []
2656 streams = media_xml.findall("streams")[0].findall("stream")
2660 for stream in streams:
2661 self._track_caps.append(
2662 (stream.attrib["type"], stream.attrib["caps"]))
# Optional attribute; older media-info files may not carry it, hence the 0
# default.
2664 self._skip_parsers = bool(int(media_xml.attrib.get('skip-parsers', 0)))
2665 self._has_frames = bool(int(media_xml.attrib["frame-detection"]))
2666 self._duration = int(media_xml.attrib["duration"])
2667 self._uri = media_xml.attrib["uri"]
2668 parsed_uri = urllib.parse.urlparse(self.get_uri())
# Prefer the explicit protocol attribute, falling back to the uri scheme.
2669 self._protocol = media_xml.get("protocol", parsed_uri.scheme)
2670 if parsed_uri.scheme == "file":
# If the recorded file path is gone but the media sits next to the
# media-info file, rewrite the uri to point at that copy.
2671 if not os.path.exists(parsed_uri.path) and os.path.exists(self.get_media_filepath()):
2672 self._uri = "file://" + self.get_media_filepath()
2673 elif parsed_uri.scheme == Protocols.IMAGESEQUENCE:
# Image sequences are resolved relative to the media-info file itself.
2674 self._media_file_path = os.path.join(os.path.dirname(self.__cleanup_media_info_ext()), os.path.basename(parsed_uri.path))
2675 self._uri = parsed_uri._replace(path=os.path.join(os.path.dirname(self.__cleanup_media_info_ext()), os.path.basename(self._media_file_path))).geturl()
2676 self._is_seekable = media_xml.attrib["seekable"].lower() == "true"
2677 self._is_live = media_xml.get("live", "false").lower() == "true"
2678 self._is_image = False
# The media counts as an image when any of its streams is an image stream.
2679 for stream in media_xml.findall("streams")[0].findall("stream"):
2680 if stream.attrib["type"] == "image":
2681 self._is_image = True
2682 self._track_types = []
2683 for stream in media_xml.findall("streams")[0].findall("stream"):
2684 self._track_types.append(stream.attrib["type"])
2686 def __cleanup_media_info_ext(self):
2687 for ext in [self.MEDIA_INFO_EXT, self.PUSH_MEDIA_INFO_EXT, self.STREAM_INFO_EXT,
2688 self.SKIPPED_MEDIA_INFO_EXT, ]:
2689 if self._xml_path.endswith(ext):
2690 return self._xml_path[:len(self._xml_path) - (len(ext) + 1)]
2692 assert "Not reached" == None # noqa
# new_from_uri: generate (or refresh) a media-info file for *uri* by running
# gst-validate-media-check, then return a descriptor for it.
# NOTE(review): fragment -- the argument assembly, several guards, `try:`
# lines and the failure `return None` paths are missing from this extract;
# the trailing `return self._xml_path` (orig 2755) belongs to a separate
# get_path() method whose `def` header is also missing.
2695 def new_from_uri(uri, verbose=False, include_frames=False, is_push=False, is_skipped=False):
# include_frames is a tri-state flag (see the three values documented below).
2697 include_frames = 0 # Never
2698 include_frames = 1 # always
2699 include_frames = 2 # if previous file included them
2702 media_path = utils.url2path(uri)
# Pick the media-info extension matching the requested flavour.
2704 ext = GstValidateMediaDescriptor.MEDIA_INFO_EXT
2706 ext = GstValidateMediaDescriptor.PUSH_MEDIA_INFO_EXT
2708 ext = GstValidateMediaDescriptor.SKIPPED_MEDIA_INFO_EXT
2709 descriptor_path = "%s.%s" % (media_path, ext)
2710 args = GstValidateBaseTestManager.MEDIA_CHECK_COMMAND.split(" ")
# include_frames == 2: inherit the frame-detection setting (and related
# options) from the previously generated media-info file, when present.
2711 if include_frames == 2:
2713 media_xml = ET.parse(descriptor_path).getroot()
2714 prev_uri = urllib.parse.urlparse(media_xml.attrib['uri'])
2715 if prev_uri.scheme == Protocols.IMAGESEQUENCE:
2716 parsed_uri = urllib.parse.urlparse(uri)
2717 uri = prev_uri._replace(path=os.path.join(os.path.dirname(parsed_uri.path), os.path.basename(prev_uri.path))).geturl()
2718 include_frames = bool(int(media_xml.attrib["frame-detection"]))
2719 if bool(int(media_xml.attrib.get("skip-parsers", 0))):
2720 args.append("--skip-parsers")
# No previous media-info file: fall through with the flag as given.
2721 except FileNotFoundError:
2724 include_frames = bool(include_frames)
2727 args.extend(["--output-file", descriptor_path])
# --full enables per-frame information in the generated media-info.
2729 args.extend(["--full"])
2732 printc("Generating media info for %s\n"
2733 " Command: '%s'" % (media_path, ' '.join(args)),
2737 subprocess.check_output(args, stderr=open(os.devnull))
2738 except subprocess.CalledProcessError as e:
2740 printc("Result: Failed", Colors.FAIL)
2742 loggable.warning("GstValidateMediaDescriptor",
2743 "Exception: %s" % e)
2747 printc("Result: Passed", Colors.OKGREEN)
2750 return GstValidateMediaDescriptor(descriptor_path)
2751 except (IOError, xml.etree.ElementTree.ParseError):
# get_path() body fragment: the raw media-info xml path backing this
# descriptor.
2755 return self._xml_path
def need_clock_sync(self):
    """Whether this media requires clock synchronization, as decided by
    Protocols.needs_clock_sync() for the media's protocol."""
    protocol = self.get_protocol()
    return Protocols.needs_clock_sync(protocol)
2760 def get_media_filepath(self):
2761 if self._media_file_path is None:
2762 self._media_file_path = self.__cleanup_media_info_ext()
2763 return self._media_file_path
2768 def get_tracks_caps(self):
2769 return self._track_caps
2774 def get_duration(self):
2775 return self._duration
# Record the protocol for this media. A `.media_info.push` description file
# always forces the push-file protocol, regardless of the uri scheme.
2777 def set_protocol(self, protocol):
2778 if self._xml_path.endswith(GstValidateMediaDescriptor.PUSH_MEDIA_INFO_EXT):
2779 self._protocol = Protocols.PUSHFILE
# NOTE(review): the `else:` header (orig line 2780) is missing from this
# extract; the assignment below is the non-push fallback branch.
2781 self._protocol = protocol
2783 def get_protocol(self):
2784 return self._protocol
2786 def is_seekable(self):
2787 return self._is_seekable
# NOTE(review): fragment -- the `def is_live(self):` and `def is_image(self):`
# headers, the body of can_play_reverse() and the counter logic of
# get_num_tracks() (orig lines 2789, 2793-2795, 2799, 2801-2804) are missing
# from this extract.
2790 return self._is_live
2792 def can_play_reverse(self):
2796 return self._is_image
# get_num_tracks: count the tracks of *track_type* recorded in _track_types.
2798 def get_num_tracks(self, track_type):
2800 for t in self._track_types:
def get_clean_name(self):
    """Return a test-name friendly identifier for this media.

    Strips the media-info extension from the file's basename, then replaces
    the remaining dots with underscores.
    """
    known_exts = [self.SKIPPED_MEDIA_INFO_EXT, self.MEDIA_INFO_EXT,
                  self.PUSH_MEDIA_INFO_EXT, self.STREAM_INFO_EXT]
    pattern = '|'.join('\\.%s$' % ext for ext in known_exts)
    base = os.path.basename(self.get_path())
    base = re.sub(pattern, "", base)
    return base.replace('.', "_")
# Describes one container/audio/video format combination used as a target for
# transcoding tests. FORMATS maps short codec/container names to the caps
# strings used when building the encoding profile.
2814 class MediaFormatCombination(object):
2815 FORMATS = {"aac": "audio/mpeg,mpegversion=4", # Audio
2816 "ac3": "audio/x-ac3",
2817 "vorbis": "audio/x-vorbis",
2818 "mp3": "audio/mpeg,mpegversion=1,layer=3",
2819 "opus": "audio/x-opus",
2820 "rawaudio": "audio/x-raw",
# Video formats (orig lines 2821-2822 -- presumably a "# Video" separator
# comment -- are missing from this extract):
2823 "h264": "video/x-h264",
2824 "h265": "video/x-h265",
2825 "vp8": "video/x-vp8",
2826 "vp9": "video/x-vp9",
2827 "theora": "video/x-theora",
2828 "prores": "video/x-prores",
2829 "jpeg": "image/jpeg",
# Container formats (orig lines 2830-2831 are likewise missing):
2832 "webm": "video/webm",
2833 "ogg": "application/ogg",
2834 "mkv": "video/x-matroska",
2835 "mp4": "video/quicktime,variant=iso;",
2836 "quicktime": "video/quicktime;"}
# __str__ body fragment; its `def __str__(self):` header (orig 2838) is
# missing from this extract.
2839 return "%s and %s in %s" % (self.audio, self.video, self.container)
2841 def __init__(self, container, audio, video, duration_factor=1,
2842 video_restriction=None, audio_restriction=None):
2844 Describes a media format to be used for transcoding tests.
2846 :param container: A string defining the container format to be used, must bin in self.FORMATS
2847 :param audio: A string defining the audio format to be used, must bin in self.FORMATS
2848 :param video: A string defining the video format to be used, must bin in self.FORMATS
2850 self.container = container
# NOTE(review): `self.audio = audio` / `self.video = video` (orig 2851-2852)
# are missing from this extract; __str__ and get_caps rely on them.
2853 self.video_restriction = video_restriction
2854 self.audio_restriction = audio_restriction
# get_caps: caps string for the stored format of *track_type* ("audio",
# "video" or "container"). The surrounding try/except (orig 2857, 2859-2860)
# is missing from this extract.
2856 def get_caps(self, track_type):
2858 return self.FORMATS[self.__dict__[track_type]]
def get_audio_caps(self):
    """Caps string for this combination's audio format."""
    audio_caps = self.get_caps("audio")
    return audio_caps
def get_video_caps(self):
    """Caps string for this combination's video format."""
    video_caps = self.get_caps("video")
    return video_caps
def get_muxer_caps(self):
    """Caps string for this combination's container (muxer) format."""
    muxer_caps = self.get_caps("container")
    return muxer_caps