summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDonghoon Shin <dhs.shin@samsung.com>2016-09-21 15:14:25 +0900
committerDonghoon Shin <dhs.shin@samsung.com>2016-09-21 15:14:25 +0900
commitbb81eac85eb522da1cc4b53da7691ab2c7427344 (patch)
tree95e6cdebc4b6a50b545139a1f5c5384d49d20a35
parentf6f442480ef1a93e85bb86c28897c3b5fa76fea4 (diff)
downloadlitmus-bb81eac85eb522da1cc4b53da7691ab2c7427344.tar.gz
litmus-bb81eac85eb522da1cc4b53da7691ab2c7427344.tar.bz2
litmus-bb81eac85eb522da1cc4b53da7691ab2c7427344.zip
Support multiple mock type devices in topology
Change-Id: I17fa88fe02aa9355aaed00a9460da0b2b1a569ae
-rw-r--r--CHANGES.txt4
-rw-r--r--README.md2
-rw-r--r--debian/changelog4
-rw-r--r--litmus/core/manager.py71
-rw-r--r--litmus/device/device.py104
-rw-r--r--litmus/device/devicemock.py83
6 files changed, 173 insertions, 95 deletions
diff --git a/CHANGES.txt b/CHANGES.txt
index d25eebd..7b1efa5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -25,7 +25,9 @@ Version 0.2.1 9 Sep 2016
- Update mock device type to avoid sdb error
- Update default templates for u3 and xu3
-Version 0.3.0 19 Sep 2016
+Version 0.3.0 21 Sep 2016
---------------------------
- Update projects/topology file location
- Add projects/topology param at command prompt
+- Support multiple mock devices in topology
+- Add global lock for mock device type
diff --git a/README.md b/README.md
index a58d58b..1abbb1a 100644
--- a/README.md
+++ b/README.md
@@ -6,11 +6,13 @@ Buliding & installing
1. Change directory name with version postfix and create an orig.tar.gz
$ mv litmus litmus-0.3.0
+
$ tar cvfz litmus-0.3.0.orig.tar.gz litmus-0.3.0
1. Build a deb package with debuild
$ cd litmus-0.3.0
+
$ debuild
2. Install the deb package using dpkg
diff --git a/debian/changelog b/debian/changelog
index c7a43e7..1cdce25 100644
--- a/debian/changelog
+++ b/debian/changelog
@@ -2,8 +2,10 @@ litmus (0.3.0-1) unstable; urgency=low
* Update projects/topology file location
* Add projects/topology param at command prompt
+ * Support multiple mock devices in topology
+ * Add global lock for mock device type
- -- Donghoon Shin <dhs.shin@samsung.com> Mon, 19 Sep 2016 12:39:00 +0900
+ -- Donghoon Shin <dhs.shin@samsung.com> Wed, 21 Sep 2016 15:08:00 +0900
litmus (0.2.1-1) unstable; urgency=low
diff --git a/litmus/core/manager.py b/litmus/core/manager.py
index 332b8bc..dc9b10b 100644
--- a/litmus/core/manager.py
+++ b/litmus/core/manager.py
@@ -133,6 +133,53 @@ Lightweight test manager for tizen automated testing
time.sleep(retry_delay)
raise Exception('{} device is not available.'.format(devicetype))
+ def acquire_dut_by_name(self, devicename,
+ max_retry_times=10, retry_delay=10):
+ """
+ Acquire an available device for testing.
+
+ :param str devicename: device name
+ :param int max_retry_times: max retry times for device acquisition
+ :param float retry_delay: delay time for each device acquisition retry
+
+ Example:
+ >>> mgr = manager()
+ >>> dut = mgr.acquire_dut_by_name('XU3_001')
+
+ :returns device: acquired device instance
+ """
+ logging.debug('===============Acquire an available DUT===============')
+
+ candidate = [dev for dev in self._all_devices
+ if dev['devicename'] == devicename]
+
+ if candidate:
+ for times in range(0, max_retry_times):
+ for dev in candidate:
+ if not dev['ilock'].acquired:
+ gotten_tlock = dev['tlock'].acquire(blocking=False)
+ gotten_ilock = dev['ilock'].acquire(blocking=False)
+ try:
+ os.chmod(dev['ilock'].path, 0o664)
+ except PermissionError:
+ logging.debug('Can\'t change lock file permission')
+
+ # if acquire tlock and ilock, assign a device.
+ if gotten_tlock and gotten_ilock:
+ dut = device.create(manager=self, **dev)
+ self._duts.append(dut)
+ logging.debug('{} is assigned.'
+ .format(dut.get_name()))
+ return dut
+ # if acquire tlock only then release it for next time.
+ elif gotten_tlock and not gotten_ilock:
+ dev['tlock'].release()
+ else:
+ logging.debug('{} is busy. Wait {} seconds.'
+ .format(devicename, retry_delay))
+ time.sleep(retry_delay)
+ raise Exception('{} is not available.'.format(devicename))
+
def release_dut(self, dut=None):
"""
Release acquired devices under test.
@@ -147,7 +194,6 @@ Lightweight test manager for tizen automated testing
>>> mgr.release_dut()
"""
- # TODO: self._duts.remove(dev) doesn't delete device instance.
# release all _duts if dut param is None
if not dut:
for dev in self._duts:
@@ -243,22 +289,25 @@ Lightweight test manager for tizen automated testing
for section in configparser.sections():
items = dict(configparser.items(section))
- items['deviceid'] = section
+ items['devicename'] = section
# Interproces Lock and Thread Lock
ilock_filename = os.path.join(self._path_for_locks,
- items['deviceid'])
+ items['devicename'])
items['tlock'] = Lock()
items['ilock'] = fasteners.InterProcessLock(ilock_filename)
# Append items
self._all_devices.append(items)
- # Add mock device
- mock_deviceid = 'MOCK_001'
- mock_ilock_filename = os.path.join(self._path_for_locks, mock_deviceid)
- mock = {'deviceid': mock_deviceid,
- 'dev_type': 'mock',
- 'tlock': Lock(),
- 'ilock': fasteners.InterProcessLock(mock_ilock_filename)}
- self._all_devices.append(mock)
+ if not next((d for d in self._all_devices if d['dev_type'] == 'mock'),
+ None):
+ # Add mock device
+ mock_devicename = 'MOCK_001'
+ mock_ilock_filename = os.path.join(self._path_for_locks,
+ mock_devicename)
+ mock = {'devicename': mock_devicename,
+ 'dev_type': 'mock',
+ 'tlock': Lock(),
+ 'ilock': fasteners.InterProcessLock(mock_ilock_filename)}
+ self._all_devices.append(mock)
diff --git a/litmus/device/device.py b/litmus/device/device.py
index 9d9123a..d804256 100644
--- a/litmus/device/device.py
+++ b/litmus/device/device.py
@@ -67,7 +67,7 @@ class device(object):
super(device, self).__init__()
self.args = args
self.kwargs = kwargs
- self._name = kwargs['deviceid']
+ self._name = kwargs['devicename']
# init a cutter instance.
self._cutter = cutter.create(*args, **kwargs)
@@ -183,24 +183,13 @@ class device(object):
else:
return False
- def _thor(self, filenames, busid):
- """docstring for thor_downloader"""
- cmd = 'lthor --busid={0}'.format(busid)
- filenames = convert_single_item_to_list(filenames)
- for l in filenames:
- cmd += ' {}'.format(l)
- logging.debug(cmd)
- ret = call(cmd.split(), timeout=600)
- if ret:
- raise Exception('Thor error.')
-
- def flash(self, filenames, flasher=_thor, waiting=5):
+ def flash(self, filenames, flasher='lthor', waiting=5):
"""
Flash binaries to device.
This function turn on device and turn off device automatically.
:param dict filenames: filename string or dict
- :param func flasher: wrapper function of external flashing tool
+ :param sting flasher: external flashing tool name
:param float waiting: waiting time to acquire cdc_acm device
Example:
@@ -220,7 +209,7 @@ class device(object):
time.sleep(waiting)
busid = self._find_usb_busid()
self._release_global_lock()
- flasher(self, filenames=filenames, busid=busid)
+ self._lthor(filenames=filenames, busid=busid)
self._cutter.off()
except (Exception, KeyboardInterrupt) as e:
self._release_global_lock()
@@ -236,14 +225,14 @@ class device(object):
Example:
>>> dut.on()
- >>> dut.run_cmd(['ls','-alF','/','|','grep','usr'])
+ >>> dut.run_cmd('ls -alF / | grep usr')
\'drwxr-xr-x 15 root root 4096 Apr 29 2016 usr/\\r\\n\'
:returns str: stdout of sdb shell command
"""
c = ['sdb', '-s', self.get_id(), 'shell']
- c.extend(command)
+ c.extend(convert_single_item_to_list(command))
logging.debug(c)
result = check_output(c, timeout=timeout)
return result
@@ -370,21 +359,6 @@ class device(object):
if self._uart.isOpen():
self._uart.close()
- def _find_usb_busid(self):
- """docstring for find_usb_busid"""
- pattern = 'usb (.*):.*idVendor={0}, idProduct={1}'.format(self._vid,
- self._pid)
- kernlog = 'cat /var/log/kern.log | grep usb | tail -n 20'
- outs = check_output(kernlog, shell=True, timeout=10)
- result = find_all_pattern(pattern=pattern, data=outs)
- if result:
- busid = result[-1]
- logging.debug('usb busid : {}'.format(busid))
- else:
- raise Exception('Can\'t find usb busid')
-
- return busid
-
def _thread_for_enter_download_mode(self, cmd, count):
"""docstring for thread_for_enter_download_mode"""
for loop in range(count*20):
@@ -404,6 +378,72 @@ class device(object):
self._cutter.on(delay=powercut_delay)
t.join()
+ def _find_usb_busid(self):
+ """docstring for find_usb_busid"""
+ pattern = 'usb (.*):.*idVendor={0}, idProduct={1}'.format(self._vid,
+ self._pid)
+ kernlog = 'cat /var/log/kern.log | grep usb | tail -n 20'
+ outs = check_output(kernlog, shell=True, timeout=10)
+ result = find_all_pattern(pattern=pattern, data=outs)
+ if result:
+ busid = result[-1]
+ logging.debug('usb busid : {}'.format(busid))
+ else:
+ raise Exception('Can\'t find usb busid')
+
+ return busid
+
+ def _lthor(self, filenames, busid):
+ """docstring for _lthor"""
+ cmd = 'lthor --busid={0}'.format(busid)
+ filenames = convert_single_item_to_list(filenames)
+ for l in filenames:
+ cmd += ' {}'.format(l)
+ logging.debug(cmd)
+ ret = call(cmd.split(), timeout=600)
+ if ret:
+ raise Exception('Thor error.')
+
+ def _find_usb_bus_and_device_address(self):
+ """docstring for _find_usb_bus_and_device_address"""
+ pattern = 'usb (.*):.*idVendor={0}, idProduct={1}'.format(self._vid,
+ self._pid)
+ kernlog = 'cat /var/log/kern.log | grep usb | tail -n 20'
+ outs = check_output(kernlog, shell=True, timeout=10)
+ result = find_all_pattern(pattern=pattern, data=outs)
+ if result:
+ bid = result[-1]
+ busaddr_cmd = 'cat /sys/bus/usb/devices/{0}/busnum'.format(bid)
+ busaddr = check_output(busaddr_cmd, shell=True).rstrip().zfill(3)
+ logging.debug('usb_bus_addr : {}'.format(busaddr))
+ devaddr_cmd = 'cat /sys/bus/usb/devices/{0}/devnum'.format(bid)
+ devaddr = check_output(devaddr_cmd, shell=True).rstrip().zfill(3)
+ logging.debug('usb_dev_addr : {}'.format(devaddr))
+ else:
+ raise Exception('Can\'t find usb bus and dev addr')
+
+ return (busaddr, devaddr)
+
+ def _heimdall(self, filenames, busaddr, devaddr, partition_bin_mappings):
+ """docstring for _heimdall"""
+ filenames = convert_single_item_to_list(filenames)
+ tar_cmd = ['tar', 'xvfz']
+ for l in filenames:
+ tar_cmd.append(l)
+ logging.debug(tar_cmd)
+ call(tar_cmd, timeout=30)
+
+ heimdall_cmd = ['heimdall', 'flash', '--usbbus', busaddr,
+ '--usbdevaddr', devaddr]
+ for key, elem in partition_bin_mappings.items():
+ heimdall_cmd.append('--{}'.format(key))
+ heimdall_cmd.append(elem)
+ logging.debug(heimdall_cmd)
+
+ ret = call(heimdall_cmd, timeout=600)
+ if ret:
+ raise Exception('Heimdall error.')
+
def _wait_uart_shell_login_prompt(self):
"""docstring for _wait_uart_shell_login_prompt"""
logging.debug('===============Print boot logs===============')
diff --git a/litmus/device/devicemock.py b/litmus/device/devicemock.py
index f927c68..34f8f17 100644
--- a/litmus/device/devicemock.py
+++ b/litmus/device/devicemock.py
@@ -17,7 +17,6 @@ import time
import logging
from litmus.device.device import device
from litmus.core.util import check_output, find_pattern
-from litmus.core.util import convert_single_item_to_list
from litmus.core.util import call
@@ -27,13 +26,17 @@ class devicemock(device):
User can control device in topology by this class methods.
"""
+ _booting_time = 60
+
def __init__(self, *args, **kwargs):
self.args = args
self.kwargs = kwargs
- self._name = kwargs['deviceid']
- self._id = self._find_device_id()
+ self._name = kwargs['devicename']
+ if 'serialno' in kwargs:
+ self._id = kwargs['serialno']
+ else:
+ self._id = self._find_device_id()
- # init a cutter instance.
self._manager = kwargs['manager']
def _release(self):
@@ -42,7 +45,7 @@ class devicemock(device):
def _find_device_id(self):
"""docstring for _find_device_id"""
- self.refresh_sdb_server()
+ self.start_sdb_server()
outs = check_output(['sdb', 'devices'], timeout=10)
pattern = '.*List of devices attached \n([a-zA-Z0-9]*).*device.*'
found = find_pattern(pattern, outs, groupindex=1)
@@ -50,6 +53,7 @@ class devicemock(device):
return found
# public methods.
+
def get_id(self):
"""
Return the id of acquired device.
@@ -72,12 +76,12 @@ class devicemock(device):
"""
logging.debug('turn on device {}'.format(self.get_name()))
- self.refresh_sdb_server()
- if self._find_device_id() == self.get_id():
+ self.start_sdb_server()
+ if self.is_on():
self._sdb_root_on()
- self.run_cmd(['reboot', '-f'])
- time.sleep(60)
- self.refresh_sdb_server()
+ self.run_cmd('reboot -f', timeout=20)
+ time.sleep(self._booting_time)
+ self.start_sdb_server()
self._sdb_root_on()
def off(self, powercut_delay=2):
@@ -88,40 +92,6 @@ class devicemock(device):
"""
logging.debug('turn off device {}'.format(self.get_name()))
- def thor(self, filenames):
- """docstring for thor"""
- cmd = 'lthor'
- filenames = convert_single_item_to_list(filenames)
- for l in filenames:
- cmd += ' {}'.format(l)
- logging.debug(cmd)
- ret = call(cmd.split(), timeout=600)
- if ret:
- raise Exception('Thor error.')
-
- def heimdall(self, filenames,
- partition_bin_mappings={'BOOT': 'zImage',
- 'ROOTFS': 'rootfs.img',
- 'USER': 'user.img',
- 'SYSTEM-DATA': 'system-data.img'}):
- """docstring for heimdall"""
- filenames = convert_single_item_to_list(filenames)
- tar_cmd = ['tar', 'xvfz']
- for l in filenames:
- tar_cmd.append(l)
- logging.debug(tar_cmd)
- call(tar_cmd, timeout=30)
-
- heimdall_cmd = ['heimdall', 'flash']
- for key, elem in partition_bin_mappings.items():
- heimdall_cmd.append('--{}'.format(key))
- heimdall_cmd.append(elem)
- logging.debug(heimdall_cmd)
-
- ret = call(heimdall_cmd, timeout=600)
- if ret:
- raise Exception('Heimdall error.')
-
def flash(self, filenames, flasher='lthor', waiting=5,
partition_bin_mappings={'BOOT': 'zImage',
'ROOTFS': 'rootfs.img',
@@ -132,7 +102,7 @@ class devicemock(device):
This function turn on device and turn off device automatically.
:param dict filenames: filename string or dict
- :param func flasher: wrapper function of external flashing tool
+ :param string flasher: external flashing tool name
:param float waiting: waiting time to acquire cdc_acm device
:param dict partition_bin_mappings: partition table for device which use heimdall flasher
@@ -144,20 +114,28 @@ class devicemock(device):
"""
logging.debug('flash binaries to device : {}'.format(filenames))
- self.refresh_sdb_server()
+ self.start_sdb_server()
if not filenames:
raise Exception('There\'s no file to flash.')
try:
self._sdb_root_on()
- self.run_cmd(['reboot', '-f', 'download'], timeout=10)
- time.sleep(5)
+ self._acquire_global_lock()
+ self.run_cmd('reboot -f download', timeout=20)
+ time.sleep(waiting)
if flasher == 'lthor':
- self.thor(filenames=filenames)
+ busid = self._find_usb_busid()
+ self._release_global_lock()
+ self._lthor(filenames=filenames, busid=busid)
elif flasher == 'heimdall':
- self.heimdall(filenames=filenames,
+ (busaddr, devaddr) = self._find_usb_bus_and_device_address()
+ self._release_global_lock()
+ self._heimdall(filenames=filenames,
+ busaddr=busaddr,
+ devaddr=devaddr,
partition_bin_mappings=partition_bin_mappings)
except (Exception, KeyboardInterrupt) as e:
+ self._release_global_lock()
logging.debug(e)
raise Exception('Can\'t flash files : {}.'.format(filenames))
@@ -165,3 +143,8 @@ class devicemock(device):
"""docstring for refresh_sdb_server"""
call('sdb kill-server; sdb start-server', shell=True, timeout=10)
time.sleep(1)
+
+ def start_sdb_server(self):
+ """docstring for start_sdb_server"""
+ call('sdb start-server', shell=True, timeout=10)
+ time.sleep(1)