validate: launcher: Add some missing env variables in command to launch test
[platform/upstream/gstreamer.git] / validate / launcher / apps / gstcheck.py
1 #!/usr/bin/env python3
2 #
3 # Copyright (c) 2016,Thibault Saunier <thibault.saunier@osg.samsung.com>
4 #
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU Lesser General Public
7 # License as published by the Free Software Foundation; either
8 # version 2.1 of the License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13 # Lesser General Public License for more details.
14 #
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with this program; if not, write to the
17 # Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
18 # Boston, MA 02110-1301, USA.
19 import argparse
20 import os
21 import re
22 import pickle
23 import platform
24 import shutil
25 import threading
26 import concurrent.futures as conc
27
28 from launcher import config
29 from launcher.utils import printc, Colors
30
31
class MesonTest(Test):
    """A launcher test wrapping a single meson test definition.

    ``test`` is the meson test description; this class reads its ``env``,
    ``extra_paths``, ``timeout``, ``fname``, ``cmd_args`` and ``is_parallel``
    attributes.
    """

    def __init__(self, name, options, reporter, test, child_env=None):
        """Set up the test and compute the environment overrides of its child.

        Args:
            name: Name under which the test is reported.
            options: Launcher options, forwarded to ``Test.__init__``.
            reporter: Reporter, forwarded to ``Test.__init__``.
            test: The meson test description.
            child_env: Optional mapping of extra environment variables for
                the test process. It is copied, never modified in place.
        """
        ref_env = os.environ.copy()
        if child_env is None:
            child_env = {}
        else:
            ref_env.update(child_env)
            # Work on a private copy so the caller's mapping stays untouched.
            child_env = dict(child_env)

        # The meson environment description is resolved once and cached on
        # the (possibly shared) test object as a plain dict.
        if not isinstance(test.env, dict):
            test.env = test.env.get_env(ref_env)
        child_env.update(test.env)

        if test.extra_paths:
            # The overrides usually do not define PATH themselves: fall back
            # on the reference environment instead of raising a KeyError.
            base_path = child_env.get('PATH', ref_env.get('PATH', ''))
            entries = [base_path] if base_path else []
            entries.extend(test.extra_paths)
            child_env['PATH'] = os.pathsep.join(entries)
        self.child_env = child_env

        # CK_DEFAULT_TIMEOUT is consumed here (removed from the overrides)
        # and takes precedence over the timeout declared by meson.
        timeout = int(child_env.pop('CK_DEFAULT_TIMEOUT', test.timeout))

        Test.__init__(self, test.fname[0], name, options,
                      reporter, timeout=timeout, hard_timeout=timeout,
                      is_parallel=test.is_parallel)

        self.mesontest = test

    def build_arguments(self):
        """Add the remaining command elements and the meson test arguments."""
        self.add_arguments(*self.mesontest.fname[1:])
        self.add_arguments(*self.mesontest.cmd_args)

    def get_subproc_env(self):
        """Return the full environment to run the test process with.

        The result is the current process environment updated with the
        overrides computed in ``__init__`` and with ``CK_FORK`` forced to
        ``'no'``. Every override is also passed to ``add_env_variable``.
        """
        env = os.environ.copy()
        env.update(self.child_env)
        # No reason to fork since we are launching
        # each test individually
        env['CK_FORK'] = 'no'
        self.add_env_variable('CK_FORK', 'no')
        for var, val in self.child_env.items():
            self.add_env_variable(var, val)

        return env
72
73
class MesonTestsManager(TestsManager):
    """Tests manager exposing the tests declared in meson build directories.

    Test descriptions are loaded from the
    ``meson-private/meson_test_setup.dat`` file of every configured build
    directory and each of them is wrapped in a ``MesonTest``.
    """
    name = "mesontest"
    # Argument group stored on MesonTestsManager itself so that the options
    # below are only registered once, whatever the number of instances.
    arggroup = None

    def __init__(self):
        super().__init__()
        # Cached outcome of rebuild(); None as long as no rebuild was tried.
        self.rebuilt = None

    def add_options(self, parser):
        """Register the meson specific command line options on ``parser``."""
        if self.arggroup:
            return

        arggroup = MesonTestsManager.arggroup = parser.add_argument_group(
            "meson tests specific options and behaviours")
        arggroup.add_argument("--meson-build-dir",
                              action="append",
                              dest='meson_build_dirs',
                              default=[],
                              help="defines the paths to look for GstValidate tools.")
        arggroup.add_argument("--meson-no-rebuild",
                              action="store_true",
                              default=False,
                              help="Whether to avoid to rebuild tests before running them.")

    def get_meson_tests(self):
        """Return the meson test descriptions of all the build directories.

        Falls back on ``config.BUILDDIR`` when no build directory was given.
        Build directories without a test setup file are reported through
        ``self.error`` and skipped.
        """
        if not self.options.meson_build_dirs:
            self.options.meson_build_dirs = [config.BUILDDIR]

        mesontests = []
        for bdir in self.options.meson_build_dirs:
            datafile = os.path.join(
                os.path.abspath(bdir), 'meson-private/meson_test_setup.dat')

            if not os.path.isfile(datafile):
                self.error("%s does not exists, can't use meson test launcher",
                           datafile)
                continue

            # NOTE(security): unpickling executes arbitrary code, the build
            # directory content has to be trusted.
            with open(datafile, 'rb') as f:
                mesontests.extend(pickle.load(f))

        return mesontests

    def rebuild(self, all=False):
        """Rebuild every ninja based build directory, once per manager.

        Args:
            all: Accepted for the callers' convenience, not used here.

        Returns:
            True when rebuilding succeeded or was not needed, False when
            ninja is missing or a build failed. Once a rebuild was attempted
            its outcome is cached in ``self.rebuilt``.
        """
        if not self.options.meson_build_dirs:
            self.options.meson_build_dirs = [config.BUILDDIR]
        if self.options.meson_no_rebuild:
            return True

        if self.rebuilt is not None:
            return self.rebuilt

        ninja = None
        for bdir in self.options.meson_build_dirs:
            if not os.path.isfile(os.path.join(bdir, 'build.ninja')):
                printc("Only ninja backend is supported to rebuilt tests before running them.\n",
                       Colors.OKBLUE)
                # Skip only this directory: the remaining ones may still be
                # ninja based and need to be rebuilt.
                continue

            if ninja is None:
                # Looked up lazily, and only once for all build directories.
                ninja = shutil.which('ninja') or shutil.which('ninja-build')
                if not ninja:
                    printc("Can't find ninja, can't rebuild test.\n",
                           Colors.FAIL)
                    self.rebuilt = False
                    return False

            print("-> Rebuilding %s.\n" % bdir)
            try:
                subprocess.check_call([ninja, '-C', bdir])
            except subprocess.CalledProcessError:
                self.rebuilt = False
                return False

        self.rebuilt = True
        return True

    def run_tests(self, starting_test_num, total_num_tests):
        """Rebuild the tests, then run them through ``TestsManager``.

        Returns ``Result.FAILED`` without running anything when the rebuild
        failed.
        """
        if not self.rebuild():
            self.error("Rebuilding FAILED!")
            return Result.FAILED

        return TestsManager.run_tests(self, starting_test_num, total_num_tests)

    def get_test_name(self, test):
        """Build the dotted launcher name of a meson test.

        The name is made of the manager name, the meson suite(s) if any and
        the meson test name where ``/`` is replaced by ``.``. Double dots
        are collapsed and spaces replaced by ``-``.
        """
        name = test.name.replace('/', '.')
        if test.suite:
            name = '.'.join(test.suite) + '.' + name

        name = self.name + '.' + name

        return name.replace('..', '.').replace(' ', '-')

    def list_tests(self):
        """Return the list of tests, creating one ``MesonTest`` per meson
        test the first time it is called."""
        if self.tests:
            return self.tests

        for test in self.get_meson_tests():
            self.add_test(MesonTest(self.get_test_name(test),
                                    self.options, self.reporter, test))

        return self.tests
178
179
class GstCheckTestsManager(MesonTestsManager):
    """Meson tests manager splitting gstcheck binaries into single checks.

    Binaries containing the ``gstcheck`` marker are run with
    ``--list-tests`` and one test is created per listed check. The check
    names are cached on disk, keyed by binary path, so that only modified
    binaries are inspected again.
    """
    name = "check"

    def __init__(self):
        MesonTestsManager.__init__(self)
        # binary path -> [mtime, [check names]]. An mtime of 0 marks a
        # binary that does not contain the gstcheck marker.
        self.tests_info = {}

    def init(self):
        return True

    def check_binary_ts(self, binary):
        """Tell whether the cached information about ``binary`` is usable.

        Returns:
            None if ``binary`` does not exist, True if the cached entry is
            still valid (same modification time, or binary flagged as not
            being a gstcheck one), otherwise a fresh ``(mtime, [])`` entry
            to be filled with the check names.
        """
        try:
            last_touched = os.stat(binary).st_mtime
        except FileNotFoundError:
            return None

        test_info = self.tests_info.get(binary)
        if test_info:
            if test_info[0] == 0 or test_info[0] == last_touched:
                return True

        return last_touched, []

    def _list_gst_check_tests(self, test, recurse=False):
        """Fill ``self.tests_info`` with the checks of the binary of ``test``.

        Runs the binary with ``--list-tests`` and collects the names found
        on its ``Test: <name>`` output lines. ``recurse`` is unused.

        Raises:
            FileNotFoundError: if the binary does not exist.
        """
        binary = test.fname[0]

        info = self.check_binary_ts(binary)
        if info is None:
            raise FileNotFoundError("Test binary %s does not exist" % binary)
        if info is True:
            # The cached entry is already valid, e.g. another worker
            # handled the same binary: never replace it with a non-entry.
            return

        # Names are collected in the list of this very entry rather than
        # looked up again in self.tests_info, so that concurrent workers
        # can not append to each other's list.
        names = info[1]
        self.tests_info[binary] = info

        tmpenv = os.environ.copy()
        tmpenv['GST_DEBUG'] = "0"
        pe = subprocess.Popen([binary, '--list-tests'],
                              stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                              env=tmpenv)

        output = pe.communicate()[0].decode()
        if pe.returncode != 0:
            self.debug("%s not able to list tests" % binary)
            return

        for line in output.split("\n"):
            match = re.search(r'(?<=^Test: )\w+$', line)
            if match:
                names.append(match.group(0))

    def load_tests_info(self):
        """Load the checks cache from the private directory.

        A missing or unreadable cache file results in an empty cache, the
        binaries are then simply inspected again.
        """
        dumpfile = os.path.join(self.options.privatedir, self.name + '.dat')
        try:
            # NOTE(security): unpickling executes arbitrary code, the
            # private directory content has to be trusted.
            with open(dumpfile, 'rb') as f:
                self.tests_info = pickle.load(f)
        except FileNotFoundError:
            self.tests_info = {}
        except (EOFError, pickle.UnpicklingError):
            # Typically a truncated file: it is only a cache, start afresh.
            self.debug("Ignoring unreadable tests cache %s" % dumpfile)
            self.tests_info = {}

    def save_tests_info(self):
        """Write the checks cache to the private directory."""
        dumpfile = os.path.join(self.options.privatedir, self.name + '.dat')
        with open(dumpfile, 'wb') as f:
            pickle.dump(self.tests_info, f)

    def add_options(self, parser):
        """Register the meson and the gstcheck specific options."""
        super().add_options(parser)
        arggroup = parser.add_argument_group("gstcheck specific options")
        arggroup.add_argument("--gst-check-leak-trace-testnames",
                              default=None,
                              help="A regex to specifying testsnames of the test "
                              "to run with the leak tracer activated, if 'known-not-leaky'"
                              " is specified, the testsuite will automatically activate"
                              " leak tracers on tests known to be not leaky.")
        arggroup.add_argument("--gst-check-leak-options",
                              default=None,
                              help="Leak tracer options")

    def get_child_env(self, testname, check_name=None):
        """Return the environment overrides to run ``testname`` with.

        Args:
            testname: Full name of the test, matched against the
                ``--gst-check-leak-trace-testnames`` regex.
            check_name: Name of the single check to run, exported through
                ``GST_CHECKS`` when set.

        Returns:
            A new dict of environment variables.
        """
        child_env = {}
        if check_name:
            child_env['GST_CHECKS'] = check_name

        pattern = self.options.gst_check_leak_trace_testnames
        if pattern and re.search(pattern, testname):
            leak_tracer = "leaks"
            if self.options.gst_check_leak_options:
                leak_tracer += "(%s)" % self.options.gst_check_leak_options

            # Keep the tracers already requested by the user, in order,
            # dropping duplicates and the empty entry produced by an unset
            # or empty GST_TRACERS.
            tracers = []
            requested = os.environ.get('GST_TRACERS', '').split(';')
            for tracer in requested + [leak_tracer]:
                if tracer and tracer not in tracers:
                    tracers.append(tracer)
            child_env['GST_TRACERS'] = ';'.join(tracers)

        return child_env

    def list_tests(self):
        """Return the list of tests, building it on the first call.

        Rebuilds the tests, inspects the binaries whose cached information
        is outdated, creates one ``MesonTest`` per check (or one per binary
        when no check is known for it) and saves the cache.

        Raises:
            RuntimeError: if a test binary does not exist.
        """
        if self.tests:
            return self.tests

        self.rebuild(all=True)
        self.load_tests_info()
        mesontests = self.get_meson_tests()

        to_inspect = []
        queued = set()
        for test in mesontests:
            binary = test.fname[0]
            test_info = self.check_binary_ts(binary)
            if test_info is True:
                continue
            if test_info is None:
                raise RuntimeError("Test binary %s does not exist"
                                   " even after a full rebuild" % binary)
            if binary in queued:
                # Several meson tests may share a binary, list it only once.
                continue

            with open(binary, 'rb') as f:
                if b"gstcheck" not in f.read():
                    self.tests_info[binary] = [0, []]
                    continue
            queued.add(binary)
            to_inspect.append(test)

        if to_inspect:
            # The context manager makes sure the worker threads are released.
            with conc.ThreadPoolExecutor(
                    max_workers=self.options.num_jobs) as executor:
                pending = [executor.submit(self._list_gst_check_tests, test)
                           for test in to_inspect]
                # result() re-raises whatever failed in the worker.
                for future in pending:
                    future.result()

        for test in mesontests:
            gst_tests = self.tests_info[test.fname[0]][1]
            if not gst_tests:
                name = self.get_test_name(test)
                child_env = self.get_child_env(name)
                self.add_test(MesonTest(name, self.options, self.reporter, test,
                                        child_env))
            else:
                for ltest in gst_tests:
                    name = self.get_test_name(test) + '.' + ltest
                    child_env = self.get_child_env(name, ltest)
                    self.add_test(MesonTest(name, self.options, self.reporter, test,
                                            child_env))
        self.save_tests_info()
        return self.tests