8 import unittest2 as unittest
9 from unittest2 import SkipTest
12 from unittest import SkipTest
15 if hasattr(pexpect, 'spawnb'): # pexpect-u-2.5
16 spawn = pexpect.spawnb
20 from itest.conf import settings
21 from itest.utils import now, cd, get_machine_labels
22 from itest.fixture import Fixture
25 def id_split(idstring):
26 parts = idstring.split('.')
28 return '.'.join(parts[:-1]), parts[-1]
32 class TimeoutError(Exception):
36 def pcall(cmd, args=(), expecting=(), output=None,
37 eof_timeout=None, output_timeout=None, **spawn_opts):
38 '''call cmd with expecting
39 expecting: list of pairs, first is expecting string, second is send string
40 output: redirect cmd stdout and stderr to file object
41 eof_timeout: timeout for whole cmd in seconds. None means block forever
42 output_timeout: timeout if no output in seconds. Disabled by default
43 spawn_opts: keyword arguments passed to spawn call
45 question = [pexpect.EOF, pexpect.TIMEOUT]
46 question.extend([pair[0] for pair in expecting])
48 question.append(r'\r|\n')
49 answer = [None]*2 + [i[1] for i in expecting]
52 child = spawn(cmd, list(args), **spawn_opts)
54 child.logfile_read = output
56 timeout = output_timeout if output_timeout else eof_timeout
60 cost = time.time() - start
61 if cost >= eof_timeout:
62 msg = 'Run out of time in %s seconds!:%s %s' % \
63 (cost, cmd, ' '.join(args))
64 raise TimeoutError(msg)
66 i = child.expect(question, timeout=timeout)
69 elif i == 1: # TIMEOUT
71 msg = 'Hanging for %s seconds!:%s %s'
73 msg = 'Run out of time in %s seconds!:%s %s'
74 raise TimeoutError(msg % (timeout, cmd, ' '.join(args)))
75 elif output_timeout and i == len(question)-1:
76 # new line, stands for any output
77 # do nothing, just flush timeout counter
80 child.sendline(answer[i])
84 return child.exitstatus
87 # enumerate patterns for all distributions
89 # [sudo] password for itestuser5707:
93 # itestuser23794's password:
95 # [sudo] password for itester:
96 SUDO_PASS_PROMPT_PATTERN = "\[sudo\] password for .*?:|" \
100 SUDO_PASS_PROMPT_PATTERN_FEDORA_20_i586 = "\[sudo\] password for .*?:|" \
105 '''data write to original will write to another as well'''
107 def __init__(self, original, another=None):
108 self.original = original
110 self.another = sys.stderr
112 self.another = another
def write(self, data):
    """Write `data` to both underlying streams.

    `self.another` receives the data first, then `self.original`.
    The value returned is whatever `self.original.write` returns.
    """
    self.another.write(data)
    result = self.original.write(data)
    return result
120 return self.original.flush()
123 self.original.close()
128 Meta information of a test case
130 All meta information are put in a .meta/ directory under case running
131 path. Scripts `setup`, `steps` and `teardown` are in this meta path.
136 def __init__(self, rundir, test):
142 self.setup_script = None
143 self.steps_script = None
144 self.teardown_script = None
148 Begin to run test. Generate meta scripts and open log file.
152 self.logname = os.path.join(self.rundir, self.meta, 'log')
153 self.logfile = open(self.logname, 'a')
154 if settings.verbosity >= 3:
155 self.logfile = Tee(self.logfile)
158 self.setup_script = self._make_setup_script()
159 self.steps_script = self._make_steps_script()
160 if self.test.teardown:
161 self.teardown_script = self._make_teardown_script()
165 Test finished, do some cleanup.
173 # FIXME: it's a little hack here
175 os.system("sed -i 's/\x1b\[[0-9]*m//g' %s" % self.logname)
176 os.system("sed -i 's/\x1b\[[0-9]*K//g' %s" % self.logname)
180 if self.setup_script:
181 self.log('setup start')
182 code = self._psh(self.setup_script)
183 self.log('setup finish')
187 self.log('steps start')
188 code = self._psh(self.steps_script, self.test.qa)
189 self.log('steps finish')
193 if self.teardown_script:
194 self.log('teardown start')
195 self._psh(self.teardown_script)
196 self.log('teardown finish')
def log(self, msg, level="INFO"):
    """Write one formatted line to `self.logfile`.

    The line is `<now()> <level>: <msg>` followed by a newline, where
    `now()` is the helper imported from `itest.utils` (presumably a
    timestamp string -- confirm against that module).
    """
    emit = self.logfile.write
    emit('%s %s: %s\n' % (now(), level, msg))
201 def _make_setup_script(self):
202 code = '''cd %(rundir)s
203 (set -o posix; set) > %(var_old)s
208 (set -o posix; set) > %(var_new)s
209 diff --unchanged-line-format= --old-line-format= --new-line-format='%%L' \\
210 %(var_old)s %(var_new)s > %(var_out)s
213 'rundir': self.rundir,
214 'var_old': os.path.join(self.meta, 'var.old'),
215 'var_new': os.path.join(self.meta, 'var.new'),
216 'var_out': os.path.join(self.meta, 'var.out'),
217 'setup': self.test.setup,
219 return self._make_code('setup', code)
221 def _make_steps_script(self):
222 code = '''cd %(rundir)s
223 if [ -f %(var_out)s ]; then
230 'rundir': self.rundir,
231 'var_out': os.path.join(self.meta, 'var.out'),
232 'steps': self.test.steps,
234 return self._make_code('steps', code)
236 def _make_teardown_script(self):
237 code = '''cd %(rundir)s
238 if [ -f %(var_out)s ]; then
244 'rundir': self.rundir,
245 'var_out': os.path.join(self.meta, 'var.out'),
246 'teardown': self.test.teardown,
248 return self._make_code('teardown', code)
250 def _make_code(self, name, code):
251 """Write `code` into `name`"""
252 path = os.path.join(self.meta, name)
253 data = code.encode('utf8') if isinstance(code, unicode) else code
254 with open(path, 'w') as f:
258 def _psh(self, script, more_expecting=()):
259 if (platform.linux_distribution()[0] == 'Fedora') and \
260 (platform.linux_distribution()[1] == '20') and \
261 (platform.architecture()[0] == '32bit'):
262 pat = SUDO_PASS_PROMPT_PATTERN_FEDORA_20_i586
264 pat = SUDO_PASS_PROMPT_PATTERN
266 expecting = [(pat, settings.SUDO_PASSWD)] + \
269 return pcall('/bin/bash',
273 eof_timeout=float(settings.RUN_CASE_TIMEOUT),
274 output_timeout=float(settings.HANGING_TIMEOUT),
276 except Exception as err:
277 self.log('pcall error:%s\n%s' % (script, err), 'ERROR')
281 class TestCase(unittest.TestCase):
282 '''Single test case'''
286 was_successful = False
288 def __init__(self, filename, fields):
289 super(TestCase, self).__init__()
290 self.filename = filename
292 # Fields from case definition
293 self.version = fields.get('version')
294 self.summary = fields.get('summary')
295 self.steps = fields.get('steps')
296 self.setup = fields.get('setup')
297 self.teardown = fields.get('teardown')
298 self.qa = fields.get('qa', ())
299 self.tracking = fields.get('tracking', {})
300 self.conditions = fields.get('conditions', {})
301 self.fixtures = [Fixture(os.path.dirname(self.filename),
303 for i in fields.get('fixtures', ())]
305 self.component = self._guess_component(self.filename)
309 This id attribute is used in xunit file.
313 if settings.env_root:
314 retpath = self.filename[len(settings.cases_dir):]\
316 base = os.path.splitext(retpath)[0]
318 base = os.path.splitext(os.path.basename(self.filename))[0]
319 return base.replace(os.path.sep, '.')
def __eq__(self, that):
    """Report whether `that` is the same test case, judged by `id()`.

    Objects whose exact type differs from ours are not compared at all:
    `NotImplemented` is returned so Python can fall back to the
    reflected comparison.
    """
    # NOTE(review): the hash computed nearby keys on `self.filename`
    # while equality keys on `id()`; confirm that two cases with equal
    # ids always share a filename so equal objects hash equally.
    same_type = type(self) is type(that)
    return self.id() == that.id() if same_type else NotImplemented
327 return hash((type(self), self.filename))
330 cls, name = id_split(self.id())
332 return "%s (%s)" % (name, cls)
336 return '<%s %s>' % (self.__class__.__name__, self.id())
339 self._check_conditions()
340 self.rundir = rundir = self._new_rundir()
341 self._copy_fixtures()
343 self.meta = meta = Meta(rundir, self)
346 meta.log('case start to run!')
350 msg = "setup failed. Exit %d, see log: %s" % (
357 with cd(self.rundir):
359 meta.log('case is finished!')
364 with cd(self.rundir):
367 msg = "Exit Nonzero %d. See log: %s" % (code, self.meta.logname)
368 self.assertEqual(0, code, msg)
370 def _check_conditions(self):
371 '''Check if conditions match, raise SkipTest if some conditions are
372 defined but not match.
374 labels = set((i.lower() for i in get_machine_labels()))
375 # blacklist has higher priority, if it match both black and white
376 # lists, it will be skipped
377 if self.conditions.get('blacklist'):
378 intersection = labels & set(self.conditions.get('blacklist'))
380 raise SkipTest('by distribution blacklist:%s' %
381 ','.join(intersection))
384 if self.conditions.get(kw):
385 intersection = labels & set(self.conditions[kw])
387 raise SkipTest('not in distribution whitelist:%s' %
388 ','.join(self.conditions[kw]))
390 def _guess_component(self, filename):
391 # assert that filename is absolute path
392 if not settings.env_root or \
393 not filename.startswith(settings.cases_dir):
395 relative = filename[len(settings.cases_dir)+1:].split(os.sep)
396 # >1 means [0] is an dir name
397 return relative[0] if len(relative) > 1 else 'unknown'
399 def _new_rundir(self):
400 hash_ = str(uuid.uuid4()).replace('-', '')
401 path = os.path.join(settings.WORKSPACE, hash_)
def _copy_fixtures(self):
    """Call `copy(self.rundir)` on every entry of `self.fixtures`.

    `self.rundir` is looked up per fixture, so nothing is touched when
    the fixture list is empty.
    """
    for fixture in self.fixtures:
        fixture.copy(self.rundir)