summaryrefslogtreecommitdiff
path: root/test/p2p-on-supplicant
diff options
context:
space:
mode:
Diffstat (limited to 'test/p2p-on-supplicant')
-rwxr-xr-xtest/p2p-on-supplicant636
1 files changed, 636 insertions, 0 deletions
diff --git a/test/p2p-on-supplicant b/test/p2p-on-supplicant
new file mode 100755
index 0000000..826656e
--- /dev/null
+++ b/test/p2p-on-supplicant
@@ -0,0 +1,636 @@
+#!/usr/bin/python
+
+from os import O_NONBLOCK
+from sys import stdin, stdout, exit, version_info, argv
+from fcntl import fcntl, F_GETFL, F_SETFL
+import glib
+import dbus
+import dbus.mainloop.glib
+import gobject
+import argparse
+
+WPA_NAME='fi.w1.wpa_supplicant1'
+WPA_INTF='fi.w1.wpa_supplicant1'
+WPA_PATH='/fi/w1/wpa_supplicant1'
+WPA_IF_INTF = WPA_INTF + '.Interface'
+WPA_P2P_INTF = WPA_IF_INTF + '.P2PDevice'
+WPA_GROUP_INTF = WPA_INTF + '.Group'
+WPA_PEER_INTF = WPA_INTF + '.Peer'
+DBUS_PROPERTIES_INTF = 'org.freedesktop.DBus.Properties'
+
+P2P_GROUP_CAPAB_GROUP_OWNER = 1 << 0
+
+class ArgFields:
+ for field in ('help', 'metavar'):
+ exec('{}="{}"'.format(field, field))
+
+class InputLine:
+ def __init__(self, handler):
+ self.line = ''
+ self.handler = handler
+
+ flags = fcntl(stdin.fileno(), F_GETFL)
+ flags |= O_NONBLOCK
+ fcntl(stdin.fileno(), F_SETFL, flags)
+ glib.io_add_watch(stdin, glib.IO_IN, self.input_cb)
+
+ self.prompt()
+
+ def prompt(self):
+ self.line = ''
+ print '> ',
+ stdout.flush()
+
+ def input_cb(self, fd, event):
+ if event != glib.IO_IN:
+ return
+
+ self.line += fd.read();
+ for line in self.line.split('\n'):
+ line = line.strip()
+ if len(line) == 0:
+ break
+
+ self.handler(line.strip())
+
+ self.prompt()
+
+ return True
+
+def error_print(ex):
+ print 'Command Error: %s' % ex
+
+def checkarg(nb_args = 0, min_args = False):
+ def under(function):
+ def wrapper(*args, **kwargs):
+ resuls = True
+
+ if min_args:
+ result = len(args[1]) < nb_args
+ else:
+ result = len(args[1]) != nb_args
+
+ if result:
+ raise Exception('Command %s takes %s arguments' %
+ (function.__name__, nb_args))
+ return function(*args, **kwargs)
+ return wrapper
+ return under
+
+def print_dict(d):
+ for k in d:
+ try:
+ if type(d[k]) is dbus.Byte:
+ print 'Key %s --> 0x%x' % (k, d[k])
+ else:
+ print 'Key %s --> %s' % (k, d[k])
+ except:
+ print "Error: Key %s content cannot be printed" % k
+ pass
+
+def print_tuple(t):
+ for e in t:
+ if type(e) is dbus.Dictionary:
+ print_dict(e)
+ else:
+ print 'Element: %s' % e
+
+class Wpa_s:
+ def __init__(self, bus, iface_name, command):
+ self.wpa = dbus.Interface(bus.get_object(WPA_NAME, WPA_PATH), WPA_INTF)
+ bus.add_signal_receiver(self.__wpa_property_changed, path=WPA_PATH,
+ member_keyword='signal')
+ bus.add_signal_receiver(self.__InterfaceAdded, path=WPA_PATH,
+ signal_name='InterfaceAdded')
+ bus.add_signal_receiver(self.__InterfaceRemoved, path=WPA_PATH,
+ signal_name='InterfaceRemoved')
+ self.__reset()
+
+ self.bus = bus
+
+ self.debug = False
+
+ self.line_in = InputLine(self.__command)
+
+ if iface_name:
+ try:
+ self.create_if([iface_name])
+ except:
+ print "Error creating interface: %s" % iface_name
+
+ if len(command.strip(' ')):
+ self.__command(command)
+
+ def help(self, args):
+ list = self.command_list.keys()
+ list.sort()
+ for key in list:
+ help = ''
+ if (self.command_list[key].has_key(ArgFields.help)):
+ help = self.command_list[key][ArgFields.help]
+
+ print "%s\t%s" % (key.rjust(25), help.ljust(50))
+
+ def __command(self, cmd_line):
+ cmd = cmd_line.split(' ')
+
+ try:
+ func = getattr(self, cmd[0])
+ except Exception, e:
+ print 'Error: command unknown - %s' % e
+ return
+
+ try:
+ func(cmd[1:])
+ except Exception, e:
+ error_print(e)
+
+ def __wpa_property_changed(*args, **kwargs):
+ print 'WPA - Signal: %s' % kwargs.get('signal')
+
+ def __if_property_changed(*args, **kwargs):
+ signal = kwargs.get('signal')
+ print 'IF - Signal: %s' % signal
+
+ if signal == 'BSSAdded':
+ return
+
+ if args[0].debug:
+ print_tuple(args[1:])
+
+ def __p2p_property_changed(*args, **kwargs):
+ print 'IF P2P - Signal: %s' % kwargs.get('signal')
+ if args[0].debug:
+ print_tuple(args[1:])
+
+ """
+ It should be: __DeviceFound(self, object_path, properties)
+ wpa_supplicant's DBus API is buggy here:
+ - no properties are given
+ """
+ def __DeviceFound(self, object_path):
+ self.peers[object_path] = None
+
+ peer = self.bus.get_object(WPA_INTF, object_path)
+ peer_if = dbus.Interface(peer, DBUS_PROPERTIES_INTF)
+
+ self.peers[object_path] = peer_if.GetAll(WPA_PEER_INTF)
+
+ def __DeviceLost(self, object_path):
+ if object_path in self.peers:
+ del self.peers[object_path]
+
+ def __PeerJoined(self, object_path):
+ print 'Peer %s joined' % object_path
+
+ def __PeerDisconnected(self, object_path):
+ print 'Peer %s disconnected' % object_path
+
+ def __group_if_property_changed(*args, **kwargs):
+ print 'Group - ',
+ args[0].__if_property_changed(*args, **kwargs)
+
+ def __group_if_p2p_property_changed(*args, **kwargs):
+ print 'Group - ',
+ args[0].__p2p_property_changed(*args, **kwargs)
+
+ def __GroupFinished(self, ifname, role):
+ print 'Group running on %s is being removed' % ifname
+ self.group_obj = self.group_if = self.group_iface_path = None
+
+ def __InvitationResult(self, response):
+ print 'Invitation result status: %d ' % response['status']
+
+ if response.has_key('bssid'):
+ print 'bssid: %s' % response['bssid']
+
+ if self.debug:
+ print_dict(response)
+
+ def __GroupStarted(self, properties):
+ self.group_obj = properties['group_object']
+ self.bus.add_signal_receiver(self.__PeerJoined,
+ dbus_interface=WPA_GROUP_INTF,
+ path=self.group_obj,
+ signal_name='PeerJoined')
+ self.bus.add_signal_receiver(self.__PeerDisconnected,
+ dbus_interface=WPA_GROUP_INTF,
+ path=self.group_obj,
+ signal_name='PeerDisconnected')
+
+ self.group_iface_path = properties['interface_object']
+ self.group_if = dbus.Interface(self.bus.get_object(WPA_INTF,
+ self.group_iface_path),
+ WPA_P2P_INTF)
+ self.bus.add_signal_receiver(self.__group_if_property_changed,
+ dbus_interface=WPA_IF_INTF,
+ path=self.group_iface_path,
+ member_keyword='signal')
+ self.bus.add_signal_receiver(self.__group_if_p2p_property_changed,
+ dbus_interface=WPA_P2P_INTF,
+ path=self.group_iface_path,
+ member_keyword='signal')
+ self.bus.add_signal_receiver(self.__GroupFinished,
+ dbus_interface=WPA_P2P_INTF,
+ path=self.group_iface_path,
+ member_keyword='signal')
+ self.bus.add_signal_receiver(self.__InvitationResult,
+ dbus_interface=WPA_P2P_INTF,
+ path=self.iface_path,
+ signal_name='InvitationResult')
+
+ if self.debug:
+ group = dbus.Interface(self.bus.get_object(WPA_INTF,
+ self.group_obj),
+ DBUS_PROPERTIES_INTF)
+ print_dict(group.GetAll(WPA_GROUP_INTF))
+
+ def __ServiceDiscoveryResponse(self, response):
+ peer = response['peer_object']
+ if peer in self.peers:
+ print 'Peer %s has this TLVs:' % (self.peers[peer]['DeviceName'])
+ print response['tlvs']
+
+ def __InterfaceAdded(self, path, properties):
+ print 'Interface %s Added (%s)' % (properties['Ifname'], path)
+ if self.debug:
+ print_dict(properties)
+ p2p = dbus.Interface(self.bus.get_object(WPA_INTF,
+ path), DBUS_PROPERTIES_INTF)
+ print_dict(p2p.GetAll(WPA_P2P_INTF))
+
+ def __InterfaceRemoved(self, path):
+ print 'Interface Removed (%s)' % (path)
+
+ def __listen_if_signals(self):
+ self.bus.add_signal_receiver(self.__if_property_changed,
+ dbus_interface=WPA_IF_INTF,
+ path=self.iface_path,
+ member_keyword='signal')
+ self.bus.add_signal_receiver(self.__p2p_property_changed,
+ dbus_interface=WPA_P2P_INTF,
+ path=self.iface_path,
+ member_keyword='signal')
+ self.bus.add_signal_receiver(self.__GroupStarted,
+ dbus_interface=WPA_P2P_INTF,
+ path=self.iface_path,
+ signal_name='GroupStarted')
+ self.bus.add_signal_receiver(self.__DeviceFound,
+ dbus_interface=WPA_P2P_INTF,
+ path=self.iface_path,
+ signal_name='DeviceFound')
+ self.bus.add_signal_receiver(self.__DeviceLost,
+ dbus_interface=WPA_P2P_INTF,
+ path=self.iface_path,
+ signal_name='DeviceLost')
+ self.bus.add_signal_receiver(self.__ServiceDiscoveryResponse,
+ dbus_interface=WPA_P2P_INTF,
+ path=self.iface_path,
+ signal_name='ServiceDiscoveryResponse')
+
+ def __reset(self):
+ self.iface_path = self.iface_name = self.iface = None
+ self.p2p = self.group_if = self.group_obj = None
+ self.peers = {}
+
+ def __set_if(self, iface_name):
+ self.iface = dbus.Interface(self.bus.get_object(WPA_INTF,
+ self.iface_path), WPA_IF_INTF)
+ self.p2p = dbus.Interface(self.bus.get_object(WPA_INTF,
+ self.iface_path), WPA_P2P_INTF)
+
+ p2p_if = dbus.Interface(self.p2p, DBUS_PROPERTIES_INTF)
+ p2p_if.Set(WPA_P2P_INTF, 'P2PDeviceConfig',
+ dbus.Dictionary({ 'DeviceName' : 'ConnManP2P' },
+ signature='sv'))
+ print 'Interface %s: %s' % (iface_name, self.iface_path)
+ self.iface_name = iface_name
+ self.__listen_if_signals()
+
+ @checkarg()
+ def enable_debug(self, args):
+ self.debug = True
+
+ @checkarg()
+ def disable_debug(self, args):
+ self.debug = False
+
+ @checkarg(nb_args=1)
+ def create_if(self, args):
+ self.__reset()
+ self.iface_path = self.wpa.CreateInterface(({ 'Ifname' : args[0]} ))
+ self.__set_if(args[0])
+
+ @checkarg(nb_args=1)
+ def get_if(self, args):
+ self.__reset()
+ self.iface_path = self.wpa.GetInterface(args[0])
+ self.__set_if(args[0])
+
+ @checkarg()
+ def del_if(self, args = None):
+ if not self.iface_path:
+ return
+
+ self.wpa.RemoveInterface(self.iface_path)
+ print 'Interface %s removed' % self.iface_name
+ self.__reset()
+
+ @checkarg()
+ def scan(self, args = None):
+ if not self.iface:
+ return
+
+ self.iface.Scan(({ 'Type': 'passive' }))
+ print 'Scan started'
+
+ @checkarg()
+ def quit(self, args = None):
+ self.del_if(args)
+ exit(0)
+
+ @checkarg(nb_args=1)
+ def set_command_list(self, command_list):
+ self.command_list = command_list[0]
+
+ @checkarg()
+ def p2p_find(self, args = None):
+ if not self.p2p:
+ return
+
+ self.p2p.Find(({}))
+
+ @checkarg()
+ def p2p_stop_find(self, args = None):
+ if not self.p2p:
+ return
+
+ self.p2p.StopFind()
+
+ @checkarg()
+ def p2p_peers(self, args = None):
+ if not self.iface:
+ return
+
+ for p in self.peers:
+ print 'Peer Name=%s' % (self.peers[p]['DeviceName'])
+
+ def __find_peer(self, peer_name, ret_object_path = False):
+ if len(self.peers) == 0:
+ return None
+
+ peer = None
+ for p in self.peers:
+ if self.peers[p]['DeviceName'] == peer_name:
+ peer = self.peers[p]
+ break
+
+ if not peer:
+ print 'No peer found under the name: %s' % peer_name
+ p = None
+
+ if ret_object_path:
+ return p
+ else:
+ return peer
+
+ @checkarg(nb_args = 1)
+ def p2p_peer(self, args):
+ peer = self.__find_peer(args[0])
+ if peer:
+ print_dict(peer)
+
+ @checkarg(nb_args = 1)
+ def p2p_connect(self, args):
+ if not self.p2p:
+ return
+
+ peer = self.__find_peer(args[0])
+ if not peer:
+ return
+
+ peer_path = self.__find_peer(args[0], True)
+
+ if (peer['groupcapability'] & P2P_GROUP_CAPAB_GROUP_OWNER ==
+ P2P_GROUP_CAPAB_GROUP_OWNER):
+ print 'Joining an existing P2P group'
+ pin = self.p2p.Connect(({ 'peer' : peer_path,
+ 'wps_method' : 'pbc',
+ 'join' : True,
+ 'go_intent' : 0 }))
+ else:
+ print 'Associating with another P2P device'
+ pin = self.p2p.Connect(({ 'peer' : peer_path,
+ 'wps_method' : 'pbc',
+ 'join' : False,
+ 'go_intent' : 7 }))
+ if not pin:
+ print 'WPS PIN in use: %s' % pin
+
+ @checkarg(nb_args = 1)
+ def p2p_disconnect(self, args):
+ if not self.p2p:
+ return
+
+ peer = self.__find_peer(args[0])
+ if not peer:
+ return
+
+ if not self.group_if:
+ print 'Peer %s is not connected' % (peer['DeviceName'])
+ return
+
+ self.group_if.Disconnect()
+
+ @checkarg()
+ def p2p_group_add(self, args):
+ if not self.p2p:
+ return
+
+ self.p2p.GroupAdd(({ 'persistent' : dbus.Boolean(1) }))
+
+ @checkarg()
+ def p2p_group_remove(self, args):
+ if not self.group_if:
+ return
+
+ self.group_if.Disconnect()
+
+ @checkarg()
+ def p2p_group(self, args):
+ if not self.group_obj:
+ return
+
+ group = dbus.Interface(self.bus.get_object(WPA_INTF,
+ self.group_obj), DBUS_PROPERTIES_INTF)
+ print_dict(group.GetAll(WPA_GROUP_INTF))
+
+ @checkarg()
+ def p2p_flush(self, args):
+ if not self.p2p:
+ return
+
+ self.p2p.Flush()
+
+ @checkarg()
+ def p2p_serv_disc_req(self, args = None):
+ if not self.p2p:
+ return
+
+ """ We request all kind of services """
+ sd_req = dbus.Array(signature='y', variant_level=1)
+ for a in [2,0,0,1]:
+ sd_req.append(dbus.Byte(a))
+
+ ref = self.p2p.ServiceDiscoveryRequest(({ 'tlv' : sd_req }))
+ print 'Service discovery reference: %s' % ref
+
+ @checkarg(nb_args = 1)
+ def p2p_serv_disc_cancel_req(self, args):
+ if not self.p2p:
+ return
+
+ self.p2p.ServiceDiscoveryCancelRequest(int(args[0]))
+
+ @checkarg(nb_args = 3)
+ def p2p_service_add(self, args):
+ if not self.p2p:
+ return
+
+ service = { 'service_type' : args[0] }
+ if args[0] == 'upnp':
+ service['version'] = args[1]
+ service['service'] = args[2]
+ elif args[0] == 'bonjour':
+ service['query'] = args[1]
+ service['response'] = args[2]
+ else:
+ print 'Unknown service: %s' % args[0]
+ return
+
+ self.p2p.AddService((service))
+
+ @checkarg(nb_args = 2, min_args = True)
+ def p2p_service_del(self, args):
+ if not self.p2p:
+ return
+
+ service = { 'service_type' : args[0] }
+ if args[0] == 'upnp':
+ service['version'] = args[1]
+ service['service'] = args[2]
+ elif args[0] == 'bonjour':
+ service['query'] = args[1]
+ else:
+ print 'Unknown service: %s' % args[0]
+ return
+
+ self.p2p.DeleteService((service))
+
+ @checkarg()
+ def p2p_service_flush(self, args = None):
+ if not self.p2p:
+ return
+
+ self.p2p.FlushService()
+
+ @checkarg(nb_args = 1)
+ def p2p_invite(self, args):
+ if not self.p2p or not self.group_if:
+ return
+
+ peer_path = self.__find_peer(args[0], True)
+
+ if not peer_path:
+ return
+
+ self.group_if.Invite({ 'peer' : peer_path})
+
+def build_args(parser):
+ parser.add_argument('-d', default=False, action='store_true',
+ dest='debug', help='enable debug')
+ parser.add_argument('-i', metavar='<interface>', dest='ifname',
+ help='interface name')
+
+ command = {}
+ command['quit'] = {}
+ command['enable_debug'] = {}
+ command['disable_debug'] = {}
+ command['create_if'] = {ArgFields.help:'<iface_name> - create interface'}
+ command['get_if'] = {ArgFields.help:'<iface_name> - get interface'}
+ command['del_if'] = {ArgFields.help:'removes current interface'}
+ command['scan'] = {}
+ command['p2p_find'] = {}
+ command['p2p_stop_find'] = {}
+ command['p2p_flush'] = {}
+ command['p2p_group_add'] = {ArgFields.help:'adds an autonomous group'}
+ command['p2p_group_remove'] = {}
+ command['p2p_group'] = {}
+ command['p2p_peers'] = {}
+ command['p2p_peer'] = {ArgFields.help:'<p2p device name> - get info for a '
+ 'peer'}
+ command['p2p_connect'] = {ArgFields.help:'<p2p device name>'}
+ command['p2p_disconnect'] = {ArgFields.help:'<p2p device name>'}
+ command['p2p_serv_disc_req'] = {}
+ command['p2p_serv_disc_cancel_req'] = {ArgFields.help:'<identifier>'}
+ command['p2p_service_add'] = {ArgFields.help:'<service type> '
+ '<version/query> <service/response>'}
+ command['p2p_service_del'] = {ArgFields.help:'<service type> '
+ '<version/query> [<service>]'}
+ command['p2p_service_flush'] = {}
+ command['p2p_invite'] = {ArgFields.help:'<p2p device name>'}
+
+ command_list = command.keys()
+ command_list.sort()
+ subparsers = parser.add_subparsers(help='commands', dest='command')
+ subparsers.add_parser('')
+ for key in command_list:
+ help=None
+ metavar=None
+ if command[key].has_key(ArgFields.help):
+ help = command[key][ArgFields.help]
+ if command[key].has_key(ArgFields.metavar):
+ metavar = command[key][ArgFields.metavar]
+ command_parser = subparsers.add_parser(key, help=help)
+ command_parser.add_argument(key, nargs='*', metavar=metavar, help=help)
+
+ return command
+
+def main():
+ if version_info.major != 2:
+ print 'You need to run this under Python 2.x'
+ exit(1)
+
+ parser = argparse.ArgumentParser(description='Connman P2P Test')
+
+ command_list = build_args(parser)
+
+ argv[1:] += ['']
+
+ args = parser.parse_args(argv[1:])
+
+ dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
+
+ opts = []
+ if args.command:
+ opts = getattr(args, args.command)
+
+ params = ''
+ if opts and len(opts[0]):
+ params = ' ' + ''.join(opts)
+
+ bus = dbus.SystemBus()
+
+ mainloop = gobject.MainLoop()
+
+ wpa_s = Wpa_s(bus, args.ifname, args.command + params)
+
+ if (args.debug):
+ wpa_s.enable_debug([])
+
+ wpa_s.set_command_list([command_list])
+
+ mainloop.run()
+
+if __name__ == '__main__':
+ main()