2 # Copyright 2015-2016 Samsung Electronics Co., Ltd.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
23 from threading import Thread, Lock
24 from litmus.core.util import call, check_output
25 from litmus.core.util import convert_single_item_to_list
26 from litmus.core.util import find_pattern
27 from litmus.core.util import decode
28 from litmus.core.util import create_instance
29 from litmus.core.util import find_all_pattern
30 from litmus.core.exceptions import BootError
31 from litmus.device.cutter import cutter
32 from litmus import _path_for_locks_
38 User can control device in topology by this class methods.
44 _dnmode_cmd = b'thordown'
49 _pattern_loginprompt = r'.*login: $'
50 _pattern_shellprompt = r'.*# .*'
51 _max_attempt_login_uart_shell = 5
52 _max_attempt_attach_sdb = 10
53 _retrycnt_at_a_time_sdb = 20
54 _max_attempt_boot_retry = 3
56 _path_for_locks = _path_for_locks_
63 _global_tlock = Lock()
64 _global_ilock_path = os.path.join(_path_for_locks_, 'globallock')
65 _global_ilock = fasteners.InterProcessLock(_global_ilock_path)
70 def __init__(self, *args, **kwargs):
71 super(device, self).__init__()
74 self._name = kwargs['devicename']
76 # init a cutter instance.
77 self._cutter = cutter.create(*args, **kwargs)
80 self._manager = kwargs['manager']
83 """docstring for __del__"""
87 """docstring for _release"""
88 self._cutter.off(delay=1)
94 def create(self, *args, **kwargs):
96 Create a device instance.
98 clsname = 'device' + kwargs['dev_type']
99 return create_instance(clsname,
106 Return the name of acquired device.
109 >>> dut = mgr.acquire_dut('xu3')
113 :returns str: device name
119 Return the id of acquired device.
120 Device instance uses this id for sdb connection.
123 >>> dut = mgr.acquire_dut('xu3')
127 :returns str: device id
129 return self.get_name()
131 def on(self, powercut_delay=1):
133 Turn on the device acquired.
135 :param float powercut_delay: power-cut delay for cutter
137 logging.debug('=================Turn on device {}=================='
138 .format(self.get_name()))
140 while retry_cnt <= self._max_attempt_boot_retry:
143 self._cutter.on(powercut_delay)
144 self._wait_uart_shell_login_prompt()
145 self._login_uart_shell()
146 self._set_sdb_deviceid()
150 except KeyboardInterrupt:
152 raise Exception('Keyboard interrupt.')
153 except Exception as e:
158 raise BootError('Can\'t turn on dut.')
160 def off(self, powercut_delay=1):
162 Trun off the device acquired.
164 :param float powercut_delay: power-cut delay for cutter
166 logging.debug('=================Turn off device {}================='
167 .format(self.get_name()))
169 self._cutter.off(powercut_delay)
173 Return whether device is turned on or not.
183 :returns boolean: true if device is turned on, false otherwise.
185 pattern = '.*{}'.format(self.get_id())
186 outs = check_output('sdb devices'.split(), timeout=10)
187 if find_pattern(pattern, outs):
192 def flash(self, filenames, flasher='lthor', waiting=5):
194 Flash binaries to device.
195 This function turn on device and turn off device automatically.
197 :param dict filenames: filename string or dict
198 :param sting flasher: external flashing tool name
199 :param float waiting: waiting time to acquire cdc_acm device
202 >>> dut.flash(['boot.tar.gz','platform.tar.gz'])
204 >>> dut.flash('platform.tar.gz')
207 logging.debug('==============Flash binaries to device {}==========='
208 .format(self.get_name()))
209 logging.debug(filenames)
212 raise Exception('There\'s no file to flash.')
214 self._acquire_global_lock()
216 self._enter_download_mode(self._dnmode_cmd)
218 busid = self._find_usb_busid()
219 self._release_global_lock()
220 self._lthor(filenames=filenames, busid=busid)
222 except (Exception, KeyboardInterrupt) as e:
223 self._release_global_lock()
225 raise Exception('Can\'t flash files : {}.'.format(filenames))
227 def run_cmd(self, command, timeout=None):
229 Run a command on device.
231 :param str command: command to run on device
232 :param float timeout: timeout
236 >>> dut.run_cmd('ls -alF / | grep usr')
237 \'drwxr-xr-x 15 root root 4096 Apr 29 2016 usr/\\r\\n\'
239 :returns str: stdout of sdb shell command
242 logging.debug('==============Run a command on device {}============'
243 .format(self.get_name()))
244 c = ['sdb', '-s', self.get_id(), 'shell']
245 c.extend(convert_single_item_to_list(command))
247 result = check_output(c, timeout=timeout)
250 def push_file(self, src, dest, timeout=None):
252 Push a file from host to destination path of device.
254 :param str src: file path from host pc
255 :param str dest: destination path of device
256 :param float timeout: timeout
259 >>> dut.push_file('test.png', '/tmp')
261 :returns str: stdout of sdb push command
263 logging.debug('==============Push a file to device {}=============='
264 .format(self.get_name()))
265 logging.debug('src from host : {}'.format(src))
266 logging.debug('dest on dut: {}'.format(dest))
267 c = ['sdb', '-s', self.get_id(), 'push', src, dest]
268 result = check_output(c, timeout=timeout)
271 def pull_file(self, src, dest, timeout=None):
273 Pull a file from device to destination path of host.
275 :param str src: file path from device
276 :param str dest: destination path of host pc
277 :param float timeout: timeout
280 >>> dut.pull_file('/tmp/test.png','.')
282 :returns str: stdout of sdb push command
284 logging.debug('==============Pull a file from device {}============'
285 .format(self.get_name()))
286 logging.debug('src from dut : {}'.format(src))
287 logging.debug('dest on host: {}'.format(dest))
288 c = ['sdb', '-s', self.get_id(), 'pull', src, dest]
289 result = check_output(c, timeout=timeout)
292 def _read_uart(self, bufsize=100):
293 """docstring for read_uart"""
294 readdata = decode(self._uart.read(bufsize))
295 logging.debug(readdata)
298 def _write_uart(self, cmd, returnkey=b'\r'):
299 """docstring for write_uart"""
300 self._uart.write(cmd)
303 self._uart.write(returnkey)
305 def add_test(self, func, args):
307 Add a testcase to device class instance.
309 :param func func: function object for test
310 :param dict args: arguments for test function
313 >>> from litmus.helper.helper import verify_wifi_is_working
314 >>> dut.add_test(verify_wifi_is_working,
315 {'wifi_apname': 'setup',
317 'result_dir': 'result'})
323 self._tests.append({'func': func, 'args': args})
325 def del_test(self, func):
327 Delete a testcase from device class instance.
329 :param func func: function object for test
332 >>> from litmus.helper.helper import verify_wifi_is_working
333 >>> dut.del_test(verify_wifi_is_working)
335 self._tests = [l for l in self._tests if l['func'] != func]
342 >>> from litmus.helper.helper import verify_wifi_is_working
343 >>> dut.add_test(verify_wifi_is_working,
344 {'wifi_apname': 'setup',
346 'result_dir': 'result'})
350 for l in self._tests:
351 if isinstance(l['args'], dict):
352 l['func'](self, **l['args'])
353 elif isinstance(l['args'], tuple):
354 l['func'](self, *l['args'])
356 def screenshot(self, filename):
358 Take a screenshot (png format)
360 :param str filename: screenshot file name
363 >>> dut.screenshot('screenshot.png')
366 logging.debug('==== Take a screenshot: {}, width: {}, height: {} ===='
369 self._screen_height))
370 dt = self._get_display_server_type()
372 self._screenshot_x11(filename)
373 elif dt == 'WAYLAND':
374 self._screenshot_wayland(filename)
377 def _get_display_server_type(self):
378 """docstring for get_display_server_type"""
379 res = self.run_cmd('ls /usr/lib', timeout=10)
380 if find_pattern('.*libX11.*', res):
385 def _screenshot_x11(self, filename):
386 """docstring for _screenshot_x11"""
387 # take a screenshot using xwd
388 cmd = 'xwd -root -out /tmp/{}.xwd'.format(filename)
389 self.run_cmd(cmd, timeout=20)
392 self.pull_file('/tmp/{}.xwd'.format(filename), os.curdir, timeout=20)
394 # convert xwd to png and resize it
395 call(['convert', '{0}.xwd'.format(filename), '-resize',
396 '{0}x{1}'.format(self._screen_width, self._screen_height),
397 filename], timeout=20)
398 call(['rm', '{}.xwd'.format(filename)], timeout=20)
400 def _screenshot_wayland(self, filename):
401 """docstring for _screenshot_wayland"""
402 # Find all viewable window id
403 p_winid = '.*(0x[a-zA-Z0-9]{8})\s+\d+\s+\d+\s+\d+\s+\d+\s+(\d+)\s+(\d+).*\sViewable.*'
404 winids = find_all_pattern(p_winid,
405 self.run_cmd('enlightenment_info -topvwins',
409 outs = self.run_cmd('enlightenment_info -dump_topvwins',
411 dirn = find_pattern('directory:\s(.*)',
413 groupindex=1).rstrip()
415 # Create tempdir and pull dump files
416 tmpdir = tempfile.mkdtemp()
417 self.pull_file(dirn, tmpdir, timeout=20)
419 # If dump does not exist then remove winid from list
420 winids = [winid for winid in winids
421 if os.path.exists(os.path.join(tmpdir, winid[0]+'.png'))]
424 bg = Image.new('RGB', (self._screen_width, self._screen_height))
426 for winid in reversed(winids):
428 fg = Image.open(os.path.join(tmpdir, winid[0]+'.png'))
429 bg.paste(fg, (int(winid[1]), int(winid[2])), fg)
430 except FileNotFoundError:
435 call(['rm', '-rf', tmpdir], timeout=10)
437 def _flush_uart_buffer(self):
438 """docstring for flush_uart_buffer"""
439 self._uart.flushInput()
440 self._uart.flushOutput()
443 def _open_uart(self):
444 """docstring for open_uart"""
446 self._uart = serial.Serial(port=self.kwargs['uart_port'],
447 baudrate=self._baudrate,
448 timeout=self._readtimeout)
449 except serial.SerialException as err:
454 def _close_uart(self):
455 """docstring for close_uart"""
456 if self._uart.isOpen():
459 def _thread_for_enter_download_mode(self, cmd, count):
460 """docstring for thread_for_enter_download_mode"""
461 for loop in range(count*20):
462 self._uart.write(self._enterkey)
464 self._uart.write(cmd)
465 for loop in range(2):
467 self._uart.write(self._enterkey)
469 def _enter_download_mode(self, cmd, powercut_delay=1, thread_param=10):
470 """docstring for _enter_download_mode"""
471 t = Thread(target=self._thread_for_enter_download_mode,
472 args=(cmd, thread_param, ))
474 self._cutter.off(delay=powercut_delay)
475 self._cutter.on(delay=powercut_delay)
478 def _find_usb_busid(self):
479 """docstring for find_usb_busid"""
480 pattern = 'usb (.*):.*idVendor={0}, idProduct={1}'.format(self._vid,
482 kernlog = 'dmesg | grep usb | tail -n 20'
483 outs = check_output(kernlog, shell=True, timeout=10)
484 result = find_all_pattern(pattern=pattern, data=outs)
487 logging.debug('usb busid : {}'.format(busid))
489 raise Exception('Can\'t find usb busid')
493 def _lthor(self, filenames, busid):
494 """docstring for _lthor"""
495 cmd = 'lthor --busid={0}'.format(busid)
496 filenames = convert_single_item_to_list(filenames)
498 cmd += ' {}'.format(l)
500 ret = call(cmd.split(), timeout=600)
502 raise Exception('Thor error.')
504 def _find_usb_bus_and_device_address(self):
505 """docstring for _find_usb_bus_and_device_address"""
506 pattern = 'usb (.*):.*idVendor={0}, idProduct={1}'.format(self._vid,
508 kernlog = 'dmesg | grep usb | tail -n 20'
509 outs = check_output(kernlog, shell=True, timeout=10)
510 result = find_all_pattern(pattern=pattern, data=outs)
513 busaddr_cmd = 'cat /sys/bus/usb/devices/{0}/busnum'.format(bid)
514 busaddr = check_output(busaddr_cmd, shell=True).rstrip().zfill(3)
515 logging.debug('usb_bus_addr : {}'.format(busaddr))
516 devaddr_cmd = 'cat /sys/bus/usb/devices/{0}/devnum'.format(bid)
517 devaddr = check_output(devaddr_cmd, shell=True).rstrip().zfill(3)
518 logging.debug('usb_dev_addr : {}'.format(devaddr))
520 raise Exception('Can\'t find usb bus and dev addr')
522 return (busaddr, devaddr)
524 def _heimdall(self, filenames, busaddr, devaddr, partition_bin_mappings):
525 """docstring for _heimdall"""
526 filenames = convert_single_item_to_list(filenames)
527 tar_cmd = ['tar', 'xvfz']
530 logging.debug(tar_cmd)
531 call(tar_cmd, timeout=30)
533 heimdall_cmd = ['heimdall', 'flash', '--usbbus', busaddr,
534 '--usbdevaddr', devaddr]
535 for key, elem in partition_bin_mappings.items():
536 heimdall_cmd.append('--{}'.format(key))
537 heimdall_cmd.append(elem)
538 logging.debug(heimdall_cmd)
540 ret = call(heimdall_cmd, timeout=600)
542 raise Exception('Heimdall error.')
544 def _wait_uart_shell_login_prompt(self):
545 """docstring for _wait_uart_shell_login_prompt"""
546 logging.debug('===============Print boot logs===============')
548 start_time = time.perf_counter()
550 while wait_time < self._boot_timeout:
551 if self._uart.inWaiting:
552 buf = self._read_uart(1000)
553 if find_pattern(self._pattern_loginprompt, data=buf):
554 logging.debug('Found login shell pattern from uart log')
555 logging.debug('wait_time : {}'.format(wait_time))
558 self._write_uart(b'')
560 wait_time = time.perf_counter() - start_time
562 raise Exception('Boot timeout : {}s'.format(wait_time))
564 def _login_uart_shell(self):
565 """docstring for _login_uart_shell"""
566 logging.debug('===============Login UART shell===============')
568 while retrycnt < self._max_attempt_login_uart_shell:
570 self._write_uart(self._username)
573 self._write_uart(self._password)
575 self._flush_uart_buffer()
576 self._write_uart(b'dmesg -n 1')
578 readdata = self._read_uart(2000)
579 if find_pattern(self._pattern_shellprompt, readdata):
582 logging.debug('Login failed. retry.')
583 self._write_uart(b'')
587 raise Exception('Can\'t login uart shell.')
589 def _set_sdb_deviceid(self):
590 """docstring for _set_sdb_deviceid"""
591 usb0_path = b'/sys/class/usb_mode/usb0'
592 pattern = '.*{0}'.format(self.get_id())
594 def set_serialnumber(deviceid):
595 """docstring for set_serialnumber"""
596 self._write_uart(b''.join([b'echo 0 > ', usb0_path, b'/enable']))
598 self._write_uart(b''.join([b'echo ',
604 self._write_uart(b''.join([b'echo 1 > ', usb0_path, b'/enable']))
606 self._write_uart(b''.join([b'cat ', usb0_path, b'/enable']))
609 def get_serialnumber():
610 """docstring for get_serialnumber"""
611 self._write_uart(b''.join([b'cat ', usb0_path, b'/iSerial']))
613 return self._read_uart(1000)
617 set_serialnumber(deviceid=self.get_id().encode())
618 serialnumber = get_serialnumber()
619 if find_pattern(pattern, serialnumber):
623 raise Exception('Can\'t configure sdb deviceid')
625 def _attach_sdb(self):
626 """docstring for _attach_sdb"""
627 # start sdb server if it is not started.
628 call('sdb start-server'.split(), timeout=10)
631 pattern = r'{}.*device.*\t.*'.format(self.get_id())
633 while retry_attempt < self._max_attempt_attach_sdb:
634 for l in range(self._retrycnt_at_a_time_sdb):
635 outs = check_output('sdb devices'.split(), timeout=10)
637 if find_pattern(pattern, outs):
638 logging.debug('found {}.'.format(self.get_id()))
643 raise Exception('Can\'t find device.')
645 def _detach_sdb(self):
646 """docstring for _detach_sdb"""
649 def sdb_root_on(self):
650 """docstring for sdb_root_on"""
651 logging.debug('=================sdb root on for {}================='
652 .format(self.get_name()))
653 call('sdb -s {} root on'.format(self.get_id()).split(), timeout=10)
656 def _acquire_global_lock(self):
657 """docstring for _acquire_global_lock"""
658 logging.debug('Try to acquire global lock...')
659 self._global_tlock.acquire()
660 self._global_ilock.acquire()
661 # set gid of ilock file
663 os.chmod(self._global_ilock.path, 0o664)
664 except PermissionError:
665 logging.debug('Can\'t change lock file permission')
667 if self._global_tlock.locked() and self._global_ilock.acquired:
668 logging.debug('global lock acquired for {}'
669 .format(self.get_id()))
671 def _release_global_lock(self):
672 """docstring for _release_global_lock"""
673 if self._global_tlock.locked():
674 self._global_tlock.release()
675 if self._global_ilock.acquired:
676 self._global_ilock.release()
677 logging.debug('global lock released')