- Update mock device type to avoid sdb error
- Update default templates for u3 and xu3
-Version 0.3.0 19 Sep 2016
+Version 0.3.0 21 Sep 2016
---------------------------
- Update projects/topology file location
- Add projects/topology param at command prompt
+- Support multiple mock devices in topology
+- Add global lock for mock device type
1. Change directory name with version postfix and create an orig.tar.gz
$ mv litmus litmus-0.3.0
+
$ tar cvfz litmus-0.3.0.orig.tar.gz litmus-0.3.0
1. Build a deb package with debuild
$ cd litmus-0.3.0
+
$ debuild
2. Install the deb package using dpkg
* Update projects/topology file location
* Add projects/topology param at command prompt
+ * Support multiple mock devices in topology
+ * Add global lock for mock device type
- -- Donghoon Shin <dhs.shin@samsung.com> Mon, 19 Sep 2016 12:39:00 +0900
+ -- Donghoon Shin <dhs.shin@samsung.com> Wed, 21 Sep 2016 15:08:00 +0900
litmus (0.2.1-1) unstable; urgency=low
time.sleep(retry_delay)
raise Exception('{} device is not available.'.format(devicetype))
+ def acquire_dut_by_name(self, devicename,
+ max_retry_times=10, retry_delay=10):
+ """
+ Acquire an available device for testing.
+
+ :param str devicename: device name
+ :param int max_retry_times: max retry times for device acquisition
+ :param float retry_delay: delay time for each device acquisition retry
+
+ Example:
+ >>> mgr = manager()
+ >>> dut = mgr.acquire_dut_by_name('XU3_001')
+
+ :returns device: acquired device instance
+ """
+ logging.debug('===============Acquire an available DUT===============')
+
+ candidate = [dev for dev in self._all_devices
+ if dev['devicename'] == devicename]
+
+ if candidate:
+ for times in range(0, max_retry_times):
+ for dev in candidate:
+ if not dev['ilock'].acquired:
+ gotten_tlock = dev['tlock'].acquire(blocking=False)
+ gotten_ilock = dev['ilock'].acquire(blocking=False)
+ try:
+ os.chmod(dev['ilock'].path, 0o664)
+ except PermissionError:
+ logging.debug('Can\'t change lock file permission')
+
+ # if acquire tlock and ilock, assign a device.
+ if gotten_tlock and gotten_ilock:
+ dut = device.create(manager=self, **dev)
+ self._duts.append(dut)
+ logging.debug('{} is assigned.'
+ .format(dut.get_name()))
+ return dut
+ # if acquire tlock only then release it for next time.
+ elif gotten_tlock and not gotten_ilock:
+ dev['tlock'].release()
+ else:
+ logging.debug('{} is busy. Wait {} seconds.'
+ .format(devicename, retry_delay))
+ time.sleep(retry_delay)
+ raise Exception('{} is not available.'.format(devicename))
+
def release_dut(self, dut=None):
"""
Release acquired devices under test.
>>> mgr.release_dut()
"""
- # TODO: self._duts.remove(dev) doesn't delete device instance.
# release all _duts if dut param is None
if not dut:
for dev in self._duts:
for section in configparser.sections():
items = dict(configparser.items(section))
- items['deviceid'] = section
+ items['devicename'] = section
# Interproces Lock and Thread Lock
ilock_filename = os.path.join(self._path_for_locks,
- items['deviceid'])
+ items['devicename'])
items['tlock'] = Lock()
items['ilock'] = fasteners.InterProcessLock(ilock_filename)
# Append items
self._all_devices.append(items)
- # Add mock device
- mock_deviceid = 'MOCK_001'
- mock_ilock_filename = os.path.join(self._path_for_locks, mock_deviceid)
- mock = {'deviceid': mock_deviceid,
- 'dev_type': 'mock',
- 'tlock': Lock(),
- 'ilock': fasteners.InterProcessLock(mock_ilock_filename)}
- self._all_devices.append(mock)
+ if not next((d for d in self._all_devices if d['dev_type'] == 'mock'),
+ None):
+ # Add mock device
+ mock_devicename = 'MOCK_001'
+ mock_ilock_filename = os.path.join(self._path_for_locks,
+ mock_devicename)
+ mock = {'devicename': mock_devicename,
+ 'dev_type': 'mock',
+ 'tlock': Lock(),
+ 'ilock': fasteners.InterProcessLock(mock_ilock_filename)}
+ self._all_devices.append(mock)
super(device, self).__init__()
self.args = args
self.kwargs = kwargs
- self._name = kwargs['deviceid']
+ self._name = kwargs['devicename']
# init a cutter instance.
self._cutter = cutter.create(*args, **kwargs)
else:
return False
- def _thor(self, filenames, busid):
- """docstring for thor_downloader"""
- cmd = 'lthor --busid={0}'.format(busid)
- filenames = convert_single_item_to_list(filenames)
- for l in filenames:
- cmd += ' {}'.format(l)
- logging.debug(cmd)
- ret = call(cmd.split(), timeout=600)
- if ret:
- raise Exception('Thor error.')
-
- def flash(self, filenames, flasher=_thor, waiting=5):
+ def flash(self, filenames, flasher='lthor', waiting=5):
"""
Flash binaries to device.
This function turn on device and turn off device automatically.
:param dict filenames: filename string or dict
- :param func flasher: wrapper function of external flashing tool
+ :param sting flasher: external flashing tool name
:param float waiting: waiting time to acquire cdc_acm device
Example:
time.sleep(waiting)
busid = self._find_usb_busid()
self._release_global_lock()
- flasher(self, filenames=filenames, busid=busid)
+ self._lthor(filenames=filenames, busid=busid)
self._cutter.off()
except (Exception, KeyboardInterrupt) as e:
self._release_global_lock()
Example:
>>> dut.on()
- >>> dut.run_cmd(['ls','-alF','/','|','grep','usr'])
+ >>> dut.run_cmd('ls -alF / | grep usr')
\'drwxr-xr-x 15 root root 4096 Apr 29 2016 usr/\\r\\n\'
:returns str: stdout of sdb shell command
"""
c = ['sdb', '-s', self.get_id(), 'shell']
- c.extend(command)
+ c.extend(convert_single_item_to_list(command))
logging.debug(c)
result = check_output(c, timeout=timeout)
return result
if self._uart.isOpen():
self._uart.close()
- def _find_usb_busid(self):
- """docstring for find_usb_busid"""
- pattern = 'usb (.*):.*idVendor={0}, idProduct={1}'.format(self._vid,
- self._pid)
- kernlog = 'cat /var/log/kern.log | grep usb | tail -n 20'
- outs = check_output(kernlog, shell=True, timeout=10)
- result = find_all_pattern(pattern=pattern, data=outs)
- if result:
- busid = result[-1]
- logging.debug('usb busid : {}'.format(busid))
- else:
- raise Exception('Can\'t find usb busid')
-
- return busid
-
def _thread_for_enter_download_mode(self, cmd, count):
"""docstring for thread_for_enter_download_mode"""
for loop in range(count*20):
self._cutter.on(delay=powercut_delay)
t.join()
+ def _find_usb_busid(self):
+ """docstring for find_usb_busid"""
+ pattern = 'usb (.*):.*idVendor={0}, idProduct={1}'.format(self._vid,
+ self._pid)
+ kernlog = 'cat /var/log/kern.log | grep usb | tail -n 20'
+ outs = check_output(kernlog, shell=True, timeout=10)
+ result = find_all_pattern(pattern=pattern, data=outs)
+ if result:
+ busid = result[-1]
+ logging.debug('usb busid : {}'.format(busid))
+ else:
+ raise Exception('Can\'t find usb busid')
+
+ return busid
+
+ def _lthor(self, filenames, busid):
+ """docstring for _lthor"""
+ cmd = 'lthor --busid={0}'.format(busid)
+ filenames = convert_single_item_to_list(filenames)
+ for l in filenames:
+ cmd += ' {}'.format(l)
+ logging.debug(cmd)
+ ret = call(cmd.split(), timeout=600)
+ if ret:
+ raise Exception('Thor error.')
+
+ def _find_usb_bus_and_device_address(self):
+ """docstring for _find_usb_bus_and_device_address"""
+ pattern = 'usb (.*):.*idVendor={0}, idProduct={1}'.format(self._vid,
+ self._pid)
+ kernlog = 'cat /var/log/kern.log | grep usb | tail -n 20'
+ outs = check_output(kernlog, shell=True, timeout=10)
+ result = find_all_pattern(pattern=pattern, data=outs)
+ if result:
+ bid = result[-1]
+ busaddr_cmd = 'cat /sys/bus/usb/devices/{0}/busnum'.format(bid)
+ busaddr = check_output(busaddr_cmd, shell=True).rstrip().zfill(3)
+ logging.debug('usb_bus_addr : {}'.format(busaddr))
+ devaddr_cmd = 'cat /sys/bus/usb/devices/{0}/devnum'.format(bid)
+ devaddr = check_output(devaddr_cmd, shell=True).rstrip().zfill(3)
+ logging.debug('usb_dev_addr : {}'.format(devaddr))
+ else:
+ raise Exception('Can\'t find usb bus and dev addr')
+
+ return (busaddr, devaddr)
+
+ def _heimdall(self, filenames, busaddr, devaddr, partition_bin_mappings):
+ """docstring for _heimdall"""
+ filenames = convert_single_item_to_list(filenames)
+ tar_cmd = ['tar', 'xvfz']
+ for l in filenames:
+ tar_cmd.append(l)
+ logging.debug(tar_cmd)
+ call(tar_cmd, timeout=30)
+
+ heimdall_cmd = ['heimdall', 'flash', '--usbbus', busaddr,
+ '--usbdevaddr', devaddr]
+ for key, elem in partition_bin_mappings.items():
+ heimdall_cmd.append('--{}'.format(key))
+ heimdall_cmd.append(elem)
+ logging.debug(heimdall_cmd)
+
+ ret = call(heimdall_cmd, timeout=600)
+ if ret:
+ raise Exception('Heimdall error.')
+
def _wait_uart_shell_login_prompt(self):
"""docstring for _wait_uart_shell_login_prompt"""
logging.debug('===============Print boot logs===============')
import logging
from litmus.device.device import device
from litmus.core.util import check_output, find_pattern
-from litmus.core.util import convert_single_item_to_list
from litmus.core.util import call
User can control device in topology by this class methods.
"""
+ _booting_time = 60
+
def __init__(self, *args, **kwargs):
self.args = args
self.kwargs = kwargs
- self._name = kwargs['deviceid']
- self._id = self._find_device_id()
+ self._name = kwargs['devicename']
+ if 'serialno' in kwargs:
+ self._id = kwargs['serialno']
+ else:
+ self._id = self._find_device_id()
- # init a cutter instance.
self._manager = kwargs['manager']
def _release(self):
def _find_device_id(self):
"""docstring for _find_device_id"""
- self.refresh_sdb_server()
+ self.start_sdb_server()
outs = check_output(['sdb', 'devices'], timeout=10)
pattern = '.*List of devices attached \n([a-zA-Z0-9]*).*device.*'
found = find_pattern(pattern, outs, groupindex=1)
return found
# public methods.
+
def get_id(self):
"""
Return the id of acquired device.
"""
logging.debug('turn on device {}'.format(self.get_name()))
- self.refresh_sdb_server()
- if self._find_device_id() == self.get_id():
+ self.start_sdb_server()
+ if self.is_on():
self._sdb_root_on()
- self.run_cmd(['reboot', '-f'])
- time.sleep(60)
- self.refresh_sdb_server()
+ self.run_cmd('reboot -f', timeout=20)
+ time.sleep(self._booting_time)
+ self.start_sdb_server()
self._sdb_root_on()
def off(self, powercut_delay=2):
"""
logging.debug('turn off device {}'.format(self.get_name()))
- def thor(self, filenames):
- """docstring for thor"""
- cmd = 'lthor'
- filenames = convert_single_item_to_list(filenames)
- for l in filenames:
- cmd += ' {}'.format(l)
- logging.debug(cmd)
- ret = call(cmd.split(), timeout=600)
- if ret:
- raise Exception('Thor error.')
-
- def heimdall(self, filenames,
- partition_bin_mappings={'BOOT': 'zImage',
- 'ROOTFS': 'rootfs.img',
- 'USER': 'user.img',
- 'SYSTEM-DATA': 'system-data.img'}):
- """docstring for heimdall"""
- filenames = convert_single_item_to_list(filenames)
- tar_cmd = ['tar', 'xvfz']
- for l in filenames:
- tar_cmd.append(l)
- logging.debug(tar_cmd)
- call(tar_cmd, timeout=30)
-
- heimdall_cmd = ['heimdall', 'flash']
- for key, elem in partition_bin_mappings.items():
- heimdall_cmd.append('--{}'.format(key))
- heimdall_cmd.append(elem)
- logging.debug(heimdall_cmd)
-
- ret = call(heimdall_cmd, timeout=600)
- if ret:
- raise Exception('Heimdall error.')
-
def flash(self, filenames, flasher='lthor', waiting=5,
partition_bin_mappings={'BOOT': 'zImage',
'ROOTFS': 'rootfs.img',
This function turn on device and turn off device automatically.
:param dict filenames: filename string or dict
- :param func flasher: wrapper function of external flashing tool
+ :param string flasher: external flashing tool name
:param float waiting: waiting time to acquire cdc_acm device
:param dict partition_bin_mappings: partition table for device which use heimdall flasher
"""
logging.debug('flash binaries to device : {}'.format(filenames))
- self.refresh_sdb_server()
+ self.start_sdb_server()
if not filenames:
raise Exception('There\'s no file to flash.')
try:
self._sdb_root_on()
- self.run_cmd(['reboot', '-f', 'download'], timeout=10)
- time.sleep(5)
+ self._acquire_global_lock()
+ self.run_cmd('reboot -f download', timeout=20)
+ time.sleep(waiting)
if flasher == 'lthor':
- self.thor(filenames=filenames)
+ busid = self._find_usb_busid()
+ self._release_global_lock()
+ self._lthor(filenames=filenames, busid=busid)
elif flasher == 'heimdall':
- self.heimdall(filenames=filenames,
+ (busaddr, devaddr) = self._find_usb_bus_and_device_address()
+ self._release_global_lock()
+ self._heimdall(filenames=filenames,
+ busaddr=busaddr,
+ devaddr=devaddr,
partition_bin_mappings=partition_bin_mappings)
except (Exception, KeyboardInterrupt) as e:
+ self._release_global_lock()
logging.debug(e)
raise Exception('Can\'t flash files : {}.'.format(filenames))
"""docstring for refresh_sdb_server"""
call('sdb kill-server; sdb start-server', shell=True, timeout=10)
time.sleep(1)
+
+ def start_sdb_server(self):
+ """docstring for start_sdb_server"""
+ call('sdb start-server', shell=True, timeout=10)
+ time.sleep(1)