super(device, self).__init__()
self.args = args
self.kwargs = kwargs
- self._name = kwargs['deviceid']
+ self._name = kwargs['devicename']
# init a cutter instance.
self._cutter = cutter.create(*args, **kwargs)
else:
return False
- def _thor(self, filenames, busid):
- """docstring for thor_downloader"""
- cmd = 'lthor --busid={0}'.format(busid)
- filenames = convert_single_item_to_list(filenames)
- for l in filenames:
- cmd += ' {}'.format(l)
- logging.debug(cmd)
- ret = call(cmd.split(), timeout=600)
- if ret:
- raise Exception('Thor error.')
-
- def flash(self, filenames, flasher=_thor, waiting=5):
+ def flash(self, filenames, flasher='lthor', waiting=5):
"""
Flash binaries to device.
This function turn on device and turn off device automatically.
:param dict filenames: filename string or dict
- :param func flasher: wrapper function of external flashing tool
+ :param sting flasher: external flashing tool name
:param float waiting: waiting time to acquire cdc_acm device
Example:
time.sleep(waiting)
busid = self._find_usb_busid()
self._release_global_lock()
- flasher(self, filenames=filenames, busid=busid)
+ self._lthor(filenames=filenames, busid=busid)
self._cutter.off()
except (Exception, KeyboardInterrupt) as e:
self._release_global_lock()
if self._uart.isOpen():
self._uart.close()
- def _find_usb_busid(self):
- """docstring for find_usb_busid"""
- pattern = 'usb (.*):.*idVendor={0}, idProduct={1}'.format(self._vid,
- self._pid)
- kernlog = 'cat /var/log/kern.log | grep usb | tail -n 20'
- outs = check_output(kernlog, shell=True, timeout=10)
- result = find_all_pattern(pattern=pattern, data=outs)
- if result:
- busid = result[-1]
- logging.debug('usb busid : {}'.format(busid))
- else:
- raise Exception('Can\'t find usb busid')
-
- return busid
-
def _thread_for_enter_download_mode(self, cmd, count):
"""docstring for thread_for_enter_download_mode"""
for loop in range(count*20):
self._cutter.on(delay=powercut_delay)
t.join()
+ def _find_usb_busid(self):
+ """docstring for find_usb_busid"""
+ pattern = 'usb (.*):.*idVendor={0}, idProduct={1}'.format(self._vid,
+ self._pid)
+ kernlog = 'cat /var/log/kern.log | grep usb | tail -n 20'
+ outs = check_output(kernlog, shell=True, timeout=10)
+ result = find_all_pattern(pattern=pattern, data=outs)
+ if result:
+ busid = result[-1]
+ logging.debug('usb busid : {}'.format(busid))
+ else:
+ raise Exception('Can\'t find usb busid')
+
+ return busid
+
+ def _lthor(self, filenames, busid):
+ """docstring for _lthor"""
+ cmd = 'lthor --busid={0}'.format(busid)
+ filenames = convert_single_item_to_list(filenames)
+ for l in filenames:
+ cmd += ' {}'.format(l)
+ logging.debug(cmd)
+ ret = call(cmd.split(), timeout=600)
+ if ret:
+ raise Exception('Thor error.')
+
+ def _find_usb_bus_and_device_address(self):
+ """docstring for _find_usb_bus_and_device_address"""
+ pattern = 'usb (.*):.*idVendor={0}, idProduct={1}'.format(self._vid,
+ self._pid)
+ kernlog = 'cat /var/log/kern.log | grep usb | tail -n 20'
+ outs = check_output(kernlog, shell=True, timeout=10)
+ result = find_all_pattern(pattern=pattern, data=outs)
+ if result:
+ bid = result[-1]
+ busaddr_cmd = 'cat /sys/bus/usb/devices/{0}/busnum'.format(bid)
+ busaddr = check_output(busaddr_cmd, shell=True).rstrip().zfill(3)
+ logging.debug('usb_bus_addr : {}'.format(busaddr))
+ devaddr_cmd = 'cat /sys/bus/usb/devices/{0}/devnum'.format(bid)
+ devaddr = check_output(devaddr_cmd, shell=True).rstrip().zfill(3)
+ logging.debug('usb_dev_addr : {}'.format(devaddr))
+ else:
+ raise Exception('Can\'t find usb bus and dev addr')
+
+ return (busaddr, devaddr)
+
+ def _heimdall(self, filenames, busaddr, devaddr, partition_bin_mappings):
+ """docstring for _heimdall"""
+ filenames = convert_single_item_to_list(filenames)
+ tar_cmd = ['tar', 'xvfz']
+ for l in filenames:
+ tar_cmd.append(l)
+ logging.debug(tar_cmd)
+ call(tar_cmd, timeout=30)
+
+ heimdall_cmd = ['heimdall', 'flash', '--usbbus', busaddr,
+ '--usbdevaddr', devaddr]
+ for key, elem in partition_bin_mappings.items():
+ heimdall_cmd.append('--{}'.format(key))
+ heimdall_cmd.append(elem)
+ logging.debug(heimdall_cmd)
+
+ ret = call(heimdall_cmd, timeout=600)
+ if ret:
+ raise Exception('Heimdall error.')
+
def _wait_uart_shell_login_prompt(self):
"""docstring for _wait_uart_shell_login_prompt"""
logging.debug('===============Print boot logs===============')
import logging
from litmus.device.device import device
from litmus.core.util import check_output, find_pattern
-from litmus.core.util import convert_single_item_to_list
from litmus.core.util import call
User can control device in topology by this class methods.
"""
+ _booting_time = 60
+
def __init__(self, *args, **kwargs):
self.args = args
self.kwargs = kwargs
else:
self._id = self._find_device_id()
- # init a cutter instance.
self._manager = kwargs['manager']
def _release(self):
return found
# public methods.
+
def get_id(self):
"""
Return the id of acquired device.
logging.debug('turn on device {}'.format(self.get_name()))
self.start_sdb_server()
- if self._find_device_id() == self.get_id():
+ if self.is_on():
self._sdb_root_on()
- self.run_cmd('reboot -f', timeout=10)
- time.sleep(60)
+ self.run_cmd('reboot -f', timeout=20)
+ time.sleep(self._booting_time)
self.start_sdb_server()
self._sdb_root_on()
"""
logging.debug('turn off device {}'.format(self.get_name()))
- def thor(self, filenames):
- """docstring for thor"""
- cmd = 'lthor'
- filenames = convert_single_item_to_list(filenames)
- for l in filenames:
- cmd += ' {}'.format(l)
- logging.debug(cmd)
- ret = call(cmd.split(), timeout=600)
- if ret:
- raise Exception('Thor error.')
-
- def heimdall(self, filenames,
- partition_bin_mappings={'BOOT': 'zImage',
- 'ROOTFS': 'rootfs.img',
- 'USER': 'user.img',
- 'SYSTEM-DATA': 'system-data.img'}):
- """docstring for heimdall"""
- filenames = convert_single_item_to_list(filenames)
- tar_cmd = ['tar', 'xvfz']
- for l in filenames:
- tar_cmd.append(l)
- logging.debug(tar_cmd)
- call(tar_cmd, timeout=30)
-
- heimdall_cmd = ['heimdall', 'flash']
- for key, elem in partition_bin_mappings.items():
- heimdall_cmd.append('--{}'.format(key))
- heimdall_cmd.append(elem)
- logging.debug(heimdall_cmd)
-
- ret = call(heimdall_cmd, timeout=600)
- if ret:
- raise Exception('Heimdall error.')
-
def flash(self, filenames, flasher='lthor', waiting=5,
partition_bin_mappings={'BOOT': 'zImage',
'ROOTFS': 'rootfs.img',
This function turn on device and turn off device automatically.
:param dict filenames: filename string or dict
- :param func flasher: wrapper function of external flashing tool
+ :param string flasher: external flashing tool name
:param float waiting: waiting time to acquire cdc_acm device
:param dict partition_bin_mappings: partition table for device which use heimdall flasher
raise Exception('There\'s no file to flash.')
try:
self._sdb_root_on()
- self.run_cmd('reboot -f download', timeout=10)
- time.sleep(5)
+ self._acquire_global_lock()
+ self.run_cmd('reboot -f download', timeout=20)
+ time.sleep(waiting)
if flasher == 'lthor':
- self.thor(filenames=filenames)
+ busid = self._find_usb_busid()
+ self._release_global_lock()
+ self._lthor(filenames=filenames, busid=busid)
elif flasher == 'heimdall':
- self.heimdall(filenames=filenames,
+ (busaddr, devaddr) = self._find_usb_bus_and_device_address()
+ self._release_global_lock()
+ self._heimdall(filenames=filenames,
+ busaddr=busaddr,
+ devaddr=devaddr,
partition_bin_mappings=partition_bin_mappings)
except (Exception, KeyboardInterrupt) as e:
+ self._release_global_lock()
logging.debug(e)
raise Exception('Can\'t flash files : {}.'.format(filenames))