3 # Copyright (c) 2013,Thibault Saunier <thibault.saunier@collabora.com>
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU Lesser General Public
7 # License as published by the Free Software Foundation; either
8 # version 2.1 of the License, or (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 # Lesser General Public License for more details.
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with this program; if not, write to the
17 # Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
18 # Boston, MA 02110-1301, USA.
20 """ Class representing tests and test managers. """
42 from .utils import which
43 from . import reporters
44 from . import loggable
45 from .loggable import Loggable
48 from lxml import etree as ET
50 import xml.etree.cElementTree as ET
52 from .utils import mkdir, Result, Colors, printc, DEFAULT_TIMEOUT, GST_SECOND, \
53 Protocols, look_for_file_in_source_dir, get_data_file, BackTraceGenerator, \
56 # The factor by which we increase the hard timeout when running inside
58 GDB_TIMEOUT_FACTOR = VALGRIND_TIMEOUT_FACTOR = 20
59 TIMEOUT_FACTOR = float(os.environ.get("TIMEOUT_FACTOR", 1))
60 # The error reported by valgrind when detecting errors
61 VALGRIND_ERROR_CODE = 20
63 VALIDATE_OVERRIDE_EXTENSION = ".override"
64 COREDUMP_SIGNALS = [-getattr(signal, s) for s in [
65 'SIGQUIT', 'SIGILL', 'SIGABRT', 'SIGFPE', 'SIGSEGV', 'SIGBUS', 'SIGSYS',
66 'SIGTRAP', 'SIGXCPU', 'SIGXFSZ', 'SIGIOT'] if hasattr(signal, s)] + [139]
71 """ A class representing a particular test. """
73 def __init__(self, application_name, classname, options,
74 reporter, duration=0, timeout=DEFAULT_TIMEOUT,
75 hard_timeout=None, extra_env_variables=None,
76 expected_failures=None, is_parallel=True,
79 @timeout: The timeout during which the value return by get_current_value
80 keeps being exactly equal
81 @hard_timeout: Max time the test can take in absolute
83 Loggable.__init__(self)
84 self.timeout = timeout * TIMEOUT_FACTOR * options.timeout_factor
86 self.hard_timeout = hard_timeout * TIMEOUT_FACTOR
87 self.hard_timeout *= options.timeout_factor
89 self.hard_timeout = hard_timeout
90 self.classname = classname
91 self.options = options
92 self.application = application_name
94 self.server_command = None
95 self.reporter = reporter
100 self.duration = duration
101 self.stack_trace = None
103 if expected_failures is None:
104 self.expected_failures = []
105 elif not isinstance(expected_failures, list):
106 self.expected_failures = [expected_failures]
108 self.expected_failures = expected_failures
110 extra_env_variables = extra_env_variables or {}
111 self.extra_env_variables = extra_env_variables
112 self.optional = False
113 self.is_parallel = is_parallel
114 self.generator = None
115 # String representation of the test number in the testsuite
117 self.workdir = workdir
122 self.kill_subprocess()
125 self.time_taken = 0.0
126 self._starting_time = None
127 self.result = Result.NOT_RUN
130 self.extra_logfiles = []
131 self.__env_variable = []
132 self.kill_subprocess()
135 string = self.classname
136 if self.result != Result.NOT_RUN:
137 string += ": " + self.result
138 if self.result in [Result.FAILED, Result.TIMEOUT]:
139 string += " '%s'\n" \
140 " You can reproduce with: %s\n" \
141 % (self.message, self.get_command_repr())
143 if not self.options.redirect_logs and \
144 self.result == Result.PASSED or \
145 not self.options.dump_on_failure:
146 string += self.get_logfile_repr()
150 def add_env_variable(self, variable, value=None):
152 Only usefull so that the gst-validate-launcher can print the exact
153 right command line to reproduce the tests
156 value = os.environ.get(variable, None)
161 self.__env_variable.append(variable)
164 def _env_variable(self):
166 for var in set(self.__env_variable):
169 value = self.proc_env.get(var, None)
170 if value is not None:
171 res += "%s='%s'" % (var, value)
175 def open_logfile(self):
179 path = os.path.join(self.options.logsdir,
180 self.classname.replace(".", os.sep))
181 mkdir(os.path.dirname(path))
184 if self.options.redirect_logs == 'stdout':
185 self.out = sys.stdout
186 elif self.options.redirect_logs == 'stderr':
187 self.out = sys.stderr
189 self.out = open(path, 'w+')
191 def close_logfile(self):
192 if not self.options.redirect_logs:
197 def _get_file_content(self, file_name):
198 f = open(file_name, 'r+')
204 def get_log_content(self):
205 return self._get_file_content(self.logfile)
207 def get_extra_log_content(self, extralog):
208 if extralog not in self.extra_logfiles:
211 return self._get_file_content(extralog)
213 def get_classname(self):
214 name = self.classname.split('.')[-1]
215 classname = self.classname.replace('.%s' % name, '')
220 return self.classname.split('.')[-1]
223 if self._uuid is None:
224 self._uuid = self.classname + str(uuid.uuid4())
227 def add_arguments(self, *args):
230 def build_arguments(self):
231 self.add_env_variable("LD_PRELOAD")
232 self.add_env_variable("DISPLAY")
234 def add_stack_trace_to_logfile(self):
235 trace_gatherer = BackTraceGenerator.get_default()
236 stack_trace = trace_gatherer.get_trace(self)
241 info = "\n\n== Stack trace: == \n%s" % stack_trace
242 if self.options.redirect_logs:
244 elif self.options.xunit_file:
245 self.stack_trace = stack_trace
247 with open(self.logfile, 'a') as f:
250 def set_result(self, result, message="", error=""):
251 self.debug("Setting result: %s (message: %s, error: %s)" % (result,
254 if result is Result.TIMEOUT:
255 if self.options.debug is True:
257 printc("Timeout, you should process <ctrl>c to get into gdb",
259 # and wait here until gdb exits
260 self.process.communicate()
262 pname = self.command[0]
263 input("%sTimeout happened you can attach gdb doing: $gdb %s %d%s\n"
264 "Press enter to continue" % (Colors.FAIL, pname, self.process.pid,
267 self.add_stack_trace_to_logfile()
270 self.message = message
271 self.error_str = error
273 def check_results(self):
274 if self.result is Result.FAILED or self.result is Result.TIMEOUT:
277 self.debug("%s returncode: %s", self, self.process.returncode)
278 if self.process.returncode == 0:
279 self.set_result(Result.PASSED)
280 elif self.process.returncode in [-signal.SIGSEGV, -signal.SIGABRT, 139]:
281 self.add_stack_trace_to_logfile()
282 self.set_result(Result.FAILED,
283 "Application segfaulted, returne code: %d" % (
284 self.process.returncode))
285 elif self.process.returncode == VALGRIND_ERROR_CODE:
286 self.set_result(Result.FAILED, "Valgrind reported errors")
288 self.set_result(Result.FAILED,
289 "Application returned %d" % (self.process.returncode))
291 def get_current_value(self):
293 Lets subclasses implement a nicer timeout measurement method
294 They should return some value with which we will compare
295 the previous and timeout if they are egual during self.timeout
298 return Result.NOT_RUN
300 def process_update(self):
302 Returns True when process has finished running or has timed out.
305 if self.process is None:
306 # Process has not started running yet
310 if self.process.returncode is not None:
313 val = self.get_current_value()
315 self.debug("Got value: %s" % val)
316 if val is Result.NOT_RUN:
317 # The get_current_value logic is not implemented... dumb
319 if time.time() - self.last_change_ts > self.timeout:
320 self.set_result(Result.TIMEOUT,
321 "Application timed out: %s secs" %
326 elif val is Result.FAILED:
328 elif val is Result.KNOWN_ERROR:
331 self.log("New val %s" % val)
333 if val == self.last_val:
334 delta = time.time() - self.last_change_ts
335 self.debug("%s: Same value for %d/%d seconds" %
336 (self, delta, self.timeout))
337 if delta > self.timeout:
338 self.set_result(Result.TIMEOUT,
339 "Application timed out: %s secs" %
343 elif self.hard_timeout and time.time() - self.start_ts > self.hard_timeout:
345 Result.TIMEOUT, "Hard timeout reached: %d secs" % self.hard_timeout)
348 self.last_change_ts = time.time()
353 def get_subproc_env(self):
354 return os.environ.copy()
356 def kill_subprocess(self):
357 utils.kill_subprocess(self, self.process, DEFAULT_TIMEOUT)
359 def run_external_checks(self):
362 def thread_wrapper(self):
364 # Restore the SIGINT handler for the child process (gdb) to ensure
366 signal.signal(signal.SIGINT, signal.SIG_DFL)
368 if self.options.gdb and os.name != "nt":
369 preexec_fn = enable_sigint
373 self.process = subprocess.Popen(self.command,
378 preexec_fn=preexec_fn)
380 if self.result is not Result.TIMEOUT:
381 if self.process.returncode == 0:
382 self.run_external_checks()
385 def get_valgrind_suppression_file(self, subdir, name):
386 p = get_data_file(subdir, name)
390 self.error("Could not find any %s file" % name)
392 def get_valgrind_suppressions(self):
393 return [self.get_valgrind_suppression_file('data', 'gstvalidate.supp')]
395 def use_gdb(self, command):
396 if self.hard_timeout is not None:
397 self.hard_timeout *= GDB_TIMEOUT_FACTOR
398 self.timeout *= GDB_TIMEOUT_FACTOR
400 if not self.options.gdb_non_stop:
401 self.timeout = sys.maxsize
402 self.hard_timeout = sys.maxsize
405 if self.options.gdb_non_stop:
406 args += ["-ex", "run", "-ex", "backtrace", "-ex", "quit"]
407 args += ["--args"] + command
410 def use_valgrind(self, command, subenv):
411 vglogsfile = self.logfile + '.valgrind'
412 self.extra_logfiles.append(vglogsfile)
416 for o, v in [('trace-children', 'yes'),
417 ('tool', 'memcheck'),
418 ('leak-check', 'full'),
419 ('leak-resolution', 'high'),
420 # TODO: errors-for-leak-kinds should be set to all instead of definite
421 # and all false positives should be added to suppression files.
422 ('errors-for-leak-kinds', 'definite'),
423 ('num-callers', '20'),
424 ('error-exitcode', str(VALGRIND_ERROR_CODE)),
425 ('gen-suppressions', 'all')]:
426 vg_args.append("--%s=%s" % (o, v))
428 if not self.options.redirect_logs:
429 vglogsfile = self.logfile + '.valgrind'
430 self.extra_logfiles.append(vglogsfile)
431 vg_args.append("--%s=%s" % ('log-file', vglogsfile))
433 for supp in self.get_valgrind_suppressions():
434 vg_args.append("--suppressions=%s" % supp)
436 command = ["valgrind"] + vg_args + command
438 # Tune GLib's memory allocator to be more valgrind friendly
439 subenv['G_DEBUG'] = 'gc-friendly'
440 subenv['G_SLICE'] = 'always-malloc'
442 if self.hard_timeout is not None:
443 self.hard_timeout *= VALGRIND_TIMEOUT_FACTOR
444 self.timeout *= VALGRIND_TIMEOUT_FACTOR
446 # Enable 'valgrind.config'
447 self.add_validate_config(get_data_file(
448 'data', 'valgrind.config'), subenv)
449 if subenv == self.proc_env:
450 self.add_env_variable('G_DEBUG', 'gc-friendly')
451 self.add_env_variable('G_SLICE', 'always-malloc')
452 self.add_env_variable('GST_VALIDATE_CONFIG',
453 self.proc_env['GST_VALIDATE_CONFIG'])
457 def add_validate_config(self, config, subenv=None):
459 subenv = self.extra_env_variables
461 if subenv.get('GST_VALIDATE_CONFIG'):
462 subenv['GST_VALIDATE_CONFIG'] = '%s%s%s' % (
463 self.proc_env['GST_VALIDATE_CONFIG'], os.pathsep, config)
465 subenv['GST_VALIDATE_CONFIG'] = config
467 def launch_server(self):
470 def get_logfile_repr(self):
472 logfiles = self.extra_logfiles.copy()
474 if not self.options.redirect_logs:
475 logfiles.insert(0, self.logfile)
478 message += " - %s\n" % log
482 def get_command_repr(self):
483 message = "%s %s" % (self._env_variable, ' '.join(self.command))
484 if self.server_command:
485 message = "%s & %s" % (self.server_command, message)
487 return "'%s'" % message
489 def test_start(self, queue):
492 self.server_command = self.launch_server()
494 self.command = [self.application]
495 self._starting_time = time.time()
496 self.build_arguments()
497 self.proc_env = self.get_subproc_env()
499 for var, value in list(self.extra_env_variables.items()):
500 value = self.proc_env.get(var, '') + os.pathsep + value
501 self.proc_env[var] = value.strip(os.pathsep)
502 self.add_env_variable(var, self.proc_env[var])
505 self.command = self.use_gdb(self.command)
507 self.previous_sigint_handler = signal.getsignal(signal.SIGINT)
508 # Make the gst-validate executable ignore SIGINT while gdb is running.
509 signal.signal(signal.SIGINT, signal.SIG_IGN)
511 if self.options.valgrind:
512 self.command = self.use_valgrind(self.command, self.proc_env)
514 if not self.options.redirect_logs:
515 self.out.write("=================\n"
518 "=================\n\n"
519 % (self.classname, ' '.join(self.command)))
522 message = "Launching: %s%s\n" \
523 " Command: %s\n" % (Colors.ENDC, self.classname,
524 self.get_command_repr())
525 printc(message, Colors.OKBLUE)
527 self.thread = threading.Thread(target=self.thread_wrapper)
531 self.last_change_ts = time.time()
532 self.start_ts = time.time()
534 def _dump_log_file(self, logfile):
535 message = "Dumping contents of %s\n" % logfile
536 printc(message, Colors.FAIL)
538 with open(logfile, 'r') as fin:
541 def _dump_log_files(self):
542 printc("Dumping log files on failure\n", Colors.FAIL)
543 self._dump_log_file(self.logfile)
544 for logfile in self.extra_logfiles:
545 self._dump_log_file(logfile)
548 self.kill_subprocess()
550 self.time_taken = time.time() - self._starting_time
553 signal.signal(signal.SIGINT, self.previous_sigint_handler)
555 if self.result != Result.PASSED:
559 message = "%s %s: %s%s" % (self.number, self.classname, self.result,
560 " (" + self.message + ")" if self.message else "")
563 printc(message, color=utils.get_color_for_result(self.result), end=end)
566 if self.options.dump_on_failure:
567 if self.result is not Result.PASSED:
568 self._dump_log_files()
570 # Only keep around env variables we need later
572 for n in self.__env_variable:
573 clean_env[n] = self.proc_env.get(n, None)
574 self.proc_env = clean_env
576 # Don't keep around JSON report objects, they were processed
577 # in check_results already
583 class GstValidateTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
587 class GstValidateListener(socketserver.BaseRequestHandler):
589 """Implements BaseRequestHandler handle method"""
592 raw_len = self.request.recv(4)
595 msglen = struct.unpack('>I', raw_len)[0]
596 msg = self.request.recv(msglen).decode()
600 obj = json.loads(msg)
603 # First message must contain the uuid
604 uuid = obj.get("uuid", None)
607 # Find test from launcher
608 for t in self.server.launcher.tests:
609 if uuid == t.get_uuid():
613 self.server.launcher.error(
614 "Could not find test for UUID %s" % uuid)
617 obj_type = obj.get("type", '')
618 if obj_type == 'position':
619 test.set_position(obj['position'], obj['duration'],
621 elif obj_type == 'buffering':
622 test.set_position(obj['position'], 100)
623 elif obj_type == 'action':
624 test.add_action_execution(obj)
625 # Make sure that action is taken into account when checking if process
628 elif obj_type == 'action-done':
629 # Make sure that action end is taken into account when checking if process
632 test.actions_infos[-1]['execution-duration'] = obj['execution-duration']
633 elif obj_type == 'report':
637 class GstValidateTest(Test):
639 """ A class representing a particular test. """
640 findpos_regex = re.compile(
641 '.*position.*(\d+):(\d+):(\d+).(\d+).*duration.*(\d+):(\d+):(\d+).(\d+)')
642 findlastseek_regex = re.compile(
643 'seeking to.*(\d+):(\d+):(\d+).(\d+).*stop.*(\d+):(\d+):(\d+).(\d+).*rate.*(\d+)\.(\d+)')
645 HARD_TIMEOUT_FACTOR = 5
647 def __init__(self, application_name, classname,
648 options, reporter, duration=0,
649 timeout=DEFAULT_TIMEOUT, scenario=None, hard_timeout=None,
650 media_descriptor=None, extra_env_variables=None,
651 expected_failures=None, workdir=None):
653 extra_env_variables = extra_env_variables or {}
655 if not hard_timeout and self.HARD_TIMEOUT_FACTOR:
657 hard_timeout = timeout * self.HARD_TIMEOUT_FACTOR
659 hard_timeout = duration * self.HARD_TIMEOUT_FACTOR
663 # If we are running from source, use the -debug version of the
664 # application which is using rpath instead of libtool's wrappers. It's
665 # slightly faster to start and will not confuse valgrind.
666 debug = '%s-debug' % application_name
667 p = look_for_file_in_source_dir('tools', debug)
673 self.media_duration = -1
675 self.actions_infos = []
676 self.media_descriptor = media_descriptor
679 override_path = self.get_override_file(media_descriptor)
681 if extra_env_variables:
682 if extra_env_variables.get("GST_VALIDATE_OVERRIDE", ""):
683 extra_env_variables["GST_VALIDATE_OVERRIDE"] += os.path.pathsep
685 extra_env_variables["GST_VALIDATE_OVERRIDE"] = override_path
687 super(GstValidateTest, self).__init__(application_name, classname,
691 hard_timeout=hard_timeout,
692 extra_env_variables=extra_env_variables,
693 expected_failures=expected_failures,
696 # defines how much the process can be outside of the configured
698 self._sent_eos_time = None
700 if scenario is None or scenario.name.lower() == "none":
703 self.scenario = scenario
705 def kill_subprocess(self):
706 Test.kill_subprocess(self)
708 def add_report(self, report):
709 self.reports.append(report)
711 def set_position(self, position, duration, speed=None):
712 self.position = position
713 self.media_duration = duration
717 def add_action_execution(self, action_infos):
718 if action_infos['action-type'] == 'eos':
719 self._sent_eos_time = time.time()
720 self.actions_infos.append(action_infos)
722 def get_override_file(self, media_descriptor):
724 if media_descriptor.get_path():
725 override_path = os.path.splitext(media_descriptor.get_path())[
726 0] + VALIDATE_OVERRIDE_EXTENSION
727 if os.path.exists(override_path):
732 def get_current_position(self):
735 def get_current_value(self):
737 if self._sent_eos_time is not None:
739 if ((t - self._sent_eos_time)) > 30:
740 if self.media_descriptor.get_protocol() == Protocols.HLS:
741 self.set_result(Result.PASSED,
742 """Got no EOS 30 seconds after sending EOS,
743 in HLS known and tolerated issue:
744 https://bugzilla.gnome.org/show_bug.cgi?id=723868""")
745 return Result.KNOWN_ERROR
748 Result.FAILED, "Pipeline did not stop 30 Seconds after sending EOS")
754 def get_subproc_env(self):
755 subproc_env = os.environ.copy()
757 subproc_env["GST_VALIDATE_UUID"] = self.get_uuid()
759 if 'GST_DEBUG' in os.environ and not self.options.redirect_logs:
760 gstlogsfile = self.logfile + '.gstdebug'
761 self.extra_logfiles.append(gstlogsfile)
762 subproc_env["GST_DEBUG_FILE"] = gstlogsfile
764 if self.options.no_color:
765 subproc_env["GST_DEBUG_NO_COLOR"] = '1'
767 # Ensure XInitThreads is called, see bgo#731525
768 subproc_env['GST_GL_XINITTHREADS'] = '1'
769 self.add_env_variable('GST_GL_XINITTHREADS', '1')
771 if self.scenario is not None:
772 scenario = self.scenario.get_execution_name()
773 subproc_env["GST_VALIDATE_SCENARIO"] = scenario
774 self.add_env_variable("GST_VALIDATE_SCENARIO",
775 subproc_env["GST_VALIDATE_SCENARIO"])
778 del subproc_env["GST_VALIDATE_SCENARIO"]
786 self._sent_eos_time = None
789 self.media_duration = -1
791 self.actions_infos = []
793 def build_arguments(self):
794 super(GstValidateTest, self).build_arguments()
795 if "GST_VALIDATE" in os.environ:
796 self.add_env_variable("GST_VALIDATE", os.environ["GST_VALIDATE"])
798 if "GST_VALIDATE_SCENARIOS_PATH" in os.environ:
799 self.add_env_variable("GST_VALIDATE_SCENARIOS_PATH",
800 os.environ["GST_VALIDATE_SCENARIOS_PATH"])
802 self.add_env_variable("GST_VALIDATE_CONFIG")
803 self.add_env_variable("GST_VALIDATE_OVERRIDE")
805 def get_extra_log_content(self, extralog):
806 value = Test.get_extra_log_content(self, extralog)
810 def report_matches_expected_failure(self, report, expected_failure):
811 for key in ['bug', 'bugs', 'sometimes']:
812 if key in expected_failure:
813 del expected_failure[key]
814 for key, value in list(report.items()):
815 if key in expected_failure:
816 if not re.findall(expected_failure[key], str(value)):
818 expected_failure.pop(key)
820 return not bool(expected_failure)
822 def check_reported_issues(self):
824 expected_failures = copy.deepcopy(self.expected_failures)
825 expected_retcode = [0]
826 for report in self.reports:
828 for expected_failure in expected_failures:
829 if self.report_matches_expected_failure(report,
830 expected_failure.copy()):
831 found = expected_failure
834 if found is not None:
835 expected_failures.remove(found)
836 if report['level'] == 'critical':
837 if found.get('sometimes') and isinstance(expected_retcode, list):
838 expected_retcode.append(18)
840 expected_retcode = [18]
841 elif report['level'] == 'critical':
842 ret.append(report['summary'])
845 return None, expected_failures, expected_retcode
847 return ret, expected_failures, expected_retcode
849 def check_expected_timeout(self, expected_timeout):
850 msg = "Expected timeout happened. "
851 result = Result.PASSED
852 message = expected_timeout.get('message')
854 if not re.findall(message, self.message):
855 result = Result.FAILED
856 msg = "Expected timeout message: %s got %s " % (
857 message, self.message)
859 expected_symbols = expected_timeout.get('stacktrace_symbols')
861 trace_gatherer = BackTraceGenerator.get_default()
862 stack_trace = trace_gatherer.get_trace(self)
865 if not isinstance(expected_symbols, list):
866 expected_symbols = [expected_symbols]
868 not_found_symbols = [s for s in expected_symbols
869 if s not in stack_trace]
870 if not_found_symbols:
871 result = Result.TIMEOUT
872 msg = "Expected symbols '%s' not found in stack trace " % (
875 msg += "No stack trace available, could not verify symbols "
879 def check_results(self):
880 if self.result in [Result.FAILED, self.result is Result.PASSED]:
883 for report in self.reports:
884 if report.get('issue-id') == 'runtime::missing-plugin':
885 self.set_result(Result.SKIPPED, "%s\n%s" % (report['summary'],
889 self.debug("%s returncode: %s", self, self.process.returncode)
891 criticals, not_found_expected_failures, expected_returncode = self.check_reported_issues()
893 expected_timeout = None
894 for i, f in enumerate(not_found_expected_failures):
895 if len(f) == 1 and f.get("returncode"):
896 returncode = f['returncode']
897 if not isinstance(expected_returncode, list):
898 returncode = [expected_returncode]
901 elif f.get("timeout"):
904 not_found_expected_failures = [f for f in not_found_expected_failures
905 if not f.get('returncode')]
908 result = Result.PASSED
909 if self.result == Result.TIMEOUT:
911 not_found_expected_failures.remove(expected_timeout)
912 result, msg = self.check_expected_timeout(expected_timeout)
915 elif self.process.returncode in COREDUMP_SIGNALS:
916 result = Result.FAILED
917 msg = "Application segfaulted "
918 self.add_stack_trace_to_logfile()
919 elif self.process.returncode == VALGRIND_ERROR_CODE:
920 msg = "Valgrind reported errors "
921 result = Result.FAILED
922 elif self.process.returncode not in expected_returncode:
923 msg = "Application returned %s " % self.process.returncode
924 if expected_returncode != [0]:
925 msg += "(expected %s) " % expected_returncode
926 result = Result.FAILED
929 msg += "(critical errors: [%s]) " % ', '.join(criticals)
930 result = Result.FAILED
932 if not_found_expected_failures:
933 mandatory_failures = [f for f in not_found_expected_failures
934 if not f.get('sometimes')]
936 if mandatory_failures:
937 msg += "(Expected errors not found: %s) " % mandatory_failures
938 result = Result.FAILED
939 elif self.expected_failures:
940 msg += '%s(Expected errors occured: %s)%s' % (Colors.OKBLUE,
941 self.expected_failures,
944 self.set_result(result, msg.strip())
946 def get_valgrind_suppressions(self):
947 result = super(GstValidateTest, self).get_valgrind_suppressions()
948 gst_sup = self.get_valgrind_suppression_file('common', 'gst.supp')
950 result.append(gst_sup)
954 class GstValidateEncodingTestInterface(object):
955 DURATION_TOLERANCE = GST_SECOND / 4
957 def __init__(self, combination, media_descriptor, duration_tolerance=None):
958 super(GstValidateEncodingTestInterface, self).__init__()
960 self.media_descriptor = media_descriptor
961 self.combination = combination
964 self._duration_tolerance = duration_tolerance
965 if duration_tolerance is None:
966 self._duration_tolerance = self.DURATION_TOLERANCE
968 def get_current_size(self):
970 size = os.stat(urllib.parse.urlparse(self.dest_file).path).st_size
974 self.debug("Size: %s" % size)
977 def _get_profile_full(self, muxer, venc, aenc, video_restriction=None,
978 audio_restriction=None, audio_presence=0,
985 if video_restriction is not None:
986 ret = ret + video_restriction + '->'
989 ret = ret + '|' + str(video_presence)
992 if audio_restriction is not None:
993 ret = ret + audio_restriction + '->'
996 ret = ret + '|' + str(audio_presence)
998 return ret.replace("::", ":")
1000 def get_profile(self, video_restriction=None, audio_restriction=None):
1001 vcaps = self.combination.get_video_caps()
1002 acaps = self.combination.get_audio_caps()
1003 if self.media_descriptor is not None:
1004 if self.media_descriptor.get_num_tracks("video") == 0:
1007 if self.media_descriptor.get_num_tracks("audio") == 0:
1010 return self._get_profile_full(self.combination.get_muxer_caps(),
1012 video_restriction=video_restriction,
1013 audio_restriction=audio_restriction)
1015 def _clean_caps(self, caps):
1017 Returns a list of key=value or structure name, without "(types)" or ";" or ","
1019 return re.sub(r"\(.+?\)\s*| |;", '', caps).split(',')
1021 # pylint: disable=E1101
1022 def _has_caps_type_variant(self, c, ccaps):
1024 Handle situations where we can have application/ogg or video/ogg or
1028 media_type = re.findall("application/|video/|audio/", c)
1030 media_type = media_type[0].replace('/', '')
1031 possible_mtypes = ["application", "video", "audio"]
1032 possible_mtypes.remove(media_type)
1033 for tmptype in possible_mtypes:
1034 possible_c_variant = c.replace(media_type, tmptype)
1035 if possible_c_variant in ccaps:
1037 "Found %s in %s, good enough!", possible_c_variant, ccaps)
1042 # pylint: disable=E1101
1043 def run_iqa_test(self, reference_file_uri):
1045 Runs IQA test if @reference_file_path exists
1046 @test: The test to run tests on
1048 if not GstValidateBaseTestManager.has_feature('iqa'):
1049 self.debug('Iqa element not present, not running extra test.')
1053 uridecodebin uri=%s !
1054 iqa name=iqa do-dssim=true dssim-error-threshold=1.0 ! fakesink
1055 uridecodebin uri=%s ! iqa.
1056 """ % (reference_file_uri, self.dest_file)
1057 pipeline_desc = pipeline_desc.replace("\n", "")
1059 command = [GstValidateBaseTestManager.COMMAND] + \
1060 shlex.split(pipeline_desc)
1061 if not self.options.redirect_logs:
1063 "=================\n"
1064 "Running IQA tests on results of: %s\n"
1066 "=================\n\n" % (
1067 self.classname, ' '.join(command)))
1070 message = "Running IQA tests on results of:%s %s\n" \
1071 " Command: %s\n" % (
1072 Colors.ENDC, self.classname, ' '.join(command))
1073 printc(message, Colors.OKBLUE)
1075 self.process = subprocess.Popen(command,
1082 def check_encoded_file(self):
1083 result_descriptor = GstValidateMediaDescriptor.new_from_uri(
1085 if result_descriptor is None:
1086 return (Result.FAILED, "Could not discover encoded file %s"
1089 duration = result_descriptor.get_duration()
1090 orig_duration = self.media_descriptor.get_duration()
1091 tolerance = self._duration_tolerance
1093 if orig_duration - tolerance >= duration <= orig_duration + tolerance:
1094 os.remove(result_descriptor.get_path())
1095 return (Result.FAILED, "Duration of encoded file is "
1096 " wrong (%s instead of %s)" %
1097 (utils.TIME_ARGS(duration),
1098 utils.TIME_ARGS(orig_duration)))
1100 all_tracks_caps = result_descriptor.get_tracks_caps()
1101 container_caps = result_descriptor.get_caps()
1103 all_tracks_caps.insert(0, ("container", container_caps))
1105 for track_type, caps in all_tracks_caps:
1106 ccaps = self._clean_caps(caps)
1107 wanted_caps = self.combination.get_caps(track_type)
1108 cwanted_caps = self._clean_caps(wanted_caps)
1110 if wanted_caps is None:
1111 os.remove(result_descriptor.get_path())
1112 return (Result.FAILED,
1113 "Found a track of type %s in the encoded files"
1114 " but none where wanted in the encoded profile: %s"
1115 % (track_type, self.combination))
1117 for c in cwanted_caps:
1119 if not self._has_caps_type_variant(c, ccaps):
1120 os.remove(result_descriptor.get_path())
1121 return (Result.FAILED,
1122 "Field: %s (from %s) not in caps of the outputed file %s"
1123 % (wanted_caps, c, ccaps))
1125 os.remove(result_descriptor.get_path())
1126 return (Result.PASSED, "")
1129 class TestsManager(Loggable):
1131 """ A class responsible for managing tests. """
1134 loading_testsuite = None
1138 Loggable.__init__(self)
1141 self.unwanted_tests = []
1144 self.reporter = None
1145 self.wanted_tests_patterns = []
1146 self.blacklisted_tests_patterns = []
1147 self._generators = []
1148 self.check_testslist = True
1149 self.all_tests = None
1150 self.expected_failures = {}
1151 self.blacklisted_tests = []
1156 def list_tests(self):
1157 return sorted(list(self.tests), key=lambda x: x.classname)
1159 def find_tests(self, classname):
1160 regex = re.compile(classname)
1161 return [test for test in self.list_tests() if regex.findall(test.classname)]
1163 def add_expected_issues(self, expected_failures):
1164 expected_failures_re = {}
1165 for test_name_regex, failures in list(expected_failures.items()):
1166 regex = re.compile(test_name_regex)
1167 expected_failures_re[regex] = failures
1168 for test in self.tests:
1169 if regex.findall(test.classname):
1170 test.expected_failures.extend(failures)
1172 self.expected_failures.update(expected_failures_re)
1174 def add_test(self, test):
1175 if test.generator is None:
1176 test.classname = self.loading_testsuite + '.' + test.classname
1177 for regex, failures in list(self.expected_failures.items()):
1178 if regex.findall(test.classname):
1179 test.expected_failures.extend(failures)
1181 if self._is_test_wanted(test):
1182 if test not in self.tests:
1183 self.tests.append(test)
1184 self.tests.sort(key=lambda test: test.classname)
1186 if test not in self.tests:
1187 self.unwanted_tests.append(test)
1188 self.unwanted_tests.sort(key=lambda test: test.classname)
1190 def get_tests(self):
1193 def populate_testsuite(self):
1196 def add_generators(self, generators):
1198 @generators: A list of, or one single #TestsGenerator to be used to generate tests
1200 if not isinstance(generators, list):
1201 generators = [generators]
1202 self._generators.extend(generators)
1203 for generator in generators:
1204 generator.testsuite = self.loading_testsuite
1206 self._generators = list(set(self._generators))
1208 def get_generators(self):
1209 return self._generators
1211 def _add_blacklist(self, blacklisted_tests):
1212 if not isinstance(blacklisted_tests, list):
1213 blacklisted_tests = [blacklisted_tests]
1215 for patterns in blacklisted_tests:
1216 for pattern in patterns.split(","):
1217 self.blacklisted_tests_patterns.append(re.compile(pattern))
1219 def set_default_blacklist(self, default_blacklist):
1220 for test_regex, reason in default_blacklist:
1221 if not test_regex.startswith(self.loading_testsuite + '.'):
1222 test_regex = self.loading_testsuite + '.' + test_regex
1223 self.blacklisted_tests.append((test_regex, reason))
1225 def add_options(self, parser):
1226 """ Add more arguments. """
1229 def set_settings(self, options, args, reporter):
1230 """ Set properties after options parsing. """
1231 self.options = options
1233 self.reporter = reporter
1235 self.populate_testsuite()
1237 if self.options.valgrind:
1238 self.print_valgrind_bugs()
1240 if options.wanted_tests:
1241 for patterns in options.wanted_tests:
1242 for pattern in patterns.split(","):
1243 self.wanted_tests_patterns.append(re.compile(pattern))
1245 if options.blacklisted_tests:
1246 for patterns in options.blacklisted_tests:
1247 self._add_blacklist(patterns)
1249 def set_blacklists(self):
1250 if self.blacklisted_tests:
1251 printc("\nCurrently 'hardcoded' %s blacklisted tests:" %
1252 self.name, Colors.WARNING, title_char='-')
1254 if self.options.check_bugs_status:
1255 if not check_bugs_resolution(self.blacklisted_tests):
1258 for name, bug in self.blacklisted_tests:
1259 self._add_blacklist(name)
1260 if not self.options.check_bugs_status:
1261 print(" + %s \n --> bug: %s\n" % (name, bug))
1265 def check_expected_failures(self):
1266 if not self.expected_failures or not self.options.check_bugs_status:
1269 if self.expected_failures:
1270 printc("\nCurrently known failures in the %s testsuite:"
1271 % self.name, Colors.WARNING, title_char='-')
1273 bugs_definitions = {}
1274 for regex, failures in list(self.expected_failures.items()):
1275 for failure in failures:
1276 bugs = failure.get('bug')
1278 bugs = failure.get('bugs')
1280 printc('+ %s:\n --> no bug reported associated with %s\n' % (
1281 regex.pattern, failure), Colors.WARNING)
1284 if not isinstance(bugs, list):
1286 cbugs = bugs_definitions.get(regex.pattern, [])
1287 bugs.extend([b for b in bugs if b not in cbugs])
1288 bugs_definitions[regex.pattern] = bugs
1290 return check_bugs_resolution(bugs_definitions.items())
1292 def _check_blacklisted(self, test):
1293 for pattern in self.blacklisted_tests_patterns:
1294 if pattern.findall(test.classname):
1295 self.info("%s is blacklisted by %s", test.classname, pattern)
1300 def _check_whitelisted(self, test):
1301 for pattern in self.wanted_tests_patterns:
1302 if pattern.findall(test.classname):
1303 if self._check_blacklisted(test):
1304 # If explicitly white listed that specific test
1305 # bypass the blacklisting
1306 if pattern.pattern != test.classname:
1311 def _check_duration(self, test):
1312 if test.duration > 0 and int(self.options.long_limit) < int(test.duration):
1313 self.info("Not activating %s as its duration (%d) is superior"
1314 " than the long limit (%d)" % (test, test.duration,
1315 int(self.options.long_limit)))
1320 def _is_test_wanted(self, test):
1321 if self._check_whitelisted(test):
1322 if not self._check_duration(test):
1326 if self._check_blacklisted(test):
1329 if not self._check_duration(test):
1332 if not self.wanted_tests_patterns:
1337 def needs_http_server(self):
1340 def print_valgrind_bugs(self):
1344 class TestsGenerator(Loggable):
1346 def __init__(self, name, test_manager, tests=[]):
1347 Loggable.__init__(self)
1349 self.test_manager = test_manager
1350 self.testsuite = None
1353 self._tests[test.classname] = test
1355 def generate_tests(self, *kwargs):
1357 Method that generates tests
1359 return list(self._tests.values())
1361 def add_test(self, test):
1362 test.generator = self
1363 test.classname = self.testsuite + '.' + test.classname
1364 self._tests[test.classname] = test
1367 class GstValidateTestsGenerator(TestsGenerator):
1369 def populate_tests(self, uri_minfo_special_scenarios, scenarios):
1372 def generate_tests(self, uri_minfo_special_scenarios, scenarios):
1373 self.populate_tests(uri_minfo_special_scenarios, scenarios)
1374 return super(GstValidateTestsGenerator, self).generate_tests()
1377 class _TestsLauncher(Loggable):
1379 def __init__(self, libsdir):
1381 Loggable.__init__(self)
1383 self.libsdir = libsdir
1387 self.reporter = None
1388 self._list_testers()
1389 self.all_tests = None
1390 self.wanted_tests_patterns = []
1392 self.queue = queue.Queue()
1394 self.total_num_tests = 0
1397 def _list_app_dirs(self):
1399 app_dirs.append(os.path.join(self.libsdir, "apps"))
1400 env_dirs = os.environ.get("GST_VALIDATE_APPS_DIR")
1401 if env_dirs is not None:
1402 for dir_ in env_dirs.split(":"):
1403 app_dirs.append(dir_)
1404 sys.path.append(dir_)
1408 def _exec_app(self, app_dir, env):
1410 files = os.listdir(app_dir)
1411 except OSError as e:
1412 self.debug("Could not list %s: %s" % (app_dir, e))
1415 if f.endswith(".py"):
1416 exec(compile(open(os.path.join(app_dir, f)).read(),
1417 os.path.join(app_dir, f), 'exec'), env)
1419 def _exec_apps(self, env):
1420 app_dirs = self._list_app_dirs()
1421 for app_dir in app_dirs:
1422 self._exec_app(app_dir, env)
1424 def _list_testers(self):
1425 env = globals().copy()
1426 self._exec_apps(env)
1428 testers = [i() for i in utils.get_subclasses(TestsManager, env)]
1429 for tester in testers:
1430 if tester.init() is True:
1431 self.testers.append(tester)
1433 self.warning("Can not init tester: %s -- PATH is %s"
1434 % (tester.name, os.environ["PATH"]))
1436 def add_options(self, parser):
1437 for tester in self.testers:
1438 tester.add_options(parser)
1440 def _load_testsuite(self, testsuites):
1442 for testsuite in testsuites:
1444 sys.path.insert(0, os.path.dirname(testsuite))
1445 return (__import__(os.path.basename(testsuite).replace(".py", "")), None)
1446 except Exception as e:
1447 exceptions.append("Could not load %s: %s" % (testsuite, e))
1450 sys.path.remove(os.path.dirname(testsuite))
1452 return (None, exceptions)
1454 def _load_testsuites(self):
1456 for testsuite in self.options.testsuites:
1457 if os.path.exists(testsuite):
1458 testsuite = os.path.abspath(os.path.expanduser(testsuite))
1459 loaded_module = self._load_testsuite([testsuite])
1461 possible_testsuites_paths = [os.path.join(d, testsuite + ".py")
1462 for d in self.options.testsuites_dirs]
1463 loaded_module = self._load_testsuite(possible_testsuites_paths)
1465 module = loaded_module[0]
1466 if not loaded_module[0]:
1467 if "." in testsuite:
1468 self.options.testsuites.append(testsuite.split('.')[0])
1469 self.info("%s looks like a test name, trying that" %
1471 self.options.wanted_tests.append(testsuite)
1473 printc("Could not load testsuite: %s, reasons: %s" % (
1474 testsuite, loaded_module[1]), Colors.FAIL)
1477 testsuites.add(module)
1478 if not hasattr(module, "TEST_MANAGER"):
1479 module.TEST_MANAGER = [tester.name for tester in self.testers]
1480 elif not isinstance(module.TEST_MANAGER, list):
1481 module.TEST_MANAGER = [module.TEST_MANAGER]
1483 self.options.testsuites = list(testsuites)
1485 def _setup_testsuites(self):
1486 for testsuite in self.options.testsuites:
1488 wanted_test_manager = None
1489 if hasattr(testsuite, "TEST_MANAGER"):
1490 wanted_test_manager = testsuite.TEST_MANAGER
1491 if not isinstance(wanted_test_manager, list):
1492 wanted_test_manager = [wanted_test_manager]
1494 for tester in self.testers:
1495 if wanted_test_manager is not None and \
1496 tester.name not in wanted_test_manager:
1499 if self.options.user_paths:
1500 TestsManager.loading_testsuite = tester.name
1501 tester.register_defaults()
1504 TestsManager.loading_testsuite = testsuite.__name__
1505 if testsuite.setup_tests(tester, self.options):
1507 TestsManager.loading_testsuite = None
1510 printc("Could not load testsuite: %s"
1511 " maybe because of missing TestManager"
1512 % (testsuite), Colors.FAIL)
1515 def _load_config(self, options):
1516 printc("Loading config files is DEPRECATED"
1517 " you should use the new testsuite format now",)
1519 for tester in self.testers:
1520 tester.options = options
1521 globals()[tester.name] = tester
1522 globals()["options"] = options
1523 c__file__ = __file__
1524 globals()["__file__"] = self.options.config
1525 exec(compile(open(self.options.config).read(),
1526 self.options.config, 'exec'), globals())
1527 globals()["__file__"] = c__file__
1529 def set_settings(self, options, args):
1530 if options.xunit_file:
1531 self.reporter = reporters.XunitReporter(options)
1533 self.reporter = reporters.Reporter(options)
1535 self.options = options
1536 wanted_testers = None
1537 for tester in self.testers:
1538 if tester.name in args:
1539 wanted_testers = tester.name
1542 testers = self.testers
1544 for tester in testers:
1545 if tester.name in args:
1546 self.testers.append(tester)
1547 args.remove(tester.name)
1550 self._load_config(options)
1552 self._load_testsuites()
1553 if not self.options.testsuites:
1554 printc("Not testsuite loaded!", Colors.FAIL)
1557 for tester in self.testers:
1558 tester.set_settings(options, args, self.reporter)
1560 if not options.config and options.testsuites:
1561 if self._setup_testsuites() is False:
1564 for tester in self.testers:
1565 if not tester.set_blacklists():
1568 if not tester.check_expected_failures():
1573 def _check_tester_has_other_testsuite(self, testsuite, tester):
1574 if tester.name != testsuite.TEST_MANAGER[0]:
1577 for t in self.options.testsuites:
1579 for other_testmanager in t.TEST_MANAGER:
1580 if other_testmanager == tester.name:
1585 def _check_defined_tests(self, tester, tests):
1586 if self.options.blacklisted_tests or self.options.wanted_tests:
1589 tests_names = [test.classname for test in tests]
1590 testlist_changed = False
1591 for testsuite in self.options.testsuites:
1592 if not self._check_tester_has_other_testsuite(testsuite, tester) \
1593 and tester.check_testslist:
1595 testlist_file = open(os.path.splitext(testsuite.__file__)[0] + ".testslist",
1598 know_tests = testlist_file.read().split("\n")
1599 testlist_file.close()
1601 testlist_file = open(os.path.splitext(testsuite.__file__)[0] + ".testslist",
1607 for test in know_tests:
1608 if test and test.strip('~') not in tests_names:
1609 if not test.startswith('~'):
1610 testlist_changed = True
1611 printc("Test %s Not in testsuite %s anymore"
1612 % (test, testsuite.__file__), Colors.FAIL)
1614 optional_out.append((test, None))
1616 tests_names = sorted([(test.classname, test) for test in tests] + optional_out,
1617 key=lambda x: x[0].strip('~'))
1619 for tname, test in tests_names:
1620 if test and test.optional:
1622 testlist_file.write("%s\n" % (tname))
1623 if tname and tname not in know_tests:
1624 printc("Test %s is NEW in testsuite %s"
1625 % (tname, testsuite.__file__),
1626 Colors.FAIL if self.options.fail_on_testlist_change else Colors.OKGREEN)
1627 testlist_changed = True
1629 testlist_file.close()
1632 return testlist_changed
1634 def list_tests(self):
1635 for tester in self.testers:
1636 if not self._tester_needed(tester):
1639 tests = tester.list_tests()
1640 if self._check_defined_tests(tester, tests) and \
1641 self.options.fail_on_testlist_change:
1642 raise RuntimeError("Unexpected new test in testsuite.")
1644 self.tests.extend(tests)
1645 return sorted(list(self.tests), key=lambda t: t.classname)
1647 def _tester_needed(self, tester):
1648 for testsuite in self.options.testsuites:
1649 if tester.name in testsuite.TEST_MANAGER:
1653 def server_wrapper(self, ready):
1654 self.server = GstValidateTCPServer(
1655 ('localhost', 0), GstValidateListener)
1656 self.server.socket.settimeout(None)
1657 self.server.launcher = self
1658 self.serverport = self.server.socket.getsockname()[1]
1659 self.info("%s server port: %s" % (self, self.serverport))
1662 self.server.serve_forever(poll_interval=0.05)
1664 def _start_server(self):
1665 self.info("Starting TCP Server")
1666 ready = threading.Event()
1667 self.server_thread = threading.Thread(target=self.server_wrapper,
1668 kwargs={'ready': ready})
1669 self.server_thread.start()
1671 os.environ["GST_VALIDATE_SERVER"] = "tcp://localhost:%s" % self.serverport
1673 def _stop_server(self):
1675 self.server.shutdown()
1676 self.server_thread.join()
1677 self.server.server_close()
1680 def test_wait(self):
1682 # Check process every second for timeout
1684 self.queue.get(timeout=1)
1688 for test in self.jobs:
1689 if test.process_update():
1690 self.jobs.remove(test)
1693 def tests_wait(self):
1695 test = self.test_wait()
1696 test.check_results()
1697 except KeyboardInterrupt:
1698 for test in self.jobs:
1699 test.kill_subprocess()
1704 def start_new_job(self, tests_left):
1706 test = tests_left.pop(0)
1710 test.test_start(self.queue)
1712 self.jobs.append(test)
1716 def _run_tests(self):
1719 if not self.all_tests:
1720 all_tests = self.list_tests()
1721 self.all_tests = all_tests
1722 self.total_num_tests = len(self.all_tests)
1724 self.reporter.init_timer()
1727 for test in self.tests:
1728 if test.is_parallel:
1731 alone_tests.append(test)
1733 max_num_jobs = min(self.options.num_jobs, len(tests))
1736 # if order of test execution doesn't matter, shuffle
1737 # the order to optimize cpu usage
1738 if self.options.shuffle:
1739 random.shuffle(tests)
1740 random.shuffle(alone_tests)
1742 current_test_num = 1
1743 for num_jobs, tests in [(max_num_jobs, tests), (1, alone_tests)]:
1744 tests_left = list(tests)
1745 for i in range(num_jobs):
1746 if not self.start_new_job(tests_left):
1750 while jobs_running != 0:
1751 test = self.tests_wait()
1753 test.number = "[%d / %d] " % (current_test_num,
1754 self.total_num_tests)
1755 current_test_num += 1
1756 res = test.test_end()
1757 self.reporter.after_test(test)
1758 if res != Result.PASSED and (self.options.forever or
1759 self.options.fatal_error):
1761 if self.start_new_job(tests_left):
1766 def clean_tests(self):
1767 for test in self.tests:
1771 def run_tests(self):
1772 self._start_server()
1773 if self.options.forever:
1776 printc("Running iteration %d" % r, title=True)
1778 if not self._run_tests():
1784 elif self.options.n_runs:
1786 for r in range(self.options.n_runs):
1787 t = "Running iteration %d" % r
1788 print("%s\n%s\n%s\n" % ("=" * len(t), t, "=" * len(t)))
1789 if not self._run_tests():
1795 return self._run_tests()
1797 def final_report(self):
1798 return self.reporter.final_report()
1800 def needs_http_server(self):
1801 for tester in self.testers:
1802 if tester.needs_http_server():
1806 class NamedDic(object):
1808 def __init__(self, props):
1810 for name, value in props.items():
1811 setattr(self, name, value)
1814 class Scenario(object):
1816 def __init__(self, name, props, path=None):
1820 for prop, value in props:
1821 setattr(self, prop.replace("-", "_"), value)
1823 def get_execution_name(self):
1824 if self.path is not None:
1830 if hasattr(self, "seek"):
1831 return bool(self.seek)
1835 def needs_clock_sync(self):
1836 if hasattr(self, "need_clock_sync"):
1837 return bool(self.need_clock_sync)
1841 def needs_live_content(self):
1842 # Scenarios that can only be used on live content
1843 if hasattr(self, "live_content_required"):
1844 return bool(self.live_content_required)
1847 def compatible_with_live_content(self):
1848 # if a live content is required it's implicitely compatible with
1850 if self.needs_live_content():
1852 if hasattr(self, "live_content_compatible"):
1853 return bool(self.live_content_compatible)
1856 def get_min_media_duration(self):
1857 if hasattr(self, "min_media_duration"):
1858 return float(self.min_media_duration)
1862 def does_reverse_playback(self):
1863 if hasattr(self, "reverse_playback"):
1864 return bool(self.reverse_playback)
1868 def get_duration(self):
1870 return float(getattr(self, "duration"))
1871 except AttributeError:
1874 def get_min_tracks(self, track_type):
1876 return int(getattr(self, "min_%s_track" % track_type))
1877 except AttributeError:
1881 return "<Scenario %s>" % self.name
1884 class ScenarioManager(Loggable):
1888 FILE_EXTENSION = "scenario"
1890 def __new__(cls, *args, **kwargs):
1891 if not cls._instance:
1892 cls._instance = super(ScenarioManager, cls).__new__(
1893 cls, *args, **kwargs)
1894 cls._instance.config = None
1895 cls._instance.discovered = False
1896 Loggable.__init__(cls._instance)
1898 return cls._instance
1900 def find_special_scenarios(self, mfile):
1902 mfile_bname = os.path.basename(mfile)
1904 for f in os.listdir(os.path.dirname(mfile)):
1905 if re.findall("%s\..*\.%s$" % (re.escape(mfile_bname), self.FILE_EXTENSION), f):
1906 scenarios.append(os.path.join(os.path.dirname(mfile), f))
1909 scenarios = self.discover_scenarios(scenarios, mfile)
1913 def discover_scenarios(self, scenario_paths=[], mfile=None):
1915 Discover scenarios specified in scenario_paths or the default ones
1916 if nothing specified there
1919 scenario_defs = os.path.join(self.config.main_dir, "scenarios.def")
1920 logs = open(os.path.join(self.config.logsdir,
1921 "scenarios_discovery.log"), 'w')
1924 command = [GstValidateBaseTestManager.COMMAND,
1925 "--scenarios-defs-output-file", scenario_defs]
1926 command.extend(scenario_paths)
1927 subprocess.check_call(command, stdout=logs, stderr=logs)
1928 except subprocess.CalledProcessError:
1931 config = configparser.RawConfigParser()
1932 f = open(scenario_defs)
1935 for section in config.sections():
1937 for scenario_path in scenario_paths:
1940 path = scenario_path
1941 elif section in scenario_path:
1942 # The real name of the scenario is:
1943 # filename.REALNAME.scenario
1944 name = scenario_path.replace(mfile + ".", "").replace(
1945 "." + self.FILE_EXTENSION, "")
1946 path = scenario_path
1951 props = config.items(section)
1952 scenarios.append(Scenario(name, props, path))
1954 if not scenario_paths:
1955 self.discovered = True
1956 self.all_scenarios.extend(scenarios)
1960 def get_scenario(self, name):
1961 if name is not None and os.path.isabs(name) and name.endswith(self.FILE_EXTENSION):
1962 scenarios = self.discover_scenarios([name])
1967 if self.discovered is False:
1968 self.discover_scenarios()
1971 return self.all_scenarios
1974 return [scenario for scenario in self.all_scenarios if scenario.name == name][0]
1976 self.warning("Scenario: %s not found" % name)
1980 class GstValidateBaseTestManager(TestsManager):
1981 scenarios_manager = ScenarioManager()
1985 super(GstValidateBaseTestManager, self).__init__()
1986 self._scenarios = []
1987 self._encoding_formats = []
1990 def update_commands(cls, extra_paths=None):
1991 for varname, cmd in {'': 'gst-validate',
1992 'TRANSCODING_': 'gst-validate-transcoding',
1993 'MEDIA_CHECK_': 'gst-validate-media-check',
1994 'RTSP_SERVER_': 'gst-validate-rtsp-server',
1995 'INSPECT_': 'gst-inspect'}.items():
1996 setattr(cls, varname + 'COMMAND', which(cmd + '-1.0', extra_paths))
1999 def has_feature(cls, featurename):
2001 return cls.features_cache[featurename]
2006 subprocess.check_output([cls.INSPECT_COMMAND, featurename])
2008 except subprocess.CalledProcessError:
2011 cls.features_cache[featurename] = res
2014 def add_scenarios(self, scenarios):
2016 @scenarios A list or a unic scenario name(s) to be run on the tests.
2017 They are just the default scenarios, and then depending on
2018 the TestsGenerator to be used you can have more fine grained
2019 control on what to be run on each serie of tests.
2021 if isinstance(scenarios, list):
2022 self._scenarios.extend(scenarios)
2024 self._scenarios.append(scenarios)
2026 self._scenarios = list(set(self._scenarios))
2028 def set_scenarios(self, scenarios):
2030 Override the scenarios
2032 self._scenarios = []
2033 self.add_scenarios(scenarios)
2035 def get_scenarios(self):
2036 return self._scenarios
2038 def add_encoding_formats(self, encoding_formats):
2040 :param encoding_formats: A list or one single #MediaFormatCombinations describing wanted output
2041 formats for transcoding test.
2042 They are just the default encoding formats, and then depending on
2043 the TestsGenerator to be used you can have more fine grained
2044 control on what to be run on each serie of tests.
2046 if isinstance(encoding_formats, list):
2047 self._encoding_formats.extend(encoding_formats)
2049 self._encoding_formats.append(encoding_formats)
2051 self._encoding_formats = list(set(self._encoding_formats))
2053 def get_encoding_formats(self):
2054 return self._encoding_formats
2057 GstValidateBaseTestManager.update_commands()
2060 class MediaDescriptor(Loggable):
2063 Loggable.__init__(self)
2066 raise NotImplemented
2068 def get_media_filepath(self):
2069 raise NotImplemented
2072 raise NotImplemented
2075 raise NotImplemented
2077 def get_duration(self):
2078 raise NotImplemented
2080 def get_protocol(self):
2081 raise NotImplemented
2083 def is_seekable(self):
2084 raise NotImplemented
2087 raise NotImplemented
2090 raise NotImplemented
2092 def get_num_tracks(self, track_type):
2093 raise NotImplemented
2095 def can_play_reverse(self):
2096 raise NotImplemented
2101 def is_compatible(self, scenario):
2102 if scenario is None:
2105 if scenario.seeks() and (not self.is_seekable() or self.is_image()):
2106 self.debug("Do not run %s as %s does not support seeking",
2107 scenario, self.get_uri())
2110 if self.is_image() and scenario.needs_clock_sync():
2111 self.debug("Do not run %s as %s is an image",
2112 scenario, self.get_uri())
2115 if not self.can_play_reverse() and scenario.does_reverse_playback():
2118 if not self.is_live() and scenario.needs_live_content():
2119 self.debug("Do not run %s as %s is not a live content",
2120 scenario, self.get_uri())
2123 if self.is_live() and not scenario.compatible_with_live_content():
2124 self.debug("Do not run %s as %s is a live content",
2125 scenario, self.get_uri())
2128 if not self.prerrols() and getattr(scenario, 'needs_preroll', False):
2131 if self.get_duration() and self.get_duration() / GST_SECOND < scenario.get_min_media_duration():
2133 "Do not run %s as %s is too short (%i < min media duation : %i",
2134 scenario, self.get_uri(),
2135 self.get_duration() / GST_SECOND,
2136 scenario.get_min_media_duration())
2139 for track_type in ['audio', 'subtitle', 'video']:
2140 if self.get_num_tracks(track_type) < scenario.get_min_tracks(track_type):
2141 self.debug("%s -- %s | At least %s %s track needed < %s"
2142 % (scenario, self.get_uri(), track_type,
2143 scenario.get_min_tracks(track_type),
2144 self.get_num_tracks(track_type)))
2150 class GstValidateMediaDescriptor(MediaDescriptor):
2151 # Some extension file for discovering results
2152 MEDIA_INFO_EXT = "media_info"
2153 STREAM_INFO_EXT = "stream_info"
2155 def __init__(self, xml_path):
2156 super(GstValidateMediaDescriptor, self).__init__()
2158 self._xml_path = xml_path
2160 media_xml = ET.parse(xml_path).getroot()
2161 except xml.etree.ElementTree.ParseError:
2162 printc("Could not parse %s" % xml_path,
2166 self._extract_data(media_xml)
2168 self.set_protocol(urllib.parse.urlparse(
2169 urllib.parse.urlparse(self.get_uri()).scheme).scheme)
2171 def _extract_data(self, media_xml):
2172 # Extract the information we need from the xml
2173 self._caps = media_xml.findall("streams")[0].attrib["caps"]
2174 self._track_caps = []
2176 streams = media_xml.findall("streams")[0].findall("stream")
2180 for stream in streams:
2181 self._track_caps.append(
2182 (stream.attrib["type"], stream.attrib["caps"]))
2183 self._uri = media_xml.attrib["uri"]
2184 self._duration = int(media_xml.attrib["duration"])
2185 self._protocol = media_xml.get("protocol", None)
2186 self._is_seekable = media_xml.attrib["seekable"].lower() == "true"
2187 self._is_live = media_xml.get("live", "false").lower() == "true"
2188 self._is_image = False
2189 for stream in media_xml.findall("streams")[0].findall("stream"):
2190 if stream.attrib["type"] == "image":
2191 self._is_image = True
2192 self._track_types = []
2193 for stream in media_xml.findall("streams")[0].findall("stream"):
2194 self._track_types.append(stream.attrib["type"])
2197 def new_from_uri(uri, verbose=False, include_frames=False):
2199 include_frames = 0 # Never
2200 include_frames = 1 # always
2201 include_frames = 2 # if previous file included them
2204 media_path = utils.url2path(uri)
2206 descriptor_path = "%s.%s" % (
2207 media_path, GstValidateMediaDescriptor.MEDIA_INFO_EXT)
2208 if include_frames == 2:
2210 media_xml = ET.parse(descriptor_path).getroot()
2211 frames = media_xml.findall('streams/stream/frame')
2212 include_frames = bool(frames)
2213 except FileNotFoundError:
2216 include_frames = bool(include_frames)
2218 args = GstValidateBaseTestManager.MEDIA_CHECK_COMMAND.split(" ")
2221 args.extend(["--output-file", descriptor_path])
2223 args.extend(["--full"])
2226 printc("Generating media info for %s\n"
2227 " Command: '%s'" % (media_path, ' '.join(args)),
2231 subprocess.check_output(args, stderr=open(os.devnull))
2232 except subprocess.CalledProcessError as e:
2234 printc("Result: Failed", Colors.FAIL)
2236 loggable.warning("GstValidateMediaDescriptor",
2237 "Exception: %s" % e)
2241 printc("Result: Passed", Colors.OKGREEN)
2244 return GstValidateMediaDescriptor(descriptor_path)
2245 except (IOError, xml.etree.ElementTree.ParseError):
2249 return self._xml_path
2251 def need_clock_sync(self):
2252 return Protocols.needs_clock_sync(self.get_protocol())
2254 def get_media_filepath(self):
2255 if self.get_protocol() == Protocols.FILE:
2256 return self._xml_path.replace("." + self.MEDIA_INFO_EXT, "")
2258 return self._xml_path.replace("." + self.STREAM_INFO_EXT, "")
2263 def get_tracks_caps(self):
2264 return self._track_caps
2269 def get_duration(self):
2270 return self._duration
2272 def set_protocol(self, protocol):
2273 self._protocol = protocol
2275 def get_protocol(self):
2276 return self._protocol
2278 def is_seekable(self):
2279 return self._is_seekable
2282 return self._is_live
2284 def can_play_reverse(self):
2288 return self._is_image
2290 def get_num_tracks(self, track_type):
2292 for t in self._track_types:
2298 def get_clean_name(self):
2299 name = os.path.basename(self.get_path())
2300 name = re.sub("\.stream_info|\.media_info", "", name)
2302 return name.replace('.', "_")
2305 class MediaFormatCombination(object):
2306 FORMATS = {"aac": "audio/mpeg,mpegversion=4", # Audio
2307 "ac3": "audio/x-ac3",
2308 "vorbis": "audio/x-vorbis",
2309 "mp3": "audio/mpeg,mpegversion=1,layer=3",
2310 "opus": "audio/x-opus",
2311 "rawaudio": "audio/x-raw",
2314 "h264": "video/x-h264",
2315 "h265": "video/x-h265",
2316 "vp8": "video/x-vp8",
2317 "vp9": "video/x-vp9",
2318 "theora": "video/x-theora",
2319 "prores": "video/x-prores",
2320 "jpeg": "image/jpeg",
2323 "webm": "video/webm",
2324 "ogg": "application/ogg",
2325 "mkv": "video/x-matroska",
2326 "mp4": "video/quicktime,variant=iso;",
2327 "quicktime": "video/quicktime;"}
2330 return "%s and %s in %s" % (self.audio, self.video, self.container)
2332 def __init__(self, container, audio, video):
2334 Describes a media format to be used for transcoding tests.
2336 :param container: A string defining the container format to be used, must bin in self.FORMATS
2337 :param audio: A string defining the audio format to be used, must bin in self.FORMATS
2338 :param video: A string defining the video format to be used, must bin in self.FORMATS
2340 self.container = container
2344 def get_caps(self, track_type):
2346 return self.FORMATS[self.__dict__[track_type]]
2350 def get_audio_caps(self):
2351 return self.get_caps("audio")
2353 def get_video_caps(self):
2354 return self.get_caps("video")
2356 def get_muxer_caps(self):
2357 return self.get_caps("container")