time.sleep(retry_delay)
raise Exception('{} device is not available.'.format(devicetype))
+ def acquire_dut_by_name(self, devicename,
+ max_retry_times=10, retry_delay=10):
+ """
+ Acquire an available device for testing.
+
+ :param str devicename: device name
+ :param int max_retry_times: max retry times for device acquisition
+ :param float retry_delay: delay time for each device acquisition retry
+
+ Example:
+ >>> mgr = manager()
+ >>> dut = mgr.acquire_dut_by_name('XU3_001')
+
+ :returns device: acquired device instance
+ """
+ logging.debug('===============Acquire an available DUT===============')
+
+ candidate = next((dev for dev in self._all_devices
+ if dev['devicename'] == devicename), None)
+
+ if candidate:
+ for times in range(0, max_retry_times):
+ if not candidate['ilock'].acquired:
+ gotten_tlock = candidate['tlock'].acquire(blocking=False)
+ gotten_ilock = candidate['ilock'].acquire(blocking=False)
+ try:
+ os.chmod(candidate['ilock'].path, 0o664)
+ except PermissionError:
+ logging.debug('Can\'t change lock file permission')
+
+ # if acquire tlock and ilock, assign a device.
+ if gotten_tlock and gotten_ilock:
+ dut = device.create(manager=self, **candidate)
+ self._duts.append(dut)
+ logging.debug('{} is assigned.'
+ .format(dut.get_name()))
+ return dut
+ # if acquire tlock only then release it for next time.
+ elif gotten_tlock and not gotten_ilock:
+ candidate['tlock'].release()
+ else:
+ logging.debug('{} is busy. Wait {} seconds.'
+ .format(devicename, retry_delay))
+ time.sleep(retry_delay)
+ raise Exception('{} is not available.'.format(devicename))
+
def release_dut(self, dut=None):
"""
Release acquired devices under test.
>>> mgr.release_dut()
"""
- # TODO: self._duts.remove(dev) doesn't delete device instance.
# release all _duts if dut param is None
if not dut:
for dev in self._duts:
for section in configparser.sections():
items = dict(configparser.items(section))
- items['deviceid'] = section
+ items['devicename'] = section
# Interproces Lock and Thread Lock
ilock_filename = os.path.join(self._path_for_locks,
- items['deviceid'])
+ items['devicename'])
items['tlock'] = Lock()
items['ilock'] = fasteners.InterProcessLock(ilock_filename)
# Append items
self._all_devices.append(items)
- # Add mock device
- mock_deviceid = 'MOCK_001'
- mock_ilock_filename = os.path.join(self._path_for_locks, mock_deviceid)
- mock = {'deviceid': mock_deviceid,
- 'dev_type': 'mock',
- 'tlock': Lock(),
- 'ilock': fasteners.InterProcessLock(mock_ilock_filename)}
- self._all_devices.append(mock)
+ if not next((d for d in self._all_devices if d['dev_type'] == 'mock'),
+ None):
+ # Add mock device
+ mock_devicename = 'MOCK_001'
+ mock_ilock_filename = os.path.join(self._path_for_locks,
+ mock_devicename)
+ mock = {'devicename': mock_devicename,
+ 'dev_type': 'mock',
+ 'tlock': Lock(),
+ 'ilock': fasteners.InterProcessLock(mock_ilock_filename)}
+ self._all_devices.append(mock)
def __init__(self, *args, **kwargs):
self.args = args
self.kwargs = kwargs
- self._name = kwargs['deviceid']
- self._id = self._find_device_id()
+ self._name = kwargs['devicename']
+ if 'serialno' in kwargs:
+ self._id = kwargs['serialno']
+ else:
+ self._id = self._find_device_id()
# init a cutter instance.
self._manager = kwargs['manager']
def _find_device_id(self):
"""docstring for _find_device_id"""
- self.refresh_sdb_server()
+ self.start_sdb_server()
outs = check_output(['sdb', 'devices'], timeout=10)
pattern = '.*List of devices attached \n([a-zA-Z0-9]*).*device.*'
found = find_pattern(pattern, outs, groupindex=1)
"""
logging.debug('turn on device {}'.format(self.get_name()))
- self.refresh_sdb_server()
+ self.start_sdb_server()
if self._find_device_id() == self.get_id():
self._sdb_root_on()
- self.run_cmd('reboot -f')
+ self.run_cmd('reboot -f', timeout=10)
time.sleep(60)
- self.refresh_sdb_server()
+ self.start_sdb_server()
self._sdb_root_on()
def off(self, powercut_delay=2):
"""
logging.debug('flash binaries to device : {}'.format(filenames))
- self.refresh_sdb_server()
+ self.start_sdb_server()
if not filenames:
raise Exception('There\'s no file to flash.')
"""docstring for refresh_sdb_server"""
call('sdb kill-server; sdb start-server', shell=True, timeout=10)
time.sleep(1)
+
+ def start_sdb_server(self):
+ """docstring for start_sdb_server"""
+ call('sdb start-server', shell=True, timeout=10)
+ time.sleep(1)