return self.get_current_position()
-class GESRenderTest(GESTest):
+class GESRenderTest(GESTest, GstValidateEncodingTestInterface):
def __init__(self, classname, options, reporter, project_uri, combination):
- super(GESRenderTest, self).__init__(classname, options, reporter,
- project_uri)
- self.combination = combination
+ GESTest.__init__(self, classname, options, reporter, project_uri)
+
+ GstValidateEncodingTestInterface.__init__(self, combination, self.project)
def build_arguments(self):
GESTest.build_arguments(self)
if not utils.isuri(self.dest_file):
self.dest_file = utils.path2url(self.dest_file)
- profile = utils.get_profile(self.combination, self.project,
- video_restriction="video/x-raw,format=I420")
+ profile = self.get_profile(video_restriction="video/x-raw,format=I420")
self.add_arguments("-f", profile, "-o", self.dest_file)
def check_results(self):
if self.result in [Result.PASSED, Result.NOT_RUN] and self.scenario is None:
- res, msg = utils.compare_rendered_with_original(self.project.get_duration(),
- self.dest_file, self.combination)
+ res, msg = self.check_encoded_file()
self.set_result(res, msg)
else:
if self.result == utils.Result.TIMEOUT:
GstValidateTest.check_results(self)
def get_current_value(self):
- return self.get_current_size()
+ size = self.get_current_size()
+ if size is None:
+ return self.get_current_position()
+
+ return size
class GESTestsManager(TestsManager):
ScenarioManager, NamedDic, GstValidateTestsGenerator, \
GstValidateMediaDescriptor
-from utils import MediaFormatCombination, get_profile,\
- path2url, DEFAULT_TIMEOUT, which, GST_SECOND, Result, \
- compare_rendered_with_original, Protocols
+from utils import MediaFormatCombination, path2url, DEFAULT_TIMEOUT, which, \
+ GST_SECOND, Result, Protocols
######################################
# Private global variables #
self._media_info_path)
-class GstValidateTranscodingTest(GstValidateTest):
+class GstValidateTranscodingTest(GstValidateTest, GstValidateEncodingTestInterface):
scenarios_manager = ScenarioManager()
def __init__(self, classname, options, reporter,
combination, uri, media_descriptor,
except KeyError:
pass
- super(GstValidateTranscodingTest, self).__init__(
- GST_VALIDATE_TRANSCODING_COMMAND, classname,
- options, reporter, duration=duration,
- timeout=timeout, scenario=scenario)
+ super(GstValidateTranscodingTest, self).__init__(GST_VALIDATE_TRANSCODING_COMMAND,
+ classname,
+ options,
+ reporter,
+ duration=duration,
+ timeout=timeout,
+ scenario=scenario)
+
+ GstValidateEncodingTestInterface.__init__(self, combination, media_descriptor)
self.media_descriptor = media_descriptor
self.uri = uri
- self.combination = combination
- self.dest_file = ""
def set_rendering_info(self):
self.dest_file = path = os.path.join(self.options.dest,
if urlparse.urlparse(self.dest_file).scheme == "":
self.dest_file = path2url(self.dest_file)
- profile = get_profile(self.combination, self.media_descriptor)
+ profile = self.get_profile()
self.add_arguments("-o", profile)
def build_arguments(self):
return Result.FAILED
- return self.get_current_size()
+ size = self.get_current_size()
+ if size is None:
+ return self.get_current_position()
+
+ return size
def check_results(self):
if self.result in [Result.FAILED, Result.TIMEOUT]:
long(self.media_descriptor.get_duration()))
else:
orig_duration = long(self.media_descriptor.get_duration())
- res, msg = compare_rendered_with_original(orig_duration, self.dest_file)
+
+ res, msg = self.check_encoded_file()
self.set_result(res, msg)
return position
- def get_current_size(self):
- position = self.get_current_position()
+class GstValidateEncodingTestInterface(object):
+ DURATION_TOLERANCE = GST_SECOND / 4
+
+ def __init__(self, combination, media_descriptor, duration_tolerance=None):
+ super(GstValidateEncodingTestInterface, self).__init__()
+
+ self.media_descriptor = media_descriptor
+ self.combination = combination
+ self.dest_file = ""
+ self._duration_tolerance = duration_tolerance
+ if duration_tolerance is None:
+ self._duration_tolerance = self.DURATION_TOLERANCE
+
+ def get_current_size(self):
try:
size = os.stat(urlparse.urlparse(self.dest_file).path).st_size
except OSError as e:
- return position
+ return None
self.debug("Size: %s" % size)
return size
+ def _get_profile_full(self, muxer, venc, aenc, video_restriction=None,
+ audio_restriction=None, audio_presence=0,
+ video_presence=0):
+ ret = "\""
+ if muxer:
+ ret += muxer
+ ret += ":"
+ if venc:
+ if video_restriction is not None:
+ ret = ret + video_restriction + '->'
+ ret += venc
+ if video_presence:
+ ret = ret + '|' + str(video_presence)
+ if aenc:
+ ret += ":"
+ if audio_restriction is not None:
+ ret = ret + audio_restriction + '->'
+ ret += aenc
+ if audio_presence:
+ ret = ret + '|' + str(audio_presence)
+
+ ret += "\""
+ return ret.replace("::", ":")
+
+ def get_profile(self, video_restriction=None, audio_restriction=None):
+ vcaps = utils.FORMATS[self.combination.vcodec]
+ acaps = utils.FORMATS[self.combination.acodec]
+ if self.media_descriptor is not None:
+ if self.media_descriptor.get_num_tracks("video") == 0:
+ vcaps = None
+
+ if self.media_descriptor.get_num_tracks("audio") == 0:
+ acaps = None
+
+ return self._get_profile_full(utils.FORMATS[self.combination.container],
+ vcaps, acaps,
+ video_restriction=video_restriction,
+ audio_restriction=audio_restriction)
+
+ def check_encoded_file(self):
+ duration = utils.get_duration(self.dest_file)
+ orig_duration = self.media_descriptor.get_duration()
+ tolerance = self._duration_tolerance
+
+ if orig_duration - tolerance >= duration <= orig_duration + tolerance:
+ return (Result.FAILED, "Duration of encoded file is "
+ " wrong (%s instead of %s)" %
+ (TIME_ARGS (duration),
+ TIME_ARGS (orig_duration)))
+ else:
+ return (Result.PASSED, "")
+
class TestsManager(Loggable):
STREAM_INFO_EXT = "stream_info"
def __init__(self, xml_path):
- super(GstValidateMediaDescriptor, self).__init__(self)
+ super(GstValidateMediaDescriptor, self).__init__()
self._xml_path = xml_path
self.media_xml = ET.parse(xml_path).getroot()
DEFAULT_MAIN_DIR = os.path.expanduser("~/gst-validate/")
DEFAULT_GST_QA_ASSETS = os.path.join(DEFAULT_MAIN_DIR, "gst-qa-assets")
DISCOVERER_COMMAND = "gst-discoverer-1.0"
-DURATION_TOLERANCE = GST_SECOND / 4
# Use to set the duration from which a test is concidered as being 'long'
LONG_TEST = 40
return ret.replace("::", ":")
-def get_profile(combination, media_descriptor=None, video_restriction=None, audio_restriction=None):
- vcaps = FORMATS[combination.vcodec]
- acaps = FORMATS[combination.acodec]
- if media_descriptor is not None:
- if media_descriptor.get_num_tracks("video") == 0:
- vcaps = None
-
- if media_descriptor.get_num_tracks("audio") == 0:
- acaps = None
-
- return get_profile_full(FORMATS[combination.container],
- vcaps,
- acaps,
- video_restriction=video_restriction,
- audio_restriction=audio_restriction)
-
##################################################
# Some utilities to parse gst-validate output #
##################################################
return duration
-def compare_rendered_with_original(orig_duration, dest_file, tolerance=DURATION_TOLERANCE):
- duration = get_duration(dest_file)
-
- if orig_duration - tolerance >= duration <= orig_duration + tolerance:
- return (Result.FAILED, "Duration of encoded file is "
- " wrong (%s instead of %s)" %
- (TIME_ARGS (duration),
- TIME_ARGS (orig_duration)))
- else:
- return (Result.PASSED, "")
-
def get_scenarios():
GST_VALIDATE_COMMAND = "gst-validate-1.0"