# Copyright (c) 2013, Thibault Saunier <thibault.saunier@collabora.com>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this program; if not, write to the
# Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
# Boston, MA 02110-1301, USA.
20 """ Class representing tests and test managers. """
44 from itertools import cycle
45 from fractions import Fraction
47 from .utils import GstCaps, which
48 from . import reporters
49 from . import loggable
50 from .loggable import Loggable
52 from collections import defaultdict
54 from lxml import etree as ET
56 import xml.etree.cElementTree as ET
59 from .vfb_server import get_virual_frame_buffer_server
60 from .httpserver import HTTPServer
61 from .utils import mkdir, Result, Colors, printc, DEFAULT_TIMEOUT, GST_SECOND, \
62 Protocols, look_for_file_in_source_dir, get_data_file, BackTraceGenerator, \
63 check_bugs_resolution, is_tty
# The factor by which we increase the hard timeout when running inside
# an instrumentation tool (gdb or valgrind), which slows execution down.
GDB_TIMEOUT_FACTOR = VALGRIND_TIMEOUT_FACTOR = 20

# Global multiplier applied to every test's soft/hard timeout; overridable
# from the environment so slow machines can loosen all timeouts at once.
TIMEOUT_FACTOR = float(os.environ.get("TIMEOUT_FACTOR", 1))

# The exit code valgrind is configured (via --error-exitcode) to return
# when it detected errors.
VALGRIND_ERROR_CODE = 20

# File extension of GstValidate override files looked up next to media files.
VALIDATE_OVERRIDE_EXTENSION = ".override"
# Map the exit statuses of "deadly" signals to the signal name.  Popen
# reports death-by-signal as a negative returncode (-signum).
EXITING_SIGNALS = dict([(-getattr(signal, s), s) for s in [
    'SIGQUIT', 'SIGILL', 'SIGABRT', 'SIGFPE', 'SIGSEGV', 'SIGBUS', 'SIGSYS',
    'SIGTRAP', 'SIGXCPU', 'SIGXFSZ', 'SIGIOT'] if hasattr(signal, s)])
# 139 is the shell-style encoding of a segfault (128 + SIGSEGV).
EXITING_SIGNALS.update({139: "SIGSEGV"})
# Also allow reverse lookups (signal name -> exit status).  Iterate a list
# snapshot instead of a set: with a set the duplicated name "SIGSEGV"
# (present for both -signal.SIGSEGV and 139) resolved nondeterministically
# per run.  Skip the synthetic 139 entry so "SIGSEGV" deterministically maps
# to the negative status that subprocess.Popen actually reports.
EXITING_SIGNALS.update([(name, status) for status, name in list(EXITING_SIGNALS.items())
                        if status != 139])
# Base URL under which the CI exposes log/artifact files; when set, local
# log and dot-file paths are rewritten into public links (see
# get_logfile_repr() and get_subproc_env()).
CI_ARTIFACTS_URL = os.environ.get('CI_ARTIFACTS_URL')
86 """ A class representing a particular test. """
# NOTE(review): this extract is a sampled/garbled dump — every line carries a
# stray leading line number and interior lines are missing (the end of the
# signature, the docstring delimiters, and several guard lines).  Restore the
# pristine file from VCS before editing; comments below flag the gaps.
88 def __init__(self, application_name, classname, options,
89 reporter, duration=0, timeout=DEFAULT_TIMEOUT,
90 hard_timeout=None, extra_env_variables=None,
91 expected_issues=None, is_parallel=True,
# Docstring fragment documenting soft vs. hard timeout semantics:
94 @timeout: The timeout during which the value return by get_current_value
95 keeps being exactly equal
96 @hard_timeout: Max time the test can take in absolute
98 Loggable.__init__(self)
# Soft timeout is scaled by both the global and the per-run timeout factor.
99 self.timeout = timeout * TIMEOUT_FACTOR * options.timeout_factor
# NOTE(review): the next two lines are presumably guarded by a missing
# "if hard_timeout:" (original line 100) — confirm against upstream.
101 self.hard_timeout = hard_timeout * TIMEOUT_FACTOR
102 self.hard_timeout *= options.timeout_factor
# NOTE(review): presumably the "else:" branch (missing line 103), keeping
# hard_timeout as None when unset — confirm.
104 self.hard_timeout = hard_timeout
105 self.classname = classname
106 self.options = options
107 self.application = application_name
109 self.server_command = None
110 self.reporter = reporter
115 self.duration = duration
116 self.stack_trace = None
# Normalise expected_issues to a list: None -> [], scalar -> [scalar].
118 if expected_issues is None:
119 self.expected_issues = []
120 elif not isinstance(expected_issues, list):
121 self.expected_issues = [expected_issues]
# NOTE(review): missing "else:" (line 122) before the passthrough case.
123 self.expected_issues = expected_issues
125 extra_env_variables = extra_env_variables or {}
126 self.extra_env_variables = extra_env_variables
127 self.optional = False
128 self.is_parallel = is_parallel
129 self.generator = None
130 self.workdir = workdir
133 self.rr_logdir = None
137 def _generate_expected_issues(self):
140 def generate_expected_issues(self):
141 res = '%s"FIXME \'%s\' issues [REPORT A BUG ' % (" " * 4, self.classname) \
142 + 'in https://gitlab.freedesktop.org/gstreamer/ '\
143 + 'or use a proper bug description]": {'
148 "issues": [""" % (self.classname)
150 retcode = self.process.returncode if self.process else 0
152 signame = EXITING_SIGNALS.get(retcode)
153 val = "'" + signame + "'" if signame else retcode
157 },""" % ("signame" if signame else "returncode", val)
159 res += self._generate_expected_issues()
160 res += "\n%s],\n%s},\n" % (" " * 8, " " * 4)
164 def copy(self, nth=None):
165 copied_test = copy.copy(self)
167 copied_test.classname += '_it' + str(nth)
168 copied_test.options = copy.copy(self.options)
169 copied_test.options.logsdir = os.path.join(copied_test.options.logsdir, str(nth))
170 os.makedirs(copied_test.options.logsdir, exist_ok=True)
175 self.kill_subprocess()
178 self.time_taken = 0.0
179 self._starting_time = None
180 self.result = Result.NOT_RUN
183 self.extra_logfiles = set()
184 self.__env_variable = []
185 self.kill_subprocess()
189 string = self.classname
190 if self.result != Result.NOT_RUN:
191 string += ": " + self.result
192 if self.result in [Result.FAILED, Result.TIMEOUT]:
193 string += " '%s'" % self.message
194 if not self.options.dump_on_failure:
195 if not self.options.redirect_logs and self.result != Result.PASSED:
196 string += self.get_logfile_repr()
198 string = "\n==> %s" % string
202 def add_env_variable(self, variable, value=None):
204 Only useful so that the gst-validate-launcher can print the exact
205 right command line to reproduce the tests
208 value = os.environ.get(variable, None)
213 self.__env_variable.append(variable)
216 def _env_variable(self):
218 if not self.options.verbose or self.options.verbose > 1:
219 for var in set(self.__env_variable):
222 value = self.proc_env.get(var, None)
223 if value is not None:
224 res += "%s='%s'" % (var, value)
226 res += "[Not displaying environment variables, rerun with -vv for the full command]"
230 def open_logfile(self):
234 path = os.path.join(self.options.logsdir,
235 self.classname.replace(".", os.sep) + '.md')
236 mkdir(os.path.dirname(path))
239 if self.options.redirect_logs == 'stdout':
240 self.out = sys.stdout
241 elif self.options.redirect_logs == 'stderr':
242 self.out = sys.stderr
244 self.out = open(path, 'w+')
246 def finalize_logfiles(self):
247 self.out.write("\n**Duration**: %s" % self.time_taken)
248 if not self.options.redirect_logs:
250 for logfile in self.extra_logfiles:
251 # Only copy over extra logfile content if it's below a certain threshold
252 # Avoid copying gigabytes of data if a lot of debugging is activated
253 if os.path.getsize(logfile) < 500 * 1024:
254 self.out.write('\n\n## %s:\n\n```\n%s\n```\n' % (
255 os.path.basename(logfile), self.get_extra_log_content(logfile))
258 self.out.write('\n\n## %s:\n\n**Log file too big.**\n %s\n\n Check file content directly\n\n' % (
259 os.path.basename(logfile), logfile)
263 self.out.write('\n\n## rr trace:\n\n```\nrr replay %s/latest-trace\n```\n' % (
269 if self.options.html:
270 self.html_log = os.path.splitext(self.logfile)[0] + '.html'
272 parser = commonmark.Parser()
273 with open(self.logfile) as f:
274 ast = parser.parse(f.read())
276 renderer = commonmark.HtmlRenderer()
277 html = renderer.render(ast)
278 with open(self.html_log, 'w') as f:
283 def _get_file_content(self, file_name):
284 f = open(file_name, 'r+')
290 def get_log_content(self):
291 return self._get_file_content(self.logfile)
293 def get_extra_log_content(self, extralog):
294 if extralog not in self.extra_logfiles:
297 return self._get_file_content(extralog)
299 def get_classname(self):
300 name = self.classname.split('.')[-1]
301 classname = self.classname.replace('.%s' % name, '')
306 return self.classname.split('.')[-1]
309 if self._uuid is None:
310 self._uuid = self.classname + str(uuid.uuid4())
313 def add_arguments(self, *args):
316 def build_arguments(self):
317 self.add_env_variable("LD_PRELOAD")
318 self.add_env_variable("DISPLAY")
320 def add_stack_trace_to_logfile(self):
321 self.debug("Adding stack trace")
325 trace_gatherer = BackTraceGenerator.get_default()
326 stack_trace = trace_gatherer.get_trace(self)
331 info = "\n\n## Stack trace\n\n```\n%s\n```" % stack_trace
332 if self.options.redirect_logs:
336 if self.options.xunit_file:
337 self.stack_trace = stack_trace
342 def add_known_issue_information(self):
343 if self.expected_issues:
344 info = "\n\n## Already known issues\n\n``` python\n%s\n```\n\n" % (
345 json.dumps(self.expected_issues, indent=4)
350 info += "\n\n**You can mark the issues as 'known' by adding the " \
351 + " following lines to the list of known issues**\n" \
352 + "\n\n``` python\n%s\n```" % (self.generate_expected_issues())
354 if self.options.redirect_logs:
360 def set_result(self, result, message="", error=""):
362 if not self.options.redirect_logs:
363 self.out.write("\n```\n")
366 self.debug("Setting result: %s (message: %s, error: %s)" % (result,
369 if result is Result.TIMEOUT:
370 if self.options.debug is True:
372 printc("Timeout, you should process <ctrl>c to get into gdb",
374 # and wait here until gdb exits
375 self.process.communicate()
377 pname = self.command[0]
378 input("%sTimeout happened on %s you can attach gdb doing:\n $gdb %s %d%s\n"
379 "Press enter to continue" % (Colors.FAIL, self.classname,
380 pname, self.process.pid, Colors.ENDC))
382 self.add_stack_trace_to_logfile()
385 self.message = message
386 self.error_str = error
388 if result not in [Result.PASSED, Result.NOT_RUN, Result.SKIPPED]:
389 self.add_known_issue_information()
# Map the finished subprocess' exit status to a test Result.
# NOTE(review): truncated extract — the early-out body after line 392 (result
# already decided) and the final "else:" (original line 407) are missing.
391 def check_results(self):
392 if self.result is Result.FAILED or self.result is Result.TIMEOUT:
395 self.debug("%s returncode: %s", self, self.process.returncode)
# Under `rr` a SIGPIPE death is a known issue: skip rather than fail.
396 if self.options.rr and self.process.returncode == -signal.SIGPIPE:
397 self.set_result(Result.SKIPPED, "SIGPIPE received under `rr`, known issue.")
398 elif self.process.returncode == 0:
399 self.set_result(Result.PASSED)
# Death by signal (negative status or 139): record a stack trace first.
400 elif self.process.returncode in EXITING_SIGNALS:
401 self.add_stack_trace_to_logfile()
402 self.set_result(Result.FAILED,
403 "Application exited with signal %s" % (
404 EXITING_SIGNALS[self.process.returncode]))
# Valgrind signals detected errors through its dedicated exit code.
405 elif self.process.returncode == VALGRIND_ERROR_CODE:
406 self.set_result(Result.FAILED, "Valgrind reported errors")
408 self.set_result(Result.FAILED,
409 "Application returned %d" % (self.process.returncode))
411 def get_current_value(self):
413 Lets subclasses implement a nicer timeout measurement method
414 They should return some value with which we will compare
415 the previous and timeout if they are egual during self.timeout
418 return Result.NOT_RUN
420 def process_update(self):
422 Returns True when process has finished running or has timed out.
425 if self.process is None:
426 # Process has not started running yet
430 if self.process.returncode is not None:
433 val = self.get_current_value()
435 self.debug("Got value: %s" % val)
436 if val is Result.NOT_RUN:
437 # The get_current_value logic is not implemented... dumb
439 if time.time() - self.last_change_ts > self.timeout:
440 self.set_result(Result.TIMEOUT,
441 "Application timed out: %s secs" %
446 elif val is Result.FAILED:
448 elif val is Result.KNOWN_ERROR:
451 self.log("New val %s" % val)
453 if val == self.last_val:
454 delta = time.time() - self.last_change_ts
455 self.debug("%s: Same value for %d/%d seconds" %
456 (self, delta, self.timeout))
457 if delta > self.timeout:
458 self.set_result(Result.TIMEOUT,
459 "Application timed out: %s secs" %
463 elif self.hard_timeout and time.time() - self.start_ts > self.hard_timeout:
465 Result.TIMEOUT, "Hard timeout reached: %d secs" % self.hard_timeout)
468 self.last_change_ts = time.time()
473 def get_subproc_env(self):
474 return os.environ.copy()
476 def kill_subprocess(self):
478 if self.options.rr and self.process and self.process.returncode is None:
479 cmd = ["ps", "-o", "pid", "--ppid", str(self.process.pid), "--noheaders"]
481 subprocs_id = [int(pid.strip('\n')) for
482 pid in subprocess.check_output(cmd).decode().split(' ') if pid]
483 except FileNotFoundError:
484 self.error("Ps not found, will probably not be able to get rr "
485 "working properly after we kill the process")
486 except subprocess.CalledProcessError as e:
487 self.error("Couldn't get rr subprocess pid: %s" % (e))
489 utils.kill_subprocess(self, self.process, DEFAULT_TIMEOUT, subprocs_id)
491 def run_external_checks(self):
494 def thread_wrapper(self):
496 # Restore the SIGINT handler for the child process (gdb) to ensure
498 signal.signal(signal.SIGINT, signal.SIG_DFL)
500 if self.options.gdb and os.name != "nt":
501 preexec_fn = enable_sigint
505 self.process = subprocess.Popen(self.command,
510 preexec_fn=preexec_fn)
512 if self.result is not Result.TIMEOUT:
513 if self.process.returncode == 0:
514 self.run_external_checks()
517 def get_valgrind_suppression_file(self, subdir, name):
518 p = get_data_file(subdir, name)
522 self.error("Could not find any %s file" % name)
524 def get_valgrind_suppressions(self):
525 return [self.get_valgrind_suppression_file('data', 'gstvalidate.supp')]
527 def use_gdb(self, command):
528 if self.hard_timeout is not None:
529 self.hard_timeout *= GDB_TIMEOUT_FACTOR
530 self.timeout *= GDB_TIMEOUT_FACTOR
532 if not self.options.gdb_non_stop:
533 self.timeout = sys.maxsize
534 self.hard_timeout = sys.maxsize
537 if self.options.gdb_non_stop:
538 args += ["-ex", "run", "-ex", "backtrace", "-ex", "quit"]
539 args += ["--args"] + command
542 def use_rr(self, command, subenv):
543 command = ["rr", 'record', '-h'] + command
545 self.timeout *= RR_TIMEOUT_FACTOR
546 self.rr_logdir = os.path.join(self.options.logsdir, self.classname.replace(".", os.sep), 'rr-logs')
547 subenv['_RR_TRACE_DIR'] = self.rr_logdir
549 shutil.rmtree(self.rr_logdir, ignore_errors=False, onerror=None)
550 except FileNotFoundError:
552 self.add_env_variable('_RR_TRACE_DIR', self.rr_logdir)
556 def use_valgrind(self, command, subenv):
557 vglogsfile = os.path.splitext(self.logfile)[0] + '.valgrind'
558 self.extra_logfiles.add(vglogsfile)
562 for o, v in [('trace-children', 'yes'),
563 ('tool', 'memcheck'),
564 ('leak-check', 'full'),
565 ('leak-resolution', 'high'),
566 # TODO: errors-for-leak-kinds should be set to all instead of definite
567 # and all false positives should be added to suppression
569 ('errors-for-leak-kinds', 'definite,indirect'),
570 ('show-leak-kinds', 'definite,indirect'),
571 ('show-possibly-lost', 'no'),
572 ('num-callers', '20'),
573 ('error-exitcode', str(VALGRIND_ERROR_CODE)),
574 ('gen-suppressions', 'all')]:
575 vg_args.append("--%s=%s" % (o, v))
577 if not self.options.redirect_logs:
578 vglogsfile = os.path.splitext(self.logfile)[0] + '.valgrind'
579 self.extra_logfiles.add(vglogsfile)
580 vg_args.append("--%s=%s" % ('log-file', vglogsfile))
582 for supp in self.get_valgrind_suppressions():
583 vg_args.append("--suppressions=%s" % supp)
585 command = ["valgrind"] + vg_args + command
587 # Tune GLib's memory allocator to be more valgrind friendly
588 subenv['G_DEBUG'] = 'gc-friendly'
589 subenv['G_SLICE'] = 'always-malloc'
591 if self.hard_timeout is not None:
592 self.hard_timeout *= VALGRIND_TIMEOUT_FACTOR
593 self.timeout *= VALGRIND_TIMEOUT_FACTOR
595 # Enable 'valgrind.config'
596 self.add_validate_config(get_data_file(
597 'data', 'valgrind.config'), subenv)
598 if subenv == self.proc_env:
599 self.add_env_variable('G_DEBUG', 'gc-friendly')
600 self.add_env_variable('G_SLICE', 'always-malloc')
601 self.add_env_variable('GST_VALIDATE_CONFIG',
602 self.proc_env['GST_VALIDATE_CONFIG'])
606 def add_validate_config(self, config, subenv=None):
608 subenv = self.extra_env_variables
610 cconf = subenv.get('GST_VALIDATE_CONFIG', "")
611 paths = [c for c in cconf.split(os.pathsep) if c] + [config]
612 subenv['GST_VALIDATE_CONFIG'] = os.pathsep.join(paths)
614 def launch_server(self):
617 def get_logfile_repr(self):
618 if not self.options.redirect_logs:
625 log = CI_ARTIFACTS_URL + os.path.relpath(log, self.options.logsdir)
627 return "\n Log: %s" % (log)
631 def get_command_repr(self):
632 message = "%s %s" % (self._env_variable, ' '.join(
633 shlex.quote(arg) for arg in self.command))
634 if self.server_command:
635 message = "%s & %s" % (self.server_command, message)
639 def test_start(self, queue):
642 self.server_command = self.launch_server()
644 self.command = [self.application]
645 self._starting_time = time.time()
646 self.build_arguments()
647 self.proc_env = self.get_subproc_env()
649 for var, value in list(self.extra_env_variables.items()):
650 value = self.proc_env.get(var, '') + os.pathsep + value
651 self.proc_env[var] = value.strip(os.pathsep)
652 self.add_env_variable(var, self.proc_env[var])
655 self.command = self.use_gdb(self.command)
657 self.previous_sigint_handler = signal.getsignal(signal.SIGINT)
658 # Make the gst-validate executable ignore SIGINT while gdb is
660 signal.signal(signal.SIGINT, signal.SIG_IGN)
662 if self.options.valgrind:
663 self.command = self.use_valgrind(self.command, self.proc_env)
666 self.command = self.use_rr(self.command, self.proc_env)
668 if not self.options.redirect_logs:
669 self.out.write("# `%s`\n\n"
670 "## Command\n\n``` bash\n%s\n```\n\n" % (
671 self.classname, self.get_command_repr()))
672 self.out.write("## %s output\n\n``` \n\n" % os.path.basename(self.application))
675 message = "Launching: %s%s\n" \
676 " Command: %s\n" % (Colors.ENDC, self.classname,
677 self.get_command_repr())
678 printc(message, Colors.OKBLUE)
680 self.thread = threading.Thread(target=self.thread_wrapper)
684 self.last_change_ts = time.time()
685 self.start_ts = time.time()
687 def _dump_log_file(self, logfile):
690 subprocess.check_call(['bat', '-H', '1', '--paging=never', logfile])
692 except (subprocess.CalledProcessError, FileNotFoundError):
695 with open(logfile, 'r') as fin:
696 for line in fin.readlines():
697 print('> ' + line, end='')
699 def _dump_log_files(self):
700 self._dump_log_file(self.logfile)
702 def copy_logfiles(self, extra_folder="flaky_tests"):
703 path = os.path.dirname(os.path.join(self.options.logsdir, extra_folder,
704 self.classname.replace(".", os.sep)))
706 self.logfile = shutil.copy(self.logfile, path)
708 for logfile in self.extra_logfiles:
709 extra_logs.append(shutil.copy(logfile, path))
710 self.extra_logfiles = extra_logs
712 def test_end(self, retry_on_failures=False):
713 self.kill_subprocess()
715 self.time_taken = time.time() - self._starting_time
718 signal.signal(signal.SIGINT, self.previous_sigint_handler)
720 self.finalize_logfiles()
721 if self.options.dump_on_failure and not retry_on_failures and not self.max_retries:
722 if self.result not in [Result.PASSED, Result.KNOWN_ERROR, Result.NOT_RUN]:
723 self._dump_log_files()
725 # Only keep around env variables we need later
727 for n in self.__env_variable:
728 clean_env[n] = self.proc_env.get(n, None)
729 self.proc_env = clean_env
731 # Don't keep around JSON report objects, they were processed
732 # in check_results already
738 class GstValidateTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
742 class GstValidateListener(socketserver.BaseRequestHandler, Loggable):
def __init__(self, *args, **kwargs):
    """Initialize the request handler and its Loggable mixin with a
    fixed log category."""
    super().__init__(*args, **kwargs)
    Loggable.__init__(self, "GstValidateListener")
749 """Implements BaseRequestHandler handle method"""
751 self.logCategory = "GstValidateListener"
753 raw_len = self.request.recv(4)
756 msglen = struct.unpack('>I', raw_len)[0]
759 while msglen != len(raw_msg):
760 raw_msg += self.request.recv(msglen - len(raw_msg))
764 msg = raw_msg.decode('utf-8', 'ignore')
765 except UnicodeDecodeError as e:
766 self.error("%s Could not decode message: %s - %s" % (test.classname if test else "unknown", msg, e))
773 obj = json.loads(msg)
774 except json.decoder.JSONDecodeError as e:
775 self.error("%s Could not decode message: %s - %s" % (test.classname if test else "unknown", msg, e))
779 # First message must contain the uuid
780 uuid = obj.get("uuid", None)
783 # Find test from launcher
784 for t in self.server.launcher.tests:
785 if uuid == t.get_uuid():
789 self.server.launcher.error(
790 "Could not find test for UUID %s" % uuid)
793 obj_type = obj.get("type", '')
794 if obj_type == 'position':
795 test.set_position(obj['position'], obj['duration'],
797 elif obj_type == 'buffering':
798 test.set_position(obj['position'], 100)
799 elif obj_type == 'action':
800 test.add_action_execution(obj)
801 # Make sure that action is taken into account when checking if process
804 elif obj_type == 'action-done':
805 # Make sure that action end is taken into account when checking if process
808 if test.actions_infos:
809 test.actions_infos[-1]['execution-duration'] = obj['execution-duration']
810 elif obj_type == 'report':
812 elif obj_type == 'skip-test':
813 test.set_result(Result.SKIPPED)
816 class GstValidateTest(Test):
818 """ A class representing a particular test. """
819 HARD_TIMEOUT_FACTOR = 5
820 fault_sig_regex = re.compile("<Caught SIGNAL: .*>")
821 needs_gst_inspect = set()
823 def __init__(self, application_name, classname,
824 options, reporter, duration=0,
825 timeout=DEFAULT_TIMEOUT, scenario=None, hard_timeout=None,
826 media_descriptor=None, extra_env_variables=None,
827 expected_issues=None, workdir=None, **kwargs):
829 extra_env_variables = extra_env_variables or {}
831 if not hard_timeout and self.HARD_TIMEOUT_FACTOR:
833 hard_timeout = timeout * self.HARD_TIMEOUT_FACTOR
835 hard_timeout = duration * self.HARD_TIMEOUT_FACTOR
839 # If we are running from source, use the -debug version of the
840 # application which is using rpath instead of libtool's wrappers. It's
841 # slightly faster to start and will not confuse valgrind.
842 debug = '%s-debug' % application_name
843 p = look_for_file_in_source_dir('tools', debug)
849 self.media_duration = -1
851 self.actions_infos = []
852 self.media_descriptor = media_descriptor
856 override_path = self.get_override_file(media_descriptor)
858 if extra_env_variables:
859 if extra_env_variables.get("GST_VALIDATE_OVERRIDE", ""):
861 "GST_VALIDATE_OVERRIDE"] += os.path.pathsep
863 extra_env_variables["GST_VALIDATE_OVERRIDE"] = override_path
865 super().__init__(application_name,
870 hard_timeout=hard_timeout,
871 extra_env_variables=extra_env_variables,
872 expected_issues=expected_issues,
875 if media_descriptor and media_descriptor.get_media_filepath():
876 config_file = os.path.join(media_descriptor.get_media_filepath() + '.config')
877 if os.path.isfile(config_file):
878 self.add_validate_config(config_file, extra_env_variables)
880 if scenario is None or scenario.name.lower() == "none":
883 self.scenario = scenario
# Decide whether this test's media requires the launcher's local HTTP server.
# NOTE(review): truncated extract — the early "return False" for media-less
# tests (original line 887) and the trailing "return True"/"return False"
# pair after line 898 are missing from this view.
885 def needs_http_server(self):
886 if self.media_descriptor is None:
889 protocol = self.media_descriptor.get_protocol()
890 uri = self.media_descriptor.get_uri()
891 uri_requires_http_server = False
# Expand the http-server-port template in the URI and check whether it
# really targets the launcher's local server.
893 if 'http-server-port' in uri:
894 expanded_uri = uri % {
895 'http-server-port': self.options.http_server_port}
896 uri_requires_http_server = expanded_uri.find(
897 "127.0.0.1:%s" % self.options.http_server_port) != -1
898 if protocol in [Protocols.HTTP, Protocols.HLS, Protocols.DASH] or uri_requires_http_server:
903 def kill_subprocess(self):
904 Test.kill_subprocess(self)
906 def add_report(self, report):
907 self.reports.append(report)
909 def set_position(self, position, duration, speed=None):
910 self.position = position
911 self.media_duration = duration
915 def add_action_execution(self, action_infos):
916 self.actions_infos.append(action_infos)
918 def get_override_file(self, media_descriptor):
920 if media_descriptor.get_path():
921 override_path = os.path.splitext(media_descriptor.get_path())[
922 0] + VALIDATE_OVERRIDE_EXTENSION
923 if os.path.exists(override_path):
928 def get_current_position(self):
931 def get_current_value(self):
934 def get_subproc_env(self):
935 subproc_env = os.environ.copy()
937 if self.options.validate_default_config:
938 self.add_validate_config(self.options.validate_default_config,
941 subproc_env["GST_VALIDATE_UUID"] = self.get_uuid()
942 subproc_env["GST_VALIDATE_LOGSDIR"] = self.options.logsdir
944 if 'GST_DEBUG' in os.environ and not self.options.redirect_logs:
945 gstlogsfile = os.path.splitext(self.logfile)[0] + '.gstdebug'
946 self.extra_logfiles.add(gstlogsfile)
947 subproc_env["GST_DEBUG_FILE"] = gstlogsfile
949 if self.options.no_color:
950 subproc_env["GST_DEBUG_NO_COLOR"] = '1'
952 # Ensure XInitThreads is called, see bgo#731525
953 subproc_env['GST_GL_XINITTHREADS'] = '1'
954 self.add_env_variable('GST_GL_XINITTHREADS', '1')
955 subproc_env['GST_XINITTHREADS'] = '1'
956 self.add_env_variable('GST_XINITTHREADS', '1')
958 if self.scenario is not None:
959 scenario = self.scenario.get_execution_name()
960 subproc_env["GST_VALIDATE_SCENARIO"] = scenario
961 self.add_env_variable("GST_VALIDATE_SCENARIO",
962 subproc_env["GST_VALIDATE_SCENARIO"])
965 del subproc_env["GST_VALIDATE_SCENARIO"]
969 if not subproc_env.get('GST_DEBUG_DUMP_DOT_DIR'):
970 dotfilesdir = os.path.join(self.options.logsdir,
971 self.classname.replace(".", os.sep) + '.pipelines_dot_files')
973 subproc_env['GST_DEBUG_DUMP_DOT_DIR'] = dotfilesdir
975 dotfilesurl = CI_ARTIFACTS_URL + os.path.relpath(dotfilesdir,
976 self.options.logsdir)
977 subproc_env['GST_VALIDATE_DEBUG_DUMP_DOT_URL'] = dotfilesurl
985 self.media_duration = -1
987 self.actions_infos = []
def build_arguments(self):
    """Record the GstValidate-related environment variables so the
    launcher can later print an exact reproducer command line."""
    super(GstValidateTest, self).build_arguments()
    # Only forward these when the caller actually set them.
    for var in ("GST_VALIDATE", "GST_VALIDATE_SCENARIOS_PATH"):
        if var in os.environ:
            self.add_env_variable(var, os.environ[var])
    # Always track these; add_env_variable() falls back to an os.environ
    # lookup when no explicit value is given.
    self.add_env_variable("GST_VALIDATE_CONFIG")
    self.add_env_variable("GST_VALIDATE_OVERRIDE")
1001 def get_extra_log_content(self, extralog):
1002 value = Test.get_extra_log_content(self, extralog)
def report_matches_expected_issues(self, report, expected_issue):
    """Check whether @report matches @expected_issue.

    @expected_issue maps report keys to regular expressions; every key it
    carries (besides the bookkeeping keys 'bug', 'bugs', 'sometimes' and
    'can-happen-several-times') must regexp-match the corresponding report
    value.  Matched keys are consumed from @expected_issue — callers pass
    a copy — and the report matches iff every expected key was consumed.
    """
    # Bookkeeping keys take no part in the matching itself.
    for key in ['bug', 'bugs', 'sometimes']:
        if key in expected_issue:
            del expected_issue[key]
    for key, value in list(report.items()):
        if key in expected_issue:
            if not re.findall(expected_issue[key], str(value)):
                # A value failing its expected regexp means no match.
                # (This return was dropped by the garbled extraction.)
                return False
            expected_issue.pop(key)

    if "can-happen-several-times" in expected_issue:
        expected_issue.pop("can-happen-several-times")
    # Match only if every expected key was found and consumed.
    return not bool(expected_issue)
1020 def check_reported_issues(self, expected_issues):
1022 expected_retcode = [0]
1023 for report in self.reports:
1025 for expected_issue in expected_issues:
1026 if self.report_matches_expected_issues(report,
1027 expected_issue.copy()):
1028 found = expected_issue
1031 if found is not None:
1032 if not found.get('can-happen-several-times', False):
1033 expected_issues.remove(found)
1034 if report['level'] == 'critical':
1035 if found.get('sometimes', True) and isinstance(expected_retcode, list):
1036 expected_retcode.append(18)
1038 expected_retcode = [18]
1039 elif report['level'] == 'critical':
1043 return None, expected_issues, expected_retcode
1045 return ret, expected_issues, expected_retcode
1047 def check_expected_issue(self, expected_issue):
1050 expected_symbols = expected_issue.get('stacktrace_symbols')
1051 if expected_symbols:
1052 trace_gatherer = BackTraceGenerator.get_default()
1053 stack_trace = trace_gatherer.get_trace(self)
1056 if not isinstance(expected_symbols, list):
1057 expected_symbols = [expected_symbols]
1059 not_found_symbols = [s for s in expected_symbols
1060 if s not in stack_trace]
1061 if not_found_symbols:
1062 msg = " Expected symbols '%s' not found in stack trace " % (
1066 msg += " No stack trace available, could not verify symbols "
1068 _, not_found_expected_issues, _ = self.check_reported_issues(expected_issue.get('issues', []))
1069 if not_found_expected_issues:
1070 mandatory_failures = [f for f in not_found_expected_issues
1071 if not f.get('sometimes', True)]
1072 if mandatory_failures:
1073 msg = " (Expected issues not found: %s) " % mandatory_failures
1078 def check_expected_timeout(self, expected_timeout):
1079 msg = "Expected timeout happened. "
1080 result = Result.PASSED
1081 message = expected_timeout.get('message')
1083 if not re.findall(message, self.message):
1084 result = Result.FAILED
1085 msg = "Expected timeout message: %s got %s " % (
1086 message, self.message)
1088 stack_msg, stack_res = self.check_expected_issue(expected_timeout)
1090 result = Result.TIMEOUT
1095 def check_results(self):
1096 if self.result in [Result.FAILED, Result.PASSED, Result.SKIPPED]:
1099 self.debug("%s returncode: %s", self, self.process.returncode)
1100 expected_issues = copy.deepcopy(self.expected_issues)
1102 # signal.SIGPPIPE is 13 but it sometimes isn't present in python for some reason.
1103 expected_issues.append({"returncode": -13, "sometimes": True})
1104 self.criticals, not_found_expected_issues, expected_returncode = self.check_reported_issues(expected_issues)
1105 expected_timeout = None
1106 expected_signal = None
1107 for i, f in enumerate(not_found_expected_issues):
1108 returncode = f.get('returncode', [])
1109 if not isinstance(returncode, list):
1110 returncode = [returncode]
1112 if f.get('signame'):
1113 signames = f['signame']
1114 if not isinstance(signames, list):
1115 signames = [signames]
1117 returncode = [EXITING_SIGNALS[signame] for signame in signames]
1120 if 'sometimes' in f:
1121 returncode.append(0)
1122 expected_returncode = returncode
1124 elif f.get("timeout"):
1125 expected_timeout = f
1127 not_found_expected_issues = [f for f in not_found_expected_issues
1128 if not f.get('returncode') and not f.get('signame')]
1131 result = Result.PASSED
1132 if self.result == Result.TIMEOUT:
1133 with open(self.logfile) as f:
1134 signal_fault_info = self.fault_sig_regex.findall(f.read())
1135 if signal_fault_info:
1136 result = Result.FAILED
1137 msg = signal_fault_info[0]
1138 elif expected_timeout:
1139 not_found_expected_issues.remove(expected_timeout)
1140 result, msg = self.check_expected_timeout(expected_timeout)
1143 elif self.process.returncode in EXITING_SIGNALS:
1144 msg = "Application exited with signal %s" % (
1145 EXITING_SIGNALS[self.process.returncode])
1146 if self.process.returncode not in expected_returncode:
1147 result = Result.FAILED
1150 stack_msg, stack_res = self.check_expected_issue(
1154 result = Result.FAILED
1155 self.add_stack_trace_to_logfile()
1156 elif self.process.returncode == VALGRIND_ERROR_CODE:
1157 msg = "Valgrind reported errors "
1158 result = Result.FAILED
1159 elif self.process.returncode not in expected_returncode:
1160 msg = "Application returned %s " % self.process.returncode
1161 if expected_returncode != [0]:
1162 msg += "(expected %s) " % expected_returncode
1163 result = Result.FAILED
1166 msg += "(critical errors: [%s]) " % ', '.join(set([c['summary']
1167 for c in self.criticals]))
1168 result = Result.FAILED
1170 if not_found_expected_issues:
1171 mandatory_failures = [f for f in not_found_expected_issues
1172 if not f.get('sometimes', True)]
1174 if mandatory_failures:
1175 msg += " (Expected errors not found: %s) " % mandatory_failures
1176 result = Result.FAILED
1177 elif self.expected_issues:
1178 msg += ' %s(Expected errors occurred: %s)%s' % (Colors.OKBLUE,
1179 self.expected_issues,
1181 result = Result.KNOWN_ERROR
1183 if result == Result.PASSED:
1184 for report in self.reports:
1185 if report["level"] == "expected":
1186 result = Result.KNOWN_ERROR
1189 self.set_result(result, msg.strip())
1191 def _generate_expected_issues(self):
1193 self.criticals = self.criticals or []
1194 if self.result == Result.TIMEOUT:
1200 for report in self.criticals:
1201 res += "\n%s{" % (" " * 12)
1203 for key, value in report.items():
1208 res += '\n%s%s"%s": "%s",' % (
1209 " " * 16, "# " if key == "details" else "",
1210 key, value.replace('\n', '\\n'))
1212 res += "\n%s}," % (" " * 12)
def get_valgrind_suppressions(self):
    """Return the valgrind suppression files to use, extending the base
    class' list with the gst-build ones."""
    result = super(GstValidateTest, self).get_valgrind_suppressions()
    result.extend(utils.get_gst_build_valgrind_suppressions())
    # The trailing return was dropped by the garbled extraction; without it
    # this method returns None and breaks use_valgrind()'s suppression loop.
    return result
1222 class VariableFramerateMode(Enum):
# Mixin providing encoding/transcoding helpers (profile construction and
# output-file verification) for GstValidate tests.
1228 class GstValidateEncodingTestInterface(object):
# Default slack allowed between original and transcoded duration.
1229 DURATION_TOLERANCE = GST_SECOND / 4
# @combination: the MediaFormatCombination describing the wanted output
# @media_descriptor: descriptor of the input media (may be None)
# @duration_tolerance: optional override of DURATION_TOLERANCE
1231 def __init__(self, combination, media_descriptor, duration_tolerance=None):
1232 super(GstValidateEncodingTestInterface, self).__init__()
1234 self.media_descriptor = media_descriptor
1235 self.combination = combination
1238 self._duration_tolerance = duration_tolerance
1239 if duration_tolerance is None:
1240 self._duration_tolerance = self.DURATION_TOLERANCE
# Return the current size in bytes of the file being encoded
# (self.dest_file is a URI; stat its local path).
1242 def get_current_size(self):
1244 size = os.stat(urllib.parse.urlparse(self.dest_file).path).st_size
1248 self.debug("Size: %s" % size)
# Build an encoding-profile string of the form
#   muxer:video-caps|props->restriction:audio-caps|presence
# NOTE(review): several lines are missing in this capture (the initial
# `ret` construction and the video/audio caps branches).
1251 def _get_profile_full(self, muxer, venc, aenc, video_restriction=None,
1252 audio_restriction=None, audio_presence=0,
1254 variable_framerate=VariableFramerateMode.DISABLED):
1261 if video_restriction is not None:
1262 ret = ret + video_restriction + '->'
1266 props += 'presence=%s|' % str(video_presence)
# AUTO means: enable variable framerate unless the restriction caps
# already pin a framerate.
1267 if variable_framerate == VariableFramerateMode.AUTO:
1268 if video_restriction and "framerate" in video_restriction:
1269 variable_framerate = VariableFramerateMode.DISABLED
1271 variable_framerate = VariableFramerateMode.ENABLED
1272 if variable_framerate == VariableFramerateMode.ENABLED:
1273 props += 'variable-framerate=true|'
# props is '|'-terminated; drop the trailing separator when appending.
1275 ret = ret + '|' + props[:-1]
1278 if audio_restriction is not None:
1279 ret = ret + audio_restriction + '->'
1282 ret = ret + '|' + str(audio_presence)
# Collapse accidental double separators.
1284 return ret.replace("::", ":")
# Public entry: derive restrictions/presences from the combination and
# the media descriptor, then delegate to _get_profile_full().
1286 def get_profile(self, video_restriction=None, audio_restriction=None,
1287 variable_framerate=VariableFramerateMode.DISABLED):
1288 vcaps = self.combination.get_video_caps()
1289 acaps = self.combination.get_audio_caps()
1290 if video_restriction is None:
1291 video_restriction = self.combination.video_restriction
1292 if audio_restriction is None:
1293 audio_restriction = self.combination.audio_restriction
1294 if self.media_descriptor is not None:
1295 if self.combination.video == "theora":
1296 # Theoraenc doesn't support variable framerate, make sure to avoid them
1297 framerate = self.media_descriptor.get_framerate()
1298 if framerate == Fraction(0, 1):
1299 framerate = Fraction(30, 1)
1300 restriction = utils.GstCaps.new_from_str(video_restriction or "video/x-raw")
# Force a fixed framerate on every structure that does not set one.
1301 for struct, _ in restriction:
1302 if struct.get("framerate") is None:
1303 struct.set("framerate", struct.FRACTION_TYPE, framerate)
1304 video_restriction = str(restriction)
1306 video_presence = self.media_descriptor.get_num_tracks("video")
1307 if video_presence == 0:
1310 audio_presence = self.media_descriptor.get_num_tracks("audio")
1311 if audio_presence == 0:
1314 return self._get_profile_full(self.combination.get_muxer_caps(),
1316 audio_presence=audio_presence,
1317 video_presence=video_presence,
1318 video_restriction=video_restriction,
1319 audio_restriction=audio_restriction,
1320 variable_framerate=variable_framerate)
1322 def _clean_caps(self, caps):
1324 Returns a list of key=value or structure name, without "(types)" or ";" or ","
1326 return re.sub(r"\(.+?\)\s*| |;", '', caps).split(',')
1328 # pylint: disable=E1101
1329 def _has_caps_type_variant(self, c, ccaps):
1331 Handle situations where we can have application/ogg or video/ogg or
1335 media_type = re.findall("application/|video/|audio/", c)
1337 media_type = media_type[0].replace('/', '')
1338 possible_mtypes = ["application", "video", "audio"]
1339 possible_mtypes.remove(media_type)
1340 for tmptype in possible_mtypes:
1341 possible_c_variant = c.replace(media_type, tmptype)
1342 if possible_c_variant in ccaps:
1344 "Found %s in %s, good enough!", possible_c_variant, ccaps)
1349 # pylint: disable=E1101
# Run an extra Image-Quality-Assessment pass comparing the transcoded
# output against @reference_file_uri, using the `iqa` element when it
# is available. Pure subprocess I/O; result handled by the caller.
1350 def run_iqa_test(self, reference_file_uri):
1352 Runs IQA test if @reference_file_path exists
1353 @test: The test to run tests on
1355 if not GstValidateBaseTestManager.has_feature('iqa'):
1356 self.debug('Iqa element not present, not running extra test.')
1360 uridecodebin uri=%s !
1361 iqa name=iqa do-dssim=true dssim-error-threshold=1.0 ! fakesink
1362 uridecodebin uri=%s ! iqa.
1363 """ % (reference_file_uri, self.dest_file)
1364 pipeline_desc = pipeline_desc.replace("\n", "")
1366 command = [GstValidateBaseTestManager.COMMAND] + \
1367 shlex.split(pipeline_desc)
1368 msg = "## Running IQA tests on results of: " \
1369 + "%s\n### Command: \n```\n%s\n```\n" % (
1370 self.classname, ' '.join(command))
1371 if not self.options.redirect_logs:
1375 printc(msg, Colors.OKBLUE)
1377 self.process = subprocess.Popen(command,
# Verify the transcoded file: rediscover it, then check duration and
# per-track caps against the wanted combination.
1384 def check_encoded_file(self):
1385 result_descriptor = GstValidateMediaDescriptor.new_from_uri(
1387 if result_descriptor is None:
1388 return (Result.FAILED, "Could not discover encoded file %s"
1391 duration = result_descriptor.get_duration()
1392 orig_duration = self.media_descriptor.get_duration()
1393 tolerance = self._duration_tolerance
# NOTE(review): this chained comparison looks wrong. It is equivalent to
# `orig_duration - tolerance >= duration AND duration <= orig_duration +
# tolerance`, i.e. it only flags files that are too SHORT and never files
# that are too LONG. The intended out-of-tolerance check is presumably
# `not (orig_duration - tolerance <= duration <= orig_duration + tolerance)`
# — confirm against upstream before changing.
1395 if orig_duration - tolerance >= duration <= orig_duration + tolerance:
1396 os.remove(result_descriptor.get_path())
1400 'issue-id': 'transcoded-file-wrong-duration',
1401 'summary': 'The duration of a transcoded file doesn\'t match the duration of the original file',
1402 'level': 'critical',
1403 'detected-on': 'pipeline',
1404 'details': "Duration of encoded file is " " wrong (%s instead of %s)" % (
1405 utils.TIME_ARGS(duration), utils.TIME_ARGS(orig_duration))
# Compare the caps of the container plus each track with the wanted ones.
1409 all_tracks_caps = result_descriptor.get_tracks_caps()
1410 container_caps = result_descriptor.get_caps()
1412 all_tracks_caps.insert(0, ("container", container_caps))
1414 for track_type, caps in all_tracks_caps:
1415 ccaps = self._clean_caps(caps)
1416 wanted_caps = self.combination.get_caps(track_type)
1417 cwanted_caps = self._clean_caps(wanted_caps)
# A track type that was not requested at all is a failure.
1419 if wanted_caps is None:
1420 os.remove(result_descriptor.get_path())
1424 'issue-id': 'transcoded-file-wrong-stream-type',
1425 'summary': 'Expected stream types during transcoding do not match expectations',
1426 'level': 'critical',
1427 'detected-on': 'pipeline',
1428 'details': "Found a track of type %s in the encoded files"
1429 " but none where wanted in the encoded profile: %s" % (
1430 track_type, self.combination)
# Every wanted caps field must appear in the produced caps (allowing
# application/video/audio media-type variants).
1435 for c in cwanted_caps:
1437 if not self._has_caps_type_variant(c, ccaps):
1438 os.remove(result_descriptor.get_path())
1442 'issue-id': 'transcoded-file-wrong-caps',
1443 'summary': 'Expected stream caps during transcoding do not match expectations',
1444 'level': 'critical',
1445 'detected-on': 'pipeline',
1446 'details': "Field: %s (from %s) not in caps of the outputted file %s" % (
1447 wanted_caps, c, ccaps)
# All checks passed: the on-disk artifact is no longer needed.
1452 os.remove(result_descriptor.get_path())
1455 class TestsManager(Loggable):
1457 """ A class responsible for managing tests. """
# Class-level: name of the testsuite currently being loaded; used to
# namespace test classnames (see add_test / set_default_blacklist).
1460 loading_testsuite = None
# NOTE(review): the `def __init__(self):` line and a few attribute
# initialisations are missing from this capture.
1464 Loggable.__init__(self)
# Tests matched but excluded by patterns/blacklists.
1467 self.unwanted_tests = []
1470 self.reporter = None
# Compiled regexes selecting wanted / blacklisted tests.
1471 self.wanted_tests_patterns = []
1472 self.blacklisted_tests_patterns = []
1473 self._generators = []
1474 self.check_testslist = True
1475 self.all_tests = None
# bug-id -> failure definition dicts (see add_expected_issues).
1476 self.expected_issues = {}
1477 self.blacklisted_tests = []
1482 def list_tests(self):
1483 return sorted(list(self.tests), key=lambda x: x.classname)
1485 def find_tests(self, classname):
1486 regex = re.compile(classname)
1487 return [test for test in self.list_tests() if regex.findall(test.classname)]
# Attach expected-issue definitions (keyed by bug id) to already-added
# tests whose classname matches any of the definition's regexes, and
# remember the definitions for tests added later.
1489 def add_expected_issues(self, expected_issues):
1490 for bugid, failure_def in list(expected_issues.items()):
1492 for test_name_regex in failure_def['tests']:
1493 regex = re.compile(test_name_regex)
1494 tests_regexes.append(regex)
1495 for test in self.tests:
1496 if regex.findall(test.classname):
# 'allow_flakiness' takes precedence over 'max_retries'.
1497 max_retries = failure_def.get('allow_flakiness', failure_def.get('max_retries'))
1499 test.max_retries = int(max_retries)
1500 self.debug(f"{test.classname} allow {test.max_retries}")
# Tag each issue with its bug id before attaching.
1502 for issue in failure_def['issues']:
1503 issue['bug'] = bugid
1504 test.expected_issues.extend(failure_def['issues'])
1505 self.debug("%s added expected issues from %s" % (
1506 test.classname, bugid))
# Store compiled regexes back so add_test() can reuse them.
1507 failure_def['tests'] = tests_regexes
1509 self.expected_issues.update(expected_issues)
# Register a test: namespace its classname, attach matching expected
# issues, then file it under wanted or unwanted tests.
1511 def add_test(self, test):
1512 if test.generator is None:
1513 test.classname = self.loading_testsuite + '.' + test.classname
1515 for bugid, failure_def in list(self.expected_issues.items()):
1516 failure_def['bug'] = bugid
1517 for regex in failure_def['tests']:
1518 if regex.findall(test.classname):
1519 max_retries = failure_def.get('allow_flakiness', failure_def.get('max_retries'))
1521 test.max_retries = int(max_retries)
1522 self.debug(f"{test.classname} allow {test.max_retries} retries.")
1524 for issue in failure_def['issues']:
1525 issue['bug'] = bugid
1526 test.expected_issues.extend(failure_def['issues'])
1527 self.debug("%s added expected issues from %s" % (
1528 test.classname, bugid))
1530 if self._is_test_wanted(test):
1531 if test not in self.tests:
1532 self.tests.append(test)
# NOTE(review): lines missing here — presumably the `else:` branch
# appending to self.unwanted_tests.
1534 if test not in self.tests:
1535 self.unwanted_tests.append(test)
1537 def get_tests(self):
def populate_testsuite(self):
    """Hook for subclasses to populate the testsuite; default is a no-op."""
    pass
1543 def add_generators(self, generators):
1545 @generators: A list of, or one single #TestsGenerator to be used to generate tests
1547 if not isinstance(generators, list):
1548 generators = [generators]
1549 self._generators.extend(generators)
1550 for generator in generators:
1551 generator.testsuite = self.loading_testsuite
1553 self._generators = list(set(self._generators))
1555 def get_generators(self):
1556 return self._generators
1558 def _add_blacklist(self, blacklisted_tests):
1559 if not isinstance(blacklisted_tests, list):
1560 blacklisted_tests = [blacklisted_tests]
1562 for patterns in blacklisted_tests:
1563 for pattern in patterns.split(","):
1564 self.blacklisted_tests_patterns.append(re.compile(pattern))
1566 def set_default_blacklist(self, default_blacklist):
1567 for test_regex, reason in default_blacklist:
1568 if not test_regex.startswith(self.loading_testsuite + '.'):
1569 test_regex = self.loading_testsuite + '.' + test_regex
1570 self.blacklisted_tests.append((test_regex, reason))
1571 self._add_blacklist(test_regex)
def add_options(self, parser):
    """ Add more arguments. """
# Store parsed options/reporter, populate the testsuite and compile the
# wanted/blacklisted test patterns from the command line.
1577 def set_settings(self, options, args, reporter):
1578 """ Set properties after options parsing. """
1579 self.options = options
1581 self.reporter = reporter
1583 self.populate_testsuite()
1585 if self.options.valgrind:
1586 self.print_valgrind_bugs()
# options.wanted_tests / blacklisted_tests are lists of comma-separated
# regex strings.
1588 if options.wanted_tests:
1589 for patterns in options.wanted_tests:
1590 for pattern in patterns.split(","):
1591 self.wanted_tests_patterns.append(re.compile(pattern))
1593 if options.blacklisted_tests:
1594 for patterns in options.blacklisted_tests:
1595 self._add_blacklist(patterns)
1597 def check_blacklists(self):
1598 if self.options.check_bugs_status:
1599 if not check_bugs_resolution(self.blacklisted_tests):
1604 def log_blacklists(self):
1605 if self.blacklisted_tests:
1606 self.info("Currently 'hardcoded' %s blacklisted tests:" %
1609 for name, bug in self.blacklisted_tests:
1610 if not self.options.check_bugs_status:
1611 self.info(" + %s --> bug: %s" % (name, bug))
1613 def check_expected_issues(self):
1614 if not self.expected_issues or not self.options.check_bugs_status:
1617 bugs_definitions = defaultdict(list)
1618 for bug, failure_def in list(self.expected_issues.items()):
1619 tests_names = '|'.join(
1620 [regex.pattern for regex in failure_def['tests']])
1621 bugs_definitions[tests_names].extend([bug])
1623 return check_bugs_resolution(bugs_definitions.items())
1625 def _check_blacklisted(self, test):
1626 for pattern in self.blacklisted_tests_patterns:
1627 if pattern.findall(test.classname):
1628 self.info("%s is blacklisted by %s", test.classname, pattern)
# Return whether `test` matches one of the wanted-test patterns.
# NOTE(review): the return statements are missing in this capture;
# presumably a whitelist match returns True unless blacklisting applies.
1633 def _check_whitelisted(self, test):
1634 for pattern in self.wanted_tests_patterns:
1635 if pattern.findall(test.classname):
1636 if self._check_blacklisted(test):
1637 # If explicitly white listed that specific test
1638 # bypass the blacklisting
# Only an exact-name whitelist entry overrides the blacklist.
1639 if pattern.pattern != test.classname:
1644 def _check_duration(self, test):
1645 if test.duration > 0 and int(self.options.long_limit) < int(test.duration):
1646 self.info("Not activating %s as its duration (%d) is superior"
1647 " than the long limit (%d)" % (test, test.duration,
1648 int(self.options.long_limit)))
# Decide whether `test` should run: whitelist wins (subject to
# duration), blacklist excludes, and with no whitelist patterns every
# non-blacklisted, short-enough test is wanted.
# NOTE(review): the return statements are missing from this capture.
1653 def _is_test_wanted(self, test):
1654 if self._check_whitelisted(test):
1655 if not self._check_duration(test):
1659 if self._check_blacklisted(test):
1662 if not self._check_duration(test):
1665 if not self.wanted_tests_patterns:
def needs_http_server(self):
    """Whether this manager's tests require the HTTP server; default False."""
    return False
def print_valgrind_bugs(self):
    """Hook to print known valgrind issues; default is a no-op."""
    pass
# Base class generating a named set of tests for a TestsManager.
1677 class TestsGenerator(Loggable):
# @tests: optional initial tests (NOTE: mutable default argument in the
# original signature — shared between calls; flagged for upstream).
1679 def __init__(self, name, test_manager, tests=[]):
1680 Loggable.__init__(self)
1682 self.test_manager = test_manager
# Set when the generator is registered (see TestsManager.add_generators).
1683 self.testsuite = None
# NOTE(review): lines missing here (self._tests dict creation and the
# loop over `tests`).
1686 self._tests[test.classname] = test
1688 def generate_tests(self, *kwargs):
1690 Method that generates tests
1692 return list(self._tests.values())
1694 def add_test(self, test):
1695 test.generator = self
1696 test.classname = self.testsuite + '.' + test.classname
1697 self._tests[test.classname] = test
class GstValidateTestsGenerator(TestsGenerator):

    def populate_tests(self, uri_minfo_special_scenarios, scenarios):
        """Hook for subclasses: fill self._tests; default is a no-op."""
        pass

    def generate_tests(self, uri_minfo_special_scenarios, scenarios):
        """Populate, then return the generated tests."""
        self.populate_tests(uri_minfo_special_scenarios, scenarios)
        return super(GstValidateTestsGenerator, self).generate_tests()
# Top-level orchestrator: discovers testers, loads testsuites, starts
# helper servers and schedules/retries test jobs.
1710 class _TestsLauncher(Loggable):
# NOTE(review): the `def __init__(self):` line and several attribute
# initialisations are missing from this capture.
1714 Loggable.__init__(self)
1719 self.reporter = None
1720 self._list_testers()
1721 self.all_tests = None
1722 self.wanted_tests_patterns = []
# Queue used by running tests to report process updates.
1724 self.queue = queue.Queue()
1726 self.total_num_tests = 0
# Progress-bar state; -1 means "not started".
1727 self.current_progress = -1
1730 self.vfb_server = None
1732 def _list_app_dirs(self):
1734 env_dirs = os.environ["GST_VALIDATE_APPS_DIR"]
1735 if env_dirs is not None:
1736 for dir_ in env_dirs.split(os.pathsep):
1737 app_dirs.append(dir_)
# Execute every .py file found in `app_dir` inside namespace `env`.
# SECURITY NOTE(review): exec() of arbitrary files from a user-supplied
# directory — intentional plugin mechanism, but worth flagging.
1741 def _exec_app(self, app_dir, env):
1743 files = os.listdir(app_dir)
1744 except OSError as e:
# Unreadable/missing dir is non-fatal: just log and skip.
1745 self.debug("Could not list %s: %s" % (app_dir, e))
1748 if f.endswith(".py"):
1749 exec(compile(open(os.path.join(app_dir, f)).read(),
1750 os.path.join(app_dir, f), 'exec'), env)
1752 def _exec_apps(self, env):
1753 app_dirs = self._list_app_dirs()
1754 for app_dir in app_dirs:
1755 self._exec_app(app_dir, env)
# Instantiate every TestsManager subclass visible after executing the
# app modules, keeping only those whose init() succeeds.
1757 def _list_testers(self):
1758 env = globals().copy()
1759 self._exec_apps(env)
1761 testers = [i() for i in utils.get_subclasses(TestsManager, env)]
1762 for tester in testers:
1763 if tester.init() is True:
1764 self.testers.append(tester)
# NOTE(review): the `else:` line is missing from this capture.
1766 self.warning("Can not init tester: %s -- PATH is %s"
1767 % (tester.name, os.environ["PATH"]))
1769 def add_options(self, parser):
1770 for tester in self.testers:
1771 tester.add_options(parser)
# Try to import the first loadable testsuite module from `testsuites`
# (paths); returns (module, None) on success, (None, errors) otherwise.
1773 def _load_testsuite(self, testsuites):
1775 for testsuite in testsuites:
1777 sys.path.insert(0, os.path.dirname(testsuite))
1778 spec = importlib.util.spec_from_file_location(os.path.basename(testsuite).replace(".py", ""), testsuite)
1779 module = importlib.util.module_from_spec(spec)
1780 spec.loader.exec_module(module)
1781 return (module, None)
1782 except Exception as e:
1783 exceptions.append("Could not load %s: %s" % (testsuite, e))
# Always undo the sys.path manipulation (finally block).
1786 sys.path.remove(os.path.dirname(testsuite))
1788 return (None, exceptions)
# Resolve every requested testsuite (path or name searched in
# testsuites_dirs), normalise TEST_MANAGER, and de-duplicate.
1790 def _load_testsuites(self):
1792 for testsuite in self.options.testsuites:
1793 if testsuite.endswith('.py') and os.path.exists(testsuite):
1794 testsuite = os.path.abspath(os.path.expanduser(testsuite))
1795 loaded_module = self._load_testsuite([testsuite])
1797 possible_testsuites_paths = [os.path.join(d, testsuite + ".py")
1798 for d in self.options.testsuites_dirs]
1799 loaded_module = self._load_testsuite(possible_testsuites_paths)
1801 module = loaded_module[0]
1802 if not loaded_module[0]:
# Fallback: "suite.test" may actually be a test name inside `suite`.
1803 if "." in testsuite:
1804 self.options.testsuites.append(testsuite.split('.')[0])
1805 self.info("%s looks like a test name, trying that" %
1807 self.options.wanted_tests.append(testsuite)
1809 if testsuite in testsuites:
1810 self.info('Testuite %s was loaded previously', testsuite)
1812 printc("Could not load testsuite: %s, reasons: %s" % (
1813 testsuite, loaded_module[1]), Colors.FAIL)
1816 if module.__name__ in testsuites:
1817 self.info("Trying to load testsuite '%s' a second time?", module.__name__)
1820 testsuites[module.__name__] = module
# Ensure TEST_MANAGER is always a list.
1821 if not hasattr(module, "TEST_MANAGER"):
1822 module.TEST_MANAGER = [tester.name for tester in self.testers]
1823 elif not isinstance(module.TEST_MANAGER, list):
1824 module.TEST_MANAGER = [module.TEST_MANAGER]
1826 self.options.testsuites = list(testsuites.values())
# Run each loaded testsuite's setup_tests() against its wanted testers.
1828 def _setup_testsuites(self):
1829 for testsuite in self.options.testsuites:
1831 wanted_test_manager = None
1832 # TEST_MANAGER has been set in _load_testsuites()
1833 assert hasattr(testsuite, "TEST_MANAGER")
1834 wanted_test_manager = testsuite.TEST_MANAGER
1835 if not isinstance(wanted_test_manager, list):
1836 wanted_test_manager = [wanted_test_manager]
1838 for tester in self.testers:
1839 if wanted_test_manager is not None and \
1840 tester.name not in wanted_test_manager:
# loading_testsuite is class-level state used to namespace test names;
# save and restore it around setup.
1843 prev_testsuite_name = TestsManager.loading_testsuite
1844 if self.options.user_paths:
1845 TestsManager.loading_testsuite = tester.name
1846 tester.register_defaults()
1849 TestsManager.loading_testsuite = testsuite.__name__
1850 if testsuite.setup_tests(tester, self.options):
1852 if prev_testsuite_name:
1853 TestsManager.loading_testsuite = prev_testsuite_name
1856 printc("Could not load testsuite: %s"
1857 " maybe because of missing TestManager"
1858 % (testsuite), Colors.FAIL)
# Legacy config-file support: exec() the config with testers exposed as
# globals. SECURITY NOTE(review): executes arbitrary user config code.
1861 def _load_config(self, options):
1862 printc("Loading config files is DEPRECATED"
1863 " you should use the new testsuite format now",)
1865 for tester in self.testers:
1866 tester.options = options
1867 globals()[tester.name] = tester
1868 globals()["options"] = options
# Temporarily impersonate the config file's __file__.
1869 c__file__ = __file__
1870 globals()["__file__"] = self.options.config
1871 exec(compile(open(self.options.config).read(),
1872 self.options.config, 'exec'), globals())
1873 globals()["__file__"] = c__file__
# Wire everything together after argument parsing: reporter, testers,
# testsuites, bug checks, HTTP server and virtual framebuffer.
1875 def set_settings(self, options, args):
1876 if options.xunit_file:
1877 self.reporter = reporters.XunitReporter(options)
1879 self.reporter = reporters.Reporter(options)
1881 self.options = options
1882 wanted_testers = None
1883 for tester in self.testers:
1884 if tester.name in args:
1885 wanted_testers = tester.name
1888 testers = self.testers
1890 for tester in testers:
1891 if tester.name in args:
1892 self.testers.append(tester)
1893 args.remove(tester.name)
1896 self._load_config(options)
1898 self._load_testsuites()
1899 if not self.options.testsuites:
1900 printc("Not testsuite loaded!", Colors.FAIL)
1903 for tester in self.testers:
1904 tester.set_settings(options, args, self.reporter)
1906 if not options.config and options.testsuites:
1907 if self._setup_testsuites() is False:
1910 if self.options.check_bugs_status:
1911 printc("-> Checking bugs resolution... ", end='')
1913 for tester in self.testers:
1914 if not tester.check_blacklists():
1917 tester.log_blacklists()
1919 if not tester.check_expected_issues():
1922 if self.options.check_bugs_status:
1923 printc("OK", Colors.OKGREEN)
1925 if self.needs_http_server() or options.httponly is True:
1926 self.httpsrv = HTTPServer(options)
1927 self.httpsrv.start()
# Headless runs: start a virtual framebuffer and point DISPLAY at it.
1929 if options.no_display:
1930 self.vfb_server = get_virual_frame_buffer_server(options)
1931 res = self.vfb_server.start()
1933 printc("Could not start virtual frame server: %s" % res[1],
1936 os.environ["DISPLAY"] = self.vfb_server.display_id
# True when `tester` is covered by some other loaded testsuite than
# `testsuite` (used to avoid rewriting foreign .testslist files).
1940 def _check_tester_has_other_testsuite(self, testsuite, tester):
1941 if tester.name != testsuite.TEST_MANAGER[0]:
1944 for t in self.options.testsuites:
1946 for other_testmanager in t.TEST_MANAGER:
1947 if other_testmanager == tester.name:
# Compare generated tests against the testsuite's .testslist file,
# report removed/new tests and rewrite the file; returns whether the
# list changed. '~'-prefixed entries mark optional tests.
1952 def _check_defined_tests(self, tester, tests):
1953 if self.options.blacklisted_tests or self.options.wanted_tests:
1956 tests_names = [test.classname for test in tests]
1957 testlist_changed = False
1958 for testsuite in self.options.testsuites:
1959 if not self._check_tester_has_other_testsuite(testsuite, tester) \
1960 and tester.check_testslist:
1962 testlist_file = open(os.path.splitext(testsuite.__file__)[0] + ".testslist",
1965 know_tests = testlist_file.read().split("\n")
1966 testlist_file.close()
# Reopen for rewriting the refreshed list.
1968 testlist_file = open(os.path.splitext(testsuite.__file__)[0] + ".testslist",
1974 for test in know_tests:
1975 if test and test.strip('~') not in tests_names:
1976 if not test.startswith('~'):
1977 testlist_changed = True
1978 printc("Test %s Not in testsuite %s anymore"
1979 % (test, testsuite.__file__), Colors.FAIL)
1981 optional_out.append((test, None))
1983 tests_names = sorted([(test.classname, test) for test in tests] + optional_out,
1984 key=lambda x: x[0].strip('~'))
1986 for tname, test in tests_names:
1987 if test and test.optional:
1989 testlist_file.write("%s\n" % (tname))
1990 if tname and tname not in know_tests:
1991 printc("Test %s is NEW in testsuite %s"
1992 % (tname, testsuite.__file__),
1993 Colors.FAIL if self.options.fail_on_testlist_change else Colors.OKGREEN)
1994 testlist_changed = True
1996 testlist_file.close()
1999 return testlist_changed
2001 def _split_tests(self, num_groups):
2002 groups = [[] for x in range(num_groups)]
2003 group = cycle(groups)
2004 for test in self.tests:
2005 next(group).append(test)
# Collect tests from every needed tester, validate against .testslist
# files, then keep only the requested partition (num_parts/part_index).
2008 def list_tests(self):
2009 for tester in self.testers:
2010 if not self._tester_needed(tester):
2013 tests = tester.list_tests()
2014 if self._check_defined_tests(tester, tests) and \
2015 self.options.fail_on_testlist_change:
2016 raise RuntimeError("Unexpected new test in testsuite.")
2018 self.tests.extend(tests)
2019 self.tests.sort(key=lambda test: test.classname)
# Sanity-check the sharding options before selecting a partition.
2021 if self.options.num_parts < 1:
2022 raise RuntimeError("Tests must be split in positive number of parts.")
2023 if self.options.num_parts > len(self.tests):
2024 raise RuntimeError("Cannot have more parts then there exist tests.")
2025 if self.options.part_index < 1 or self.options.part_index > self.options.num_parts:
2026 raise RuntimeError("Part index is out of range")
# part_index is 1-based for users; lists are 0-based.
2028 self.tests = self._split_tests(self.options.num_parts)[self.options.part_index - 1]
2031 def _tester_needed(self, tester):
2032 for testsuite in self.options.testsuites:
2033 if tester.name in testsuite.TEST_MANAGER:
# Thread body: run the TCP server used by tests to talk back to the
# launcher; signals `ready` once listening (signal line missing here).
2037 def server_wrapper(self, ready):
2038 self.server = GstValidateTCPServer(
2039 ('localhost', 0), GstValidateListener)
2040 self.server.socket.settimeout(None)
2041 self.server.launcher = self
2042 self.serverport = self.server.socket.getsockname()[1]
2043 self.info("%s server port: %s" % (self, self.serverport))
2046 self.server.serve_forever(poll_interval=0.05)
# Start the server thread and export its address for child processes.
2048 def _start_server(self):
2049 self.info("Starting TCP Server")
2050 ready = threading.Event()
2051 self.server_thread = threading.Thread(target=self.server_wrapper,
2052 kwargs={'ready': ready})
2053 self.server_thread.start()
2055 os.environ["GST_VALIDATE_SERVER"] = "tcp://localhost:%s" % self.serverport
# Orderly shutdown of the TCP server and its thread.
2057 def _stop_server(self):
2059 self.server.shutdown()
2060 self.server_thread.join()
2061 self.server.server_close()
# Poll running jobs once per second; returns the test whose subprocess
# finished (missing lines: queue.Empty handling and the return).
2064 def test_wait(self):
2066 # Check process every second for timeout
2068 self.queue.get(timeout=1)
2072 for test in self.jobs:
2073 if test.process_update():
2074 self.jobs.remove(test)
# Wait for a finished test and collect its results; on Ctrl-C kill all
# outstanding subprocesses.
2077 def tests_wait(self):
2079 test = self.test_wait()
2080 test.check_results()
2081 except KeyboardInterrupt:
2082 for test in self.jobs:
2083 test.kill_subprocess()
# Pop the next queued test and start it; returns False when none left.
2088 def start_new_job(self, tests_left):
2090 test = tests_left.pop(0)
2094 test.test_start(self.queue)
2096 self.jobs.append(test)
# Print a single test result plus a progress bar (TTY) or plain
# progress lines (non-TTY).
2100 def print_result(self, current_test_num, test, total_num_tests, retry_on_failures=False):
2101 if test.result not in [Result.PASSED, Result.KNOWN_ERROR] and (not retry_on_failures or test.max_retries):
2102 printc(str(test), color=utils.get_color_for_result(test.result))
2105 progress = int(length * current_test_num // total_num_tests)
2106 bar = 'â–ˆ' * progress + '-' * (length - progress)
2108 printc('\r|%s| [%s/%s]' % (bar, current_test_num, total_num_tests), end='\r')
2110 if progress > self.current_progress:
2111 self.current_progress = progress
2112 printc('|%s| [%s/%s]' % (bar, current_test_num, total_num_tests))
# Core scheduler: run `running_tests` with up to num_jobs parallel
# jobs, serialize the non-parallel ones, then re-run failures once when
# retries are allowed.
2114 def _run_tests(self, running_tests=None, all_alone=False, retry_on_failures=False, total_num_tests=None):
2115 if not self.all_tests:
2116 self.all_tests = self.list_tests()
2118 if not running_tests:
2119 running_tests = self.tests
2121 self.reporter.init_timer()
# Partition into parallel-safe tests and tests that must run alone.
2124 for test in running_tests:
2125 if test.is_parallel and not all_alone:
2128 alone_tests.append(test)
2130 # use max to defend against the case where all tests are alone_tests
2131 max_num_jobs = max(min(self.options.num_jobs, len(tests)), 1)
# In --forever mode, clone tests so every job slot stays busy.
2134 if self.options.forever and len(tests) < self.options.num_jobs and len(tests):
2135 max_num_jobs = self.options.num_jobs
2138 while (len(tests) + len(copied)) < max_num_jobs:
2139 copied.append(tests[i].copy(len(copied) + 1))
2145 self.tests += copied
2147 self.total_num_tests = len(self.all_tests)
2148 prefix = "=> Re-r" if total_num_tests else "R"
2149 total_num_tests = total_num_tests if total_num_tests else self.total_num_tests
2150 printc(f"\n{prefix}unning {total_num_tests} tests...", color=Colors.HEADER)
2151 # if order of test execution doesn't matter, shuffle
2152 # the order to optimize cpu usage
2153 if self.options.shuffle:
2154 random.shuffle(tests)
2155 random.shuffle(alone_tests)
2157 current_test_num = 1
# First batch runs in parallel, second (alone_tests) with 1 job.
2159 for num_jobs, tests in [(max_num_jobs, tests), (1, alone_tests)]:
2160 tests_left = list(tests)
2161 for i in range(num_jobs):
2162 if not self.start_new_job(tests_left):
2166 while jobs_running != 0:
2167 test = self.tests_wait()
2169 current_test_num += 1
2170 res = test.test_end(retry_on_failures=retry_on_failures)
2172 if res not in [Result.PASSED, Result.SKIPPED, Result.KNOWN_ERROR]:
2173 if self.options.forever or self.options.fatal_error:
2174 self.print_result(current_test_num - 1, test, retry_on_failures=retry_on_failures,
2175 total_num_tests=total_num_tests)
2176 self.reporter.after_test(test)
# Queue failed tests for a retry round when allowed.
2179 if retry_on_failures or test.max_retries and not self.options.no_retry_on_failures:
2180 if not self.options.redirect_logs:
2181 test.copy_logfiles()
2182 to_retry.append(test)
2184 # Not adding to final report if flakiness is tolerated
2185 if test.max_retries:
2186 test.max_retries -= 1
2188 self.print_result(current_test_num - 1, test,
2189 retry_on_failures=retry_on_failures,
2190 total_num_tests=total_num_tests)
2192 self.reporter.after_test(test)
2193 if self.start_new_job(tests_left):
2197 printc("--> Rerunning the following tests to see if they are flaky:", Colors.WARNING)
2198 for test in to_retry:
2200 printc(f' * {test.classname}')
2202 self.current_progress = -1
# Retry round never retries again (retry_on_failures=False).
2203 res = self._run_tests(
2206 retry_on_failures=False,
2207 total_num_tests=len(to_retry),
# Clean every test's resources; optionally stop the TCP server too.
2214 def clean_tests(self, stop_server=False):
2215 for test in self.tests:
# Entry point: run once, N times, or forever depending on options;
# always stop the virtual framebuffer and clean up at the end.
2220 def run_tests(self):
2223 self._start_server()
2224 if self.options.forever:
2227 self.current_progress = -1
2228 printc("-> Iteration %d" % r, end='\r')
2230 if not self._run_tests():
2234 msg = "-> Iteration %d... %sOK%s" % (r, Colors.OKGREEN, Colors.ENDC)
2235 printc(msg, end="\r")
2238 elif self.options.n_runs:
2240 for r in range(self.options.n_runs):
2241 self.current_progress = -1
2242 printc("-> Iteration %d" % r, end='\r')
2243 if not self._run_tests(retry_on_failures=self.options.retry_on_failures):
2245 printc("ERROR", Colors.FAIL, end="\r")
2247 printc("OK", Colors.OKGREEN, end="\r")
2252 return self._run_tests(retry_on_failures=self.options.retry_on_failures)
2254 if self.options.forever:
2255 printc("\n-> Ran %d times" % r)
2259 self.vfb_server.stop()
2260 self.clean_tests(True)
2262 def final_report(self):
2263 return self.reporter.final_report()
2265 def needs_http_server(self):
2266 for tester in self.testers:
2267 if tester.needs_http_server():
class NamedDic(object):
    """Simple object whose attributes are initialized from a dict.

    `props` may be None or empty, in which case no attributes are set.
    """

    def __init__(self, props):
        # Robustness: guard against None (the visible code would raise
        # AttributeError on None.items()).
        if props:
            for name, value in props.items():
                setattr(self, name, value)
# A validate scenario: its properties become attributes ('-' -> '_').
2279 class Scenario(object):
# NOTE(review): assignment of self.name / self.path is missing from
# this capture.
2281 def __init__(self, name, props, path=None):
2285 for prop, value in props:
2286 setattr(self, prop.replace("-", "_"), value)
# Name to pass on the command line: the file path for special
# (file-local) scenarios, otherwise the plain name.
2288 def get_execution_name(self):
2289 if self.path is not None:
# Boolean property accessors below all follow the same pattern:
# return bool(attr) when the scenario defines it, else a default
# (default lines missing from this capture).
2295 if hasattr(self, "seek"):
2296 return bool(self.seek)
2300 def needs_clock_sync(self):
2301 if hasattr(self, "need_clock_sync"):
2302 return bool(self.need_clock_sync)
2306 def needs_live_content(self):
2307 # Scenarios that can only be used on live content
2308 if hasattr(self, "live_content_required"):
2309 return bool(self.live_content_required)
2312 def compatible_with_live_content(self):
2313 # if a live content is required it's implicitly compatible with
2315 if self.needs_live_content():
2317 if hasattr(self, "live_content_compatible"):
2318 return bool(self.live_content_compatible)
2321 def get_min_media_duration(self):
2322 if hasattr(self, "min_media_duration"):
2323 return float(self.min_media_duration)
2327 def does_reverse_playback(self):
2328 if hasattr(self, "reverse_playback"):
2329 return bool(self.reverse_playback)
2333 def get_duration(self):
2335 return float(getattr(self, "duration"))
2336 except AttributeError:
2339 def get_min_tracks(self, track_type):
2341 return int(getattr(self, "min_%s_track" % track_type))
2342 except AttributeError:
2346 return "<Scenario %s>" % self.name
# Singleton discovering and caching the scenarios gst-validate knows.
2349 class ScenarioManager(Loggable):
2351 system_scenarios = []
2352 special_scenarios = {}
2354 FILE_EXTENSION = "scenario"
# Classic singleton __new__; NOTE(review): the `_instance = None`
# class attribute it relies on is missing from this capture.
2356 def __new__(cls, *args, **kwargs):
2357 if not cls._instance:
2358 cls._instance = super(ScenarioManager, cls).__new__(
2359 cls, *args, **kwargs)
2360 cls._instance.config = None
2361 cls._instance.discovered = False
2362 Loggable.__init__(cls._instance)
2364 return cls._instance
# Find "<media>.<name>.scenario" files next to the media file.
2366 def find_special_scenarios(self, mfile):
2368 mfile_bname = os.path.basename(mfile)
2370 for f in os.listdir(os.path.dirname(mfile)):
2371 if re.findall("%s\..*\.%s$" % (re.escape(mfile_bname), self.FILE_EXTENSION), f):
2372 scenarios.append(os.path.join(os.path.dirname(mfile), f))
2375 scenarios = self.discover_scenarios(scenarios, mfile)
# Ask gst-validate to dump scenario definitions, then parse them into
# Scenario objects; with no paths given, discovers the system ones.
2379 def discover_scenarios(self, scenario_paths=[], mfile=None):
2381 Discover scenarios specified in scenario_paths or the default ones
2382 if nothing specified there
2385 scenario_defs = os.path.join(self.config.main_dir, "scenarios.def")
2386 log_path = os.path.join(self.config.logsdir, "scenarios_discovery.log")
2387 logs = open(log_path, 'w')
2390 command = [GstValidateBaseTestManager.COMMAND,
2391 "--scenarios-defs-output-file", scenario_defs]
2392 command.extend(scenario_paths)
2393 subprocess.check_call(command, stdout=logs, stderr=logs)
2394 except subprocess.CalledProcessError as e:
2396 self.error('See %s' % log_path)
2399 config = configparser.RawConfigParser()
2400 f = open(scenario_defs)
2403 for section in config.sections():
2406 for scenario_path in scenario_paths:
2407 if section == scenario_path:
2409 name = os.path.basename(section).replace("." + self.FILE_EXTENSION, "")
2410 path = scenario_path
2412 # The real name of the scenario is:
2413 # filename.REALNAME.scenario
2414 name = scenario_path.replace(mfile + ".", "").replace(
2415 "." + self.FILE_EXTENSION, "")
2416 path = scenario_path
2419 name = os.path.basename(section).replace("." + self.FILE_EXTENSION, "")
2424 props = config.items(section)
2425 scenario = Scenario(name, props, path)
2427 self.special_scenarios[path] = scenario
2428 scenarios.append(scenario)
# Only a full (no-path) discovery marks the system cache as complete.
2430 if not scenario_paths:
2431 self.discovered = True
2432 self.system_scenarios.extend(scenarios)
# Look up a scenario by name, or by absolute .scenario path (cached in
# special_scenarios); falls back to the system scenarios list.
2436 def get_scenario(self, name):
2437 if name is not None and os.path.isabs(name) and name.endswith(self.FILE_EXTENSION):
2438 scenario = self.special_scenarios.get(name)
2442 scenarios = self.discover_scenarios([name])
2443 self.special_scenarios[name] = scenarios
2448 if self.discovered is False:
2449 self.discover_scenarios()
2452 return self.system_scenarios
2455 return [scenario for scenario in self.system_scenarios if scenario.name == name][0]
2457 self.warning("Scenario: %s not found" % name)
# Base TestsManager for gst-validate tools: holds commands, scenarios
# and encoding formats shared by the generators.
2461 class GstValidateBaseTestManager(TestsManager):
2462 scenarios_manager = ScenarioManager()
# NOTE(review): the `def __init__(self):` line is missing here.
2466 super(GstValidateBaseTestManager, self).__init__()
2467 self._scenarios = []
2468 self._encoding_formats = []
# Resolve the gst-validate* tool binaries and store them as class
# attributes (COMMAND, TRANSCODING_COMMAND, ...); decorator line
# (@classmethod) missing from this capture.
2471 def update_commands(cls, extra_paths=None):
2472 for varname, cmd in {'': 'gst-validate',
2473 'TRANSCODING_': 'gst-validate-transcoding',
2474 'MEDIA_CHECK_': 'gst-validate-media-check',
2475 'RTSP_SERVER_': 'gst-validate-rtsp-server',
2476 'INSPECT_': 'gst-inspect'}.items():
2477 setattr(cls, varname + 'COMMAND', which(cmd + '-1.0', extra_paths))
# Cached gst-inspect probe for element/feature availability.
2480 def has_feature(cls, featurename):
2482 return cls.features_cache[featurename]
2487 subprocess.check_output([cls.INSPECT_COMMAND, featurename])
2489 except subprocess.CalledProcessError:
2492 cls.features_cache[featurename] = res
2495 def add_scenarios(self, scenarios):
2497 @scenarios A list or a unic scenario name(s) to be run on the tests.
2498 They are just the default scenarios, and then depending on
2499 the TestsGenerator to be used you can have more fine grained
2500 control on what to be run on each series of tests.
2502 if isinstance(scenarios, list):
2503 self._scenarios.extend(scenarios)
2505 self._scenarios.append(scenarios)
2507 self._scenarios = list(set(self._scenarios))
2509 def set_scenarios(self, scenarios):
2511 Override the scenarios
2513 self._scenarios = []
2514 self.add_scenarios(scenarios)
2516 def get_scenarios(self):
2517 return self._scenarios
2519 def add_encoding_formats(self, encoding_formats):
2521 :param encoding_formats: A list or one single #MediaFormatCombinations describing wanted output
2522 formats for transcoding test.
2523 They are just the default encoding formats, and then depending on
2524 the TestsGenerator to be used you can have more fine grained
2525 control on what to be run on each series of tests.
2527 if isinstance(encoding_formats, list):
2528 self._encoding_formats.extend(encoding_formats)
2530 self._encoding_formats.append(encoding_formats)
2532 self._encoding_formats = list(set(self._encoding_formats))
2534 def get_encoding_formats(self):
2535 return self._encoding_formats
2538 GstValidateBaseTestManager.update_commands()
class MediaDescriptor(Loggable):
    """Abstract description of a media file.

    Subclasses (e.g. GstValidateMediaDescriptor) implement the accessors;
    is_compatible() uses them to decide whether a scenario can run against
    the described media.
    """

    def __init__(self):
        Loggable.__init__(self)

    def get_path(self):
        raise NotImplementedError

    def has_frames(self):
        # Subclasses report whether per-frame information was extracted.
        return False

    def get_framerate(self):
        """Return the framerate of the first video track as a Fraction,
        or Fraction(0, 1) when there is no video track or its caps can
        not be parsed."""
        for ttype, caps_str in self.get_tracks_caps():
            if ttype != "video":
                continue

            caps = utils.GstCaps.new_from_str(caps_str)
            if not caps:
                self.warning("Could not create caps for %s" % caps_str)
                continue

            framerate = caps[0].get("framerate")
            if framerate:
                return framerate

        return Fraction(0, 1)

    def get_media_filepath(self):
        raise NotImplementedError

    def skip_parsers(self):
        # Whether validate should be told to skip parsers for this media.
        return False

    def get_caps(self):
        raise NotImplementedError

    def get_uri(self):
        raise NotImplementedError

    def get_duration(self):
        raise NotImplementedError

    def get_protocol(self):
        raise NotImplementedError

    def is_seekable(self):
        raise NotImplementedError

    def is_live(self):
        raise NotImplementedError

    def is_image(self):
        raise NotImplementedError

    def get_num_tracks(self, track_type):
        raise NotImplementedError

    def get_tracks_caps(self):
        # (track_type, caps_string) pairs; empty by default.
        return []

    def can_play_reverse(self):
        raise NotImplementedError

    def prerrols(self):
        # NOTE: historical misspelling of "prerolls" kept on purpose —
        # is_compatible() below and external callers use this exact name.
        return True

    def is_compatible(self, scenario):
        """Return True when @scenario can meaningfully run on this media
        (seekability, liveness, duration, track counts, ...)."""
        if scenario is None:
            return True

        if scenario.seeks() and (not self.is_seekable() or self.is_image()):
            self.debug("Do not run %s as %s does not support seeking",
                       scenario, self.get_uri())
            return False

        if self.is_image() and scenario.needs_clock_sync():
            self.debug("Do not run %s as %s is an image",
                       scenario, self.get_uri())
            return False

        if not self.can_play_reverse() and scenario.does_reverse_playback():
            return False

        if not self.is_live() and scenario.needs_live_content():
            self.debug("Do not run %s as %s is not a live content",
                       scenario, self.get_uri())
            return False

        if self.is_live() and not scenario.compatible_with_live_content():
            self.debug("Do not run %s as %s is a live content",
                       scenario, self.get_uri())
            return False

        if not self.prerrols() and getattr(scenario, 'needs_preroll', False):
            return False

        if self.get_duration() and self.get_duration() / GST_SECOND < scenario.get_min_media_duration():
            self.debug(
                "Do not run %s as %s is too short (%i < min media duation : %i",
                scenario, self.get_uri(),
                self.get_duration() / GST_SECOND,
                scenario.get_min_media_duration())
            return False

        # The scenario may require a minimum number of tracks of each type.
        for track_type in ['audio', 'subtitle', 'video']:
            if self.get_num_tracks(track_type) < scenario.get_min_tracks(track_type):
                self.debug("%s -- %s | At least %s %s track needed < %s"
                           % (scenario, self.get_uri(), track_type,
                              scenario.get_min_tracks(track_type),
                              self.get_num_tracks(track_type)))
                return False

        return True
# Concrete MediaDescriptor backed by a gst-validate-media-check XML file
# (*.media_info and friends). Parses the XML once and caches descriptors
# per xml_path in __all_descriptors.
2656 class GstValidateMediaDescriptor(MediaDescriptor):
2657 # Some extension file for discovering results
2658 SKIPPED_MEDIA_INFO_EXT = "media_info.skipped"
2659 MEDIA_INFO_EXT = "media_info"
2660 PUSH_MEDIA_INFO_EXT = "media_info.push"
2661 STREAM_INFO_EXT = "stream_info"
# Cache: xml_path -> descriptor instance (name-mangled, class private).
2663 __all_descriptors = {}
# Cached constructor: reuse the descriptor already built for xml_path.
2666 def get(cls, xml_path):
2667 if xml_path in cls.__all_descriptors:
2668 return cls.__all_descriptors[xml_path]
2669 return GstValidateMediaDescriptor(xml_path)
2671 def __init__(self, xml_path):
2672 super(GstValidateMediaDescriptor, self).__init__()
2674 self._media_file_path = None
# If a descriptor for this path already exists, copy its parsed state
# instead of re-parsing the XML; otherwise register self in the cache.
2675 main_descriptor = self.__all_descriptors.get(xml_path)
2677 self._copy_data_from_main(main_descriptor)
2679 self.__all_descriptors[xml_path] = self
2681 self._xml_path = xml_path
2683 media_xml = ET.parse(xml_path).getroot()
2684 except xml.etree.ElementTree.ParseError:
2685 printc("Could not parse %s" % xml_path,
2688 self._extract_data(media_xml)
2690 self.set_protocol(urllib.parse.urlparse(self.get_uri()).scheme)
2692 def skip_parsers(self):
2693 return self._skip_parsers
2695 def has_frames(self):
2696 return self._has_frames
# Shallow-copies every attribute from an already-parsed descriptor.
2698 def _copy_data_from_main(self, main_descriptor):
2699 for attr in main_descriptor.__dict__.keys():
2700 setattr(self, attr, getattr(main_descriptor, attr))
2702 def _extract_data(self, media_xml):
2703 # Extract the information we need from the xml
2704 self._caps = media_xml.findall("streams")[0].attrib["caps"]
2705 self._track_caps = []
2707 streams = media_xml.findall("streams")[0].findall("stream")
2711 for stream in streams:
2712 self._track_caps.append(
2713 (stream.attrib["type"], stream.attrib["caps"]))
2715 self._skip_parsers = bool(int(media_xml.attrib.get('skip-parsers', 0)))
2716 self._has_frames = bool(int(media_xml.attrib["frame-detection"]))
2717 self._duration = int(media_xml.attrib["duration"])
2718 self._uri = media_xml.attrib["uri"]
2719 parsed_uri = urllib.parse.urlparse(self.get_uri())
2720 self._protocol = media_xml.get("protocol", parsed_uri.scheme)
# Re-root file:// URIs next to the media-info file when the recorded
# path no longer exists (media checked out at a different location).
2721 if parsed_uri.scheme == "file":
2722 if not os.path.exists(parsed_uri.path) and os.path.exists(self.get_media_filepath()):
2723 self._uri = "file://" + self.get_media_filepath()
# Image sequences: rebuild the pattern path relative to the info file.
2724 elif parsed_uri.scheme == Protocols.IMAGESEQUENCE:
2725 self._media_file_path = os.path.join(os.path.dirname(self.__cleanup_media_info_ext()), os.path.basename(parsed_uri.path))
2726 self._uri = parsed_uri._replace(path=os.path.join(os.path.dirname(self.__cleanup_media_info_ext()), os.path.basename(self._media_file_path))).geturl()
2727 self._is_seekable = media_xml.attrib["seekable"].lower() == "true"
2728 self._is_live = media_xml.get("live", "false").lower() == "true"
2729 self._is_image = False
2730 for stream in media_xml.findall("streams")[0].findall("stream"):
2731 if stream.attrib["type"] == "image":
2732 self._is_image = True
2733 self._track_types = []
2734 for stream in media_xml.findall("streams")[0].findall("stream"):
2735 self._track_types.append(stream.attrib["type"])
# Strip the media-info extension (plus its dot) from the xml path,
# yielding the path of the media file itself.
2737 def __cleanup_media_info_ext(self):
2738 for ext in [self.MEDIA_INFO_EXT, self.PUSH_MEDIA_INFO_EXT, self.STREAM_INFO_EXT,
2739 self.SKIPPED_MEDIA_INFO_EXT, ]:
2740 if self._xml_path.endswith(ext):
2741 return self._xml_path[:len(self._xml_path) - (len(ext) + 1)]
# NOTE(review): `assert <non-empty-str> == None` always fails, which is
# the intent ("not reached"), but is stripped under `python -O`.
2743 assert "Not reached" == None # noqa
# Generate (or reuse) a media-info file for @uri by running
# gst-validate-media-check, then return a descriptor for it.
2746 def new_from_uri(uri, verbose=False, include_frames=False, is_push=False, is_skipped=False):
2748 include_frames = 0 # Never
2749 include_frames = 1 # always
2750 include_frames = 2 # if previous file included them
2753 media_path = utils.url2path(uri)
2755 ext = GstValidateMediaDescriptor.MEDIA_INFO_EXT
2757 ext = GstValidateMediaDescriptor.PUSH_MEDIA_INFO_EXT
2759 ext = GstValidateMediaDescriptor.SKIPPED_MEDIA_INFO_EXT
2760 descriptor_path = "%s.%s" % (media_path, ext)
2761 args = GstValidateBaseTestManager.MEDIA_CHECK_COMMAND.split(" ")
# include_frames == 2: inherit the frame-detection/skip-parsers choices
# from the previous media-info file, when one exists.
2762 if include_frames == 2:
2764 media_xml = ET.parse(descriptor_path).getroot()
2765 prev_uri = urllib.parse.urlparse(media_xml.attrib['uri'])
2766 if prev_uri.scheme == Protocols.IMAGESEQUENCE:
2767 parsed_uri = urllib.parse.urlparse(uri)
2768 uri = prev_uri._replace(path=os.path.join(os.path.dirname(parsed_uri.path), os.path.basename(prev_uri.path))).geturl()
2769 include_frames = bool(int(media_xml.attrib["frame-detection"]))
2770 if bool(int(media_xml.attrib.get("skip-parsers", 0))):
2771 args.append("--skip-parsers")
2772 except FileNotFoundError:
2775 include_frames = bool(include_frames)
2778 args.extend(["--output-file", descriptor_path])
2780 args.extend(["--full"])
2783 printc("Generating media info for %s\n"
2784 " Command: '%s'" % (media_path, ' '.join(args)),
# NOTE(review): stderr=open(os.devnull) leaks a file handle per call.
2788 subprocess.check_output(args, stderr=open(os.devnull))
2789 except subprocess.CalledProcessError as e:
2791 printc("Result: Failed", Colors.FAIL)
2793 loggable.warning("GstValidateMediaDescriptor",
2794 "Exception: %s" % e)
2798 printc("Result: Passed", Colors.OKGREEN)
2801 return GstValidateMediaDescriptor(descriptor_path)
2802 except (IOError, xml.etree.ElementTree.ParseError):
2806 return self._xml_path
2808 def need_clock_sync(self):
2809 return Protocols.needs_clock_sync(self.get_protocol())
# Lazily derived from the xml path by stripping the info extension.
2811 def get_media_filepath(self):
2812 if self._media_file_path is None:
2813 self._media_file_path = self.__cleanup_media_info_ext()
2814 return self._media_file_path
2819 def get_tracks_caps(self):
2820 return self._track_caps
2825 def get_duration(self):
2826 return self._duration
# *.media_info.push files always describe push-file protocol media.
2828 def set_protocol(self, protocol):
2829 if self._xml_path.endswith(GstValidateMediaDescriptor.PUSH_MEDIA_INFO_EXT):
2830 self._protocol = Protocols.PUSHFILE
2832 self._protocol = protocol
2834 def get_protocol(self):
2835 return self._protocol
2837 def is_seekable(self):
2838 return self._is_seekable
2841 return self._is_live
2843 def can_play_reverse(self):
2847 return self._is_image
# Count tracks whose type matches (presumably substring match on the
# elided comparison — TODO confirm) track_type.
2849 def get_num_tracks(self, track_type):
2851 for t in self._track_types:
# Basename with the media-info extension removed and dots replaced by
# underscores, suitable for use in generated test names.
2857 def get_clean_name(self):
2858 name = os.path.basename(self.get_path())
2859 regex = '|'.join(['\\.%s$' % ext for ext in [self.SKIPPED_MEDIA_INFO_EXT, self.MEDIA_INFO_EXT, self.PUSH_MEDIA_INFO_EXT, self.STREAM_INFO_EXT]])
2860 name = re.sub(regex, "", name)
2862 return name.replace('.', "_")
class MediaFormatCombination(object):
    """One container/audio/video format combination used to parametrize
    transcoding tests."""

    # Short format name -> GStreamer caps string used to request it.
    FORMATS = {"aac": "audio/mpeg,mpegversion=4",  # Audio
               "ac3": "audio/x-ac3",
               "vorbis": "audio/x-vorbis",
               "mp3": "audio/mpeg,mpegversion=1,layer=3",
               "opus": "audio/x-opus",
               "rawaudio": "audio/x-raw",

               # Video
               "h264": "video/x-h264",
               "h265": "video/x-h265",
               "vp8": "video/x-vp8",
               "vp9": "video/x-vp9",
               "theora": "video/x-theora",
               "prores": "video/x-prores",
               "jpeg": "image/jpeg",

               # Containers
               "webm": "video/webm",
               "ogg": "application/ogg",
               "mkv": "video/x-matroska",
               "mp4": "video/quicktime,variant=iso;",
               "quicktime": "video/quicktime;"}

    def __str__(self):
        return "%s and %s in %s" % (self.audio, self.video, self.container)

    def __init__(self, container, audio, video, duration_factor=1,
                 video_restriction=None, audio_restriction=None):
        """
        Describes a media format to be used for transcoding tests.

        :param container: A string defining the container format to be used, must be in self.FORMATS
        :param audio: A string defining the audio format to be used, must be in self.FORMATS
        :param video: A string defining the video format to be used, must be in self.FORMATS
        """
        # duration_factor is accepted for API compatibility but not stored.
        self.container = container
        self.audio = audio
        self.video = video
        self.video_restriction = video_restriction
        self.audio_restriction = audio_restriction

    def get_caps(self, track_type):
        # Caps string for "audio", "video" or "container", or None when the
        # configured format name is not a known FORMATS key.
        try:
            return self.FORMATS[self.__dict__[track_type]]
        except KeyError:
            return None

    def get_audio_caps(self):
        return self.get_caps("audio")

    def get_video_caps(self):
        return self.get_caps("video")

    def get_muxer_caps(self):
        return self.get_caps("container")