#!/usr/bin/python
"""Command-line test helper for ConnMan sessions.

Run with the ``run`` command, the script exports a ``SessionApplication``
object on the D-Bus session bus under the ``com.example.TestSession``
interface.  The other commands are thin clients that call that object, which
in turn talks to ``net.connman`` on the system bus.
"""

import sys
import gobject
import string
import dbus
import dbus.service
import dbus.mainloop.glib
import glib
import traceback


def extract_list(list):
    """Return the items of *list* formatted as ``"[ a b c ]"``."""
    return "[" + "".join(" " + str(item) for item in list) + " ]"


def extract_values(values):
    """Return the mapping *values* formatted as ``"{ key=value ... }"``.

    ``PrefixLength`` is rendered as an integer, ``Servers`` and ``Excludes``
    are rendered with extract_list(); every other value goes through str().
    """
    parts = ["{"]
    for key in values.keys():
        item = values[key]
        if key in ["PrefixLength"]:
            text = "%s" % (int(item))
        elif key in ["Servers", "Excludes"]:
            text = extract_list(item)
        else:
            text = str(item)
        parts.append(" " + key + "=" + text)
    parts.append(" }")
    return "".join(parts)


def _report_dbus_error(error):
    """Print a caught D-Bus error.

    Must be called from inside the ``except`` block that caught *error*:
    ``net.connman.Error.Failed`` is reported by its message only, anything
    else gets the traceback of the exception currently being handled.
    """
    if error.get_dbus_name() in ['net.connman.Error.Failed']:
        print(error.get_dbus_message())
        return
    traceback.print_exc()


class Notification(dbus.service.Object):
    """Object exporting the ``net.connman.Notification`` interface."""

    def __init__(self, bus, app, notify_path):
        """Remember the owning application.

        *bus* and *notify_path* are accepted but unused here: the object is
        created unattached and the caller exports it afterwards with
        add_to_connection().
        """
        dbus.service.Object.__init__(self)
        self.app = app

    @dbus.service.method("net.connman.Notification",
                         in_signature='', out_signature='')
    def Release(self):
        """Release the session named by the last component of our path."""
        print("Release %s" % (self._object_path))
        session_name = self._object_path.split('/')[-1]
        self.app.release(session_name)

    @dbus.service.method("net.connman.Notification",
                         in_signature='a{sv}', out_signature='')
    def Update(self, settings):
        """Print every key/value pair of the *settings* dictionary."""
        print("Update called at %s" % (self._object_path))
        try:
            for key in settings.keys():
                if key in ["IPv4", "IPv6"]:
                    val = extract_values(settings[key])
                elif key in ["AllowedBearers"]:
                    val = extract_list(settings[key])
                else:
                    val = settings[key]
                print(" %s = %s" % (key, val))
        except Exception:
            # Best effort: a formatting problem must not fail the D-Bus
            # call.  Exception (not a bare except) so that
            # KeyboardInterrupt and SystemExit still propagate.
            print("Exception:")
            traceback.print_exc()


class SessionApplication(dbus.service.Object):
    """Test application exporting ``com.example.TestSession``.

    ``self.sessions`` maps a session name to a dict with the keys
    ``notify_path``, ``notify``, ``settings``, ``session_path`` and
    ``session``.

    NOTE(review): the exported methods declare ``in_signature=''`` although
    they take arguments; presumably this relies on dbus-python treating an
    empty signature as "unspecified" -- confirm before tightening it.
    """

    def __init__(self, bus, object_path, mainloop):
        """Export the object and start watching the ``net.connman`` name."""
        dbus.service.Object.__init__(self, bus, object_path)
        self.manager = None
        self.mainloop = mainloop
        self.sessions = {}
        try:
            bus = dbus.SystemBus()
            bus.watch_name_owner('net.connman',
                                 self.connman_name_owner_changed)
        except dbus.DBusException:
            traceback.print_exc()

    def connman_name_owner_changed(self, proxy):
        """Track the ``net.connman`` name appearing or disappearing.

        A non-empty *proxy* means the name is owned: a Manager interface
        proxy is created.  Otherwise the manager is cleared and all known
        sessions are dropped.
        """
        try:
            if proxy:
                # Two spaces, matching the original print statement's output.
                print("connman appeared on D-Bus  %s" % str(proxy))
                bus = dbus.SystemBus()
                self.manager = dbus.Interface(
                    bus.get_object("net.connman", "/"),
                    "net.connman.Manager")
            else:
                print("connman disappeared on D-Bus")
                self.manager = None
                for entry in self.sessions.values():
                    # Sessions that were only configured never got a
                    # notification object, so 'notify' may be None.
                    notify = entry['notify']
                    if notify is not None:
                        notify.remove_from_connection()
                        entry['notify'] = None
                self.sessions = {}
        except dbus.DBusException:
            traceback.print_exc()

    def release(self, session_name):
        """Destroy the named session, unexport its notifier and forget it."""
        s = self.find_session(session_name)
        if not s:
            return
        if s['session']:
            s['session'].Destroy()
            s['session'] = None
        if s['notify']:
            s['notify'].remove_from_connection()
            s['notify'] = None
        del self.sessions[session_name]

    def type_convert(self, key, value):
        """Convert the list *value* to the type used for setting *key*.

        ``AllowedBearers`` keeps the whole list; ``RoamingPolicy`` and
        ``ConnectionType`` use the first element when there is one; the flag
        keys become ``dbus.Boolean`` (anything but false/f/n/0 is true); the
        timer keys become ``dbus.UInt32``.  Unknown keys are returned as is.
        """
        if key in ["AllowedBearers"]:
            return value
        elif key in ["RoamingPolicy", "ConnectionType"]:
            if len(value) > 0:
                return value[0]
        elif key in ["Priority", "AvoidHandover",
                     "StayConnected", "EmergencyCall"]:
            flag = str(value[0]).strip().lower()
            return dbus.Boolean(flag not in ['false', 'f', 'n', '0'])
        elif key in ["PeriodicConnect", "IdleTimeout"]:
            return dbus.UInt32(value[0])
        return value

    def find_session(self, session_name):
        """Return the session dict for *session_name*, or None if unknown."""
        if session_name not in self.sessions:
            return None
        return self.sessions[session_name]

    def _find_running_session(self, session_name):
        """Return the session dict if it has a live session, else None.

        Prints the "not running" message when the request must be dropped.
        """
        s = self.find_session(session_name)
        if s is None or s['session'] is None:
            print("The session is not running -> drop request")
            return None
        return s

    @dbus.service.method("com.example.TestSession",
                         in_signature='', out_signature='')
    def CreateSession(self, session_name):
        """Create a ConnMan session, reusing settings stored by Configure."""
        print("Create session")
        s = self.find_session(session_name)
        if s and s['session']:
            print("Session %s already created-> drop reqest" % (session_name))
            return
        if self.manager is None:
            # Without this guard the call below fails with AttributeError
            # whenever connman is not (or no longer) on the bus.
            print("connman is not available -> drop request")
            return
        try:
            bus = dbus.SystemBus()
            if s is None:
                s = {}
            s['notify_path'] = self._object_path + "/" + session_name
            s['notify'] = Notification(bus, self, s['notify_path'])
            s['notify'].add_to_connection(bus, s['notify_path'])
            if 'settings' not in s:
                s['settings'] = {}
            s['session_path'] = self.manager.CreateSession(s['settings'],
                                                           s['notify_path'])
            print("notify path %s" % (s['notify_path']))
            print("session path %s" % (s['session_path']))
            s['session'] = dbus.Interface(
                bus.get_object("net.connman", s['session_path']),
                "net.connman.Session")
            self.sessions[session_name] = s
        except dbus.DBusException as e:
            _report_dbus_error(e)

    @dbus.service.method("com.example.TestSession",
                         in_signature='', out_signature='')
    def DestroySession(self, session_name):
        """Destroy a running session via release()."""
        print("Destroy session")
        if self._find_running_session(session_name) is None:
            return
        try:
            self.release(session_name)
        except dbus.DBusException:
            traceback.print_exc()

    @dbus.service.method("com.example.TestSession",
                         in_signature='', out_signature='')
    def Connect(self, session_name):
        """Call Connect() on a running session."""
        print("Connect session")
        s = self._find_running_session(session_name)
        if s is None:
            return
        try:
            s['session'].Connect()
        except dbus.DBusException as e:
            _report_dbus_error(e)

    @dbus.service.method("com.example.TestSession",
                         in_signature='', out_signature='')
    def Disconnect(self, session_name):
        """Call Disconnect() on a running session."""
        print("Disconnect session")
        s = self._find_running_session(session_name)
        if s is None:
            return
        try:
            s['session'].Disconnect()
        except dbus.DBusException as e:
            _report_dbus_error(e)

    @dbus.service.method("com.example.TestSession",
                         in_signature='', out_signature='')
    def Change(self, session_name, key, value):
        """Convert *value* for *key* and apply it to a running session."""
        print("Update session settings")
        s = self._find_running_session(session_name)
        if s is None:
            return
        try:
            val = self.type_convert(key, value)
            s['session'].Change(key, val)
        except dbus.DBusException as e:
            _report_dbus_error(e)

    @dbus.service.method("com.example.TestSession",
                         in_signature='', out_signature='')
    def Configure(self, session_name, key, value):
        """Store a setting to be used when the session is created later.

        An unknown session name gets a placeholder entry with no notifier
        and no session.  The request is dropped if the session is running.
        """
        print("Configure session settings")
        s = self.find_session(session_name)
        if s is None:
            s = {
                'notify_path': None,
                'notify': None,
                'settings': {},
                'session_path': None,
                'session': None,
            }
            self.sessions[session_name] = s
        if s and s['session']:
            print("The session is running, use change -> drop request")
            return
        s['settings'][key] = self.type_convert(key, value)


def main():
    """Parse ``sys.argv`` and run the requested command."""
    if len(sys.argv) < 2:
        # Placeholders follow how the arguments are consumed below:
        # argv[2] is the application path, argv[3] the session name,
        # argv[4] the key and argv[5:] the value.
        print("Usage: %s <command>" % (sys.argv[0]))
        print("")
        print(" enable")
        print(" disable")
        print(" create <app_path> <session_name>")
        print(" destroy <app_path> <session_name>")
        print(" connect <app_path> <session_name>")
        print(" disconnect <app_path> <session_name>")
        print(" change <app_path> <session_name> <key> <value>")
        print(" configure <app_path> <session_name> <key> <value>")
        print("")
        print(" run <app_path>")
        sys.exit(1)

    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    command = sys.argv[1]

    if command in ("enable", "disable"):
        bus = dbus.SystemBus()
        manager = dbus.Interface(bus.get_object("net.connman", "/"),
                                 "net.connman.Manager")
        manager.SetProperty("SessionMode", command == "enable")
        return

    if len(sys.argv) < 3:
        print("Need test application path")
        sys.exit(1)

    app_path = sys.argv[2]
    bus = dbus.SessionBus()
    app_name = "com.example.SessionApplication.%s" % (app_path.strip("/"))

    if command == "run":
        # Keep a reference to the bus name for as long as the loop runs.
        name = dbus.service.BusName(app_name, bus)
        mainloop = gobject.MainLoop()
        app = SessionApplication(bus, app_path, mainloop)
        mainloop.run()
        return

    app = dbus.Interface(bus.get_object(app_name, app_path),
                         "com.example.TestSession")

    if command in ("create", "destroy", "connect", "disconnect"):
        # These commands need a session name; without the check a missing
        # argument surfaced as an IndexError traceback.
        if len(sys.argv) < 4:
            print("Arguments missing")
            sys.exit(1)
        session_name = sys.argv[3]
        if command == "create":
            app.CreateSession(session_name)
        elif command == "destroy":
            app.DestroySession(session_name)
        elif command == "connect":
            app.Connect(session_name)
        else:
            app.Disconnect(session_name)
    elif command == "change":
        if len(sys.argv) < 5:
            print("Arguments missing")
            sys.exit(1)
        app.Change(sys.argv[3], sys.argv[4], sys.argv[5:])
    elif command == "configure":
        if len(sys.argv) < 5:
            print("Arguments missing")
            sys.exit(1)
        app.Configure(sys.argv[3], sys.argv[4], sys.argv[5:])
    else:
        print("Unknown command '%s'" % sys.argv[1])
        sys.exit(1)


if __name__ == '__main__':
    main()