3 # Copyright (c) 2013,Thibault Saunier <thibault.saunier@collabora.com>
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU Lesser General Public
7 # License as published by the Free Software Foundation; either
8 # version 2.1 of the License, or (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 # Lesser General Public License for more details.
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with this program; if not, write to the
17 # Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
18 # Boston, MA 02110-1301, USA.
20 """ Class representing tests and test managers. """
43 from .utils import which
44 from . import reporters
45 from . import loggable
46 from .loggable import Loggable
49 from lxml import etree as ET
51 import xml.etree.cElementTree as ET
53 from .vfb_server import get_virual_frame_buffer_server
54 from .httpserver import HTTPServer
55 from .utils import mkdir, Result, Colors, printc, DEFAULT_TIMEOUT, GST_SECOND, \
56 Protocols, look_for_file_in_source_dir, get_data_file, BackTraceGenerator, \
59 # The factor by which we increase the hard timeout when running inside
61 GDB_TIMEOUT_FACTOR = VALGRIND_TIMEOUT_FACTOR = 20
62 TIMEOUT_FACTOR = float(os.environ.get("TIMEOUT_FACTOR", 1))
63 # The error reported by valgrind when detecting errors
64 VALGRIND_ERROR_CODE = 20
66 VALIDATE_OVERRIDE_EXTENSION = ".override"
67 EXITING_SIGNALS = dict([(-getattr(signal, s), s) for s in [
68 'SIGQUIT', 'SIGILL', 'SIGABRT', 'SIGFPE', 'SIGSEGV', 'SIGBUS', 'SIGSYS',
69 'SIGTRAP', 'SIGXCPU', 'SIGXFSZ', 'SIGIOT'] if hasattr(signal, s)])
70 EXITING_SIGNALS.update({139: "SIGSEGV"})
75 """ A class representing a particular test. """
77 def __init__(self, application_name, classname, options,
78 reporter, duration=0, timeout=DEFAULT_TIMEOUT,
79 hard_timeout=None, extra_env_variables=None,
80 expected_failures=None, is_parallel=True,
83 @timeout: The timeout during which the value return by get_current_value
84 keeps being exactly equal
85 @hard_timeout: Max time the test can take in absolute
87 Loggable.__init__(self)
88 self.timeout = timeout * TIMEOUT_FACTOR * options.timeout_factor
90 self.hard_timeout = hard_timeout * TIMEOUT_FACTOR
91 self.hard_timeout *= options.timeout_factor
93 self.hard_timeout = hard_timeout
94 self.classname = classname
95 self.options = options
96 self.application = application_name
98 self.server_command = None
99 self.reporter = reporter
104 self.duration = duration
105 self.stack_trace = None
107 if expected_failures is None:
108 self.expected_failures = []
109 elif not isinstance(expected_failures, list):
110 self.expected_failures = [expected_failures]
112 self.expected_failures = expected_failures
114 extra_env_variables = extra_env_variables or {}
115 self.extra_env_variables = extra_env_variables
116 self.optional = False
117 self.is_parallel = is_parallel
118 self.generator = None
119 # String representation of the test number in the testsuite
121 self.workdir = workdir
126 self.kill_subprocess()
129 self.time_taken = 0.0
130 self._starting_time = None
131 self.result = Result.NOT_RUN
134 self.extra_logfiles = []
135 self.__env_variable = []
136 self.kill_subprocess()
139 string = self.classname
140 if self.result != Result.NOT_RUN:
141 string += ": " + self.result
142 if self.result in [Result.FAILED, Result.TIMEOUT]:
143 string += " '%s'\n" \
144 " You can reproduce with: %s\n" \
145 % (self.message, self.get_command_repr())
147 if not self.options.redirect_logs and \
148 self.result == Result.PASSED or \
149 not self.options.dump_on_failure:
150 string += self.get_logfile_repr()
154 def add_env_variable(self, variable, value=None):
156 Only usefull so that the gst-validate-launcher can print the exact
157 right command line to reproduce the tests
160 value = os.environ.get(variable, None)
165 self.__env_variable.append(variable)
168 def _env_variable(self):
170 for var in set(self.__env_variable):
173 value = self.proc_env.get(var, None)
174 if value is not None:
175 res += "%s='%s'" % (var, value)
179 def open_logfile(self):
183 path = os.path.join(self.options.logsdir,
184 self.classname.replace(".", os.sep))
185 mkdir(os.path.dirname(path))
188 if self.options.redirect_logs == 'stdout':
189 self.out = sys.stdout
190 elif self.options.redirect_logs == 'stderr':
191 self.out = sys.stderr
193 self.out = open(path, 'w+')
195 def close_logfile(self):
196 if not self.options.redirect_logs:
201 def _get_file_content(self, file_name):
202 f = open(file_name, 'r+')
208 def get_log_content(self):
209 return self._get_file_content(self.logfile)
211 def get_extra_log_content(self, extralog):
212 if extralog not in self.extra_logfiles:
215 return self._get_file_content(extralog)
217 def get_classname(self):
218 name = self.classname.split('.')[-1]
219 classname = self.classname.replace('.%s' % name, '')
224 return self.classname.split('.')[-1]
227 if self._uuid is None:
228 self._uuid = self.classname + str(uuid.uuid4())
231 def add_arguments(self, *args):
234 def build_arguments(self):
235 self.add_env_variable("LD_PRELOAD")
236 self.add_env_variable("DISPLAY")
238 def add_stack_trace_to_logfile(self):
239 self.debug("Adding stack trace")
240 trace_gatherer = BackTraceGenerator.get_default()
241 stack_trace = trace_gatherer.get_trace(self)
246 info = "\n\n== Stack trace: == \n%s" % stack_trace
247 if self.options.redirect_logs:
251 if self.options.xunit_file:
252 self.stack_trace = stack_trace
254 with open(self.logfile, 'a') as f:
257 def set_result(self, result, message="", error=""):
258 self.debug("Setting result: %s (message: %s, error: %s)" % (result,
261 if result is Result.TIMEOUT:
262 if self.options.debug is True:
264 printc("Timeout, you should process <ctrl>c to get into gdb",
266 # and wait here until gdb exits
267 self.process.communicate()
269 pname = self.command[0]
270 input("%sTimeout happened you can attach gdb doing: $gdb %s %d%s\n"
271 "Press enter to continue" % (Colors.FAIL, pname, self.process.pid,
274 self.add_stack_trace_to_logfile()
277 self.message = message
278 self.error_str = error
280 def check_results(self):
281 if self.result is Result.FAILED or self.result is Result.TIMEOUT:
284 self.debug("%s returncode: %s", self, self.process.returncode)
285 if self.process.returncode == 0:
286 self.set_result(Result.PASSED)
287 elif self.process.returncode in EXITING_SIGNALS:
288 self.add_stack_trace_to_logfile()
289 self.set_result(Result.FAILED,
290 "Application exited with signal %s" % (
291 EXITING_SIGNALS[self.process.returncode]))
292 elif self.process.returncode == VALGRIND_ERROR_CODE:
293 self.set_result(Result.FAILED, "Valgrind reported errors")
295 self.set_result(Result.FAILED,
296 "Application returned %d" % (self.process.returncode))
298 def get_current_value(self):
300 Lets subclasses implement a nicer timeout measurement method
301 They should return some value with which we will compare
302 the previous and timeout if they are egual during self.timeout
305 return Result.NOT_RUN
307 def process_update(self):
309 Returns True when process has finished running or has timed out.
312 if self.process is None:
313 # Process has not started running yet
317 if self.process.returncode is not None:
320 val = self.get_current_value()
322 self.debug("Got value: %s" % val)
323 if val is Result.NOT_RUN:
324 # The get_current_value logic is not implemented... dumb
326 if time.time() - self.last_change_ts > self.timeout:
327 self.set_result(Result.TIMEOUT,
328 "Application timed out: %s secs" %
333 elif val is Result.FAILED:
335 elif val is Result.KNOWN_ERROR:
338 self.log("New val %s" % val)
340 if val == self.last_val:
341 delta = time.time() - self.last_change_ts
342 self.debug("%s: Same value for %d/%d seconds" %
343 (self, delta, self.timeout))
344 if delta > self.timeout:
345 self.set_result(Result.TIMEOUT,
346 "Application timed out: %s secs" %
350 elif self.hard_timeout and time.time() - self.start_ts > self.hard_timeout:
352 Result.TIMEOUT, "Hard timeout reached: %d secs" % self.hard_timeout)
355 self.last_change_ts = time.time()
360 def get_subproc_env(self):
361 return os.environ.copy()
363 def kill_subprocess(self):
364 utils.kill_subprocess(self, self.process, DEFAULT_TIMEOUT)
366 def run_external_checks(self):
369 def thread_wrapper(self):
371 # Restore the SIGINT handler for the child process (gdb) to ensure
373 signal.signal(signal.SIGINT, signal.SIG_DFL)
375 if self.options.gdb and os.name != "nt":
376 preexec_fn = enable_sigint
380 self.process = subprocess.Popen(self.command,
385 preexec_fn=preexec_fn)
387 if self.result is not Result.TIMEOUT:
388 if self.process.returncode == 0:
389 self.run_external_checks()
392 def get_valgrind_suppression_file(self, subdir, name):
393 p = get_data_file(subdir, name)
397 self.error("Could not find any %s file" % name)
399 def get_valgrind_suppressions(self):
400 return [self.get_valgrind_suppression_file('data', 'gstvalidate.supp')]
402 def use_gdb(self, command):
403 if self.hard_timeout is not None:
404 self.hard_timeout *= GDB_TIMEOUT_FACTOR
405 self.timeout *= GDB_TIMEOUT_FACTOR
407 if not self.options.gdb_non_stop:
408 self.timeout = sys.maxsize
409 self.hard_timeout = sys.maxsize
412 if self.options.gdb_non_stop:
413 args += ["-ex", "run", "-ex", "backtrace", "-ex", "quit"]
414 args += ["--args"] + command
417 def use_valgrind(self, command, subenv):
418 vglogsfile = self.logfile + '.valgrind'
419 self.extra_logfiles.append(vglogsfile)
423 for o, v in [('trace-children', 'yes'),
424 ('tool', 'memcheck'),
425 ('leak-check', 'full'),
426 ('leak-resolution', 'high'),
427 # TODO: errors-for-leak-kinds should be set to all instead of definite
428 # and all false positives should be added to suppression
430 ('errors-for-leak-kinds', 'definite'),
431 ('num-callers', '20'),
432 ('error-exitcode', str(VALGRIND_ERROR_CODE)),
433 ('gen-suppressions', 'all')]:
434 vg_args.append("--%s=%s" % (o, v))
436 if not self.options.redirect_logs:
437 vglogsfile = self.logfile + '.valgrind'
438 self.extra_logfiles.append(vglogsfile)
439 vg_args.append("--%s=%s" % ('log-file', vglogsfile))
441 for supp in self.get_valgrind_suppressions():
442 vg_args.append("--suppressions=%s" % supp)
444 command = ["valgrind"] + vg_args + command
446 # Tune GLib's memory allocator to be more valgrind friendly
447 subenv['G_DEBUG'] = 'gc-friendly'
448 subenv['G_SLICE'] = 'always-malloc'
450 if self.hard_timeout is not None:
451 self.hard_timeout *= VALGRIND_TIMEOUT_FACTOR
452 self.timeout *= VALGRIND_TIMEOUT_FACTOR
454 # Enable 'valgrind.config'
455 self.add_validate_config(get_data_file(
456 'data', 'valgrind.config'), subenv)
457 if subenv == self.proc_env:
458 self.add_env_variable('G_DEBUG', 'gc-friendly')
459 self.add_env_variable('G_SLICE', 'always-malloc')
460 self.add_env_variable('GST_VALIDATE_CONFIG',
461 self.proc_env['GST_VALIDATE_CONFIG'])
465 def add_validate_config(self, config, subenv=None):
467 subenv = self.extra_env_variables
469 if "GST_VALIDATE_CONFIG" in subenv:
470 subenv['GST_VALIDATE_CONFIG'] = '%s%s%s' % (
471 subenv['GST_VALIDATE_CONFIG'], os.pathsep, config)
473 subenv['GST_VALIDATE_CONFIG'] = config
475 def launch_server(self):
478 def get_logfile_repr(self):
480 logfiles = self.extra_logfiles.copy()
482 if not self.options.redirect_logs:
483 logfiles.insert(0, self.logfile)
486 message += " - %s\n" % log
490 def get_command_repr(self):
491 message = "%s %s" % (self._env_variable, ' '.join(
492 shlex.quote(arg) for arg in self.command))
493 if self.server_command:
494 message = "%s & %s" % (self.server_command, message)
498 def test_start(self, queue):
501 self.server_command = self.launch_server()
503 self.command = [self.application]
504 self._starting_time = time.time()
505 self.build_arguments()
506 self.proc_env = self.get_subproc_env()
508 for var, value in list(self.extra_env_variables.items()):
509 value = self.proc_env.get(var, '') + os.pathsep + value
510 self.proc_env[var] = value.strip(os.pathsep)
511 self.add_env_variable(var, self.proc_env[var])
514 self.command = self.use_gdb(self.command)
516 self.previous_sigint_handler = signal.getsignal(signal.SIGINT)
517 # Make the gst-validate executable ignore SIGINT while gdb is
519 signal.signal(signal.SIGINT, signal.SIG_IGN)
521 if self.options.valgrind:
522 self.command = self.use_valgrind(self.command, self.proc_env)
524 if not self.options.redirect_logs:
525 self.out.write("=================\n"
528 "=================\n\n"
529 % (self.classname, ' '.join(self.command)))
532 message = "Launching: %s%s\n" \
533 " Command: %s\n" % (Colors.ENDC, self.classname,
534 self.get_command_repr())
535 printc(message, Colors.OKBLUE)
537 self.thread = threading.Thread(target=self.thread_wrapper)
541 self.last_change_ts = time.time()
542 self.start_ts = time.time()
544 def _dump_log_file(self, logfile):
545 message = "Dumping contents of %s\n" % logfile
546 printc(message, Colors.FAIL)
548 with open(logfile, 'r') as fin:
551 def _dump_log_files(self):
552 printc("Dumping log files on failure\n", Colors.FAIL)
553 self._dump_log_file(self.logfile)
554 for logfile in self.extra_logfiles:
555 self._dump_log_file(logfile)
558 self.kill_subprocess()
560 self.time_taken = time.time() - self._starting_time
563 signal.signal(signal.SIGINT, self.previous_sigint_handler)
565 if self.result != Result.PASSED:
569 message = "%s %s: %s%s" % (self.number, self.classname, self.result,
570 " (" + self.message + ")" if self.message else "")
572 if sys.stdout.isatty():
573 term_width = shutil.get_terminal_size((80, 20))[0]
574 if len(message) > term_width:
575 message = message[0:term_width - 2] + '…'
579 if message is not None:
580 printc(message, color=utils.get_color_for_result(self.result), end=end)
583 if self.options.dump_on_failure:
584 if self.result is not Result.PASSED:
585 self._dump_log_files()
587 # Only keep around env variables we need later
589 for n in self.__env_variable:
590 clean_env[n] = self.proc_env.get(n, None)
591 self.proc_env = clean_env
593 # Don't keep around JSON report objects, they were processed
594 # in check_results already
600 class GstValidateTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
604 class GstValidateListener(socketserver.BaseRequestHandler):
607 """Implements BaseRequestHandler handle method"""
610 raw_len = self.request.recv(4)
613 msglen = struct.unpack('>I', raw_len)[0]
614 msg = self.request.recv(msglen).decode()
618 obj = json.loads(msg)
621 # First message must contain the uuid
622 uuid = obj.get("uuid", None)
625 # Find test from launcher
626 for t in self.server.launcher.tests:
627 if uuid == t.get_uuid():
631 self.server.launcher.error(
632 "Could not find test for UUID %s" % uuid)
635 obj_type = obj.get("type", '')
636 if obj_type == 'position':
637 test.set_position(obj['position'], obj['duration'],
639 elif obj_type == 'buffering':
640 test.set_position(obj['position'], 100)
641 elif obj_type == 'action':
642 test.add_action_execution(obj)
643 # Make sure that action is taken into account when checking if process
646 elif obj_type == 'action-done':
647 # Make sure that action end is taken into account when checking if process
650 test.actions_infos[-1]['execution-duration'] = obj['execution-duration']
651 elif obj_type == 'report':
655 class GstValidateTest(Test):
657 """ A class representing a particular test. """
658 HARD_TIMEOUT_FACTOR = 5
659 fault_sig_regex = re.compile("<Caught SIGNAL: .*>")
661 def __init__(self, application_name, classname,
662 options, reporter, duration=0,
663 timeout=DEFAULT_TIMEOUT, scenario=None, hard_timeout=None,
664 media_descriptor=None, extra_env_variables=None,
665 expected_failures=None, workdir=None):
667 extra_env_variables = extra_env_variables or {}
669 if not hard_timeout and self.HARD_TIMEOUT_FACTOR:
671 hard_timeout = timeout * self.HARD_TIMEOUT_FACTOR
673 hard_timeout = duration * self.HARD_TIMEOUT_FACTOR
677 # If we are running from source, use the -debug version of the
678 # application which is using rpath instead of libtool's wrappers. It's
679 # slightly faster to start and will not confuse valgrind.
680 debug = '%s-debug' % application_name
681 p = look_for_file_in_source_dir('tools', debug)
687 self.media_duration = -1
689 self.actions_infos = []
690 self.media_descriptor = media_descriptor
693 override_path = self.get_override_file(media_descriptor)
695 if extra_env_variables:
696 if extra_env_variables.get("GST_VALIDATE_OVERRIDE", ""):
698 "GST_VALIDATE_OVERRIDE"] += os.path.pathsep
700 extra_env_variables["GST_VALIDATE_OVERRIDE"] = override_path
702 super(GstValidateTest, self).__init__(application_name, classname,
706 hard_timeout=hard_timeout,
707 extra_env_variables=extra_env_variables,
708 expected_failures=expected_failures,
711 # defines how much the process can be outside of the configured
713 self._sent_eos_time = None
715 if scenario is None or scenario.name.lower() == "none":
718 self.scenario = scenario
720 def kill_subprocess(self):
721 Test.kill_subprocess(self)
723 def add_report(self, report):
724 self.reports.append(report)
726 def set_position(self, position, duration, speed=None):
727 self.position = position
728 self.media_duration = duration
732 def add_action_execution(self, action_infos):
733 if action_infos['action-type'] == 'eos':
734 self._sent_eos_time = time.time()
735 self.actions_infos.append(action_infos)
737 def get_override_file(self, media_descriptor):
739 if media_descriptor.get_path():
740 override_path = os.path.splitext(media_descriptor.get_path())[
741 0] + VALIDATE_OVERRIDE_EXTENSION
742 if os.path.exists(override_path):
747 def get_current_position(self):
750 def get_current_value(self):
752 if self._sent_eos_time is not None:
754 if ((t - self._sent_eos_time)) > 30:
755 if self.media_descriptor is not None and self.media_descriptor.get_protocol() == Protocols.HLS:
756 self.set_result(Result.PASSED,
757 """Got no EOS 30 seconds after sending EOS,
758 in HLS known and tolerated issue:
759 https://bugzilla.gnome.org/show_bug.cgi?id=723868""")
760 return Result.KNOWN_ERROR
763 Result.FAILED, "Pipeline did not stop 30 Seconds after sending EOS")
769 def get_subproc_env(self):
770 subproc_env = os.environ.copy()
772 subproc_env["GST_VALIDATE_UUID"] = self.get_uuid()
774 if 'GST_DEBUG' in os.environ and not self.options.redirect_logs:
775 gstlogsfile = self.logfile + '.gstdebug'
776 self.extra_logfiles.append(gstlogsfile)
777 subproc_env["GST_DEBUG_FILE"] = gstlogsfile
779 if self.options.no_color:
780 subproc_env["GST_DEBUG_NO_COLOR"] = '1'
782 # Ensure XInitThreads is called, see bgo#731525
783 subproc_env['GST_GL_XINITTHREADS'] = '1'
784 self.add_env_variable('GST_GL_XINITTHREADS', '1')
786 if self.scenario is not None:
787 scenario = self.scenario.get_execution_name()
788 subproc_env["GST_VALIDATE_SCENARIO"] = scenario
789 self.add_env_variable("GST_VALIDATE_SCENARIO",
790 subproc_env["GST_VALIDATE_SCENARIO"])
793 del subproc_env["GST_VALIDATE_SCENARIO"]
801 self._sent_eos_time = None
804 self.media_duration = -1
806 self.actions_infos = []
808 def build_arguments(self):
809 super(GstValidateTest, self).build_arguments()
810 if "GST_VALIDATE" in os.environ:
811 self.add_env_variable("GST_VALIDATE", os.environ["GST_VALIDATE"])
813 if "GST_VALIDATE_SCENARIOS_PATH" in os.environ:
814 self.add_env_variable("GST_VALIDATE_SCENARIOS_PATH",
815 os.environ["GST_VALIDATE_SCENARIOS_PATH"])
817 self.add_env_variable("GST_VALIDATE_CONFIG")
818 self.add_env_variable("GST_VALIDATE_OVERRIDE")
820 def get_extra_log_content(self, extralog):
821 value = Test.get_extra_log_content(self, extralog)
825 def report_matches_expected_failure(self, report, expected_failure):
826 for key in ['bug', 'bugs', 'sometimes']:
827 if key in expected_failure:
828 del expected_failure[key]
829 for key, value in list(report.items()):
830 if key in expected_failure:
831 if not re.findall(expected_failure[key], str(value)):
833 expected_failure.pop(key)
835 return not bool(expected_failure)
837 def check_reported_issues(self):
839 expected_failures = copy.deepcopy(self.expected_failures)
840 expected_retcode = [0]
841 for report in self.reports:
843 for expected_failure in expected_failures:
844 if self.report_matches_expected_failure(report,
845 expected_failure.copy()):
846 found = expected_failure
849 if found is not None:
850 expected_failures.remove(found)
851 if report['level'] == 'critical':
852 if found.get('sometimes') and isinstance(expected_retcode, list):
853 expected_retcode.append(18)
855 expected_retcode = [18]
856 elif report['level'] == 'critical':
857 ret.append(report['summary'])
860 return None, expected_failures, expected_retcode
862 return ret, expected_failures, expected_retcode
864 def check_expected_timeout(self, expected_timeout):
865 msg = "Expected timeout happened. "
866 result = Result.PASSED
867 message = expected_timeout.get('message')
869 if not re.findall(message, self.message):
870 result = Result.FAILED
871 msg = "Expected timeout message: %s got %s " % (
872 message, self.message)
874 expected_symbols = expected_timeout.get('stacktrace_symbols')
876 trace_gatherer = BackTraceGenerator.get_default()
877 stack_trace = trace_gatherer.get_trace(self)
880 if not isinstance(expected_symbols, list):
881 expected_symbols = [expected_symbols]
883 not_found_symbols = [s for s in expected_symbols
884 if s not in stack_trace]
885 if not_found_symbols:
886 result = Result.TIMEOUT
887 msg = "Expected symbols '%s' not found in stack trace " % (
890 msg += "No stack trace available, could not verify symbols "
894 def check_results(self):
895 if self.result in [Result.FAILED, self.result is Result.PASSED]:
898 for report in self.reports:
899 if report.get('issue-id') == 'runtime::missing-plugin':
900 self.set_result(Result.SKIPPED, "%s\n%s" % (report['summary'],
904 self.debug("%s returncode: %s", self, self.process.returncode)
906 criticals, not_found_expected_failures, expected_returncode = self.check_reported_issues()
908 expected_timeout = None
909 for i, f in enumerate(not_found_expected_failures):
910 if len(f) == 1 and f.get("returncode"):
911 returncode = f['returncode']
912 if not isinstance(expected_returncode, list):
913 returncode = [expected_returncode]
916 elif f.get("timeout"):
919 not_found_expected_failures = [f for f in not_found_expected_failures
920 if not f.get('returncode')]
923 result = Result.PASSED
924 if self.result == Result.TIMEOUT:
925 with open(self.logfile) as f:
926 signal_fault_info = self.fault_sig_regex.findall(f.read())
927 if signal_fault_info:
928 result = Result.FAILED
929 msg = signal_fault_info[0]
930 elif expected_timeout:
931 not_found_expected_failures.remove(expected_timeout)
932 result, msg = self.check_expected_timeout(expected_timeout)
935 elif self.process.returncode in EXITING_SIGNALS:
936 result = Result.FAILED
937 msg = "Application exited with signal %s" % (
938 EXITING_SIGNALS[self.process.returncode]
940 self.add_stack_trace_to_logfile()
941 elif self.process.returncode == VALGRIND_ERROR_CODE:
942 msg = "Valgrind reported errors "
943 result = Result.FAILED
944 elif self.process.returncode not in expected_returncode:
945 msg = "Application returned %s " % self.process.returncode
946 if expected_returncode != [0]:
947 msg += "(expected %s) " % expected_returncode
948 result = Result.FAILED
951 msg += "(critical errors: [%s]) " % ', '.join(criticals)
952 result = Result.FAILED
954 if not_found_expected_failures:
955 mandatory_failures = [f for f in not_found_expected_failures
956 if not f.get('sometimes')]
958 if mandatory_failures:
959 msg += "(Expected errors not found: %s) " % mandatory_failures
960 result = Result.FAILED
961 elif self.expected_failures:
962 msg += '%s(Expected errors occured: %s)%s' % (Colors.OKBLUE,
963 self.expected_failures,
966 self.set_result(result, msg.strip())
968 def get_valgrind_suppressions(self):
969 result = super(GstValidateTest, self).get_valgrind_suppressions()
970 gst_sup = self.get_valgrind_suppression_file('common', 'gst.supp')
972 result.append(gst_sup)
976 class GstValidateEncodingTestInterface(object):
977 DURATION_TOLERANCE = GST_SECOND / 4
979 def __init__(self, combination, media_descriptor, duration_tolerance=None):
980 super(GstValidateEncodingTestInterface, self).__init__()
982 self.media_descriptor = media_descriptor
983 self.combination = combination
986 self._duration_tolerance = duration_tolerance
987 if duration_tolerance is None:
988 self._duration_tolerance = self.DURATION_TOLERANCE
990 def get_current_size(self):
992 size = os.stat(urllib.parse.urlparse(self.dest_file).path).st_size
996 self.debug("Size: %s" % size)
999 def _get_profile_full(self, muxer, venc, aenc, video_restriction=None,
1000 audio_restriction=None, audio_presence=0,
1007 if video_restriction is not None:
1008 ret = ret + video_restriction + '->'
1011 ret = ret + '|' + str(video_presence)
1014 if audio_restriction is not None:
1015 ret = ret + audio_restriction + '->'
1018 ret = ret + '|' + str(audio_presence)
1020 return ret.replace("::", ":")
1022 def get_profile(self, video_restriction=None, audio_restriction=None):
1023 vcaps = self.combination.get_video_caps()
1024 acaps = self.combination.get_audio_caps()
1025 if self.media_descriptor is not None:
1026 if self.media_descriptor.get_num_tracks("video") == 0:
1029 if self.media_descriptor.get_num_tracks("audio") == 0:
1032 return self._get_profile_full(self.combination.get_muxer_caps(),
1034 video_restriction=video_restriction,
1035 audio_restriction=audio_restriction)
1037 def _clean_caps(self, caps):
1039 Returns a list of key=value or structure name, without "(types)" or ";" or ","
1041 return re.sub(r"\(.+?\)\s*| |;", '', caps).split(',')
1043 # pylint: disable=E1101
1044 def _has_caps_type_variant(self, c, ccaps):
1046 Handle situations where we can have application/ogg or video/ogg or
1050 media_type = re.findall("application/|video/|audio/", c)
1052 media_type = media_type[0].replace('/', '')
1053 possible_mtypes = ["application", "video", "audio"]
1054 possible_mtypes.remove(media_type)
1055 for tmptype in possible_mtypes:
1056 possible_c_variant = c.replace(media_type, tmptype)
1057 if possible_c_variant in ccaps:
1059 "Found %s in %s, good enough!", possible_c_variant, ccaps)
1064 # pylint: disable=E1101
1065 def run_iqa_test(self, reference_file_uri):
1067 Runs IQA test if @reference_file_path exists
1068 @test: The test to run tests on
1070 if not GstValidateBaseTestManager.has_feature('iqa'):
1071 self.debug('Iqa element not present, not running extra test.')
1075 uridecodebin uri=%s !
1076 iqa name=iqa do-dssim=true dssim-error-threshold=1.0 ! fakesink
1077 uridecodebin uri=%s ! iqa.
1078 """ % (reference_file_uri, self.dest_file)
1079 pipeline_desc = pipeline_desc.replace("\n", "")
1081 command = [GstValidateBaseTestManager.COMMAND] + \
1082 shlex.split(pipeline_desc)
1083 if not self.options.redirect_logs:
1085 "=================\n"
1086 "Running IQA tests on results of: %s\n"
1088 "=================\n\n" % (
1089 self.classname, ' '.join(command)))
1092 message = "Running IQA tests on results of:%s %s\n" \
1093 " Command: %s\n" % (
1094 Colors.ENDC, self.classname, ' '.join(command))
1095 printc(message, Colors.OKBLUE)
1097 self.process = subprocess.Popen(command,
1104 def check_encoded_file(self):
1105 result_descriptor = GstValidateMediaDescriptor.new_from_uri(
1107 if result_descriptor is None:
1108 return (Result.FAILED, "Could not discover encoded file %s"
1111 duration = result_descriptor.get_duration()
1112 orig_duration = self.media_descriptor.get_duration()
1113 tolerance = self._duration_tolerance
1115 if orig_duration - tolerance >= duration <= orig_duration + tolerance:
1116 os.remove(result_descriptor.get_path())
1117 return (Result.FAILED, "Duration of encoded file is "
1118 " wrong (%s instead of %s)" %
1119 (utils.TIME_ARGS(duration),
1120 utils.TIME_ARGS(orig_duration)))
1122 all_tracks_caps = result_descriptor.get_tracks_caps()
1123 container_caps = result_descriptor.get_caps()
1125 all_tracks_caps.insert(0, ("container", container_caps))
1127 for track_type, caps in all_tracks_caps:
1128 ccaps = self._clean_caps(caps)
1129 wanted_caps = self.combination.get_caps(track_type)
1130 cwanted_caps = self._clean_caps(wanted_caps)
1132 if wanted_caps is None:
1133 os.remove(result_descriptor.get_path())
1134 return (Result.FAILED,
1135 "Found a track of type %s in the encoded files"
1136 " but none where wanted in the encoded profile: %s"
1137 % (track_type, self.combination))
1139 for c in cwanted_caps:
1141 if not self._has_caps_type_variant(c, ccaps):
1142 os.remove(result_descriptor.get_path())
1143 return (Result.FAILED,
1144 "Field: %s (from %s) not in caps of the outputed file %s"
1145 % (wanted_caps, c, ccaps))
1147 os.remove(result_descriptor.get_path())
1148 return (Result.PASSED, "")
1151 class TestsManager(Loggable):
1153 """ A class responsible for managing tests. """
1156 loading_testsuite = None
1160 Loggable.__init__(self)
1163 self.unwanted_tests = []
1166 self.reporter = None
1167 self.wanted_tests_patterns = []
1168 self.blacklisted_tests_patterns = []
1169 self._generators = []
1170 self.check_testslist = True
1171 self.all_tests = None
1172 self.expected_failures = {}
1173 self.blacklisted_tests = []
1178 def list_tests(self):
1179 return sorted(list(self.tests), key=lambda x: x.classname)
1181 def find_tests(self, classname):
1182 regex = re.compile(classname)
1183 return [test for test in self.list_tests() if regex.findall(test.classname)]
1185 def add_expected_issues(self, expected_failures):
1186 expected_failures_re = {}
1187 for test_name_regex, failures in list(expected_failures.items()):
1188 regex = re.compile(test_name_regex)
1189 expected_failures_re[regex] = failures
1190 for test in self.tests:
1191 if regex.findall(test.classname):
1192 test.expected_failures.extend(failures)
1194 self.expected_failures.update(expected_failures_re)
1196 def add_test(self, test):
1197 if test.generator is None:
1198 test.classname = self.loading_testsuite + '.' + test.classname
1199 for regex, failures in list(self.expected_failures.items()):
1200 if regex.findall(test.classname):
1201 test.expected_failures.extend(failures)
1203 if self._is_test_wanted(test):
1204 if test not in self.tests:
1205 self.tests.append(test)
1206 self.tests.sort(key=lambda test: test.classname)
1208 if test not in self.tests:
1209 self.unwanted_tests.append(test)
1210 self.unwanted_tests.sort(key=lambda test: test.classname)
1212 def get_tests(self):
1215 def populate_testsuite(self):
1218 def add_generators(self, generators):
1220 @generators: A list of, or one single #TestsGenerator to be used to generate tests
1222 if not isinstance(generators, list):
1223 generators = [generators]
1224 self._generators.extend(generators)
1225 for generator in generators:
1226 generator.testsuite = self.loading_testsuite
1228 self._generators = list(set(self._generators))
1230 def get_generators(self):
1231 return self._generators
1233 def _add_blacklist(self, blacklisted_tests):
1234 if not isinstance(blacklisted_tests, list):
1235 blacklisted_tests = [blacklisted_tests]
1237 for patterns in blacklisted_tests:
1238 for pattern in patterns.split(","):
1239 self.blacklisted_tests_patterns.append(re.compile(pattern))
1241 def set_default_blacklist(self, default_blacklist):
1242 for test_regex, reason in default_blacklist:
1243 if not test_regex.startswith(self.loading_testsuite + '.'):
1244 test_regex = self.loading_testsuite + '.' + test_regex
1245 self.blacklisted_tests.append((test_regex, reason))
1247 def add_options(self, parser):
1248 """ Add more arguments. """
1251 def set_settings(self, options, args, reporter):
1252 """ Set properties after options parsing. """
1253 self.options = options
1255 self.reporter = reporter
1257 self.populate_testsuite()
1259 if self.options.valgrind:
1260 self.print_valgrind_bugs()
1262 if options.wanted_tests:
1263 for patterns in options.wanted_tests:
1264 for pattern in patterns.split(","):
1265 self.wanted_tests_patterns.append(re.compile(pattern))
1267 if options.blacklisted_tests:
1268 for patterns in options.blacklisted_tests:
1269 self._add_blacklist(patterns)
1271 def set_blacklists(self):
1272 if self.blacklisted_tests:
1273 self.info("Currently 'hardcoded' %s blacklisted tests:" %
1276 if self.options.check_bugs_status:
1277 if not check_bugs_resolution(self.blacklisted_tests):
1280 for name, bug in self.blacklisted_tests:
1281 self._add_blacklist(name)
1282 if not self.options.check_bugs_status:
1283 self.info(" + %s --> bug: %s" % (name, bug))
1287 def check_expected_failures(self):
1288 if not self.expected_failures or not self.options.check_bugs_status:
1291 if self.expected_failures:
1292 printc("\nCurrently known failures in the %s testsuite:"
1293 % self.name, Colors.WARNING, title_char='-')
1295 bugs_definitions = {}
1296 for regex, failures in list(self.expected_failures.items()):
1297 for failure in failures:
1298 bugs = failure.get('bug')
1300 bugs = failure.get('bugs')
1302 printc('+ %s:\n --> no bug reported associated with %s\n' % (
1303 regex.pattern, failure), Colors.WARNING)
1306 if not isinstance(bugs, list):
1308 cbugs = bugs_definitions.get(regex.pattern, [])
1309 bugs.extend([b for b in bugs if b not in cbugs])
1310 bugs_definitions[regex.pattern] = bugs
1312 return check_bugs_resolution(bugs_definitions.items())
1314 def _check_blacklisted(self, test):
1315 for pattern in self.blacklisted_tests_patterns:
1316 if pattern.findall(test.classname):
1317 self.info("%s is blacklisted by %s", test.classname, pattern)
1322 def _check_whitelisted(self, test):
1323 for pattern in self.wanted_tests_patterns:
1324 if pattern.findall(test.classname):
1325 if self._check_blacklisted(test):
1326 # If explicitly white listed that specific test
1327 # bypass the blacklisting
1328 if pattern.pattern != test.classname:
1333 def _check_duration(self, test):
1334 if test.duration > 0 and int(self.options.long_limit) < int(test.duration):
1335 self.info("Not activating %s as its duration (%d) is superior"
1336 " than the long limit (%d)" % (test, test.duration,
1337 int(self.options.long_limit)))
1342 def _is_test_wanted(self, test):
1343 if self._check_whitelisted(test):
1344 if not self._check_duration(test):
1348 if self._check_blacklisted(test):
1351 if not self._check_duration(test):
1354 if not self.wanted_tests_patterns:
1359 def needs_http_server(self):
1362 def print_valgrind_bugs(self):
1366 class TestsGenerator(Loggable):
1368 def __init__(self, name, test_manager, tests=[]):
1369 Loggable.__init__(self)
1371 self.test_manager = test_manager
1372 self.testsuite = None
1375 self._tests[test.classname] = test
1377 def generate_tests(self, *kwargs):
1379 Method that generates tests
1381 return list(self._tests.values())
1383 def add_test(self, test):
1384 test.generator = self
1385 test.classname = self.testsuite + '.' + test.classname
1386 self._tests[test.classname] = test
1389 class GstValidateTestsGenerator(TestsGenerator):
1391 def populate_tests(self, uri_minfo_special_scenarios, scenarios):
1395 def get_fakesink_for_media_type(media_type, needs_clock=False):
1396 if media_type == "video":
1398 return 'fakevideosink qos=true max-lateness=20000000'
1400 return "fakevideosink sync=false"
1403 return "fakesink sync=true"
1407 def generate_tests(self, uri_minfo_special_scenarios, scenarios):
1408 self.populate_tests(uri_minfo_special_scenarios, scenarios)
1409 return super(GstValidateTestsGenerator, self).generate_tests()
1412 class _TestsLauncher(Loggable):
1416 Loggable.__init__(self)
1421 self.reporter = None
1422 self._list_testers()
1423 self.all_tests = None
1424 self.wanted_tests_patterns = []
1426 self.queue = queue.Queue()
1428 self.total_num_tests = 0
1431 self.vfb_server = None
1433 def _list_app_dirs(self):
1435 env_dirs = os.environ["GST_VALIDATE_APPS_DIR"]
1436 if env_dirs is not None:
1437 for dir_ in env_dirs.split(":"):
1438 app_dirs.append(dir_)
1442 def _exec_app(self, app_dir, env):
1444 files = os.listdir(app_dir)
1445 except OSError as e:
1446 self.debug("Could not list %s: %s" % (app_dir, e))
1449 if f.endswith(".py"):
1450 exec(compile(open(os.path.join(app_dir, f)).read(),
1451 os.path.join(app_dir, f), 'exec'), env)
1453 def _exec_apps(self, env):
1454 app_dirs = self._list_app_dirs()
1455 for app_dir in app_dirs:
1456 self._exec_app(app_dir, env)
1458 def _list_testers(self):
1459 env = globals().copy()
1460 self._exec_apps(env)
1462 testers = [i() for i in utils.get_subclasses(TestsManager, env)]
1463 for tester in testers:
1464 if tester.init() is True:
1465 self.testers.append(tester)
1467 self.warning("Can not init tester: %s -- PATH is %s"
1468 % (tester.name, os.environ["PATH"]))
1470 def add_options(self, parser):
1471 for tester in self.testers:
1472 tester.add_options(parser)
1474 def _load_testsuite(self, testsuites):
1476 for testsuite in testsuites:
1478 sys.path.insert(0, os.path.dirname(testsuite))
1479 return (__import__(os.path.basename(testsuite).replace(".py", "")), None)
1480 except Exception as e:
1481 exceptions.append("Could not load %s: %s" % (testsuite, e))
1484 sys.path.remove(os.path.dirname(testsuite))
1486 return (None, exceptions)
1488 def _load_testsuites(self):
1490 for testsuite in self.options.testsuites:
1491 if os.path.exists(testsuite):
1492 testsuite = os.path.abspath(os.path.expanduser(testsuite))
1493 loaded_module = self._load_testsuite([testsuite])
1495 possible_testsuites_paths = [os.path.join(d, testsuite + ".py")
1496 for d in self.options.testsuites_dirs]
1497 loaded_module = self._load_testsuite(possible_testsuites_paths)
1499 module = loaded_module[0]
1500 if not loaded_module[0]:
1501 if "." in testsuite:
1502 self.options.testsuites.append(testsuite.split('.')[0])
1503 self.info("%s looks like a test name, trying that" %
1505 self.options.wanted_tests.append(testsuite)
1507 printc("Could not load testsuite: %s, reasons: %s" % (
1508 testsuite, loaded_module[1]), Colors.FAIL)
1511 testsuites.add(module)
1512 if not hasattr(module, "TEST_MANAGER"):
1513 module.TEST_MANAGER = [tester.name for tester in self.testers]
1514 elif not isinstance(module.TEST_MANAGER, list):
1515 module.TEST_MANAGER = [module.TEST_MANAGER]
1517 self.options.testsuites = list(testsuites)
1519 def _setup_testsuites(self):
1520 for testsuite in self.options.testsuites:
1522 wanted_test_manager = None
1523 if hasattr(testsuite, "TEST_MANAGER"):
1524 wanted_test_manager = testsuite.TEST_MANAGER
1525 if not isinstance(wanted_test_manager, list):
1526 wanted_test_manager = [wanted_test_manager]
1528 for tester in self.testers:
1529 if wanted_test_manager is not None and \
1530 tester.name not in wanted_test_manager:
1533 prev_testsuite_name = TestsManager.loading_testsuite
1534 if self.options.user_paths:
1535 TestsManager.loading_testsuite = tester.name
1536 tester.register_defaults()
1539 TestsManager.loading_testsuite = testsuite.__name__
1540 if testsuite.setup_tests(tester, self.options):
1542 if prev_testsuite_name:
1543 TestsManager.loading_testsuite = prev_testsuite_name
1546 printc("Could not load testsuite: %s"
1547 " maybe because of missing TestManager"
1548 % (testsuite), Colors.FAIL)
1551 def _load_config(self, options):
1552 printc("Loading config files is DEPRECATED"
1553 " you should use the new testsuite format now",)
1555 for tester in self.testers:
1556 tester.options = options
1557 globals()[tester.name] = tester
1558 globals()["options"] = options
1559 c__file__ = __file__
1560 globals()["__file__"] = self.options.config
1561 exec(compile(open(self.options.config).read(),
1562 self.options.config, 'exec'), globals())
1563 globals()["__file__"] = c__file__
1565 def set_settings(self, options, args):
1566 if options.xunit_file:
1567 self.reporter = reporters.XunitReporter(options)
1569 self.reporter = reporters.Reporter(options)
1571 self.options = options
1572 wanted_testers = None
1573 for tester in self.testers:
1574 if tester.name in args:
1575 wanted_testers = tester.name
1578 testers = self.testers
1580 for tester in testers:
1581 if tester.name in args:
1582 self.testers.append(tester)
1583 args.remove(tester.name)
1586 self._load_config(options)
1588 self._load_testsuites()
1589 if not self.options.testsuites:
1590 printc("Not testsuite loaded!", Colors.FAIL)
1593 for tester in self.testers:
1594 tester.set_settings(options, args, self.reporter)
1596 if not options.config and options.testsuites:
1597 if self._setup_testsuites() is False:
1600 for tester in self.testers:
1601 if not tester.set_blacklists():
1604 if not tester.check_expected_failures():
1607 if self.needs_http_server() or options.httponly is True:
1608 self.httpsrv = HTTPServer(options)
1609 self.httpsrv.start()
1611 if options.no_display:
1612 self.vfb_server = get_virual_frame_buffer_server(options)
1613 res = self.vfb_server.start()
1615 printc("Could not start virtual frame server: %s" % res[1],
1618 os.environ["DISPLAY"] = self.vfb_server.display_id
1622 def _check_tester_has_other_testsuite(self, testsuite, tester):
1623 if tester.name != testsuite.TEST_MANAGER[0]:
1626 for t in self.options.testsuites:
1628 for other_testmanager in t.TEST_MANAGER:
1629 if other_testmanager == tester.name:
1634 def _check_defined_tests(self, tester, tests):
1635 if self.options.blacklisted_tests or self.options.wanted_tests:
1638 tests_names = [test.classname for test in tests]
1639 testlist_changed = False
1640 for testsuite in self.options.testsuites:
1641 if not self._check_tester_has_other_testsuite(testsuite, tester) \
1642 and tester.check_testslist:
1644 testlist_file = open(os.path.splitext(testsuite.__file__)[0] + ".testslist",
1647 know_tests = testlist_file.read().split("\n")
1648 testlist_file.close()
1650 testlist_file = open(os.path.splitext(testsuite.__file__)[0] + ".testslist",
1656 for test in know_tests:
1657 if test and test.strip('~') not in tests_names:
1658 if not test.startswith('~'):
1659 testlist_changed = True
1660 printc("Test %s Not in testsuite %s anymore"
1661 % (test, testsuite.__file__), Colors.FAIL)
1663 optional_out.append((test, None))
1665 tests_names = sorted([(test.classname, test) for test in tests] + optional_out,
1666 key=lambda x: x[0].strip('~'))
1668 for tname, test in tests_names:
1669 if test and test.optional:
1671 testlist_file.write("%s\n" % (tname))
1672 if tname and tname not in know_tests:
1673 printc("Test %s is NEW in testsuite %s"
1674 % (tname, testsuite.__file__),
1675 Colors.FAIL if self.options.fail_on_testlist_change else Colors.OKGREEN)
1676 testlist_changed = True
1678 testlist_file.close()
1681 return testlist_changed
1683 def list_tests(self):
1684 for tester in self.testers:
1685 if not self._tester_needed(tester):
1688 tests = tester.list_tests()
1689 if self._check_defined_tests(tester, tests) and \
1690 self.options.fail_on_testlist_change:
1691 raise RuntimeError("Unexpected new test in testsuite.")
1693 self.tests.extend(tests)
1694 return sorted(list(self.tests), key=lambda t: t.classname)
1696 def _tester_needed(self, tester):
1697 for testsuite in self.options.testsuites:
1698 if tester.name in testsuite.TEST_MANAGER:
1702 def server_wrapper(self, ready):
1703 self.server = GstValidateTCPServer(
1704 ('localhost', 0), GstValidateListener)
1705 self.server.socket.settimeout(None)
1706 self.server.launcher = self
1707 self.serverport = self.server.socket.getsockname()[1]
1708 self.info("%s server port: %s" % (self, self.serverport))
1711 self.server.serve_forever(poll_interval=0.05)
1713 def _start_server(self):
1714 self.info("Starting TCP Server")
1715 ready = threading.Event()
1716 self.server_thread = threading.Thread(target=self.server_wrapper,
1717 kwargs={'ready': ready})
1718 self.server_thread.start()
1720 os.environ["GST_VALIDATE_SERVER"] = "tcp://localhost:%s" % self.serverport
1722 def _stop_server(self):
1724 self.server.shutdown()
1725 self.server_thread.join()
1726 self.server.server_close()
1729 def test_wait(self):
1731 # Check process every second for timeout
1733 self.queue.get(timeout=1)
1737 for test in self.jobs:
1738 if test.process_update():
1739 self.jobs.remove(test)
1742 def tests_wait(self):
1744 test = self.test_wait()
1745 test.check_results()
1746 except KeyboardInterrupt:
1747 for test in self.jobs:
1748 test.kill_subprocess()
1753 def start_new_job(self, tests_left):
1755 test = tests_left.pop(0)
1759 test.test_start(self.queue)
1761 self.jobs.append(test)
1765 def _run_tests(self):
1768 if not self.all_tests:
1769 all_tests = self.list_tests()
1770 self.all_tests = all_tests
1771 self.total_num_tests = len(self.all_tests)
1772 if not sys.stdout.isatty():
1773 printc("\nRunning %d tests..." % self.total_num_tests, color=Colors.HEADER)
1775 self.reporter.init_timer()
1778 for test in self.tests:
1779 if test.is_parallel:
1782 alone_tests.append(test)
1784 max_num_jobs = min(self.options.num_jobs, len(tests))
1787 # if order of test execution doesn't matter, shuffle
1788 # the order to optimize cpu usage
1789 if self.options.shuffle:
1790 random.shuffle(tests)
1791 random.shuffle(alone_tests)
1793 current_test_num = 1
1794 for num_jobs, tests in [(max_num_jobs, tests), (1, alone_tests)]:
1795 tests_left = list(tests)
1796 for i in range(num_jobs):
1797 if not self.start_new_job(tests_left):
1801 while jobs_running != 0:
1802 test = self.tests_wait()
1804 test.number = "[%d / %d] " % (current_test_num,
1805 self.total_num_tests)
1806 current_test_num += 1
1807 res = test.test_end()
1808 self.reporter.after_test(test)
1809 if res != Result.PASSED and (self.options.forever or
1810 self.options.fatal_error):
1812 if self.start_new_job(tests_left):
1817 def clean_tests(self):
1818 for test in self.tests:
1822 def run_tests(self):
1824 self._start_server()
1825 if self.options.forever:
1828 printc("Running iteration %d" % r, title=True)
1830 if not self._run_tests():
1836 elif self.options.n_runs:
1838 for r in range(self.options.n_runs):
1839 t = "Running iteration %d" % r
1840 print("%s\n%s\n%s\n" % ("=" * len(t), t, "=" * len(t)))
1841 if not self._run_tests():
1847 return self._run_tests()
1852 self.vfb_server.stop()
1855 def final_report(self):
1856 return self.reporter.final_report()
1858 def needs_http_server(self):
1859 for tester in self.testers:
1860 if tester.needs_http_server():
1864 class NamedDic(object):
1866 def __init__(self, props):
1868 for name, value in props.items():
1869 setattr(self, name, value)
1872 class Scenario(object):
1874 def __init__(self, name, props, path=None):
1878 for prop, value in props:
1879 setattr(self, prop.replace("-", "_"), value)
1881 def get_execution_name(self):
1882 if self.path is not None:
1888 if hasattr(self, "seek"):
1889 return bool(self.seek)
1893 def needs_clock_sync(self):
1894 if hasattr(self, "need_clock_sync"):
1895 return bool(self.need_clock_sync)
1899 def needs_live_content(self):
1900 # Scenarios that can only be used on live content
1901 if hasattr(self, "live_content_required"):
1902 return bool(self.live_content_required)
1905 def compatible_with_live_content(self):
1906 # if a live content is required it's implicitely compatible with
1908 if self.needs_live_content():
1910 if hasattr(self, "live_content_compatible"):
1911 return bool(self.live_content_compatible)
1914 def get_min_media_duration(self):
1915 if hasattr(self, "min_media_duration"):
1916 return float(self.min_media_duration)
1920 def does_reverse_playback(self):
1921 if hasattr(self, "reverse_playback"):
1922 return bool(self.reverse_playback)
1926 def get_duration(self):
1928 return float(getattr(self, "duration"))
1929 except AttributeError:
1932 def get_min_tracks(self, track_type):
1934 return int(getattr(self, "min_%s_track" % track_type))
1935 except AttributeError:
1939 return "<Scenario %s>" % self.name
1942 class ScenarioManager(Loggable):
1946 FILE_EXTENSION = "scenario"
1948 def __new__(cls, *args, **kwargs):
1949 if not cls._instance:
1950 cls._instance = super(ScenarioManager, cls).__new__(
1951 cls, *args, **kwargs)
1952 cls._instance.config = None
1953 cls._instance.discovered = False
1954 Loggable.__init__(cls._instance)
1956 return cls._instance
1958 def find_special_scenarios(self, mfile):
1960 mfile_bname = os.path.basename(mfile)
1962 for f in os.listdir(os.path.dirname(mfile)):
1963 if re.findall("%s\..*\.%s$" % (re.escape(mfile_bname), self.FILE_EXTENSION), f):
1964 scenarios.append(os.path.join(os.path.dirname(mfile), f))
1967 scenarios = self.discover_scenarios(scenarios, mfile)
1971 def discover_scenarios(self, scenario_paths=[], mfile=None):
1973 Discover scenarios specified in scenario_paths or the default ones
1974 if nothing specified there
1977 scenario_defs = os.path.join(self.config.main_dir, "scenarios.def")
1978 logs = open(os.path.join(self.config.logsdir,
1979 "scenarios_discovery.log"), 'w')
1982 command = [GstValidateBaseTestManager.COMMAND,
1983 "--scenarios-defs-output-file", scenario_defs]
1984 command.extend(scenario_paths)
1985 subprocess.check_call(command, stdout=logs, stderr=logs)
1986 except subprocess.CalledProcessError as e:
1990 config = configparser.RawConfigParser()
1991 f = open(scenario_defs)
1994 for section in config.sections():
1996 for scenario_path in scenario_paths:
1997 if section in os.path.splitext(os.path.basename(scenario_path))[0]:
2000 path = scenario_path
2002 # The real name of the scenario is:
2003 # filename.REALNAME.scenario
2004 name = scenario_path.replace(mfile + ".", "").replace(
2005 "." + self.FILE_EXTENSION, "")
2006 path = scenario_path
2011 props = config.items(section)
2012 scenarios.append(Scenario(name, props, path))
2014 if not scenario_paths:
2015 self.discovered = True
2016 self.all_scenarios.extend(scenarios)
2020 def get_scenario(self, name):
2021 if name is not None and os.path.isabs(name) and name.endswith(self.FILE_EXTENSION):
2022 scenarios = self.discover_scenarios([name])
2027 if self.discovered is False:
2028 self.discover_scenarios()
2031 return self.all_scenarios
2034 return [scenario for scenario in self.all_scenarios if scenario.name == name][0]
2036 self.warning("Scenario: %s not found" % name)
2040 class GstValidateBaseTestManager(TestsManager):
2041 scenarios_manager = ScenarioManager()
2045 super(GstValidateBaseTestManager, self).__init__()
2046 self._scenarios = []
2047 self._encoding_formats = []
2050 def update_commands(cls, extra_paths=None):
2051 for varname, cmd in {'': 'gst-validate',
2052 'TRANSCODING_': 'gst-validate-transcoding',
2053 'MEDIA_CHECK_': 'gst-validate-media-check',
2054 'RTSP_SERVER_': 'gst-validate-rtsp-server',
2055 'INSPECT_': 'gst-inspect'}.items():
2056 setattr(cls, varname + 'COMMAND', which(cmd + '-1.0', extra_paths))
2059 def has_feature(cls, featurename):
2061 return cls.features_cache[featurename]
2066 subprocess.check_output([cls.INSPECT_COMMAND, featurename])
2068 except subprocess.CalledProcessError:
2071 cls.features_cache[featurename] = res
2074 def add_scenarios(self, scenarios):
2076 @scenarios A list or a unic scenario name(s) to be run on the tests.
2077 They are just the default scenarios, and then depending on
2078 the TestsGenerator to be used you can have more fine grained
2079 control on what to be run on each serie of tests.
2081 if isinstance(scenarios, list):
2082 self._scenarios.extend(scenarios)
2084 self._scenarios.append(scenarios)
2086 self._scenarios = list(set(self._scenarios))
2088 def set_scenarios(self, scenarios):
2090 Override the scenarios
2092 self._scenarios = []
2093 self.add_scenarios(scenarios)
2095 def get_scenarios(self):
2096 return self._scenarios
2098 def add_encoding_formats(self, encoding_formats):
2100 :param encoding_formats: A list or one single #MediaFormatCombinations describing wanted output
2101 formats for transcoding test.
2102 They are just the default encoding formats, and then depending on
2103 the TestsGenerator to be used you can have more fine grained
2104 control on what to be run on each serie of tests.
2106 if isinstance(encoding_formats, list):
2107 self._encoding_formats.extend(encoding_formats)
2109 self._encoding_formats.append(encoding_formats)
2111 self._encoding_formats = list(set(self._encoding_formats))
2113 def get_encoding_formats(self):
2114 return self._encoding_formats
2117 GstValidateBaseTestManager.update_commands()
2120 class MediaDescriptor(Loggable):
2123 Loggable.__init__(self)
2126 raise NotImplemented
2128 def has_frames(self):
2131 def get_media_filepath(self):
2132 raise NotImplemented
2134 def skip_parsers(self):
2138 raise NotImplemented
2141 raise NotImplemented
2143 def get_duration(self):
2144 raise NotImplemented
2146 def get_protocol(self):
2147 raise NotImplemented
2149 def is_seekable(self):
2150 raise NotImplemented
2153 raise NotImplemented
2156 raise NotImplemented
2158 def get_num_tracks(self, track_type):
2159 raise NotImplemented
2161 def can_play_reverse(self):
2162 raise NotImplemented
2167 def is_compatible(self, scenario):
2168 if scenario is None:
2171 if scenario.seeks() and (not self.is_seekable() or self.is_image()):
2172 self.debug("Do not run %s as %s does not support seeking",
2173 scenario, self.get_uri())
2176 if self.is_image() and scenario.needs_clock_sync():
2177 self.debug("Do not run %s as %s is an image",
2178 scenario, self.get_uri())
2181 if not self.can_play_reverse() and scenario.does_reverse_playback():
2184 if not self.is_live() and scenario.needs_live_content():
2185 self.debug("Do not run %s as %s is not a live content",
2186 scenario, self.get_uri())
2189 if self.is_live() and not scenario.compatible_with_live_content():
2190 self.debug("Do not run %s as %s is a live content",
2191 scenario, self.get_uri())
2194 if not self.prerrols() and getattr(scenario, 'needs_preroll', False):
2197 if self.get_duration() and self.get_duration() / GST_SECOND < scenario.get_min_media_duration():
2199 "Do not run %s as %s is too short (%i < min media duation : %i",
2200 scenario, self.get_uri(),
2201 self.get_duration() / GST_SECOND,
2202 scenario.get_min_media_duration())
2205 for track_type in ['audio', 'subtitle', 'video']:
2206 if self.get_num_tracks(track_type) < scenario.get_min_tracks(track_type):
2207 self.debug("%s -- %s | At least %s %s track needed < %s"
2208 % (scenario, self.get_uri(), track_type,
2209 scenario.get_min_tracks(track_type),
2210 self.get_num_tracks(track_type)))
2216 class GstValidateMediaDescriptor(MediaDescriptor):
2217 # Some extension file for discovering results
2218 MEDIA_INFO_EXT = "media_info"
2219 PUSH_MEDIA_INFO_EXT = "media_info.push"
2220 STREAM_INFO_EXT = "stream_info"
2222 def __init__(self, xml_path):
2223 super(GstValidateMediaDescriptor, self).__init__()
2225 self._xml_path = xml_path
2227 media_xml = ET.parse(xml_path).getroot()
2228 except xml.etree.ElementTree.ParseError:
2229 printc("Could not parse %s" % xml_path,
2233 self._extract_data(media_xml)
2235 self.set_protocol(urllib.parse.urlparse(
2236 urllib.parse.urlparse(self.get_uri()).scheme).scheme)
2238 def skip_parsers(self):
2239 return self._skip_parsers
2241 def has_frames(self):
2242 return self._has_frames
2244 def _extract_data(self, media_xml):
2245 # Extract the information we need from the xml
2246 self._caps = media_xml.findall("streams")[0].attrib["caps"]
2247 self._track_caps = []
2249 streams = media_xml.findall("streams")[0].findall("stream")
2253 for stream in streams:
2254 self._track_caps.append(
2255 (stream.attrib["type"], stream.attrib["caps"]))
2256 self._uri = media_xml.attrib["uri"]
2257 self._skip_parsers = bool(int(media_xml.attrib.get('skip-parsers', 0)))
2258 self._has_frames = bool(int(media_xml.attrib["frame-detection"]))
2259 self._duration = int(media_xml.attrib["duration"])
2260 self._protocol = media_xml.get("protocol", None)
2261 self._is_seekable = media_xml.attrib["seekable"].lower() == "true"
2262 self._is_live = media_xml.get("live", "false").lower() == "true"
2263 self._is_image = False
2264 for stream in media_xml.findall("streams")[0].findall("stream"):
2265 if stream.attrib["type"] == "image":
2266 self._is_image = True
2267 self._track_types = []
2268 for stream in media_xml.findall("streams")[0].findall("stream"):
2269 self._track_types.append(stream.attrib["type"])
2272 def new_from_uri(uri, verbose=False, include_frames=False, is_push=False):
2274 include_frames = 0 # Never
2275 include_frames = 1 # always
2276 include_frames = 2 # if previous file included them
2279 media_path = utils.url2path(uri)
2281 ext = GstValidateMediaDescriptor.PUSH_MEDIA_INFO_EXT if is_push else \
2282 GstValidateMediaDescriptor.MEDIA_INFO_EXT
2283 descriptor_path = "%s.%s" % (media_path, ext)
2284 args = GstValidateBaseTestManager.MEDIA_CHECK_COMMAND.split(" ")
2286 if include_frames == 2:
2288 media_xml = ET.parse(descriptor_path).getroot()
2290 include_frames = bool(int(media_xml.attrib["frame-detection"]))
2291 if bool(int(media_xml.attrib.get("skip-parsers"))):
2292 args.append("--skip-parsers")
2293 except FileNotFoundError:
2296 include_frames = bool(include_frames)
2298 args.extend(["--output-file", descriptor_path])
2300 args.extend(["--full"])
2303 printc("Generating media info for %s\n"
2304 " Command: '%s'" % (media_path, ' '.join(args)),
2308 subprocess.check_output(args, stderr=open(os.devnull))
2309 except subprocess.CalledProcessError as e:
2311 printc("Result: Failed", Colors.FAIL)
2313 loggable.warning("GstValidateMediaDescriptor",
2314 "Exception: %s" % e)
2318 printc("Result: Passed", Colors.OKGREEN)
2321 return GstValidateMediaDescriptor(descriptor_path)
2322 except (IOError, xml.etree.ElementTree.ParseError):
2326 return self._xml_path
2328 def need_clock_sync(self):
2329 return Protocols.needs_clock_sync(self.get_protocol())
2331 def get_media_filepath(self):
2332 if self.get_protocol() == Protocols.FILE:
2333 return self._xml_path.replace("." + self.MEDIA_INFO_EXT, "")
2334 elif self.get_protocol() == Protocols.PUSHFILE:
2335 return self._xml_path.replace("." + self.PUSH_MEDIA_INFO_EXT, "")
2337 return self._xml_path.replace("." + self.STREAM_INFO_EXT, "")
2342 def get_tracks_caps(self):
2343 return self._track_caps
2348 def get_duration(self):
2349 return self._duration
2351 def set_protocol(self, protocol):
2352 if self._xml_path.endswith(GstValidateMediaDescriptor.PUSH_MEDIA_INFO_EXT):
2353 self._protocol = Protocols.PUSHFILE
2355 self._protocol = protocol
2357 def get_protocol(self):
2358 return self._protocol
2360 def is_seekable(self):
2361 return self._is_seekable
2364 return self._is_live
2366 def can_play_reverse(self):
2370 return self._is_image
2372 def get_num_tracks(self, track_type):
2374 for t in self._track_types:
2380 def get_clean_name(self):
2381 name = os.path.basename(self.get_path())
2382 name = re.sub("\.stream_info|\.media_info", "", name)
2384 return name.replace('.', "_")
2387 class MediaFormatCombination(object):
2388 FORMATS = {"aac": "audio/mpeg,mpegversion=4", # Audio
2389 "ac3": "audio/x-ac3",
2390 "vorbis": "audio/x-vorbis",
2391 "mp3": "audio/mpeg,mpegversion=1,layer=3",
2392 "opus": "audio/x-opus",
2393 "rawaudio": "audio/x-raw",
2396 "h264": "video/x-h264",
2397 "h265": "video/x-h265",
2398 "vp8": "video/x-vp8",
2399 "vp9": "video/x-vp9",
2400 "theora": "video/x-theora",
2401 "prores": "video/x-prores",
2402 "jpeg": "image/jpeg",
2405 "webm": "video/webm",
2406 "ogg": "application/ogg",
2407 "mkv": "video/x-matroska",
2408 "mp4": "video/quicktime,variant=iso;",
2409 "quicktime": "video/quicktime;"}
2412 return "%s and %s in %s" % (self.audio, self.video, self.container)
2414 def __init__(self, container, audio, video):
2416 Describes a media format to be used for transcoding tests.
2418 :param container: A string defining the container format to be used, must bin in self.FORMATS
2419 :param audio: A string defining the audio format to be used, must bin in self.FORMATS
2420 :param video: A string defining the video format to be used, must bin in self.FORMATS
2422 self.container = container
2426 def get_caps(self, track_type):
2428 return self.FORMATS[self.__dict__[track_type]]
2432 def get_audio_caps(self):
2433 return self.get_caps("audio")
2435 def get_video_caps(self):
2436 return self.get_caps("video")
2438 def get_muxer_caps(self):
2439 return self.get_caps("container")