New validate plugin: validateflow
[platform/upstream/gstreamer.git] / validate / launcher / apps / gstvalidate.py
1 #!/usr/bin/env python3
2 #
3 # Copyright (c) 2013,Thibault Saunier <thibault.saunier@collabora.com>
4 #
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU Lesser General Public
7 # License as published by the Free Software Foundation; either
8 # version 2.1 of the License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13 # Lesser General Public License for more details.
14 #
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with this program; if not, write to the
17 # Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
18 # Boston, MA 02110-1301, USA.
19 import argparse
20 import os
21 import copy
22 import sys
23 import time
24 import urllib.parse
25 import shlex
26 import socket
27 import subprocess
28 import configparser
29 import json
30 from launcher.loggable import Loggable
31
32 from launcher.baseclasses import GstValidateTest, Test, \
33     ScenarioManager, NamedDic, GstValidateTestsGenerator, \
34     GstValidateMediaDescriptor, GstValidateEncodingTestInterface, \
35     GstValidateBaseTestManager, MediaDescriptor, MediaFormatCombination
36
37 from launcher.utils import path2url, url2path, DEFAULT_TIMEOUT, which, \
38     GST_SECOND, Result, Protocols, mkdir, printc, Colors, get_data_file, \
39     kill_subprocess
40
41 #
42 # Private global variables     #
43 #
44
# definitions of commands to use
# This parser only declares --validate-tools-path; add_help=False keeps it
# from registering its own -h/--help option.
parser = argparse.ArgumentParser(add_help=False)
parser.add_argument("--validate-tools-path", dest="validate_tools_path",
                    default="",
                    help="defines the paths to look for GstValidate tools.")
# Runs at import time against the process command line; options this parser
# does not declare are returned in `args` instead of raising an error.
options, args = parser.parse_known_args()

# Hand the tools path (empty string when the option is absent) to the base
# test manager.
# NOTE(review): presumably this is what resolves the COMMAND,
# MEDIA_CHECK_COMMAND, TRANSCODING_COMMAND and RTSP_SERVER_COMMAND attributes
# read further down in this file -- confirm in launcher.baseclasses.
GstValidateBaseTestManager.update_commands(options.validate_tools_path)
# Divisor applied by GstValidateTranscodingTest to the expected duration of
# media that has no video track.
AUDIO_ONLY_FILE_TRANSCODING_RATIO = 5
54
55 #
56 # API to be used to create testsuites     #
57 #
58
"""
Some info about protocols and how to handle them
"""
# Each entry pairs a caps media type string with a Protocols member.
# NOTE(review): no consumer of this list is visible in this part of the file;
# presumably it is used to derive a protocol from discovered caps -- confirm
# at the call site.
GST_VALIDATE_CAPS_TO_PROTOCOL = [("application/x-hls", Protocols.HLS),
                                 ("application/dash+xml", Protocols.DASH)]
64
65
class GstValidateMediaCheckTestsGenerator(GstValidateTestsGenerator):
    """Tests generator registered under the name "media_check".

    It adds one GstValidateMediaCheckTest per media entry it is given.
    """

    def __init__(self, test_manager):
        super().__init__("media_check", test_manager)

    def populate_tests(self, uri_minfo_special_scenarios, scenarios):
        """Add a media-check test for every entry of the given list.

        @uri_minfo_special_scenarios: Iterable of
                                      (uri, mediainfo, special_scenarios)
                                      tuples; only the URI and the media
                                      info are used here.
        @scenarios: Not used by this generator.
        """
        for uri, mediainfo, _unused_scenarios in uri_minfo_special_scenarios:
            descriptor = mediainfo.media_descriptor
            protocol = descriptor.get_protocol()
            # Dots of the file name are replaced so they do not add levels
            # to the dotted test class name.
            media_name = os.path.basename(url2path(uri)).replace(".", "_")

            test = GstValidateMediaCheckTest(
                "%s.media_check.%s" % (protocol, media_name),
                self.test_manager.options,
                self.test_manager.reporter,
                descriptor,
                uri,
                mediainfo.path,
                timeout=DEFAULT_TIMEOUT)
            self.add_test(test)
85
86
class GstValidateTranscodingTestsGenerator(GstValidateTestsGenerator):
    """Tests generator registered under the name "transcode".

    For every media entry that is neither an image nor an RTSP stream it adds
    one GstValidateTranscodingTest per encoding format of the test manager.
    """

    def __init__(self, test_manager):
        super().__init__("transcode", test_manager)

    def populate_tests(self, uri_minfo_special_scenarios, scenarios):
        """Add the transcoding tests for the given media entries.

        @uri_minfo_special_scenarios: Iterable of
                                      (uri, mediainfo, special_scenarios)
                                      tuples; only the URI and the media
                                      info are used here.
        @scenarios: Not used by this generator.
        """
        for uri, mediainfo, _unused_scenarios in uri_minfo_special_scenarios:
            descriptor = mediainfo.media_descriptor
            # Images and RTSP streams are not transcoded.
            if descriptor.is_image() or descriptor.get_protocol() == Protocols.RTSP:
                continue

            for combination in self.test_manager.get_encoding_formats():
                classname = "%s.transcode.to_%s.%s" % (
                    descriptor.get_protocol(),
                    str(combination).replace(' ', '_'),
                    descriptor.get_clean_name())
                test = GstValidateTranscodingTest(classname,
                                                  self.test_manager.options,
                                                  self.test_manager.reporter,
                                                  combination,
                                                  uri,
                                                  descriptor)
                self.add_test(test)
112
113
class FakeMediaDescriptor(MediaDescriptor):
    """Media descriptor answering from a plain dictionary.

    Every accessor looks its value up in the `infos` mapping and falls back
    to a fixed default when the key is missing. The pipeline description is
    only used to guess the number of tracks of a given type.
    """

    def __init__(self, infos, pipeline_desc):
        """
        @infos: Mapping holding the optional keys read by the accessors below
        @pipeline_desc: The pipeline description string
        """
        MediaDescriptor.__init__(self)
        self._pipeline_desc = pipeline_desc
        self._infos = infos

    def get_path(self):
        """Return the 'path' entry, None when missing."""
        return self._infos.get('path')

    def get_media_filepath(self):
        """Return the 'media-filepath' entry, None when missing."""
        return self._infos.get('media-filepath')

    def get_caps(self):
        """Return the 'caps' entry, None when missing."""
        return self._infos.get('caps')

    def get_uri(self):
        """Return the 'uri' entry, None when missing."""
        return self._infos.get('uri')

    def get_duration(self):
        """Return the 'duration' entry (0 when missing) times GST_SECOND."""
        seconds = int(self._infos.get('duration', 0))
        return seconds * GST_SECOND

    def get_protocol(self):
        """Return the 'protocol' entry, "launch_pipeline" when missing."""
        return self._infos.get('protocol', "launch_pipeline")

    def is_seekable(self):
        """Return the 'is-seekable' entry, True when missing."""
        return self._infos.get('is-seekable', True)

    def is_image(self):
        """Return the 'is-image' entry, False when missing."""
        return self._infos.get('is-image', False)

    def is_live(self):
        """Return the 'is-live' entry, False when missing."""
        return self._infos.get('is-live', False)

    def get_num_tracks(self, track_type):
        """Return the 'num-<track_type>-tracks' entry.

        When the entry is missing, the number of occurrences of
        "<track_type>sink" in the pipeline description is returned instead.
        """
        sinks_in_pipeline = self._pipeline_desc.count(track_type + "sink")
        return self._infos.get('num-%s-tracks' % track_type, sinks_in_pipeline)

    def can_play_reverse(self):
        """Return the 'plays-reverse' entry, False when missing."""
        return self._infos.get('plays-reverse', False)
154
155
class GstValidatePipelineTestsGenerator(GstValidateTestsGenerator):
    """Generator turning pipeline descriptions into GstValidateLaunchTest."""

    def __init__(self, name, test_manager, pipeline_template=None,
                 pipelines_descriptions=None, valid_scenarios=None):
        """
        @name: The name of the generator
        @test_manager: The test manager handed to the base generator
        @pipeline_template: A template pipeline to be used to generate actual pipelines
        @pipelines_descriptions: A list whose items are either dictionaries
                                 (with 'name', 'pipeline' and optionally
                                 'extra_data' / 'pipeline_data' keys) or
                                 tuples of the form:
                                 (test_name, pipeline_description, extra_data)
                                 extra_data being a dictionary with the following keys:
                                    'scenarios': ["the", "valid", "scenarios", "names"]
                                    'duration': the_duration # in seconds
                                    'timeout': a_timeout # in seconds
                                    'hard_timeout': a_hard_timeout # in seconds
                                 NOTE(review): populate_tests() of this class
                                 does not read 'timeout' nor 'hard_timeout';
                                 it reads 'scenarios', 'expected-failures',
                                 'extra_env_vars' and 'config_file'.

        @valid_scenarios: A list of scenario name that can be used with that generator
        """
        GstValidateTestsGenerator.__init__(self, name, test_manager)
        self._pipeline_template = pipeline_template

        # Tuples are converted so that the rest of the class only has to deal
        # with dictionaries.
        normalized = []
        for entry in pipelines_descriptions or []:
            if isinstance(entry, dict):
                normalized.append(entry)
                continue

            as_dict = {"name": entry[0], "pipeline": entry[1]}
            if len(entry) >= 3:
                as_dict["extra_data"] = entry[2]
            normalized.append(as_dict)
        self._pipelines_descriptions = normalized

        self._valid_scenarios = valid_scenarios or []

    @classmethod
    def from_json(cls, test_manager, json_file, extra_data=None):
        """
        Build a generator from a JSON file mapping test names to definitions.

        Config blocks and in-line scenarios found in the definitions are
        written to files below
        <test_manager.options.privatedir>/<json base name>/<test name>/.

        :param test_manager: The test manager to attach the generator to.
        :param json_file: Path to a JSON file containing pipeline tests.
        :param extra_data: Variables available for interpolation in validate
        configs and scenario actions.
        :return: A GstValidatePipelineTestsGenerator named after the JSON
        file (base name with '.json' removed).
        """
        extra_data = {} if extra_data is None else extra_data

        with open(json_file, 'r') as f:
            descriptions = json.load(f)

        name = os.path.basename(json_file).replace('.json', '')
        config_path = os.path.dirname(json_file)

        pipelines_descriptions = []
        for test_name, defs in descriptions.items():
            tests_definition = {'name': test_name, 'pipeline': defs['pipeline']}
            test_private_dir = os.path.join(test_manager.options.privatedir,
                                            name, test_name)

            config_file = None
            if 'config' in defs:
                os.makedirs(test_private_dir, exist_ok=True)
                config_file = os.path.join(test_private_dir,
                                           test_name + '.config')
                with open(config_file, 'w') as f:
                    config_text = '\n'.join(defs['config']) + '\n'
                    f.write(cls._format_config_template(extra_data,
                                                        config_text,
                                                        test_name))

            scenarios = []
            for scenario in defs.get('scenarios', []):
                if isinstance(scenario, str):
                    # Path to a scenario file
                    scenarios.append(scenario)
                    continue

                # Dictionary defining a new scenario in-line: without actions
                # the bare name is kept, otherwise the actions are written to
                # a private scenario file which is used instead.
                scenario_name = scenario['name']
                scenario_file = scenario_name
                actions = scenario.get('actions')
                if actions:
                    os.makedirs(test_private_dir, exist_ok=True)
                    scenario_file = os.path.join(
                        test_private_dir, scenario_name + '.scenario')
                    with open(scenario_file, 'w') as f:
                        lines = [action % extra_data for action in actions]
                        f.write('\n'.join(lines) + '\n')
                scenarios.append(scenario_file)

            tests_definition['extra_data'] = {'scenarios': scenarios,
                                              'config_file': config_file}
            tests_definition['pipeline_data'] = {"config_path": config_path}
            pipelines_descriptions.append(tests_definition)

        return GstValidatePipelineTestsGenerator(
            name, test_manager, pipelines_descriptions=pipelines_descriptions)

    @classmethod
    def _format_config_template(cls, extra_data, config_text, test_name):
        """
        Interpolate `config_text` with the variables of `extra_data`.

        When both 'validate-flow-expectations-dir' and
        'validate-flow-actual-results-dir' are provided, a 'validateflow'
        variable is made available as well; its directories are suffixed
        with the test name, dots being turned into path separators.
        """
        # Work on a copy so the caller's mapping is left untouched.
        extra_vars = extra_data.copy()

        flow_keys = ('validate-flow-expectations-dir',
                     'validate-flow-actual-results-dir')
        if all(key in extra_vars for key in flow_keys):
            test_subdir = test_name.replace('.', os.sep)
            expectations_dir = os.path.join(extra_vars[flow_keys[0]],
                                            test_subdir)
            actual_results_dir = os.path.join(extra_vars[flow_keys[1]],
                                              test_subdir)
            extra_vars['validateflow'] = "validateflow, expectations-dir=\"%s\", actual-results-dir=\"%s\"" % (expectations_dir, actual_results_dir)

        return config_text % extra_vars

    def get_fname(self, scenario, protocol=None, name=None):
        """
        Compute the dotted name of a test.

        @scenario: The scenario of the test, may be None
        @protocol: Optional protocol used as name prefix
        @name: Optional name, defaults to the name of the generator
        """
        if name is None:
            name = self.name

        protocol_str = ("%s." % protocol) if protocol is not None else ""

        has_named_scenario = (scenario is not None
                              and scenario.name.lower() != "none")
        if has_named_scenario:
            return "%s%s.%s" % (protocol_str, name, scenario.name)

        # protocol_str already ends with a dot, collapse the doubled one.
        return ("%s.%s.%s" % (protocol_str, self.name, name)).replace("..", ".")

    def generate_tests(self, uri_minfo_special_scenarios, scenarios):
        """
        Restrict `scenarios` to the valid ones and defer to the base class.

        NOTE(review): __init__ replaces a None `valid_scenarios` by an empty
        list, so the `is None` branch is only taken if `_valid_scenarios` is
        reset to None after construction.
        """
        allowed = self._valid_scenarios
        if allowed is None:
            scenarios = [None]
        elif allowed:
            scenarios = [candidate for candidate in scenarios
                         if candidate is not None and candidate.name in allowed]

        return super().generate_tests(uri_minfo_special_scenarios, scenarios)

    def populate_tests(self, uri_minfo_special_scenarios, scenarios):
        """
        Add one GstValidateLaunchTest per (description, compatible scenario).

        @uri_minfo_special_scenarios: Not used by this generator
        @scenarios: Default scenarios, used by the descriptions that do not
                    carry their own 'scenarios' list
        """
        for description in self._pipelines_descriptions:
            pipeline = description['pipeline']
            extra_data = description.get('extra_data', {})
            pipeline_data = description.get('pipeline_data', {})

            # A pipeline description can override the default scenario set.
            # The pipeline description may specify an empty list of
            # scenarios, in which case one test will be generated with no
            # scenario.
            scenarios_to_iterate = (extra_data['scenarios'] or [None]) \
                if 'scenarios' in extra_data else scenarios

            for scenario in scenarios_to_iterate:
                if isinstance(scenario, str):
                    scenario = self.test_manager.scenarios_manager.get_scenario(
                        scenario)

                mediainfo = FakeMediaDescriptor(extra_data, pipeline)
                if not mediainfo.is_compatible(scenario):
                    continue

                if self.test_manager.options.mute:
                    needs_clock = False
                    if scenario:
                        needs_clock = scenario.needs_clock_sync()
                    audiosink = self.get_fakesink_for_media_type(
                        "audio", needs_clock)
                    videosink = self.get_fakesink_for_media_type(
                        "video", needs_clock)
                else:
                    audiosink = 'autoaudiosink'
                    videosink = 'autovideosink'

                # The sinks are stored in the description's own
                # 'pipeline_data' mapping when it has one.
                pipeline_data['videosink'] = videosink
                pipeline_data['audiosink'] = audiosink
                pipeline_desc = pipeline % pipeline_data

                fname = self.get_fname(scenario,
                                       protocol=mediainfo.get_protocol(),
                                       name=description["name"])

                test = GstValidateLaunchTest(
                    fname,
                    self.test_manager.options,
                    self.test_manager.reporter,
                    pipeline_desc,
                    scenario=scenario,
                    media_descriptor=mediainfo,
                    expected_failures=extra_data.get("expected-failures"),
                    extra_env_variables=extra_data.get("extra_env_vars"))

                config_file = extra_data.get('config_file')
                if config_file:
                    test.add_validate_config(config_file)
                self.add_test(test)
332
333
class GstValidatePlaybinTestsGenerator(GstValidatePipelineTestsGenerator):
    """Generator of "playback" tests.

    The pipeline template is "playbin", or "playbin3" when the USE_PLAYBIN3
    environment variable is set.
    """

    def __init__(self, test_manager):
        element = "playbin" if os.getenv("USE_PLAYBIN3") is None else "playbin3"
        GstValidatePipelineTestsGenerator.__init__(
            self, "playback", test_manager, element)

    def _set_sinks(self, minfo, pipe_str, scenario):
        """Return `pipe_str`, with fake sinks appended when tests are muted."""
        if not self.test_manager.options.mute:
            return pipe_str

        needs_clock = scenario.needs_clock_sync() or minfo.media_descriptor.need_clock_sync()
        audio_fakesink = self.get_fakesink_for_media_type("audio", needs_clock)
        video_fakesink = self.get_fakesink_for_media_type("video", needs_clock)

        return pipe_str + " audio-sink='%s' video-sink='%s'" % (
            audio_fakesink, video_fakesink)

    def _get_name(self, scenario, protocol, minfo):
        """Return "<get_fname() result>.<base name of the media clean name>"."""
        prefix = self.get_fname(scenario, protocol)
        media_name = os.path.basename(minfo.media_descriptor.get_clean_name())
        return "%s.%s" % (prefix, media_name)

    def _add_rtsp_tests(self, uri, minfo, scenario):
        """Add the RTSP and RTSP2 variants of a local file playback test.

        Nothing is added when the RTSP media descriptor built from the media
        info file is not compatible with `scenario`.
        """
        rtsp_info = {"path": minfo.media_descriptor.get_path()}
        rtsp_info["media_descriptor"] = GstValidateRTSPMediaDesciptor(
            minfo.media_descriptor.get_path())
        rtspminfo = NamedDic(rtsp_info)
        if not rtspminfo.media_descriptor.is_compatible(scenario):
            return

        # The <RTSPPORTNUMBER> placeholder is left as is in the description
        # handed to the RTSP tests.
        cpipe = self._set_sinks(rtspminfo, "%s uri=rtsp://127.0.0.1:<RTSPPORTNUMBER>/test"
                                % self._pipeline_template, scenario)

        variants = ((Protocols.RTSP, {}),
                    (Protocols.RTSP + '2', {"rtsp2": True}))
        for protocol_name, extra_kwargs in variants:
            fname = self._get_name(scenario, protocol_name, rtspminfo)
            self.add_test(GstValidateRTSPTest(
                fname, self.test_manager.options, self.test_manager.reporter,
                cpipe, uri, scenario=scenario,
                media_descriptor=rtspminfo.media_descriptor,
                **extra_kwargs))

    def populate_tests(self, uri_minfo_special_scenarios, scenarios):
        """Add the playback tests of every non RTSP media entry.

        For each compatible scenario one GstValidateLaunchTest is added; for
        local files that are not images, RTSP variants are added as well when
        an RTSP server command is available and RTSP tests are not disabled.

        @uri_minfo_special_scenarios: Iterable of
                                      (uri, mediainfo, special_scenarios)
                                      tuples
        @scenarios: List of scenarios tried in addition to the special ones
        """
        rtsp_enabled = GstValidateBaseTestManager.RTSP_SERVER_COMMAND
        if not rtsp_enabled:
            printc("\n\nRTSP server not available, you should make sure"
                   " that %s is available in your $PATH." % GstValidateBaseTestManager.RTSP_SERVER_COMMAND,
                   Colors.FAIL)
        elif self.test_manager.options.disable_rtsp:
            printc("\n\nRTSP tests are disabled")
            rtsp_enabled = False

        for uri, minfo, special_scenarios in uri_minfo_special_scenarios:
            protocol = minfo.media_descriptor.get_protocol()
            if protocol == Protocols.RTSP:
                self.debug("SKIPPING %s as it is a RTSP stream" % uri)
                continue

            base_pipe = "%s uri=%s" % (self._pipeline_template, uri)

            for scenario in special_scenarios + scenarios:
                if not minfo.media_descriptor.is_compatible(scenario):
                    continue

                cpipe = self._set_sinks(minfo, base_pipe, scenario)
                fname = self._get_name(scenario, protocol, minfo)

                self.debug("Adding: %s", fname)

                if scenario.does_reverse_playback() and protocol == Protocols.HTTP:
                    # 10MB so we can reverse playback
                    cpipe += " ring-buffer-max-size=10485760"

                self.add_test(GstValidateLaunchTest(
                    fname,
                    self.test_manager.options,
                    self.test_manager.reporter,
                    cpipe,
                    scenario=scenario,
                    media_descriptor=minfo.media_descriptor))

                if rtsp_enabled and protocol == Protocols.FILE \
                        and not minfo.media_descriptor.is_image():
                    self._add_rtsp_tests(uri, minfo, scenario)
423
424
class GstValidateMixerTestsGenerator(GstValidatePipelineTestsGenerator):
    """Generator of tests mixing several sources through a mixer element."""

    def __init__(self, name, test_manager, mixer, media_type, converter="",
                 num_sources=3, mixed_srcs=None, valid_scenarios=None):
        """
        @name: The name of the generator
        @test_manager: The test manager handed to the base generator
        @mixer: The mixer element description, named "_mixer" in the pipeline
        @media_type: Media type of the mixed tracks; used to count tracks,
                     to pick the fake sink and to build "auto<type>sink"
        @converter: Description placed after every generated source and
                    between the mixer and the sink
        @num_sources: Number of sources of every generated combination
        @mixed_srcs: Optional mapping of test name to either an iterable of
                     source descriptions or a dictionary with the
                     "mixer_props" and "sources" keys; when empty, the
                     combinations are generated from the local files
        @valid_scenarios: A list of scenario name that can be used with that generator
        """
        mixed_srcs = mixed_srcs or {}
        valid_scenarios = valid_scenarios or []

        pipe_template = "%(mixer)s name=_mixer !  " + \
            converter + " ! %(sink)s "
        self.converter = converter
        self.mixer = mixer
        self.media_type = media_type
        self.num_sources = num_sources
        self.mixed_srcs = mixed_srcs
        super(
            GstValidateMixerTestsGenerator, self).__init__(name, test_manager, pipe_template,
                                                           valid_scenarios=valid_scenarios)

    def populate_tests(self, uri_minfo_special_scenarios, scenarios):
        """
        Add one GstValidateLaunchTest per (source combination, scenario).

        Nothing is generated when the `validate_uris` option is set.

        @uri_minfo_special_scenarios: Iterable of
                                      (uri, mediainfo, special_scenarios)
                                      tuples used to generate combinations
                                      when `mixed_srcs` is empty
        @scenarios: The scenarios to generate tests for
        """
        if self.test_manager.options.validate_uris:
            return

        # Only local files having at least one track of the wanted type can
        # be used as generated sources.
        wanted_ressources = []
        for uri, minfo, special_scenarios in uri_minfo_special_scenarios:
            protocol = minfo.media_descriptor.get_protocol()
            if protocol == Protocols.FILE and \
                    minfo.media_descriptor.get_num_tracks(self.media_type) > 0:
                wanted_ressources.append((uri, minfo))

        if not self.mixed_srcs:
            if not wanted_ressources:
                return

            if os.getenv("USE_PLAYBIN3") is None:
                decodebin = "uridecodebin"
            else:
                decodebin = "uridecodebin3"

            # range() only accepts integers: floor division is required on
            # Python 3, where `/` always produces a float.
            num_combinations = len(uri_minfo_special_scenarios) // self.num_sources
            # Every combination reads num_sources consecutive resources
            # starting at its index; never start one that would run past the
            # end of the usable resources.
            last_start = len(wanted_ressources) - self.num_sources
            num_combinations = min(num_combinations, last_start + 1)

            for i in range(num_combinations):
                srcs = []
                name = ""
                for nsource in range(self.num_sources):
                    uri, minfo = wanted_ressources[i + nsource]
                    srcs.append(
                        "%s uri=%s ! %s" % (decodebin, uri, self.converter))
                    fname = os.path.basename(uri).replace(".", "_")
                    if not name:
                        name = fname
                    else:
                        name += "+%s" % fname

                self.mixed_srcs[name] = tuple(srcs)

        for name, srcs in self.mixed_srcs.items():
            if isinstance(srcs, dict):
                pipe_arguments = {
                    "mixer": self.mixer + " %s" % srcs["mixer_props"]}
                srcs = srcs["sources"]
            else:
                pipe_arguments = {"mixer": self.mixer}

            for scenario in scenarios:
                fname = self.get_fname(scenario, Protocols.FILE) + "."
                fname += name

                self.debug("Adding: %s", fname)

                if self.test_manager.options.mute:
                    pipe_arguments["sink"] = self.get_fakesink_for_media_type(self.media_type,
                                                                              scenario.needs_clock_sync())
                else:
                    pipe_arguments["sink"] = "auto%ssink" % self.media_type

                pipe = self._pipeline_template % pipe_arguments

                # Every source is linked to the element named "_mixer".
                for src in srcs:
                    pipe += "%s ! _mixer. " % src

                self.add_test(GstValidateLaunchTest(fname,
                                                    self.test_manager.options,
                                                    self.test_manager.reporter,
                                                    pipe,
                                                    scenario=scenario)
                              )
508
509
class GstValidateLaunchTest(GstValidateTest):
    """Test running GstValidateBaseTestManager.COMMAND on a pipeline description."""

    def __init__(self, classname, options, reporter, pipeline_desc,
                 timeout=DEFAULT_TIMEOUT, scenario=None,
                 media_descriptor=None, duration=0, hard_timeout=None,
                 extra_env_variables=None, expected_failures=None):
        """
        @pipeline_desc: The pipeline description, split with shlex and
                        appended to the command arguments
        @duration: Only used when neither a scenario nor a media descriptor
                   is given: the scenario duration takes precedence, then
                   the media duration divided by GST_SECOND

        The other parameters are forwarded to the base class.
        """
        if scenario:
            duration = scenario.get_duration()
        elif media_descriptor:
            duration = media_descriptor.get_duration() / GST_SECOND

        super().__init__(GstValidateBaseTestManager.COMMAND,
                         classname,
                         options, reporter,
                         duration=duration,
                         scenario=scenario,
                         timeout=timeout,
                         hard_timeout=hard_timeout,
                         media_descriptor=media_descriptor,
                         extra_env_variables=extra_env_variables or {},
                         expected_failures=expected_failures)

        self.pipeline_desc = pipeline_desc
        self.media_descriptor = media_descriptor

    def build_arguments(self):
        """Add the pipeline and, when known, the media info file path."""
        GstValidateTest.build_arguments(self)
        self.add_arguments(*shlex.split(self.pipeline_desc))

        descriptor = self.media_descriptor
        if descriptor is None or not descriptor.get_path():
            return

        self.add_arguments("--set-media-info", descriptor.get_path())
545
546
class GstValidateMediaCheckTest(GstValidateTest):
    """Test running GstValidateBaseTestManager.MEDIA_CHECK_COMMAND on a URI."""

    def __init__(self, classname, options, reporter, media_descriptor,
                 uri, minfo_path, timeout=DEFAULT_TIMEOUT,
                 extra_env_variables=None,
                 expected_failures=None):
        """
        @uri: The URI handed to the media check command
        @minfo_path: Path passed as value of "--expected-results"

        The other parameters are forwarded to the base class.
        """
        super().__init__(GstValidateBaseTestManager.MEDIA_CHECK_COMMAND,
                         classname,
                         options, reporter,
                         timeout=timeout,
                         media_descriptor=media_descriptor,
                         extra_env_variables=extra_env_variables or {},
                         expected_failures=expected_failures)
        self._media_info_path = minfo_path
        self._uri = uri

    def build_arguments(self):
        """Add the URI, the expected results and optionally --skip-parsers."""
        # Calls Test.build_arguments() directly, not the GstValidateTest one.
        Test.build_arguments(self)
        self.add_arguments(self._uri, "--expected-results",
                           self._media_info_path)

        skip_parsers = self.media_descriptor.skip_parsers()
        if skip_parsers:
            self.add_arguments("--skip-parsers")
572
573
class GstValidateTranscodingTest(GstValidateTest, GstValidateEncodingTestInterface):
    """Test running GstValidateBaseTestManager.TRANSCODING_COMMAND on a URI."""
    scenarios_manager = ScenarioManager()

    def __init__(self, classname, options, reporter,
                 combination, uri, media_descriptor,
                 timeout=DEFAULT_TIMEOUT,
                 scenario=None,
                 extra_env_variables=None,
                 expected_failures=None):
        """
        @combination: The encoding combination handed to
                      GstValidateEncodingTestInterface
        @uri: The URI of the media to transcode
        @media_descriptor: Descriptor of the media; its duration, divided by
                           GST_SECOND, is the expected duration of the test
                           (further divided by
                           AUDIO_ONLY_FILE_TRANSCODING_RATIO when the media
                           has no video track)
        @extra_env_variables: Optional dictionary of environment variables,
                              forwarded to the base class

        The other parameters are forwarded to the base class.
        """
        # NOTE(review): presumably initialised first so that self.debug() can
        # be used before the main constructor runs -- confirm.
        Loggable.__init__(self)

        extra_env_variables = extra_env_variables or {}

        file_dur = int(media_descriptor.get_duration()) / GST_SECOND
        if not media_descriptor.get_num_tracks("video"):
            self.debug("%s audio only file applying transcoding ratio."
                       "File 'duration' : %s" % (classname, file_dur))
            duration = file_dur / AUDIO_ONLY_FILE_TRANSCODING_RATIO
        else:
            duration = file_dur

        # The caller's environment variables are forwarded, as the other
        # tests of this file do; they used to be replaced by None here.
        super(
            GstValidateTranscodingTest, self).__init__(GstValidateBaseTestManager.TRANSCODING_COMMAND,
                                                       classname,
                                                       options,
                                                       reporter,
                                                       duration=duration,
                                                       timeout=timeout,
                                                       scenario=scenario,
                                                       media_descriptor=media_descriptor,
                                                       extra_env_variables=extra_env_variables,
                                                       expected_failures=expected_failures)

        GstValidateEncodingTestInterface.__init__(
            self, combination, media_descriptor)

        self.uri = uri

    def run_external_checks(self):
        """Run the IQA test when enabled and the media has one video track."""
        if self.media_descriptor.get_num_tracks("video") == 1 and \
                self.options.validate_enable_iqa_tests:
            self.run_iqa_test(self.uri)

    def set_rendering_info(self):
        """Compute the destination file and add the "-o <profile>" arguments.

        The destination is derived from the class name below `options.dest`;
        its directory is created and the path is turned into a URI when it
        has no scheme.
        """
        self.dest_file = os.path.join(self.options.dest,
                                      self.classname.replace(".transcode.", os.sep).
                                      replace(".", os.sep))
        mkdir(os.path.dirname(urllib.parse.urlsplit(self.dest_file).path))
        if urllib.parse.urlparse(self.dest_file).scheme == "":
            self.dest_file = path2url(self.dest_file)

        profile = self.get_profile()
        self.add_arguments("-o", profile)

    def build_arguments(self):
        """Add the rendering arguments, then the source and destination."""
        GstValidateTest.build_arguments(self)
        self.set_rendering_info()
        self.add_arguments(self.uri, self.dest_file)

    def get_current_value(self):
        """Return the current size of the output, or the current position.

        When a scenario is used and more than 30 seconds passed since
        sent_eos_position() reported a value, a result is set and a Result
        value is returned instead.
        """
        if self.scenario:
            sent_eos = self.sent_eos_position()
            if sent_eos is not None:
                t = time.time()
                if ((t - sent_eos)) > 30:
                    if self.media_descriptor.get_protocol() == Protocols.HLS:
                        # NOTE(review): the result is set to PASSED while
                        # KNOWN_ERROR is returned -- confirm this is intended.
                        self.set_result(Result.PASSED,
                                        """Got no EOS 30 seconds after sending EOS,
                                        in HLS known and tolerated issue:
                                        https://bugzilla.gnome.org/show_bug.cgi?id=723868""")
                        return Result.KNOWN_ERROR

                    self.set_result(
                        Result.FAILED, "Pipeline did not stop 30 Seconds after sending EOS")

                    return Result.FAILED

        size = self.get_current_size()
        if size is None:
            return self.get_current_position()

        return size

    def check_results(self):
        """Check the encoded file unless the run already failed.

        Failed or timed out runs, and processes with a non zero return code,
        are left to GstValidateTest.check_results().
        """
        if self.result in [Result.FAILED, Result.TIMEOUT] or \
                self.process.returncode != 0:
            GstValidateTest.check_results(self)
            return

        res, msg = self.check_encoded_file()
        self.set_result(res, msg)
666
667
class GstValidateBaseRTSPTest:
    """ Interface for RTSP tests, requires implementing Test

    The class is meant to be mixed into a test class: it relies on
    attributes and methods it does not define itself (``options``,
    ``logfile``, ``extra_logfiles``, ``pipeline_desc``, ``use_gdb()`` and
    the ``close_logfile()`` / ``process_update()`` of the next class in the
    MRO).
    """
    # Ports handed out by __get_open_port() and not released yet; the set
    # lives on the class, so it is shared by every test.
    __used_ports = set()

    def __init__(self, local_uri):
        """Remember the URI the RTSP server is asked to serve.

        Args:
            local_uri: passed as first argument to the RTSP server command.
        """
        self._local_uri = local_uri
        self.rtsp_server = None
        # Set by launch_server(); initialised here so that close_logfile()
        # and process_update() are safe even if the server never started.
        self.rtspserver_logs = None
        self.server_port = None
        self._unsetport_pipeline_desc = None
        self.optional = True

    @classmethod
    def __get_open_port(cls):
        """Return a TCP port picked by the OS that was not handed out yet."""
        while True:
            # hackish trick from
            # http://stackoverflow.com/questions/2838244/get-open-tcp-port-in-python?answertab=votes#tab-top
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
                probe.bind(("", 0))
                port = probe.getsockname()[1]

            if port not in cls.__used_ports:
                cls.__used_ports.add(port)
                return port

    def launch_server(self):
        """Start the RTSP server and point the pipeline description at it.

        The server output goes to stdout when running under gdb, to the
        stream named by ``options.redirect_logs`` when that option is set,
        and to ``<logfile>_rtspserver.log`` otherwise. Once the server is
        spawned the method waits until its port accepts connections, then
        substitutes ``<RTSPPORTNUMBER>`` in the pipeline description.

        Returns:
            The server command line, joined with spaces.
        """
        self.server_port = self.__get_open_port()
        command = [GstValidateBaseTestManager.RTSP_SERVER_COMMAND,
                   self._local_uri, '--port', str(self.server_port)]

        redirect = self.options.redirect_logs
        if self.options.validate_gdb_server:
            command = self.use_gdb(command)
            self.rtspserver_logs = sys.stdout
        elif redirect:
            # Honour an explicit 'stderr' request; any other truthy value
            # keeps going to stdout.
            self.rtspserver_logs = sys.stderr if redirect == 'stderr' else sys.stdout
        else:
            self.rtspserver_logs = open(self.logfile + '_rtspserver.log', 'w+')
            self.extra_logfiles.append(self.rtspserver_logs.name)

        self.rtsp_server = subprocess.Popen(command,
                                            stderr=self.rtspserver_logs,
                                            stdout=self.rtspserver_logs,
                                            env=os.environ.copy())

        # Poll the port until the server accepts connections. A server that
        # already exited will never do so: stop waiting instead of spinning
        # forever.
        # NOTE(review): assumes the test then fails on its own when it cannot
        # reach the server -- confirm against the callers.
        while self.rtsp_server.poll() is None:
            with socket.socket() as probe:
                try:
                    probe.connect(("127.0.0.1", self.server_port))
                    break
                except ConnectionRefusedError:
                    time.sleep(0.1)

        if not self._unsetport_pipeline_desc:
            # Keep the template around so that a later launch can substitute
            # a different port.
            self._unsetport_pipeline_desc = self.pipeline_desc

        self.pipeline_desc = self._unsetport_pipeline_desc.replace(
            "<RTSPPORTNUMBER>", str(self.server_port))

        return ' '.join(command)

    def close_logfile(self):
        """Close the test log file and the server log file opened here."""
        super().close_logfile()
        if self.options.redirect_logs:
            return

        logs = getattr(self, 'rtspserver_logs', None)
        # Never close the interpreter's standard streams (used for the gdb
        # case), and tolerate a server that was never launched.
        if logs is None or logs is sys.stdout or logs is sys.stderr:
            return
        logs.close()

    def process_update(self):
        """Forward to the parent class; stop the server on a truthy result.

        Returns:
            Whatever the parent ``process_update()`` returned.
        """
        res = super().process_update()
        if res:
            kill_subprocess(self, self.rtsp_server, DEFAULT_TIMEOUT)
            # discard(): releasing a port twice must not raise.
            self.__used_ports.discard(self.server_port)

        return res
748
749
class GstValidateRTSPTest(GstValidateBaseRTSPTest, GstValidateLaunchTest):
    """Launch test combined with the RTSP handling of GstValidateBaseRTSPTest."""

    def __init__(self, classname, options, reporter, pipeline_desc,
                 local_uri, timeout=DEFAULT_TIMEOUT, scenario=None,
                 media_descriptor=None, rtsp2=False):
        """Initialise both parent classes and remember the ``rtsp2`` flag."""
        GstValidateLaunchTest.__init__(
            self, classname, options, reporter, pipeline_desc, timeout,
            scenario, media_descriptor)
        GstValidateBaseRTSPTest.__init__(self, local_uri)
        self.rtsp2 = rtsp2

    def get_subproc_env(self):
        """Return the parent environment extended for RTSP.

        The ``rtsp_overrides`` scenarios directory is put in front of
        ``GST_VALIDATE_SCENARIOS_PATH`` and, when ``rtsp2`` is set,
        ``force_rtsp2`` is appended to ``GST_VALIDATE_SCENARIO``.
        """
        env = super().get_subproc_env()

        previous_path = env.get('GST_VALIDATE_SCENARIOS_PATH', '')
        overrides = get_data_file(os.path.join('data', 'scenarios'), 'rtsp_overrides')
        env['GST_VALIDATE_SCENARIOS_PATH'] = '{}:{}'.format(overrides, previous_path)

        if self.rtsp2:
            previous_scenario = env.get('GST_VALIDATE_SCENARIO', '')
            env['GST_VALIDATE_SCENARIO'] = previous_scenario + ':force_rtsp2'

        return env
770
771
class GstValidateRTSPMediaDesciptor(GstValidateMediaDescriptor):
    """Media descriptor reporting a fixed local RTSP URI and the RTSP protocol."""

    def __init__(self, xml_path):
        """Load the descriptor stored at *xml_path*."""
        super().__init__(xml_path)

    def get_uri(self):
        """Return the fixed local RTSP URI."""
        return "rtsp://127.0.0.1:8554/test"

    def get_protocol(self):
        """Return ``Protocols.RTSP``."""
        return Protocols.RTSP

    def prerrols(self):
        """Always return False."""
        return False
785
786
787 class GstValidateTestManager(GstValidateBaseTestManager):
788
789     name = "validate"
790
791     # List of all classes to create testsuites
792     GstValidateMediaCheckTestsGenerator = GstValidateMediaCheckTestsGenerator
793     GstValidateTranscodingTestsGenerator = GstValidateTranscodingTestsGenerator
794     GstValidatePipelineTestsGenerator = GstValidatePipelineTestsGenerator
795     GstValidatePlaybinTestsGenerator = GstValidatePlaybinTestsGenerator
796     GstValidateMixerTestsGenerator = GstValidateMixerTestsGenerator
797     GstValidateLaunchTest = GstValidateLaunchTest
798     GstValidateMediaCheckTest = GstValidateMediaCheckTest
799     GstValidateTranscodingTest = GstValidateTranscodingTest
800
801     def __init__(self):
802         super(GstValidateTestManager, self).__init__()
803         self._uris = []
804         self._run_defaults = True
805         self._is_populated = False
806         self._default_generators_registered = False
807
808     def init(self):
809         for command, name in [
810                 (GstValidateBaseTestManager.TRANSCODING_COMMAND, "gst-validate-transcoding-1.0"),
811                 (GstValidateBaseTestManager.COMMAND, "gst-validate-1.0"),
812                 (GstValidateBaseTestManager.MEDIA_CHECK_COMMAND, "gst-validate-media-check-1.0")]:
813             if not command:
814                 self.error("command not found: %s" % name)
815                 return False
816
817         return True
818
819     def add_options(self, parser):
820         group = parser.add_argument_group("GstValidate tools specific options"
821                                           " and behaviours",
822                                           description="""When using --wanted-tests, all the scenarios can be used, even those which have
823 not been tested and explicitely activated if you set use --wanted-tests ALL""")
824         group.add_argument("--validate-check-uri", dest="validate_uris",
825                            action="append", help="defines the uris to run default tests on")
826         group.add_argument("--validate-tools-path", dest="validate_tools_path",
827                            action="append", help="defines the paths to look for GstValidate tools.")
828         group.add_argument("--validate-gdb-server", dest="validate_gdb_server",
829                            help="Run the server in GDB.")
830         group.add_argument("--validate-disable-rtsp", dest="disable_rtsp",
831                            help="Disable RTSP tests.")
832         group.add_argument("--validate-enable-iqa-tests", dest="validate_enable_iqa_tests",
833                            help="Enable Image Quality Assessment validation tests.",
834                            default=False, action='store_true')
835
836     def print_valgrind_bugs(self):
837         # Look for all the 'pending' bugs in our supp file
838         bugs = []
839         p = get_data_file('data', 'gstvalidate.supp')
840         with open(p) as f:
841             for line in f.readlines():
842                 line = line.strip()
843                 if line.startswith('# PENDING:'):
844                     tmp = line.split(' ')
845                     bugs.append(tmp[2])
846
847         if bugs:
848             msg = "Ignored valgrind bugs:\n"
849             for b in bugs:
850                 msg += "  + %s\n" % b
851             printc(msg, Colors.FAIL, True)
852
853     def populate_testsuite(self):
854
855         if self._is_populated is True:
856             return
857
858         if not self.options.config and not self.options.testsuites:
859             if self._run_defaults:
860                 self.register_defaults()
861             else:
862                 self.register_all()
863
864         self._is_populated = True
865
866     def list_tests(self):
867         if self.tests:
868             return self.tests
869
870         if self._run_defaults:
871             scenarios = [self.scenarios_manager.get_scenario(scenario_name)
872                          for scenario_name in self.get_scenarios()]
873         else:
874             scenarios = self.scenarios_manager.get_scenario(None)
875         uris = self._list_uris()
876
877         for generator in self.get_generators():
878             for test in generator.generate_tests(uris, scenarios):
879                 self.add_test(test)
880
881         if not self.tests and not uris and not self.options.wanted_tests:
882             printc(
883                 "No valid uris present in the path. Check if media files and info files exist", Colors.FAIL)
884
885         return self.tests
886
887     def _add_media(self, media_info, uri=None):
888         self.debug("Checking %s", media_info)
889         if isinstance(media_info, GstValidateMediaDescriptor):
890             media_descriptor = media_info
891             media_info = media_descriptor.get_path()
892         else:
893             media_descriptor = GstValidateMediaDescriptor(media_info)
894
895         try:
896             # Just testing that the vairous mandatory infos are present
897             caps = media_descriptor.get_caps()
898             if uri is None:
899                 uri = media_descriptor.get_uri()
900
901             # Adjust local http uri
902             if self.options.http_server_port != 8079 and \
903                uri.startswith("http://127.0.0.1:8079/"):
904                 uri = uri.replace("http://127.0.0.1:8079/",
905                                   "http://127.0.0.1:%r/" % self.options.http_server_port, 1)
906             media_descriptor.set_protocol(urllib.parse.urlparse(uri).scheme)
907             for caps2, prot in GST_VALIDATE_CAPS_TO_PROTOCOL:
908                 if caps2 == caps:
909                     media_descriptor.set_protocol(prot)
910                     break
911
912             scenario_bname = media_descriptor.get_media_filepath()
913             special_scenarios = self.scenarios_manager.find_special_scenarios(
914                 scenario_bname)
915             self._uris.append((uri,
916                                NamedDic({"path": media_info,
917                                          "media_descriptor": media_descriptor}),
918                                special_scenarios))
919         except configparser.NoOptionError as e:
920             self.debug("Exception: %s for %s", e, media_info)
921
922     def _discover_file(self, uri, fpath):
923         for ext in (GstValidateMediaDescriptor.MEDIA_INFO_EXT,
924                 GstValidateMediaDescriptor.PUSH_MEDIA_INFO_EXT):
925             try:
926                 is_push = False
927                 media_info = "%s.%s" % (fpath, ext)
928                 if ext == GstValidateMediaDescriptor.PUSH_MEDIA_INFO_EXT:
929                     if not os.path.exists(media_info):
930                         continue
931                     is_push = True
932                     uri = "push" + uri
933                 args = GstValidateBaseTestManager.MEDIA_CHECK_COMMAND.split(" ")
934
935                 args.append(uri)
936                 if os.path.isfile(media_info) and not self.options.update_media_info:
937                     self._add_media(media_info, uri)
938                     continue
939                 elif fpath.endswith(GstValidateMediaDescriptor.STREAM_INFO_EXT):
940                     self._add_media(fpath)
941                     continue
942                 elif not self.options.generate_info and not self.options.update_media_info and not self.options.validate_uris:
943                     continue
944                 elif self.options.update_media_info and not os.path.isfile(media_info):
945                     self.info(
946                         "%s not present. Use --generate-media-info", media_info)
947                     continue
948                 elif os.path.islink(media_info):
949                     self.info(
950                         "%s is a symlink, not updating and hopefully the actual file gets updated!", media_info)
951                     continue
952
953                 include_frames = 0
954                 if self.options.update_media_info:
955                     include_frames = 2
956                 elif self.options.generate_info_full:
957                     include_frames = 1
958
959                 media_descriptor = GstValidateMediaDescriptor.new_from_uri(
960                     uri, True, include_frames, is_push)
961                 if media_descriptor:
962                     self._add_media(media_descriptor, uri)
963                 else:
964                     self.warning("Could not get any descriptor for %s" % uri)
965
966             except subprocess.CalledProcessError as e:
967                 if self.options.generate_info:
968                     printc("Result: Failed", Colors.FAIL)
969                 else:
970                     self.error("Exception: %s", e)
971                 return False
972         return True
973
974     def _list_uris(self):
975         if self._uris:
976             return self._uris
977
978         if self.options.validate_uris:
979             for uri in self.options.validate_uris:
980                 self._discover_file(uri, uri)
981             return self._uris
982
983         if not self.args:
984             if isinstance(self.options.paths, str):
985                 self.options.paths = [os.path.join(self.options.paths)]
986
987             for path in self.options.paths:
988                 if os.path.isfile(path):
989                     path = os.path.abspath(path)
990                     self._discover_file(path2url(path), path)
991                 else:
992                     for root, dirs, files in os.walk(path):
993                         for f in files:
994                             fpath = os.path.abspath(os.path.join(root, f))
995                             if os.path.isdir(fpath) or \
996                                     fpath.endswith(GstValidateMediaDescriptor.MEDIA_INFO_EXT) or\
997                                     fpath.endswith(ScenarioManager.FILE_EXTENSION):
998                                 continue
999                             else:
1000                                 self._discover_file(path2url(fpath), fpath)
1001
1002         self.debug("Uris found: %s", self._uris)
1003
1004         return self._uris
1005
1006     def needs_http_server(self):
1007         for test in self.list_tests():
1008             if self._is_test_wanted(test) and test.media_descriptor is not None:
1009                 protocol = test.media_descriptor.get_protocol()
1010                 uri = test.media_descriptor.get_uri()
1011
1012                 if protocol in [Protocols.HTTP, Protocols.HLS, Protocols.DASH] and \
1013                         ("127.0.0.1:%s" % (
1014                             self.options.http_server_port) in uri or "127.0.0.1:8079" in uri):
1015                     return True
1016         return False
1017
1018     def set_settings(self, options, args, reporter):
1019         if options.wanted_tests:
1020             for i in range(len(options.wanted_tests)):
1021                 if "ALL" in options.wanted_tests[i]:
1022                     self._run_defaults = False
1023                     options.wanted_tests[
1024                         i] = options.wanted_tests[i].replace("ALL", "")
1025         try:
1026             options.wanted_tests.remove("")
1027         except ValueError:
1028             pass
1029
1030         if options.validate_uris:
1031             self.check_testslist = False
1032
1033         super(GstValidateTestManager, self).set_settings(
1034             options, args, reporter)
1035
1036     def register_defaults(self):
1037         """
1038         Registers the defaults:
1039             * Scenarios to be used
1040             * Encoding formats to be used
1041             * Blacklisted tests
1042             * Test generators
1043         """
1044         self.register_default_scenarios()
1045         self.register_default_encoding_formats()
1046         self.register_default_blacklist()
1047         self.register_default_test_generators()
1048
1049     def register_default_scenarios(self):
1050         """
1051         Registers default test scenarios
1052         """
1053         if self.options.long_limit != 0:
1054             self.add_scenarios([
1055                 "play_15s",
1056                 "reverse_playback",
1057                 "fast_forward",
1058                 "seek_forward",
1059                 "seek_backward",
1060                 "seek_with_stop",
1061                 "switch_audio_track",
1062                 "switch_audio_track_while_paused",
1063                 "switch_subtitle_track",
1064                 "switch_subtitle_track_while_paused",
1065                 "disable_subtitle_track_while_paused",
1066                 "change_state_intensive",
1067                 "scrub_forward_seeking"])
1068         else:
1069             self.add_scenarios([
1070                 "play_15s",
1071                 "reverse_playback",
1072                 "fast_forward",
1073                 "seek_forward",
1074                 "seek_backward",
1075                 "seek_with_stop",
1076                 "switch_audio_track",
1077                 "switch_audio_track_while_paused",
1078                 "switch_subtitle_track",
1079                 "switch_subtitle_track_while_paused",
1080                 "disable_subtitle_track_while_paused",
1081                 "change_state_intensive",
1082                 "scrub_forward_seeking"])
1083
1084     def register_default_encoding_formats(self):
1085         """
1086         Registers default encoding formats
1087         """
1088         self.add_encoding_formats([
1089             MediaFormatCombination("ogg", "vorbis", "theora"),
1090             MediaFormatCombination("webm", "vorbis", "vp8"),
1091             MediaFormatCombination("mp4", "mp3", "h264"),
1092             MediaFormatCombination("mkv", "vorbis", "h264"),
1093         ])
1094
1095     def register_default_blacklist(self):
1096         self.set_default_blacklist([
1097             # hls known issues
1098             # ("hls.playback.seek_with_stop.*",
1099             #  "https://bugzilla.gnome.org/show_bug.cgi?id=753689"),
1100
1101             # testbin known issues
1102             ("testbin.media_check.*",
1103              "Not supported by GstDiscoverer."),
1104
1105             # dash known issues
1106             ("dash.media_check.*",
1107              "Caps are different depending on selected bitrates, etc"),
1108
1109             # Matroska/WEBM known issues:
1110             ("*.reverse_playback.*webm$",
1111              "https://bugzilla.gnome.org/show_bug.cgi?id=679250"),
1112             ("*.reverse_playback.*mkv$",
1113              "https://bugzilla.gnome.org/show_bug.cgi?id=679250"),
1114             ("http.playback.seek_with_stop.*webm",
1115              "matroskademux.gst_matroska_demux_handle_seek_push: Seek end-time not supported in streaming mode"),
1116             ("http.playback.seek_with_stop.*mkv",
1117              "matroskademux.gst_matroska_demux_handle_seek_push: Seek end-time not supported in streaming mode"),
1118
1119             # MPEG TS known issues:
1120             ('(?i)*playback.reverse_playback.*(?:_|.)(?:|m)ts$',
1121              "https://bugzilla.gnome.org/show_bug.cgi?id=702595"),
1122
1123             # Fragmented MP4 disabled tests:
1124             ('*.playback..*seek.*.fragmented_nonseekable_sink_mp4',
1125              "Seeking on fragmented files without indexes isn't implemented"),
1126             ('*.playback.reverse_playback.fragmented_nonseekable_sink_mp4',
1127              "Seeking on fragmented files without indexes isn't implemented"),
1128
1129             # HTTP known issues:
1130             ("http.*scrub_forward_seeking.*",
1131              "This is not stable enough for now."),
1132             ("http.playback.change_state_intensive.raw_video_mov",
1133              "This is not stable enough for now. (flow return from pad push doesn't match expected value)"),
1134
1135             # MXF known issues"
1136             ("*reverse_playback.*mxf",
1137              "Reverse playback is not handled in MXF"),
1138             ("file\.transcode.*mxf",
1139              "FIXME: Transcoding and mixing tests need to be tested"),
1140
1141             # WMV known issues"
1142             ("*reverse_playback.*wmv",
1143              "Reverse playback is not handled in wmv"),
1144             (".*reverse_playback.*asf",
1145              "Reverse playback is not handled in asf"),
1146
1147             # ogg known issues
1148             ("http.playback.seek.*vorbis_theora_1_ogg",
1149              "https://bugzilla.gnome.org/show_bug.cgi?id=769545"),
1150             # RTSP known issues
1151             ('rtsp.*playback.reverse.*',
1152              'https://bugzilla.gnome.org/show_bug.cgi?id=626811'),
1153             ('rtsp.*playback.seek_with_stop.*',
1154              'https://bugzilla.gnome.org/show_bug.cgi?id=784298'),
1155             ('rtsp.*playback.fast_*',
1156              'https://bugzilla.gnome.org/show_bug.cgi?id=754575'),
1157         ])
1158
1159     def register_default_test_generators(self):
1160         """
1161         Registers default test generators
1162         """
1163         if self._default_generators_registered:
1164             return
1165
1166         self.add_generators([GstValidatePlaybinTestsGenerator(self),
1167                              GstValidateMediaCheckTestsGenerator(self),
1168                              GstValidateTranscodingTestsGenerator(self)])
1169         self._default_generators_registered = True