From: Donghoon Shin Date: Wed, 21 Sep 2016 04:47:14 +0000 (+0900) Subject: Add global lock for mock device type X-Git-Tag: 0.3.3~18^2 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=fa853e58c47ccb9cb57634a7f2c9a8172679bbc1;p=tools%2Flitmus.git Add global lock for mock device type --- diff --git a/litmus/device/device.py b/litmus/device/device.py index 62d2a02..d804256 100644 --- a/litmus/device/device.py +++ b/litmus/device/device.py @@ -67,7 +67,7 @@ class device(object): super(device, self).__init__() self.args = args self.kwargs = kwargs - self._name = kwargs['deviceid'] + self._name = kwargs['devicename'] # init a cutter instance. self._cutter = cutter.create(*args, **kwargs) @@ -183,24 +183,13 @@ class device(object): else: return False - def _thor(self, filenames, busid): - """docstring for thor_downloader""" - cmd = 'lthor --busid={0}'.format(busid) - filenames = convert_single_item_to_list(filenames) - for l in filenames: - cmd += ' {}'.format(l) - logging.debug(cmd) - ret = call(cmd.split(), timeout=600) - if ret: - raise Exception('Thor error.') - - def flash(self, filenames, flasher=_thor, waiting=5): + def flash(self, filenames, flasher='lthor', waiting=5): """ Flash binaries to device. This function turn on device and turn off device automatically. :param dict filenames: filename string or dict - :param func flasher: wrapper function of external flashing tool + :param sting flasher: external flashing tool name :param float waiting: waiting time to acquire cdc_acm device Example: @@ -220,7 +209,7 @@ class device(object): time.sleep(waiting) busid = self._find_usb_busid() self._release_global_lock() - flasher(self, filenames=filenames, busid=busid) + self._lthor(filenames=filenames, busid=busid) self._cutter.off() except (Exception, KeyboardInterrupt) as e: self._release_global_lock() @@ -370,21 +359,6 @@ class device(object): if self._uart.isOpen(): self._uart.close() - def _find_usb_busid(self): - """docstring for find_usb_busid""" - pattern = 'usb (.*):.*idVendor={0}, idProduct={1}'.format(self._vid, - self._pid) - kernlog = 'cat /var/log/kern.log | grep usb | tail -n 20' - outs = check_output(kernlog, shell=True, timeout=10) - result = find_all_pattern(pattern=pattern, data=outs) - if result: - busid = result[-1] - logging.debug('usb busid : {}'.format(busid)) - else: - raise Exception('Can\'t find usb busid') - - return busid - def _thread_for_enter_download_mode(self, cmd, count): """docstring for thread_for_enter_download_mode""" for loop in range(count*20): @@ -404,6 +378,72 @@ class device(object): self._cutter.on(delay=powercut_delay) t.join() + def _find_usb_busid(self): + """docstring for find_usb_busid""" + pattern = 'usb (.*):.*idVendor={0}, idProduct={1}'.format(self._vid, + self._pid) + kernlog = 'cat /var/log/kern.log | grep usb | tail -n 20' + outs = check_output(kernlog, shell=True, timeout=10) + result = find_all_pattern(pattern=pattern, data=outs) + if result: + busid = result[-1] + logging.debug('usb busid : {}'.format(busid)) + else: + raise Exception('Can\'t find usb busid') + + return busid + + def _lthor(self, filenames, busid): + """docstring for _lthor""" + cmd = 'lthor --busid={0}'.format(busid) + filenames = convert_single_item_to_list(filenames) + for l in filenames: + cmd += ' {}'.format(l) + logging.debug(cmd) + ret = call(cmd.split(), timeout=600) + if ret: + raise Exception('Thor error.') + + def _find_usb_bus_and_device_address(self): + """docstring for _find_usb_bus_and_device_address""" + pattern = 'usb (.*):.*idVendor={0}, idProduct={1}'.format(self._vid, + self._pid) + kernlog = 'cat /var/log/kern.log | grep usb | tail -n 20' + outs = check_output(kernlog, shell=True, timeout=10) + result = find_all_pattern(pattern=pattern, data=outs) + if result: + bid = result[-1] + busaddr_cmd = 'cat /sys/bus/usb/devices/{0}/busnum'.format(bid) + busaddr = check_output(busaddr_cmd, shell=True).rstrip().zfill(3) + logging.debug('usb_bus_addr : {}'.format(busaddr)) + devaddr_cmd = 'cat /sys/bus/usb/devices/{0}/devnum'.format(bid) + devaddr = check_output(devaddr_cmd, shell=True).rstrip().zfill(3) + logging.debug('usb_dev_addr : {}'.format(devaddr)) + else: + raise Exception('Can\'t find usb bus and dev addr') + + return (busaddr, devaddr) + + def _heimdall(self, filenames, busaddr, devaddr, partition_bin_mappings): + """docstring for _heimdall""" + filenames = convert_single_item_to_list(filenames) + tar_cmd = ['tar', 'xvfz'] + for l in filenames: + tar_cmd.append(l) + logging.debug(tar_cmd) + call(tar_cmd, timeout=30) + + heimdall_cmd = ['heimdall', 'flash', '--usbbus', busaddr, + '--usbdevaddr', devaddr] + for key, elem in partition_bin_mappings.items(): + heimdall_cmd.append('--{}'.format(key)) + heimdall_cmd.append(elem) + logging.debug(heimdall_cmd) + + ret = call(heimdall_cmd, timeout=600) + if ret: + raise Exception('Heimdall error.') + def _wait_uart_shell_login_prompt(self): """docstring for _wait_uart_shell_login_prompt""" logging.debug('===============Print boot logs===============') diff --git a/litmus/device/devicemock.py b/litmus/device/devicemock.py index 22b992e..34f8f17 100644 --- a/litmus/device/devicemock.py +++ b/litmus/device/devicemock.py @@ -17,7 +17,6 @@ import time import logging from litmus.device.device import device from litmus.core.util import check_output, find_pattern -from litmus.core.util import convert_single_item_to_list from litmus.core.util import call @@ -27,6 +26,8 @@ class devicemock(device): User can control device in topology by this class methods. """ + _booting_time = 60 + def __init__(self, *args, **kwargs): self.args = args self.kwargs = kwargs @@ -36,7 +37,6 @@ class devicemock(device): else: self._id = self._find_device_id() - # init a cutter instance. self._manager = kwargs['manager'] def _release(self): @@ -53,6 +53,7 @@ class devicemock(device): return found # public methods. + def get_id(self): """ Return the id of acquired device. @@ -76,10 +77,10 @@ class devicemock(device): logging.debug('turn on device {}'.format(self.get_name())) self.start_sdb_server() - if self._find_device_id() == self.get_id(): + if self.is_on(): self._sdb_root_on() - self.run_cmd('reboot -f', timeout=10) - time.sleep(60) + self.run_cmd('reboot -f', timeout=20) + time.sleep(self._booting_time) self.start_sdb_server() self._sdb_root_on() @@ -91,40 +92,6 @@ class devicemock(device): """ logging.debug('turn off device {}'.format(self.get_name())) - def thor(self, filenames): - """docstring for thor""" - cmd = 'lthor' - filenames = convert_single_item_to_list(filenames) - for l in filenames: - cmd += ' {}'.format(l) - logging.debug(cmd) - ret = call(cmd.split(), timeout=600) - if ret: - raise Exception('Thor error.') - - def heimdall(self, filenames, - partition_bin_mappings={'BOOT': 'zImage', - 'ROOTFS': 'rootfs.img', - 'USER': 'user.img', - 'SYSTEM-DATA': 'system-data.img'}): - """docstring for heimdall""" - filenames = convert_single_item_to_list(filenames) - tar_cmd = ['tar', 'xvfz'] - for l in filenames: - tar_cmd.append(l) - logging.debug(tar_cmd) - call(tar_cmd, timeout=30) - - heimdall_cmd = ['heimdall', 'flash'] - for key, elem in partition_bin_mappings.items(): - heimdall_cmd.append('--{}'.format(key)) - heimdall_cmd.append(elem) - logging.debug(heimdall_cmd) - - ret = call(heimdall_cmd, timeout=600) - if ret: - raise Exception('Heimdall error.') - def flash(self, filenames, flasher='lthor', waiting=5, partition_bin_mappings={'BOOT': 'zImage', 'ROOTFS': 'rootfs.img', @@ -135,7 +102,7 @@ class devicemock(device): This function turn on device and turn off device automatically. :param dict filenames: filename string or dict - :param func flasher: wrapper function of external flashing tool + :param string flasher: external flashing tool name :param float waiting: waiting time to acquire cdc_acm device :param dict partition_bin_mappings: partition table for device which use heimdall flasher @@ -153,14 +120,22 @@ class devicemock(device): raise Exception('There\'s no file to flash.') try: self._sdb_root_on() - self.run_cmd('reboot -f download', timeout=10) - time.sleep(5) + self._acquire_global_lock() + self.run_cmd('reboot -f download', timeout=20) + time.sleep(waiting) if flasher == 'lthor': - self.thor(filenames=filenames) + busid = self._find_usb_busid() + self._release_global_lock() + self._lthor(filenames=filenames, busid=busid) elif flasher == 'heimdall': - self.heimdall(filenames=filenames, + (busaddr, devaddr) = self._find_usb_bus_and_device_address() + self._release_global_lock() + self._heimdall(filenames=filenames, + busaddr=busaddr, + devaddr=devaddr, partition_bin_mappings=partition_bin_mappings) except (Exception, KeyboardInterrupt) as e: + self._release_global_lock() logging.debug(e) raise Exception('Can\'t flash files : {}.'.format(filenames))