e71c394501dbe7c945f136456cf8112758c443d6
[platform/upstream/gstreamer.git] / validate / launcher / apps / gstvalidate.py
1 #!/usr/bin/env python3
2 #
3 # Copyright (c) 2013,Thibault Saunier <thibault.saunier@collabora.com>
4 #
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU Lesser General Public
7 # License as published by the Free Software Foundation; either
8 # version 2.1 of the License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13 # Lesser General Public License for more details.
14 #
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with this program; if not, write to the
17 # Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
18 # Boston, MA 02110-1301, USA.
19 import argparse
20 import os
21 import copy
22 import sys
23 import time
24 import urllib.parse
25 import shlex
26 import socket
27 import subprocess
28 import configparser
29 import json
30 from launcher.loggable import Loggable
31
32 from launcher.baseclasses import GstValidateTest, Test, \
33     ScenarioManager, NamedDic, GstValidateTestsGenerator, \
34     GstValidateMediaDescriptor, GstValidateEncodingTestInterface, \
35     GstValidateBaseTestManager, MediaDescriptor, MediaFormatCombination
36
37 from launcher.utils import path2url, url2path, DEFAULT_TIMEOUT, which, \
38     GST_SECOND, Result, Protocols, mkdir, printc, Colors, get_data_file, \
39     kill_subprocess
40
41 #
42 # Private global variables     #
43 #
44
# definitions of commands to use
# NOTE: this block runs when the module is imported. parse_known_args() is
# used so that command line options this parser does not declare are not
# treated as errors; they are returned in `args`.
parser = argparse.ArgumentParser(add_help=False)
parser.add_argument("--validate-tools-path", dest="validate_tools_path",
                    default="",
                    help="defines the paths to look for GstValidate tools.")
options, args = parser.parse_known_args()

# Hand the tools path given on the command line (empty string by default)
# over to the base test manager.
GstValidateBaseTestManager.update_commands(options.validate_tools_path)
# Divisor applied to the file duration of audio-only files in
# GstValidateTranscodingTest.
AUDIO_ONLY_FILE_TRANSCODING_RATIO = 5
54
55 #
56 # API to be used to create testsuites     #
57 #
58
59 """
60 Some info about protocols and how to handle them
61 """
# Pairs of (caps media type string, corresponding Protocols value).
GST_VALIDATE_CAPS_TO_PROTOCOL = [("application/x-hls", Protocols.HLS),
                                 ("application/dash+xml", Protocols.DASH)]
64
65
class GstValidateMediaCheckTestsGenerator(GstValidateTestsGenerator):
    """Generator adding one GstValidateMediaCheckTest per media URI."""

    def __init__(self, test_manager):
        GstValidateTestsGenerator.__init__(self, "media_check", test_manager)

    def populate_tests(self, uri_minfo_special_scenarios, scenarios):
        """
        Add a media check test for every (uri, mediainfo, special_scenarios)
        entry. Scenarios are not used by media check tests.
        """
        for uri, mediainfo, _special_scenarios in uri_minfo_special_scenarios:
            descriptor_protocol = mediainfo.media_descriptor.get_protocol()
            # Dots in the file name would otherwise add levels to the
            # dot-separated class name.
            media_name = os.path.basename(url2path(uri)).replace(".", "_")
            classname = "%s.media_check.%s" % (descriptor_protocol, media_name)

            test = GstValidateMediaCheckTest(classname,
                                             self.test_manager.options,
                                             self.test_manager.reporter,
                                             mediainfo.media_descriptor,
                                             uri,
                                             mediainfo.path,
                                             timeout=DEFAULT_TIMEOUT)
            self.add_test(test)
85
86
class GstValidateTranscodingTestsGenerator(GstValidateTestsGenerator):
    """Generator adding one transcoding test per media and encoding format."""

    def __init__(self, test_manager):
        GstValidateTestsGenerator.__init__(self, "transcode", test_manager)

    def populate_tests(self, uri_minfo_special_scenarios, scenarios):
        """
        Add a GstValidateTranscodingTest for each media and each encoding
        format combination returned by the test manager. Images and RTSP
        medias are skipped.
        """
        for uri, mediainfo, _special_scenarios in uri_minfo_special_scenarios:
            descriptor = mediainfo.media_descriptor
            if descriptor.is_image():
                continue

            if descriptor.get_protocol() == Protocols.RTSP:
                continue

            for comb in self.test_manager.get_encoding_formats():
                name_parts = (descriptor.get_protocol(),
                              str(comb).replace(' ', '_'),
                              descriptor.get_clean_name())
                classname = "%s.transcode.to_%s.%s" % name_parts
                test = GstValidateTranscodingTest(classname,
                                                  self.test_manager.options,
                                                  self.test_manager.reporter,
                                                  comb,
                                                  uri,
                                                  descriptor)
                self.add_test(test)
112
113
class FakeMediaDescriptor(MediaDescriptor):
    """
    Media descriptor backed by a plain dictionary of infos and a pipeline
    description instead of a media info file.
    """

    def __init__(self, infos, pipeline_desc):
        MediaDescriptor.__init__(self)
        self._infos = infos
        self._pipeline_desc = pipeline_desc

    def _info(self, key, fallback=None):
        # Single access point to the infos dictionary.
        return self._infos.get(key, fallback)

    def get_path(self):
        return self._info('path')

    def get_media_filepath(self):
        return self._info('media-filepath')

    def get_caps(self):
        return self._info('caps')

    def get_uri(self):
        return self._info('uri')

    def get_duration(self):
        # 'duration' is multiplied by GST_SECOND; defaults to 0.
        seconds = int(self._info('duration', 0))
        return seconds * GST_SECOND

    def get_protocol(self):
        return self._info('protocol', "launch_pipeline")

    def is_seekable(self):
        return self._info('is-seekable', True)

    def is_image(self):
        return self._info('is-image', False)

    def is_live(self):
        return self._info('is-live', False)

    def get_num_tracks(self, track_type):
        # Without an explicit 'num-<type>-tracks' info, count the
        # occurrences of "<type>sink" in the pipeline description.
        key = 'num-%s-tracks' % track_type
        sinks_in_pipeline = self._pipeline_desc.count(track_type + "sink")
        return self._info(key, sinks_in_pipeline)

    def can_play_reverse(self):
        return self._info('plays-reverse', False)
154
155
class GstValidatePipelineTestsGenerator(GstValidateTestsGenerator):
    """Generator creating GstValidateLaunchTest from pipeline descriptions."""

    def __init__(self, name, test_manager, pipeline_template=None,
                 pipelines_descriptions=None, valid_scenarios=[]):
        """
        @name: The name of the generator
        @pipeline_template: A template pipeline to be used to generate actual pipelines
        @pipelines_descriptions: A list of dictionaries with the keys 'name',
                                 'pipeline' and optionally 'extra_data' and
                                 'pipeline_data', or a list of tuples of the form:
                                 (test_name, pipeline_description, extra_data)
                                 extra_data being a dictionary with the following keys:
                                    'scenarios': ["the", "valid", "scenarios", "names"]
                                    'duration': the_duration # in seconds
                                    'timeout': a_timeout # in seconds
                                    'hard_timeout': a_hard_timeout # in seconds
                                    'expected-failures': forwarded to the test
                                    'extra_env_vars': forwarded to the test
                                 NOTE: populate_tests() of this class does not
                                 read 'timeout' nor 'hard_timeout'.

        @valid_scenarios: A list of scenario names that can be used with that
                          generator. None means "run without any scenario",
                          an empty list means "do not filter scenarios".
        """
        GstValidateTestsGenerator.__init__(self, name, test_manager)
        self._pipeline_template = pipeline_template
        self._pipelines_descriptions = []
        for description in pipelines_descriptions or []:
            if not isinstance(description, dict):
                # Tuple form: normalise it to the dictionary form.
                desc_dict = {"name": description[0],
                             "pipeline": description[1]}
                if len(description) >= 3:
                    desc_dict["extra_data"] = description[2]
                self._pipelines_descriptions.append(desc_dict)
            else:
                self._pipelines_descriptions.append(description)
        # The default cannot be None because None has its own meaning in
        # generate_tests(); the list is only read, never modified, here.
        self._valid_scenarios = valid_scenarios

    @classmethod
    def from_json(cls, test_manager, json_file):
        """
        Build a generator from a JSON file mapping test names to a
        dictionary holding a 'pipeline' and a list of 'scenarios'.

        Each scenario is either a string or a dictionary with a 'name' and
        optionally 'actions'; when actions are given they are written to
        <privatedir>/<generator name>/<test name>/<scenario name>.scenario
        and that file path is used as the scenario.
        """
        with open(json_file, 'r') as f:
            descriptions = json.load(f)

        name = os.path.basename(json_file).replace('.json', '')
        pipelines_descriptions = []
        for test_name, defs in descriptions.items():
            tests_definition = {'name': test_name, 'pipeline': defs['pipeline']}
            scenarios = []
            for scenario in defs['scenarios']:
                if isinstance(scenario, str):
                    scenarios.append(scenario)
                else:
                    scenario_name = scenario_file = scenario['name']
                    actions = scenario.get('actions')
                    if actions:
                        scenario_dir = os.path.join(
                            test_manager.options.privatedir, name, test_name)
                        scenario_file = os.path.join(
                            scenario_dir, scenario_name + '.scenario')
                        os.makedirs(scenario_dir, exist_ok=True)
                        with open(scenario_file, 'w') as scenario_f:
                            scenario_f.write('\n'.join(actions) + '\n')
                    scenarios.append(scenario_file)
            tests_definition['extra_data'] = {'scenarios': scenarios}
            tests_definition['pipeline_data'] = {"config_path": os.path.dirname(json_file)}
            pipelines_descriptions.append(tests_definition)

        # Deliberately not `cls(...)`: subclasses in this file define
        # constructors with a different signature.
        return GstValidatePipelineTestsGenerator(name, test_manager, pipelines_descriptions=pipelines_descriptions)

    def get_fname(self, scenario, protocol=None, name=None):
        """
        Return the dot separated test name for @scenario, optionally
        prefixed with @protocol. @name defaults to the generator name.
        """
        if name is None:
            name = self.name

        if protocol is not None:
            protocol_str = "%s." % protocol
        else:
            protocol_str = ""

        if scenario is not None and scenario.name.lower() != "none":
            return "%s%s.%s" % (protocol_str, name, scenario.name)

        # protocol_str already ends with a dot, hence the ".." clean up.
        return ("%s.%s.%s" % (protocol_str, self.name, name)).replace("..", ".")

    def generate_tests(self, uri_minfo_special_scenarios, scenarios):
        """
        Restrict @scenarios according to the valid scenarios of the generator
        before delegating to the parent implementation.
        """
        if self._valid_scenarios is None:
            scenarios = [None]
        elif self._valid_scenarios:
            scenarios = [scenario for scenario in scenarios if
                         scenario is not None and scenario.name in self._valid_scenarios]

        return super(GstValidatePipelineTestsGenerator, self).generate_tests(
            uri_minfo_special_scenarios, scenarios)

    def populate_tests(self, uri_minfo_special_scenarios, scenarios):
        """
        Add one GstValidateLaunchTest per pipeline description and compatible
        scenario. The 'scenarios' of a description's extra_data, when set,
        take precedence over @scenarios.
        """
        for description in self._pipelines_descriptions:
            pipeline = description['pipeline']
            extra_data = description.get('extra_data', {})
            pipeline_data = description.get('pipeline_data', {})

            for scenario in extra_data.get('scenarios', scenarios):
                if isinstance(scenario, str):
                    scenario = self.test_manager.scenarios_manager.get_scenario(
                        scenario)

                mediainfo = FakeMediaDescriptor(extra_data, pipeline)
                if not mediainfo.is_compatible(scenario):
                    continue

                if self.test_manager.options.mute:
                    # generate_tests() may hand over a None scenario, in
                    # which case there is nothing requiring clock sync.
                    needs_clock = scenario.needs_clock_sync() \
                        if scenario is not None else False
                    audiosink = self.get_fakesink_for_media_type(
                        "audio", needs_clock)
                    videosink = self.get_fakesink_for_media_type(
                        "video", needs_clock)
                else:
                    audiosink = 'autoaudiosink'
                    videosink = 'autovideosink'

                # Work on a copy so that the stored description is not
                # modified as a side effect of generating the tests.
                format_args = dict(pipeline_data)
                format_args.update({'videosink': videosink, 'audiosink': audiosink})
                pipeline_desc = pipeline % format_args

                fname = self.get_fname(
                    scenario, protocol=mediainfo.get_protocol(), name=description["name"])

                expected_failures = extra_data.get("expected-failures")
                extra_env_vars = extra_data.get("extra_env_vars")
                self.add_test(GstValidateLaunchTest(fname,
                                                    self.test_manager.options,
                                                    self.test_manager.reporter,
                                                    pipeline_desc,
                                                    scenario=scenario,
                                                    media_descriptor=mediainfo,
                                                    expected_failures=expected_failures,
                                                    extra_env_variables=extra_env_vars)
                              )
284
285
class GstValidatePlaybinTestsGenerator(GstValidatePipelineTestsGenerator):
    """Generator of playback tests based on playbin (or playbin3)."""

    def __init__(self, test_manager):
        # playbin3 is used as soon as USE_PLAYBIN3 is set in the environment.
        element = "playbin" if os.getenv("USE_PLAYBIN3") is None else "playbin3"
        GstValidatePipelineTestsGenerator.__init__(
            self, "playback", test_manager, element)

    def _set_sinks(self, minfo, pipe_str, scenario):
        """Append fake audio/video sinks to @pipe_str when muted."""
        if not self.test_manager.options.mute:
            return pipe_str

        needs_clock = scenario.needs_clock_sync() or minfo.media_descriptor.need_clock_sync()
        afakesink = self.get_fakesink_for_media_type("audio", needs_clock)
        vfakesink = self.get_fakesink_for_media_type("video", needs_clock)

        return pipe_str + " audio-sink='%s' video-sink='%s'" % (
            afakesink, vfakesink)

    def _get_name(self, scenario, protocol, minfo):
        """Return the test name: <generator fname>.<clean media name>."""
        media_name = os.path.basename(minfo.media_descriptor.get_clean_name())
        return "%s.%s" % (self.get_fname(scenario, protocol), media_name)

    def populate_tests(self, uri_minfo_special_scenarios, scenarios):
        """
        Add a playback test for each media and compatible scenario; local
        files are additionally tested through an RTSP server (RTSP and
        RTSP2) when the server command is available and not disabled.
        """
        test_rtsp = GstValidateBaseTestManager.RTSP_SERVER_COMMAND
        if not test_rtsp:
            # NOTE(review): the command formatted into this message is the
            # falsy value that has just been tested.
            printc("\n\nRTSP server not available, you should make sure"
                   " that %s is available in your $PATH." % GstValidateBaseTestManager.RTSP_SERVER_COMMAND,
                   Colors.FAIL)
        elif self.test_manager.options.disable_rtsp:
            printc("\n\nRTSP tests are disabled")
            test_rtsp = False

        for uri, minfo, special_scenarios in uri_minfo_special_scenarios:
            template = self._pipeline_template
            protocol = minfo.media_descriptor.get_protocol()

            if protocol == Protocols.RTSP:
                self.debug("SKIPPING %s as it is a RTSP stream" % uri)
                continue

            pipe = template + " uri=%s" % uri

            for scenario in special_scenarios + scenarios:
                if not minfo.media_descriptor.is_compatible(scenario):
                    continue

                cpipe = self._set_sinks(minfo, pipe, scenario)
                fname = self._get_name(scenario, protocol, minfo)

                self.debug("Adding: %s", fname)

                if scenario.does_reverse_playback() and protocol == Protocols.HTTP:
                    # 10MB so we can reverse playback
                    cpipe += " ring-buffer-max-size=10485760"

                launch_test = GstValidateLaunchTest(fname,
                                                    self.test_manager.options,
                                                    self.test_manager.reporter,
                                                    cpipe,
                                                    scenario=scenario,
                                                    media_descriptor=minfo.media_descriptor)
                self.add_test(launch_test)

                wants_rtsp = test_rtsp and protocol == Protocols.FILE \
                    and not minfo.media_descriptor.is_image()
                if not wants_rtsp:
                    continue

                rtsp_descriptor = GstValidateRTSPMediaDesciptor(
                    minfo.media_descriptor.get_path())
                rtspminfo = NamedDic({"path": minfo.media_descriptor.get_path(),
                                      "media_descriptor": rtsp_descriptor})
                if not rtspminfo.media_descriptor.is_compatible(scenario):
                    continue

                # The port placeholder is substituted once the server runs.
                cpipe = self._set_sinks(rtspminfo, "%s uri=rtsp://127.0.0.1:<RTSPPORTNUMBER>/test"
                                        % self._pipeline_template, scenario)

                # Same pipeline once per RTSP flavour.
                fname = self._get_name(scenario, Protocols.RTSP, rtspminfo)
                self.add_test(GstValidateRTSPTest(
                    fname, self.test_manager.options, self.test_manager.reporter,
                    cpipe, uri, scenario=scenario,
                    media_descriptor=rtspminfo.media_descriptor))

                fname = self._get_name(scenario, Protocols.RTSP + '2', rtspminfo)
                self.add_test(GstValidateRTSPTest(
                    fname, self.test_manager.options, self.test_manager.reporter,
                    cpipe, uri, scenario=scenario,
                    media_descriptor=rtspminfo.media_descriptor,
                    rtsp2=True))
375
376
class GstValidateMixerTestsGenerator(GstValidatePipelineTestsGenerator):
    """Generator of tests mixing several sources through a mixer element."""

    def __init__(self, name, test_manager, mixer, media_type, converter="",
                 num_sources=3, mixed_srcs=None, valid_scenarios=[]):
        """
        @name: The name of the generator
        @mixer: The mixer element description
        @media_type: Media type of the mixer; used to select the medias
                     (through get_num_tracks) and to build the sink name
        @converter: Converter description placed after each source and after
                    the mixer
        @num_sources: Number of sources mixed together in generated tests
        @mixed_srcs: Optional dictionary mapping a test name to either a
                     sequence of source descriptions or a dictionary with
                     the keys 'mixer_props' and 'sources'. When empty, the
                     sources are generated from the available local files.
        @valid_scenarios: see GstValidatePipelineTestsGenerator
        """
        pipe_template = "%(mixer)s name=_mixer !  " + \
            converter + " ! %(sink)s "
        self.converter = converter
        self.mixer = mixer
        self.media_type = media_type
        self.num_sources = num_sources
        # populate_tests() stores generated sources in this dictionary, so
        # it must not be a default argument shared by every instance.
        self.mixed_srcs = mixed_srcs if mixed_srcs is not None else {}
        super(
            GstValidateMixerTestsGenerator, self).__init__(name, test_manager, pipe_template,
                                                           valid_scenarios=valid_scenarios)

    def populate_tests(self, uri_minfo_special_scenarios, scenarios):
        """
        Add one launch test per entry of mixed_srcs and per scenario,
        generating mixed_srcs from local files having at least one track of
        the wanted media type when none were provided.
        """
        if self.test_manager.options.validate_uris:
            return

        wanted_ressources = []
        for uri, minfo, special_scenarios in uri_minfo_special_scenarios:
            protocol = minfo.media_descriptor.get_protocol()
            if protocol == Protocols.FILE and \
                    minfo.media_descriptor.get_num_tracks(self.media_type) > 0:
                wanted_ressources.append((uri, minfo))

        if not self.mixed_srcs:
            if not wanted_ressources:
                return

            if os.getenv("USE_PLAYBIN3") is None:
                decodebin = "uridecodebin"
            else:
                decodebin = "uridecodebin3"

            # Integer division: range() does not accept a float.
            for i in range(len(uri_minfo_special_scenarios) // self.num_sources):
                # The count above is based on every media while only the
                # wanted ones can be indexed: stop once a full group of
                # sources can no longer be built.
                if i + self.num_sources > len(wanted_ressources):
                    break

                srcs = []
                name = ""
                for nsource in range(self.num_sources):
                    uri, minfo = wanted_ressources[i + nsource]
                    srcs.append(
                        "%s uri=%s ! %s" % (decodebin, uri, self.converter))
                    fname = os.path.basename(uri).replace(".", "_")
                    if not name:
                        name = fname
                    else:
                        name += "+%s" % fname

                self.mixed_srcs[name] = tuple(srcs)

        for name, srcs in self.mixed_srcs.items():
            if isinstance(srcs, dict):
                pipe_arguments = {
                    "mixer": self.mixer + " %s" % srcs["mixer_props"]}
                srcs = srcs["sources"]
            else:
                pipe_arguments = {"mixer": self.mixer}

            for scenario in scenarios:
                fname = self.get_fname(scenario, Protocols.FILE) + "."
                fname += name

                self.debug("Adding: %s", fname)

                if self.test_manager.options.mute:
                    # The scenario is None when the generator was created
                    # with valid_scenarios=None.
                    needs_clock = scenario.needs_clock_sync() \
                        if scenario is not None else False
                    pipe_arguments["sink"] = self.get_fakesink_for_media_type(
                        self.media_type, needs_clock)
                else:
                    pipe_arguments["sink"] = "auto%ssink" % self.media_type

                pipe = self._pipeline_template % pipe_arguments

                # Link every source to the mixer named in the template.
                for src in srcs:
                    pipe += "%s ! _mixer. " % src

                self.add_test(GstValidateLaunchTest(fname,
                                                    self.test_manager.options,
                                                    self.test_manager.reporter,
                                                    pipe,
                                                    scenario=scenario)
                              )
455                                                     scenario=scenario)
456                               )
457
458
class GstValidateLaunchTest(GstValidateTest):
    """Test running GstValidateBaseTestManager.COMMAND on a pipeline description."""

    def __init__(self, classname, options, reporter, pipeline_desc,
                 timeout=DEFAULT_TIMEOUT, scenario=None,
                 media_descriptor=None, duration=0, hard_timeout=None,
                 extra_env_variables=None, expected_failures=None):
        """
        The duration handed to the parent class comes from the scenario when
        there is one, else from the media descriptor when there is one, and
        only otherwise from @duration.
        """
        env_variables = extra_env_variables or {}

        if scenario:
            duration = scenario.get_duration()
        elif media_descriptor:
            duration = media_descriptor.get_duration() / GST_SECOND

        super().__init__(GstValidateBaseTestManager.COMMAND,
                         classname,
                         options, reporter,
                         duration=duration,
                         scenario=scenario,
                         timeout=timeout,
                         hard_timeout=hard_timeout,
                         media_descriptor=media_descriptor,
                         extra_env_variables=env_variables,
                         expected_failures=expected_failures)

        self.pipeline_desc = pipeline_desc
        self.media_descriptor = media_descriptor

    def build_arguments(self):
        """Add the split pipeline description and the media info path, if any."""
        GstValidateTest.build_arguments(self)

        pipeline_args = shlex.split(self.pipeline_desc)
        self.add_arguments(*pipeline_args)

        if self.media_descriptor is None:
            return

        if self.media_descriptor.get_path():
            self.add_arguments(
                "--set-media-info", self.media_descriptor.get_path())
492             self.add_arguments(
493                 "--set-media-info", self.media_descriptor.get_path())
494
495
class GstValidateMediaCheckTest(GstValidateTest):
    """Test running the media check command on a URI against a media info file."""

    def __init__(self, classname, options, reporter, media_descriptor,
                 uri, minfo_path, timeout=DEFAULT_TIMEOUT,
                 extra_env_variables=None,
                 expected_failures=None):
        env_variables = extra_env_variables or {}

        super().__init__(GstValidateBaseTestManager.MEDIA_CHECK_COMMAND, classname,
                         options, reporter,
                         timeout=timeout,
                         media_descriptor=media_descriptor,
                         extra_env_variables=env_variables,
                         expected_failures=expected_failures)
        self._uri = uri
        self._media_info_path = minfo_path

    def build_arguments(self):
        """Add the URI, the expected results file and the parsers option."""
        # Calls the implementation of Test, not the one of GstValidateTest.
        Test.build_arguments(self)

        arguments = [self._uri, "--expected-results", self._media_info_path]
        self.add_arguments(*arguments)

        if not self.media_descriptor.skip_parsers():
            return

        self.add_arguments("--skip-parsers")
519         if self.media_descriptor.skip_parsers():
520             self.add_arguments("--skip-parsers")
521
522
class GstValidateTranscodingTest(GstValidateTest, GstValidateEncodingTestInterface):
    """Test transcoding a media to a given encoding format combination."""

    scenarios_manager = ScenarioManager()

    def __init__(self, classname, options, reporter,
                 combination, uri, media_descriptor,
                 timeout=DEFAULT_TIMEOUT,
                 scenario=None,
                 extra_env_variables=None,
                 expected_failures=None):
        """
        @combination: The encoding format combination to transcode to
        @uri: The URI of the media to transcode
        @media_descriptor: The descriptor of the media to transcode
        @extra_env_variables: Optional dictionary of environment variables
                              handed over to the parent class
        """
        # Initialised first so that self.debug() can be used before the
        # parent constructors are called.
        Loggable.__init__(self)

        extra_env_variables = extra_env_variables or {}

        file_dur = int(media_descriptor.get_duration()) / GST_SECOND
        if not media_descriptor.get_num_tracks("video"):
            self.debug("%s audio only file applying transcoding ratio."
                       "File 'duration' : %s" % (classname, file_dur))
            duration = file_dur / AUDIO_ONLY_FILE_TRANSCODING_RATIO
        else:
            duration = file_dur

        # The variables given by the caller used to be replaced by None
        # here, silently discarding them.
        super(
            GstValidateTranscodingTest, self).__init__(GstValidateBaseTestManager.TRANSCODING_COMMAND,
                                                       classname,
                                                       options,
                                                       reporter,
                                                       duration=duration,
                                                       timeout=timeout,
                                                       scenario=scenario,
                                                       media_descriptor=media_descriptor,
                                                       extra_env_variables=extra_env_variables,
                                                       expected_failures=expected_failures)

        GstValidateEncodingTestInterface.__init__(
            self, combination, media_descriptor)

        self.uri = uri

    def run_external_checks(self):
        """Run the IQA test for single video track medias when enabled."""
        if self.media_descriptor.get_num_tracks("video") == 1 and \
                self.options.validate_enable_iqa_tests:
            self.run_iqa_test(self.uri)

    def set_rendering_info(self):
        """Compute the destination file URI and add the encoding profile."""
        # The class name is turned into a directory hierarchy below the
        # destination directory.
        self.dest_file = os.path.join(self.options.dest,
                                      self.classname.replace(".transcode.", os.sep).
                                      replace(".", os.sep))
        mkdir(os.path.dirname(urllib.parse.urlsplit(self.dest_file).path))
        if urllib.parse.urlparse(self.dest_file).scheme == "":
            self.dest_file = path2url(self.dest_file)

        profile = self.get_profile()
        self.add_arguments("-o", profile)

    def build_arguments(self):
        """Add the rendering options followed by the source and destination."""
        GstValidateTest.build_arguments(self)
        self.set_rendering_info()
        self.add_arguments(self.uri, self.dest_file)

    def get_current_value(self):
        """
        Return the current size of the output, or the current position when
        the size is not known. When a scenario is set and the pipeline is
        still running 30 seconds after EOS was sent, a result is set and
        returned instead.
        """
        if self.scenario:
            sent_eos = self.sent_eos_position()
            if sent_eos is not None:
                t = time.time()
                if ((t - sent_eos)) > 30:
                    if self.media_descriptor.get_protocol() == Protocols.HLS:
                        # NOTE(review): the result is set to PASSED while
                        # KNOWN_ERROR is returned - confirm this is wanted.
                        self.set_result(Result.PASSED,
                                        """Got no EOS 30 seconds after sending EOS,
                                        in HLS known and tolerated issue:
                                        https://bugzilla.gnome.org/show_bug.cgi?id=723868""")
                        return Result.KNOWN_ERROR

                    self.set_result(
                        Result.FAILED, "Pipeline did not stop 30 Seconds after sending EOS")

                    return Result.FAILED

        size = self.get_current_size()
        if size is None:
            return self.get_current_position()

        return size

    def check_results(self):
        """
        Defer to GstValidateTest on failure, timeout or non zero return
        code; otherwise the result is the one of the encoded file check.
        """
        if self.result in [Result.FAILED, Result.TIMEOUT] or \
                self.process.returncode != 0:
            GstValidateTest.check_results(self)
            return

        res, msg = self.check_encoded_file()
        self.set_result(res, msg)
613         res, msg = self.check_encoded_file()
614         self.set_result(res, msg)
615
616
class GstValidateBaseRTSPTest:
    """ Interface for RTSP tests, requires implementing Test"""
    # Ports handed out to servers and not yet released, shared by all tests.
    __used_ports = set()

    def __init__(self, local_uri):
        """
        @local_uri: URI passed as first argument to the RTSP server command
        """
        self._local_uri = local_uri
        self.rtsp_server = None
        # Set by launch_server(); kept at None until then so that
        # close_logfile() is safe when the server was never started.
        self.rtspserver_logs = None
        self._unsetport_pipeline_desc = None
        self.optional = True

    @classmethod
    def __get_open_port(cls):
        """Return a port given by the OS that is not used by another test."""
        while True:
            # hackish trick from
            # http://stackoverflow.com/questions/2838244/get-open-tcp-port-in-python?answertab=votes#tab-top
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                s.bind(("", 0))
                port = s.getsockname()[1]
            finally:
                s.close()

            if port not in cls.__used_ports:
                cls.__used_ports.add(port)
                return port

    def launch_server(self):
        """
        Start the RTSP server on a free port, wait for it to accept
        connections and set that port in the pipeline description.

        Returns the server command line as a string.
        """
        self.server_port = self.__get_open_port()
        command = [GstValidateBaseTestManager.RTSP_SERVER_COMMAND, self._local_uri, '--port', str(self.server_port)]

        if self.options.validate_gdb_server:
            command = self.use_gdb(command)
            self.rtspserver_logs = sys.stdout
        elif self.options.redirect_logs == 'stderr':
            # This choice used to be overwritten with sys.stdout.
            self.rtspserver_logs = sys.stderr
        elif self.options.redirect_logs:
            self.rtspserver_logs = sys.stdout
        else:
            self.rtspserver_logs = open(self.logfile + '_rtspserver.log', 'w+')
            self.extra_logfiles.append(self.rtspserver_logs.name)

        server_env = os.environ.copy()

        self.rtsp_server = subprocess.Popen(command,
                                            stderr=self.rtspserver_logs,
                                            stdout=self.rtspserver_logs,
                                            env=server_env)
        # Poll until the server accepts connections.
        while True:
            s = socket.socket()
            try:
                s.connect((("127.0.0.1", self.server_port)))
                break
            except ConnectionRefusedError:
                if self.rtsp_server.poll() is not None:
                    # The server exited: it will never accept a connection,
                    # stop waiting instead of looping forever.
                    break
                time.sleep(0.1)
                continue
            finally:
                s.close()

        if not self._unsetport_pipeline_desc:
            # Keep the description holding the placeholder so that the
            # server can be launched again on another port.
            self._unsetport_pipeline_desc = self.pipeline_desc

        self.pipeline_desc = self._unsetport_pipeline_desc.replace(
            "<RTSPPORTNUMBER>", str(self.server_port))

        return ' '.join(command)

    def close_logfile(self):
        """Close the logs of the test and the server log file, if any."""
        super().close_logfile()
        logs = self.rtspserver_logs
        # Only a log file opened by launch_server() is closed; the standard
        # streams (log redirection, gdb) must stay open.
        if logs is not None and logs is not sys.stdout and logs is not sys.stderr:
            logs.close()

    def process_update(self):
        """Stop the server and release its port once the parent reports a result."""
        res = super().process_update()
        if res:
            kill_subprocess(self, self.rtsp_server, DEFAULT_TIMEOUT)
            # discard: do not fail when the port was already released.
            self.__used_ports.discard(self.server_port)

        return res
695
696         return res
697
698
class GstValidateRTSPTest(GstValidateBaseRTSPTest, GstValidateLaunchTest):
    """
    Launch test combined with the RTSP server handling of
    `GstValidateBaseRTSPTest`.

    `rtsp2` records whether the `force_rtsp2` scenario must be added to the
    subprocess environment.
    """

    def __init__(self, classname, options, reporter, pipeline_desc,
                 local_uri, timeout=DEFAULT_TIMEOUT, scenario=None,
                 media_descriptor=None, rtsp2=False):
        # Both bases are initialised explicitly, launch test first.
        GstValidateLaunchTest.__init__(
            self, classname, options, reporter, pipeline_desc, timeout,
            scenario, media_descriptor)
        GstValidateBaseRTSPTest.__init__(self, local_uri)
        self.rtsp2 = rtsp2

    def get_subproc_env(self):
        """
        Return the parent environment with the RTSP scenario overrides.

        The `rtsp_overrides` data directory is put in front of
        `GST_VALIDATE_SCENARIOS_PATH`, and `force_rtsp2` is appended to
        `GST_VALIDATE_SCENARIO` when `self.rtsp2` is set.
        """
        env = super().get_subproc_env()
        previous_path = env.get('GST_VALIDATE_SCENARIOS_PATH', '')
        overrides = get_data_file(os.path.join('data', 'scenarios'), 'rtsp_overrides')
        env['GST_VALIDATE_SCENARIOS_PATH'] = '{}:{}'.format(overrides, previous_path)

        if not self.rtsp2:
            return env

        current_scenario = env.get('GST_VALIDATE_SCENARIO', '')
        env['GST_VALIDATE_SCENARIO'] = current_scenario + ':force_rtsp2'
        return env
719
720
class GstValidateRTSPMediaDesciptor(GstValidateMediaDescriptor):
    """
    Media descriptor loaded from `xml_path` that always reports a fixed
    local RTSP URI and the RTSP protocol.
    """

    def __init__(self, xml_path):
        super().__init__(xml_path)

    def get_uri(self):
        """Return the fixed RTSP URI used for this media."""
        return "rtsp://127.0.0.1:8554/test"

    def get_protocol(self):
        """Return `Protocols.RTSP`."""
        return Protocols.RTSP

    def prerrols(self):
        """Always return False (presumably: the media does not preroll)."""
        return False
734
735
class GstValidateTestManager(GstValidateBaseTestManager):
    # Test manager for the GstValidate tools. It discovers media files,
    # registers default scenarios, encoding formats, blacklist entries and
    # test generators, and exposes the GstValidate specific command line
    # options.

    # Name under which this manager is known.
    name = "validate"

    # List of all classes to create testsuites
    # (exposed as class attributes so they can be reached through the
    # manager itself).
    GstValidateMediaCheckTestsGenerator = GstValidateMediaCheckTestsGenerator
    GstValidateTranscodingTestsGenerator = GstValidateTranscodingTestsGenerator
    GstValidatePipelineTestsGenerator = GstValidatePipelineTestsGenerator
    GstValidatePlaybinTestsGenerator = GstValidatePlaybinTestsGenerator
    GstValidateMixerTestsGenerator = GstValidateMixerTestsGenerator
    GstValidateLaunchTest = GstValidateLaunchTest
    GstValidateMediaCheckTest = GstValidateMediaCheckTest
    GstValidateTranscodingTest = GstValidateTranscodingTest
749
750     def __init__(self):
751         super(GstValidateTestManager, self).__init__()
752         self._uris = []
753         self._run_defaults = True
754         self._is_populated = False
755         self._default_generators_registered = False
756
757     def init(self):
758         for command, name in [
759                 (GstValidateBaseTestManager.TRANSCODING_COMMAND, "gst-validate-transcoding-1.0"),
760                 (GstValidateBaseTestManager.COMMAND, "gst-validate-1.0"),
761                 (GstValidateBaseTestManager.MEDIA_CHECK_COMMAND, "gst-validate-media-check-1.0")]:
762             if not command:
763                 self.error("command not found: %s" % name)
764                 return False
765
766         return True
767
768     def add_options(self, parser):
769         group = parser.add_argument_group("GstValidate tools specific options"
770                                           " and behaviours",
771                                           description="""When using --wanted-tests, all the scenarios can be used, even those which have
772 not been tested and explicitely activated if you set use --wanted-tests ALL""")
773         group.add_argument("--validate-check-uri", dest="validate_uris",
774                            action="append", help="defines the uris to run default tests on")
775         group.add_argument("--validate-tools-path", dest="validate_tools_path",
776                            action="append", help="defines the paths to look for GstValidate tools.")
777         group.add_argument("--validate-gdb-server", dest="validate_gdb_server",
778                            help="Run the server in GDB.")
779         group.add_argument("--validate-disable-rtsp", dest="disable_rtsp",
780                            help="Disable RTSP tests.")
781         group.add_argument("--validate-enable-iqa-tests", dest="validate_enable_iqa_tests",
782                            help="Enable Image Quality Assessment validation tests.",
783                            default=False, action='store_true')
784
785     def print_valgrind_bugs(self):
786         # Look for all the 'pending' bugs in our supp file
787         bugs = []
788         p = get_data_file('data', 'gstvalidate.supp')
789         with open(p) as f:
790             for line in f.readlines():
791                 line = line.strip()
792                 if line.startswith('# PENDING:'):
793                     tmp = line.split(' ')
794                     bugs.append(tmp[2])
795
796         if bugs:
797             msg = "Ignored valgrind bugs:\n"
798             for b in bugs:
799                 msg += "  + %s\n" % b
800             printc(msg, Colors.FAIL, True)
801
802     def populate_testsuite(self):
803
804         if self._is_populated is True:
805             return
806
807         if not self.options.config and not self.options.testsuites:
808             if self._run_defaults:
809                 self.register_defaults()
810             else:
811                 self.register_all()
812
813         self._is_populated = True
814
815     def list_tests(self):
816         if self.tests:
817             return self.tests
818
819         if self._run_defaults:
820             scenarios = [self.scenarios_manager.get_scenario(scenario_name)
821                          for scenario_name in self.get_scenarios()]
822         else:
823             scenarios = self.scenarios_manager.get_scenario(None)
824         uris = self._list_uris()
825
826         for generator in self.get_generators():
827             for test in generator.generate_tests(uris, scenarios):
828                 self.add_test(test)
829
830         if not self.tests and not uris and not self.options.wanted_tests:
831             printc(
832                 "No valid uris present in the path. Check if media files and info files exist", Colors.FAIL)
833
834         return self.tests
835
836     def _add_media(self, media_info, uri=None):
837         self.debug("Checking %s", media_info)
838         if isinstance(media_info, GstValidateMediaDescriptor):
839             media_descriptor = media_info
840             media_info = media_descriptor.get_path()
841         else:
842             media_descriptor = GstValidateMediaDescriptor(media_info)
843
844         try:
845             # Just testing that the vairous mandatory infos are present
846             caps = media_descriptor.get_caps()
847             if uri is None:
848                 uri = media_descriptor.get_uri()
849
850             # Adjust local http uri
851             if self.options.http_server_port != 8079 and \
852                uri.startswith("http://127.0.0.1:8079/"):
853                 uri = uri.replace("http://127.0.0.1:8079/",
854                                   "http://127.0.0.1:%r/" % self.options.http_server_port, 1)
855             media_descriptor.set_protocol(urllib.parse.urlparse(uri).scheme)
856             for caps2, prot in GST_VALIDATE_CAPS_TO_PROTOCOL:
857                 if caps2 == caps:
858                     media_descriptor.set_protocol(prot)
859                     break
860
861             scenario_bname = media_descriptor.get_media_filepath()
862             special_scenarios = self.scenarios_manager.find_special_scenarios(
863                 scenario_bname)
864             self._uris.append((uri,
865                                NamedDic({"path": media_info,
866                                          "media_descriptor": media_descriptor}),
867                                special_scenarios))
868         except configparser.NoOptionError as e:
869             self.debug("Exception: %s for %s", e, media_info)
870
871     def _discover_file(self, uri, fpath):
872         for ext in (GstValidateMediaDescriptor.MEDIA_INFO_EXT,
873                 GstValidateMediaDescriptor.PUSH_MEDIA_INFO_EXT):
874             try:
875                 is_push = False
876                 media_info = "%s.%s" % (fpath, ext)
877                 if ext == GstValidateMediaDescriptor.PUSH_MEDIA_INFO_EXT:
878                     if not os.path.exists(media_info):
879                         continue
880                     is_push = True
881                     uri = "push" + uri
882                 args = GstValidateBaseTestManager.MEDIA_CHECK_COMMAND.split(" ")
883
884                 args.append(uri)
885                 if os.path.isfile(media_info) and not self.options.update_media_info:
886                     self._add_media(media_info, uri)
887                     continue
888                 elif fpath.endswith(GstValidateMediaDescriptor.STREAM_INFO_EXT):
889                     self._add_media(fpath)
890                     continue
891                 elif not self.options.generate_info and not self.options.update_media_info and not self.options.validate_uris:
892                     continue
893                 elif self.options.update_media_info and not os.path.isfile(media_info):
894                     self.info(
895                         "%s not present. Use --generate-media-info", media_info)
896                     continue
897                 elif os.path.islink(media_info):
898                     self.info(
899                         "%s is a symlink, not updating and hopefully the actual file gets updated!", media_info)
900                     continue
901
902                 include_frames = 0
903                 if self.options.update_media_info:
904                     include_frames = 2
905                 elif self.options.generate_info_full:
906                     include_frames = 1
907
908                 media_descriptor = GstValidateMediaDescriptor.new_from_uri(
909                     uri, True, include_frames, is_push)
910                 if media_descriptor:
911                     self._add_media(media_descriptor, uri)
912                 else:
913                     self.warning("Could not get any descriptor for %s" % uri)
914
915             except subprocess.CalledProcessError as e:
916                 if self.options.generate_info:
917                     printc("Result: Failed", Colors.FAIL)
918                 else:
919                     self.error("Exception: %s", e)
920                 return False
921         return True
922
923     def _list_uris(self):
924         if self._uris:
925             return self._uris
926
927         if self.options.validate_uris:
928             for uri in self.options.validate_uris:
929                 self._discover_file(uri, uri)
930             return self._uris
931
932         if not self.args:
933             if isinstance(self.options.paths, str):
934                 self.options.paths = [os.path.join(self.options.paths)]
935
936             for path in self.options.paths:
937                 if os.path.isfile(path):
938                     path = os.path.abspath(path)
939                     self._discover_file(path2url(path), path)
940                 else:
941                     for root, dirs, files in os.walk(path):
942                         for f in files:
943                             fpath = os.path.abspath(os.path.join(root, f))
944                             if os.path.isdir(fpath) or \
945                                     fpath.endswith(GstValidateMediaDescriptor.MEDIA_INFO_EXT) or\
946                                     fpath.endswith(ScenarioManager.FILE_EXTENSION):
947                                 continue
948                             else:
949                                 self._discover_file(path2url(fpath), fpath)
950
951         self.debug("Uris found: %s", self._uris)
952
953         return self._uris
954
955     def needs_http_server(self):
956         for test in self.list_tests():
957             if self._is_test_wanted(test) and test.media_descriptor is not None:
958                 protocol = test.media_descriptor.get_protocol()
959                 uri = test.media_descriptor.get_uri()
960
961                 if protocol in [Protocols.HTTP, Protocols.HLS, Protocols.DASH] and \
962                         ("127.0.0.1:%s" % (
963                             self.options.http_server_port) in uri or "127.0.0.1:8079" in uri):
964                     return True
965         return False
966
967     def set_settings(self, options, args, reporter):
968         if options.wanted_tests:
969             for i in range(len(options.wanted_tests)):
970                 if "ALL" in options.wanted_tests[i]:
971                     self._run_defaults = False
972                     options.wanted_tests[
973                         i] = options.wanted_tests[i].replace("ALL", "")
974         try:
975             options.wanted_tests.remove("")
976         except ValueError:
977             pass
978
979         if options.validate_uris:
980             self.check_testslist = False
981
982         super(GstValidateTestManager, self).set_settings(
983             options, args, reporter)
984
985     def register_defaults(self):
986         """
987         Registers the defaults:
988             * Scenarios to be used
989             * Encoding formats to be used
990             * Blacklisted tests
991             * Test generators
992         """
993         self.register_default_scenarios()
994         self.register_default_encoding_formats()
995         self.register_default_blacklist()
996         self.register_default_test_generators()
997
998     def register_default_scenarios(self):
999         """
1000         Registers default test scenarios
1001         """
1002         if self.options.long_limit != 0:
1003             self.add_scenarios([
1004                 "play_15s",
1005                 "reverse_playback",
1006                 "fast_forward",
1007                 "seek_forward",
1008                 "seek_backward",
1009                 "seek_with_stop",
1010                 "switch_audio_track",
1011                 "switch_audio_track_while_paused",
1012                 "switch_subtitle_track",
1013                 "switch_subtitle_track_while_paused",
1014                 "disable_subtitle_track_while_paused",
1015                 "change_state_intensive",
1016                 "scrub_forward_seeking"])
1017         else:
1018             self.add_scenarios([
1019                 "play_15s",
1020                 "reverse_playback",
1021                 "fast_forward",
1022                 "seek_forward",
1023                 "seek_backward",
1024                 "seek_with_stop",
1025                 "switch_audio_track",
1026                 "switch_audio_track_while_paused",
1027                 "switch_subtitle_track",
1028                 "switch_subtitle_track_while_paused",
1029                 "disable_subtitle_track_while_paused",
1030                 "change_state_intensive",
1031                 "scrub_forward_seeking"])
1032
1033     def register_default_encoding_formats(self):
1034         """
1035         Registers default encoding formats
1036         """
1037         self.add_encoding_formats([
1038             MediaFormatCombination("ogg", "vorbis", "theora"),
1039             MediaFormatCombination("webm", "vorbis", "vp8"),
1040             MediaFormatCombination("mp4", "mp3", "h264"),
1041             MediaFormatCombination("mkv", "vorbis", "h264"),
1042         ])
1043
1044     def register_default_blacklist(self):
1045         self.set_default_blacklist([
1046             # hls known issues
1047             # ("hls.playback.seek_with_stop.*",
1048             #  "https://bugzilla.gnome.org/show_bug.cgi?id=753689"),
1049
1050             # testbin known issues
1051             ("testbin.media_check.*",
1052              "Not supported by GstDiscoverer."),
1053
1054             # dash known issues
1055             ("dash.media_check.*",
1056              "Caps are different depending on selected bitrates, etc"),
1057
1058             # Matroska/WEBM known issues:
1059             ("*.reverse_playback.*webm$",
1060              "https://bugzilla.gnome.org/show_bug.cgi?id=679250"),
1061             ("*.reverse_playback.*mkv$",
1062              "https://bugzilla.gnome.org/show_bug.cgi?id=679250"),
1063             ("http.playback.seek_with_stop.*webm",
1064              "matroskademux.gst_matroska_demux_handle_seek_push: Seek end-time not supported in streaming mode"),
1065             ("http.playback.seek_with_stop.*mkv",
1066              "matroskademux.gst_matroska_demux_handle_seek_push: Seek end-time not supported in streaming mode"),
1067
1068             # MPEG TS known issues:
1069             ('(?i)*playback.reverse_playback.*(?:_|.)(?:|m)ts$',
1070              "https://bugzilla.gnome.org/show_bug.cgi?id=702595"),
1071
1072             # Fragmented MP4 disabled tests:
1073             ('*.playback..*seek.*.fragmented_nonseekable_sink_mp4',
1074              "Seeking on fragmented files without indexes isn't implemented"),
1075             ('*.playback.reverse_playback.fragmented_nonseekable_sink_mp4',
1076              "Seeking on fragmented files without indexes isn't implemented"),
1077
1078             # HTTP known issues:
1079             ("http.*scrub_forward_seeking.*",
1080              "This is not stable enough for now."),
1081             ("http.playback.change_state_intensive.raw_video_mov",
1082              "This is not stable enough for now. (flow return from pad push doesn't match expected value)"),
1083
1084             # MXF known issues"
1085             ("*reverse_playback.*mxf",
1086              "Reverse playback is not handled in MXF"),
1087             ("file\.transcode.*mxf",
1088              "FIXME: Transcoding and mixing tests need to be tested"),
1089
1090             # WMV known issues"
1091             ("*reverse_playback.*wmv",
1092              "Reverse playback is not handled in wmv"),
1093             (".*reverse_playback.*asf",
1094              "Reverse playback is not handled in asf"),
1095
1096             # ogg known issues
1097             ("http.playback.seek.*vorbis_theora_1_ogg",
1098              "https://bugzilla.gnome.org/show_bug.cgi?id=769545"),
1099             # RTSP known issues
1100             ('rtsp.*playback.reverse.*',
1101              'https://bugzilla.gnome.org/show_bug.cgi?id=626811'),
1102             ('rtsp.*playback.seek_with_stop.*',
1103              'https://bugzilla.gnome.org/show_bug.cgi?id=784298'),
1104             ('rtsp.*playback.fast_*',
1105              'https://bugzilla.gnome.org/show_bug.cgi?id=754575'),
1106         ])
1107
1108     def register_default_test_generators(self):
1109         """
1110         Registers default test generators
1111         """
1112         if self._default_generators_registered:
1113             return
1114
1115         self.add_generators([GstValidatePlaybinTestsGenerator(self),
1116                              GstValidateMediaCheckTestsGenerator(self),
1117                              GstValidateTranscodingTestsGenerator(self)])
1118         self._default_generators_registered = True