3 # Copyright (c) 2013,Thibault Saunier <thibault.saunier@collabora.com>
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU Lesser General Public
7 # License as published by the Free Software Foundation; either
8 # version 2.1 of the License, or (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 # Lesser General Public License for more details.
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with this program; if not, write to the
17 # Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
18 # Boston, MA 02110-1301, USA.
20 """ Class representing tests and test managers. """
39 from . import reporters
40 from . import loggable
41 from .loggable import Loggable
44 from lxml import etree as ET
46 import xml.etree.cElementTree as ET
48 from .utils import mkdir, Result, Colors, printc, DEFAULT_TIMEOUT, GST_SECOND, \
49 Protocols, look_for_file_in_source_dir, get_data_file, BackTraceGenerator, \
# Multiplier applied to the test timeouts when the test is run under gdb or
# valgrind (both use the same factor).
VALGRIND_TIMEOUT_FACTOR = 20
GDB_TIMEOUT_FACTOR = VALGRIND_TIMEOUT_FACTOR
# Global timeout multiplier, read from the TIMEOUT_FACTOR environment
# variable (defaults to 1).
TIMEOUT_FACTOR = float(os.environ.get("TIMEOUT_FACTOR", 1))
# The exit code valgrind is asked to use when it detects errors
VALGRIND_ERROR_CODE = 20

VALIDATE_OVERRIDE_EXTENSION = ".override"
# Return codes treated as a crash of the tested application: the negated
# number of every listed signal available on this platform, plus 139
# (presumably the shell-style 128 + SIGSEGV exit status -- TODO confirm).
COREDUMP_SIGNALS = [
    -getattr(signal, signame)
    for signame in (
        'SIGQUIT', 'SIGILL', 'SIGABRT', 'SIGFPE', 'SIGSEGV', 'SIGBUS',
        'SIGSYS', 'SIGTRAP', 'SIGXCPU', 'SIGXFSZ', 'SIGIOT')
    if hasattr(signal, signame)
]
COREDUMP_SIGNALS.append(139)
67 """ A class representing a particular test. """
69 def __init__(self, application_name, classname, options,
70 reporter, duration=0, timeout=DEFAULT_TIMEOUT,
71 hard_timeout=None, extra_env_variables=None,
72 expected_failures=None, is_parallel=True):
74 @timeout: The timeout during which the value return by get_current_value
75 keeps being exactly equal
76 @hard_timeout: Max time the test can take in absolute
78 Loggable.__init__(self)
79 self.timeout = timeout * TIMEOUT_FACTOR * options.timeout_factor
81 self.hard_timeout = hard_timeout * TIMEOUT_FACTOR
82 self.hard_timeout *= options.timeout_factor
84 self.hard_timeout = hard_timeout
85 self.classname = classname
86 self.options = options
87 self.application = application_name
89 self.server_command = None
90 self.reporter = reporter
95 self.duration = duration
96 self.stack_trace = None
97 if expected_failures is None:
98 self.expected_failures = []
99 elif not isinstance(expected_failures, list):
100 self.expected_failures = [expected_failures]
102 self.expected_failures = expected_failures
104 extra_env_variables = extra_env_variables or {}
105 self.extra_env_variables = extra_env_variables
106 self.optional = False
107 self.is_parallel = is_parallel
108 self.generator = None
113 self.kill_subprocess()
116 self.time_taken = 0.0
117 self._starting_time = None
118 self.result = Result.NOT_RUN
121 self.extra_logfiles = []
122 self.__env_variable = []
123 self.kill_subprocess()
126 string = self.classname
127 if self.result != Result.NOT_RUN:
128 string += ": " + self.result
129 if self.result in [Result.FAILED, Result.TIMEOUT]:
130 string += " '%s'\n" \
131 " You can reproduce with: %s\n" \
132 % (self.message, self.get_command_repr())
134 string += self.get_logfile_repr()
138 def add_env_variable(self, variable, value=None):
140 Only usefull so that the gst-validate-launcher can print the exact
141 right command line to reproduce the tests
144 value = os.environ.get(variable, None)
149 self.__env_variable.append(variable)
152 def _env_variable(self):
154 for var in set(self.__env_variable):
157 value = self.proc_env.get(var, None)
158 if value is not None:
159 res += "%s='%s'" % (var, value)
163 def open_logfile(self):
167 path = os.path.join(self.options.logsdir,
168 self.classname.replace(".", os.sep))
169 mkdir(os.path.dirname(path))
172 if self.options.redirect_logs == 'stdout':
173 self.out = sys.stdout
174 elif self.options.redirect_logs == 'stderr':
175 self.out = sys.stderr
177 self.out = open(path, 'w+')
179 def close_logfile(self):
180 if not self.options.redirect_logs:
185 def _get_file_content(self, file_name):
186 f = open(file_name, 'r+')
def get_log_content(self):
    """Return the content of this test's main log file (self.logfile)."""
    log_path = self.logfile
    return self._get_file_content(log_path)
195 def get_extra_log_content(self, extralog):
196 if extralog not in self.extra_logfiles:
199 return self._get_file_content(extralog)
201 def get_classname(self):
202 name = self.classname.split('.')[-1]
203 classname = self.classname.replace('.%s' % name, '')
208 return self.classname.split('.')[-1]
210 def add_arguments(self, *args):
def build_arguments(self):
    """
    Register the environment variables every test tracks through
    add_env_variable(): LD_PRELOAD and DISPLAY, in that order.
    """
    for variable in ("LD_PRELOAD", "DISPLAY"):
        self.add_env_variable(variable)
217 def add_stack_trace_to_logfile(self):
218 trace_gatherer = BackTraceGenerator.get_default()
219 stack_trace = trace_gatherer.get_trace(self)
224 info = "\n\n== Stack trace: == \n%s" % stack_trace
225 if self.options.redirect_logs:
227 elif self.options.xunit_file:
228 self.stack_trace = stack_trace
230 with open(self.logfile, 'a') as f:
233 def set_result(self, result, message="", error=""):
234 self.debug("Setting result: %s (message: %s, error: %s)" % (result,
237 if result is Result.TIMEOUT:
238 if self.options.debug is True:
240 printc("Timeout, you should process <ctrl>c to get into gdb",
242 # and wait here until gdb exits
243 self.process.communicate()
245 pname = self.command[0]
246 input("%sTimeout happened you can attach gdb doing: $gdb %s %d%s\n"
247 "Press enter to continue" % (Colors.FAIL, pname, self.process.pid,
250 self.add_stack_trace_to_logfile()
253 self.message = message
254 self.error_str = error
256 def check_results(self):
257 if self.result is Result.FAILED or self.result is Result.TIMEOUT:
260 self.debug("%s returncode: %s", self, self.process.returncode)
261 if self.process.returncode == 0:
262 self.set_result(Result.PASSED)
263 elif self.process.returncode in [-signal.SIGSEGV, -signal.SIGABRT, 139]:
264 self.add_stack_trace_to_logfile()
265 self.set_result(Result.FAILED,
266 "Application segfaulted, returne code: %d" % (
267 self.process.returncode))
268 elif self.process.returncode == VALGRIND_ERROR_CODE:
269 self.set_result(Result.FAILED, "Valgrind reported errors")
271 self.set_result(Result.FAILED,
272 "Application returned %d" % (self.process.returncode))
def get_current_value(self):
    """
    Hook letting subclasses implement a nicer timeout measurement method.

    Implementations should return some value which is compared with the
    previously returned one; the test times out when both stay equal
    during self.timeout.

    This base implementation returns Result.NOT_RUN.
    """
    default_value = Result.NOT_RUN
    return default_value
283 def process_update(self):
285 Returns True when process has finished running or has timed out.
288 if self.process is None:
289 # Process has not started running yet
293 if self.process.returncode is not None:
296 val = self.get_current_value()
298 self.debug("Got value: %s" % val)
299 if val is Result.NOT_RUN:
300 # The get_current_value logic is not implemented... dumb
302 if time.time() - self.last_change_ts > self.timeout:
303 self.set_result(Result.TIMEOUT,
304 "Application timed out: %s secs" %
309 elif val is Result.FAILED:
311 elif val is Result.KNOWN_ERROR:
314 self.log("New val %s" % val)
316 if val == self.last_val:
317 delta = time.time() - self.last_change_ts
318 self.debug("%s: Same value for %d/%d seconds" %
319 (self, delta, self.timeout))
320 if delta > self.timeout:
321 self.set_result(Result.TIMEOUT,
322 "Application timed out: %s secs" %
326 elif self.hard_timeout and time.time() - self.start_ts > self.hard_timeout:
328 Result.TIMEOUT, "Hard timeout reached: %d secs" % self.hard_timeout)
331 self.last_change_ts = time.time()
def get_subproc_env(self):
    """Return a fresh dict copy of the current process environment."""
    environment = dict(os.environ)
    return environment
def kill_subprocess(self):
    """
    Hand the test's process over to utils.kill_subprocess(), passing
    DEFAULT_TIMEOUT as its third argument.
    """
    running_process = self.process
    utils.kill_subprocess(self, running_process, DEFAULT_TIMEOUT)
342 def thread_wrapper(self):
343 self.process = subprocess.Popen(self.command,
348 if self.result is not Result.TIMEOUT:
351 def get_valgrind_suppression_file(self, subdir, name):
352 p = get_data_file(subdir, name)
356 self.error("Could not find any %s file" % name)
def get_valgrind_suppressions(self):
    """
    Return the list of valgrind suppression files to use: a single entry,
    the result of looking up 'gstvalidate.supp' in the 'data' subdir.
    """
    suppressions = []
    suppressions.append(
        self.get_valgrind_suppression_file('data', 'gstvalidate.supp'))
    return suppressions
def use_gdb(self, command):
    """
    Return @command wrapped so that it gets run by gdb.

    gdb is told to run the program, print a backtrace and quit. Both the
    timeout and (when set) the hard timeout of the test are multiplied by
    GDB_TIMEOUT_FACTOR.
    """
    if self.hard_timeout is not None:
        self.hard_timeout *= GDB_TIMEOUT_FACTOR
    self.timeout *= GDB_TIMEOUT_FACTOR

    gdb_prefix = ["gdb", "-ex", "run", "-ex", "backtrace", "-ex", "quit", "--args"]
    return gdb_prefix + command
367 def use_valgrind(self, command, subenv):
368 vglogsfile = self.logfile + '.valgrind'
369 self.extra_logfiles.append(vglogsfile)
373 for o, v in [('trace-children', 'yes'),
374 ('tool', 'memcheck'),
375 ('leak-check', 'full'),
376 ('leak-resolution', 'high'),
377 # TODO: errors-for-leak-kinds should be set to all instead of definite
378 # and all false positives should be added to suppression files.
379 ('errors-for-leak-kinds', 'definite'),
380 ('num-callers', '20'),
381 ('error-exitcode', str(VALGRIND_ERROR_CODE)),
382 ('gen-suppressions', 'all')]:
383 vg_args.append("--%s=%s" % (o, v))
385 if not self.options.redirect_logs:
386 vglogsfile = self.logfile + '.valgrind'
387 self.extra_logfiles.append(vglogsfile)
388 vg_args.append("--%s=%s" % ('log-file', vglogsfile))
390 for supp in self.get_valgrind_suppressions():
391 vg_args.append("--suppressions=%s" % supp)
393 command = ["valgrind"] + vg_args + command
395 # Tune GLib's memory allocator to be more valgrind friendly
396 subenv['G_DEBUG'] = 'gc-friendly'
397 subenv['G_SLICE'] = 'always-malloc'
399 if self.hard_timeout is not None:
400 self.hard_timeout *= VALGRIND_TIMEOUT_FACTOR
401 self.timeout *= VALGRIND_TIMEOUT_FACTOR
403 # Enable 'valgrind.config'
404 vg_config = get_data_file('data', 'valgrind.config')
406 if self.proc_env.get('GST_VALIDATE_CONFIG'):
407 subenv['GST_VALIDATE_CONFIG'] = '%s%s%s' % (self.proc_env['GST_VALIDATE_CONFIG'], os.pathsep, vg_config)
409 subenv['GST_VALIDATE_CONFIG'] = vg_config
411 if subenv == self.proc_env:
412 self.add_env_variable('G_DEBUG', 'gc-friendly')
413 self.add_env_variable('G_SLICE', 'always-malloc')
414 self.add_env_variable('GST_VALIDATE_CONFIG', self.proc_env['GST_VALIDATE_CONFIG'])
418 def launch_server(self):
421 def get_logfile_repr(self):
423 logfiles = self.extra_logfiles.copy()
425 if not self.options.redirect_logs:
426 logfiles.insert(0, self.logfile)
429 message += "\n - %s" % log
def get_command_repr(self):
    """
    Return a single-quoted, printable version of the command line:
    the tracked environment variables followed by the command, prefixed
    with the server command and ' & ' when a server command is set.
    """
    env_prefix = self._env_variable
    command_line = ' '.join(self.command)
    message = "%s %s" % (env_prefix, command_line)

    if not self.server_command:
        return "'%s'" % message

    with_server = "%s & %s" % (self.server_command, message)
    return "'%s'" % with_server
440 def test_start(self, queue):
443 self.server_command = self.launch_server()
445 self.command = [self.application]
446 self._starting_time = time.time()
447 self.build_arguments()
448 self.proc_env = self.get_subproc_env()
450 for var, value in list(self.extra_env_variables.items()):
451 value = self.proc_env.get(var, '') + os.pathsep + value
452 self.proc_env[var] = value.strip(os.pathsep)
453 self.add_env_variable(var, self.proc_env[var])
456 self.command = self.use_gdb(self.command)
457 if self.options.valgrind:
458 self.command = self.use_valgrind(self.command, self.proc_env)
460 message = "Launching: %s%s\n" \
461 " Command: %s\n" % (Colors.ENDC, self.classname,
462 self.get_command_repr())
464 if not self.options.redirect_logs:
465 message += self.get_logfile_repr()
467 self.out.write("=================\n"
470 "=================\n\n"
471 % (self.classname, ' '.join(self.command)))
474 printc(message, Colors.OKBLUE)
476 self.thread = threading.Thread(target=self.thread_wrapper)
480 self.last_change_ts = time.time()
481 self.start_ts = time.time()
483 def _dump_log_file(self, logfile):
484 message = "Dumping contents of %s\n" % logfile
485 printc(message, Colors.FAIL)
487 with open(logfile, 'r') as fin:
def _dump_log_files(self):
    """
    Print a failure banner, then dump the main log file followed by every
    extra log file of the test.
    """
    printc("Dumping log files on failure\n", Colors.FAIL)

    dump = self._dump_log_file
    dump(self.logfile)
    for extra_logfile in self.extra_logfiles:
        dump(extra_logfile)
497 self.kill_subprocess()
499 self.time_taken = time.time() - self._starting_time
501 message = "%s: %s%s\n" % (self.classname, self.result,
502 " (" + self.message + ")" if self.message else "")
503 if not self.options.redirect_logs:
504 message += self.get_logfile_repr()
506 printc(message, color=utils.get_color_for_result(self.result))
510 if self.options.dump_on_failure:
511 if self.result is not Result.PASSED:
512 self._dump_log_files()
517 class GstValidateListener(socketserver.BaseRequestHandler):
519 """Implements BaseRequestHandler handle method"""
521 raw_len = self.request.recv(4)
524 msglen = struct.unpack('>I', raw_len)[0]
525 msg = self.request.recv(msglen).decode()
529 obj = json.loads(msg)
530 test = getattr(self.server, "test")
532 obj_type = obj.get("type", '')
533 if obj_type == 'position':
534 test.set_position(obj['position'], obj['duration'],
536 elif obj_type == 'buffering':
537 test.set_position(obj['position'], 100)
538 elif obj_type == 'action':
539 test.add_action_execution(obj)
540 # Make sure that action is taken into account when checking if process
543 elif obj_type == 'action-done':
544 # Make sure that action end is taken into account when checking if process
547 test.actions_infos[-1]['execution-duration'] = obj['execution-duration']
548 elif obj_type == 'report':
552 class GstValidateTest(Test):
554 """ A class representing a particular test. """
# NOTE: raw strings are used so that regex escapes such as \d and \. are not
# interpreted as (invalid) Python string escapes, which newer Python
# versions warn about. The compiled patterns are unchanged.

# Matches a line holding a 'position' and a 'duration', each followed by a
# timestamp made of four colon/character separated numeric fields.
findpos_regex = re.compile(
    r'.*position.*(\d+):(\d+):(\d+).(\d+).*duration.*(\d+):(\d+):(\d+).(\d+)')
# Matches a 'seeking to ... stop ... rate ...' line; the rate is captured as
# two numeric groups separated by a literal dot.
findlastseek_regex = re.compile(
    r'seeking to.*(\d+):(\d+):(\d+).(\d+).*stop.*(\d+):(\d+):(\d+).(\d+).*rate.*(\d+)\.(\d+)')

# Multiplier used by __init__ to derive a hard timeout (from the timeout or
# the duration) when the caller did not provide one.
HARD_TIMEOUT_FACTOR = 5
562 def __init__(self, application_name, classname,
563 options, reporter, duration=0,
564 timeout=DEFAULT_TIMEOUT, scenario=None, hard_timeout=None,
565 media_descriptor=None, extra_env_variables=None,
566 expected_failures=None):
568 extra_env_variables = extra_env_variables or {}
570 if not hard_timeout and self.HARD_TIMEOUT_FACTOR:
572 hard_timeout = timeout * self.HARD_TIMEOUT_FACTOR
574 hard_timeout = duration * self.HARD_TIMEOUT_FACTOR
578 # If we are running from source, use the -debug version of the
579 # application which is using rpath instead of libtool's wrappers. It's
580 # slightly faster to start and will not confuse valgrind.
581 debug = '%s-debug' % application_name
582 p = look_for_file_in_source_dir('tools', debug)
588 self.media_duration = -1
590 self.actions_infos = []
591 self.media_descriptor = media_descriptor
594 override_path = self.get_override_file(media_descriptor)
596 if extra_env_variables:
597 if extra_env_variables.get("GST_VALIDATE_OVERRIDE", ""):
598 extra_env_variables["GST_VALIDATE_OVERRIDE"] += os.path.pathsep
600 extra_env_variables["GST_VALIDATE_OVERRIDE"] = override_path
602 super(GstValidateTest, self).__init__(application_name, classname,
606 hard_timeout=hard_timeout,
607 extra_env_variables=extra_env_variables,
608 expected_failures=expected_failures)
610 # defines how much the process can be outside of the configured
612 self._sent_eos_time = None
614 if scenario is None or scenario.name.lower() == "none":
617 self.scenario = scenario
619 def stop_server(self):
621 self.server.shutdown()
622 self.server_thread.join()
623 self.server.server_close()
626 def kill_subprocess(self):
627 Test.kill_subprocess(self)
def add_report(self, report):
    """Append @report to the list of reports collected for this test."""
    collected = self.reports
    collected.append(report)
633 def set_position(self, position, duration, speed=None):
634 self.position = position
635 self.media_duration = duration
def add_action_execution(self, action_infos):
    """
    Record an executed action.

    When the action type is 'eos', the current time is remembered in
    self._sent_eos_time. The action is then appended to
    self.actions_infos.
    """
    is_eos = action_infos['action-type'] == 'eos'
    if is_eos:
        self._sent_eos_time = time.time()

    self.actions_infos.append(action_infos)
644 def server_wrapper(self, ready):
645 self.server = socketserver.TCPServer(('localhost', 0), GstValidateListener)
646 self.server.socket.settimeout(None)
647 self.server.test = self
648 self.serverport = self.server.socket.getsockname()[1]
649 self.info("%s server port: %s" % (self, self.serverport))
652 self.server.serve_forever()
654 def test_start(self, queue):
655 ready = threading.Event()
656 self.server_thread = threading.Thread(target=self.server_wrapper,
657 kwargs={'ready': ready})
658 self.server_thread.start()
661 Test.test_start(self, queue)
664 res = Test.test_end(self)
669 def get_override_file(self, media_descriptor):
671 if media_descriptor.get_path():
672 override_path = os.path.splitext(media_descriptor.get_path())[0] + VALIDATE_OVERRIDE_EXTENSION
673 if os.path.exists(override_path):
678 def get_current_position(self):
681 def get_current_value(self):
683 if self._sent_eos_time is not None:
685 if ((t - self._sent_eos_time)) > 30:
686 if self.media_descriptor.get_protocol() == Protocols.HLS:
687 self.set_result(Result.PASSED,
688 """Got no EOS 30 seconds after sending EOS,
689 in HLS known and tolerated issue:
690 https://bugzilla.gnome.org/show_bug.cgi?id=723868""")
691 return Result.KNOWN_ERROR
694 Result.FAILED, "Pipeline did not stop 30 Seconds after sending EOS")
700 def get_subproc_env(self):
701 subproc_env = os.environ.copy()
703 subproc_env["GST_VALIDATE_SERVER"] = "tcp://localhost:%s" % self.serverport
705 if 'GST_DEBUG' in os.environ and not self.options.redirect_logs:
706 gstlogsfile = self.logfile + '.gstdebug'
707 self.extra_logfiles.append(gstlogsfile)
708 subproc_env["GST_DEBUG_FILE"] = gstlogsfile
710 if self.options.no_color:
711 subproc_env["GST_DEBUG_NO_COLOR"] = '1'
713 # Ensure XInitThreads is called, see bgo#731525
714 subproc_env['GST_GL_XINITTHREADS'] = '1'
715 self.add_env_variable('GST_GL_XINITTHREADS', '1')
717 if self.scenario is not None:
718 scenario = self.scenario.get_execution_name()
719 subproc_env["GST_VALIDATE_SCENARIO"] = scenario
720 self.add_env_variable("GST_VALIDATE_SCENARIO",
721 subproc_env["GST_VALIDATE_SCENARIO"])
724 del subproc_env["GST_VALIDATE_SCENARIO"]
732 self._sent_eos_time = None
735 self.media_duration = -1
737 self.actions_infos = []
def build_arguments(self):
    """
    Extend the parent build_arguments() with the GstValidate specific
    environment variables.

    GST_VALIDATE and GST_VALIDATE_SCENARIOS_PATH are registered, with
    their current value, only when present in os.environ;
    GST_VALIDATE_CONFIG and GST_VALIDATE_OVERRIDE are always registered
    (without an explicit value).
    """
    super(GstValidateTest, self).build_arguments()

    for variable in ("GST_VALIDATE", "GST_VALIDATE_SCENARIOS_PATH"):
        if variable in os.environ:
            self.add_env_variable(variable, os.environ[variable])

    for variable in ("GST_VALIDATE_CONFIG", "GST_VALIDATE_OVERRIDE"):
        self.add_env_variable(variable)
751 def get_extra_log_content(self, extralog):
752 value = Test.get_extra_log_content(self, extralog)
756 def report_matches_expected_failure(self, report, expected_failure):
757 for key in ['bug', 'bugs', 'sometimes']:
758 if key in expected_failure:
759 del expected_failure[key]
760 for key, value in list(report.items()):
761 if key in expected_failure:
762 if not re.findall(expected_failure[key], str(value)):
764 expected_failure.pop(key)
766 return not bool(expected_failure)
768 def check_reported_issues(self):
770 expected_failures = copy.deepcopy(self.expected_failures)
771 expected_retcode = [0]
772 for report in self.reports:
774 for expected_failure in expected_failures:
775 if self.report_matches_expected_failure(report,
776 expected_failure.copy()):
777 found = expected_failure
780 if found is not None:
781 expected_failures.remove(found)
782 if report['level'] == 'critical':
783 if found.get('sometimes') and isinstance(expected_retcode, list):
784 expected_retcode.append(18)
786 expected_retcode = [18]
787 elif report['level'] == 'critical':
788 ret.append(report['summary'])
791 return None, expected_failures, expected_retcode
793 return ret, expected_failures, expected_retcode
795 def check_expected_timeout(self, expected_timeout):
796 msg = "Expected timeout happened. "
797 result = Result.PASSED
798 message = expected_timeout.get('message')
800 if not re.findall(message, self.message):
801 result = Result.FAILED
802 msg = "Expected timeout message: %s got %s " % (
803 message, self.message)
805 expected_symbols = expected_timeout.get('stacktrace_symbols')
807 trace_gatherer = BackTraceGenerator.get_default()
808 stack_trace = trace_gatherer.get_trace(self)
811 if not isinstance(expected_symbols, list):
812 expected_symbols = [expected_symbols]
814 not_found_symbols = [s for s in expected_symbols
815 if s not in stack_trace]
816 if not_found_symbols:
817 result = Result.TIMEOUT
818 msg = "Expected symbols '%s' not found in stack trace " % (
821 msg += "No stack trace available, could not verify symbols "
825 def check_results(self):
826 if self.result in [Result.FAILED, self.result is Result.PASSED]:
829 for report in self.reports:
830 if report.get('issue-id') == 'runtime::missing-plugin':
831 self.set_result(Result.SKIPPED, "%s\n%s" % (report['summary'],
835 self.debug("%s returncode: %s", self, self.process.returncode)
837 criticals, not_found_expected_failures, expected_returncode = self.check_reported_issues()
839 expected_timeout = None
840 for i, f in enumerate(not_found_expected_failures):
841 if len(f) == 1 and f.get("returncode"):
842 returncode = f['returncode']
843 if not isinstance(expected_returncode, list):
844 returncode = [expected_returncode]
847 elif f.get("timeout"):
850 not_found_expected_failures = [f for f in not_found_expected_failures
851 if not f.get('returncode')]
854 result = Result.PASSED
855 if self.result == Result.TIMEOUT:
857 not_found_expected_failures.remove(expected_timeout)
858 result, msg = self.check_expected_timeout(expected_timeout)
861 elif self.process.returncode in COREDUMP_SIGNALS:
862 result = Result.FAILED
863 msg = "Application segfaulted "
864 self.add_stack_trace_to_logfile()
865 elif self.process.returncode == VALGRIND_ERROR_CODE:
866 msg = "Valgrind reported errors "
867 result = Result.FAILED
868 elif self.process.returncode not in expected_returncode:
869 msg = "Application returned %s " % self.process.returncode
870 if expected_returncode != [0]:
871 msg += "(expected %s) " % expected_returncode
872 result = Result.FAILED
875 msg += "(critical errors: [%s]) " % ', '.join(criticals)
876 result = Result.FAILED
878 if not_found_expected_failures:
879 mandatory_failures = [f for f in not_found_expected_failures
880 if not f.get('sometimes')]
882 if mandatory_failures:
883 msg += "(Expected errors not found: %s) " % mandatory_failures
884 result = Result.FAILED
885 elif self.expected_failures:
886 msg += '%s(Expected errors occured: %s)%s' % (Colors.OKBLUE,
887 self.expected_failures,
890 self.set_result(result, msg.strip())
892 def get_valgrind_suppressions(self):
893 result = super(GstValidateTest, self).get_valgrind_suppressions()
894 gst_sup = self.get_valgrind_suppression_file('common', 'gst.supp')
896 result.append(gst_sup)
900 class GstValidateEncodingTestInterface(object):
901 DURATION_TOLERANCE = GST_SECOND / 4
903 def __init__(self, combination, media_descriptor, duration_tolerance=None):
904 super(GstValidateEncodingTestInterface, self).__init__()
906 self.media_descriptor = media_descriptor
907 self.combination = combination
910 self._duration_tolerance = duration_tolerance
911 if duration_tolerance is None:
912 self._duration_tolerance = self.DURATION_TOLERANCE
914 def get_current_size(self):
916 size = os.stat(urllib.parse.urlparse(self.dest_file).path).st_size
920 self.debug("Size: %s" % size)
923 def _get_profile_full(self, muxer, venc, aenc, video_restriction=None,
924 audio_restriction=None, audio_presence=0,
931 if video_restriction is not None:
932 ret = ret + video_restriction + '->'
935 ret = ret + '|' + str(video_presence)
938 if audio_restriction is not None:
939 ret = ret + audio_restriction + '->'
942 ret = ret + '|' + str(audio_presence)
944 return ret.replace("::", ":")
946 def get_profile(self, video_restriction=None, audio_restriction=None):
947 vcaps = self.combination.get_video_caps()
948 acaps = self.combination.get_audio_caps()
949 if self.media_descriptor is not None:
950 if self.media_descriptor.get_num_tracks("video") == 0:
953 if self.media_descriptor.get_num_tracks("audio") == 0:
956 return self._get_profile_full(self.combination.get_muxer_caps(),
958 video_restriction=video_restriction,
959 audio_restriction=audio_restriction)
961 def _clean_caps(self, caps):
963 Returns a list of key=value or structure name, without "(types)" or ";" or ","
965 return re.sub(r"\(.+?\)\s*| |;", '', caps).split(',')
967 def _has_caps_type_variant(self, c, ccaps):
969 Handle situations where we can have application/ogg or video/ogg or
973 media_type = re.findall("application/|video/|audio/", c)
975 media_type = media_type[0].replace('/', '')
976 possible_mtypes = ["application", "video", "audio"]
977 possible_mtypes.remove(media_type)
978 for tmptype in possible_mtypes:
979 possible_c_variant = c.replace(media_type, tmptype)
980 if possible_c_variant in ccaps:
982 "Found %s in %s, good enough!", possible_c_variant, ccaps)
987 def check_encoded_file(self):
988 result_descriptor = GstValidateMediaDescriptor.new_from_uri(
990 if result_descriptor is None:
991 return (Result.FAILED, "Could not discover encoded file %s"
994 duration = result_descriptor.get_duration()
995 orig_duration = self.media_descriptor.get_duration()
996 tolerance = self._duration_tolerance
998 if orig_duration - tolerance >= duration <= orig_duration + tolerance:
999 os.remove(result_descriptor.get_path())
1000 return (Result.FAILED, "Duration of encoded file is "
1001 " wrong (%s instead of %s)" %
1002 (utils.TIME_ARGS(duration),
1003 utils.TIME_ARGS(orig_duration)))
1005 all_tracks_caps = result_descriptor.get_tracks_caps()
1006 container_caps = result_descriptor.get_caps()
1008 all_tracks_caps.insert(0, ("container", container_caps))
1010 for track_type, caps in all_tracks_caps:
1011 ccaps = self._clean_caps(caps)
1012 wanted_caps = self.combination.get_caps(track_type)
1013 cwanted_caps = self._clean_caps(wanted_caps)
1015 if wanted_caps is None:
1016 os.remove(result_descriptor.get_path())
1017 return (Result.FAILED,
1018 "Found a track of type %s in the encoded files"
1019 " but none where wanted in the encoded profile: %s"
1020 % (track_type, self.combination))
1022 for c in cwanted_caps:
1024 if not self._has_caps_type_variant(c, ccaps):
1025 os.remove(result_descriptor.get_path())
1026 return (Result.FAILED,
1027 "Field: %s (from %s) not in caps of the outputed file %s"
1028 % (wanted_caps, c, ccaps))
1030 os.remove(result_descriptor.get_path())
1031 return (Result.PASSED, "")
1034 class TestsManager(Loggable):
1036 """ A class responsible for managing tests. """
1039 loading_testsuite = None
1043 Loggable.__init__(self)
1046 self.unwanted_tests = []
1049 self.reporter = None
1050 self.wanted_tests_patterns = []
1051 self.blacklisted_tests_patterns = []
1052 self._generators = []
1053 self.check_testslist = True
1054 self.all_tests = None
1055 self.expected_failures = {}
1056 self.blacklisted_tests = []
def list_tests(self):
    """Return a new list holding self.tests ordered by classname."""
    ordered = list(self.tests)
    ordered.sort(key=lambda test: test.classname)
    return ordered
def add_expected_issues(self, expected_failures):
    """
    Register expected failures.

    @expected_failures: A dict mapping a test name regex to the list of
    failures expected for the tests whose classname matches it.

    The failures are added to every already known matching test, and the
    compiled regexes are stored in self.expected_failures.
    """
    compiled_failures = {}
    for name_pattern, failures in expected_failures.items():
        matcher = re.compile(name_pattern)
        compiled_failures[matcher] = failures

        matching_tests = [candidate for candidate in self.tests
                          if matcher.findall(candidate.classname)]
        for matched in matching_tests:
            matched.expected_failures.extend(failures)

    self.expected_failures.update(compiled_failures)
1075 def add_test(self, test):
1076 if test.generator is None:
1077 test.classname = self.loading_testsuite + '.' + test.classname
1078 for regex, failures in list(self.expected_failures.items()):
1079 if regex.findall(test.classname):
1080 test.expected_failures.extend(failures)
1082 if self._is_test_wanted(test):
1083 if test not in self.tests:
1084 self.tests.append(test)
1085 self.tests.sort(key=lambda test: test.classname)
1087 if test not in self.tests:
1088 self.unwanted_tests.append(test)
1089 self.unwanted_tests.sort(key=lambda test: test.classname)
1091 def get_tests(self):
1094 def populate_testsuite(self):
def add_generators(self, generators):
    """
    Register tests generators on this manager.

    @generators: A list of, or one single #TestsGenerator to be used to generate tests

    The `testsuite` of each given generator is set to the testsuite
    currently being loaded. Generators registered several times are kept
    only once, at the position of their first registration, so that the
    order of self._generators is deterministic.
    """
    if not isinstance(generators, list):
        generators = [generators]

    self._generators.extend(generators)
    for generator in generators:
        generator.testsuite = self.loading_testsuite

    # De-duplicate while preserving registration order: list(set(...))
    # would drop duplicates too but leave the generators in arbitrary order.
    seen = set()
    unique_generators = []
    for generator in self._generators:
        if generator not in seen:
            seen.add(generator)
            unique_generators.append(generator)
    self._generators = unique_generators
def get_generators(self):
    """Return the list of generators registered on this manager."""
    registered = self._generators
    return registered
1112 def _add_blacklist(self, blacklisted_tests):
1113 if not isinstance(blacklisted_tests, list):
1114 blacklisted_tests = [blacklisted_tests]
1116 for patterns in blacklisted_tests:
1117 for pattern in patterns.split(","):
1118 self.blacklisted_tests_patterns.append(re.compile(pattern))
def set_default_blacklist(self, default_blacklist):
    """
    Register the default blacklist of the testsuite being loaded.

    @default_blacklist: An iterable of (test_regex, reason) pairs.

    Each regex not already starting with the loading testsuite name
    followed by a dot gets prefixed with it before being stored, together
    with its reason, in self.blacklisted_tests.
    """
    for test_regex, reason in default_blacklist:
        prefix = self.loading_testsuite + '.'
        if test_regex.startswith(prefix):
            qualified_regex = test_regex
        else:
            qualified_regex = self.loading_testsuite + '.' + test_regex
        self.blacklisted_tests.append((qualified_regex, reason))
1126 def add_options(self, parser):
1127 """ Add more arguments. """
1130 def set_settings(self, options, args, reporter):
1131 """ Set properties after options parsing. """
1132 self.options = options
1134 self.reporter = reporter
1136 self.populate_testsuite()
1138 if self.options.valgrind:
1139 self.print_valgrind_bugs()
1141 if options.wanted_tests:
1142 for patterns in options.wanted_tests:
1143 for pattern in patterns.split(","):
1144 self.wanted_tests_patterns.append(re.compile(pattern))
1146 if options.blacklisted_tests:
1147 for patterns in options.blacklisted_tests:
1148 self._add_blacklist(patterns)
1150 def set_blacklists(self):
1151 if self.blacklisted_tests:
1152 printc("\nCurrently 'hardcoded' %s blacklisted tests:" %
1153 self.name, Colors.WARNING, title_char='-')
1155 if self.options.check_bugs_status:
1156 if not check_bugs_resolution(self.blacklisted_tests):
1159 for name, bug in self.blacklisted_tests:
1160 self._add_blacklist(name)
1161 if not self.options.check_bugs_status:
1162 print(" + %s \n --> bug: %s\n" % (name, bug))
1166 def check_expected_failures(self):
1167 if not self.expected_failures or not self.options.check_bugs_status:
1170 if self.expected_failures:
1171 printc("\nCurrently known failures in the %s testsuite:"
1172 % self.name, Colors.WARNING, title_char='-')
1174 bugs_definitions = {}
1175 for regex, failures in list(self.expected_failures.items()):
1176 for failure in failures:
1177 bugs = failure.get('bug')
1179 bugs = failure.get('bugs')
1181 printc('+ %s:\n --> no bug reported associated with %s\n' % (
1182 regex.pattern, failure), Colors.WARNING)
1185 if not isinstance(bugs, list):
1187 cbugs = bugs_definitions.get(regex.pattern, [])
1188 bugs.extend([b for b in bugs if b not in cbugs])
1189 bugs_definitions[regex.pattern] = bugs
1191 return check_bugs_resolution(bugs_definitions.items())
1193 def _check_blacklisted(self, test):
1194 for pattern in self.blacklisted_tests_patterns:
1195 if pattern.findall(test.classname):
1196 self.info("%s is blacklisted by %s", test.classname, pattern)
1201 def _check_whitelisted(self, test):
1202 for pattern in self.wanted_tests_patterns:
1203 if pattern.findall(test.classname):
1204 if self._check_blacklisted(test):
1205 # If explicitly white listed that specific test
1206 # bypass the blacklisting
1207 if pattern.pattern != test.classname:
1212 def _check_duration(self, test):
1213 if test.duration > 0 and int(self.options.long_limit) < int(test.duration):
1214 self.info("Not activating %s as its duration (%d) is superior"
1215 " than the long limit (%d)" % (test, test.duration,
1216 int(self.options.long_limit)))
1221 def _is_test_wanted(self, test):
1222 if self._check_whitelisted(test):
1223 if not self._check_duration(test):
1227 if self._check_blacklisted(test):
1230 if not self._check_duration(test):
1233 if not self.wanted_tests_patterns:
1238 def needs_http_server(self):
1241 def print_valgrind_bugs(self):
class TestsGenerator(Loggable):
    """ Base class of the objects producing tests for a test manager. """

    def __init__(self, name, test_manager, tests=None):
        """
        @name: The name of the generator
        @test_manager: The test manager the generated tests belong to
        @tests: Optional iterable of already built tests, indexed by
                their classname
        """
        Loggable.__init__(self)
        self.name = name
        self.test_manager = test_manager
        self.testsuite = None
        # None instead of a shared mutable default list.
        self._tests = {test.classname: test for test in tests or ()}

    def generate_tests(self, *kwargs):
        """
        Method that generates tests
        """
        return list(self._tests.values())

    def add_test(self, test):
        """
        Register @test, prefixing its classname with the testsuite name.
        """
        test.generator = self
        test.classname = self.testsuite + '.' + test.classname
        self._tests[test.classname] = test
class GstValidateTestsGenerator(TestsGenerator):
    """ Tests generator fed with media descriptors and scenarios. """

    def populate_tests(self, uri_minfo_special_scenarios, scenarios):
        """ Hook for subclasses: register the tests with add_test(). """
        pass

    def generate_tests(self, uri_minfo_special_scenarios, scenarios):
        """ Populate the generator, then return the list of its tests. """
        self.populate_tests(uri_minfo_special_scenarios, scenarios)
        generated = super(GstValidateTestsGenerator, self).generate_tests()
        return generated
1278 class _TestsLauncher(Loggable):
def __init__(self, libsdir):
    """
    @libsdir: Directory in which the "apps" directory defining the test
              managers is looked up
    """
    Loggable.__init__(self)

    # Configuration, filled in by set_settings()
    self.libsdir = libsdir
    self.options = None
    self.reporter = None

    # Test managers and tests; self.testers must exist before listing
    self.testers = []
    self.tests = []
    self._list_testers()
    self.all_tests = None
    self.wanted_tests_patterns = []

    # Running jobs bookkeeping
    self.queue = queue.Queue()
    self.jobs = []
    self.total_num_tests = 0
1297 def _list_app_dirs(self):
1299 app_dirs.append(os.path.join(self.libsdir, "apps"))
1300 env_dirs = os.environ.get("GST_VALIDATE_APPS_DIR")
1301 if env_dirs is not None:
1302 for dir_ in env_dirs.split(":"):
1303 app_dirs.append(dir_)
1304 sys.path.append(dir_)
1308 def _exec_app(self, app_dir, env):
1310 files = os.listdir(app_dir)
1311 except OSError as e:
1312 self.debug("Could not list %s: %s" % (app_dir, e))
1315 if f.endswith(".py"):
1316 exec(compile(open(os.path.join(app_dir, f)).read(), os.path.join(app_dir, f), 'exec'), env)
1318 def _exec_apps(self, env):
1319 app_dirs = self._list_app_dirs()
1320 for app_dir in app_dirs:
1321 self._exec_app(app_dir, env)
def _list_testers(self):
    """
    Instantiate every TestsManager subclass found once the apps have been
    executed, and keep the ones whose init() returns True.
    """
    namespace = globals().copy()
    self._exec_apps(namespace)

    candidates = [cls() for cls in utils.get_subclasses(TestsManager, namespace)]
    for tester in candidates:
        if tester.init() is not True:
            self.warning("Can not init tester: %s -- PATH is %s"
                         % (tester.name, os.environ["PATH"]))
            continue

        self.testers.append(tester)
def add_options(self, parser):
    """ Let every test manager register its options on @parser. """
    for manager in self.testers:
        manager.add_options(parser)
1339 def _load_testsuite(self, testsuites):
1341 for testsuite in testsuites:
1343 sys.path.insert(0, os.path.dirname(testsuite))
1344 return (__import__(os.path.basename(testsuite).replace(".py", "")), None)
1345 except Exception as e:
1346 exceptions.append("Could not load %s: %s" % (testsuite, e))
1349 sys.path.remove(os.path.dirname(testsuite))
1351 return (None, exceptions)
def _load_testsuites(self):
    """
    Replace the testsuite names/paths of the options by the corresponding
    loaded modules, normalizing their TEST_MANAGER attribute to a list.

    A name that can not be loaded and contains a '.' is considered to be a
    test name: its first component is queued as a testsuite and the full
    name is added to the wanted tests.
    """
    modules = []
    # The list can grow while iterating, see the test name case below.
    for testsuite in self.options.testsuites:
        if os.path.exists(testsuite):
            testsuite = os.path.abspath(os.path.expanduser(testsuite))
            candidates = [testsuite]
        else:
            candidates = [os.path.join(d, testsuite + ".py")
                          for d in self.options.testsuites_dirs]

        module, reasons = self._load_testsuite(candidates)
        if not module:
            if "." in testsuite:
                self.options.testsuites.append(testsuite.split('.')[0])
                self.info("%s looks like a test name, trying that" % testsuite)
                self.options.wanted_tests.append(testsuite)
            else:
                printc("Could not load testsuite: %s, reasons: %s" % (
                    testsuite, reasons), Colors.FAIL)
            continue

        modules.append(module)
        if not hasattr(module, "TEST_MANAGER"):
            module.TEST_MANAGER = [tester.name for tester in self.testers]
        elif not isinstance(module.TEST_MANAGER, list):
            module.TEST_MANAGER = [module.TEST_MANAGER]

    self.options.testsuites = modules
def _setup_testsuites(self):
    """
    Set up the tests of every loaded testsuite on the test managers it
    targets.

    Returns False as soon as a testsuite could not be set up on any test
    manager, and None otherwise.
    """
    for testsuite in self.options.testsuites:
        managers = None
        if hasattr(testsuite, "TEST_MANAGER"):
            managers = testsuite.TEST_MANAGER
            if not isinstance(managers, list):
                managers = [managers]

        loaded = False
        for tester in self.testers:
            if managers is not None and tester.name not in managers:
                continue

            if self.options.user_paths:
                TestsManager.loading_testsuite = tester.name
                tester.register_defaults()
                loaded = True
                continue

            TestsManager.loading_testsuite = testsuite.__name__
            if testsuite.setup_tests(tester, self.options):
                loaded = True
            TestsManager.loading_testsuite = None

        if not loaded:
            printc("Could not load testsuite: %s"
                   " maybe because of missing TestManager"
                   % (testsuite), Colors.FAIL)
            return False
def _load_config(self, options):
    """
    Execute the (deprecated) config file pointed to by the options.

    The test managers (by name) and @options are exposed as module globals
    while the config runs, and __file__ temporarily points to the config
    file. NOTE: the config file is exec()'d, it must be trusted.
    """
    printc("Loading config files is DEPRECATED"
           " you should use the new testsuite format now",)

    for tester in self.testers:
        tester.options = options
        globals()[tester.name] = tester
    globals()["options"] = options

    config_path = self.options.config
    # Read and close the file before running its content.
    with open(config_path) as config_file:
        code = compile(config_file.read(), config_path, 'exec')

    c__file__ = __file__
    globals()["__file__"] = config_path
    try:
        exec(code, globals())
    finally:
        # Always restore __file__, even if the config file raises.
        globals()["__file__"] = c__file__
def set_settings(self, options, args):
    """
    Configure the launcher and its test managers from the parsed command
    line.

    @options: The parsed options
    @args: The remaining arguments; names of test managers found there
           restrict the managers in use and are removed from the list

    Returns False if no testsuite could be loaded or set up, or if a test
    manager rejects its blacklists or expected failures; True otherwise.
    """
    reporter_class = reporters.Reporter
    if options.xunit_file:
        reporter_class = reporters.XunitReporter
    self.reporter = reporter_class(options)

    self.options = options

    wanted_testers = None
    for tester in self.testers:
        if tester.name in args:
            wanted_testers = tester.name

    if wanted_testers:
        # Only keep the test managers explicitly named in the arguments
        all_testers, self.testers = self.testers, []
        for tester in all_testers:
            if tester.name in args:
                self.testers.append(tester)
                args.remove(tester.name)

    if options.config:
        self._load_config(options)

    self._load_testsuites()
    if not self.options.testsuites:
        printc("Not testsuite loaded!", Colors.FAIL)
        return False

    for tester in self.testers:
        tester.set_settings(options, args, self.reporter)

    if not options.config and options.testsuites:
        if self._setup_testsuites() is False:
            return False

    for tester in self.testers:
        if not tester.set_blacklists():
            return False

        if not tester.check_expected_failures():
            return False

    return True
1470 def _check_tester_has_other_testsuite(self, testsuite, tester):
1471 if tester.name != testsuite.TEST_MANAGER[0]:
1474 for t in self.options.testsuites:
1476 for other_testmanager in t.TEST_MANAGER:
1477 if other_testmanager == tester.name:
def _check_defined_tests(self, tester, tests):
    """
    Compare @tests with the ".testslist" file of the first testsuite that
    is checked for @tester, and rewrite that file with the current list.

    Tests prefixed with '~' in the file are optional. Returns None when
    the check is skipped because tests are explicitly blacklisted or
    wanted, otherwise True if tests appeared or disappeared, else False.
    """
    if self.options.blacklisted_tests or self.options.wanted_tests:
        return

    tests_names = [test.classname for test in tests]
    testlist_changed = False
    for testsuite in self.options.testsuites:
        if self._check_tester_has_other_testsuite(testsuite, tester) \
                or not tester.check_testslist:
            continue

        testlist_path = os.path.splitext(testsuite.__file__)[0] + ".testslist"
        try:
            # 'r+' makes sure the file exists and is writable before it
            # gets truncated below.
            with open(testlist_path, 'r+') as known_file:
                know_tests = known_file.read().split("\n")

            testlist_file = open(testlist_path, 'w')
        except IOError:
            continue

        # Make sure the file is closed whatever happens while writing.
        with testlist_file:
            optional_out = []
            for test in know_tests:
                if test and test.strip('~') not in tests_names:
                    if not test.startswith('~'):
                        testlist_changed = True
                        printc("Test %s Not in testsuite %s anymore"
                               % (test, testsuite.__file__), Colors.FAIL)
                    else:
                        # Optional tests stay listed even when missing.
                        optional_out.append((test, None))

            entries = sorted([(test.classname, test) for test in tests] + optional_out,
                             key=lambda x: x[0].strip('~'))

            for tname, test in entries:
                if test and test.optional:
                    tname = '~' + tname
                testlist_file.write("%s\n" % (tname))
                if tname and tname not in know_tests:
                    printc("Test %s is NEW in testsuite %s"
                           % (tname, testsuite.__file__), Colors.OKGREEN)
                    testlist_changed = True

        break

    return testlist_changed
def list_tests(self):
    """
    Collect the tests of every needed test manager into self.tests.

    Returns the tests sorted by classname, or -1 if the list of defined
    tests changed and the options ask to fail in that case.
    """
    for tester in self.testers:
        if self._tester_needed(tester):
            found = tester.list_tests()
            changed = self._check_defined_tests(tester, found)
            if changed and self.options.fail_on_testlist_change:
                return -1

            self.tests.extend(found)

    ordered = list(self.tests)
    ordered.sort(key=lambda t: t.classname)
    return ordered
1543 def _tester_needed(self, tester):
1544 for testsuite in self.options.testsuites:
1545 if tester.name in testsuite.TEST_MANAGER:
def print_test_num(self, test):
    """ Write the "[position / total] " prefix of @test on stdout. """
    position = self.tests.index(test) + 1
    prefix = "[%d / %d] " % (position, self.total_num_tests)
    sys.stdout.write(prefix)
def test_wait(self):
    """
    Block until one of the running jobs is done; that test is removed
    from self.jobs and returned.
    """
    while True:
        # Check process every second for timeout
        try:
            self.queue.get(timeout=1)
        except queue.Empty:
            pass

        finished = next(
            (job for job in self.jobs if job.process_update()), None)
        if finished is not None:
            self.jobs.remove(finished)
            return finished
def tests_wait(self):
    """
    Wait for a test to finish, check its results and return it.

    On KeyboardInterrupt the subprocess of every running job is killed
    before the exception is propagated.
    """
    try:
        finished = self.test_wait()
        finished.check_results()
    except KeyboardInterrupt:
        for job in self.jobs:
            job.kill_subprocess()
        raise

    return finished
def start_new_job(self, tests_left):
    """
    Start the first test of @tests_left (which is removed from the list)
    and track it as a running job.

    Returns False if there is no test left to start, True otherwise.
    """
    try:
        next_test = tests_left.pop(0)
    except IndexError:
        return False
    else:
        self.print_test_num(next_test)
        next_test.test_start(self.queue)
        self.jobs.append(next_test)
        return True
def _run_tests(self):
    """
    Run all the tests once: first the parallel ones, using up to
    options.num_jobs jobs, then the remaining ones one at a time.

    Returns False if the tests could not be listed, or as soon as a test
    does not pass while the 'forever' or 'fatal_error' option is set;
    True otherwise.
    """
    if not self.all_tests:
        listed = self.list_tests()
        if listed == -1:
            return False
        self.all_tests = listed
    self.total_num_tests = len(self.all_tests)

    self.reporter.init_timer()

    parallel_tests = [t for t in self.tests if t.is_parallel]
    alone_tests = [t for t in self.tests if not t.is_parallel]

    max_num_jobs = min(self.options.num_jobs, len(parallel_tests))
    jobs_running = 0

    for num_jobs, batch in ((max_num_jobs, parallel_tests), (1, alone_tests)):
        tests_left = list(batch)
        for _ in range(num_jobs):
            if not self.start_new_job(tests_left):
                break
            jobs_running += 1

        while jobs_running != 0:
            test = self.tests_wait()
            jobs_running -= 1
            self.print_test_num(test)
            res = test.test_end()
            self.reporter.after_test(test)
            if res != Result.PASSED and (self.options.forever or
                                         self.options.fatal_error):
                return False
            # A job slot got freed: fill it with the next pending test
            if self.start_new_job(tests_left):
                jobs_running += 1

    return True
1633 def clean_tests(self):
1634 for test in self.tests:
def run_tests(self):
    """
    Run the tests according to the 'forever' and 'n_runs' options.

    With 'forever', iterate until a run fails and return False. With
    'n_runs', run that many iterations and return whether all of them
    passed. Otherwise return the result of a single run.
    """
    def announce(iteration):
        title = "Running iteration %d" % iteration
        rule = "=" * len(title)
        print("%s\n%s\n%s\n" % (rule, title, rule))

    if self.options.forever:
        iteration = 1
        while True:
            announce(iteration)

            if not self._run_tests():
                break
            iteration += 1
            self.clean_tests()

        return False

    if self.options.n_runs:
        all_passed = True
        for iteration in range(self.options.n_runs):
            announce(iteration)
            if not self._run_tests():
                all_passed = False
            self.clean_tests()

        return all_passed

    return self._run_tests()
def final_report(self):
    """ Return the final report produced by the reporter. """
    report = self.reporter.final_report()
    return report
def needs_http_server(self):
    """
    Return True if at least one test manager needs an HTTP server, False
    otherwise (instead of implicitly returning None).
    """
    return any(tester.needs_http_server() for tester in self.testers)
class NamedDic(object):
    """ Expose the items of a dictionary as attributes. """

    def __init__(self, props):
        """
        @props: A dictionary whose items become attributes of the
                instance; nothing is set when it is empty or None
        """
        if not props:
            return

        for key in props:
            setattr(self, key, props[key])
class Scenario(object):
    """
    A scenario and its properties.

    Each property is exposed as an attribute, with '-' replaced by '_'
    in its name.
    """

    def __init__(self, name, props, path=None):
        """
        @name: The name of the scenario
        @props: An iterable of (property name, value) pairs
        @path: The path of the scenario file, if any
        """
        self.name = name
        self.path = path

        for prop, value in props:
            setattr(self, prop.replace("-", "_"), value)

    def get_execution_name(self):
        """ The path of the scenario when it has one, else its name. """
        return self.name if self.path is None else self.path

    def seeks(self):
        """ Truthiness of the 'seek' property, False when unset. """
        return bool(getattr(self, "seek", False))

    def needs_clock_sync(self):
        """ Truthiness of 'need_clock_sync', False when unset. """
        return bool(getattr(self, "need_clock_sync", False))

    def needs_live_content(self):
        # Scenarios that can only be used on live content
        return bool(getattr(self, "live_content_required", False))

    def compatible_with_live_content(self):
        # if a live content is required it's implicitely compatible with
        # live content
        if self.needs_live_content():
            return True
        return bool(getattr(self, "live_content_compatible", False))

    def get_min_media_duration(self):
        """ 'min_media_duration' as a float, 0 when unset. """
        if not hasattr(self, "min_media_duration"):
            return 0

        return float(self.min_media_duration)

    def does_reverse_playback(self):
        """ Truthiness of 'reverse_playback', False when unset. """
        return bool(getattr(self, "reverse_playback", False))

    def get_duration(self):
        """ 'duration' as a float, 0 when unset. """
        if not hasattr(self, "duration"):
            return 0

        return float(self.duration)

    def get_min_tracks(self, track_type):
        """ 'min_<track_type>_track' as an int, 0 when unset. """
        prop_name = "min_%s_track" % track_type
        if not hasattr(self, prop_name):
            return 0

        return int(getattr(self, prop_name))

    def __repr__(self):
        return "<Scenario %s>" % self.name
class ScenarioManager(Loggable):
    """
    Singleton discovering the available scenarios by running
    GST_VALIDATE_COMMAND and parsing the scenarios definition file it
    writes.
    """
    _instance = None
    all_scenarios = []

    FILE_EXTENSION = "scenario"
    GST_VALIDATE_COMMAND = ""

    def __new__(cls, *args, **kwargs):
        if not cls._instance:
            # object.__new__() does not accept extra arguments
            cls._instance = super(ScenarioManager, cls).__new__(cls)
            cls._instance.config = None
            cls._instance.discovered = False
            Loggable.__init__(cls._instance)

        return cls._instance

    def find_special_scenarios(self, mfile):
        """
        Return the scenarios specific to the @mfile media file, ie. the
        ones defined in "<mfile>.<name>.scenario" files next to it.
        """
        scenarios = []
        mfile_bname = os.path.basename(mfile)
        mfile_dir = os.path.dirname(mfile)
        special_re = re.compile(r"%s\..*\.%s$" % (re.escape(mfile_bname),
                                                  self.FILE_EXTENSION))

        for f in os.listdir(mfile_dir):
            if special_re.findall(f):
                scenarios.append(os.path.join(mfile_dir, f))

        if scenarios:
            scenarios = self.discover_scenarios(scenarios, mfile)

        return scenarios

    def discover_scenarios(self, scenario_paths=None, mfile=None):
        """
        Discover scenarios specified in scenario_paths or the default ones
        if nothing specified there

        Returns the list of discovered #Scenario. The default scenarios
        are also added to all_scenarios.
        """
        if scenario_paths is None:
            scenario_paths = []

        scenarios = []
        scenario_defs = os.path.join(self.config.main_dir, "scenarios.def")
        logs_path = os.path.join(self.config.logsdir,
                                 "scenarios_discovery.log")

        with open(logs_path, 'w') as logs:
            try:
                command = [self.GST_VALIDATE_COMMAND,
                           "--scenarios-defs-output-file", scenario_defs]
                command.extend(scenario_paths)
                subprocess.check_call(command, stdout=logs, stderr=logs)
            except subprocess.CalledProcessError:
                # Best effort: parse whatever definitions got written
                pass

        config = configparser.RawConfigParser()
        with open(scenario_defs) as f:
            config.read_file(f)

        for section in config.sections():
            if scenario_paths:
                for scenario_path in scenario_paths:
                    if mfile is None:
                        name = section
                        path = scenario_path
                    elif section in scenario_path:
                        # The real name of the scenario is:
                        # filename.REALNAME.scenario
                        name = scenario_path.replace(mfile + ".", "").replace(
                            "." + self.FILE_EXTENSION, "")
                        path = scenario_path
            else:
                name = section
                path = None

            props = config.items(section)
            scenarios.append(Scenario(name, props, path))

        if not scenario_paths:
            self.discovered = True
            self.all_scenarios.extend(scenarios)

        return scenarios

    def get_scenario(self, name):
        """
        Return the scenario called @name (or defined by the @name absolute
        scenario file path), all the known scenarios if @name is None, or
        None (with a warning) if it can not be found.
        """
        if name is not None and os.path.isabs(name) and name.endswith(self.FILE_EXTENSION):
            scenarios = self.discover_scenarios([name])

            if scenarios:
                return scenarios[0]

        if self.discovered is False:
            self.discover_scenarios()

        if name is None:
            return self.all_scenarios

        for scenario in self.all_scenarios:
            if scenario.name == name:
                return scenario

        self.warning("Scenario: %s not found" % name)
        return None
class GstValidateBaseTestManager(TestsManager):
    """ Test manager holding default scenarios and encoding formats. """
    scenarios_manager = ScenarioManager()

    def __init__(self):
        super(GstValidateBaseTestManager, self).__init__()
        self._scenarios = []
        self._encoding_formats = []

    def add_scenarios(self, scenarios):
        """
        @scenarios A list or a unic scenario name(s) to be run on the tests.
                    They are just the default scenarios, and then depending on
                    the TestsGenerator to be used you can have more fine grained
                    control on what to be run on each serie of tests.
        """
        if isinstance(scenarios, list):
            self._scenarios.extend(scenarios)
        else:
            self._scenarios.append(scenarios)

        # Remove duplicates while keeping a stable (insertion) order
        self._scenarios = list(dict.fromkeys(self._scenarios))

    def set_scenarios(self, scenarios):
        """
        Override the scenarios
        """
        self._scenarios = []
        self.add_scenarios(scenarios)

    def get_scenarios(self):
        """ Return the list of default scenarios. """
        return self._scenarios

    def add_encoding_formats(self, encoding_formats):
        """
        :param encoding_formats: A list or one single #MediaFormatCombinations describing wanted output
                           formats for transcoding test.
                           They are just the default encoding formats, and then depending on
                           the TestsGenerator to be used you can have more fine grained
                           control on what to be run on each serie of tests.
        """
        if isinstance(encoding_formats, list):
            self._encoding_formats.extend(encoding_formats)
        else:
            self._encoding_formats.append(encoding_formats)

        # Remove duplicates while keeping a stable (insertion) order
        self._encoding_formats = list(dict.fromkeys(self._encoding_formats))

    def get_encoding_formats(self):
        """ Return the list of default encoding formats. """
        return self._encoding_formats
class MediaDescriptor(Loggable):
    """
    Abstract description of a media.

    Subclasses must implement the accessors raising NotImplementedError.
    """

    def __init__(self):
        Loggable.__init__(self)

    def get_path(self):
        raise NotImplementedError

    def get_media_filepath(self):
        raise NotImplementedError

    def get_caps(self):
        raise NotImplementedError

    def get_uri(self):
        raise NotImplementedError

    def get_duration(self):
        raise NotImplementedError

    def get_protocol(self):
        raise NotImplementedError

    def is_seekable(self):
        raise NotImplementedError

    def is_live(self):
        raise NotImplementedError

    def is_image(self):
        raise NotImplementedError

    def get_num_tracks(self, track_type):
        raise NotImplementedError

    def can_play_reverse(self):
        raise NotImplementedError

    def prerrols(self):
        return True

    def is_compatible(self, scenario):
        """
        Return True if @scenario can be run on the described media (always
        the case when @scenario is None), False otherwise.
        """
        if scenario is None:
            return True

        if scenario.seeks() and (not self.is_seekable() or self.is_image()):
            self.debug("Do not run %s as %s does not support seeking",
                       scenario, self.get_uri())
            return False

        if self.is_image() and scenario.needs_clock_sync():
            self.debug("Do not run %s as %s is an image",
                       scenario, self.get_uri())
            return False

        if not self.can_play_reverse() and scenario.does_reverse_playback():
            return False

        if not self.is_live() and scenario.needs_live_content():
            self.debug("Do not run %s as %s is not a live content",
                       scenario, self.get_uri())
            return False

        if self.is_live() and not scenario.compatible_with_live_content():
            self.debug("Do not run %s as %s is a live content", scenario, self.get_uri())
            return False

        if not self.prerrols() and getattr(scenario, 'needs_preroll', False):
            return False

        # get_duration() is divided by GST_SECOND before being compared
        # with the minimum media duration of the scenario.
        if self.get_duration() and self.get_duration() / GST_SECOND < scenario.get_min_media_duration():
            self.debug(
                "Do not run %s as %s is too short (%i < min media duation : %i",
                scenario, self.get_uri(),
                self.get_duration() / GST_SECOND,
                scenario.get_min_media_duration())
            return False

        for track_type in ['audio', 'subtitle', 'video']:
            if self.get_num_tracks(track_type) < scenario.get_min_tracks(track_type):
                # Needed count first, then the track type
                self.debug("%s -- %s | At least %s %s track needed < %s"
                           % (scenario, self.get_uri(),
                              scenario.get_min_tracks(track_type),
                              track_type,
                              self.get_num_tracks(track_type)))
                return False

        return True
class GstValidateMediaDescriptor(MediaDescriptor):
    """ Media descriptor backed by a media info XML file. """
    # Some extension file for discovering results
    MEDIA_INFO_EXT = "media_info"
    STREAM_INFO_EXT = "stream_info"

    DISCOVERER_COMMAND = "gst-validate-media-check-1.0"
    if "win32" in sys.platform:
        DISCOVERER_COMMAND += ".exe"

    def __init__(self, xml_path):
        """
        @xml_path: The path of the media info XML file to load

        Raises the XML ParseError if the file can not be parsed, and
        KeyError if the 'duration' or 'seekable' attribute is missing.
        """
        super(GstValidateMediaDescriptor, self).__init__()

        self._xml_path = xml_path
        try:
            self.media_xml = ET.parse(xml_path).getroot()
        except xml.etree.ElementTree.ParseError:
            printc("Could not parse %s" % xml_path,
                   Colors.FAIL)
            raise

        # Sanity checks
        self.media_xml.attrib["duration"]
        self.media_xml.attrib["seekable"]

        # The protocol is the scheme of the URI. Parsing the scheme a
        # second time always gave an empty protocol.
        self.set_protocol(urllib.parse.urlparse(self.get_uri()).scheme)

    @staticmethod
    def new_from_uri(uri, verbose=False, include_frames=False):
        """
            include_frames = 0 # Never
            include_frames = 1 # always
            include_frames = 2 # if previous file included them

        Run DISCOVERER_COMMAND on @uri to generate its media info file,
        and return the corresponding descriptor, or None on failure.
        """
        media_path = utils.url2path(uri)

        descriptor_path = "%s.%s" % (
            media_path, GstValidateMediaDescriptor.MEDIA_INFO_EXT)
        if include_frames == 2:
            try:
                media_xml = ET.parse(descriptor_path).getroot()
                frames = media_xml.findall('streams/stream/frame')
                include_frames = bool(frames)
            except FileNotFoundError:
                pass
        else:
            include_frames = bool(include_frames)

        args = GstValidateMediaDescriptor.DISCOVERER_COMMAND.split(" ")
        args.append(uri)

        args.extend(["--output-file", descriptor_path])
        if include_frames:
            args.extend(["--full"])

        if verbose:
            printc("Generating media info for %s\n"
                   " Command: '%s'" % (media_path, ' '.join(args)),
                   Colors.OKBLUE)

        try:
            # DEVNULL avoids leaking a handle on os.devnull
            subprocess.check_output(args, stderr=subprocess.DEVNULL)
        except subprocess.CalledProcessError as e:
            if verbose:
                printc("Result: Failed", Colors.FAIL)
            else:
                loggable.warning("GstValidateMediaDescriptor", "Exception: %s" % e)
            return None

        if verbose:
            printc("Result: Passed", Colors.OKGREEN)

        try:
            return GstValidateMediaDescriptor(descriptor_path)
        except (IOError, xml.etree.ElementTree.ParseError):
            return None

    def get_path(self):
        return self._xml_path

    def need_clock_sync(self):
        return Protocols.needs_clock_sync(self.get_protocol())

    def get_media_filepath(self):
        """ The descriptor path without its info extension. """
        if self.get_protocol() == Protocols.FILE:
            return self._xml_path.replace("." + self.MEDIA_INFO_EXT, "")
        else:
            return self._xml_path.replace("." + self.STREAM_INFO_EXT, "")

    def get_caps(self):
        return self.media_xml.findall("streams")[0].attrib["caps"]

    def get_tracks_caps(self):
        """ Return the (type, caps) pair of each stream, if any. """
        try:
            streams = self.media_xml.findall("streams")[0].findall("stream")
        except IndexError:
            return []

        return [(stream.attrib["type"], stream.attrib["caps"])
                for stream in streams]

    def get_uri(self):
        return self.media_xml.attrib["uri"]

    def get_duration(self):
        return int(self.media_xml.attrib["duration"])

    def set_protocol(self, protocol):
        self.media_xml.attrib["protocol"] = protocol

    def get_protocol(self):
        return self.media_xml.attrib["protocol"]

    def is_seekable(self):
        return self.media_xml.attrib["seekable"].lower() == "true"

    def is_live(self):
        return self.media_xml.get("live", "false").lower() == "true"

    def can_play_reverse(self):
        return True

    def is_image(self):
        for stream in self.media_xml.findall("streams")[0].findall("stream"):
            if stream.attrib["type"] == "image":
                return True
        return False

    def get_num_tracks(self, track_type):
        """ Return the number of streams of type @track_type. """
        n = 0
        for stream in self.media_xml.findall("streams")[0].findall("stream"):
            if stream.attrib["type"] == track_type:
                n += 1

        return n

    def get_clean_name(self):
        """
        Return the descriptor file name without its info extension and
        with '.' replaced by '_'.
        """
        name = os.path.basename(self.get_path())
        name = re.sub(r"\.stream_info|\.media_info", "", name)

        return name.replace('.', "_")
class MediaFormatCombination(object):
    """ A container/audio/video format combination used for transcoding. """
    FORMATS = {
        # Audio
        "aac": "audio/mpeg,mpegversion=4",
        "ac3": "audio/x-ac3",
        "vorbis": "audio/x-vorbis",
        "mp3": "audio/mpeg,mpegversion=1,layer=3",
        "opus": "audio/x-opus",
        "rawaudio": "audio/x-raw",

        # Video
        "h264": "video/x-h264",
        "h265": "video/x-h265",
        "vp8": "video/x-vp8",
        "vp9": "video/x-vp9",
        "theora": "video/x-theora",
        "prores": "video/x-prores",
        "jpeg": "image/jpeg",

        # Containers
        "webm": "video/webm",
        "ogg": "application/ogg",
        "mkv": "video/x-matroska",
        "mp4": "video/quicktime,variant=iso;",
        "quicktime": "video/quicktime;",
    }

    def __str__(self):
        return "%s and %s in %s" % (self.audio, self.video, self.container)

    def __init__(self, container, audio, video):
        """
        Describes a media format to be used for transcoding tests.

        :param container: A string defining the container format to be used, must be in self.FORMATS
        :param audio: A string defining the audio format to be used, must be in self.FORMATS
        :param video: A string defining the video format to be used, must be in self.FORMATS
        """
        self.container = container
        self.audio = audio
        self.video = video

    def get_caps(self, track_type):
        """
        Return the caps of the format set for @track_type ("audio",
        "video" or "container"), or None if there is no such attribute or
        its format is not in FORMATS.
        """
        format_name = vars(self).get(track_type)
        return self.FORMATS.get(format_name)

    def get_audio_caps(self):
        return self.get_caps("audio")

    def get_video_caps(self):
        return self.get_caps("video")

    def get_muxer_caps(self):
        return self.get_caps("container")