3 # Copyright (c) 2013, Thibault Saunier <thibault.saunier@collabora.com>
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU Lesser General Public
7 # License as published by the Free Software Foundation; either
8 # version 2.1 of the License, or (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 # Lesser General Public License for more details.
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with this program; if not, write to the
17 # Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
18 # Boston, MA 02110-1301, USA.
20 """ Class representing tests and test managers. """
43 from itertools import cycle
44 from fractions import Fraction
46 from .utils import which
47 from . import reporters
48 from . import loggable
49 from .loggable import Loggable
51 from collections import defaultdict
53 from lxml import etree as ET
55 import xml.etree.cElementTree as ET
58 from .vfb_server import get_virual_frame_buffer_server
59 from .httpserver import HTTPServer
60 from .utils import mkdir, Result, Colors, printc, DEFAULT_TIMEOUT, GST_SECOND, \
61 Protocols, look_for_file_in_source_dir, get_data_file, BackTraceGenerator, \
62 check_bugs_resolution, is_tty
64 # The factor by which we increase the hard timeout when running inside
66 GDB_TIMEOUT_FACTOR = VALGRIND_TIMEOUT_FACTOR = 20
68 TIMEOUT_FACTOR = float(os.environ.get("TIMEOUT_FACTOR", 1))
69 # The error reported by valgrind when detecting errors
70 VALGRIND_ERROR_CODE = 20
72 VALIDATE_OVERRIDE_EXTENSION = ".override"
73 EXITING_SIGNALS = dict([(-getattr(signal, s), s) for s in [
74 'SIGQUIT', 'SIGILL', 'SIGABRT', 'SIGFPE', 'SIGSEGV', 'SIGBUS', 'SIGSYS',
75 'SIGTRAP', 'SIGXCPU', 'SIGXFSZ', 'SIGIOT'] if hasattr(signal, s)])
76 EXITING_SIGNALS.update({139: "SIGSEGV"})
77 EXITING_SIGNALS.update({(v, k) for k, v in EXITING_SIGNALS.items()})
80 CI_ARTIFACTS_URL = os.environ.get('CI_ARTIFACTS_URL')
85 """ A class representing a particular test. """
87 def __init__(self, application_name, classname, options,
88 reporter, duration=0, timeout=DEFAULT_TIMEOUT,
89 hard_timeout=None, extra_env_variables=None,
90 expected_issues=None, is_parallel=True,
93 @timeout: The timeout during which the value return by get_current_value
94 keeps being exactly equal
95 @hard_timeout: Max time the test can take in absolute
97 Loggable.__init__(self)
98 self.timeout = timeout * TIMEOUT_FACTOR * options.timeout_factor
100 self.hard_timeout = hard_timeout * TIMEOUT_FACTOR
101 self.hard_timeout *= options.timeout_factor
103 self.hard_timeout = hard_timeout
104 self.classname = classname
105 self.options = options
106 self.application = application_name
108 self.server_command = None
109 self.reporter = reporter
114 self.duration = duration
115 self.stack_trace = None
117 if expected_issues is None:
118 self.expected_issues = []
119 elif not isinstance(expected_issues, list):
120 self.expected_issues = [expected_issues]
122 self.expected_issues = expected_issues
124 extra_env_variables = extra_env_variables or {}
125 self.extra_env_variables = extra_env_variables
126 self.optional = False
127 self.is_parallel = is_parallel
128 self.generator = None
129 self.workdir = workdir
130 self.allow_flakiness = False
132 self.rr_logdir = None
136 def _generate_expected_issues(self):
139 def generate_expected_issues(self):
140 res = '%s"FIXME \'%s\' issues [REPORT A BUG ' % (" " * 4, self.classname) \
141 + 'in https://gitlab.freedesktop.org/gstreamer/ '\
142 + 'or use a proper bug description]": {'
147 "issues": [""" % (self.classname)
149 retcode = self.process.returncode if self.process else 0
151 signame = EXITING_SIGNALS.get(retcode)
152 val = "'" + signame + "'" if signame else retcode
156 },""" % ("signame" if signame else "returncode", val)
158 res += self._generate_expected_issues()
159 res += "\n%s],\n%s},\n" % (" " * 8, " " * 4)
163 def copy(self, nth=None):
164 copied_test = copy.copy(self)
166 copied_test.classname += '_it' + str(nth)
167 copied_test.options = copy.copy(self.options)
168 copied_test.options.logsdir = os.path.join(copied_test.options.logsdir, str(nth))
169 os.makedirs(copied_test.options.logsdir, exist_ok=True)
174 self.kill_subprocess()
177 self.time_taken = 0.0
178 self._starting_time = None
179 self.result = Result.NOT_RUN
182 self.extra_logfiles = set()
183 self.__env_variable = []
184 self.kill_subprocess()
188 string = self.classname
189 if self.result != Result.NOT_RUN:
190 string += ": " + self.result
191 if self.result in [Result.FAILED, Result.TIMEOUT]:
192 string += " '%s'" % self.message
193 if not self.options.dump_on_failure:
194 if not self.options.redirect_logs and self.result != Result.PASSED:
195 string += self.get_logfile_repr()
197 string = "\n==> %s" % string
201 def add_env_variable(self, variable, value=None):
203 Only useful so that the gst-validate-launcher can print the exact
204 right command line to reproduce the tests
207 value = os.environ.get(variable, None)
212 self.__env_variable.append(variable)
215 def _env_variable(self):
217 if not self.options.verbose or self.options.verbose > 1:
218 for var in set(self.__env_variable):
221 value = self.proc_env.get(var, None)
222 if value is not None:
223 res += "%s='%s'" % (var, value)
225 res += "[Not displaying environment variables, rerun with -vv for the full command]"
229 def open_logfile(self):
233 path = os.path.join(self.options.logsdir,
234 self.classname.replace(".", os.sep) + '.md')
235 mkdir(os.path.dirname(path))
238 if self.options.redirect_logs == 'stdout':
239 self.out = sys.stdout
240 elif self.options.redirect_logs == 'stderr':
241 self.out = sys.stderr
243 self.out = open(path, 'w+')
245 def finalize_logfiles(self):
246 self.out.write("\n**Duration**: %s" % self.time_taken)
247 if not self.options.redirect_logs:
249 for logfile in self.extra_logfiles:
250 # Only copy over extra logfile content if it's below a certain threshold
251 # Avoid copying gigabytes of data if a lot of debugging is activated
252 if os.path.getsize(logfile) < 500 * 1024:
253 self.out.write('\n\n## %s:\n\n```\n%s\n```\n' % (
254 os.path.basename(logfile), self.get_extra_log_content(logfile))
257 self.out.write('\n\n## %s:\n\n**Log file too big.**\n %s\n\n Check file content directly\n\n' % (
258 os.path.basename(logfile), logfile)
262 self.out.write('\n\n## rr trace:\n\n```\nrr replay %s/latest-trace\n```\n' % (
268 if self.options.html:
269 self.html_log = os.path.splitext(self.logfile)[0] + '.html'
271 parser = commonmark.Parser()
272 with open(self.logfile) as f:
273 ast = parser.parse(f.read())
275 renderer = commonmark.HtmlRenderer()
276 html = renderer.render(ast)
277 with open(self.html_log, 'w') as f:
282 def _get_file_content(self, file_name):
283 f = open(file_name, 'r+')
def get_log_content(self):
    """Return the full text of this test's main log file as a string."""
    logfile = self.logfile
    return self._get_file_content(logfile)
292 def get_extra_log_content(self, extralog):
293 if extralog not in self.extra_logfiles:
296 return self._get_file_content(extralog)
298 def get_classname(self):
299 name = self.classname.split('.')[-1]
300 classname = self.classname.replace('.%s' % name, '')
305 return self.classname.split('.')[-1]
308 if self._uuid is None:
309 self._uuid = self.classname + str(uuid.uuid4())
312 def add_arguments(self, *args):
315 def build_arguments(self):
316 self.add_env_variable("LD_PRELOAD")
317 self.add_env_variable("DISPLAY")
319 def add_stack_trace_to_logfile(self):
320 self.debug("Adding stack trace")
324 trace_gatherer = BackTraceGenerator.get_default()
325 stack_trace = trace_gatherer.get_trace(self)
330 info = "\n\n## Stack trace\n\n```\n%s\n```" % stack_trace
331 if self.options.redirect_logs:
335 if self.options.xunit_file:
336 self.stack_trace = stack_trace
341 def add_known_issue_information(self):
342 if self.expected_issues:
343 info = "\n\n## Already known issues\n\n``` python\n%s\n```\n\n" % (
344 json.dumps(self.expected_issues, indent=4)
349 info += "\n\n**You can mark the issues as 'known' by adding the " \
350 + " following lines to the list of known issues**\n" \
351 + "\n\n``` python\n%s\n```" % (self.generate_expected_issues())
353 if self.options.redirect_logs:
359 def set_result(self, result, message="", error=""):
361 if not self.options.redirect_logs:
362 self.out.write("\n```\n")
365 self.debug("Setting result: %s (message: %s, error: %s)" % (result,
368 if result is Result.TIMEOUT:
369 if self.options.debug is True:
371 printc("Timeout, you should process <ctrl>c to get into gdb",
373 # and wait here until gdb exits
374 self.process.communicate()
376 pname = self.command[0]
377 input("%sTimeout happened on %s you can attach gdb doing:\n $gdb %s %d%s\n"
378 "Press enter to continue" % (Colors.FAIL, self.classname,
379 pname, self.process.pid, Colors.ENDC))
381 self.add_stack_trace_to_logfile()
384 self.message = message
385 self.error_str = error
387 if result not in [Result.PASSED, Result.NOT_RUN, Result.SKIPPED]:
388 self.add_known_issue_information()
390 def check_results(self):
391 if self.result is Result.FAILED or self.result is Result.TIMEOUT:
394 self.debug("%s returncode: %s", self, self.process.returncode)
395 if self.options.rr and self.process.returncode == -signal.SIGPIPE:
396 self.set_result(Result.SKIPPED, "SIGPIPE received under `rr`, known issue.")
397 elif self.process.returncode == 0:
398 self.set_result(Result.PASSED)
399 elif self.process.returncode in EXITING_SIGNALS:
400 self.add_stack_trace_to_logfile()
401 self.set_result(Result.FAILED,
402 "Application exited with signal %s" % (
403 EXITING_SIGNALS[self.process.returncode]))
404 elif self.process.returncode == VALGRIND_ERROR_CODE:
405 self.set_result(Result.FAILED, "Valgrind reported errors")
407 self.set_result(Result.FAILED,
408 "Application returned %d" % (self.process.returncode))
410 def get_current_value(self):
412 Lets subclasses implement a nicer timeout measurement method
413 They should return some value with which we will compare
414 the previous and timeout if they are egual during self.timeout
417 return Result.NOT_RUN
419 def process_update(self):
421 Returns True when process has finished running or has timed out.
424 if self.process is None:
425 # Process has not started running yet
429 if self.process.returncode is not None:
432 val = self.get_current_value()
434 self.debug("Got value: %s" % val)
435 if val is Result.NOT_RUN:
436 # The get_current_value logic is not implemented... dumb
438 if time.time() - self.last_change_ts > self.timeout:
439 self.set_result(Result.TIMEOUT,
440 "Application timed out: %s secs" %
445 elif val is Result.FAILED:
447 elif val is Result.KNOWN_ERROR:
450 self.log("New val %s" % val)
452 if val == self.last_val:
453 delta = time.time() - self.last_change_ts
454 self.debug("%s: Same value for %d/%d seconds" %
455 (self, delta, self.timeout))
456 if delta > self.timeout:
457 self.set_result(Result.TIMEOUT,
458 "Application timed out: %s secs" %
462 elif self.hard_timeout and time.time() - self.start_ts > self.hard_timeout:
464 Result.TIMEOUT, "Hard timeout reached: %d secs" % self.hard_timeout)
467 self.last_change_ts = time.time()
def get_subproc_env(self):
    """Build the environment for the test subprocess.

    Base implementation: an independent copy of the launcher's own
    environment, so per-test mutations never leak back into os.environ.
    """
    environment = dict(os.environ)
    return environment
475 def kill_subprocess(self):
477 if self.options.rr and self.process and self.process.returncode is None:
478 cmd = ["ps", "-o", "pid", "--ppid", str(self.process.pid), "--noheaders"]
480 subprocs_id = [int(pid.strip('\n')) for
481 pid in subprocess.check_output(cmd).decode().split(' ') if pid]
482 except FileNotFoundError:
483 self.error("Ps not found, will probably not be able to get rr "
484 "working properly after we kill the process")
485 except subprocess.CalledProcessError as e:
486 self.error("Couldn't get rr subprocess pid: %s" % (e))
488 utils.kill_subprocess(self, self.process, DEFAULT_TIMEOUT, subprocs_id)
490 def run_external_checks(self):
493 def thread_wrapper(self):
495 # Restore the SIGINT handler for the child process (gdb) to ensure
497 signal.signal(signal.SIGINT, signal.SIG_DFL)
499 if self.options.gdb and os.name != "nt":
500 preexec_fn = enable_sigint
504 self.process = subprocess.Popen(self.command,
509 preexec_fn=preexec_fn)
511 if self.result is not Result.TIMEOUT:
512 if self.process.returncode == 0:
513 self.run_external_checks()
516 def get_valgrind_suppression_file(self, subdir, name):
517 p = get_data_file(subdir, name)
521 self.error("Could not find any %s file" % name)
def get_valgrind_suppressions(self):
    """Return the list of valgrind suppression files to use for this test."""
    default_supp = self.get_valgrind_suppression_file('data', 'gstvalidate.supp')
    return [default_supp]
526 def use_gdb(self, command):
527 if self.hard_timeout is not None:
528 self.hard_timeout *= GDB_TIMEOUT_FACTOR
529 self.timeout *= GDB_TIMEOUT_FACTOR
531 if not self.options.gdb_non_stop:
532 self.timeout = sys.maxsize
533 self.hard_timeout = sys.maxsize
536 if self.options.gdb_non_stop:
537 args += ["-ex", "run", "-ex", "backtrace", "-ex", "quit"]
538 args += ["--args"] + command
541 def use_rr(self, command, subenv):
542 command = ["rr", 'record', '-h'] + command
544 self.timeout *= RR_TIMEOUT_FACTOR
545 self.rr_logdir = os.path.join(self.options.logsdir, self.classname.replace(".", os.sep), 'rr-logs')
546 subenv['_RR_TRACE_DIR'] = self.rr_logdir
548 shutil.rmtree(self.rr_logdir, ignore_errors=False, onerror=None)
549 except FileNotFoundError:
551 self.add_env_variable('_RR_TRACE_DIR', self.rr_logdir)
555 def use_valgrind(self, command, subenv):
556 vglogsfile = os.path.splitext(self.logfile)[0] + '.valgrind'
557 self.extra_logfiles.add(vglogsfile)
561 for o, v in [('trace-children', 'yes'),
562 ('tool', 'memcheck'),
563 ('leak-check', 'full'),
564 ('leak-resolution', 'high'),
565 # TODO: errors-for-leak-kinds should be set to all instead of definite
566 # and all false positives should be added to suppression
568 ('errors-for-leak-kinds', 'definite,indirect'),
569 ('show-leak-kinds', 'definite,indirect'),
570 ('show-possibly-lost', 'no'),
571 ('num-callers', '20'),
572 ('error-exitcode', str(VALGRIND_ERROR_CODE)),
573 ('gen-suppressions', 'all')]:
574 vg_args.append("--%s=%s" % (o, v))
576 if not self.options.redirect_logs:
577 vglogsfile = os.path.splitext(self.logfile)[0] + '.valgrind'
578 self.extra_logfiles.add(vglogsfile)
579 vg_args.append("--%s=%s" % ('log-file', vglogsfile))
581 for supp in self.get_valgrind_suppressions():
582 vg_args.append("--suppressions=%s" % supp)
584 command = ["valgrind"] + vg_args + command
586 # Tune GLib's memory allocator to be more valgrind friendly
587 subenv['G_DEBUG'] = 'gc-friendly'
588 subenv['G_SLICE'] = 'always-malloc'
590 if self.hard_timeout is not None:
591 self.hard_timeout *= VALGRIND_TIMEOUT_FACTOR
592 self.timeout *= VALGRIND_TIMEOUT_FACTOR
594 # Enable 'valgrind.config'
595 self.add_validate_config(get_data_file(
596 'data', 'valgrind.config'), subenv)
597 if subenv == self.proc_env:
598 self.add_env_variable('G_DEBUG', 'gc-friendly')
599 self.add_env_variable('G_SLICE', 'always-malloc')
600 self.add_env_variable('GST_VALIDATE_CONFIG',
601 self.proc_env['GST_VALIDATE_CONFIG'])
605 def add_validate_config(self, config, subenv=None):
607 subenv = self.extra_env_variables
609 cconf = subenv.get('GST_VALIDATE_CONFIG', "")
610 paths = [c for c in cconf.split(os.pathsep) if c] + [config]
611 subenv['GST_VALIDATE_CONFIG'] = os.pathsep.join(paths)
613 def launch_server(self):
616 def get_logfile_repr(self):
617 if not self.options.redirect_logs:
624 log = CI_ARTIFACTS_URL + os.path.relpath(log, self.options.logsdir)
626 return "\n Log: %s" % (log)
630 def get_command_repr(self):
631 message = "%s %s" % (self._env_variable, ' '.join(
632 shlex.quote(arg) for arg in self.command))
633 if self.server_command:
634 message = "%s & %s" % (self.server_command, message)
638 def test_start(self, queue):
641 self.server_command = self.launch_server()
643 self.command = [self.application]
644 self._starting_time = time.time()
645 self.build_arguments()
646 self.proc_env = self.get_subproc_env()
648 for var, value in list(self.extra_env_variables.items()):
649 value = self.proc_env.get(var, '') + os.pathsep + value
650 self.proc_env[var] = value.strip(os.pathsep)
651 self.add_env_variable(var, self.proc_env[var])
654 self.command = self.use_gdb(self.command)
656 self.previous_sigint_handler = signal.getsignal(signal.SIGINT)
657 # Make the gst-validate executable ignore SIGINT while gdb is
659 signal.signal(signal.SIGINT, signal.SIG_IGN)
661 if self.options.valgrind:
662 self.command = self.use_valgrind(self.command, self.proc_env)
665 self.command = self.use_rr(self.command, self.proc_env)
667 if not self.options.redirect_logs:
668 self.out.write("# `%s`\n\n"
669 "## Command\n\n``` bash\n%s\n```\n\n" % (
670 self.classname, self.get_command_repr()))
671 self.out.write("## %s output\n\n``` \n\n" % os.path.basename(self.application))
674 message = "Launching: %s%s\n" \
675 " Command: %s\n" % (Colors.ENDC, self.classname,
676 self.get_command_repr())
677 printc(message, Colors.OKBLUE)
679 self.thread = threading.Thread(target=self.thread_wrapper)
683 self.last_change_ts = time.time()
684 self.start_ts = time.time()
686 def _dump_log_file(self, logfile):
689 subprocess.check_call(['bat', '-H', '1', '--paging=never', logfile])
691 except (subprocess.CalledProcessError, FileNotFoundError):
694 with open(logfile, 'r') as fin:
695 for line in fin.readlines():
696 print('> ' + line, end='')
698 def _dump_log_files(self):
699 self._dump_log_file(self.logfile)
701 def copy_logfiles(self, extra_folder="flaky_tests"):
702 path = os.path.dirname(os.path.join(self.options.logsdir, extra_folder,
703 self.classname.replace(".", os.sep)))
705 self.logfile = shutil.copy(self.logfile, path)
707 for logfile in self.extra_logfiles:
708 extra_logs.append(shutil.copy(logfile, path))
709 self.extra_logfiles = extra_logs
711 def test_end(self, retry_on_failure=False):
712 self.kill_subprocess()
714 self.time_taken = time.time() - self._starting_time
717 signal.signal(signal.SIGINT, self.previous_sigint_handler)
719 self.finalize_logfiles()
723 if self.options.dump_on_failure:
724 if self.result not in [Result.PASSED, Result.KNOWN_ERROR, Result.NOT_RUN]:
725 self._dump_log_files()
727 # Only keep around env variables we need later
729 for n in self.__env_variable:
730 clean_env[n] = self.proc_env.get(n, None)
731 self.proc_env = clean_env
733 # Don't keep around JSON report objects, they were processed
734 # in check_results already
740 class GstValidateTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
744 class GstValidateListener(socketserver.BaseRequestHandler, Loggable):
def __init__(self, *args, **kwargs):
    # NOTE: socketserver.BaseRequestHandler.__init__ dispatches the request
    # (i.e. calls handle()) before returning, so the Loggable setup below
    # only happens after the request has been processed; handle() compensates
    # by assigning self.logCategory itself before logging.
    super().__init__(*args, **kwargs)
    Loggable.__init__(self, "GstValidateListener")
751 """Implements BaseRequestHandler handle method"""
753 self.logCategory = "GstValidateListener"
755 raw_len = self.request.recv(4)
758 msglen = struct.unpack('>I', raw_len)[0]
761 while msglen != len(raw_msg):
762 raw_msg += self.request.recv(msglen - len(raw_msg))
766 msg = raw_msg.decode('utf-8', 'ignore')
767 except UnicodeDecodeError as e:
768 self.error("%s Could not decode message: %s - %s" % (test.classname if test else "unknown", msg, e))
775 obj = json.loads(msg)
776 except json.decoder.JSONDecodeError as e:
777 self.error("%s Could not decode message: %s - %s" % (test.classname if test else "unknown", msg, e))
781 # First message must contain the uuid
782 uuid = obj.get("uuid", None)
785 # Find test from launcher
786 for t in self.server.launcher.tests:
787 if uuid == t.get_uuid():
791 self.server.launcher.error(
792 "Could not find test for UUID %s" % uuid)
795 obj_type = obj.get("type", '')
796 if obj_type == 'position':
797 test.set_position(obj['position'], obj['duration'],
799 elif obj_type == 'buffering':
800 test.set_position(obj['position'], 100)
801 elif obj_type == 'action':
802 test.add_action_execution(obj)
803 # Make sure that action is taken into account when checking if process
806 elif obj_type == 'action-done':
807 # Make sure that action end is taken into account when checking if process
810 test.actions_infos[-1]['execution-duration'] = obj['execution-duration']
811 elif obj_type == 'report':
813 elif obj_type == 'skip-test':
814 test.set_result(Result.SKIPPED)
817 class GstValidateTest(Test):
819 """ A class representing a particular test. """
820 HARD_TIMEOUT_FACTOR = 5
821 fault_sig_regex = re.compile("<Caught SIGNAL: .*>")
822 needs_gst_inspect = set()
824 def __init__(self, application_name, classname,
825 options, reporter, duration=0,
826 timeout=DEFAULT_TIMEOUT, scenario=None, hard_timeout=None,
827 media_descriptor=None, extra_env_variables=None,
828 expected_issues=None, workdir=None):
830 extra_env_variables = extra_env_variables or {}
832 if not hard_timeout and self.HARD_TIMEOUT_FACTOR:
834 hard_timeout = timeout * self.HARD_TIMEOUT_FACTOR
836 hard_timeout = duration * self.HARD_TIMEOUT_FACTOR
840 # If we are running from source, use the -debug version of the
841 # application which is using rpath instead of libtool's wrappers. It's
842 # slightly faster to start and will not confuse valgrind.
843 debug = '%s-debug' % application_name
844 p = look_for_file_in_source_dir('tools', debug)
850 self.media_duration = -1
852 self.actions_infos = []
853 self.media_descriptor = media_descriptor
857 override_path = self.get_override_file(media_descriptor)
859 if extra_env_variables:
860 if extra_env_variables.get("GST_VALIDATE_OVERRIDE", ""):
862 "GST_VALIDATE_OVERRIDE"] += os.path.pathsep
864 extra_env_variables["GST_VALIDATE_OVERRIDE"] = override_path
866 super(GstValidateTest, self).__init__(application_name, classname,
870 hard_timeout=hard_timeout,
871 extra_env_variables=extra_env_variables,
872 expected_issues=expected_issues,
874 if media_descriptor and media_descriptor.get_media_filepath():
875 config_file = os.path.join(media_descriptor.get_media_filepath() + '.config')
876 if os.path.isfile(config_file):
877 self.add_validate_config(config_file, extra_env_variables)
879 if scenario is None or scenario.name.lower() == "none":
882 self.scenario = scenario
def kill_subprocess(self):
    # NOTE(review): appears to be a plain delegation to the base class kill
    # path; elided lines in this listing may add extra cleanup — confirm
    # against the full source before relying on that.
    Test.kill_subprocess(self)
def add_report(self, report):
    """Record a validate report received from the running subprocess."""
    self.reports += [report]
890 def set_position(self, position, duration, speed=None):
891 self.position = position
892 self.media_duration = duration
def add_action_execution(self, action_infos):
    """Record information about a scenario action reported by the subprocess."""
    self.actions_infos += [action_infos]
899 def get_override_file(self, media_descriptor):
901 if media_descriptor.get_path():
902 override_path = os.path.splitext(media_descriptor.get_path())[
903 0] + VALIDATE_OVERRIDE_EXTENSION
904 if os.path.exists(override_path):
909 def get_current_position(self):
912 def get_current_value(self):
915 def get_subproc_env(self):
916 subproc_env = os.environ.copy()
918 if self.options.validate_default_config:
919 self.add_validate_config(self.options.validate_default_config,
922 subproc_env["GST_VALIDATE_UUID"] = self.get_uuid()
923 subproc_env["GST_VALIDATE_LOGSDIR"] = self.options.logsdir
925 if 'GST_DEBUG' in os.environ and not self.options.redirect_logs:
926 gstlogsfile = os.path.splitext(self.logfile)[0] + '.gstdebug'
927 self.extra_logfiles.add(gstlogsfile)
928 subproc_env["GST_DEBUG_FILE"] = gstlogsfile
930 if self.options.no_color:
931 subproc_env["GST_DEBUG_NO_COLOR"] = '1'
933 # Ensure XInitThreads is called, see bgo#731525
934 subproc_env['GST_GL_XINITTHREADS'] = '1'
935 self.add_env_variable('GST_GL_XINITTHREADS', '1')
937 if self.scenario is not None:
938 scenario = self.scenario.get_execution_name()
939 subproc_env["GST_VALIDATE_SCENARIO"] = scenario
940 self.add_env_variable("GST_VALIDATE_SCENARIO",
941 subproc_env["GST_VALIDATE_SCENARIO"])
944 del subproc_env["GST_VALIDATE_SCENARIO"]
948 if not subproc_env.get('GST_DEBUG_DUMP_DOT_DIR'):
949 dotfilesdir = os.path.join(self.options.logsdir,
950 self.classname.replace(".", os.sep) + '.pipelines_dot_files')
952 subproc_env['GST_DEBUG_DUMP_DOT_DIR'] = dotfilesdir
954 dotfilesurl = CI_ARTIFACTS_URL + os.path.relpath(dotfilesdir,
955 self.options.logsdir)
956 subproc_env['GST_VALIDATE_DEBUG_DUMP_DOT_URL'] = dotfilesurl
964 self.media_duration = -1
966 self.actions_infos = []
968 def build_arguments(self):
969 super(GstValidateTest, self).build_arguments()
970 if "GST_VALIDATE" in os.environ:
971 self.add_env_variable("GST_VALIDATE", os.environ["GST_VALIDATE"])
973 if "GST_VALIDATE_SCENARIOS_PATH" in os.environ:
974 self.add_env_variable("GST_VALIDATE_SCENARIOS_PATH",
975 os.environ["GST_VALIDATE_SCENARIOS_PATH"])
977 self.add_env_variable("GST_VALIDATE_CONFIG")
978 self.add_env_variable("GST_VALIDATE_OVERRIDE")
980 def get_extra_log_content(self, extralog):
981 value = Test.get_extra_log_content(self, extralog)
985 def report_matches_expected_issues(self, report, expected_issue):
986 for key in ['bug', 'bugs', 'sometimes']:
987 if key in expected_issue:
988 del expected_issue[key]
989 for key, value in list(report.items()):
990 if key in expected_issue:
991 if not re.findall(expected_issue[key], str(value)):
993 expected_issue.pop(key)
995 if "can-happen-several-times" in expected_issue:
996 expected_issue.pop("can-happen-several-times")
997 return not bool(expected_issue)
999 def check_reported_issues(self, expected_issues):
1001 expected_retcode = [0]
1002 for report in self.reports:
1004 for expected_issue in expected_issues:
1005 if self.report_matches_expected_issues(report,
1006 expected_issue.copy()):
1007 found = expected_issue
1010 if found is not None:
1011 if not found.get('can-happen-several-times', False):
1012 expected_issues.remove(found)
1013 if report['level'] == 'critical':
1014 if found.get('sometimes', True) and isinstance(expected_retcode, list):
1015 expected_retcode.append(18)
1017 expected_retcode = [18]
1018 elif report['level'] == 'critical':
1022 return None, expected_issues, expected_retcode
1024 return ret, expected_issues, expected_retcode
1026 def check_expected_issue(self, expected_issue):
1029 expected_symbols = expected_issue.get('stacktrace_symbols')
1030 if expected_symbols:
1031 trace_gatherer = BackTraceGenerator.get_default()
1032 stack_trace = trace_gatherer.get_trace(self)
1035 if not isinstance(expected_symbols, list):
1036 expected_symbols = [expected_symbols]
1038 not_found_symbols = [s for s in expected_symbols
1039 if s not in stack_trace]
1040 if not_found_symbols:
1041 msg = " Expected symbols '%s' not found in stack trace " % (
1045 msg += " No stack trace available, could not verify symbols "
1047 _, not_found_expected_issues, _ = self.check_reported_issues(expected_issue.get('issues', []))
1048 if not_found_expected_issues:
1049 mandatory_failures = [f for f in not_found_expected_issues
1050 if not f.get('sometimes', True)]
1051 if mandatory_failures:
1052 msg = " (Expected issues not found: %s) " % mandatory_failures
1057 def check_expected_timeout(self, expected_timeout):
1058 msg = "Expected timeout happened. "
1059 result = Result.PASSED
1060 message = expected_timeout.get('message')
1062 if not re.findall(message, self.message):
1063 result = Result.FAILED
1064 msg = "Expected timeout message: %s got %s " % (
1065 message, self.message)
1067 stack_msg, stack_res = self.check_expected_issue(expected_timeout)
1069 result = Result.TIMEOUT
1074 def check_results(self):
1075 if self.result in [Result.FAILED, Result.PASSED, Result.SKIPPED]:
1078 self.debug("%s returncode: %s", self, self.process.returncode)
1079 expected_issues = copy.deepcopy(self.expected_issues)
1081 # signal.SIGPPIPE is 13 but it sometimes isn't present in python for some reason.
1082 expected_issues.append({"returncode": -13, "sometimes": True})
1083 self.criticals, not_found_expected_issues, expected_returncode = self.check_reported_issues(expected_issues)
1084 expected_timeout = None
1085 expected_signal = None
1086 for i, f in enumerate(not_found_expected_issues):
1087 returncode = f.get('returncode', [])
1088 if not isinstance(returncode, list):
1089 returncode = [returncode]
1091 if f.get('signame'):
1092 signames = f['signame']
1093 if not isinstance(signames, list):
1094 signames = [signames]
1096 returncode = [EXITING_SIGNALS[signame] for signame in signames]
1099 if 'sometimes' in f:
1100 returncode.append(0)
1101 expected_returncode = returncode
1103 elif f.get("timeout"):
1104 expected_timeout = f
1106 not_found_expected_issues = [f for f in not_found_expected_issues
1107 if not f.get('returncode') and not f.get('signame')]
1110 result = Result.PASSED
1111 if self.result == Result.TIMEOUT:
1112 with open(self.logfile) as f:
1113 signal_fault_info = self.fault_sig_regex.findall(f.read())
1114 if signal_fault_info:
1115 result = Result.FAILED
1116 msg = signal_fault_info[0]
1117 elif expected_timeout:
1118 not_found_expected_issues.remove(expected_timeout)
1119 result, msg = self.check_expected_timeout(expected_timeout)
1122 elif self.process.returncode in EXITING_SIGNALS:
1123 msg = "Application exited with signal %s" % (
1124 EXITING_SIGNALS[self.process.returncode])
1125 if self.process.returncode not in expected_returncode:
1126 result = Result.FAILED
1129 stack_msg, stack_res = self.check_expected_issue(
1133 result = Result.FAILED
1134 self.add_stack_trace_to_logfile()
1135 elif self.process.returncode == VALGRIND_ERROR_CODE:
1136 msg = "Valgrind reported errors "
1137 result = Result.FAILED
1138 elif self.process.returncode not in expected_returncode:
1139 msg = "Application returned %s " % self.process.returncode
1140 if expected_returncode != [0]:
1141 msg += "(expected %s) " % expected_returncode
1142 result = Result.FAILED
1145 msg += "(critical errors: [%s]) " % ', '.join(set([c['summary']
1146 for c in self.criticals]))
1147 result = Result.FAILED
1149 if not_found_expected_issues:
1150 mandatory_failures = [f for f in not_found_expected_issues
1151 if not f.get('sometimes', True)]
1153 if mandatory_failures:
1154 msg += " (Expected errors not found: %s) " % mandatory_failures
1155 result = Result.FAILED
1156 elif self.expected_issues:
1157 msg += ' %s(Expected errors occurred: %s)%s' % (Colors.OKBLUE,
1158 self.expected_issues,
1160 result = Result.KNOWN_ERROR
1162 if result == Result.PASSED:
1163 for report in self.reports:
1164 if report["level"] == "expected":
1165 result = Result.KNOWN_ERROR
1168 self.set_result(result, msg.strip())
1170 def _generate_expected_issues(self):
1172 self.criticals = self.criticals or []
1173 if self.result == Result.TIMEOUT:
1179 for report in self.criticals:
1180 res += "\n%s{" % (" " * 12)
1182 for key, value in report.items():
1187 res += '\n%s%s"%s": "%s",' % (
1188 " " * 16, "# " if key == "details" else "",
1189 key, value.replace('\n', '\\n'))
1191 res += "\n%s}," % (" " * 12)
1195 def get_valgrind_suppressions(self):
1196 result = super(GstValidateTest, self).get_valgrind_suppressions()
1197 result.extend(utils.get_gst_build_valgrind_suppressions())
1198 gst_sup = self.get_valgrind_suppression_file('common', 'gst.supp')
1200 result.append(gst_sup)
# NOTE(review): mangled excerpt — embedded line numbers and missing interior
# lines; code kept byte-identical. Mixin interface adding encoding/transcoding
# checks to a test class (expects self.dest_file, self.debug, etc. from the
# host class — TODO confirm, not all attributes visible here).
1205 class GstValidateEncodingTestInterface(object):
# Allowed deviation between source and transcoded duration (1/4 second).
1206 DURATION_TOLERANCE = GST_SECOND / 4
1208 def __init__(self, combination, media_descriptor, duration_tolerance=None):
1209 super(GstValidateEncodingTestInterface, self).__init__()
1211 self.media_descriptor = media_descriptor
1212 self.combination = combination
1215 self._duration_tolerance = duration_tolerance
1216 if duration_tolerance is None:
1217 self._duration_tolerance = self.DURATION_TOLERANCE
# Returns the current on-disk size of the transcode target file
# (return statement missing from this excerpt).
1219 def get_current_size(self):
1221 size = os.stat(urllib.parse.urlparse(self.dest_file).path).st_size
1225 self.debug("Size: %s" % size)
# Builds a gst-encoding-profile serialization string (muxer|venc|aenc with
# restrictions/presence) — several assembly lines are missing here.
1228 def _get_profile_full(self, muxer, venc, aenc, video_restriction=None,
1229 audio_restriction=None, audio_presence=0,
1230 video_presence=0, variable_framerate=False):
1236 if video_restriction is not None:
1237 ret = ret + video_restriction + '->'
1241 props += 'presence=%s,' % str(video_presence)
1242 if variable_framerate:
1243 props += 'variable-framerate=true,'
1245 ret = ret + '|' + props[:-1]
1248 if audio_restriction is not None:
1249 ret = ret + audio_restriction + '->'
1252 ret = ret + '|' + str(audio_presence)
1254 return ret.replace("::", ":")
1256 def get_profile(self, video_restriction=None, audio_restriction=None,
1257 variable_framerate=False):
1258 vcaps = self.combination.get_video_caps()
1259 acaps = self.combination.get_audio_caps()
1260 if video_restriction is None:
1261 video_restriction = self.combination.video_restriction
1262 if audio_restriction is None:
1263 audio_restriction = self.combination.audio_restriction
1264 if self.media_descriptor is not None:
1265 if self.combination.video == "theora":
1266 # Theoraenc doesn't support variable framerate, make sure to avoid them
1267 framerate = self.media_descriptor.get_framerate()
1268 if framerate == Fraction(0, 1):
# Fall back to 30/1 when the source framerate is unknown (0/1).
1269 framerate = Fraction(30, 1)
1270 restriction = utils.GstCaps.new_from_str(video_restriction or "video/x-raw")
1271 for struct, _ in restriction:
1272 if struct.get("framerate") is None:
1273 struct.set("framerate", struct.FRACTION_TYPE, framerate)
1274 video_restriction = str(restriction)
1276 video_presence = self.media_descriptor.get_num_tracks("video")
1277 if video_presence == 0:
1280 audio_presence = self.media_descriptor.get_num_tracks("audio")
1281 if audio_presence == 0:
1284 return self._get_profile_full(self.combination.get_muxer_caps(),
1286 audio_presence=audio_presence,
1287 video_presence=video_presence,
1288 video_restriction=video_restriction,
1289 audio_restriction=audio_restriction,
1290 variable_framerate=variable_framerate)
1292 def _clean_caps(self, caps):
1294 Returns a list of key=value or structure name, without "(types)" or ";" or ","
1296 return re.sub(r"\(.+?\)\s*| |;", '', caps).split(',')
# NOTE(review): mangled excerpt — embedded line numbers and missing interior
# lines; code kept byte-identical.
1298 # pylint: disable=E1101
# Checks whether caps string `c` appears in `ccaps` under a sibling media
# type (application/ vs video/ vs audio/, e.g. application/ogg vs video/ogg).
1299 def _has_caps_type_variant(self, c, ccaps):
1301 Handle situations where we can have application/ogg or video/ogg or
1305 media_type = re.findall("application/|video/|audio/", c)
1307 media_type = media_type[0].replace('/', '')
1308 possible_mtypes = ["application", "video", "audio"]
1309 possible_mtypes.remove(media_type)
1310 for tmptype in possible_mtypes:
1311 possible_c_variant = c.replace(media_type, tmptype)
1312 if possible_c_variant in ccaps:
1314 "Found %s in %s, good enough!", possible_c_variant, ccaps)
1319 # pylint: disable=E1101
# Launches a gst-validate pipeline comparing self.dest_file against the
# reference with the iqa element (dssim); skipped when iqa is unavailable.
1320 def run_iqa_test(self, reference_file_uri):
1322 Runs IQA test if @reference_file_path exists
1323 @test: The test to run tests on
1325 if not GstValidateBaseTestManager.has_feature('iqa'):
1326 self.debug('Iqa element not present, not running extra test.')
1330 uridecodebin uri=%s !
1331 iqa name=iqa do-dssim=true dssim-error-threshold=1.0 ! fakesink
1332 uridecodebin uri=%s ! iqa.
1333 """ % (reference_file_uri, self.dest_file)
1334 pipeline_desc = pipeline_desc.replace("\n", "")
1336 command = [GstValidateBaseTestManager.COMMAND] + \
1337 shlex.split(pipeline_desc)
1338 msg = "## Running IQA tests on results of: " \
1339 + "%s\n### Command: \n```\n%s\n```\n" % (
1340 self.classname, ' '.join(command))
1341 if not self.options.redirect_logs:
1345 printc(msg, Colors.OKBLUE)
1347 self.process = subprocess.Popen(command,
# Re-discovers the transcoded file and validates duration and per-track
# caps against the requested encoding combination.
1354 def check_encoded_file(self):
1355 result_descriptor = GstValidateMediaDescriptor.new_from_uri(
1357 if result_descriptor is None:
1358 return (Result.FAILED, "Could not discover encoded file %s"
1361 duration = result_descriptor.get_duration()
1362 orig_duration = self.media_descriptor.get_duration()
1363 tolerance = self._duration_tolerance
# NOTE(review): this chained comparison reads as
# (orig - tol >= duration) and (duration <= orig + tol), i.e. it only
# triggers when the result is too SHORT; a symmetric out-of-range check
# would be `not (orig - tol <= duration <= orig + tol)` — confirm intent.
1365 if orig_duration - tolerance >= duration <= orig_duration + tolerance:
1366 os.remove(result_descriptor.get_path())
1370 'issue-id': 'transcoded-file-wrong-duration',
1371 'summary': 'The duration of a transcoded file doesn\'t match the duration of the original file',
1372 'level': 'critical',
1373 'detected-on': 'pipeline',
1374 'details': "Duration of encoded file is " " wrong (%s instead of %s)" % (
1375 utils.TIME_ARGS(duration), utils.TIME_ARGS(orig_duration))
1379 all_tracks_caps = result_descriptor.get_tracks_caps()
1380 container_caps = result_descriptor.get_caps()
1382 all_tracks_caps.insert(0, ("container", container_caps))
1384 for track_type, caps in all_tracks_caps:
1385 ccaps = self._clean_caps(caps)
1386 wanted_caps = self.combination.get_caps(track_type)
# NOTE(review): _clean_caps(wanted_caps) runs before the `wanted_caps is
# None` guard below; if get_caps() can return None this raises TypeError
# in re.sub before the guard is reached — confirm and reorder upstream.
1387 cwanted_caps = self._clean_caps(wanted_caps)
1389 if wanted_caps is None:
1390 os.remove(result_descriptor.get_path())
1394 'issue-id': 'transcoded-file-wrong-stream-type',
1395 'summary': 'Expected stream types during transcoding do not match expectations',
1396 'level': 'critical',
1397 'detected-on': 'pipeline',
1398 'details': "Found a track of type %s in the encoded files"
1399 " but none where wanted in the encoded profile: %s" % (
1400 track_type, self.combination)
1405 for c in cwanted_caps:
1407 if not self._has_caps_type_variant(c, ccaps):
1408 os.remove(result_descriptor.get_path())
1412 'issue-id': 'transcoded-file-wrong-caps',
1413 'summary': 'Expected stream caps during transcoding do not match expectations',
1414 'level': 'critical',
1415 'detected-on': 'pipeline',
1416 'details': "Field: %s (from %s) not in caps of the outputted file %s" % (
1417 wanted_caps, c, ccaps)
1422 os.remove(result_descriptor.get_path())
# NOTE(review): mangled excerpt — embedded line numbers and missing interior
# lines (the `def __init__` line itself is absent); code kept byte-identical.
1425 class TestsManager(Loggable):
1427 """ A class responsible for managing tests. """
# Class attribute: name of the testsuite currently being loaded; used to
# prefix test classnames in add_test().
1430 loading_testsuite = None
1434 Loggable.__init__(self)
1437 self.unwanted_tests = []
1440 self.reporter = None
1441 self.wanted_tests_patterns = []
1442 self.blacklisted_tests_patterns = []
1443 self._generators = []
1444 self.check_testslist = True
1445 self.all_tests = None
1446 self.expected_issues = {}
1447 self.blacklisted_tests = []
def list_tests(self):
    """Return the tests handled by this manager, sorted by classname."""
    # sorted() already produces a fresh list, so no intermediate copy is
    # needed before sorting.
    return sorted(self.tests, key=lambda test: test.classname)
def find_tests(self, classname):
    """Return every managed test whose classname matches @classname (a regex)."""
    matcher = re.compile(classname)
    found = []
    for candidate in self.list_tests():
        # findall() keeps the original "match anywhere" semantics.
        if matcher.findall(candidate.classname):
            found.append(candidate)
    return found
# NOTE(review): mangled excerpt — embedded line numbers and missing interior
# lines; code kept byte-identical.
# Registers expected-failure definitions (keyed by bug id) and applies them
# to already-added tests; regex strings are compiled in place.
1459 def add_expected_issues(self, expected_issues):
1460 for bugid, failure_def in list(expected_issues.items()):
1462 for test_name_regex in failure_def['tests']:
1463 regex = re.compile(test_name_regex)
1464 tests_regexes.append(regex)
1465 for test in self.tests:
1466 if regex.findall(test.classname):
1467 if failure_def.get('allow_flakiness'):
1468 test.allow_flakiness = True
1469 self.debug("%s allow flakiness" % (test.classname))
1471 for issue in failure_def['issues']:
1472 issue['bug'] = bugid
1473 test.expected_issues.extend(failure_def['issues'])
1474 self.debug("%s added expected issues from %s" % (
1475 test.classname, bugid))
1476 failure_def['tests'] = tests_regexes
1478 self.expected_issues.update(expected_issues)
# Adds a test, prefixing its classname with the loading testsuite, applying
# any matching expected-issue definitions, and filtering by wanted/blacklist.
1480 def add_test(self, test):
1481 if test.generator is None:
1482 test.classname = self.loading_testsuite + '.' + test.classname
1484 for bugid, failure_def in list(self.expected_issues.items()):
1485 failure_def['bug'] = bugid
1486 for regex in failure_def['tests']:
1487 if regex.findall(test.classname):
1488 if failure_def.get('allow_flakiness'):
1489 test.allow_flakiness = True
1490 self.debug("%s allow flakiness" % (test.classname))
1492 for issue in failure_def['issues']:
1493 issue['bug'] = bugid
1494 test.expected_issues.extend(failure_def['issues'])
1495 self.debug("%s added expected issues from %s" % (
1496 test.classname, bugid))
1498 if self._is_test_wanted(test):
1499 if test not in self.tests:
1500 self.tests.append(test)
1502 if test not in self.tests:
1503 self.unwanted_tests.append(test)
1505 def get_tests(self):
1508 def populate_testsuite(self):
# Accepts one generator or a list; tags each with the loading testsuite and
# de-duplicates the stored set.
1511 def add_generators(self, generators):
1513 @generators: A list of, or one single #TestsGenerator to be used to generate tests
1515 if not isinstance(generators, list):
1516 generators = [generators]
1517 self._generators.extend(generators)
1518 for generator in generators:
1519 generator.testsuite = self.loading_testsuite
1521 self._generators = list(set(self._generators))
1523 def get_generators(self):
1524 return self._generators
# Compiles comma-separated blacklist patterns into regexes.
1526 def _add_blacklist(self, blacklisted_tests):
1527 if not isinstance(blacklisted_tests, list):
1528 blacklisted_tests = [blacklisted_tests]
1530 for patterns in blacklisted_tests:
1531 for pattern in patterns.split(","):
1532 self.blacklisted_tests_patterns.append(re.compile(pattern))
1534 def set_default_blacklist(self, default_blacklist):
1535 for test_regex, reason in default_blacklist:
1536 if not test_regex.startswith(self.loading_testsuite + '.'):
1537 test_regex = self.loading_testsuite + '.' + test_regex
1538 self.blacklisted_tests.append((test_regex, reason))
1539 self._add_blacklist(test_regex)
1541 def add_options(self, parser):
1542 """ Add more arguments. """
1545 def set_settings(self, options, args, reporter):
1546 """ Set properties after options parsing. """
1547 self.options = options
1549 self.reporter = reporter
1551 self.populate_testsuite()
1553 if self.options.valgrind:
1554 self.print_valgrind_bugs()
1556 if options.wanted_tests:
1557 for patterns in options.wanted_tests:
1558 for pattern in patterns.split(","):
1559 self.wanted_tests_patterns.append(re.compile(pattern))
1561 if options.blacklisted_tests:
1562 for patterns in options.blacklisted_tests:
1563 self._add_blacklist(patterns)
1565 def check_blacklists(self):
1566 if self.options.check_bugs_status:
1567 if not check_bugs_resolution(self.blacklisted_tests):
1572 def log_blacklists(self):
1573 if self.blacklisted_tests:
1574 self.info("Currently 'hardcoded' %s blacklisted tests:" %
1577 for name, bug in self.blacklisted_tests:
1578 if not self.options.check_bugs_status:
1579 self.info(" + %s --> bug: %s" % (name, bug))
# Groups expected-issue bug ids by the '|'-joined test-name regexes so the
# bug tracker is queried once per distinct set of tests.
1581 def check_expected_issues(self):
1582 if not self.expected_issues or not self.options.check_bugs_status:
1585 bugs_definitions = defaultdict(list)
1586 for bug, failure_def in list(self.expected_issues.items()):
1587 tests_names = '|'.join(
1588 [regex.pattern for regex in failure_def['tests']])
1589 bugs_definitions[tests_names].extend([bug])
1591 return check_bugs_resolution(bugs_definitions.items())
1593 def _check_blacklisted(self, test):
1594 for pattern in self.blacklisted_tests_patterns:
1595 if pattern.findall(test.classname):
1596 self.info("%s is blacklisted by %s", test.classname, pattern)
1601 def _check_whitelisted(self, test):
1602 for pattern in self.wanted_tests_patterns:
1603 if pattern.findall(test.classname):
1604 if self._check_blacklisted(test):
1605 # If explicitly white listed that specific test
1606 # bypass the blacklisting
1607 if pattern.pattern != test.classname:
1612 def _check_duration(self, test):
1613 if test.duration > 0 and int(self.options.long_limit) < int(test.duration):
1614 self.info("Not activating %s as its duration (%d) is superior"
1615 " than the long limit (%d)" % (test, test.duration,
1616 int(self.options.long_limit)))
# Combines whitelist, blacklist and duration checks; several return lines
# are missing from this excerpt.
1621 def _is_test_wanted(self, test):
1622 if self._check_whitelisted(test):
1623 if not self._check_duration(test):
1627 if self._check_blacklisted(test):
1630 if not self._check_duration(test):
1633 if not self.wanted_tests_patterns:
1638 def needs_http_server(self):
1641 def print_valgrind_bugs(self):
# NOTE(review): mangled excerpt — embedded line numbers and missing interior
# lines; code kept byte-identical.
1645 class TestsGenerator(Loggable):
# NOTE(review): mutable default argument `tests=[]` — the list is shared
# across calls; safer form is `tests=None` with a `[]` fallback. Not changed
# here because interior lines of __init__ are missing from the excerpt.
1647 def __init__(self, name, test_manager, tests=[]):
1648 Loggable.__init__(self)
1650 self.test_manager = test_manager
1651 self.testsuite = None
1654 self._tests[test.classname] = test
# NOTE(review): `*kwargs` is a misleading name for positional varargs —
# conventionally `*args`; callers pass positionally so behavior is fine.
1656 def generate_tests(self, *kwargs):
1658 Method that generates tests
1660 return list(self._tests.values())
1662 def add_test(self, test):
1663 test.generator = self
1664 test.classname = self.testsuite + '.' + test.classname
1665 self._tests[test.classname] = test
1668 class GstValidateTestsGenerator(TestsGenerator):
# Subclasses populate self._tests here before the base generate_tests()
# returns them.
1670 def populate_tests(self, uri_minfo_special_scenarios, scenarios):
1673 def generate_tests(self, uri_minfo_special_scenarios, scenarios):
1674 self.populate_tests(uri_minfo_special_scenarios, scenarios)
1675 return super(GstValidateTestsGenerator, self).generate_tests()
# NOTE(review): mangled excerpt — embedded line numbers and missing interior
# lines; code kept byte-identical.
1678 class _TestsLauncher(Loggable):
1682 Loggable.__init__(self)
1687 self.reporter = None
1688 self._list_testers()
1689 self.all_tests = None
1690 self.wanted_tests_patterns = []
1692 self.queue = queue.Queue()
1694 self.total_num_tests = 0
1695 self.current_progress = -1
1698 self.vfb_server = None
# Collects extra app dirs from GST_VALIDATE_APPS_DIR (os.pathsep-separated).
# NOTE(review): os.environ[...] raises KeyError when unset, so the
# `is not None` check never sees None — presumably a .get() elsewhere or a
# missing try/except in the elided lines; confirm.
1700 def _list_app_dirs(self):
1702 env_dirs = os.environ["GST_VALIDATE_APPS_DIR"]
1703 if env_dirs is not None:
1704 for dir_ in env_dirs.split(os.pathsep):
1705 app_dirs.append(dir_)
# Executes every .py file in app_dir inside `env` so app-provided
# TestsManager subclasses become discoverable.
1709 def _exec_app(self, app_dir, env):
1711 files = os.listdir(app_dir)
1712 except OSError as e:
1713 self.debug("Could not list %s: %s" % (app_dir, e))
1716 if f.endswith(".py"):
1717 exec(compile(open(os.path.join(app_dir, f)).read(),
1718 os.path.join(app_dir, f), 'exec'), env)
1720 def _exec_apps(self, env):
1721 app_dirs = self._list_app_dirs()
1722 for app_dir in app_dirs:
1723 self._exec_app(app_dir, env)
# Instantiates every TestsManager subclass found in the (possibly
# app-extended) globals and keeps those whose init() succeeds.
1725 def _list_testers(self):
1726 env = globals().copy()
1727 self._exec_apps(env)
1729 testers = [i() for i in utils.get_subclasses(TestsManager, env)]
1730 for tester in testers:
1731 if tester.init() is True:
1732 self.testers.append(tester)
1734 self.warning("Can not init tester: %s -- PATH is %s"
1735 % (tester.name, os.environ["PATH"]))
1737 def add_options(self, parser):
1738 for tester in self.testers:
1739 tester.add_options(parser)
# Tries each candidate path in turn; returns (module, None) on the first
# successful import, else (None, accumulated error strings).
1741 def _load_testsuite(self, testsuites):
1743 for testsuite in testsuites:
1745 sys.path.insert(0, os.path.dirname(testsuite))
1746 spec = importlib.util.spec_from_file_location(os.path.basename(testsuite).replace(".py", ""), testsuite)
1747 module = importlib.util.module_from_spec(spec)
1748 spec.loader.exec_module(module)
1749 return (module, None)
1750 except Exception as e:
1751 exceptions.append("Could not load %s: %s" % (testsuite, e))
1754 sys.path.remove(os.path.dirname(testsuite))
1756 return (None, exceptions)
1758 def _load_testsuites(self):
1760 for testsuite in self.options.testsuites:
1761 if testsuite.endswith('.py') and os.path.exists(testsuite):
1762 testsuite = os.path.abspath(os.path.expanduser(testsuite))
1763 loaded_module = self._load_testsuite([testsuite])
1765 possible_testsuites_paths = [os.path.join(d, testsuite + ".py")
1766 for d in self.options.testsuites_dirs]
1767 loaded_module = self._load_testsuite(possible_testsuites_paths)
1769 module = loaded_module[0]
# Fallback: a dotted name that failed to load is treated as a test name —
# its testsuite prefix is re-queued and the full string becomes a wanted
# test pattern.
1770 if not loaded_module[0]:
1771 if "." in testsuite:
1772 self.options.testsuites.append(testsuite.split('.')[0])
1773 self.info("%s looks like a test name, trying that" %
1775 self.options.wanted_tests.append(testsuite)
1777 if testsuite in testsuites:
1778 self.info('Testuite %s was loaded previously', testsuite)
1780 printc("Could not load testsuite: %s, reasons: %s" % (
1781 testsuite, loaded_module[1]), Colors.FAIL)
1784 if module.__name__ in testsuites:
1785 self.info("Trying to load testsuite '%s' a second time?", module.__name__)
1788 testsuites[module.__name__] = module
1789 if not hasattr(module, "TEST_MANAGER"):
1790 module.TEST_MANAGER = [tester.name for tester in self.testers]
1791 elif not isinstance(module.TEST_MANAGER, list):
1792 module.TEST_MANAGER = [module.TEST_MANAGER]
1794 self.options.testsuites = list(testsuites.values())
# NOTE(review): mangled excerpt — embedded line numbers and missing interior
# lines; code kept byte-identical.
# Binds each loaded testsuite module to its wanted TestsManager(s) and runs
# the module's setup_tests(); loading_testsuite is saved/restored around it.
1796 def _setup_testsuites(self):
1797 for testsuite in self.options.testsuites:
1799 wanted_test_manager = None
1800 # TEST_MANAGER has been set in _load_testsuites()
1801 assert hasattr(testsuite, "TEST_MANAGER")
1802 wanted_test_manager = testsuite.TEST_MANAGER
1803 if not isinstance(wanted_test_manager, list):
1804 wanted_test_manager = [wanted_test_manager]
1806 for tester in self.testers:
1807 if wanted_test_manager is not None and \
1808 tester.name not in wanted_test_manager:
1811 prev_testsuite_name = TestsManager.loading_testsuite
1812 if self.options.user_paths:
1813 TestsManager.loading_testsuite = tester.name
1814 tester.register_defaults()
1817 TestsManager.loading_testsuite = testsuite.__name__
1818 if testsuite.setup_tests(tester, self.options):
1820 if prev_testsuite_name:
1821 TestsManager.loading_testsuite = prev_testsuite_name
1824 printc("Could not load testsuite: %s"
1825 " maybe because of missing TestManager"
1826 % (testsuite), Colors.FAIL)
# Legacy config support: executes the config file with testers/options
# injected into globals(); __file__ is swapped for the duration of exec.
1829 def _load_config(self, options):
1830 printc("Loading config files is DEPRECATED"
1831 " you should use the new testsuite format now",)
1833 for tester in self.testers:
1834 tester.options = options
1835 globals()[tester.name] = tester
1836 globals()["options"] = options
1837 c__file__ = __file__
1838 globals()["__file__"] = self.options.config
1839 exec(compile(open(self.options.config).read(),
1840 self.options.config, 'exec'), globals())
1841 globals()["__file__"] = c__file__
1843 def set_settings(self, options, args):
1844 if options.xunit_file:
1845 self.reporter = reporters.XunitReporter(options)
1847 self.reporter = reporters.Reporter(options)
1849 self.options = options
1850 wanted_testers = None
1851 for tester in self.testers:
1852 if tester.name in args:
1853 wanted_testers = tester.name
1856 testers = self.testers
1858 for tester in testers:
1859 if tester.name in args:
1860 self.testers.append(tester)
1861 args.remove(tester.name)
1864 self._load_config(options)
1866 self._load_testsuites()
1867 if not self.options.testsuites:
1868 printc("Not testsuite loaded!", Colors.FAIL)
1871 for tester in self.testers:
1872 tester.set_settings(options, args, self.reporter)
1874 if not options.config and options.testsuites:
1875 if self._setup_testsuites() is False:
1878 if self.options.check_bugs_status:
1879 printc("-> Checking bugs resolution... ", end='')
1881 for tester in self.testers:
1882 if not tester.check_blacklists():
1885 tester.log_blacklists()
1887 if not tester.check_expected_issues():
1890 if self.options.check_bugs_status:
1891 printc("OK", Colors.OKGREEN)
1893 if self.needs_http_server() or options.httponly is True:
1894 self.httpsrv = HTTPServer(options)
1895 self.httpsrv.start()
# Headless runs: start a virtual frame buffer and point DISPLAY at it.
1897 if options.no_display:
1898 self.vfb_server = get_virual_frame_buffer_server(options)
1899 res = self.vfb_server.start()
1901 printc("Could not start virtual frame server: %s" % res[1],
1904 os.environ["DISPLAY"] = self.vfb_server.display_id
# NOTE(review): mangled excerpt — embedded line numbers and missing interior
# lines; code kept byte-identical.
1908 def _check_tester_has_other_testsuite(self, testsuite, tester):
1909 if tester.name != testsuite.TEST_MANAGER[0]:
1912 for t in self.options.testsuites:
1914 for other_testmanager in t.TEST_MANAGER:
1915 if other_testmanager == tester.name:
# Compares the generated test list against the testsuite's .testslist file,
# reporting removed tests and writing back the (sorted) current list;
# returns whether the list changed.
1920 def _check_defined_tests(self, tester, tests):
1921 if self.options.blacklisted_tests or self.options.wanted_tests:
1924 tests_names = [test.classname for test in tests]
1925 testlist_changed = False
1926 for testsuite in self.options.testsuites:
1927 if not self._check_tester_has_other_testsuite(testsuite, tester) \
1928 and tester.check_testslist:
1930 testlist_file = open(os.path.splitext(testsuite.__file__)[0] + ".testslist",
1933 know_tests = testlist_file.read().split("\n")
1934 testlist_file.close()
1936 testlist_file = open(os.path.splitext(testsuite.__file__)[0] + ".testslist",
# '~'-prefixed entries mark optional tests — presumably kept even when
# absent from the current run; confirm against the elided branch.
1942 for test in know_tests:
1943 if test and test.strip('~') not in tests_names:
1944 if not test.startswith('~'):
1945 testlist_changed = True
1946 printc("Test %s Not in testsuite %s anymore"
1947 % (test, testsuite.__file__), Colors.FAIL)
1949 optional_out.append((test, None))
1951 tests_names = sorted([(test.classname, test) for test in tests] + optional_out,
1952 key=lambda x: x[0].strip('~'))
1954 for tname, test in tests_names:
1955 if test and test.optional:
1957 testlist_file.write("%s\n" % (tname))
1958 if tname and tname not in know_tests:
1959 printc("Test %s is NEW in testsuite %s"
1960 % (tname, testsuite.__file__),
1961 Colors.FAIL if self.options.fail_on_testlist_change else Colors.OKGREEN)
1962 testlist_changed = True
1964 testlist_file.close()
1967 return testlist_changed
# Round-robins self.tests into num_groups buckets (return line elided).
1969 def _split_tests(self, num_groups):
1970 groups = [[] for x in range(num_groups)]
1971 group = cycle(groups)
1972 for test in self.tests:
1973 next(group).append(test)
1976 def list_tests(self):
1977 for tester in self.testers:
1978 if not self._tester_needed(tester):
1981 tests = tester.list_tests()
1982 if self._check_defined_tests(tester, tests) and \
1983 self.options.fail_on_testlist_change:
1984 raise RuntimeError("Unexpected new test in testsuite.")
1986 self.tests.extend(tests)
1987 self.tests.sort(key=lambda test: test.classname)
1989 if self.options.num_parts < 1:
1990 raise RuntimeError("Tests must be split in positive number of parts.")
1991 if self.options.num_parts > len(self.tests):
1992 raise RuntimeError("Cannot have more parts then there exist tests.")
1993 if self.options.part_index < 1 or self.options.part_index > self.options.num_parts:
1994 raise RuntimeError("Part index is out of range")
1996 self.tests = self._split_tests(self.options.num_parts)[self.options.part_index - 1]
1999 def _tester_needed(self, tester):
2000 for testsuite in self.options.testsuites:
2001 if tester.name in testsuite.TEST_MANAGER:
# Runs in a dedicated thread (see _start_server); binds an ephemeral port.
2005 def server_wrapper(self, ready):
2006 self.server = GstValidateTCPServer(
2007 ('localhost', 0), GstValidateListener)
2008 self.server.socket.settimeout(None)
2009 self.server.launcher = self
2010 self.serverport = self.server.socket.getsockname()[1]
2011 self.info("%s server port: %s" % (self, self.serverport))
2014 self.server.serve_forever(poll_interval=0.05)
2016 def _start_server(self):
2017 self.info("Starting TCP Server")
2018 ready = threading.Event()
2019 self.server_thread = threading.Thread(target=self.server_wrapper,
2020 kwargs={'ready': ready})
2021 self.server_thread.start()
# Children find the launcher through this environment variable.
2023 os.environ["GST_VALIDATE_SERVER"] = "tcp://localhost:%s" % self.serverport
2025 def _stop_server(self):
2027 self.server.shutdown()
2028 self.server_thread.join()
2029 self.server.server_close()
2032 def test_wait(self):
2034 # Check process every second for timeout
2036 self.queue.get(timeout=1)
2040 for test in self.jobs:
2041 if test.process_update():
2042 self.jobs.remove(test)
2045 def tests_wait(self):
2047 test = self.test_wait()
2048 test.check_results()
2049 except KeyboardInterrupt:
2050 for test in self.jobs:
2051 test.kill_subprocess()
2056 def start_new_job(self, tests_left):
2058 test = tests_left.pop(0)
2062 test.test_start(self.queue)
2064 self.jobs.append(test)
2068 def print_result(self, current_test_num, test, retry_on_failure=False):
2069 if test.result != Result.PASSED and not retry_on_failure:
2070 printc(str(test), color=utils.get_color_for_result(test.result))
2073 progress = int(length * current_test_num // self.total_num_tests)
# NOTE(review): 'â–ˆ' is mojibake for the UTF-8 full-block character '█'
# (misdecoded as Latin-1) — the progress bar literal should be checked at
# the real file; left untouched here since it is a runtime string.
2074 bar = 'â–ˆ' * progress + '-' * (length - progress)
2076 printc('\r|%s| [%s/%s]' % (bar, current_test_num, self.total_num_tests), end='\r')
2078 if progress > self.current_progress:
2079 self.current_progress = progress
2080 printc('|%s| [%s/%s]' % (bar, current_test_num, self.total_num_tests))
# NOTE(review): mangled excerpt — embedded line numbers and missing interior
# lines; code kept byte-identical.
# Core scheduler: splits tests into parallel and "alone" groups, runs up to
# max_num_jobs at once, then re-runs failures serially when
# retry_on_failures is set (flaky detection).
2082 def _run_tests(self, running_tests=None, all_alone=False, retry_on_failures=False):
2083 if not self.all_tests:
2084 self.all_tests = self.list_tests()
2086 if not running_tests:
2087 running_tests = self.tests
2089 self.reporter.init_timer()
2092 for test in running_tests:
2093 if test.is_parallel and not all_alone:
2096 alone_tests.append(test)
2098 # use max to defend against the case where all tests are alone_tests
2099 max_num_jobs = max(min(self.options.num_jobs, len(tests)), 1)
# --forever mode keeps every job slot busy by cloning tests when there are
# fewer tests than job slots.
2102 if self.options.forever and len(tests) < self.options.num_jobs and len(tests):
2103 max_num_jobs = self.options.num_jobs
2106 while (len(tests) + len(copied)) < max_num_jobs:
2107 copied.append(tests[i].copy(len(copied) + 1))
2113 self.tests += copied
2115 self.total_num_tests = len(self.all_tests)
2116 printc("\nRunning %d tests..." % self.total_num_tests, color=Colors.HEADER)
2117 # if order of test execution doesn't matter, shuffle
2118 # the order to optimize cpu usage
2119 if self.options.shuffle:
2120 random.shuffle(tests)
2121 random.shuffle(alone_tests)
2123 current_test_num = 1
2125 for num_jobs, tests in [(max_num_jobs, tests), (1, alone_tests)]:
2126 tests_left = list(tests)
2127 for i in range(num_jobs):
2128 if not self.start_new_job(tests_left):
2132 while jobs_running != 0:
2133 test = self.tests_wait()
2135 current_test_num += 1
2136 res = test.test_end(retry_on_failure=retry_on_failures)
2138 if res not in [Result.PASSED, Result.SKIPPED, Result.KNOWN_ERROR]:
2139 if self.options.forever or self.options.fatal_error:
2140 self.print_result(current_test_num - 1, test, retry_on_failure=retry_on_failures)
2141 self.reporter.after_test(test)
2144 if retry_on_failures:
2145 if not self.options.redirect_logs and test.allow_flakiness:
2146 test.copy_logfiles()
2148 to_retry.append(test)
2150 # Not adding to final report if flakiness is tolerated
2151 to_report = not test.allow_flakiness
2152 self.print_result(current_test_num - 1, test, retry_on_failure=retry_on_failures)
2154 self.reporter.after_test(test)
2155 if retry_on_failures:
2157 if self.start_new_job(tests_left):
2161 printc("--> Rerunning the following tests to see if they are flaky:", Colors.WARNING)
2162 for test in to_retry:
2163 printc(' * %s' % test.classname)
# Recursion depth is bounded: the rerun pass disables further retries.
2165 return self._run_tests(to_retry, all_alone=True, retry_on_failures=False)
2169 def clean_tests(self, stop_server=False):
2170 for test in self.tests:
2175 def run_tests(self):
2178 self._start_server()
2179 if self.options.forever:
2182 printc("-> Iteration %d" % r, end='\r')
2184 if not self._run_tests():
2188 msg = "-> Iteration %d... %sOK%s" % (r, Colors.OKGREEN, Colors.ENDC)
2189 printc(msg, end="\r")
2192 elif self.options.n_runs:
2194 for r in range(self.options.n_runs):
2195 printc("-> Iteration %d" % r, end='\r')
2196 if not self._run_tests():
2198 printc("ERROR", Colors.FAIL, end="\r")
2200 printc("OK", Colors.OKGREEN, end="\r")
2205 return self._run_tests(retry_on_failures=self.options.retry_on_failures)
2207 if self.options.forever:
2208 printc("\n-> Ran %d times" % r)
2212 self.vfb_server.stop()
2213 self.clean_tests(True)
2215 def final_report(self):
2216 return self.reporter.final_report()
2218 def needs_http_server(self):
2219 for tester in self.testers:
2220 if tester.needs_http_server():
# NOTE(review): mangled excerpt — a guard line (original 2227, presumably
# `if props:`) is missing; code kept byte-identical.
# Tiny attribute bag: exposes each props dict entry as an instance attribute.
2224 class NamedDic(object):
2226 def __init__(self, props):
2228 for name, value in props.items():
2229 setattr(self, name, value)
# NOTE(review): mangled excerpt — embedded line numbers and missing interior
# lines (several `def` lines and default-return lines are absent); code kept
# byte-identical. Wraps one .scenario definition; each property from the
# scenario file becomes an attribute with '-' mapped to '_'.
2232 class Scenario(object):
2234 def __init__(self, name, props, path=None):
2238 for prop, value in props:
2239 setattr(self, prop.replace("-", "_"), value)
2241 def get_execution_name(self):
2242 if self.path is not None:
# Belongs to a seeks()-style accessor whose def line is elided.
2248 if hasattr(self, "seek"):
2249 return bool(self.seek)
2253 def needs_clock_sync(self):
2254 if hasattr(self, "need_clock_sync"):
2255 return bool(self.need_clock_sync)
2259 def needs_live_content(self):
2260 # Scenarios that can only be used on live content
2261 if hasattr(self, "live_content_required"):
2262 return bool(self.live_content_required)
2265 def compatible_with_live_content(self):
2266 # if a live content is required it's implicitly compatible with
2268 if self.needs_live_content():
2270 if hasattr(self, "live_content_compatible"):
2271 return bool(self.live_content_compatible)
2274 def get_min_media_duration(self):
2275 if hasattr(self, "min_media_duration"):
2276 return float(self.min_media_duration)
2280 def does_reverse_playback(self):
2281 if hasattr(self, "reverse_playback"):
2282 return bool(self.reverse_playback)
2286 def get_duration(self):
2288 return float(getattr(self, "duration"))
2289 except AttributeError:
2292 def get_min_tracks(self, track_type):
2294 return int(getattr(self, "min_%s_track" % track_type))
2295 except AttributeError:
# __repr__ body (its def line is elided).
2299 return "<Scenario %s>" % self.name
# NOTE(review): mangled excerpt — embedded line numbers and missing interior
# lines; code kept byte-identical. Singleton (via __new__/_instance) that
# discovers .scenario files through gst-validate's --scenarios-defs-output-file.
2302 class ScenarioManager(Loggable):
2304 system_scenarios = []
2305 special_scenarios = {}
2307 FILE_EXTENSION = "scenario"
2309 def __new__(cls, *args, **kwargs):
2310 if not cls._instance:
2311 cls._instance = super(ScenarioManager, cls).__new__(
2312 cls, *args, **kwargs)
2313 cls._instance.config = None
2314 cls._instance.discovered = False
2315 Loggable.__init__(cls._instance)
2317 return cls._instance
# Finds media-specific scenario files named <media>.<NAME>.scenario next to
# the media file.
2319 def find_special_scenarios(self, mfile):
2321 mfile_bname = os.path.basename(mfile)
2323 for f in os.listdir(os.path.dirname(mfile)):
2324 if re.findall("%s\..*\.%s$" % (re.escape(mfile_bname), self.FILE_EXTENSION), f):
2325 scenarios.append(os.path.join(os.path.dirname(mfile), f))
2328 scenarios = self.discover_scenarios(scenarios, mfile)
# NOTE(review): mutable default argument `scenario_paths=[]` — harmless only
# if never mutated in the elided lines; safer form is None + fallback.
2332 def discover_scenarios(self, scenario_paths=[], mfile=None):
2334 Discover scenarios specified in scenario_paths or the default ones
2335 if nothing specified there
2338 scenario_defs = os.path.join(self.config.main_dir, "scenarios.def")
2339 log_path = os.path.join(self.config.logsdir, "scenarios_discovery.log")
2340 logs = open(log_path, 'w')
2343 command = [GstValidateBaseTestManager.COMMAND,
2344 "--scenarios-defs-output-file", scenario_defs]
2345 command.extend(scenario_paths)
2346 subprocess.check_call(command, stdout=logs, stderr=logs)
2347 except subprocess.CalledProcessError as e:
2349 self.error('See %s' % log_path)
2352 config = configparser.RawConfigParser()
2353 f = open(scenario_defs)
2356 for section in config.sections():
2359 for scenario_path in scenario_paths:
2360 if section == scenario_path:
2362 name = os.path.basename(section).replace("." + self.FILE_EXTENSION, "")
2363 path = scenario_path
2365 # The real name of the scenario is:
2366 # filename.REALNAME.scenario
2367 name = scenario_path.replace(mfile + ".", "").replace(
2368 "." + self.FILE_EXTENSION, "")
2369 path = scenario_path
2372 name = os.path.basename(section).replace("." + self.FILE_EXTENSION, "")
2377 props = config.items(section)
2378 scenario = Scenario(name, props, path)
2380 self.special_scenarios[path] = scenario
2381 scenarios.append(scenario)
2383 if not scenario_paths:
2384 self.discovered = True
2385 self.system_scenarios.extend(scenarios)
# Looks up by absolute .scenario path (cached in special_scenarios) or by
# plain name among the system scenarios.
2389 def get_scenario(self, name):
2390 if name is not None and os.path.isabs(name) and name.endswith(self.FILE_EXTENSION):
2391 scenario = self.special_scenarios.get(name)
2395 scenarios = self.discover_scenarios([name])
2396 self.special_scenarios[name] = scenarios
2401 if self.discovered is False:
2402 self.discover_scenarios()
2405 return self.system_scenarios
# NOTE(review): [0] raises IndexError when no scenario matches — the warning
# below suggests a guard exists in the elided lines; confirm.
2408 return [scenario for scenario in self.system_scenarios if scenario.name == name][0]
2410 self.warning("Scenario: %s not found" % name)
# NOTE(review): mangled excerpt — embedded line numbers and missing interior
# lines; code kept byte-identical.
2414 class GstValidateBaseTestManager(TestsManager):
2415 scenarios_manager = ScenarioManager()
2419 super(GstValidateBaseTestManager, self).__init__()
2420 self._scenarios = []
2421 self._encoding_formats = []
# Resolves the gst-validate tool binaries (-1.0 suffixed) and stores them as
# class attributes COMMAND, TRANSCODING_COMMAND, MEDIA_CHECK_COMMAND, etc.
2424 def update_commands(cls, extra_paths=None):
2425 for varname, cmd in {'': 'gst-validate',
2426 'TRANSCODING_': 'gst-validate-transcoding',
2427 'MEDIA_CHECK_': 'gst-validate-media-check',
2428 'RTSP_SERVER_': 'gst-validate-rtsp-server',
2429 'INSPECT_': 'gst-inspect'}.items():
2430 setattr(cls, varname + 'COMMAND', which(cmd + '-1.0', extra_paths))
# Caches gst-inspect feature availability per feature name.
2433 def has_feature(cls, featurename):
2435 return cls.features_cache[featurename]
2440 subprocess.check_output([cls.INSPECT_COMMAND, featurename])
2442 except subprocess.CalledProcessError:
2445 cls.features_cache[featurename] = res
2448 def add_scenarios(self, scenarios):
2450 @scenarios A list or a unic scenario name(s) to be run on the tests.
2451 They are just the default scenarios, and then depending on
2452 the TestsGenerator to be used you can have more fine grained
2453 control on what to be run on each series of tests.
2455 if isinstance(scenarios, list):
2456 self._scenarios.extend(scenarios)
2458 self._scenarios.append(scenarios)
# De-duplication loses ordering (set round-trip) — callers appear not to
# rely on order; confirm if ordering matters.
2460 self._scenarios = list(set(self._scenarios))
2462 def set_scenarios(self, scenarios):
2464 Override the scenarios
2466 self._scenarios = []
2467 self.add_scenarios(scenarios)
2469 def get_scenarios(self):
2470 return self._scenarios
2472 def add_encoding_formats(self, encoding_formats):
2474 :param encoding_formats: A list or one single #MediaFormatCombinations describing wanted output
2475 formats for transcoding test.
2476 They are just the default encoding formats, and then depending on
2477 the TestsGenerator to be used you can have more fine grained
2478 control on what to be run on each series of tests.
2480 if isinstance(encoding_formats, list):
2481 self._encoding_formats.extend(encoding_formats)
2483 self._encoding_formats.append(encoding_formats)
2485 self._encoding_formats = list(set(self._encoding_formats))
2487 def get_encoding_formats(self):
2488 return self._encoding_formats
# Module import side effect: resolve tool binaries once at load time.
2491 GstValidateBaseTestManager.update_commands()
class MediaDescriptor(Loggable):
    """Abstract description of a media sample.

    Subclasses describe one concrete media file through the accessors below;
    is_compatible() uses those accessors to decide whether a given scenario
    can be run against the media.
    """

    def __init__(self):
        Loggable.__init__(self)

    def get_path(self):
        # NOTE: was `raise NotImplemented` — raising the NotImplemented
        # singleton is a TypeError in Python 3; NotImplementedError is the
        # intended exception. Same fix applied to every abstract method below.
        raise NotImplementedError

    def has_frames(self):
        # Whether per-frame information is available for this media.
        return False

    def get_framerate(self):
        """Return the framerate of the first video track, or Fraction(0, 1)."""
        for ttype, caps_str in self.get_tracks_caps():
            if ttype != "video":
                continue

            caps = utils.GstCaps.new_from_str(caps_str)
            if not caps:
                self.warning("Could not create caps for %s" % caps_str)
                continue

            framerate = caps[0].get("framerate")
            if framerate is not None:
                return framerate

        return Fraction(0, 1)

    def get_media_filepath(self):
        raise NotImplementedError

    def skip_parsers(self):
        # Whether parser elements should be skipped for this media.
        return False

    def get_caps(self):
        raise NotImplementedError

    def get_uri(self):
        raise NotImplementedError

    def get_duration(self):
        raise NotImplementedError

    def get_protocol(self):
        raise NotImplementedError

    def is_seekable(self):
        raise NotImplementedError

    def is_live(self):
        raise NotImplementedError

    def is_image(self):
        raise NotImplementedError

    def get_num_tracks(self, track_type):
        raise NotImplementedError

    def get_tracks_caps(self):
        # List of (track_type, caps_string) tuples.
        return []

    def can_play_reverse(self):
        raise NotImplementedError

    def prerrols(self):
        # NOTE(review): historical typo of "prerolls", kept as-is because
        # is_compatible() and external callers use this exact name.
        return True

    def is_compatible(self, scenario):
        """Return True if @scenario can be run against this media sample."""
        if scenario is None:
            return True

        if scenario.seeks() and (not self.is_seekable() or self.is_image()):
            self.debug("Do not run %s as %s does not support seeking",
                       scenario, self.get_uri())
            return False

        if self.is_image() and scenario.needs_clock_sync():
            self.debug("Do not run %s as %s is an image",
                       scenario, self.get_uri())
            return False

        if not self.can_play_reverse() and scenario.does_reverse_playback():
            return False

        if not self.is_live() and scenario.needs_live_content():
            self.debug("Do not run %s as %s is not a live content",
                       scenario, self.get_uri())
            return False

        if self.is_live() and not scenario.compatible_with_live_content():
            self.debug("Do not run %s as %s is a live content",
                       scenario, self.get_uri())
            return False

        if not self.prerrols() and getattr(scenario, 'needs_preroll', False):
            return False

        if self.get_duration() and self.get_duration() / GST_SECOND < scenario.get_min_media_duration():
            # Fixed log message: closed the paren and "duation" -> "duration".
            self.debug(
                "Do not run %s as %s is too short (%i < min media duration: %i)",
                scenario, self.get_uri(),
                self.get_duration() / GST_SECOND,
                scenario.get_min_media_duration())
            return False

        for track_type in ['audio', 'subtitle', 'video']:
            if self.get_num_tracks(track_type) < scenario.get_min_tracks(track_type):
                self.debug("%s -- %s | At least %s %s track needed < %s"
                           % (scenario, self.get_uri(), track_type,
                              scenario.get_min_tracks(track_type),
                              self.get_num_tracks(track_type)))
                return False

        return True
class GstValidateMediaDescriptor(MediaDescriptor):
    """MediaDescriptor backed by a gst-validate ``.media_info`` XML file."""

    # Some extension file for discovering results
    SKIPPED_MEDIA_INFO_EXT = "media_info.skipped"
    MEDIA_INFO_EXT = "media_info"
    PUSH_MEDIA_INFO_EXT = "media_info.push"
    STREAM_INFO_EXT = "stream_info"

    # Cache of already-parsed descriptors, keyed by xml path.
    __all_descriptors = {}

    @classmethod
    def get(cls, xml_path):
        """Return the cached descriptor for @xml_path, parsing it if needed."""
        if xml_path in cls.__all_descriptors:
            return cls.__all_descriptors[xml_path]

        return GstValidateMediaDescriptor(xml_path)

    def __init__(self, xml_path):
        super(GstValidateMediaDescriptor, self).__init__()

        self._media_file_path = None
        main_descriptor = self.__all_descriptors.get(xml_path)
        if main_descriptor:
            # Another instance already parsed this file: copy its state.
            self._copy_data_from_main(main_descriptor)
            return

        self.__all_descriptors[xml_path] = self

        self._xml_path = xml_path
        try:
            media_xml = ET.parse(xml_path).getroot()
        except xml.etree.ElementTree.ParseError:
            printc("Could not parse %s" % xml_path,
                   Colors.FAIL)
            raise

        self._extract_data(media_xml)

        self.set_protocol(urllib.parse.urlparse(self.get_uri()).scheme)

    def skip_parsers(self):
        return self._skip_parsers

    def has_frames(self):
        return self._has_frames

    def _copy_data_from_main(self, main_descriptor):
        # Shallow-copy every attribute from the already-parsed descriptor.
        for attr in main_descriptor.__dict__.keys():
            setattr(self, attr, getattr(main_descriptor, attr))

    def _extract_data(self, media_xml):
        # Extract the information we need from the xml
        self._caps = media_xml.findall("streams")[0].attrib["caps"]
        self._track_caps = []
        try:
            streams = media_xml.findall("streams")[0].findall("stream")
        except IndexError:
            pass
        else:
            for stream in streams:
                self._track_caps.append(
                    (stream.attrib["type"], stream.attrib["caps"]))

        self._skip_parsers = bool(int(media_xml.attrib.get('skip-parsers', 0)))
        self._has_frames = bool(int(media_xml.attrib["frame-detection"]))
        self._duration = int(media_xml.attrib["duration"])
        self._uri = media_xml.attrib["uri"]
        parsed_uri = urllib.parse.urlparse(self.get_uri())
        self._protocol = media_xml.get("protocol", parsed_uri.scheme)
        if parsed_uri.scheme == "file":
            # Remap the URI when the media file now lives next to the xml.
            if not os.path.exists(parsed_uri.path) and os.path.exists(self.get_media_filepath()):
                self._uri = "file://" + self.get_media_filepath()
        elif parsed_uri.scheme == Protocols.IMAGESEQUENCE:
            self._media_file_path = os.path.join(os.path.dirname(self.__cleanup_media_info_ext()), os.path.basename(parsed_uri.path))
            self._uri = parsed_uri._replace(path=os.path.join(os.path.dirname(self.__cleanup_media_info_ext()), os.path.basename(self._media_file_path))).geturl()

        self._is_seekable = media_xml.attrib["seekable"].lower() == "true"
        self._is_live = media_xml.get("live", "false").lower() == "true"
        self._is_image = False
        for stream in media_xml.findall("streams")[0].findall("stream"):
            if stream.attrib["type"] == "image":
                self._is_image = True
        self._track_types = []
        for stream in media_xml.findall("streams")[0].findall("stream"):
            self._track_types.append(stream.attrib["type"])

    def __cleanup_media_info_ext(self):
        # Strip the media-info extension (and its leading dot) from the path.
        for ext in [self.MEDIA_INFO_EXT, self.PUSH_MEDIA_INFO_EXT, self.STREAM_INFO_EXT,
                    self.SKIPPED_MEDIA_INFO_EXT, ]:
            if self._xml_path.endswith(ext):
                return self._xml_path[:len(self._xml_path) - (len(ext) + 1)]

        # Unlike a bare `assert`, this cannot be stripped away by `python -O`.
        raise AssertionError("Unknown media info extension: %s" % self._xml_path)

    @staticmethod
    def new_from_uri(uri, verbose=False, include_frames=False, is_push=False, is_skipped=False):
        """Run gst-validate-media-check on @uri and return a descriptor.

            include_frames = 0 # Never
            include_frames = 1 # always
            include_frames = 2 # if previous file included them

        Returns None when the media-check subprocess fails or its output
        cannot be parsed.
        """
        media_path = utils.url2path(uri)

        ext = GstValidateMediaDescriptor.MEDIA_INFO_EXT
        if is_push:
            ext = GstValidateMediaDescriptor.PUSH_MEDIA_INFO_EXT
        elif is_skipped:
            ext = GstValidateMediaDescriptor.SKIPPED_MEDIA_INFO_EXT
        descriptor_path = "%s.%s" % (media_path, ext)
        args = GstValidateBaseTestManager.MEDIA_CHECK_COMMAND.split(" ")
        if include_frames == 2:
            # Reuse the previous descriptor's settings when it exists.
            try:
                media_xml = ET.parse(descriptor_path).getroot()
                prev_uri = urllib.parse.urlparse(media_xml.attrib['uri'])
                if prev_uri.scheme == Protocols.IMAGESEQUENCE:
                    parsed_uri = urllib.parse.urlparse(uri)
                    uri = prev_uri._replace(path=os.path.join(os.path.dirname(parsed_uri.path), os.path.basename(prev_uri.path))).geturl()
                include_frames = bool(int(media_xml.attrib["frame-detection"]))
                if bool(int(media_xml.attrib.get("skip-parsers", 0))):
                    args.append("--skip-parsers")
            except FileNotFoundError:
                pass
        else:
            include_frames = bool(include_frames)

        args.append(uri)

        args.extend(["--output-file", descriptor_path])
        if include_frames:
            args.extend(["--full"])

        if verbose:
            printc("Generating media info for %s\n"
                   "    Command: '%s'" % (media_path, ' '.join(args)),
                   Colors.OKBLUE)

        try:
            # subprocess.DEVNULL instead of open(os.devnull): the latter
            # leaked the file descriptor on every call.
            subprocess.check_output(args, stderr=subprocess.DEVNULL)
        except subprocess.CalledProcessError as e:
            if verbose:
                printc("Result: Failed", Colors.FAIL)
            else:
                loggable.warning("GstValidateMediaDescriptor",
                                 "Exception: %s" % e)
            return None

        if verbose:
            printc("Result: Passed", Colors.OKGREEN)

        try:
            return GstValidateMediaDescriptor(descriptor_path)
        except (IOError, xml.etree.ElementTree.ParseError):
            return None

    def get_path(self):
        return self._xml_path

    def need_clock_sync(self):
        return Protocols.needs_clock_sync(self.get_protocol())

    def get_media_filepath(self):
        # Lazily derived from the xml path.
        if self._media_file_path is None:
            self._media_file_path = self.__cleanup_media_info_ext()
        return self._media_file_path

    def get_caps(self):
        return self._caps

    def get_tracks_caps(self):
        return self._track_caps

    def get_uri(self):
        return self._uri

    def get_duration(self):
        return self._duration

    def set_protocol(self, protocol):
        # A ".media_info.push" descriptor always uses the pushfile protocol.
        if self._xml_path.endswith(GstValidateMediaDescriptor.PUSH_MEDIA_INFO_EXT):
            self._protocol = Protocols.PUSHFILE
        else:
            self._protocol = protocol

    def get_protocol(self):
        return self._protocol

    def is_seekable(self):
        return self._is_seekable

    def is_live(self):
        return self._is_live

    def can_play_reverse(self):
        return True

    def is_image(self):
        return self._is_image

    def get_num_tracks(self, track_type):
        n = 0
        for t in self._track_types:
            if t == track_type:
                n += 1

        return n

    def get_clean_name(self):
        # Basename without the media-info extension, dots replaced by '_'.
        name = os.path.basename(self.get_path())
        regex = '|'.join(['\\.%s$' % ext for ext in [self.SKIPPED_MEDIA_INFO_EXT, self.MEDIA_INFO_EXT, self.PUSH_MEDIA_INFO_EXT, self.STREAM_INFO_EXT]])
        name = re.sub(regex, "", name)

        return name.replace('.', "_")
class MediaFormatCombination(object):
    """A container/audio/video format combination for transcoding tests."""

    # Mapping from short format names to the GStreamer caps describing them.
    FORMATS = {"aac": "audio/mpeg,mpegversion=4",  # Audio
               "ac3": "audio/x-ac3",
               "vorbis": "audio/x-vorbis",
               "mp3": "audio/mpeg,mpegversion=1,layer=3",
               "opus": "audio/x-opus",
               "rawaudio": "audio/x-raw",

               # Video
               "h264": "video/x-h264",
               "h265": "video/x-h265",
               "vp8": "video/x-vp8",
               "vp9": "video/x-vp9",
               "theora": "video/x-theora",
               "prores": "video/x-prores",
               "jpeg": "image/jpeg",

               # Containers
               "webm": "video/webm",
               "ogg": "application/ogg",
               "mkv": "video/x-matroska",
               "mp4": "video/quicktime,variant=iso;",
               "quicktime": "video/quicktime;"}

    def __str__(self):
        return "%s and %s in %s" % (self.audio, self.video, self.container)

    def __init__(self, container, audio, video, duration_factor=1,
                 video_restriction=None, audio_restriction=None):
        """
        Describes a media format to be used for transcoding tests.

        :param container: A string defining the container format to be used, must be in self.FORMATS
        :param audio: A string defining the audio format to be used, must be in self.FORMATS
        :param video: A string defining the video format to be used, must be in self.FORMATS
        """
        self.container = container
        self.audio = audio
        self.video = video
        # duration_factor is accepted for interface compatibility but not
        # stored here.
        self.video_restriction = video_restriction
        self.audio_restriction = audio_restriction

    def get_caps(self, track_type):
        """Return the caps string for the chosen @track_type format.

        Returns None when no format (or an unknown one) was chosen for it.
        """
        try:
            return self.FORMATS[self.__dict__[track_type]]
        except KeyError:
            return None

    def get_audio_caps(self):
        return self.get_caps("audio")

    def get_video_caps(self):
        return self.get_caps("video")

    def get_muxer_caps(self):
        return self.get_caps("container")