diff options
author | himanshu <h.himanshu@samsung.com> | 2020-02-12 12:59:37 +0530 |
---|---|---|
committer | himanshu <h.himanshu@samsung.com> | 2020-02-12 12:59:37 +0530 |
commit | c3ea0616d0b91131dca66c3f0294ebfb9a7c3dc5 (patch) | |
tree | ed9187893abdc27e0aa9fefd25ee676dd3a7f410 /test | |
parent | 9ef21fcdc595e0471bbc4dc8cef9fa285f5fb747 (diff) | |
parent | 41fee4bb863ee6b25015826299aeb4eb30d68ee6 (diff) | |
download | bluez-c3ea0616d0b91131dca66c3f0294ebfb9a7c3dc5.tar.gz bluez-c3ea0616d0b91131dca66c3f0294ebfb9a7c3dc5.tar.bz2 bluez-c3ea0616d0b91131dca66c3f0294ebfb9a7c3dc5.zip |
Merge branch 'bluez_upgrade_5.52' of https://review.tizen.org/gerrit/p/platform/upstream/bluez into bluez_upgrade_5.52
Diffstat (limited to 'test')
-rwxr-xr-x | test/example-advertisement | 42 | ||||
-rw-r--r-- | test/example-endpoint | 186 | ||||
-rw-r--r-- | test/example-player | 203 |
3 files changed, 421 insertions, 10 deletions
diff --git a/test/example-advertisement b/test/example-advertisement index fd84eacf..88a27ab3 100755 --- a/test/example-advertisement +++ b/test/example-advertisement @@ -2,19 +2,18 @@ from __future__ import print_function +import argparse import dbus import dbus.exceptions import dbus.mainloop.glib import dbus.service - -import array +import time +import threading try: - from gi.repository import GObject # python3 + from gi.repository import GObject # python3 except ImportError: - import gobject as GObject # python2 - -from random import randint + import gobject as GObject # python2 mainloop = None @@ -136,6 +135,7 @@ class Advertisement(dbus.service.Object): def Release(self): print('%s: Released!' % self.path) + class TestAdvertisement(Advertisement): def __init__(self, bus, index): @@ -170,7 +170,13 @@ def find_adapter(bus): return None -def main(): +def shutdown(timeout): + print('Advertising for {} seconds...'.format(timeout)) + time.sleep(timeout) + mainloop.quit() + + +def main(timeout=0): global mainloop dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) @@ -183,7 +189,7 @@ def main(): return adapter_props = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, adapter), - "org.freedesktop.DBus.Properties"); + "org.freedesktop.DBus.Properties") adapter_props.Set("org.bluez.Adapter1", "Powered", dbus.Boolean(1)) @@ -198,7 +204,23 @@ def main(): reply_handler=register_ad_cb, error_handler=register_ad_error_cb) - mainloop.run() + if timeout > 0: + threading.Thread(target=shutdown, args=(timeout,)).start() + else: + print('Advertising forever...') + + mainloop.run() # blocks until mainloop.quit() is called + + ad_manager.UnregisterAdvertisement(test_advertisement) + print('Advertisement unregistered') + dbus.service.Object.remove_from_connection(test_advertisement) + if __name__ == '__main__': - main() + parser = argparse.ArgumentParser() + parser.add_argument('--timeout', default=0, type=int, help="advertise " + + "for this many seconds then stop, 0=run forever " + + "(default: 0)") + args = parser.parse_args() + + main(args.timeout) diff --git a/test/example-endpoint b/test/example-endpoint new file mode 100644 index 00000000..a5f0348a --- /dev/null +++ b/test/example-endpoint @@ -0,0 +1,186 @@ +#!/usr/bin/python + +from __future__ import absolute_import, print_function, unicode_literals + +import sys +import dbus +import dbus.exceptions +import dbus.service +import dbus.mainloop.glib + +import array +try: + from gi.repository import GObject +except ImportError: + import gobject as GObject +import bluezutils + +ENDPOINT_IFACE = 'org.bluez.MediaEndpoint1' +DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager' +DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties' + +A2DP_SOURCE_UUID = '0000110A-0000-1000-8000-00805F9B34FB' +A2DP_SINK_UUID = '0000110B-0000-1000-8000-00805F9B34FB' + +SBC_CODEC = dbus.Byte(0x00) +#Channel Modes: Mono DualChannel Stereo JointStereo +#Frequencies: 16Khz 32Khz 44.1Khz 48Khz +#Subbands: 4 8 +#Blocks: 4 8 12 16 +#Bitpool Range: 2-64 +SBC_CAPABILITIES = dbus.Array([dbus.Byte(0xff), dbus.Byte(0xff), dbus.Byte(2), dbus.Byte(64)]) +# JointStereo 44.1Khz Subbands: Blocks: 16 Bitpool Range: 2-32 +SBC_CONFIGURATION = dbus.Array([dbus.Byte(0x21), dbus.Byte(0x15), dbus.Byte(2), dbus.Byte(32)]) + +MP3_CODEC = dbus.Byte(0x01) +#Channel Modes: Mono DualChannel Stereo JointStereo +#Frequencies: 32Khz 44.1Khz 48Khz +#CRC: YES +#Layer: 3 +#Bit Rate: All except Free format +#VBR: Yes +#Payload Format: RFC-2250 +MP3_CAPABILITIES = dbus.Array([dbus.Byte(0x3f), dbus.Byte(0x07), dbus.Byte(0xff), dbus.Byte(0xfe)]) +# JointStereo 44.1Khz Layer: 3 Bit Rate: VBR Format: RFC-2250 +MP3_CONFIGURATION = dbus.Array([dbus.Byte(0x21), dbus.Byte(0x02), dbus.Byte(0x00), dbus.Byte(0x80)]) + +PCM_CODEC = dbus.Byte(0x00) +PCM_CONFIGURATION = dbus.Array([], signature="ay") + +CVSD_CODEC = dbus.Byte(0x01) + +class Rejected(dbus.DBusException): + _dbus_error_name = "org.bluez.Error.Rejected" + +class InvalidArgsException(dbus.exceptions.DBusException): + _dbus_error_name = 'org.freedesktop.DBus.Error.InvalidArgs' + +class Endpoint(dbus.service.Object): + def __init__(self, bus, path, properties, configuration): + self.path = path + self.bus = bus + self.properties = properties + self.configuration = configuration + self.exit_on_release = True + dbus.service.Object.__init__(self, bus, self.path) + + def get_properties(self): + return self.properties + + def get_path(self): + return dbus.ObjectPath(self.path) + + @dbus.service.method(DBUS_PROP_IFACE, in_signature='s', + out_signature='a{sv}') + def GetAll(self, interface): + if interface != ENDPOINT_IFACE: + raise InvalidArgsException() + + return self.get_properties() + + def set_exit_on_release(self, exit_on_release): + self.exit_on_release = exit_on_release + + def default_configuration(self, configuration): + self.configuration = configuration + + @dbus.service.method(ENDPOINT_IFACE, in_signature="", out_signature="") + def Release(self): + print("Release") + if self.exit_on_release: + mainloop.quit() + + @dbus.service.method(ENDPOINT_IFACE, in_signature="o", out_signature="") + def ClearConfiguration(self, transport): + print("ClearConfiguration (%s)" % (transport)) + + @dbus.service.method(ENDPOINT_IFACE, in_signature="oay", out_signature="") + def SetConfiguration(self, transport, config): + print("SetConfiguration (%s, %s)" % (transport, config)) + return + + @dbus.service.method(ENDPOINT_IFACE, in_signature="ay", out_signature="ay") + def SelectConfiguration(self, caps): + print("SelectConfiguration (%s)" % (caps)) + return self.configuration + +class Application(dbus.service.Object): + def __init__(self, bus, path, properties, configuration): + self.path = '/' + self.endpoints = [] + dbus.service.Object.__init__(self, bus, self.path) + self.add_endpoint(Endpoint(bus, path, properties, configuration)) + + def get_path(self): + return dbus.ObjectPath(self.path) + + def add_endpoint(self, endpoint): + self.endpoints.append(endpoint) + + @dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}') + def GetManagedObjects(self): + response = {} + print('GetManagedObjects') + + for endpoint in self.endpoints: + response[endpoint.get_path()] = { ENDPOINT_IFACE: + endpoint.get_properties() } + + return response + +def register_app_cb(): + print('Media application registered') + + +def register_app_error_cb(error): + print('Failed to register application: ' + str(error)) + mainloop.quit() + +if __name__ == '__main__': + dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) + + bus = dbus.SystemBus() + + if len(sys.argv) > 1: + path = bluezutils.find_adapter(sys.argv[1]).object_path + else: + path = bluezutils.find_adapter().object_path + + media = dbus.Interface(bus.get_object("org.bluez", path), + "org.bluez.Media1") + + + properties = dbus.Dictionary({ "UUID" : A2DP_SOURCE_UUID, + "Codec" : SBC_CODEC, + "DelayReporting" : True, + "Capabilities" : SBC_CAPABILITIES }) + + configuration = SBC_CONFIGURATION + + if len(sys.argv) > 2: + if sys.argv[2] == "sbcsink": + properties = dbus.Dictionary({ "UUID" : A2DP_SINK_UUID, + "Codec" : SBC_CODEC, + "DelayReporting" : True, + "Capabilities" : SBC_CAPABILITIES }) + if sys.argv[2] == "mp3source": + properties = dbus.Dictionary({ "UUID" : A2DP_SOURCE_UUID, + "Codec" : MP3_CODEC, + "Capabilities" : MP3_CAPABILITIES }) + configuration = MP3_CONFIGURATION + if sys.argv[2] == "mp3sink": + properties = dbus.Dictionary({ "UUID" : A2DP_SINK_UUID, + "Codec" : MP3_CODEC, + "Capabilities" : MP3_CAPABILITIES }) + configuration = MP3_CONFIGURATION + + print(properties) + + path = "/test/endpoint" + app = Application(bus, path, properties, configuration) + mainloop = GObject.MainLoop() + + media.RegisterApplication(app.get_path(), {}, + reply_handler=register_app_cb, + error_handler=register_app_error_cb) + mainloop.run() diff --git a/test/example-player b/test/example-player new file mode 100644 index 00000000..2beb08e4 --- /dev/null +++ b/test/example-player @@ -0,0 +1,203 @@ +#!/usr/bin/python + +from __future__ import print_function + +import os +import sys +import dbus +import dbus.service +import dbus.mainloop.glib +try: + from gi.repository import GObject +except ImportError: + import gobject as GObject +import bluezutils + +PLAYER_IFACE = 'org.mpris.MediaPlayer2.Player' +DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager' +DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties' + +class InvalidArgsException(dbus.exceptions.DBusException): + _dbus_error_name = 'org.freedesktop.DBus.Error.InvalidArgs' + +class Player(dbus.service.Object): + def __init__(self, bus, path, obj): + self.path = path + dbus.service.Object.__init__(self, bus, self.path) + + if obj != None: + mp = dbus.Interface(bus.get_object("org.bluez", obj), + "org.bluez.MediaPlayer1") + prop = dbus.Interface(bus.get_object("org.bluez", obj), + "org.freedesktop.DBus.Properties") + + self.properties = prop.GetAll("org.bluez.MediaPlayer1") + + bus.add_signal_receiver(self.properties_changed, path = obj, + dbus_interface = "org.freedesktop.DBus.Properties", + signal_name = "PropertiesChanged") + else: + self.track = dbus.Dictionary({"xesam:title" : "Title", + "xesam:artist" : ["Artist"], + "xesam:album" : "Album", + "xesam:genre" : ["Genre"], + "xesam:trackNumber" : dbus.Int32(1), + "mpris:length" : dbus.Int64(10000) }, + signature="sv") + + self.properties = dbus.Dictionary({"PlaybackStatus" : "playing", + "Identity" : "SimplePlayer", + "LoopStatus" : "None", + "Rate" : dbus.Double(1.0), + "Shuffle" : dbus.Boolean(False), + "Metadata" : self.track, + "Volume" : dbus.Double(1.0), + "Position" : dbus.Int64(0), + "MinimumRate" : dbus.Double(1.0), + "MaximumRate" : dbus.Double(1.0), + "CanGoNext" : dbus.Boolean(False), + "CanGoPrevious" : dbus.Boolean(False), + "CanPlay" : dbus.Boolean(False), + "CanSeek" : dbus.Boolean(False), + "CanControl" : dbus.Boolean(False), + }, + signature="sv") + + print('Register media player with:\n\tProperties: %s' \ + % (self.properties)) + handler = InputHandler(self) + GObject.io_add_watch(sys.stdin, GObject.IO_IN, handler.handle) + + @dbus.service.method("org.freedesktop.DBus.Properties", + in_signature="ssv", out_signature="") + def Set(self, interface, key, value): + print("Set (%s, %s)" % (key, value), file=sys.stderr) + return + + def get_properties(self): + return self.properties + + def get_path(self): + return dbus.ObjectPath(self.path) + + @dbus.service.method("org.freedesktop.DBus.Properties", + in_signature='s', out_signature='a{sv}') + def GetAll(self, interface): + if interface != PLAYER_IFACE: + raise InvalidArgsException() + + return self.get_properties() + + @dbus.service.signal("org.freedesktop.DBus.Properties", + signature="sa{sv}as") + def PropertiesChanged(self, interface, properties, + invalidated = dbus.Array()): + """PropertiesChanged(interface, properties, invalidated) + + Send a PropertiesChanged signal. 'properties' is a dictionary + containing string parameters as specified in doc/media-api.txt. + """ + pass + + def help(self, func): + help(self.__class__.__dict__[func]) + + def properties_changed(self, interface, properties, invalidated): + print("properties_changed(%s, %s)" % (properties, invalidated)) + + self.PropertiesChanged(interface, properties, invalidated) + +class InputHandler: + commands = { 'PropertiesChanged': '(interface, properties)', + 'help': '(cmd)' } + def __init__(self, player): + self.player = player + print('\n\nAvailable commands:') + for cmd in self.commands: + print('\t', cmd, self.commands[cmd], sep='') + + print("\nUse python syntax to pass arguments to available methods.\n" \ + "E.g.: PropertiesChanged({'Metadata' : {'Title': 'My title', \ + 'Album': 'my album' }})") + self.prompt() + + def prompt(self): + print('\n>>> ', end='') + sys.stdout.flush() + + def handle(self, fd, condition): + s = os.read(fd.fileno(), 1024).strip() + try: + cmd = s[:s.find('(')] + if not cmd in self.commands: + print("Unknown command ", cmd) + except ValueError: + print("Malformed command") + return True + try: + exec "self.player.%s" % s + except Exception as e: + print(e) + pass + self.prompt() + return True + +class Application(dbus.service.Object): + def __init__(self, bus, path, obj): + self.path = '/' + self.players = [] + dbus.service.Object.__init__(self, bus, self.path) + self.add_player(Player(bus, path, obj)) + + def get_path(self): + return dbus.ObjectPath(self.path) + + def add_player(self, player): + self.players.append(player) + + @dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}') + def GetManagedObjects(self): + response = {} + print('GetManagedObjects') + + for player in self.players: + response[player.get_path()] = { PLAYER_IFACE: + player.get_properties() } + + return response + +def register_app_cb(): + print('Media application registered') + + +def register_app_error_cb(error): + print('Failed to register application: ' + str(error)) + mainloop.quit() + +if __name__ == '__main__': + dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) + + bus = dbus.SystemBus() + + if len(sys.argv) > 1: + path = bluezutils.find_adapter(sys.argv[1]).object_path + else: + path = bluezutils.find_adapter().object_path + + media = dbus.Interface(bus.get_object("org.bluez", path), + "org.bluez.Media1") + + path = "/test/player" + + if len(sys.argv) > 2: + app = Application(bus, path, sys.argv[2]) + else: + app = Application(bus, path, None) + + mainloop = GObject.MainLoop() + + media.RegisterApplication(app.get_path(), {}, + reply_handler=register_app_cb, + error_handler=register_app_error_cb) + + mainloop.run() |