9742a6829ca8a0d37831b4794ea7cfb7d8f8f7f1
[platform/upstream/gstreamer.git] / validate / launcher / apps / gstvalidate.py
1 #!/usr/bin/env python3
2 #
3 # Copyright (c) 2013,Thibault Saunier <thibault.saunier@collabora.com>
4 #
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU Lesser General Public
7 # License as published by the Free Software Foundation; either
8 # version 2.1 of the License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13 # Lesser General Public License for more details.
14 #
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with this program; if not, write to the
17 # Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
18 # Boston, MA 02110-1301, USA.
19 import argparse
20 import os
21 import copy
22 import sys
23 import time
24 import urllib.parse
25 import shlex
26 import socket
27 import subprocess
28 import configparser
29 import json
30 from launcher.loggable import Loggable
31
32 from launcher.baseclasses import GstValidateTest, Test, \
33     ScenarioManager, NamedDic, GstValidateTestsGenerator, \
34     GstValidateMediaDescriptor, GstValidateEncodingTestInterface, \
35     GstValidateBaseTestManager, MediaDescriptor, MediaFormatCombination
36
37 from launcher.utils import path2url, url2path, DEFAULT_TIMEOUT, which, \
38     GST_SECOND, Result, Protocols, mkdir, printc, Colors, get_data_file, \
39     kill_subprocess, format_config_template, get_fakesink_for_media_type
40
41 #
42 # Private global variables     #
43 #
44
# definitions of commands to use
# The tools path is parsed at import time. parse_known_args() is used so that
# command-line arguments this parser does not know about are left in ``args``
# instead of triggering an error.
parser = argparse.ArgumentParser(add_help=False)
parser.add_argument("--validate-tools-path", dest="validate_tools_path",
                    default="",
                    help="defines the paths to look for GstValidate tools.")
options, args = parser.parse_known_args()

# Let the base test manager locate its commands using the user-provided path.
GstValidateBaseTestManager.update_commands(options.validate_tools_path)
# Divisor applied to the file duration of audio-only media when computing the
# duration handed to GstValidateTranscodingTest.
AUDIO_ONLY_FILE_TRANSCODING_RATIO = 5

#
# API to be used to create testsuites     #
#

"""
Some info about protocols and how to handle them
"""
# Pairs of (caps name, protocol) for adaptive streaming formats.
GST_VALIDATE_CAPS_TO_PROTOCOL = [("application/x-hls", Protocols.HLS),
                                 ("application/dash+xml", Protocols.DASH)]
64
65
def expand_vars_in_list_recurse(l, data):
    """Apply ``%``-formatting with ``data`` to every string found in ``l``.

    The list is modified in place and nested lists and dictionaries are
    walked recursively. Elements of any other type are left untouched.

    :param l: The list whose string elements must be expanded.
    :param data: The mapping (or value) used on the right-hand side of ``%``.
    :return: The very same list object, after expansion.
    """
    for position, element in enumerate(l):
        if isinstance(element, str):
            expanded = element % data
        elif isinstance(element, dict):
            expanded = expand_vars_in_dict_recurse(element, data)
        elif isinstance(element, list):
            expanded = expand_vars_in_list_recurse(element, data)
        else:
            # Numbers, None, ... have nothing to expand.
            continue

        l[position] = expanded

    return l
76
77
def expand_vars_in_dict_recurse(dico, data):
    """Apply ``%``-formatting with ``data`` to every string value of ``dico``.

    The dictionary is modified in place (keys are never touched) and nested
    dictionaries and lists are walked recursively. Values of any other type
    are left as they are.

    :param dico: The dictionary whose string values must be expanded.
    :param data: The mapping (or value) used on the right-hand side of ``%``.
    :return: The very same dictionary object, after expansion.
    """
    # Only values of existing keys are replaced, so iterating over the keys
    # while assigning is safe.
    for name in dico:
        current = dico[name]
        if isinstance(current, str):
            dico[name] = current % data
        elif isinstance(current, dict):
            dico[name] = expand_vars_in_dict_recurse(current, data)
        elif isinstance(current, list):
            dico[name] = expand_vars_in_list_recurse(current, data)

    return dico
88
89
class GstValidateMediaCheckTestsGenerator(GstValidateTestsGenerator):
    """Tests generator named "media_check": one media check test per URI."""

    def __init__(self, test_manager):
        super().__init__("media_check", test_manager)

    def populate_tests(self, uri_minfo_special_scenarios, scenarios):
        """Add a GstValidateMediaCheckTest for each known media file.

        :param uri_minfo_special_scenarios: Iterable of
            ``(uri, mediainfo, special_scenarios)`` tuples.
        :param scenarios: Unused, media checks do not run scenarios.
        """
        manager = self.test_manager
        for uri, mediainfo, _special_scenarios in uri_minfo_special_scenarios:
            descriptor = mediainfo.media_descriptor
            protocol = descriptor.get_protocol()
            # Dots would be read as classname separators, hence the "_".
            media_name = os.path.basename(url2path(uri)).replace(".", "_")
            classname = "%s.media_check.%s" % (protocol, media_name)

            test = GstValidateMediaCheckTest(classname,
                                             manager.options,
                                             manager.reporter,
                                             descriptor,
                                             uri,
                                             mediainfo.path,
                                             timeout=DEFAULT_TIMEOUT)
            self.add_test(test)
109
110
class GstValidateTranscodingTestsGenerator(GstValidateTestsGenerator):
    """Tests generator named "transcode": one test per media and format."""

    def __init__(self, test_manager):
        super().__init__("transcode", test_manager)

    def populate_tests(self, uri_minfo_special_scenarios, scenarios):
        """Add a GstValidateTranscodingTest per media and encoding format.

        Images and RTSP medias are skipped.

        :param uri_minfo_special_scenarios: Iterable of
            ``(uri, mediainfo, special_scenarios)`` tuples.
        :param scenarios: Unused by this generator.
        """
        for uri, mediainfo, _special_scenarios in uri_minfo_special_scenarios:
            descriptor = mediainfo.media_descriptor
            if descriptor.is_image():
                continue

            if descriptor.get_protocol() == Protocols.RTSP:
                continue

            manager = self.test_manager
            options = manager.options
            for comb in manager.get_encoding_formats():
                protocol = descriptor.get_protocol()
                format_name = str(comb).replace(' ', '_')
                clean_name = descriptor.get_clean_name()
                classname = "%s.transcode.to_%s.%s" % (
                    protocol, format_name, clean_name)

                test = GstValidateTranscodingTest(classname,
                                                  options,
                                                  manager.reporter,
                                                  comb,
                                                  uri,
                                                  descriptor)
                self.add_test(test)
138
139
class FakeMediaDescriptor(MediaDescriptor):
    """Media descriptor answering from a plain dictionary of information.

    Every getter looks its value up in ``infos`` and falls back on a fixed
    default when the key is missing.
    """

    def __init__(self, infos, pipeline_desc):
        """
        :param infos: Mapping holding the (optional) media information.
        :param pipeline_desc: Pipeline description string, used to guess the
            number of tracks when ``infos`` does not provide it.
        """
        MediaDescriptor.__init__(self)
        self._infos = infos
        self._pipeline_desc = pipeline_desc

    def get_path(self):
        infos = self._infos
        return infos.get('path', None)

    def get_media_filepath(self):
        infos = self._infos
        return infos.get('media-filepath', None)

    def get_caps(self):
        infos = self._infos
        return infos.get('caps', None)

    def get_uri(self):
        infos = self._infos
        return infos.get('uri', None)

    def get_duration(self):
        # 'duration' is stored in seconds, the result is scaled by GST_SECOND.
        seconds = int(self._infos.get('duration', 0))
        return seconds * GST_SECOND

    def get_protocol(self):
        infos = self._infos
        return infos.get('protocol', "launch_pipeline")

    def is_seekable(self):
        infos = self._infos
        return infos.get('is-seekable', True)

    def is_image(self):
        infos = self._infos
        return infos.get('is-image', False)

    def is_live(self):
        infos = self._infos
        return infos.get('is-live', False)

    def get_num_tracks(self, track_type):
        # Fallback: count the "<track_type>sink" occurrences in the pipeline.
        sinks_in_pipeline = self._pipeline_desc.count(track_type + "sink")
        key = 'num-%s-tracks' % track_type
        return self._infos.get(key, sinks_in_pipeline)

    def can_play_reverse(self):
        infos = self._infos
        return infos.get('plays-reverse', False)
180
181
class GstValidatePipelineTestsGenerator(GstValidateTestsGenerator):
    """Tests generator creating launch tests from pipeline descriptions."""

    def __init__(self, name, test_manager, pipeline_template=None,
                 pipelines_descriptions=None, valid_scenarios=None):
        """
        @name: The name of the generator
        @pipeline_template: A template pipeline to be used to generate actual pipelines
        @pipelines_descriptions: A list of dictionaries (with the 'name',
                                 'pipeline' and optionally 'extra_data' keys)
                                 or of tuples of the form:
                                 (test_name, pipeline_description, extra_data)
                                 extra_data being a dictionary with the following keys:
                                    'scenarios': ["the", "valid", "scenarios", "names"]
                                    'duration': the_duration # in seconds
                                    'timeout': a_timeout # in seconds
                                    'hard_timeout': a_hard_timeout # in seconds

        @valid_scenarios: A list of scenario name that can be used with that generator
        """
        valid_scenarios = valid_scenarios or []
        GstValidateTestsGenerator.__init__(self, name, test_manager)
        self._pipeline_template = pipeline_template
        self._pipelines_descriptions = []
        for description in pipelines_descriptions or []:
            if isinstance(description, dict):
                self._pipelines_descriptions.append(description)
                continue

            # Tuple form: normalize it to the dictionary form.
            desc_dict = {"name": description[0],
                         "pipeline": description[1]}
            if len(description) >= 3:
                desc_dict["extra_data"] = description[2]
            self._pipelines_descriptions.append(desc_dict)
        self._valid_scenarios = valid_scenarios

    @classmethod
    def from_dict(cls, test_manager, name, descriptions, extra_data=None):
        """
        Build a generator from a dictionary of test definitions.

        Config files and in-line scenarios found in the definitions are
        written under ``<privatedir>/<name>/<test_name>/``.

        ``descriptions`` is not modified: each definition is deep-copied
        before being processed, so the same dictionary can be reused.

        :param test_manager: The test manager the generator belongs to.
        :param name: The name of the generator.
        :param descriptions: Dictionary mapping test names to their
        definition. Each definition must contain a 'pipeline' key and may
        contain 'config', 'scenarios' and 'extra_env_vars'; the remaining
        keys are forwarded as extra data.
        :param extra_data: Variables available for interpolation in validate
        configs and scenario actions.
        :raises KeyError: If a definition has no 'pipeline' key.
        """
        if extra_data is None:
            extra_data = {}

        pipelines_descriptions = []
        for test_name, original_defs in descriptions.items():
            # Work on a private copy: keys are popped and nested values are
            # expanded in place below, the caller's data must stay intact.
            defs = copy.deepcopy(original_defs)

            tests_definition = {'name': test_name, 'pipeline': defs.pop('pipeline')}
            test_private_dir = os.path.join(test_manager.options.privatedir,
                                            name, test_name)

            config_file = None
            if 'config' in defs:
                os.makedirs(test_private_dir, exist_ok=True)
                config_file = os.path.join(test_private_dir,
                                           test_name + '.config')
                with open(config_file, 'w') as f:
                    f.write(format_config_template(extra_data,
                            '\n'.join(defs.pop('config')) + '\n', test_name))

            scenarios = []
            for scenario in defs.pop('scenarios', []):
                if isinstance(scenario, str):
                    # Path to a scenario file
                    scenarios.append(scenario)
                else:
                    # Dictionary defining a new scenario in-line
                    scenario_name = scenario_file = scenario['name']
                    actions = scenario.get('actions')
                    if actions:
                        os.makedirs(test_private_dir, exist_ok=True)
                        scenario_file = os.path.join(
                            test_private_dir, scenario_name + '.scenario')
                        with open(scenario_file, 'w') as f:
                            f.write('\n'.join(action % extra_data for action in actions) + '\n')
                    scenarios.append(scenario_file)

            local_extra_data = extra_data.copy()
            local_extra_data.update(defs)
            envvars = defs.pop('extra_env_vars', {})
            local_extra_data.update({
                'scenarios': scenarios,
                'config_file': config_file,
                'plays-reverse': True,
                'extra_env_vars': envvars,
            })

            expand_vars_in_dict_recurse(local_extra_data, extra_data)
            tests_definition['extra_data'] = local_extra_data
            tests_definition['pipeline_data'] = {}
            tests_definition['pipeline_data'].update(local_extra_data)
            pipelines_descriptions.append(tests_definition)

        # Deliberately not ``cls(...)``: subclasses defined in this file have
        # constructors with different signatures.
        return GstValidatePipelineTestsGenerator(name, test_manager, pipelines_descriptions=pipelines_descriptions)

    def get_fname(self, scenario, protocol=None, name=None):
        """
        Build the classname of a test.

        :param scenario: The scenario of the test, or None.
        :param protocol: Optional protocol used as a prefix.
        :param name: The test name, defaults to the generator name.
        :return: "<protocol.><name>.<scenario name>" when a scenario not
        named "none" is given, "<protocol.><generator name>.<name>" otherwise.
        """
        if name is None:
            name = self.name

        protocol_str = "%s." % protocol if protocol is not None else ""

        if scenario is not None and scenario.name.lower() != "none":
            return "%s%s.%s" % (protocol_str, name, scenario.name)

        # protocol_str already ends with a dot, collapse the doubled one.
        return ("%s.%s.%s" % (protocol_str, self.name, name)).replace("..", ".")

    def generate_tests(self, uri_minfo_special_scenarios, scenarios):
        """
        Restrict ``scenarios`` to the valid ones (if any were configured)
        and delegate to the parent class.
        """
        # NOTE(review): __init__ turns a None valid_scenarios into [], so
        # this first branch is only reachable if a subclass resets the
        # attribute to None - confirm before removing it.
        if self._valid_scenarios is None:
            scenarios = [None]
        elif self._valid_scenarios:
            scenarios = [scenario for scenario in scenarios if
                         scenario is not None and scenario.name in self._valid_scenarios]

        return super(GstValidatePipelineTestsGenerator, self).generate_tests(
            uri_minfo_special_scenarios, scenarios)

    def populate_tests(self, uri_minfo_special_scenarios, scenarios):
        """
        Add one GstValidateLaunchTest per (pipeline description, scenario).

        :param uri_minfo_special_scenarios: Unused by this generator.
        :param scenarios: Default scenarios, used for the descriptions that
        do not define their own 'scenarios'.
        :raises RuntimeError: If a scenario given by name cannot be found.
        """
        for description in self._pipelines_descriptions:
            pipeline = description['pipeline']
            extra_data = description.get('extra_data', {})
            pipeline_data = description.get('pipeline_data', {})

            if 'scenarios' in extra_data:
                # A pipeline description can override the default scenario set.
                # The pipeline description may specify an empty list of
                # scenarios, in which case one test will be generated with no
                # scenario.
                scenarios_to_iterate = extra_data['scenarios'] or [None]
            else:
                scenarios_to_iterate = scenarios

            for scenario in scenarios_to_iterate:
                if isinstance(scenario, str):
                    # Scenario given by name/path: resolve it to an object.
                    tmpscenario = self.test_manager.scenarios_manager.get_scenario(
                        scenario)
                    if tmpscenario is None:
                        raise RuntimeError("Could not find scenario file: %s" % scenario)
                    scenario = tmpscenario

                mediainfo = FakeMediaDescriptor(extra_data, pipeline)
                if not mediainfo.is_compatible(scenario):
                    continue

                if self.test_manager.options.mute:
                    needs_clock = scenario.needs_clock_sync() \
                        if scenario else False
                    audiosink = get_fakesink_for_media_type("audio", needs_clock)
                    videosink = get_fakesink_for_media_type("video", needs_clock)
                else:
                    audiosink = 'autoaudiosink'
                    videosink = 'autovideosink'

                # The sinks are made available to the pipeline template.
                pipeline_data.update({'videosink': videosink, 'audiosink': audiosink})
                pipeline_desc = pipeline % pipeline_data

                fname = self.get_fname(
                    scenario, protocol=mediainfo.get_protocol(), name=description["name"])

                expected_issues = extra_data.get("expected-issues")
                extra_env_vars = extra_data.get("extra_env_vars")
                test = GstValidateLaunchTest(fname,
                                             self.test_manager.options,
                                             self.test_manager.reporter,
                                             pipeline_desc,
                                             scenario=scenario,
                                             media_descriptor=mediainfo,
                                             expected_issues=expected_issues,
                                             extra_env_variables=extra_env_vars)
                if extra_data.get('config_file'):
                    test.add_validate_config(extra_data['config_file'])
                self.add_test(test)
353
354
class GstValidatePlaybinTestsGenerator(GstValidatePipelineTestsGenerator):
    """Tests generator named "playback", based on playbin or playbin3."""

    def __init__(self, test_manager):
        # playbin3 is used as soon as USE_PLAYBIN3 is set in the environment.
        element = "playbin" if os.getenv("USE_PLAYBIN3") is None else "playbin3"
        GstValidatePipelineTestsGenerator.__init__(
            self, "playback", test_manager, element)

    def _set_sinks(self, minfo, pipe_str, scenario):
        """Return ``pipe_str`` with fake sinks appended when muted."""
        if not self.test_manager.options.mute:
            return pipe_str

        needs_clock = scenario.needs_clock_sync() or minfo.media_descriptor.need_clock_sync()
        audio_fakesink = get_fakesink_for_media_type("audio", needs_clock)
        video_fakesink = get_fakesink_for_media_type("video", needs_clock)
        pipe_str += " audio-sink='%s' video-sink='%s'" % (
            audio_fakesink, video_fakesink)
        return pipe_str

    def _get_name(self, scenario, protocol, minfo):
        """Return the classname of the test for that scenario and media."""
        prefix = self.get_fname(scenario, protocol)
        media_name = os.path.basename(minfo.media_descriptor.get_clean_name())
        return "%s.%s" % (prefix, media_name)

    def _add_rtsp_tests(self, uri, minfo, scenario):
        """Add the RTSP and RTSP2 variants of a local file playback test."""
        media_path = minfo.media_descriptor.get_path()
        rtspminfo = NamedDic({"path": media_path,
                              "media_descriptor": GstValidateRTSPMediaDescriptor(minfo.media_descriptor.get_path())})
        if not rtspminfo.media_descriptor.is_compatible(scenario):
            return

        # The port placeholder is kept as is in the pipeline description.
        cpipe = self._set_sinks(rtspminfo, "%s uri=rtsp://127.0.0.1:<RTSPPORTNUMBER>/test"
                                % self._pipeline_template, scenario)

        manager = self.test_manager
        fname = self._get_name(scenario, Protocols.RTSP, rtspminfo)
        self.add_test(GstValidateRTSPTest(
            fname, manager.options, manager.reporter,
            cpipe, uri, scenario=scenario,
            media_descriptor=rtspminfo.media_descriptor))

        fname = self._get_name(scenario, Protocols.RTSP + '2', rtspminfo)
        self.add_test(GstValidateRTSPTest(
            fname, manager.options, manager.reporter,
            cpipe, uri, scenario=scenario,
            media_descriptor=rtspminfo.media_descriptor,
            rtsp2=True))

    def populate_tests(self, uri_minfo_special_scenarios, scenarios):
        """Add playback tests (and RTSP ones for local files when possible).

        :param uri_minfo_special_scenarios: Iterable of
            ``(uri, minfo, special_scenarios)`` tuples.
        :param scenarios: Scenarios to run on every media, on top of the
            special ones of each media.
        """
        test_rtsp = GstValidateBaseTestManager.RTSP_SERVER_COMMAND
        if not test_rtsp:
            # NOTE(review): the command is falsy here, so the message cannot
            # name the missing tool - kept as is.
            printc("\n\nRTSP server not available, you should make sure"
                   " that %s is available in your $PATH." % GstValidateBaseTestManager.RTSP_SERVER_COMMAND,
                   Colors.FAIL)
        elif self.test_manager.options.disable_rtsp:
            printc("\n\nRTSP tests are disabled")
            test_rtsp = False

        for uri, minfo, special_scenarios in uri_minfo_special_scenarios:
            descriptor = minfo.media_descriptor
            protocol = descriptor.get_protocol()
            if protocol == Protocols.RTSP:
                self.debug("SKIPPING %s as it is a RTSP stream" % uri)
                continue

            pipe = self._pipeline_template + " uri=%s" % uri

            for scenario in special_scenarios + scenarios:
                if not minfo.media_descriptor.is_compatible(scenario):
                    continue

                cpipe = self._set_sinks(minfo, pipe, scenario)
                fname = self._get_name(scenario, protocol, minfo)

                self.debug("Adding: %s", fname)

                if scenario.does_reverse_playback() and protocol == Protocols.HTTP:
                    # 10MB so we can reverse playback
                    cpipe += " ring-buffer-max-size=10485760"

                launch_test = GstValidateLaunchTest(fname,
                                                    self.test_manager.options,
                                                    self.test_manager.reporter,
                                                    cpipe,
                                                    scenario=scenario,
                                                    media_descriptor=minfo.media_descriptor)
                self.add_test(launch_test)

                wants_rtsp = test_rtsp and protocol == Protocols.FILE \
                    and not minfo.media_descriptor.is_image()
                if wants_rtsp:
                    self._add_rtsp_tests(uri, minfo, scenario)
444
445
class GstValidateMixerTestsGenerator(GstValidatePipelineTestsGenerator):
    """Tests generator feeding several sources into a mixer element."""

    def __init__(self, name, test_manager, mixer, media_type, converter="",
                 num_sources=3, mixed_srcs=None, valid_scenarios=None):
        """
        :param name: The name of the generator.
        :param test_manager: The test manager the generator belongs to.
        :param mixer: The mixer element description.
        :param media_type: The media type, used to select the files having
        such tracks and to build the "auto<media_type>sink" sink.
        :param converter: Element(s) placed after each source and after the
        mixer.
        :param num_sources: Number of sources mixed together when
        ``mixed_srcs`` is not provided.
        :param mixed_srcs: Optional dictionary mapping a test name to either
        a sequence of source descriptions or a dictionary with the
        'mixer_props' and 'sources' keys.
        :param valid_scenarios: Names of the scenarios usable with that
        generator.
        """
        mixed_srcs = mixed_srcs or {}
        valid_scenarios = valid_scenarios or []

        pipe_template = "%(mixer)s name=_mixer !  " + \
            converter + " ! %(sink)s "
        self.converter = converter
        self.mixer = mixer
        self.media_type = media_type
        self.num_sources = num_sources
        self.mixed_srcs = mixed_srcs
        super(
            GstValidateMixerTestsGenerator, self).__init__(name, test_manager, pipe_template,
                                                           valid_scenarios=valid_scenarios)

    def populate_tests(self, uri_minfo_special_scenarios, scenarios):
        """
        Add one launch test per (set of mixed sources, scenario).

        When no ``mixed_srcs`` were given, they are built from windows of
        ``num_sources`` consecutive local files having tracks of the wanted
        media type. Nothing is generated when URIs are being validated.
        """
        if self.test_manager.options.validate_uris:
            return

        wanted_ressources = []
        for uri, minfo, special_scenarios in uri_minfo_special_scenarios:
            protocol = minfo.media_descriptor.get_protocol()
            if protocol == Protocols.FILE and \
                    minfo.media_descriptor.get_num_tracks(self.media_type) > 0:
                wanted_ressources.append((uri, minfo))

        if not self.mixed_srcs:
            if not wanted_ressources:
                return

            decoder = "uridecodebin" if os.getenv("USE_PLAYBIN3") is None \
                else "uridecodebin3"

            # Floor division: range() rejects the float that "/" returns on
            # Python 3.
            num_groups = len(uri_minfo_special_scenarios) // self.num_sources
            for i in range(num_groups):
                window = wanted_ressources[i:i + self.num_sources]
                if len(window) < self.num_sources:
                    # The group count comes from the unfiltered list: stop
                    # once there are not enough wanted files left to mix.
                    break

                srcs = []
                name = ""
                for uri, minfo in window:
                    srcs.append("%s uri=%s ! %s" % (decoder, uri, self.converter))
                    fname = os.path.basename(uri).replace(".", "_")
                    if not name:
                        name = fname
                    else:
                        name += "+%s" % fname

                self.mixed_srcs[name] = tuple(srcs)

        for name, srcs in self.mixed_srcs.items():
            if isinstance(srcs, dict):
                # Dictionary form: extra mixer properties plus the sources.
                pipe_arguments = {
                    "mixer": self.mixer + " %s" % srcs["mixer_props"]}
                srcs = srcs["sources"]
            else:
                pipe_arguments = {"mixer": self.mixer}

            for scenario in scenarios:
                fname = self.get_fname(scenario, Protocols.FILE) + "."
                fname += name

                self.debug("Adding: %s", fname)

                if self.test_manager.options.mute:
                    pipe_arguments["sink"] = get_fakesink_for_media_type(self.media_type,
                                                                         scenario.needs_clock_sync())
                else:
                    pipe_arguments["sink"] = "auto%ssink" % self.media_type

                pipe = self._pipeline_template % pipe_arguments

                # Link every source to the mixer named "_mixer".
                for src in srcs:
                    pipe += "%s ! _mixer. " % src

                self.add_test(GstValidateLaunchTest(fname,
                                                    self.test_manager.options,
                                                    self.test_manager.reporter,
                                                    pipe,
                                                    scenario=scenario)
                              )
529
530
class GstValidateLaunchTest(GstValidateTest):
    """Test running a pipeline description with the validate launcher."""

    def __init__(self, classname, options, reporter, pipeline_desc,
                 timeout=DEFAULT_TIMEOUT, scenario=None,
                 media_descriptor=None, duration=0, hard_timeout=None,
                 extra_env_variables=None, expected_issues=None):
        """
        :param pipeline_desc: The pipeline description, split with shlex
            when the command line is built.
        :param duration: Only used when neither a scenario nor a media
            descriptor is given; otherwise it is taken from the scenario
            or, failing that, from the media descriptor.
        """
        # The scenario has the priority over the media to get the duration.
        if scenario:
            duration = scenario.get_duration()
        elif media_descriptor:
            duration = media_descriptor.get_duration() / GST_SECOND

        super().__init__(GstValidateBaseTestManager.COMMAND,
                         classname,
                         options, reporter,
                         duration=duration,
                         scenario=scenario,
                         timeout=timeout,
                         hard_timeout=hard_timeout,
                         media_descriptor=media_descriptor,
                         extra_env_variables=extra_env_variables or {},
                         expected_issues=expected_issues)

        self.pipeline_desc = pipeline_desc
        self.media_descriptor = media_descriptor

    def build_arguments(self):
        """Append the pipeline and, if known, the media info file path."""
        GstValidateTest.build_arguments(self)

        pipeline_args = shlex.split(self.pipeline_desc)
        self.add_arguments(*pipeline_args)

        descriptor = self.media_descriptor
        if descriptor is None or not descriptor.get_path():
            return

        self.add_arguments("--set-media-info", descriptor.get_path())
566
567
class GstValidateMediaCheckTest(GstValidateTest):
    """Test comparing a media against its expected media info file."""

    def __init__(self, classname, options, reporter, media_descriptor,
                 uri, minfo_path, timeout=DEFAULT_TIMEOUT,
                 extra_env_variables=None,
                 expected_issues=None):
        """
        :param uri: The URI of the media to check.
        :param minfo_path: Path of the file holding the expected results.
        """
        super().__init__(GstValidateBaseTestManager.MEDIA_CHECK_COMMAND,
                         classname,
                         options, reporter,
                         timeout=timeout,
                         media_descriptor=media_descriptor,
                         extra_env_variables=extra_env_variables or {},
                         expected_issues=expected_issues)
        self._uri = uri
        self._media_info_path = minfo_path

    def build_arguments(self):
        """Build the command line: URI, expected results, parser options."""
        # Calls Test.build_arguments directly rather than the
        # GstValidateTest one.
        Test.build_arguments(self)
        self.add_arguments(self._uri,
                           "--expected-results", self._media_info_path)

        skip_parsers = self.media_descriptor.skip_parsers()
        if skip_parsers:
            self.add_arguments("--skip-parsers")
593
594
class GstValidateTranscodingTest(GstValidateTest, GstValidateEncodingTestInterface):
    """Test transcoding a media to a given encoding format combination."""

    scenarios_manager = ScenarioManager()

    def __init__(self, classname, options, reporter,
                 combination, uri, media_descriptor,
                 timeout=DEFAULT_TIMEOUT,
                 scenario=None,
                 extra_env_variables=None,
                 expected_issues=None):
        """
        :param combination: The encoding format combination to transcode to.
        :param uri: The URI of the media to transcode.
        :param media_descriptor: The descriptor of the media to transcode.
        :param extra_env_variables: Optional dictionary of environment
            variables forwarded to the base test.
        """
        # Initialized first so that self.debug() is usable right below.
        Loggable.__init__(self)

        extra_env_variables = extra_env_variables or {}

        file_dur = int(media_descriptor.get_duration()) / GST_SECOND
        if not media_descriptor.get_num_tracks("video"):
            self.debug("%s audio only file applying transcoding ratio."
                       "File 'duration' : %s" % (classname, file_dur))
            duration = file_dur / AUDIO_ONLY_FILE_TRANSCODING_RATIO
        else:
            duration = file_dur

        # The environment variables given by the caller are forwarded; they
        # used to be replaced by None and thus silently ignored.
        super(
            GstValidateTranscodingTest, self).__init__(GstValidateBaseTestManager.TRANSCODING_COMMAND,
                                                       classname,
                                                       options,
                                                       reporter,
                                                       duration=duration,
                                                       timeout=timeout,
                                                       scenario=scenario,
                                                       media_descriptor=media_descriptor,
                                                       extra_env_variables=extra_env_variables,
                                                       expected_issues=expected_issues)

        GstValidateEncodingTestInterface.__init__(
            self, combination, media_descriptor)

        self.uri = uri

    def run_external_checks(self):
        """Run the IQA test when enabled and the media has one video track."""
        if self.media_descriptor.get_num_tracks("video") == 1 and \
                self.options.validate_enable_iqa_tests:
            self.run_iqa_test(self.uri)

    def set_rendering_info(self):
        """Compute the destination file URI and add the profile argument."""
        self.dest_file = os.path.join(self.options.dest,
                                      self.classname.replace(".transcode.", os.sep).
                                      replace(".", os.sep))
        mkdir(os.path.dirname(urllib.parse.urlsplit(self.dest_file).path))
        # Plain paths are turned into URIs.
        if urllib.parse.urlparse(self.dest_file).scheme == "":
            self.dest_file = path2url(self.dest_file)

        profile = self.get_profile()
        self.add_arguments("-o", profile)

    def build_arguments(self):
        """Build the command line: profile, source URI and destination."""
        GstValidateTest.build_arguments(self)
        self.set_rendering_info()
        self.add_arguments(self.uri, self.dest_file)

    def get_current_value(self):
        """
        Return the current progress value of the test.

        :return: The current size of the output when available, the current
            position otherwise, or a Result when the pipeline did not stop
            30 seconds after EOS was sent by the scenario.
        """
        if self.scenario:
            sent_eos = self.sent_eos_position()
            if sent_eos is not None:
                t = time.time()
                if ((t - sent_eos)) > 30:
                    if self.media_descriptor.get_protocol() == Protocols.HLS:
                        # NOTE(review): the result is set to PASSED while
                        # KNOWN_ERROR is returned - confirm this is intended.
                        self.set_result(Result.PASSED,
                                        """Got no EOS 30 seconds after sending EOS,
                                        in HLS known and tolerated issue:
                                        https://gitlab.freedesktop.org/gstreamer/gst-plugins-bad/issues/132""")
                        return Result.KNOWN_ERROR

                    self.set_result(
                        Result.FAILED, "Pipeline did not stop 30 Seconds after sending EOS")

                    return Result.FAILED

        size = self.get_current_size()
        if size is None:
            return self.get_current_position()

        return size

    def check_results(self):
        """Check the encoded file, then run the generic result checks."""
        self.check_encoded_file()
        GstValidateTest.check_results(self)
682
683
class GstValidateBaseRTSPTest:
    """ Interface for RTSP tests, requires implementing Test

    Mixin that starts an RTSP server serving ``local_uri`` on a free port
    before the test runs and kills it once the test process has finished.
    The concrete class is expected to provide ``options``, ``logfile``,
    ``extra_logfiles``, ``pipeline_desc``, ``use_gdb()``, ``close_logfile()``
    and ``process_update()``.
    """
    # Ports currently handed out to RTSP servers, shared by all instances so
    # that two servers are never given the same port.
    __used_ports = set()

    def __init__(self, local_uri):
        """Remember the URI the RTSP server has to serve.

        :param local_uri: URI passed to the RTSP server command.
        """
        self._local_uri = local_uri
        self.rtsp_server = None
        # Set by launch_server(); kept at None until then so that
        # close_logfile() is safe to call even if no server was started.
        self.rtspserver_logs = None
        self._unsetport_pipeline_desc = None
        self.optional = True

    @classmethod
    def __get_open_port(cls):
        """Return a free TCP port that was not already handed out."""
        while True:
            # hackish trick from
            # http://stackoverflow.com/questions/2838244/get-open-tcp-port-in-python?answertab=votes#tab-top
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                s.bind(("", 0))
                port = s.getsockname()[1]

            if port not in cls.__used_ports:
                cls.__used_ports.add(port)
                return port

    def launch_server(self):
        """Start the RTSP server and wait until it accepts connections.

        The ``<RTSPPORTNUMBER>`` placeholder of the pipeline description is
        replaced with the port the server listens on.

        :returns: the server command line, as a single string.
        """
        self.server_port = self.__get_open_port()
        command = [GstValidateBaseTestManager.RTSP_SERVER_COMMAND, self._local_uri, '--port', str(self.server_port)]

        if self.options.validate_gdb_server:
            # gdb is interactive: always keep its output on the terminal.
            command = self.use_gdb(command)
            self.rtspserver_logs = sys.stdout
        elif self.options.redirect_logs == 'stderr':
            self.rtspserver_logs = sys.stderr
        elif self.options.redirect_logs:
            self.rtspserver_logs = sys.stdout
        else:
            self.rtspserver_logs = open(self.logfile + '_rtspserver.log', 'w+')
            self.extra_logfiles.add(self.rtspserver_logs.name)

        server_env = os.environ.copy()

        self.rtsp_server = subprocess.Popen(command,
                                            stderr=self.rtspserver_logs,
                                            stdout=self.rtspserver_logs,
                                            env=server_env)

        # Poll until the server accepts connections.  Stop waiting if the
        # server process exited, otherwise this would spin forever.
        while self.rtsp_server.poll() is None:
            try:
                with socket.socket() as probe:
                    probe.connect(("127.0.0.1", self.server_port))
                break
            except ConnectionRefusedError:
                time.sleep(0.1)

        if not self._unsetport_pipeline_desc:
            self._unsetport_pipeline_desc = self.pipeline_desc

        self.pipeline_desc = self._unsetport_pipeline_desc.replace(
            "<RTSPPORTNUMBER>", str(self.server_port))

        return ' '.join(command)

    def close_logfile(self):
        """Close the test log file and the server log file we opened."""
        super().close_logfile()
        logs = self.rtspserver_logs
        # Only close a log file opened by launch_server(); the process wide
        # stdout/stderr streams must stay usable.
        if logs is None or logs is sys.stdout or logs is sys.stderr:
            return
        logs.close()

    def process_update(self):
        """Forward to the next class and stop the server once it is done.

        :returns: whatever the next ``process_update()`` returned.
        """
        res = super().process_update()
        if res:
            kill_subprocess(self, self.rtsp_server, DEFAULT_TIMEOUT)
            # discard(): releasing the port twice must not raise.
            self.__used_ports.discard(self.server_port)

        return res
764
765
class GstValidateRTSPTest(GstValidateBaseRTSPTest, GstValidateLaunchTest):
    """Launch test whose media is served by a local RTSP server."""

    def __init__(self, classname, options, reporter, pipeline_desc,
                 local_uri, timeout=DEFAULT_TIMEOUT, scenario=None,
                 media_descriptor=None, rtsp2=False):
        """Initialise both parents and remember whether RTSP 2 is forced."""
        GstValidateLaunchTest.__init__(self, classname, options, reporter,
                                       pipeline_desc, timeout, scenario,
                                       media_descriptor)
        GstValidateBaseRTSPTest.__init__(self, local_uri)
        self.rtsp2 = rtsp2

    def get_subproc_env(self):
        """Return the subprocess environment with the RTSP overrides.

        The ``rtsp_overrides`` data directory is put in front of
        ``GST_VALIDATE_SCENARIOS_PATH`` and, when ``rtsp2`` is set,
        ``force_rtsp2`` is appended to ``GST_VALIDATE_SCENARIO``.
        """
        env = super().get_subproc_env()

        previous_path = env.get('GST_VALIDATE_SCENARIOS_PATH', '')
        scenarios_dir = os.path.join('data', 'scenarios')
        override_dir = get_data_file(scenarios_dir, 'rtsp_overrides')
        env['GST_VALIDATE_SCENARIOS_PATH'] = '{}:{}'.format(
            override_dir, previous_path)

        if not self.rtsp2:
            return env

        current_scenario = env.get('GST_VALIDATE_SCENARIO', '')
        env['GST_VALIDATE_SCENARIO'] = current_scenario + ':force_rtsp2'
        return env
786
787
class GstValidateRTSPMediaDescriptor(GstValidateMediaDescriptor):
    """Media descriptor that always reports a fixed local RTSP URI."""

    def __init__(self, xml_path):
        """Load the descriptor stored at ``xml_path``."""
        super().__init__(xml_path)

    def get_uri(self):
        """Return the fixed local RTSP URI."""
        return "rtsp://127.0.0.1:8554/test"

    def get_protocol(self):
        """Return ``Protocols.RTSP``."""
        return Protocols.RTSP

    def prerrols(self):
        """Always return False."""
        return False
801
802
803 class GstValidateTestManager(GstValidateBaseTestManager):
804
    # Name of this test manager.
    name = "validate"

    # List of all classes to create testsuites.  They are re-exported as
    # class attributes so they can be reached through the manager itself.
    GstValidateMediaCheckTestsGenerator = GstValidateMediaCheckTestsGenerator
    GstValidateTranscodingTestsGenerator = GstValidateTranscodingTestsGenerator
    GstValidatePipelineTestsGenerator = GstValidatePipelineTestsGenerator
    GstValidatePlaybinTestsGenerator = GstValidatePlaybinTestsGenerator
    GstValidateMixerTestsGenerator = GstValidateMixerTestsGenerator
    GstValidateLaunchTest = GstValidateLaunchTest
    GstValidateMediaCheckTest = GstValidateMediaCheckTest
    GstValidateTranscodingTest = GstValidateTranscodingTest
816
817     def __init__(self):
818         super(GstValidateTestManager, self).__init__()
819         self._uris = []
820         self._run_defaults = True
821         self._is_populated = False
822         self._default_generators_registered = False
823
824     def init(self):
825         for command, name in [
826                 (GstValidateBaseTestManager.TRANSCODING_COMMAND, "gst-validate-transcoding-1.0"),
827                 (GstValidateBaseTestManager.COMMAND, "gst-validate-1.0"),
828                 (GstValidateBaseTestManager.MEDIA_CHECK_COMMAND, "gst-validate-media-check-1.0")]:
829             if not command:
830                 self.error("command not found: %s" % name)
831                 return False
832
833         return True
834
835     def add_options(self, parser):
836         group = parser.add_argument_group("GstValidate tools specific options"
837                                           " and behaviours",
838                                           description="""When using --wanted-tests, all the scenarios can be used, even those which have
839 not been tested and explicitely activated if you set use --wanted-tests ALL""")
840         group.add_argument("--validate-check-uri", dest="validate_uris",
841                            action="append", help="defines the uris to run default tests on")
842         group.add_argument("--validate-tools-path", dest="validate_tools_path",
843                            action="append", help="defines the paths to look for GstValidate tools.")
844         group.add_argument("--validate-gdb-server", dest="validate_gdb_server",
845                            help="Run the server in GDB.")
846         group.add_argument("--validate-disable-rtsp", dest="disable_rtsp",
847                            help="Disable RTSP tests.")
848         group.add_argument("--validate-enable-iqa-tests", dest="validate_enable_iqa_tests",
849                            help="Enable Image Quality Assessment validation tests.",
850                            default=False, action='store_true')
851
852     def print_valgrind_bugs(self):
853         # Look for all the 'pending' bugs in our supp file
854         bugs = []
855         p = get_data_file('data', 'gstvalidate.supp')
856         with open(p) as f:
857             for line in f.readlines():
858                 line = line.strip()
859                 if line.startswith('# PENDING:'):
860                     tmp = line.split(' ')
861                     bugs.append(tmp[2])
862
863         if bugs:
864             msg = "Ignored valgrind bugs:\n"
865             for b in bugs:
866                 msg += "  + %s\n" % b
867             printc(msg, Colors.FAIL, True)
868
869     def populate_testsuite(self):
870
871         if self._is_populated is True:
872             return
873
874         if not self.options.config and not self.options.testsuites:
875             if self._run_defaults:
876                 self.register_defaults()
877             else:
878                 self.register_all()
879
880         self._is_populated = True
881
882     def list_tests(self):
883         if self.tests:
884             return self.tests
885
886         if self._run_defaults:
887             scenarios = [self.scenarios_manager.get_scenario(scenario_name)
888                          for scenario_name in self.get_scenarios()]
889         else:
890             scenarios = self.scenarios_manager.get_scenario(None)
891         uris = self._list_uris()
892
893         for generator in self.get_generators():
894             for test in generator.generate_tests(uris, scenarios):
895                 self.add_test(test)
896
897         if not self.tests and not uris and not self.options.wanted_tests:
898             self.info("No valid uris present in the path. Check if media files and info files exist")
899
900         return self.tests
901
902     def _add_media(self, media_info, uri=None):
903         self.debug("Checking %s", media_info)
904         if isinstance(media_info, GstValidateMediaDescriptor):
905             media_descriptor = media_info
906             media_info = media_descriptor.get_path()
907         else:
908             media_descriptor = GstValidateMediaDescriptor(media_info)
909
910         try:
911             # Just testing that the vairous mandatory infos are present
912             caps = media_descriptor.get_caps()
913             if uri is None:
914                 uri = media_descriptor.get_uri()
915
916             # Adjust local http uri
917             if self.options.http_server_port != 8079 and \
918                uri.startswith("http://127.0.0.1:8079/"):
919                 uri = uri.replace("http://127.0.0.1:8079/",
920                                   "http://127.0.0.1:%r/" % self.options.http_server_port, 1)
921             media_descriptor.set_protocol(urllib.parse.urlparse(uri).scheme)
922             for caps2, prot in GST_VALIDATE_CAPS_TO_PROTOCOL:
923                 if caps2 == caps:
924                     media_descriptor.set_protocol(prot)
925                     break
926
927             scenario_bname = media_descriptor.get_media_filepath()
928             special_scenarios = self.scenarios_manager.find_special_scenarios(
929                 scenario_bname)
930             self._uris.append((uri,
931                                NamedDic({"path": media_info,
932                                          "media_descriptor": media_descriptor}),
933                                special_scenarios))
934         except configparser.NoOptionError as e:
935             self.debug("Exception: %s for %s", e, media_info)
936
937     def _discover_file(self, uri, fpath):
938         for ext in (GstValidateMediaDescriptor.MEDIA_INFO_EXT,
939                 GstValidateMediaDescriptor.PUSH_MEDIA_INFO_EXT):
940             try:
941                 is_push = False
942                 media_info = "%s.%s" % (fpath, ext)
943                 if ext == GstValidateMediaDescriptor.PUSH_MEDIA_INFO_EXT:
944                     if not os.path.exists(media_info):
945                         continue
946                     is_push = True
947                     uri = "push" + uri
948                 args = GstValidateBaseTestManager.MEDIA_CHECK_COMMAND.split(" ")
949
950                 args.append(uri)
951                 if os.path.isfile(media_info) and not self.options.update_media_info:
952                     self._add_media(media_info, uri)
953                     continue
954                 elif fpath.endswith(GstValidateMediaDescriptor.STREAM_INFO_EXT):
955                     self._add_media(fpath)
956                     continue
957                 elif not self.options.generate_info and not self.options.update_media_info and not self.options.validate_uris:
958                     continue
959                 elif self.options.update_media_info and not os.path.isfile(media_info):
960                     self.info(
961                         "%s not present. Use --generate-media-info", media_info)
962                     continue
963                 elif os.path.islink(media_info):
964                     self.info(
965                         "%s is a symlink, not updating and hopefully the actual file gets updated!", media_info)
966                     continue
967
968                 include_frames = 0
969                 if self.options.update_media_info:
970                     include_frames = 2
971                 elif self.options.generate_info_full:
972                     include_frames = 1
973
974                 media_descriptor = GstValidateMediaDescriptor.new_from_uri(
975                     uri, True, include_frames, is_push)
976                 if media_descriptor:
977                     self._add_media(media_descriptor, uri)
978                 else:
979                     self.warning("Could not get any descriptor for %s" % uri)
980
981             except subprocess.CalledProcessError as e:
982                 if self.options.generate_info:
983                     printc("Result: Failed", Colors.FAIL)
984                 else:
985                     self.error("Exception: %s", e)
986                 return False
987         return True
988
989     def _list_uris(self):
990         if self._uris:
991             return self._uris
992
993         if self.options.validate_uris:
994             for uri in self.options.validate_uris:
995                 self._discover_file(uri, uri)
996             return self._uris
997
998         if not self.args:
999             if isinstance(self.options.paths, str):
1000                 self.options.paths = [os.path.join(self.options.paths)]
1001
1002             for path in self.options.paths:
1003                 if os.path.isfile(path):
1004                     path = os.path.abspath(path)
1005                     self._discover_file(path2url(path), path)
1006                 else:
1007                     for root, dirs, files in os.walk(path):
1008                         for f in files:
1009                             fpath = os.path.abspath(os.path.join(root, f))
1010                             if os.path.isdir(fpath) or \
1011                                     fpath.endswith(GstValidateMediaDescriptor.MEDIA_INFO_EXT) or\
1012                                     fpath.endswith(ScenarioManager.FILE_EXTENSION):
1013                                 continue
1014                             else:
1015                                 self._discover_file(path2url(fpath), fpath)
1016
1017         self.debug("Uris found: %s", self._uris)
1018
1019         return self._uris
1020
1021     def needs_http_server(self):
1022         for test in self.list_tests():
1023             if self._is_test_wanted(test) and test.media_descriptor is not None:
1024                 protocol = test.media_descriptor.get_protocol()
1025                 uri = test.media_descriptor.get_uri()
1026                 uri_requires_http_server = False
1027                 if uri:
1028                     expanded_uri = uri % {
1029                         'http-server-port': self.options.http_server_port}
1030                     uri_requires_http_server = expanded_uri.find(
1031                         "127.0.0.1:%s" % self.options.http_server_port) != -1
1032                 if protocol in [Protocols.HTTP, Protocols.HLS, Protocols.DASH] or uri_requires_http_server:
1033                     return True
1034         return False
1035
1036     def set_settings(self, options, args, reporter):
1037         if options.wanted_tests:
1038             for i in range(len(options.wanted_tests)):
1039                 if "ALL" in options.wanted_tests[i]:
1040                     self._run_defaults = False
1041                     options.wanted_tests[
1042                         i] = options.wanted_tests[i].replace("ALL", "")
1043         try:
1044             options.wanted_tests.remove("")
1045         except ValueError:
1046             pass
1047
1048         if options.validate_uris:
1049             self.check_testslist = False
1050
1051         super(GstValidateTestManager, self).set_settings(
1052             options, args, reporter)
1053
1054     def register_defaults(self):
1055         """
1056         Registers the defaults:
1057             * Scenarios to be used
1058             * Encoding formats to be used
1059             * Blacklisted tests
1060             * Test generators
1061         """
1062         printc("-> Registering default 'validate' tests... ", end='')
1063         self.register_default_scenarios()
1064         self.register_default_encoding_formats()
1065         self.register_default_blacklist()
1066         self.register_default_test_generators()
1067         printc("OK", Colors.OKGREEN)
1068
1069     def register_default_scenarios(self):
1070         """
1071         Registers default test scenarios
1072         """
1073         if self.options.long_limit != 0:
1074             self.add_scenarios([
1075                 "play_15s",
1076                 "reverse_playback",
1077                 "fast_forward",
1078                 "seek_forward",
1079                 "seek_backward",
1080                 "seek_with_stop",
1081                 "switch_audio_track",
1082                 "switch_audio_track_while_paused",
1083                 "switch_subtitle_track",
1084                 "switch_subtitle_track_while_paused",
1085                 "disable_subtitle_track_while_paused",
1086                 "change_state_intensive",
1087                 "scrub_forward_seeking"])
1088         else:
1089             self.add_scenarios([
1090                 "play_15s",
1091                 "reverse_playback",
1092                 "fast_forward",
1093                 "seek_forward",
1094                 "seek_backward",
1095                 "seek_with_stop",
1096                 "switch_audio_track",
1097                 "switch_audio_track_while_paused",
1098                 "switch_subtitle_track",
1099                 "switch_subtitle_track_while_paused",
1100                 "disable_subtitle_track_while_paused",
1101                 "change_state_intensive",
1102                 "scrub_forward_seeking"])
1103
1104     def register_default_encoding_formats(self):
1105         """
1106         Registers default encoding formats
1107         """
1108         self.add_encoding_formats([
1109             MediaFormatCombination("ogg", "vorbis", "theora"),
1110             MediaFormatCombination("webm", "vorbis", "vp8"),
1111             MediaFormatCombination(
1112                 "webm", "vorbis", "vp9", video_restriction="video/x-raw,width=320,height=240"),
1113             MediaFormatCombination("mp4", "mp3", "h264"),
1114             MediaFormatCombination("mkv", "vorbis", "h264"),
1115         ])
1116
1117     def register_default_blacklist(self):
1118         self.set_default_blacklist([
1119             # testbin known issues
1120             ("testbin.media_check.*",
1121              "Not supported by GstDiscoverer."),
1122
1123             # dash known issues
1124             ("dash.media_check.*",
1125              "Caps are different depending on selected bitrates, etc"),
1126
1127             # Matroska/WEBM known issues:
1128             ("*.reverse_playback.*webm$",
1129              "https://gitlab.freedesktop.org/gstreamer/gst-plugins-good/issues/65"),
1130             ("*.reverse_playback.*mkv$",
1131              "https://gitlab.freedesktop.org/gstreamer/gst-plugins-good/issues/65"),
1132             ("http.playback.seek_with_stop.*webm",
1133              "matroskademux.gst_matroska_demux_handle_seek_push: Seek end-time not supported in streaming mode"),
1134             ("http.playback.seek_with_stop.*mkv",
1135              "matroskademux.gst_matroska_demux_handle_seek_push: Seek end-time not supported in streaming mode"),
1136
1137             # MPEG TS known issues:
1138             ('(?i)*playback.reverse_playback.*(?:_|.)(?:|m)ts$',
1139              "https://gitlab.freedesktop.org/gstreamer/gst-plugins-bad/issues/97"),
1140
1141             # Fragmented MP4 disabled tests:
1142             ('*.playback..*seek.*.fragmented_nonseekable_sink_mp4',
1143              "Seeking on fragmented files without indexes isn't implemented"),
1144             ('*.playback.reverse_playback.fragmented_nonseekable_sink_mp4',
1145              "Seeking on fragmented files without indexes isn't implemented"),
1146
1147             # HTTP known issues:
1148             ("http.*scrub_forward_seeking.*",
1149              "This is not stable enough for now."),
1150             ("http.playback.change_state_intensive.raw_video_mov",
1151              "This is not stable enough for now. (flow return from pad push doesn't match expected value)"),
1152
1153             # MXF known issues"
1154             ("*reverse_playback.*mxf",
1155              "Reverse playback is not handled in MXF"),
1156             ("file\.transcode.*mxf",
1157              "FIXME: Transcoding and mixing tests need to be tested"),
1158
1159             # WMV known issues"
1160             ("*reverse_playback.*wmv",
1161              "Reverse playback is not handled in wmv"),
1162             (".*reverse_playback.*asf",
1163              "Reverse playback is not handled in asf"),
1164
1165             # ogg known issues
1166             ("http.playback.seek.*vorbis_theora_1_ogg",
1167              "https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/issues/281"),
1168             # RTSP known issues
1169             ('rtsp.*playback.reverse.*',
1170              'https://gitlab.freedesktop.org/gstreamer/gst-plugins-good/issues/32'),
1171             ('rtsp.*playback.seek_with_stop.*',
1172              'https://gitlab.freedesktop.org/gstreamer/gst-plugins-good/issues/386'),
1173             ('rtsp.*playback.fast_*',
1174              'rtpbasedepayload does not handle rate != 1.0 correctly'),
1175         ])
1176
1177     def register_default_test_generators(self):
1178         """
1179         Registers default test generators
1180         """
1181         if self._default_generators_registered:
1182             return
1183
1184         self.add_generators([GstValidatePlaybinTestsGenerator(self),
1185                              GstValidateMediaCheckTestsGenerator(self),
1186                              GstValidateTranscodingTestsGenerator(self)])
1187         self._default_generators_registered = True