diff options
author | Zhang zhengguang <zhengguang.zhang@intel.com> | 2014-07-17 10:37:39 +0800 |
---|---|---|
committer | Zhang zhengguang <zhengguang.zhang@intel.com> | 2014-07-17 10:37:39 +0800 |
commit | 1b9d0a62f59bb48c8deb2f0b98d9acdffdd9abe7 (patch) | |
tree | 6e991827d28537f7f40f20786c2354fd04a9fdad /test | |
parent | fbe905ab58ecc31fe64c410c5f580cadc30e7f04 (diff) | |
download | connman-1b9d0a62f59bb48c8deb2f0b98d9acdffdd9abe7.tar.gz connman-1b9d0a62f59bb48c8deb2f0b98d9acdffdd9abe7.tar.bz2 connman-1b9d0a62f59bb48c8deb2f0b98d9acdffdd9abe7.zip |
Imported Upstream version 1.24upstream/1.24
Diffstat (limited to 'test')
-rwxr-xr-x | test/p2p-on-supplicant | 636 | ||||
-rwxr-xr-x | test/test-supplicant | 71 |
2 files changed, 636 insertions, 71 deletions
diff --git a/test/p2p-on-supplicant b/test/p2p-on-supplicant new file mode 100755 index 00000000..826656ee --- /dev/null +++ b/test/p2p-on-supplicant @@ -0,0 +1,636 @@ +#!/usr/bin/python + +from os import O_NONBLOCK +from sys import stdin, stdout, exit, version_info, argv +from fcntl import fcntl, F_GETFL, F_SETFL +import glib +import dbus +import dbus.mainloop.glib +import gobject +import argparse + +WPA_NAME='fi.w1.wpa_supplicant1' +WPA_INTF='fi.w1.wpa_supplicant1' +WPA_PATH='/fi/w1/wpa_supplicant1' +WPA_IF_INTF = WPA_INTF + '.Interface' +WPA_P2P_INTF = WPA_IF_INTF + '.P2PDevice' +WPA_GROUP_INTF = WPA_INTF + '.Group' +WPA_PEER_INTF = WPA_INTF + '.Peer' +DBUS_PROPERTIES_INTF = 'org.freedesktop.DBus.Properties' + +P2P_GROUP_CAPAB_GROUP_OWNER = 1 << 0 + +class ArgFields: + for field in ('help', 'metavar'): + exec('{}="{}"'.format(field, field)) + +class InputLine: + def __init__(self, handler): + self.line = '' + self.handler = handler + + flags = fcntl(stdin.fileno(), F_GETFL) + flags |= O_NONBLOCK + fcntl(stdin.fileno(), F_SETFL, flags) + glib.io_add_watch(stdin, glib.IO_IN, self.input_cb) + + self.prompt() + + def prompt(self): + self.line = '' + print '> ', + stdout.flush() + + def input_cb(self, fd, event): + if event != glib.IO_IN: + return + + self.line += fd.read(); + for line in self.line.split('\n'): + line = line.strip() + if len(line) == 0: + break + + self.handler(line.strip()) + + self.prompt() + + return True + +def error_print(ex): + print 'Command Error: %s' % ex + +def checkarg(nb_args = 0, min_args = False): + def under(function): + def wrapper(*args, **kwargs): + resuls = True + + if min_args: + result = len(args[1]) < nb_args + else: + result = len(args[1]) != nb_args + + if result: + raise Exception('Command %s takes %s arguments' % + (function.__name__, nb_args)) + return function(*args, **kwargs) + return wrapper + return under + +def print_dict(d): + for k in d: + try: + if type(d[k]) is dbus.Byte: + print 'Key %s --> 0x%x' % (k, d[k]) + else: + print 'Key %s --> %s' % (k, d[k]) + except: + print "Error: Key %s content cannot be printed" % k + pass + +def print_tuple(t): + for e in t: + if type(e) is dbus.Dictionary: + print_dict(e) + else: + print 'Element: %s' % e + +class Wpa_s: + def __init__(self, bus, iface_name, command): + self.wpa = dbus.Interface(bus.get_object(WPA_NAME, WPA_PATH), WPA_INTF) + bus.add_signal_receiver(self.__wpa_property_changed, path=WPA_PATH, + member_keyword='signal') + bus.add_signal_receiver(self.__InterfaceAdded, path=WPA_PATH, + signal_name='InterfaceAdded') + bus.add_signal_receiver(self.__InterfaceRemoved, path=WPA_PATH, + signal_name='InterfaceRemoved') + self.__reset() + + self.bus = bus + + self.debug = False + + self.line_in = InputLine(self.__command) + + if iface_name: + try: + self.create_if([iface_name]) + except: + print "Error creating interface: %s" % iface_name + + if len(command.strip(' ')): + self.__command(command) + + def help(self, args): + list = self.command_list.keys() + list.sort() + for key in list: + help = '' + if (self.command_list[key].has_key(ArgFields.help)): + help = self.command_list[key][ArgFields.help] + + print "%s\t%s" % (key.rjust(25), help.ljust(50)) + + def __command(self, cmd_line): + cmd = cmd_line.split(' ') + + try: + func = getattr(self, cmd[0]) + except Exception, e: + print 'Error: command unknown - %s' % e + return + + try: + func(cmd[1:]) + except Exception, e: + error_print(e) + + def __wpa_property_changed(*args, **kwargs): + print 'WPA - Signal: %s' % kwargs.get('signal') + + def __if_property_changed(*args, **kwargs): + signal = kwargs.get('signal') + print 'IF - Signal: %s' % signal + + if signal == 'BSSAdded': + return + + if args[0].debug: + print_tuple(args[1:]) + + def __p2p_property_changed(*args, **kwargs): + print 'IF P2P - Signal: %s' % kwargs.get('signal') + if args[0].debug: + print_tuple(args[1:]) + + """ + It should be: __DeviceFound(self, object_path, properties) + wpa_supplicant's DBus API is buggy here: + - no properties are given + """ + def __DeviceFound(self, object_path): + self.peers[object_path] = None + + peer = self.bus.get_object(WPA_INTF, object_path) + peer_if = dbus.Interface(peer, DBUS_PROPERTIES_INTF) + + self.peers[object_path] = peer_if.GetAll(WPA_PEER_INTF) + + def __DeviceLost(self, object_path): + if object_path in self.peers: + del self.peers[object_path] + + def __PeerJoined(self, object_path): + print 'Peer %s joined' % object_path + + def __PeerDisconnected(self, object_path): + print 'Peer %s disconnected' % object_path + + def __group_if_property_changed(*args, **kwargs): + print 'Group - ', + args[0].__if_property_changed(*args, **kwargs) + + def __group_if_p2p_property_changed(*args, **kwargs): + print 'Group - ', + args[0].__p2p_property_changed(*args, **kwargs) + + def __GroupFinished(self, ifname, role): + print 'Group running on %s is being removed' % ifname + self.group_obj = self.group_if = self.group_iface_path = None + + def __InvitationResult(self, response): + print 'Invitation result status: %d ' % response['status'] + + if response.has_key('bssid'): + print 'bssid: %s' % response['bssid'] + + if self.debug: + print_dict(response) + + def __GroupStarted(self, properties): + self.group_obj = properties['group_object'] + self.bus.add_signal_receiver(self.__PeerJoined, + dbus_interface=WPA_GROUP_INTF, + path=self.group_obj, + signal_name='PeerJoined') + self.bus.add_signal_receiver(self.__PeerDisconnected, + dbus_interface=WPA_GROUP_INTF, + path=self.group_obj, + signal_name='PeerDisconnected') + + self.group_iface_path = properties['interface_object'] + self.group_if = dbus.Interface(self.bus.get_object(WPA_INTF, + self.group_iface_path), + WPA_P2P_INTF) + self.bus.add_signal_receiver(self.__group_if_property_changed, + dbus_interface=WPA_IF_INTF, + path=self.group_iface_path, + member_keyword='signal') + self.bus.add_signal_receiver(self.__group_if_p2p_property_changed, + dbus_interface=WPA_P2P_INTF, + path=self.group_iface_path, + member_keyword='signal') + self.bus.add_signal_receiver(self.__GroupFinished, + dbus_interface=WPA_P2P_INTF, + path=self.group_iface_path, + member_keyword='signal') + self.bus.add_signal_receiver(self.__InvitationResult, + dbus_interface=WPA_P2P_INTF, + path=self.iface_path, + signal_name='InvitationResult') + + if self.debug: + group = dbus.Interface(self.bus.get_object(WPA_INTF, + self.group_obj), + DBUS_PROPERTIES_INTF) + print_dict(group.GetAll(WPA_GROUP_INTF)) + + def __ServiceDiscoveryResponse(self, response): + peer = response['peer_object'] + if peer in self.peers: + print 'Peer %s has this TLVs:' % (self.peers[peer]['DeviceName']) + print response['tlvs'] + + def __InterfaceAdded(self, path, properties): + print 'Interface %s Added (%s)' % (properties['Ifname'], path) + if self.debug: + print_dict(properties) + p2p = dbus.Interface(self.bus.get_object(WPA_INTF, + path), DBUS_PROPERTIES_INTF) + print_dict(p2p.GetAll(WPA_P2P_INTF)) + + def __InterfaceRemoved(self, path): + print 'Interface Removed (%s)' % (path) + + def __listen_if_signals(self): + self.bus.add_signal_receiver(self.__if_property_changed, + dbus_interface=WPA_IF_INTF, + path=self.iface_path, + member_keyword='signal') + self.bus.add_signal_receiver(self.__p2p_property_changed, + dbus_interface=WPA_P2P_INTF, + path=self.iface_path, + member_keyword='signal') + self.bus.add_signal_receiver(self.__GroupStarted, + dbus_interface=WPA_P2P_INTF, + path=self.iface_path, + signal_name='GroupStarted') + self.bus.add_signal_receiver(self.__DeviceFound, + dbus_interface=WPA_P2P_INTF, + path=self.iface_path, + signal_name='DeviceFound') + self.bus.add_signal_receiver(self.__DeviceLost, + dbus_interface=WPA_P2P_INTF, + path=self.iface_path, + signal_name='DeviceLost') + self.bus.add_signal_receiver(self.__ServiceDiscoveryResponse, + dbus_interface=WPA_P2P_INTF, + path=self.iface_path, + signal_name='ServiceDiscoveryResponse') + + def __reset(self): + self.iface_path = self.iface_name = self.iface = None + self.p2p = self.group_if = self.group_obj = None + self.peers = {} + + def __set_if(self, iface_name): + self.iface = dbus.Interface(self.bus.get_object(WPA_INTF, + self.iface_path), WPA_IF_INTF) + self.p2p = dbus.Interface(self.bus.get_object(WPA_INTF, + self.iface_path), WPA_P2P_INTF) + + p2p_if = dbus.Interface(self.p2p, DBUS_PROPERTIES_INTF) + p2p_if.Set(WPA_P2P_INTF, 'P2PDeviceConfig', + dbus.Dictionary({ 'DeviceName' : 'ConnManP2P' }, + signature='sv')) + print 'Interface %s: %s' % (iface_name, self.iface_path) + self.iface_name = iface_name + self.__listen_if_signals() + + @checkarg() + def enable_debug(self, args): + self.debug = True + + @checkarg() + def disable_debug(self, args): + self.debug = False + + @checkarg(nb_args=1) + def create_if(self, args): + self.__reset() + self.iface_path = self.wpa.CreateInterface(({ 'Ifname' : args[0]} )) + self.__set_if(args[0]) + + @checkarg(nb_args=1) + def get_if(self, args): + self.__reset() + self.iface_path = self.wpa.GetInterface(args[0]) + self.__set_if(args[0]) + + @checkarg() + def del_if(self, args = None): + if not self.iface_path: + return + + self.wpa.RemoveInterface(self.iface_path) + print 'Interface %s removed' % self.iface_name + self.__reset() + + @checkarg() + def scan(self, args = None): + if not self.iface: + return + + self.iface.Scan(({ 'Type': 'passive' })) + print 'Scan started' + + @checkarg() + def quit(self, args = None): + self.del_if(args) + exit(0) + + @checkarg(nb_args=1) + def set_command_list(self, command_list): + self.command_list = command_list[0] + + @checkarg() + def p2p_find(self, args = None): + if not self.p2p: + return + + self.p2p.Find(({})) + + @checkarg() + def p2p_stop_find(self, args = None): + if not self.p2p: + return + + self.p2p.StopFind() + + @checkarg() + def p2p_peers(self, args = None): + if not self.iface: + return + + for p in self.peers: + print 'Peer Name=%s' % (self.peers[p]['DeviceName']) + + def __find_peer(self, peer_name, ret_object_path = False): + if len(self.peers) == 0: + return None + + peer = None + for p in self.peers: + if self.peers[p]['DeviceName'] == peer_name: + peer = self.peers[p] + break + + if not peer: + print 'No peer found under the name: %s' % peer_name + p = None + + if ret_object_path: + return p + else: + return peer + + @checkarg(nb_args = 1) + def p2p_peer(self, args): + peer = self.__find_peer(args[0]) + if peer: + print_dict(peer) + + @checkarg(nb_args = 1) + def p2p_connect(self, args): + if not self.p2p: + return + + peer = self.__find_peer(args[0]) + if not peer: + return + + peer_path = self.__find_peer(args[0], True) + + if (peer['groupcapability'] & P2P_GROUP_CAPAB_GROUP_OWNER == + P2P_GROUP_CAPAB_GROUP_OWNER): + print 'Joining an existing P2P group' + pin = self.p2p.Connect(({ 'peer' : peer_path, + 'wps_method' : 'pbc', + 'join' : True, + 'go_intent' : 0 })) + else: + print 'Associating with another P2P device' + pin = self.p2p.Connect(({ 'peer' : peer_path, + 'wps_method' : 'pbc', + 'join' : False, + 'go_intent' : 7 })) + if not pin: + print 'WPS PIN in use: %s' % pin + + @checkarg(nb_args = 1) + def p2p_disconnect(self, args): + if not self.p2p: + return + + peer = self.__find_peer(args[0]) + if not peer: + return + + if not self.group_if: + print 'Peer %s is not connected' % (peer['DeviceName']) + return + + self.group_if.Disconnect() + + @checkarg() + def p2p_group_add(self, args): + if not self.p2p: + return + + self.p2p.GroupAdd(({ 'persistent' : dbus.Boolean(1) })) + + @checkarg() + def p2p_group_remove(self, args): + if not self.group_if: + return + + self.group_if.Disconnect() + + @checkarg() + def p2p_group(self, args): + if not self.group_obj: + return + + group = dbus.Interface(self.bus.get_object(WPA_INTF, + self.group_obj), DBUS_PROPERTIES_INTF) + print_dict(group.GetAll(WPA_GROUP_INTF)) + + @checkarg() + def p2p_flush(self, args): + if not self.p2p: + return + + self.p2p.Flush() + + @checkarg() + def p2p_serv_disc_req(self, args = None): + if not self.p2p: + return + + """ We request all kind of services """ + sd_req = dbus.Array(signature='y', variant_level=1) + for a in [2,0,0,1]: + sd_req.append(dbus.Byte(a)) + + ref = self.p2p.ServiceDiscoveryRequest(({ 'tlv' : sd_req })) + print 'Service discovery reference: %s' % ref + + @checkarg(nb_args = 1) + def p2p_serv_disc_cancel_req(self, args): + if not self.p2p: + return + + self.p2p.ServiceDiscoveryCancelRequest(int(args[0])) + + @checkarg(nb_args = 3) + def p2p_service_add(self, args): + if not self.p2p: + return + + service = { 'service_type' : args[0] } + if args[0] == 'upnp': + service['version'] = args[1] + service['service'] = args[2] + elif args[0] == 'bonjour': + service['query'] = args[1] + service['response'] = args[2] + else: + print 'Unknown service: %s' % args[0] + return + + self.p2p.AddService((service)) + + @checkarg(nb_args = 2, min_args = True) + def p2p_service_del(self, args): + if not self.p2p: + return + + service = { 'service_type' : args[0] } + if args[0] == 'upnp': + service['version'] = args[1] + service['service'] = args[2] + elif args[0] == 'bonjour': + service['query'] = args[1] + else: + print 'Unknown service: %s' % args[0] + return + + self.p2p.DeleteService((service)) + + @checkarg() + def p2p_service_flush(self, args = None): + if not self.p2p: + return + + self.p2p.FlushService() + + @checkarg(nb_args = 1) + def p2p_invite(self, args): + if not self.p2p or not self.group_if: + return + + peer_path = self.__find_peer(args[0], True) + + if not peer_path: + return + + self.group_if.Invite({ 'peer' : peer_path}) + +def build_args(parser): + parser.add_argument('-d', default=False, action='store_true', + dest='debug', help='enable debug') + parser.add_argument('-i', metavar='<interface>', dest='ifname', + help='interface name') + + command = {} + command['quit'] = {} + command['enable_debug'] = {} + command['disable_debug'] = {} + command['create_if'] = {ArgFields.help:'<iface_name> - create interface'} + command['get_if'] = {ArgFields.help:'<iface_name> - get interface'} + command['del_if'] = {ArgFields.help:'removes current interface'} + command['scan'] = {} + command['p2p_find'] = {} + command['p2p_stop_find'] = {} + command['p2p_flush'] = {} + command['p2p_group_add'] = {ArgFields.help:'adds an autonomous group'} + command['p2p_group_remove'] = {} + command['p2p_group'] = {} + command['p2p_peers'] = {} + command['p2p_peer'] = {ArgFields.help:'<p2p device name> - get info for a ' + 'peer'} + command['p2p_connect'] = {ArgFields.help:'<p2p device name>'} + command['p2p_disconnect'] = {ArgFields.help:'<p2p device name>'} + command['p2p_serv_disc_req'] = {} + command['p2p_serv_disc_cancel_req'] = {ArgFields.help:'<identifier>'} + command['p2p_service_add'] = {ArgFields.help:'<service type> ' + '<version/query> <service/response>'} + command['p2p_service_del'] = {ArgFields.help:'<service type> ' + '<version/query> [<service>]'} + command['p2p_service_flush'] = {} + command['p2p_invite'] = {ArgFields.help:'<p2p device name>'} + + command_list = command.keys() + command_list.sort() + subparsers = parser.add_subparsers(help='commands', dest='command') + subparsers.add_parser('') + for key in command_list: + help=None + metavar=None + if command[key].has_key(ArgFields.help): + help = command[key][ArgFields.help] + if command[key].has_key(ArgFields.metavar): + metavar = command[key][ArgFields.metavar] + command_parser = subparsers.add_parser(key, help=help) + command_parser.add_argument(key, nargs='*', metavar=metavar, help=help) + + return command + +def main(): + if version_info.major != 2: + print 'You need to run this under Python 2.x' + exit(1) + + parser = argparse.ArgumentParser(description='Connman P2P Test') + + command_list = build_args(parser) + + argv[1:] += [''] + + args = parser.parse_args(argv[1:]) + + dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) + + opts = [] + if args.command: + opts = getattr(args, args.command) + + params = '' + if opts and len(opts[0]): + params = ' ' + ''.join(opts) + + bus = dbus.SystemBus() + + mainloop = gobject.MainLoop() + + wpa_s = Wpa_s(bus, args.ifname, args.command + params) + + if (args.debug): + wpa_s.enable_debug([]) + + wpa_s.set_command_list([command_list]) + + mainloop.run() + +if __name__ == '__main__': + main() diff --git a/test/test-supplicant b/test/test-supplicant deleted file mode 100755 index 68ac6630..00000000 --- a/test/test-supplicant +++ /dev/null @@ -1,71 +0,0 @@ -#!/usr/bin/python - -import dbus -import time - -WPA_NAME='fi.epitest.hostap.WPASupplicant' -WPA_INTF='fi.epitest.hostap.WPASupplicant' -WPA_PATH='/fi/epitest/hostap/WPASupplicant' - -bus = dbus.SystemBus() - -dummy = dbus.Interface(bus.get_object(WPA_NAME, WPA_PATH), - 'org.freedesktop.DBus.Introspectable') - -#print dummy.Introspect() - -manager = dbus.Interface(bus.get_object(WPA_NAME, WPA_PATH), WPA_INTF) - -try: - path = manager.getInterface("wlan0") -except: - path = manager.addInterface("wlan0") - -interface = dbus.Interface(bus.get_object(WPA_NAME, path), - WPA_INTF + ".Interface") - -print "state = %s" % (interface.state()) - -try: - print "scanning = %s" % (interface.scanning()) -except: - pass - -print "[ %s ]" % (path) - -capabilities = interface.capabilities() - -for key in capabilities.keys(): - list = "" - for value in capabilities[key]: - list += " " + value - print " %s =%s" % (key, list) - -interface.scan() - -time.sleep(1) - -try: - print "scanning = %s" % (interface.scanning()) -except: - pass - -time.sleep(1) - -print "state = %s" % (interface.state()) - -results = interface.scanResults() - -print results - -path = results[0] - -print "[ %s ]" % (path) - -bssid = dbus.Interface(bus.get_object(WPA_NAME, path), - WPA_INTF + ".BSSID") - -properties = bssid.properties() - -for key in properties.keys(): - print " %s = %s" % (key, properties[key]) |