""" Class representing tests and test managers. """
+from enum import Enum
import importlib.util
import json
import os
from itertools import cycle
from fractions import Fraction
-from .utils import which
+from .utils import GstCaps, which
from . import reporters
from . import loggable
from .loggable import Loggable
copied_test = copy.copy(self)
if nth:
copied_test.classname += '_it' + str(nth)
+ copied_test._uuid = None
copied_test.options = copy.copy(self.options)
copied_test.options.logsdir = os.path.join(copied_test.options.logsdir, str(nth))
os.makedirs(copied_test.options.logsdir, exist_ok=True)
info = ""
info += "\n\n**You can mark the issues as 'known' by adding the " \
- + " following lines to the list of known issues**\n" \
+ + f" following lines to the list of known issues of the testsuite called \"{self.classname.split('.')[0]}\"**\n" \
+ "\n\n``` python\n%s\n```" % (self.generate_expected_issues())
if self.options.redirect_logs:
if result not in [Result.PASSED, Result.NOT_RUN, Result.SKIPPED]:
self.add_known_issue_information()
+ def expected_return_codes(self):
+ res = []
+ for issue in self.expected_issues:
+ if 'returncode' in issue:
+ res.append(issue['returncode'])
+ return res
+
def check_results(self):
if self.result is Result.FAILED or self.result is Result.TIMEOUT:
return
if self.options.rr and self.process.returncode == -signal.SIGPIPE:
self.set_result(Result.SKIPPED, "SIGPIPE received under `rr`, known issue.")
elif self.process.returncode == 0:
+ for issue in self.expected_issues:
+ if issue['returncode'] != 0 and not issue.get("sometimes", False):
+ self.set_result(Result.ERROR, "Expected return code %d" % issue['returncode'])
+ return
self.set_result(Result.PASSED)
elif self.process.returncode in EXITING_SIGNALS:
self.add_stack_trace_to_logfile()
EXITING_SIGNALS[self.process.returncode]))
elif self.process.returncode == VALGRIND_ERROR_CODE:
self.set_result(Result.FAILED, "Valgrind reported errors")
+ elif self.process.returncode in self.expected_return_codes():
+ self.set_result(Result.KNOWN_ERROR)
else:
self.set_result(Result.FAILED,
"Application returned %d" % (self.process.returncode))
options, reporter, duration=0,
timeout=DEFAULT_TIMEOUT, scenario=None, hard_timeout=None,
media_descriptor=None, extra_env_variables=None,
- expected_issues=None, workdir=None):
+ expected_issues=None, workdir=None, **kwargs):
extra_env_variables = extra_env_variables or {}
extra_env_variables["GST_VALIDATE_OVERRIDE"] = override_path
- super(GstValidateTest, self).__init__(application_name, classname,
- options, reporter,
- duration=duration,
- timeout=timeout,
- hard_timeout=hard_timeout,
- extra_env_variables=extra_env_variables,
- expected_issues=expected_issues,
- workdir=workdir)
+ super().__init__(application_name,
+ classname,
+ options, reporter,
+ duration=duration,
+ timeout=timeout,
+ hard_timeout=hard_timeout,
+ extra_env_variables=extra_env_variables,
+ expected_issues=expected_issues,
+ workdir=workdir,
+ **kwargs)
if media_descriptor and media_descriptor.get_media_filepath():
config_file = os.path.join(media_descriptor.get_media_filepath() + '.config')
if os.path.isfile(config_file):
else:
self.scenario = scenario
+ def needs_http_server(self):
+ if self.media_descriptor is None:
+ return False
+
+ protocol = self.media_descriptor.get_protocol()
+ uri = self.media_descriptor.get_uri()
+ uri_requires_http_server = False
+ if uri:
+ if 'http-server-port' in uri:
+ expanded_uri = uri % {
+ 'http-server-port': self.options.http_server_port}
+ uri_requires_http_server = expanded_uri.find(
+ "127.0.0.1:%s" % self.options.http_server_port) != -1
+ if protocol in [Protocols.HTTP, Protocols.HLS, Protocols.DASH] or uri_requires_http_server:
+ return True
+
+ return False
+
def kill_subprocess(self):
Test.kill_subprocess(self)
# Ensure XInitThreads is called, see bgo#731525
subproc_env['GST_GL_XINITTHREADS'] = '1'
self.add_env_variable('GST_GL_XINITTHREADS', '1')
+ subproc_env['GST_XINITTHREADS'] = '1'
+ self.add_env_variable('GST_XINITTHREADS', '1')
if self.scenario is not None:
scenario = self.scenario.get_execution_name()
def get_valgrind_suppressions(self):
result = super(GstValidateTest, self).get_valgrind_suppressions()
result.extend(utils.get_gst_build_valgrind_suppressions())
- gst_sup = self.get_valgrind_suppression_file('common', 'gst.supp')
- if gst_sup:
- result.append(gst_sup)
-
return result
+class VariableFramerateMode(Enum):
+ DISABLED = 1
+ ENABLED = 2
+ AUTO = 3
+
+
class GstValidateEncodingTestInterface(object):
DURATION_TOLERANCE = GST_SECOND / 4
def _get_profile_full(self, muxer, venc, aenc, video_restriction=None,
audio_restriction=None, audio_presence=0,
- video_presence=0, variable_framerate=False):
+ video_presence=0,
+ variable_framerate=VariableFramerateMode.DISABLED):
+
ret = ""
if muxer:
ret += muxer
ret += venc
props = ""
if video_presence:
- props += 'presence=%s,' % str(video_presence)
- if variable_framerate:
- props += 'variable-framerate=true,'
+ props += 'presence=%s|' % str(video_presence)
+ if variable_framerate == VariableFramerateMode.AUTO:
+ if video_restriction and "framerate" in video_restriction:
+ variable_framerate = VariableFramerateMode.DISABLED
+ else:
+ variable_framerate = VariableFramerateMode.ENABLED
+ if variable_framerate == VariableFramerateMode.ENABLED:
+ props += 'variable-framerate=true|'
if props:
ret = ret + '|' + props[:-1]
if aenc:
return ret.replace("::", ":")
def get_profile(self, video_restriction=None, audio_restriction=None,
- variable_framerate=False):
+ variable_framerate=VariableFramerateMode.DISABLED):
vcaps = self.combination.get_video_caps()
acaps = self.combination.get_audio_caps()
if video_restriction is None:
self.blacklisted_tests_patterns.append(re.compile(pattern))
def set_default_blacklist(self, default_blacklist):
- for test_regex, reason in default_blacklist:
+ for test_regex, reason, *re_flags in default_blacklist:
+ re_flags = re_flags[0] if re_flags else None
+
if not test_regex.startswith(self.loading_testsuite + '.'):
test_regex = self.loading_testsuite + '.' + test_regex
+ if re_flags is not None:
+ test_regex = re_flags + test_regex
self.blacklisted_tests.append((test_regex, reason))
self._add_blacklist(test_regex)
return True
def print_result(self, current_test_num, test, total_num_tests, retry_on_failures=False):
- if test.result != Result.PASSED and (not retry_on_failures or test.max_retries):
+ if test.result not in [Result.PASSED, Result.KNOWN_ERROR] and (not retry_on_failures or test.max_retries):
printc(str(test), color=utils.get_color_for_result(test.result))
length = 80
self.reporter.after_test(test)
return False
- if retry_on_failures or test.max_retries:
+ if retry_on_failures or test.max_retries and not self.options.no_retry_on_failures:
if not self.options.redirect_logs:
test.copy_logfiles()
to_retry.append(test)
config = configparser.RawConfigParser()
f = open(scenario_defs)
- config.readfp(f)
+ config.read_file(f)
for section in config.sections():
name = None