from launcher.baseclasses import GstValidateTest, Test, \
ScenarioManager, NamedDic, GstValidateTestsGenerator, \
GstValidateMediaDescriptor, GstValidateEncodingTestInterface, \
- GstValidateBaseTestManager
+ GstValidateBaseTestManager, MediaDescriptor
from launcher.utils import path2url, DEFAULT_TIMEOUT, which, \
GST_SECOND, Result, Protocols, mkdir, printc, Colors
mediainfo.media_descriptor))
+class FakeMediaDescriptor(MediaDescriptor):
+ def __init__(self, infos, pipeline_desc):
+ MediaDescriptor.__init__(self)
+ self._infos = infos
+ self._pipeline_desc = pipeline_desc
+
+ def get_path(self):
+ return self._infos.get('path', None)
+
+ def get_media_filepath(self):
+ return self._infos.get('media-filepath', None)
+
+ def get_caps(self):
+ return self._infos.get('caps', None)
+
+ def get_uri(self):
+ return self._infos.get('uri', None)
+
+ def get_duration(self):
+ return int(self._infos.get('duration', 0)) * GST_SECOND
+
+ def get_protocol(self):
+ return self._infos.get('protocol', "launch_pipeline")
+
+ def is_seekable(self):
+ return self._infos.get('is-seekable', True)
+
+ def is_image(self):
+ return self._infos.get('is-image', False)
+
+ def get_num_tracks(self, track_type):
+ return self._infos.get('num-%s-tracks' % track_type,
+ self._pipeline_desc.count(track_type + "sink"))
+
+ def can_play_reverse(self):
+ return self._infos.get('plays-reverse', False)
+
+
class GstValidatePipelineTestsGenerator(GstValidateTestsGenerator):
def __init__(self, name, test_manager, pipeline_template=None,
else:
extra_datas = {}
- for scenario_name in extra_datas.get('scenarios', scenarios):
+ for scenario in extra_datas.get('scenarios', scenarios):
+ if isinstance(scenario, str):
+ scenario = self.test_manager.scenarios_manager.get_scenario(scenario)
+
+ mediainfo = FakeMediaDescriptor(extra_datas, pipeline)
+ if not mediainfo.is_compatible(scenario):
+ continue
+
if self.test_manager.options.mute:
if scenario and scenario.needs_clock_sync():
- fakesink = "'fakesink sync=true'"
+ fakesink = "fakesink sync=true"
else:
fakesink = "fakesink"
audiosink = 'autoaudiosink'
videosink = 'autovideosink'
- pipeline = pipeline % {'videosink': videosink,
+ pipeline_desc = pipeline % {'videosink': videosink,
'audiosink': audiosink}
- if scenario_name:
- scenario = self.test_manager.scenarios_manager.get_scenario(scenario_name)
- else:
- scenario = None
+ fname = self.get_fname(scenario, protocol=mediainfo.get_protocol(), name=name)
- fname = self.get_fname(scenario, name=name)
self.add_test(GstValidateLaunchTest(fname,
self.test_manager.options,
self.test_manager.reporter,
- pipeline,
+ pipeline_desc,
scenario=scenario,
- duration=extra_datas.get('duration', 0),
- timeout=extra_datas.get('timeout', DEFAULT_TIMEOUT),
- hard_timeout=extra_datas.get('hard_timeout', None))
+ media_descriptor=mediainfo)
)
def build_arguments(self):
GstValidateTest.build_arguments(self)
self.add_arguments(self.pipeline_desc)
- if self.media_descriptor is not None:
+ if self.media_descriptor is not None and self.media_descriptor.get_path():
self.add_arguments(
"--set-media-info", self.media_descriptor.get_path())
def get_num_tracks(self, track_type):
raise NotImplemented
+ def can_play_reverse(self):
+ raise NotImplemented
+
def is_compatible(self, scenario):
+ if scenario is None:
+ return True
+
if scenario.seeks() and (not self.is_seekable() or self.is_image()):
self.debug("Do not run %s as %s does not support seeking",
scenario, self.get_uri())
scenario, self.get_uri())
return False
+ if not self.can_play_reverse() and scenario.does_reverse_playback():
+ return False
+
if self.get_duration() / GST_SECOND < scenario.get_min_media_duration():
self.debug(
"Do not run %s as %s is too short (%i < min media duation : %i",
def is_seekable(self):
return self.media_xml.attrib["seekable"]
+ def can_play_reverse(self):
+ return True
+
def is_image(self):
for stream in self.media_xml.findall("streams")[0].findall("stream"):
if stream.attrib["type"] == "image":