#!/usr/bin/python from os import O_NONBLOCK from sys import stdin, stdout, exit, version_info, argv from fcntl import fcntl, F_GETFL, F_SETFL import glib import dbus import dbus.mainloop.glib import gobject import argparse WPA_NAME='fi.w1.wpa_supplicant1' WPA_INTF='fi.w1.wpa_supplicant1' WPA_PATH='/fi/w1/wpa_supplicant1' WPA_IF_INTF = WPA_INTF + '.Interface' WPA_P2P_INTF = WPA_IF_INTF + '.P2PDevice' WPA_GROUP_INTF = WPA_INTF + '.Group' WPA_PEER_INTF = WPA_INTF + '.Peer' DBUS_PROPERTIES_INTF = 'org.freedesktop.DBus.Properties' P2P_GROUP_CAPAB_GROUP_OWNER = 1 << 0 class ArgFields: for field in ('help', 'metavar'): exec('{}="{}"'.format(field, field)) class InputLine: def __init__(self, handler): self.line = '' self.handler = handler flags = fcntl(stdin.fileno(), F_GETFL) flags |= O_NONBLOCK fcntl(stdin.fileno(), F_SETFL, flags) glib.io_add_watch(stdin, glib.IO_IN, self.input_cb) self.prompt() def prompt(self): self.line = '' print '> ', stdout.flush() def input_cb(self, fd, event): if event != glib.IO_IN: return self.line += fd.read(); for line in self.line.split('\n'): line = line.strip() if len(line) == 0: break self.handler(line.strip()) self.prompt() return True def error_print(ex): print 'Command Error: %s' % ex def checkarg(nb_args = 0, min_args = False): def under(function): def wrapper(*args, **kwargs): resuls = True if min_args: result = len(args[1]) < nb_args else: result = len(args[1]) != nb_args if result: raise Exception('Command %s takes %s arguments' % (function.__name__, nb_args)) return function(*args, **kwargs) return wrapper return under def print_dict(d): for k in d: try: if type(d[k]) is dbus.Byte: print 'Key %s --> 0x%x' % (k, d[k]) else: print 'Key %s --> %s' % (k, d[k]) except: print "Error: Key %s content cannot be printed" % k pass def print_tuple(t): for e in t: if type(e) is dbus.Dictionary: print_dict(e) else: print 'Element: %s' % e class Wpa_s: def __init__(self, bus, iface_name, command): self.wpa = dbus.Interface(bus.get_object(WPA_NAME, WPA_PATH), WPA_INTF) bus.add_signal_receiver(self.__wpa_property_changed, path=WPA_PATH, member_keyword='signal') bus.add_signal_receiver(self.__InterfaceAdded, path=WPA_PATH, signal_name='InterfaceAdded') bus.add_signal_receiver(self.__InterfaceRemoved, path=WPA_PATH, signal_name='InterfaceRemoved') self.__reset() self.bus = bus self.debug = False self.line_in = InputLine(self.__command) if iface_name: try: self.create_if([iface_name]) except: print "Error creating interface: %s" % iface_name if len(command.strip(' ')): self.__command(command) def help(self, args): list = self.command_list.keys() list.sort() for key in list: help = '' if (self.command_list[key].has_key(ArgFields.help)): help = self.command_list[key][ArgFields.help] print "%s\t%s" % (key.rjust(25), help.ljust(50)) def __command(self, cmd_line): cmd = cmd_line.split(' ') try: func = getattr(self, cmd[0]) except Exception, e: print 'Error: command unknown - %s' % e return try: func(cmd[1:]) except Exception, e: error_print(e) def __wpa_property_changed(*args, **kwargs): print 'WPA - Signal: %s' % kwargs.get('signal') def __if_property_changed(*args, **kwargs): signal = kwargs.get('signal') print 'IF - Signal: %s' % signal if signal == 'BSSAdded': return if args[0].debug: print_tuple(args[1:]) def __p2p_property_changed(*args, **kwargs): print 'IF P2P - Signal: %s' % kwargs.get('signal') if args[0].debug: print_tuple(args[1:]) def __peer_if_p2p_property_changed(*args, **kwargs): print 'Peer - ', args[0].__p2p_property_changed(*args, **kwargs) def __DeviceFound(self, object_path): self.peers[object_path] = None peer = self.bus.get_object(WPA_INTF, object_path) peer_if = dbus.Interface(peer, DBUS_PROPERTIES_INTF) self.bus.add_signal_receiver(self.__peer_if_p2p_property_changed, dbus_interface=WPA_PEER_INTF, path=object_path, member_keyword='signal') self.peers[object_path] = peer_if.GetAll(WPA_PEER_INTF) if self.debug: print_dict(self.peers[object_path]) def __DeviceLost(self, object_path): if object_path in self.peers: del self.peers[object_path] def __PeerJoined(self, object_path): print 'Peer %s joined' % object_path def __PeerDisconnected(self, object_path): print 'Peer %s disconnected' % object_path def __group_if_property_changed(*args, **kwargs): print 'Group - ', args[0].__if_property_changed(*args, **kwargs) def __group_if_p2p_property_changed(*args, **kwargs): print 'Group - ', args[0].__p2p_property_changed(*args, **kwargs) def __GroupFinished(self, properties): print 'Group running on %s is being removed' % ifname self.group_obj = self.group_if = self.group_iface_path = None if self.debug: print_dict(properties) def __InvitationResult(self, response): print 'Invitation result status: %d ' % response['status'] if response.has_key('bssid'): print 'bssid: %s' % response['bssid'] if self.debug: print_dict(response) def __GroupStarted(self, properties): self.group_obj = properties['group_object'] self.bus.add_signal_receiver(self.__PeerJoined, dbus_interface=WPA_GROUP_INTF, path=self.group_obj, signal_name='PeerJoined') self.bus.add_signal_receiver(self.__PeerDisconnected, dbus_interface=WPA_GROUP_INTF, path=self.group_obj, signal_name='PeerDisconnected') self.group_iface_path = properties['interface_object'] self.group_if = dbus.Interface(self.bus.get_object(WPA_INTF, self.group_iface_path), WPA_P2P_INTF) self.bus.add_signal_receiver(self.__group_if_property_changed, dbus_interface=WPA_IF_INTF, path=self.group_iface_path, member_keyword='signal') self.bus.add_signal_receiver(self.__group_if_p2p_property_changed, dbus_interface=WPA_P2P_INTF, path=self.group_iface_path, member_keyword='signal') self.bus.add_signal_receiver(self.__GroupFinished, dbus_interface=WPA_P2P_INTF, path=self.group_iface_path, signal_name='GroupFinished') self.bus.add_signal_receiver(self.__InvitationResult, dbus_interface=WPA_P2P_INTF, path=self.iface_path, signal_name='InvitationResult') if self.debug: group = dbus.Interface(self.bus.get_object(WPA_INTF, self.group_obj), DBUS_PROPERTIES_INTF) print_dict(group.GetAll(WPA_GROUP_INTF)) def __ServiceDiscoveryResponse(self, response): peer = response['peer_object'] if peer in self.peers: print 'Peer %s has this TLVs:' % (self.peers[peer]['DeviceName']) print response['tlvs'] def __InterfaceAdded(self, path, properties): print 'Interface %s Added (%s)' % (properties['Ifname'], path) if self.debug: print_dict(properties) p2p = dbus.Interface(self.bus.get_object(WPA_INTF, path), DBUS_PROPERTIES_INTF) print_dict(p2p.GetAll(WPA_P2P_INTF)) def __InterfaceRemoved(self, path): print 'Interface Removed (%s)' % (path) def __listen_if_signals(self): self.bus.add_signal_receiver(self.__if_property_changed, dbus_interface=WPA_IF_INTF, path=self.iface_path, member_keyword='signal') self.bus.add_signal_receiver(self.__p2p_property_changed, dbus_interface=WPA_P2P_INTF, path=self.iface_path, member_keyword='signal') self.bus.add_signal_receiver(self.__GroupStarted, dbus_interface=WPA_P2P_INTF, path=self.iface_path, signal_name='GroupStarted') self.bus.add_signal_receiver(self.__DeviceFound, dbus_interface=WPA_P2P_INTF, path=self.iface_path, signal_name='DeviceFound') self.bus.add_signal_receiver(self.__DeviceLost, dbus_interface=WPA_P2P_INTF, path=self.iface_path, signal_name='DeviceLost') self.bus.add_signal_receiver(self.__ServiceDiscoveryResponse, dbus_interface=WPA_P2P_INTF, path=self.iface_path, signal_name='ServiceDiscoveryResponse') def __reset(self): self.iface_path = self.iface_name = self.iface = None self.p2p = self.group_if = self.group_obj = None self.peers = {} def __set_if(self, iface_name): self.iface = dbus.Interface(self.bus.get_object(WPA_INTF, self.iface_path), WPA_IF_INTF) self.p2p = dbus.Interface(self.bus.get_object(WPA_INTF, self.iface_path), WPA_P2P_INTF) p2p_if = dbus.Interface(self.p2p, DBUS_PROPERTIES_INTF) p2p_if.Set(WPA_P2P_INTF, 'P2PDeviceConfig', dbus.Dictionary({ 'DeviceName' : 'ConnManP2P' }, signature='sv')) print 'Interface %s: %s' % (iface_name, self.iface_path) self.iface_name = iface_name self.__listen_if_signals() @checkarg() def enable_debug(self, args): self.debug = True @checkarg() def disable_debug(self, args): self.debug = False @checkarg(nb_args=1) def create_if(self, args): self.__reset() self.iface_path = self.wpa.CreateInterface(({ 'Ifname' : args[0]} )) self.__set_if(args[0]) @checkarg(nb_args=1) def get_if(self, args): self.__reset() self.iface_path = self.wpa.GetInterface(args[0]) self.__set_if(args[0]) @checkarg() def del_if(self, args = None): if not self.iface_path: return self.wpa.RemoveInterface(self.iface_path) print 'Interface %s removed' % self.iface_name self.__reset() @checkarg() def scan(self, args = None): if not self.iface: return self.iface.Scan(({ 'Type': 'passive' })) print 'Scan started' @checkarg() def quit(self, args = None): self.del_if(args) exit(0) @checkarg(nb_args=1) def set_command_list(self, command_list): self.command_list = command_list[0] @checkarg() def p2p_find(self, args = None): if not self.p2p: return self.p2p.Find(({})) @checkarg() def p2p_stop_find(self, args = None): if not self.p2p: return self.p2p.StopFind() @checkarg() def p2p_peers(self, args = None): if not self.iface: return for p in self.peers: print 'Peer Name=%s' % (self.peers[p]['DeviceName']) def __find_peer(self, peer_name, ret_object_path = False): if len(self.peers) == 0: return None peer = None for p in self.peers: if self.peers[p]['DeviceName'] == peer_name: peer = self.peers[p] break if not peer: print 'No peer found under the name: %s' % peer_name p = None if ret_object_path: return p else: return peer @checkarg(nb_args = 1) def p2p_peer(self, args): peer_path = self.__find_peer(args[0], True) if peer_path: peer = self.bus.get_object(WPA_INTF, peer_path) peer_if = dbus.Interface(peer, DBUS_PROPERTIES_INTF) self.peers[peer_path] = peer_if.GetAll(WPA_PEER_INTF) print_dict(self.peers[peer_path]) @checkarg(nb_args = 1) def p2p_connect(self, args): if not self.p2p: return peer = self.__find_peer(args[0]) if not peer: return peer_path = self.__find_peer(args[0], True) if (peer['groupcapability'] & P2P_GROUP_CAPAB_GROUP_OWNER == P2P_GROUP_CAPAB_GROUP_OWNER): print 'Joining an existing P2P group' pin = self.p2p.Connect(({ 'peer' : peer_path, 'wps_method' : 'pbc', 'join' : True, 'go_intent' : 0 })) else: print 'Associating with another P2P device' pin = self.p2p.Connect(({ 'peer' : peer_path, 'wps_method' : 'pbc', 'join' : False, 'go_intent' : 7 })) if not pin: print 'WPS PIN in use: %s' % pin @checkarg(nb_args = 1) def p2p_disconnect(self, args): if not self.p2p: return peer = self.__find_peer(args[0]) if not peer: return if not self.group_if: print 'Peer %s is not connected' % (peer['DeviceName']) return self.group_if.Disconnect() @checkarg() def p2p_group_add(self, args): if not self.p2p: return self.p2p.GroupAdd(({ 'persistent' : dbus.Boolean(1) })) @checkarg() def p2p_group_remove(self, args): if not self.group_if: return self.group_if.Disconnect() @checkarg() def p2p_group(self, args): if not self.group_obj: return group = dbus.Interface(self.bus.get_object(WPA_INTF, self.group_obj), DBUS_PROPERTIES_INTF) print_dict(group.GetAll(WPA_GROUP_INTF)) @checkarg() def p2p_flush(self, args): if not self.p2p: return self.p2p.Flush() @checkarg() def p2p_serv_disc_req(self, args = None): if not self.p2p: return """ We request all kind of services """ sd_req = dbus.Array(signature='y', variant_level=1) for a in [2,0,0,1]: sd_req.append(dbus.Byte(a)) ref = self.p2p.ServiceDiscoveryRequest(({ 'tlv' : sd_req })) print 'Service discovery reference: %s' % ref @checkarg(nb_args = 1) def p2p_serv_disc_cancel_req(self, args): if not self.p2p: return self.p2p.ServiceDiscoveryCancelRequest(int(args[0])) @checkarg(nb_args = 3) def p2p_service_add(self, args): if not self.p2p: return service = { 'service_type' : args[0] } if args[0] == 'upnp': service['version'] = args[1] service['service'] = args[2] elif args[0] == 'bonjour': service['query'] = args[1] service['response'] = args[2] else: print 'Unknown service: %s' % args[0] return self.p2p.AddService((service)) @checkarg(nb_args = 2, min_args = True) def p2p_service_del(self, args): if not self.p2p: return service = { 'service_type' : args[0] } if args[0] == 'upnp': service['version'] = args[1] service['service'] = args[2] elif args[0] == 'bonjour': service['query'] = args[1] else: print 'Unknown service: %s' % args[0] return self.p2p.DeleteService((service)) @checkarg() def p2p_service_flush(self, args = None): if not self.p2p: return self.p2p.FlushService() @checkarg(nb_args = 1) def p2p_invite(self, args): if not self.p2p or not self.group_if: return peer_path = self.__find_peer(args[0], True) if not peer_path: return self.group_if.Invite({ 'peer' : peer_path}) def build_args(parser): parser.add_argument('-d', default=False, action='store_true', dest='debug', help='enable debug') parser.add_argument('-i', metavar='', dest='ifname', help='interface name') command = {} command['quit'] = {} command['enable_debug'] = {} command['disable_debug'] = {} command['create_if'] = {ArgFields.help:' - create interface'} command['get_if'] = {ArgFields.help:' - get interface'} command['del_if'] = {ArgFields.help:'removes current interface'} command['scan'] = {} command['p2p_find'] = {} command['p2p_stop_find'] = {} command['p2p_flush'] = {} command['p2p_group_add'] = {ArgFields.help:'adds an autonomous group'} command['p2p_group_remove'] = {} command['p2p_group'] = {} command['p2p_peers'] = {} command['p2p_peer'] = {ArgFields.help:' - get info for a ' 'peer'} command['p2p_connect'] = {ArgFields.help:''} command['p2p_disconnect'] = {ArgFields.help:''} command['p2p_serv_disc_req'] = {} command['p2p_serv_disc_cancel_req'] = {ArgFields.help:''} command['p2p_service_add'] = {ArgFields.help:' ' ' '} command['p2p_service_del'] = {ArgFields.help:' ' ' []'} command['p2p_service_flush'] = {} command['p2p_invite'] = {ArgFields.help:''} command_list = command.keys() command_list.sort() subparsers = parser.add_subparsers(help='commands', dest='command') subparsers.add_parser('') for key in command_list: help=None metavar=None if command[key].has_key(ArgFields.help): help = command[key][ArgFields.help] if command[key].has_key(ArgFields.metavar): metavar = command[key][ArgFields.metavar] command_parser = subparsers.add_parser(key, help=help) command_parser.add_argument(key, nargs='*', metavar=metavar, help=help) return command def main(): if version_info.major != 2: print 'You need to run this under Python 2.x' exit(1) parser = argparse.ArgumentParser(description='Connman P2P Test') command_list = build_args(parser) argv[1:] += [''] args = parser.parse_args(argv[1:]) dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) opts = [] if args.command: opts = getattr(args, args.command) params = '' if opts and len(opts[0]): params = ' ' + ''.join(opts) bus = dbus.SystemBus() mainloop = gobject.MainLoop() wpa_s = Wpa_s(bus, args.ifname, args.command + params) if (args.debug): wpa_s.enable_debug([]) wpa_s.set_command_list([command_list]) mainloop.run() if __name__ == '__main__': main()