Change private function _sdb_root_on to public function sdb_root_on
[tools/litmus.git] / litmus / device / device.py
1 #!/usr/bin/env python3
2 # Copyright 2015-2016 Samsung Electronics Co., Ltd.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 import os
17 import time
18 import serial
19 import logging
20 import tempfile
21 import fasteners
22 from PIL import Image
23 from threading import Thread, Lock
24 from litmus.core.util import call, check_output
25 from litmus.core.util import convert_single_item_to_list
26 from litmus.core.util import find_pattern
27 from litmus.core.util import decode
28 from litmus.core.util import create_instance
29 from litmus.core.util import find_all_pattern
30 from litmus.core.exceptions import BootError
31 from litmus.device.cutter import cutter
32 from litmus import _path_for_locks_
33
34
35 class device(object):
36     """
37     Litmus device class.
38     User can control device in topology by this class methods.
39     """
40
41     _baudrate = 115200
42     _readtimeout = 0.5
43     _enterkey = b'\r'
44     _dnmode_cmd = b'thordown'
45     _username = b'root'
46     _password = b'tizen'
47     _vid = '04e8'
48     _pid = '685d'
49     _pattern_loginprompt = r'.*login: $'
50     _pattern_shellprompt = r'.*# .*'
51     _max_attempt_login_uart_shell = 5
52     _max_attempt_attach_sdb = 10
53     _retrycnt_at_a_time_sdb = 20
54     _max_attempt_boot_retry = 3
55     _boot_timeout = 50.0
56     _path_for_locks = _path_for_locks_
57     _screen_width = 1920
58     _screen_height = 1080
59
60     _cutter = None
61     _uart = None
62     _manager = None
63     _global_tlock = Lock()
64     _global_ilock_path = os.path.join(_path_for_locks_, 'globallock')
65     _global_ilock = fasteners.InterProcessLock(_global_ilock_path)
66
67     _name = None
68     _tests = None
69
70     def __init__(self, *args, **kwargs):
71         super(device, self).__init__()
72         self.args = args
73         self.kwargs = kwargs
74         self._name = kwargs['devicename']
75
76         # init a cutter instance.
77         self._cutter = cutter.create(*args, **kwargs)
78         # open uart
79         self._open_uart()
80         self._manager = kwargs['manager']
81
82     def __del__(self):
83         """docstring for __del__"""
84         self._release()
85
86     def _release(self):
87         """docstring for _release"""
88         self._cutter.off(delay=1)
89         self._close_uart()
90
91     # public methods.
92
93     @classmethod
94     def create(self, *args, **kwargs):
95         """
96         Create a device instance.
97         """
98         clsname = 'device' + kwargs['dev_type']
99         return create_instance(clsname,
100                                'litmus.device',
101                                *args,
102                                **kwargs)
103
104     def get_name(self):
105         """
106         Return the name of acquired device.
107
108         Example:
109             >>> dut = mgr.acquire_dut('xu3')
110             >>> dut.get_name()
111             'XU3_001'
112
113         :returns str: device name
114         """
115         return self._name
116
117     def get_id(self):
118         """
119         Return the id of acquired device.
120         Device instance uses this id for sdb connection.
121
122         Example:
123             >>> dut = mgr.acquire_dut('xu3')
124             >>> dut.get_id()
125             'XU3_001'
126
127         :returns str: device id
128         """
129         return self.get_name()
130
131     def on(self, powercut_delay=1):
132         """
133         Turn on the device acquired.
134
135         :param float powercut_delay: power-cut delay for cutter
136         """
137         logging.debug('=================Turn on device {}=================='
138                       .format(self.get_name()))
139         retry_cnt = 0
140         while retry_cnt <= self._max_attempt_boot_retry:
141             try:
142                 self.off(1)
143                 self._cutter.on(powercut_delay)
144                 self._wait_uart_shell_login_prompt()
145                 self._login_uart_shell()
146                 self._set_sdb_deviceid()
147                 self._attach_sdb()
148                 self.sdb_root_on()
149                 return
150             except KeyboardInterrupt:
151                 self.off(1)
152                 raise Exception('Keyboard interrupt.')
153             except Exception as e:
154                 logging.debug(e)
155                 retry_cnt += 1
156         else:
157             self.off(1)
158             raise BootError('Can\'t turn on dut.')
159
160     def off(self, powercut_delay=1):
161         """
162         Trun off the device acquired.
163
164         :param float powercut_delay: power-cut delay for cutter
165         """
166         logging.debug('=================Turn off device {}================='
167                       .format(self.get_name()))
168         self._detach_sdb()
169         self._cutter.off(powercut_delay)
170
171     def is_on(self):
172         """
173         Return whether device is turned on or not.
174
175         Example:
176             >>> dut.on()
177             >>> dut.is_on()
178             True
179             >>> dut.off()
180             >>> dut.is_on()
181             False
182
183         :returns boolean: true if device is turned on, false otherwise.
184         """
185         pattern = '.*{}'.format(self.get_id())
186         outs = check_output('sdb devices'.split(), timeout=10)
187         if find_pattern(pattern, outs):
188             return True
189         else:
190             return False
191
192     def flash(self, filenames, flasher='lthor', waiting=5):
193         """
194         Flash binaries to device.
195         This function turn on device and turn off device automatically.
196
197         :param dict filenames: filename string or dict
198         :param sting flasher: external flashing tool name
199         :param float waiting: waiting time to acquire cdc_acm device
200
201         Example:
202             >>> dut.flash(['boot.tar.gz','platform.tar.gz'])
203             >>> or
204             >>> dut.flash('platform.tar.gz')
205
206         """
207         logging.debug('==============Flash binaries to device {}==========='
208                       .format(self.get_name()))
209         logging.debug(filenames)
210
211         if not filenames:
212             raise Exception('There\'s no file to flash.')
213         try:
214             self._acquire_global_lock()
215             time.sleep(waiting)
216             self._enter_download_mode(self._dnmode_cmd)
217             time.sleep(waiting)
218             busid = self._find_usb_busid()
219             self._release_global_lock()
220             self._lthor(filenames=filenames, busid=busid)
221             self.off()
222         except (Exception, KeyboardInterrupt) as e:
223             self._release_global_lock()
224             logging.debug(e)
225             raise Exception('Can\'t flash files : {}.'.format(filenames))
226
227     def run_cmd(self, command, timeout=None):
228         """
229         Run a command on device.
230
231         :param str command: command to run on device
232         :param float timeout: timeout
233
234         Example:
235             >>> dut.on()
236             >>> dut.run_cmd('ls -alF / | grep usr')
237             \'drwxr-xr-x  15 root root     4096 Apr 29  2016 usr/\\r\\n\'
238
239         :returns str: stdout of sdb shell command
240
241         """
242         logging.debug('==============Run a command on device {}============'
243                       .format(self.get_name()))
244         c = ['sdb', '-s', self.get_id(), 'shell']
245         c.extend(convert_single_item_to_list(command))
246         logging.debug(c)
247         result = check_output(c, timeout=timeout)
248         return result
249
250     def push_file(self, src, dest, timeout=None):
251         """
252         Push a file from host to destination path of device.
253
254         :param str src: file path from host pc
255         :param str dest: destination path of device
256         :param float timeout: timeout
257
258         Example:
259             >>> dut.push_file('test.png', '/tmp')
260
261         :returns str: stdout of sdb push command
262         """
263         logging.debug('==============Push a file to device {}=============='
264                       .format(self.get_name()))
265         logging.debug('src from host : {}'.format(src))
266         logging.debug('dest on dut: {}'.format(dest))
267         c = ['sdb', '-s', self.get_id(), 'push', src, dest]
268         result = check_output(c, timeout=timeout)
269         return result
270
271     def pull_file(self, src, dest, timeout=None):
272         """
273         Pull a file from device to destination path of host.
274
275         :param str src: file path from device
276         :param str dest: destination path of host pc
277         :param float timeout: timeout
278
279         Example:
280             >>> dut.pull_file('/tmp/test.png','.')
281
282         :returns str: stdout of sdb push command
283         """
284         logging.debug('==============Pull a file from device {}============'
285                       .format(self.get_name()))
286         logging.debug('src from dut : {}'.format(src))
287         logging.debug('dest on host: {}'.format(dest))
288         c = ['sdb', '-s', self.get_id(), 'pull', src, dest]
289         result = check_output(c, timeout=timeout)
290         return result
291
292     def _read_uart(self, bufsize=100):
293         """docstring for read_uart"""
294         readdata = decode(self._uart.read(bufsize))
295         logging.debug(readdata)
296         return readdata
297
298     def _write_uart(self, cmd, returnkey=b'\r'):
299         """docstring for write_uart"""
300         self._uart.write(cmd)
301         time.sleep(0.1)
302         if returnkey:
303             self._uart.write(returnkey)
304
305     def add_test(self, func, args):
306         """
307         Add a testcase to device class instance.
308
309         :param func func: function object for test
310         :param dict args: arguments for test function
311
312         Example:
313             >>> from litmus.helper.helper import verify_wifi_is_working
314             >>> dut.add_test(verify_wifi_is_working,
315                              {'wifi_apname': 'setup',
316                               'wifi_password': '',
317                               'result_dir': 'result'})
318
319         """
320         if not self._tests:
321             self._tests = []
322
323         self._tests.append({'func': func, 'args': args})
324
325     def del_test(self, func):
326         """
327         Delete a testcase from device class instance.
328
329         :param func func: function object for test
330
331         Example:
332             >>> from litmus.helper.helper import verify_wifi_is_working
333             >>> dut.del_test(verify_wifi_is_working)
334         """
335         self._tests = [l for l in self._tests if l['func'] != func]
336
337     def run_tests(self):
338         """
339         Run all testcases.
340
341         Example:
342             >>> from litmus.helper.helper import verify_wifi_is_working
343             >>> dut.add_test(verify_wifi_is_working,
344                              {'wifi_apname': 'setup',
345                               'wifi_password': '',
346                               'result_dir': 'result'})
347             >>> dut.run_tests()
348
349         """
350         for l in self._tests:
351             if isinstance(l['args'], dict):
352                 l['func'](self, **l['args'])
353             elif isinstance(l['args'], tuple):
354                 l['func'](self, *l['args'])
355
356     def screenshot(self, filename):
357         """
358         Take a screenshot (png format)
359
360         :param str filename: screenshot file name
361
362         Example:
363             >>> dut.screenshot('screenshot.png')
364
365         """
366         logging.debug('==== Take a screenshot: {}, width: {}, height: {} ===='
367                       .format(filename,
368                               self._screen_width,
369                               self._screen_height))
370         dt = self._get_display_server_type()
371         if dt == 'X11':
372             self._screenshot_x11(filename)
373         elif dt == 'WAYLAND':
374             self._screenshot_wayland(filename)
375
376     # private methods.
377     def _get_display_server_type(self):
378         """docstring for get_display_server_type"""
379         res = self.run_cmd('ls /usr/lib', timeout=10)
380         if find_pattern('.*libX11.*', res):
381             return 'X11'
382         else:
383             return 'WAYLAND'
384
385     def _screenshot_x11(self, filename):
386         """docstring for _screenshot_x11"""
387         # take a screenshot using xwd
388         cmd = 'xwd -root -out /tmp/{}.xwd'.format(filename)
389         self.run_cmd(cmd, timeout=20)
390
391         # pull xwd file
392         self.pull_file('/tmp/{}.xwd'.format(filename), os.curdir, timeout=20)
393
394         # convert xwd to png and resize it
395         call(['convert', '{0}.xwd'.format(filename), '-resize',
396               '{0}x{1}'.format(self._screen_width, self._screen_height),
397               filename], timeout=20)
398         call(['rm', '{}.xwd'.format(filename)], timeout=20)
399
400     def _screenshot_wayland(self, filename):
401         """docstring for _screenshot_wayland"""
402         # Find all viewable window id
403         p_winid = '.*(0x[a-zA-Z0-9]{8})\s+\d+\s+\d+\s+\d+\s+\d+\s+(\d+)\s+(\d+).*\sViewable.*'
404         winids = find_all_pattern(p_winid,
405                                   self.run_cmd('enlightenment_info -topvwins',
406                                                timeout=20))
407         if winids:
408             # Dump windows
409             outs = self.run_cmd('enlightenment_info -dump_topvwins',
410                                 timeout=20)
411             dirn = find_pattern('directory:\s(.*)',
412                                 outs,
413                                 groupindex=1).rstrip()
414
415             # Create tempdir and pull dump files
416             tmpdir = tempfile.mkdtemp()
417             self.pull_file(dirn, tmpdir, timeout=20)
418
419             # If dump does not exist then remove winid from list
420             winids = [winid for winid in winids
421                       if os.path.exists(os.path.join(tmpdir, winid[0]+'.png'))]
422
423             # Base image
424             bg = Image.new('RGB', (self._screen_width, self._screen_height))
425             # Merge images
426             for winid in reversed(winids):
427                 try:
428                     fg = Image.open(os.path.join(tmpdir, winid[0]+'.png'))
429                     bg.paste(fg, (int(winid[1]), int(winid[2])), fg)
430                 except FileNotFoundError:
431                     pass
432             # Save merged image
433             bg.save(filename)
434             # Remove tempdir
435             call(['rm', '-rf', tmpdir], timeout=10)
436
437     def _flush_uart_buffer(self):
438         """docstring for flush_uart_buffer"""
439         self._uart.flushInput()
440         self._uart.flushOutput()
441         self._uart.flush()
442
443     def _open_uart(self):
444         """docstring for open_uart"""
445         try:
446             self._uart = serial.Serial(port=self.kwargs['uart_port'],
447                                        baudrate=self._baudrate,
448                                        timeout=self._readtimeout)
449         except serial.SerialException as err:
450             logging.debug(err)
451             return None
452         return self._uart
453
454     def _close_uart(self):
455         """docstring for close_uart"""
456         if self._uart.isOpen():
457             self._uart.close()
458
459     def _thread_for_enter_download_mode(self, cmd, count):
460         """docstring for thread_for_enter_download_mode"""
461         for loop in range(count*20):
462             self._uart.write(self._enterkey)
463             time.sleep(0.05)
464         self._uart.write(cmd)
465         for loop in range(2):
466             time.sleep(0.1)
467             self._uart.write(self._enterkey)
468
469     def _enter_download_mode(self, cmd, powercut_delay=1, thread_param=10):
470         """docstring for _enter_download_mode"""
471         t = Thread(target=self._thread_for_enter_download_mode,
472                    args=(cmd, thread_param, ))
473         t.start()
474         self._cutter.off(delay=powercut_delay)
475         self._cutter.on(delay=powercut_delay)
476         t.join()
477
478     def _find_usb_busid(self):
479         """docstring for find_usb_busid"""
480         pattern = 'usb (.*):.*idVendor={0}, idProduct={1}'.format(self._vid,
481                                                                   self._pid)
482         kernlog = 'dmesg | grep usb | tail -n 20'
483         outs = check_output(kernlog, shell=True, timeout=10)
484         result = find_all_pattern(pattern=pattern, data=outs)
485         if result:
486             busid = result[-1]
487             logging.debug('usb busid : {}'.format(busid))
488         else:
489             raise Exception('Can\'t find usb busid')
490
491         return busid
492
493     def _lthor(self, filenames, busid):
494         """docstring for _lthor"""
495         cmd = 'lthor --busid={0}'.format(busid)
496         filenames = convert_single_item_to_list(filenames)
497         for l in filenames:
498             cmd += ' {}'.format(l)
499         logging.debug(cmd)
500         ret = call(cmd.split(), timeout=600)
501         if ret:
502             raise Exception('Thor error.')
503
504     def _find_usb_bus_and_device_address(self):
505         """docstring for _find_usb_bus_and_device_address"""
506         pattern = 'usb (.*):.*idVendor={0}, idProduct={1}'.format(self._vid,
507                                                                   self._pid)
508         kernlog = 'dmesg | grep usb | tail -n 20'
509         outs = check_output(kernlog, shell=True, timeout=10)
510         result = find_all_pattern(pattern=pattern, data=outs)
511         if result:
512             bid = result[-1]
513             busaddr_cmd = 'cat /sys/bus/usb/devices/{0}/busnum'.format(bid)
514             busaddr = check_output(busaddr_cmd, shell=True).rstrip().zfill(3)
515             logging.debug('usb_bus_addr : {}'.format(busaddr))
516             devaddr_cmd = 'cat /sys/bus/usb/devices/{0}/devnum'.format(bid)
517             devaddr = check_output(devaddr_cmd, shell=True).rstrip().zfill(3)
518             logging.debug('usb_dev_addr : {}'.format(devaddr))
519         else:
520             raise Exception('Can\'t find usb bus and dev addr')
521
522         return (busaddr, devaddr)
523
524     def _heimdall(self, filenames, busaddr, devaddr, partition_bin_mappings):
525         """docstring for _heimdall"""
526         filenames = convert_single_item_to_list(filenames)
527         tar_cmd = ['tar', 'xvfz']
528         for l in filenames:
529             tar_cmd.append(l)
530         logging.debug(tar_cmd)
531         call(tar_cmd, timeout=30)
532
533         heimdall_cmd = ['heimdall', 'flash', '--usbbus', busaddr,
534                         '--usbdevaddr', devaddr]
535         for key, elem in partition_bin_mappings.items():
536             heimdall_cmd.append('--{}'.format(key))
537             heimdall_cmd.append(elem)
538         logging.debug(heimdall_cmd)
539
540         ret = call(heimdall_cmd, timeout=600)
541         if ret:
542             raise Exception('Heimdall error.')
543
544     def _wait_uart_shell_login_prompt(self):
545         """docstring for _wait_uart_shell_login_prompt"""
546         logging.debug('===============Print boot logs===============')
547
548         start_time = time.perf_counter()
549         wait_time = 0
550         while wait_time < self._boot_timeout:
551             if self._uart.inWaiting:
552                 buf = self._read_uart(1000)
553                 if find_pattern(self._pattern_loginprompt, data=buf):
554                     logging.debug('Found login shell pattern from uart log')
555                     logging.debug('wait_time : {}'.format(wait_time))
556                     return
557                 elif len(buf) == 0:
558                     self._write_uart(b'')
559             time.sleep(0.01)
560             wait_time = time.perf_counter() - start_time
561         else:
562             raise Exception('Boot timeout : {}s'.format(wait_time))
563
564     def _login_uart_shell(self):
565         """docstring for _login_uart_shell"""
566         logging.debug('===============Login UART shell===============')
567         retrycnt = 0
568         while retrycnt < self._max_attempt_login_uart_shell:
569             if self._username:
570                 self._write_uart(self._username)
571                 time.sleep(0.5)
572             if self._password:
573                 self._write_uart(self._password)
574                 time.sleep(1.5)
575             self._flush_uart_buffer()
576             self._write_uart(b'dmesg -n 1')
577             time.sleep(0.5)
578             readdata = self._read_uart(2000)
579             if find_pattern(self._pattern_shellprompt, readdata):
580                 return
581             else:
582                 logging.debug('Login failed. retry.')
583                 self._write_uart(b'')
584                 time.sleep(2)
585             retrycnt += 1
586         else:
587             raise Exception('Can\'t login uart shell.')
588
589     def _set_sdb_deviceid(self):
590         """docstring for _set_sdb_deviceid"""
591         usb0_path = b'/sys/class/usb_mode/usb0'
592         pattern = '.*{0}'.format(self.get_id())
593
594         def set_serialnumber(deviceid):
595             """docstring for set_serialnumber"""
596             self._write_uart(b''.join([b'echo 0 > ', usb0_path, b'/enable']))
597             time.sleep(0.3)
598             self._write_uart(b''.join([b'echo ',
599                                        b'-n ',
600                                        deviceid,
601                                        b' > ', usb0_path,
602                                        b'/iSerial']))
603             time.sleep(0.3)
604             self._write_uart(b''.join([b'echo 1 > ', usb0_path, b'/enable']))
605             time.sleep(0.3)
606             self._write_uart(b''.join([b'cat ', usb0_path, b'/enable']))
607             time.sleep(0.3)
608
609         def get_serialnumber():
610             """docstring for get_serialnumber"""
611             self._write_uart(b''.join([b'cat ', usb0_path, b'/iSerial']))
612             time.sleep(0.3)
613             return self._read_uart(1000)
614
615         retrycnt = 0
616         while retrycnt < 10:
617             set_serialnumber(deviceid=self.get_id().encode())
618             serialnumber = get_serialnumber()
619             if find_pattern(pattern, serialnumber):
620                 return
621             retrycnt += 1
622         else:
623             raise Exception('Can\'t configure sdb deviceid')
624
625     def _attach_sdb(self):
626         """docstring for _attach_sdb"""
627         # start sdb server if it is not started.
628         call('sdb start-server'.split(), timeout=10)
629
630         retry_attempt = 0
631         pattern = r'{}.*device.*\t.*'.format(self.get_id())
632
633         while retry_attempt < self._max_attempt_attach_sdb:
634             for l in range(self._retrycnt_at_a_time_sdb):
635                 outs = check_output('sdb devices'.split(), timeout=10)
636                 logging.debug(outs)
637                 if find_pattern(pattern, outs):
638                     logging.debug('found {}.'.format(self.get_id()))
639                     return
640                 time.sleep(0.2)
641             retry_attempt += 1
642         else:
643             raise Exception('Can\'t find device.')
644
645     def _detach_sdb(self):
646         """docstring for _detach_sdb"""
647         pass
648
649     def sdb_root_on(self):
650         """docstring for sdb_root_on"""
651         logging.debug('=================sdb root on for {}================='
652                       .format(self.get_name()))
653         call('sdb -s {} root on'.format(self.get_id()).split(), timeout=10)
654         time.sleep(0.5)
655
656     def _acquire_global_lock(self):
657         """docstring for _acquire_global_lock"""
658         logging.debug('Try to acquire global lock...')
659         self._global_tlock.acquire()
660         self._global_ilock.acquire()
661         # set gid of ilock file
662         try:
663             os.chmod(self._global_ilock.path, 0o664)
664         except PermissionError:
665             logging.debug('Can\'t change lock file permission')
666
667         if self._global_tlock.locked() and self._global_ilock.acquired:
668             logging.debug('global lock acquired for {}'
669                           .format(self.get_id()))
670
671     def _release_global_lock(self):
672         """docstring for _release_global_lock"""
673         if self._global_tlock.locked():
674             self._global_tlock.release()
675         if self._global_ilock.acquired:
676             self._global_ilock.release()
677         logging.debug('global lock released')