de3511a71d7f834ecf47035d8c5b8bef39d3b89f
[tools/itest-core.git] / itest / case.py
1 import os
2 import sys
3 import time
4 import uuid
5 import platform
6
7 try:
8     import unittest2 as unittest
9     from unittest2 import SkipTest
10 except ImportError:
11     import unittest
12     from unittest import SkipTest
13
14 import pexpect
15 if hasattr(pexpect, 'spawnb'):  # pexpect-u-2.5
16     spawn = pexpect.spawnb
17 else:
18     spawn = pexpect.spawn
19
20 from itest.conf import settings
21 from itest.utils import now, cd, get_machine_labels
22 from itest.fixture import Fixture
23
24
25 def id_split(idstring):
26     parts = idstring.split('.')
27     if len(parts) > 1:
28         return '.'.join(parts[:-1]), parts[-1]
29     return '', idstring
30
31
32 class TimeoutError(Exception):
33     pass
34
35
36 def pcall(cmd, args=(), expecting=(), output=None,
37           eof_timeout=None, output_timeout=None, **spawn_opts):
38     '''call cmd with expecting
39     expecting: list of pairs, first is expecting string, second is send string
40     output: redirect cmd stdout and stderr to file object
41     eof_timeout: timeout for whole cmd in seconds. None means block forever
42     output_timeout: timeout if no output in seconds. Disabled by default
43     spawn_opts: keyword arguments passed to spawn call
44     '''
45     question = [pexpect.EOF, pexpect.TIMEOUT]
46     question.extend([pair[0] for pair in expecting])
47     if output_timeout:
48         question.append(r'\r|\n')
49     answer = [None]*2 + [i[1] for i in expecting]
50
51     start = time.time()
52     child = spawn(cmd, list(args), **spawn_opts)
53     if output:
54         child.logfile_read = output
55
56     timeout = output_timeout if output_timeout else eof_timeout
57     try:
58         while True:
59             if output_timeout:
60                 cost = time.time() - start
61                 if cost >= eof_timeout:
62                     msg = 'Run out of time in %s seconds!:%s %s' % \
63                         (cost, cmd, ' '.join(args))
64                     raise TimeoutError(msg)
65
66             i = child.expect(question, timeout=timeout)
67             if i == 0:  # EOF
68                 break
69             elif i == 1:  # TIMEOUT
70                 if output_timeout:
71                     msg = 'Hanging for %s seconds!:%s %s'
72                 else:
73                     msg = 'Run out of time in %s seconds!:%s %s'
74                 raise TimeoutError(msg % (timeout, cmd, ' '.join(args)))
75             elif output_timeout and i == len(question)-1:
76                 # new line, stands for any output
77                 # do nothing, just flush timeout counter
78                 pass
79             else:
80                 child.sendline(answer[i])
81     finally:
82         child.close()
83
84     return child.exitstatus
85
86
87 # enumerate patterns for all distributions
88 # fedora16-64:
89 # [sudo] password for itestuser5707:
90 # suse121-32b
91 # root's password:
92 # suse122-32b
93 # itestuser23794's password:
94 # u1110-32b
95 # [sudo] password for itester:
96 SUDO_PASS_PROMPT_PATTERN = "\[sudo\] password for .*?:|" \
97                            "root's password:|" \
98                            ".*?'s password:"
99
100 SUDO_PASS_PROMPT_PATTERN_FEDORA_20_i586 = "\[sudo\] password for .*?:|" \
101                            "root's password:"
102
103 class Tee(object):
104
105     '''data write to original will write to another as well'''
106
107     def __init__(self, original, another=None):
108         self.original = original
109         if another is None:
110             self.another = sys.stderr
111         else:
112             self.another = another
113
114     def write(self, data):
115         self.another.write(data)
116         return self.original.write(data)
117
118     def flush(self):
119         self.another.flush()
120         return self.original.flush()
121
122     def close(self):
123         self.original.close()
124
125
126 class Meta(object):
127     """
128     Meta information of a test case
129
130     All meta information are put in a .meta/ directory under case running
131     path. Scripts `setup`, `steps` and `teardown` are in this meta path.
132     """
133
134     meta = '.meta'
135
136     def __init__(self, rundir, test):
137         self.rundir = rundir
138         self.test = test
139
140         self.logname = None
141         self.logfile = None
142         self.setup_script = None
143         self.steps_script = None
144         self.teardown_script = None
145
146     def begin(self):
147         """
148         Begin to run test. Generate meta scripts and open log file.
149         """
150         os.mkdir(self.meta)
151
152         self.logname = os.path.join(self.rundir, self.meta, 'log')
153         self.logfile = open(self.logname, 'a')
154         if settings.verbosity >= 3:
155             self.logfile = Tee(self.logfile)
156
157         if self.test.setup:
158             self.setup_script = self._make_setup_script()
159         self.steps_script = self._make_steps_script()
160         if self.test.teardown:
161             self.teardown_script = self._make_teardown_script()
162
163     def end(self):
164         """
165         Test finished, do some cleanup.
166         """
167         if not self.logfile:
168             return
169
170         self.logfile.close()
171         self.logfile = None
172
173         # FIXME: it's a little hack here
174         # delete color code
175         os.system("sed -i 's/\x1b\[[0-9]*m//g' %s" % self.logname)
176         os.system("sed -i 's/\x1b\[[0-9]*K//g' %s" % self.logname)
177
178     def setup(self):
179         code = 0
180         if self.setup_script:
181             self.log('setup start')
182             code = self._psh(self.setup_script)
183             self.log('setup finish')
184         return code
185
186     def steps(self):
187         self.log('steps start')
188         code = self._psh(self.steps_script, self.test.qa)
189         self.log('steps finish')
190         return code
191
192     def teardown(self):
193         if self.teardown_script:
194             self.log('teardown start')
195             self._psh(self.teardown_script)
196             self.log('teardown finish')
197
198     def log(self, msg, level="INFO"):
199         self.logfile.write('%s %s: %s\n' % (now(), level, msg))
200
201     def _make_setup_script(self):
202         code = '''cd %(rundir)s
203 (set -o posix; set) > %(var_old)s
204 set -x
205 %(setup)s
206 __exitcode__=$?
207 set +x
208 (set -o posix; set) > %(var_new)s
209 diff --unchanged-line-format= --old-line-format= --new-line-format='%%L' \\
210     %(var_old)s %(var_new)s > %(var_out)s
211 exit ${__exitcode__}
212 ''' % {
213             'rundir': self.rundir,
214             'var_old': os.path.join(self.meta, 'var.old'),
215             'var_new': os.path.join(self.meta, 'var.new'),
216             'var_out': os.path.join(self.meta, 'var.out'),
217             'setup': self.test.setup,
218             }
219         return self._make_code('setup', code)
220
221     def _make_steps_script(self):
222         code = '''cd %(rundir)s
223 if [ -f %(var_out)s ]; then
224     . %(var_out)s
225 fi
226 set -o pipefail
227 set -ex
228 %(steps)s
229 ''' % {
230             'rundir': self.rundir,
231             'var_out': os.path.join(self.meta, 'var.out'),
232             'steps': self.test.steps,
233             }
234         return self._make_code('steps', code)
235
236     def _make_teardown_script(self):
237         code = '''cd %(rundir)s
238 if [ -f %(var_out)s ]; then
239     . %(var_out)s
240 fi
241 set -x
242 %(teardown)s
243 ''' % {
244             'rundir': self.rundir,
245             'var_out': os.path.join(self.meta, 'var.out'),
246             'teardown': self.test.teardown,
247             }
248         return self._make_code('teardown', code)
249
250     def _make_code(self, name, code):
251         """Write `code` into `name`"""
252         path = os.path.join(self.meta, name)
253         data = code.encode('utf8') if isinstance(code, unicode) else code
254         with open(path, 'w') as f:
255             f.write(data)
256         return path
257
258     def _psh(self, script, more_expecting=()):
259         if (platform.linux_distribution()[0] == 'Fedora') and \
260                 (platform.linux_distribution()[1] == '20') and \
261                 (platform.architecture()[0] == '32bit'):
262             pat = SUDO_PASS_PROMPT_PATTERN_FEDORA_20_i586
263         else:
264             pat = SUDO_PASS_PROMPT_PATTERN
265
266         expecting = [(pat, settings.SUDO_PASSWD)] + \
267             list(more_expecting)
268         try:
269             return pcall('/bin/bash',
270                          [script],
271                          expecting=expecting,
272                          output=self.logfile,
273                          eof_timeout=float(settings.RUN_CASE_TIMEOUT),
274                          output_timeout=float(settings.HANGING_TIMEOUT),
275                          )
276         except Exception as err:
277             self.log('pcall error:%s\n%s' % (script, err), 'ERROR')
278             return -1
279
280
281 class TestCase(unittest.TestCase):
282     '''Single test case'''
283
284     count = 1
285     was_skipped = False
286     was_successful = False
287
288     def __init__(self, filename, fields):
289         super(TestCase, self).__init__()
290         self.filename = filename
291
292         # Fields from case definition
293         self.version = fields.get('version')
294         self.summary = fields.get('summary')
295         self.steps = fields.get('steps')
296         self.setup = fields.get('setup')
297         self.teardown = fields.get('teardown')
298         self.qa = fields.get('qa', ())
299         self.tracking = fields.get('tracking', {})
300         self.conditions = fields.get('conditions', {})
301         self.fixtures = [Fixture(os.path.dirname(self.filename),
302                                  i)
303                          for i in fields.get('fixtures', ())]
304
305         self.component = self._guess_component(self.filename)
306
307     def id(self):
308         """
309         This id attribute is used in xunit file.
310
311         classname.name
312         """
313         if settings.env_root:
314             retpath = self.filename[len(settings.cases_dir):]\
315                 .lstrip(os.path.sep)
316             base = os.path.splitext(retpath)[0]
317         else:
318             base = os.path.splitext(os.path.basename(self.filename))[0]
319         return base.replace(os.path.sep, '.')
320
321     def __eq__(self, that):
322         if type(self) is not type(that):
323             return NotImplemented
324         return self.id() == that.id()
325
326     def __hash__(self):
327         return hash((type(self), self.filename))
328
329     def __str__(self):
330         cls, name = id_split(self.id())
331         if cls:
332             return "%s (%s)" % (name, cls)
333         return name
334
335     def __repr__(self):
336         return '<%s %s>' % (self.__class__.__name__, self.id())
337
338     def setUp(self):
339         self._check_conditions()
340         self.rundir = rundir = self._new_rundir()
341         self._copy_fixtures()
342
343         self.meta = meta = Meta(rundir, self)
344         with cd(rundir):
345             meta.begin()
346             meta.log('case start to run!')
347             if self.setup:
348                 code = meta.setup()
349                 if code != 0:
350                     msg = "setup failed. Exit %d, see log: %s" % (
351                         code, meta.logname)
352                     raise Exception(msg)
353
354     def tearDown(self):
355         meta = self.meta
356         if meta:
357             with cd(self.rundir):
358                 meta.teardown()
359                 meta.log('case is finished!')
360                 meta.end()
361
362     def runTest(self):
363         meta = self.meta
364         with cd(self.rundir):
365             code = meta.steps()
366
367         msg = "Exit Nonzero %d. See log: %s" % (code, self.meta.logname)
368         self.assertEqual(0, code, msg)
369
370     def _check_conditions(self):
371         '''Check if conditions match, raise SkipTest if some conditions are
372         defined but not match.
373         '''
374         labels = set((i.lower() for i in get_machine_labels()))
375         # blacklist has higher priority, if it match both black and white
376         # lists, it will be skipped
377         if self.conditions.get('blacklist'):
378             intersection = labels & set(self.conditions.get('blacklist'))
379             if intersection:
380                 raise SkipTest('by distribution blacklist:%s' %
381                                ','.join(intersection))
382
383         kw = 'whitelist'
384         if self.conditions.get(kw):
385             intersection = labels & set(self.conditions[kw])
386             if not intersection:
387                 raise SkipTest('not in distribution whitelist:%s' %
388                                ','.join(self.conditions[kw]))
389
390     def _guess_component(self, filename):
391         # assert that filename is absolute path
392         if not settings.env_root or \
393                 not filename.startswith(settings.cases_dir):
394             return 'unknown'
395         relative = filename[len(settings.cases_dir)+1:].split(os.sep)
396         # >1 means [0] is an dir name
397         return relative[0] if len(relative) > 1 else 'unknown'
398
399     def _new_rundir(self):
400         hash_ = str(uuid.uuid4()).replace('-', '')
401         path = os.path.join(settings.WORKSPACE, hash_)
402         os.mkdir(path)
403         return path
404
405     def _copy_fixtures(self):
406         for item in self.fixtures:
407             item.copy(self.rundir)