project_uri, scenario=scenario)
def get_current_value(self):
- return utils.get_current_position(self)
+ return self.get_current_position()
class GESRenderTest(GESTest):
GstValidateTest.check_results(self)
def get_current_value(self):
- return utils.get_current_size(self)
+ return self.get_current_size()
class GESTestsManager(TestsManager):
from baseclasses import GstValidateTest, TestsManager, Test, Scenario, NamedDic
from utils import MediaFormatCombination, get_profile,\
- path2url, get_current_position, get_current_size, \
- DEFAULT_TIMEOUT, which, GST_SECOND, Result, \
+ path2url, DEFAULT_TIMEOUT, which, GST_SECOND, Result, \
compare_rendered_with_original
self.add_arguments(self.pipeline_desc)
def get_current_value(self):
- return get_current_position(self)
+ return self.get_current_position()
class GstValidateMediaCheckTest(Test):
self.add_arguments(self.uri, self.dest_file)
def get_current_value(self):
- return get_current_size(self)
+ return self.get_current_size()
def check_results(self):
if self.process.returncode == 0:
import re
import time
import utils
+import urlparse
import subprocess
import reporters
from loggable import Loggable
def __init__(self, application_name, classname,
options, reporter, timeout=DEFAULT_TIMEOUT,
- scenario=None, hard_timeout=None):
+ scenario=None, hard_timeout=None, max_outside_segment=5):
super(GstValidateTest, self).__init__(application_name, classname, options,
reporter, timeout=timeout, hard_timeout=hard_timeout)
+ # defines how much the process can be outside of the configured
+ # segment / seek
+ self.max_outside_segment = max_outside_segment
+
if scenario is None or scenario.name.lower() == "none":
self.scenario = None
else:
self.process.returncode,
self.get_validate_criticals_errors()
))
+ def _parse_position(self, p):
+ self.log("Parsing %s" % p)
+
+ start_stop = p.replace("<position: ", '').replace("/>", "").split(" duration: ")
+
+ if len(start_stop) < 2:
+ self.warning("Got a unparsable value: %s" % p)
+ return 0, 0
+
+ if "speed:"in start_stop[1]:
+ start_stop[1] = start_stop[1].split("speed:")[0].rstrip().lstrip()
+
+ return utils.parse_gsttimeargs(start_stop[0]), utils.parse_gsttimeargs(start_stop[1])
+
+
+ def _parse_buffering(self, b):
+ return b.split("buffering... ")[1].split("%")[0], 100
+
+
+ def _get_position(self):
+ position = duration = -1
+
+ self.debug("Getting position")
+ self.reporter.out.seek(0)
+ m = None
+ for l in reversed(self.reporter.out.readlines()):
+ l = l.lower()
+ if "<position:" in l or "buffering" in l:
+ m = l
+ break
+
+ if m is None:
+ self.debug("Could not fine any positionning info")
+ return position, duration
+
+ for j in m.split("\r"):
+ j = j.lstrip().rstrip()
+ if j.startswith("<position:") and j.endswith("/>"):
+ position, duration = self._parse_position(j)
+ elif j.startswith("buffering") and j.endswith("%"):
+ position, duration = self._parse_buffering(j)
+ else:
+ self.debug("No info in %s" % j)
+
+ return position, duration
+
+ def _get_last_seek_values(self):
+ m = None
+ rate = start = stop = None
+
+ for l in reversed(self.reporter.out.readlines()):
+ l = l.lower()
+ if "seeking to: " in l:
+ m = l
+ break
+
+ if m is None:
+ self.debug("Could not fine any seeking info")
+ return start, stop, rate
+
+ tmp = m.split("seeking to: ")[1].split(" stop: ")
+ start = tmp[0]
+ stop_rate = tmp[1].split(" Rate")
+
+ return utils.parse_gsttimeargs(start), \
+ utils.parse_gsttimeargs(stop_rate[0]), float(stop_rate[1].replace(":",""))
+
+ def get_current_position(self):
+ position, duration = self._get_position()
+
+ start, stop, rate = self._get_last_seek_values()
+ if start and not (start - self.max_outside_segment * GST_SECOND < position < stop +
+ self.max_outside_segment):
+ self.set_result(Result.FAILED,
+ "Position not in expected 'segment' (with %d second tolerance)"
+ "seek.start %d < position %d < seek.stop %d is FALSE"
+ % (self.max_outside_segment,
+ start - self.max_outside_segment, position,
+ stop + self.max_outside_segment)
+ )
+
+ return position
+
+
+ def get_current_size(self):
+ position = self.get_current_position()
+
+ size = os.stat(urlparse.urlparse(self.dest_file).path).st_size
+ self.debug("Size: %s" % size)
+ return size
class TestsManager(Loggable):
##################################################
# Some utilities to parse gst-validate output #
##################################################
-
-
-def _parse_position(p):
- def parse_gsttimeargs(time):
- return int(time.split(":")[0]) * 3600 + int(time.split(":")[1]) * 60 + int(time.split(":")[2].split(".")[0]) * 60
- start_stop = p.replace("<position: ", '').replace("/>", "").split(" duration: ")
-
- if len(start_stop) < 2:
- loggable.warning("utils", "Got a unparsable value: %s" % p)
- return 0, 0
-
- if " speed: "in start_stop[1]:
- start_stop[1] = start_stop[1].split("speed: ")[0]
-
- return parse_gsttimeargs(start_stop[0]), parse_gsttimeargs(start_stop[1])
-
-
-def _parse_buffering(b):
- return b.split("buffering... ")[1].split("%")[0], 100
-
-
-def _get_position(test):
- position = duration = -1
-
- test.reporter.out.seek(0)
- m = None
- for l in reversed(test.reporter.out.readlines()):
- l = l.lower()
- if "<position:" in l or "buffering" in l:
- m = l
- break
-
- if m is None:
- loggable.debug("utils", "Could not fine any positionning info")
- return position, duration
-
- for j in m.split("\r"):
- if j.startswith("<position:") and j.endswith("/>"):
- position, duration = _parse_position(j)
- elif j.startswith("buffering") and j.endswith("%"):
- position, duration = _parse_buffering(j)
-
- return position, duration
-
-
-def get_current_position(test, max_passed_stop=0.5):
- position, duration = _get_position(test)
-
- if position > duration + max_passed_stop:
- loggable.warning("utils", "Position > duration -> Returning -1")
- return -1
-
- return position
-
-
-def get_current_size(test):
- position = get_current_position(test)
-
- if position is -1:
- return -1
-
- size = os.stat(urlparse.urlparse(test.dest_file).path).st_size
- loggable.debug("utils", "Size: %s" % size)
- return size
-
+def parse_gsttimeargs(time):
+ stime = time.split(":")
+ sns = stime[2].split(".")
+ stime[2] = sns[0]
+ stime.append(sns[1])
+ return long((int(stime[0]) * 3600 + int(stime[1]) * 60 + int(stime[2]) * 60) * GST_SECOND + int(stime[3]))
def get_duration(media_file):
- duration = 0
- def parse_gsttimeargs(time):
- stime = time.split(":")
- sns = stime[2].split(".")
- stime[2] = sns[0]
- stime.append(sns[1])
- return (int(stime[0]) * 3600 + int(stime[1]) * 60 + int(stime[2]) * 60) * GST_SECOND + int(stime[3])
+ duration = 0
try:
res = subprocess.check_output([DISCOVERER_COMMAND, media_file])
except subprocess.CalledProcessError: