summaryrefslogtreecommitdiff
path: root/tests/test_gio.py
diff options
context:
space:
mode:
authorHasan Wan <hasan.wan@intel.com>2012-05-25 16:47:13 +0800
committerHasan Wan <hasan.wan@intel.com>2012-05-28 16:06:22 +0800
commit1ddee039e66b9c1feb2129a1904e60760e0e8fc3 (patch)
tree84a3b12d0f839c6a2ef702eb02f3288c91cf5902 /tests/test_gio.py
parentf7d643cbb2184346b6f8d26091eb7eb38c6bbfe1 (diff)
downloadpygobject2-1ddee039e66b9c1feb2129a1904e60760e0e8fc3.tar.gz
pygobject2-1ddee039e66b9c1feb2129a1904e60760e0e8fc3.tar.bz2
pygobject2-1ddee039e66b9c1feb2129a1904e60760e0e8fc3.zip
Updated with Tizen:Base source codes
Diffstat (limited to 'tests/test_gio.py')
-rw-r--r--tests/test_gio.py1137
1 files changed, 1137 insertions, 0 deletions
diff --git a/tests/test_gio.py b/tests/test_gio.py
new file mode 100644
index 0000000..feafc70
--- /dev/null
+++ b/tests/test_gio.py
@@ -0,0 +1,1137 @@
+# -*- Mode: Python -*-
+
+import os
+import unittest
+
+from common import gio, glib
+
+
+class TestFile(unittest.TestCase):
+ def setUp(self):
+ self._f = open("file.txt", "w+")
+ self.file = gio.File("file.txt")
+
+ def tearDown(self):
+ self._f.close()
+ if os.path.exists('file.txt'):
+ os.unlink("file.txt")
+
+ def testReadAsync(self):
+ self._f.write("testing")
+ self._f.seek(0)
+
+ def callback(file, result):
+ try:
+ stream = file.read_finish(result)
+ self.failUnless(isinstance(stream, gio.InputStream))
+ self.assertEquals(stream.read(), "testing")
+ finally:
+ loop.quit()
+
+ self.file.read_async(callback)
+
+ loop = glib.MainLoop()
+ loop.run()
+
+ def testAppendToAsync(self):
+ self._f.write("append_to ")
+ self._f.close()
+
+ def callback(file, result):
+ try:
+ stream = file.append_to_finish(result)
+ self.failUnless(isinstance(stream, gio.OutputStream))
+ w = stream.write("testing")
+ cont, leng, etag = self.file.load_contents()
+ self.assertEqual(cont, "append_to testing")
+ finally:
+ loop.quit()
+
+ self.file.append_to_async(callback, gio.FILE_CREATE_NONE,
+ glib.PRIORITY_HIGH)
+
+ loop = glib.MainLoop()
+ loop.run()
+
+ def testAppendToAsyncNoargs(self):
+ self._f.write("append_to ")
+ self._f.close()
+
+ def callback(file, result):
+ try:
+ stream = file.append_to_finish(result)
+ self.failUnless(isinstance(stream, gio.OutputStream))
+ w = stream.write("testing")
+ cont, leng, etag = self.file.load_contents()
+ self.assertEqual(cont, "append_to testing")
+ finally:
+ loop.quit()
+
+ self.file.append_to_async(callback)
+
+ loop = glib.MainLoop()
+ loop.run()
+
+ def testCreateAsync(self):
+ def callback(file, result):
+ try:
+ stream = file.create_finish(result)
+ self.failUnless(isinstance(stream, gio.OutputStream))
+ w = stream.write("testing")
+ cont, leng, etag = file.load_contents()
+ self.assertEqual(cont, "testing")
+ finally:
+ if os.path.exists('temp.txt'):
+ os.unlink("temp.txt")
+ loop.quit()
+
+ gfile = gio.File("temp.txt")
+ gfile.create_async(callback, gio.FILE_CREATE_NONE,
+ glib.PRIORITY_HIGH)
+
+ loop = glib.MainLoop()
+ loop.run()
+
+ def testCreateReadWriteAsync(self):
+ def callback(file, result):
+ try:
+ iostream = file.create_readwrite_finish(result)
+ self.failUnless(isinstance(iostream, gio.FileIOStream))
+
+ ostream = iostream.get_output_stream()
+ self.failUnless(isinstance(ostream, gio.OutputStream))
+
+ w = ostream.write("testing")
+ cont, leng, etag = file.load_contents()
+ self.assertEqual(cont, "testing")
+ finally:
+ if os.path.exists('temp.txt'):
+ os.unlink("temp.txt")
+ loop.quit()
+
+ gfile = gio.File("temp.txt")
+ gfile.create_readwrite_async(callback, gio.FILE_CREATE_NONE,
+ glib.PRIORITY_HIGH)
+
+ loop = glib.MainLoop()
+ loop.run()
+
+ def testCreateAsyncNoargs(self):
+ def callback(file, result):
+ try:
+ stream = file.create_finish(result)
+ self.failUnless(isinstance(stream, gio.OutputStream))
+ w = stream.write("testing")
+ cont, leng, etag = file.load_contents()
+ self.assertEqual(cont, "testing")
+ finally:
+ if os.path.exists('temp.txt'):
+ os.unlink("temp.txt")
+ loop.quit()
+
+ gfile = gio.File("temp.txt")
+ gfile.create_async(callback)
+
+ loop = glib.MainLoop()
+ loop.run()
+
+ def testReplaceAsync(self):
+ self._f.write("testing")
+ self._f.close()
+
+ def callback(file, result):
+ try:
+ stream = file.replace_finish(result)
+ self.failUnless(isinstance(stream, gio.OutputStream))
+ stream.write("some new string")
+ stream.close()
+ cont, leng, etag = file.load_contents()
+ self.assertEqual(cont, "some new string")
+ finally:
+ loop.quit()
+
+
+ self.file.replace_async(callback, None, True, gio.FILE_CREATE_NONE,
+ glib.PRIORITY_HIGH)
+
+ loop = glib.MainLoop()
+ loop.run()
+
+ def testReplaceAsyncNoargs(self):
+ self._f.write("testing")
+ self._f.close()
+
+ def callback(file, result):
+ try:
+ stream = file.replace_finish(result)
+ self.failUnless(isinstance(stream, gio.OutputStream))
+ stream.write("some new string")
+ stream.close()
+ cont, leng, etag = file.load_contents()
+ self.assertEqual(cont, "some new string")
+ finally:
+ loop.quit()
+
+
+ self.file.replace_async(callback)
+
+ loop = glib.MainLoop()
+ loop.run()
+
+ def testReadAsyncError(self):
+ self.assertRaises(TypeError, self.file.read_async)
+ self.assertRaises(TypeError, self.file.read_async, "foo", "bar")
+ self.assertRaises(TypeError, self.file.read_async, "foo",
+ priority="bar")
+ self.assertRaises(TypeError, self.file.read_async, "foo",
+ priority="bar")
+ self.assertRaises(TypeError, self.file.read_async, "foo",
+ priority=1, cancellable="bar")
+ self.assertRaises(TypeError, self.file.read_async, "foo", 1, "bar")
+
+ def testConstructor(self):
+ for gfile in [gio.File("/"),
+ gio.File("file:///"),
+ gio.File(uri="file:///"),
+ gio.File(path="/"),
+ gio.File(u"/"),
+ gio.File(path=u"/")]:
+ self.failUnless(isinstance(gfile, gio.File))
+ self.assertEquals(gfile.get_path(), "/")
+ self.assertEquals(gfile.get_uri(), "file:///")
+
+ def testConstructorError(self):
+ self.assertRaises(TypeError, gio.File)
+ self.assertRaises(TypeError, gio.File, 1)
+ self.assertRaises(TypeError, gio.File, "foo", "bar")
+ self.assertRaises(TypeError, gio.File, foo="bar")
+ self.assertRaises(TypeError, gio.File, uri=1)
+ self.assertRaises(TypeError, gio.File, path=1)
+
+ def testLoadContents(self):
+ self._f.write("testing load_contents")
+ self._f.seek(0)
+ c = gio.Cancellable()
+ cont, leng, etag = self.file.load_contents(cancellable=c)
+ self.assertEqual(cont, "testing load_contents")
+ self.assertEqual(leng, 21)
+ self.assertNotEqual(etag, '')
+
+ def testLoadContentsAsync(self):
+ self._f.write("testing load_contents_async")
+ self._f.seek(0)
+
+ def callback(contents, result):
+ try:
+ cont, leng, etag = contents.load_contents_finish(result)
+ self.assertEqual(cont, "testing load_contents_async")
+ self.assertEqual(leng, 27)
+ self.assertNotEqual(etag, '')
+ finally:
+ loop.quit()
+
+ canc = gio.Cancellable()
+ self.file.load_contents_async(callback, cancellable=canc)
+
+ loop = glib.MainLoop()
+ loop.run()
+
+ def testQueryInfoAsync(self):
+ def callback(file, result):
+ try:
+ info = file.query_info_finish(result)
+ self.failUnless(isinstance(info, gio.FileInfo))
+ self.failUnless(info.get_name(), "file.txt")
+ finally:
+ loop.quit()
+
+ self.file.query_info_async("standard", callback)
+
+ loop = glib.MainLoop()
+ loop.run()
+
+ def testMountMountable(self):
+ gfile = gio.File('localtest:')
+ def unmount_done(mount, result):
+ try:
+ retval = mount.unmount_finish(result)
+ self.failUnless(retval)
+ finally:
+ loop.quit()
+
+ def mount_enclosing_volume_done(gfile, result):
+ try:
+ try:
+ retval = gfile.mount_enclosing_volume_finish(result)
+ except gio.Error, e:
+ # If we run the tests too fast
+ if e.code == gio.ERROR_ALREADY_MOUNTED:
+ print ('WARNING: testfile is already mounted, '
+ 'skipping test')
+ loop.quit()
+ return
+ raise
+ self.failUnless(retval)
+ finally:
+ try:
+ mount = gfile.find_enclosing_mount()
+ except gio.Error:
+ loop.quit()
+ return
+ mount.unmount(unmount_done)
+
+ mount_operation = gio.MountOperation()
+ gfile.mount_enclosing_volume(mount_operation,
+ mount_enclosing_volume_done)
+
+ loop = glib.MainLoop()
+ loop.run()
+
+ def testCopy(self):
+ if os.path.exists('copy.txt'):
+ os.unlink("copy.txt")
+
+ source = gio.File('file.txt')
+ destination = gio.File('copy.txt')
+ try:
+ retval = source.copy(destination)
+ self.failUnless(retval)
+
+ self.failUnless(os.path.exists('copy.txt'))
+ self.assertEqual(open('file.txt').read(),
+ open('copy.txt').read())
+ finally:
+ os.unlink("copy.txt")
+
+ self.called = False
+ def callback(current, total):
+ self.called = True
+ source = gio.File('file.txt')
+ destination = gio.File('copy.txt')
+ try:
+ retval = source.copy(destination, callback)
+ self.failUnless(retval)
+
+ self.failUnless(os.path.exists('copy.txt'))
+ self.assertEqual(open('file.txt').read(),
+ open('copy.txt').read())
+ self.failUnless(self.called)
+ finally:
+ os.unlink("copy.txt")
+
+ def test_copy_async(self):
+ if os.path.exists('copy.txt'):
+ os.unlink("copy.txt")
+
+ source = gio.File('file.txt')
+ destination = gio.File('copy.txt')
+
+ def copied(source_, result):
+ try:
+ self.assert_(source_ is source)
+ self.failUnless(source_.copy_finish(result))
+ finally:
+ loop.quit()
+
+ def progress(current, total):
+ self.assert_(isinstance(current, long))
+ self.assert_(isinstance(total, long))
+ self.assert_(0 <= current <= total)
+
+ try:
+ loop = glib.MainLoop()
+ source.copy_async(destination, copied, progress_callback = progress)
+ loop.run()
+
+ self.failUnless(os.path.exists('copy.txt'))
+ self.assertEqual(open('file.txt').read(),
+ open('copy.txt').read())
+ finally:
+ os.unlink("copy.txt")
+
+ # See bug 546591.
+ def test_copy_progress(self):
+ source = gio.File('file.txt')
+ destination = gio.File('copy.txt')
+
+ def progress(current, total):
+ self.assert_(isinstance(current, long))
+ self.assert_(isinstance(total, long))
+ self.assert_(0 <= current <= total)
+
+ try:
+ retval = source.copy(destination,
+ flags=gio.FILE_COPY_OVERWRITE,
+ progress_callback=progress)
+ self.failUnless(retval)
+
+ self.failUnless(os.path.exists('copy.txt'))
+ self.assertEqual(open('file.txt').read(),
+ open('copy.txt').read())
+ finally:
+ os.unlink("copy.txt")
+
+ def testMove(self):
+ if os.path.exists('move.txt'):
+ os.unlink("move.txt")
+
+ source = gio.File('file.txt')
+ destination = gio.File('move.txt')
+ retval = source.move(destination)
+ self.failUnless(retval)
+
+ self.failIf(os.path.exists('file.txt'))
+ self.failUnless(os.path.exists('move.txt'))
+
+ self.called = False
+ def callback(current, total):
+ self.called = True
+ source = gio.File('move.txt')
+ destination = gio.File('move-2.txt')
+ try:
+ retval = source.move(destination, callback)
+ self.failUnless(retval)
+
+ self.failIf(os.path.exists('move.txt'))
+ self.failUnless(os.path.exists('move-2.txt'))
+ self.failUnless(self.called)
+ finally:
+ os.unlink("move-2.txt")
+
+ def testInfoList(self):
+ infolist = self.file.query_settable_attributes()
+ for info in infolist:
+ if info.name == "time::modified":
+ self.assertEqual(info.type, gio.FILE_ATTRIBUTE_TYPE_UINT64)
+ self.assertEqual(info.name, "time::modified")
+ self.assertEqual(info.flags,
+ gio.FILE_ATTRIBUTE_INFO_COPY_WHEN_MOVED |
+ gio.FILE_ATTRIBUTE_INFO_COPY_WITH_FILE)
+
+ def testQueryWritableNamespaces(self):
+ infolist = self.file.query_writable_namespaces()
+ for info in infolist:
+ if info.name == "xattr":
+ self.assertEqual(info.type, gio.FILE_ATTRIBUTE_TYPE_STRING)
+
+ def testSetAttribute(self):
+ self._f.write("testing attributes")
+ self._f.seek(0)
+ infolist = self.file.query_settable_attributes()
+
+ self.assertNotEqual(len(infolist), 0)
+
+ for info in infolist:
+ if info.name == "time::modified-usec":
+ ret = self.file.set_attribute("time::modified-usec",
+ gio.FILE_ATTRIBUTE_TYPE_UINT32,
+ 10, gio.FILE_QUERY_INFO_NONE)
+ self.assertEqual(ret, True)
+
+ def testSetAttributesAsync(self):
+ def callback(gfile, result):
+ try:
+ info = gfile.set_attributes_finish(result)
+ usec = info.get_attribute_uint32("time::modified-usec")
+ self.assertEqual(usec, 10)
+ finally:
+ loop.quit()
+
+ info = gio.FileInfo()
+ info.set_attribute_uint32("time::modified-usec", 10)
+
+ canc = gio.Cancellable()
+ self.file.set_attributes_async(info, callback)
+
+ loop = glib.MainLoop()
+ loop.run()
+
+ def testReplaceContents(self):
+ self.file.replace_contents("testing replace_contents")
+ cont, leng, etag = self.file.load_contents()
+ self.assertEqual(cont, "testing replace_contents")
+
+ caught = False
+ try:
+ self.file.replace_contents("this won't work", etag="wrong")
+ except gio.Error, e:
+ self.assertEqual(e.code, gio.ERROR_WRONG_ETAG)
+ caught = True
+ self.failUnless(caught)
+
+ self.file.replace_contents("testing replace_contents again", etag=etag)
+ cont, leng, etag = self.file.load_contents()
+ self.assertEqual(cont, "testing replace_contents again")
+
+ self.file.replace_contents("testing replace_contents yet again", etag=None)
+ cont, leng, etag = self.file.load_contents()
+ self.assertEqual(cont, "testing replace_contents yet again")
+
+ def testReplaceContentsAsync(self):
+
+ def callback(contents, result):
+ try:
+ newetag = contents.replace_contents_finish(result)
+ cont, leng, etag = self.file.load_contents()
+ self.assertEqual(cont, "testing replace_contents_async")
+ self.assertEqual(leng, 30)
+ self.assertEqual(etag, newetag)
+ self.assertNotEqual(newetag, '')
+ finally:
+ loop.quit()
+
+ canc = gio.Cancellable()
+ self.file.replace_contents_async("testing replace_contents_async", callback, cancellable=canc)
+
+ loop = glib.MainLoop()
+ loop.run()
+
+ def test_eq(self):
+ self.assertEqual(gio.File('foo'),
+ gio.File('foo'))
+ self.assertNotEqual(gio.File('foo'),
+ gio.File('bar'))
+
+ def test_hash(self):
+ self.assertEquals(hash(gio.File('foo')),
+ hash(gio.File('foo')))
+
+ def testSetDisplayNameAsync(self):
+ def callback(gfile, result):
+ try:
+ new_gfile = gfile.set_display_name_finish(result)
+ new_name = new_gfile.get_basename()
+ self.assertEqual(new_name, "new.txt")
+ deleted = new_gfile.delete()
+ self.assertEqual(deleted, True)
+ finally:
+ loop.quit()
+
+ canc = gio.Cancellable()
+ self.file.set_display_name_async("new.txt", callback, cancellable=canc)
+
+ loop = glib.MainLoop()
+ loop.run()
+
+class TestGFileEnumerator(unittest.TestCase):
+ def setUp(self):
+ self.file = gio.File(".")
+
+ def testEnumerateChildren(self):
+ enumerator = self.file.enumerate_children(
+ "standard::*", gio.FILE_QUERY_INFO_NOFOLLOW_SYMLINKS)
+ for file_info in enumerator:
+ if file_info.get_name() == 'test_gio.py':
+ break
+ else:
+ raise AssertionError
+
+ def testEnumerateChildrenAsync(self):
+ def callback(gfile, result):
+ try:
+ for file_info in gfile.enumerate_children_finish(result):
+ if file_info.get_name() == 'test_gio.py':
+ break
+ else:
+ raise AssertionError
+ finally:
+ loop.quit()
+
+ self.file.enumerate_children_async(
+ "standard::*", callback)
+ loop = glib.MainLoop()
+ loop.run()
+
+ def testNextFilesAsync(self):
+ def callback(enumerator, result):
+ try:
+ for file_info in enumerator.next_files_finish(result):
+ if file_info.get_name() == 'test_gio.py':
+ break
+ else:
+ raise AssertionError
+ finally:
+ loop.quit()
+
+ enumerator = self.file.enumerate_children(
+ "standard::*", gio.FILE_QUERY_INFO_NOFOLLOW_SYMLINKS)
+ enumerator.next_files_async(1000, callback)
+ loop = glib.MainLoop()
+ loop.run()
+
+ def testCloseFilesAsync(self):
+ def callback(enumerator, result):
+ try:
+ enumerator.close_finish(result)
+ finally:
+ loop.quit()
+
+ enumerator = self.file.enumerate_children(
+ "standard::*", gio.FILE_QUERY_INFO_NOFOLLOW_SYMLINKS)
+
+ enumerator.close_async(callback)
+
+ loop = glib.MainLoop()
+ loop.run()
+
+
+class TestInputStream(unittest.TestCase):
+ def setUp(self):
+ self._f = open("inputstream.txt", "w+")
+ self._f.write("testing")
+ self._f.seek(0)
+ self.stream = gio.unix.InputStream(self._f.fileno(), False)
+
+ def tearDown(self):
+ self._f.close()
+ os.unlink("inputstream.txt")
+
+ def testRead(self):
+ self.assertEquals(self.stream.read(), "testing")
+
+ self.stream = gio.MemoryInputStream()
+ self.assertEquals(self.stream.read(), '')
+
+ self.stream = gio.MemoryInputStream()
+ some_data = open("test_gio.py", "rb").read()
+ self.stream.add_data(some_data)
+ self.assertEquals(self.stream.read(), some_data)
+
+ stream = gio.MemoryInputStream()
+ stream.add_data(some_data)
+ self.assertEquals(self._read_in_loop(stream,
+ lambda: stream.read(50),
+ 50),
+ some_data)
+
+ def testSkip(self):
+ self.stream.skip(2)
+ res = self.stream.read()
+ self.assertEqual(res, "sting")
+
+ def testSkipAsync(self):
+ def callback(stream, result):
+ try:
+ size = stream.skip_finish(result)
+ self.assertEqual(size, 2)
+ res = stream.read()
+ self.assertEqual(res, "sting")
+ finally:
+ loop.quit()
+
+ self.stream.skip_async(2, callback)
+
+ loop = glib.MainLoop()
+ loop.run()
+
+ def test_read_part(self):
+ self.assertEquals(self._read_in_loop(self.stream,
+ lambda: self.stream.read_part()),
+ 'testing')
+
+ stream = gio.MemoryInputStream()
+ some_data = open('test_gio.py', 'rb').read()
+ stream.add_data(some_data)
+ self.assertEquals(self._read_in_loop(stream,
+ lambda: stream.read_part(50),
+ 50),
+ some_data)
+
+ def _read_in_loop(self, stream, reader, size_limit=0):
+ read_data = ''
+ while True:
+ read_part = reader()
+ if read_part:
+ read_data += read_part
+ if size_limit > 0:
+ self.assert_(len(read_part) <= size_limit,
+ '%d <= %d' % (len(read_part), size_limit))
+ else:
+ return read_data
+
+ def testReadAsync(self):
+ def callback(stream, result):
+ self.assertEquals(result.get_op_res_gssize(), 7)
+ try:
+ data = stream.read_finish(result)
+ self.assertEquals(data, "testing")
+ stream.close()
+ finally:
+ loop.quit()
+
+ self.stream.read_async(7, callback)
+
+ loop = glib.MainLoop()
+ loop.run()
+
+ def testReadAsyncError(self):
+ self.count = 0
+ def callback(stream, result):
+ try:
+ self.count += 1
+ if self.count == 1:
+ return
+ self.assertRaises(gio.Error, stream.read_finish, result)
+ finally:
+ loop.quit()
+
+ self.stream.read_async(10240, callback)
+ self.stream.read_async(10240, callback)
+
+ loop = glib.MainLoop()
+ loop.run()
+
+ self.assertEquals(self.count, 2)
+
+ self.assertRaises(TypeError, self.stream.read_async)
+ self.assertRaises(TypeError, self.stream.read_async, "foo")
+ self.assertRaises(TypeError, self.stream.read_async, 1024, "bar")
+ self.assertRaises(TypeError, self.stream.read_async, 1024,
+ priority="bar")
+ self.assertRaises(TypeError, self.stream.read_async, 1024,
+ priority="bar")
+ self.assertRaises(TypeError, self.stream.read_async, 1024,
+ priority=1, cancellable="bar")
+ self.assertRaises(TypeError, self.stream.read_async, 1024, 1, "bar")
+
+
+ # FIXME: this makes 'make check' freeze
+ def _testCloseAsync(self):
+ def callback(stream, result):
+ try:
+ self.failUnless(stream.close_finish(result))
+ finally:
+ loop.quit()
+
+ self.stream.close_async(callback)
+
+ loop = glib.MainLoop()
+ loop.run()
+
+
+class TestDataInputStream(unittest.TestCase):
+ def setUp(self):
+ self.base_stream = gio.MemoryInputStream()
+ self.data_stream = gio.DataInputStream(self.base_stream)
+
+ def test_read_line(self):
+ self.base_stream.add_data('foo\nbar\n\nbaz')
+ self.assertEquals('foo', self.data_stream.read_line())
+ self.assertEquals('bar', self.data_stream.read_line())
+ self.assertEquals('', self.data_stream.read_line())
+ self.assertEquals('baz', self.data_stream.read_line())
+
+ def test_read_line_async(self):
+ def do_read_line_async():
+ loop = glib.MainLoop()
+ line = []
+
+ def callback(stream, result):
+ try:
+ line.append(stream.read_line_finish(result))
+ finally:
+ loop.quit()
+
+ self.data_stream.read_line_async(callback)
+ loop.run()
+ return line[0]
+
+ self.base_stream.add_data('foo\nbar\n\nbaz')
+ self.assertEquals('foo', do_read_line_async())
+ self.assertEquals('bar', do_read_line_async())
+ self.assertEquals('', do_read_line_async())
+ self.assertEquals('baz', do_read_line_async())
+
+ def test_read_until(self):
+ self.base_stream.add_data('sentence.end of line\nthe rest')
+ self.assertEquals('sentence', self.data_stream.read_until('.!?'))
+ self.assertEquals('end of line', self.data_stream.read_until('\n\r'))
+ self.assertEquals('the rest', self.data_stream.read_until('#$%^&'))
+
+ def test_read_until_async(self):
+ def do_read_until_async(stop_chars):
+ loop = glib.MainLoop()
+ data = []
+
+ def callback(stream, result):
+ try:
+ data.append(stream.read_until_finish(result))
+ finally:
+ loop.quit()
+
+ self.data_stream.read_until_async(stop_chars, callback)
+ loop.run()
+ return data[0]
+
+ # Note the weird difference between synchronous and
+ # asynchronous version. See bug #584284.
+ self.base_stream.add_data('sentence.end of line\nthe rest')
+ self.assertEquals('sentence', do_read_until_async('.!?'))
+ self.assertEquals('.end of line', do_read_until_async('\n\r'))
+ self.assertEquals('\nthe rest', do_read_until_async('#$%^&'))
+
+
+class TestMemoryInputStream(unittest.TestCase):
+ def setUp(self):
+ self.stream = gio.MemoryInputStream()
+
+ def test_add_data(self):
+ self.stream.add_data('foobar')
+ self.assertEquals('foobar', self.stream.read())
+
+ self.stream.add_data('ham ')
+ self.stream.add_data(None)
+ self.stream.add_data('spam')
+ self.assertEquals('ham spam', self.stream.read())
+
+ def test_new_from_data(self):
+ stream = gio.memory_input_stream_new_from_data('spam')
+ self.assertEquals('spam', stream.read())
+
+
+class TestOutputStream(unittest.TestCase):
+ def setUp(self):
+ self._f = open("outputstream.txt", "w")
+ self.stream = gio.unix.OutputStream(self._f.fileno(), False)
+
+ def tearDown(self):
+ self._f.close()
+ os.unlink("outputstream.txt")
+
+ def testWrite(self):
+ self.stream.write("testing")
+ self.stream.close()
+ self.failUnless(os.path.exists("outputstream.txt"))
+ self.assertEquals(open("outputstream.txt").read(), "testing")
+
+ def test_write_part(self):
+ stream = gio.MemoryOutputStream()
+ some_data = open('test_gio.py', 'rb').read()
+ buffer = some_data
+
+ # In fact this makes only one looping (memory stream is fast,
+ # write_part behaves just like write), but let's still be
+ # complete.
+ while buffer:
+ written = stream.write_part(buffer)
+ if written == len(buffer):
+ break
+ else:
+ buffer = buffer[written:]
+
+ self.assertEquals(stream.get_contents(), some_data)
+
+ def testWriteAsync(self):
+ def callback(stream, result):
+ self.assertEquals(result.get_op_res_gssize(), 7)
+ try:
+ self.assertEquals(stream.write_finish(result), 7)
+ self.failUnless(os.path.exists("outputstream.txt"))
+ self.assertEquals(open("outputstream.txt").read(), "testing")
+ finally:
+ loop.quit()
+
+ self.stream.write_async("testing", callback)
+
+ loop = glib.MainLoop()
+ loop.run()
+
+ def testWriteAsyncError(self):
+ def callback(stream, result):
+ self.assertEquals(result.get_op_res_gssize(), 0)
+ try:
+ self.assertRaises(gio.Error, stream.write_finish, result)
+ finally:
+ loop.quit()
+
+ self.stream.close()
+ self.stream.write_async("testing", callback)
+
+ loop = glib.MainLoop()
+ loop.run()
+
+ self.assertRaises(TypeError, self.stream.write_async)
+ self.assertRaises(TypeError, self.stream.write_async, 1138)
+ self.assertRaises(TypeError, self.stream.write_async, "foo", "bar")
+ self.assertRaises(TypeError, self.stream.write_async, "foo",
+ priority="bar")
+ self.assertRaises(TypeError, self.stream.write_async, "foo",
+ priority="bar")
+ self.assertRaises(TypeError, self.stream.write_async, "foo",
+ priority=1, cancellable="bar")
+ self.assertRaises(TypeError, self.stream.write_async, "foo", 1, "bar")
+
+ # FIXME: this makes 'make check' freeze
+ def _testCloseAsync(self):
+ def callback(stream, result):
+ try:
+ self.failUnless(stream.close_finish(result))
+ finally:
+ loop.quit()
+
+ self.stream.close_async(callback)
+
+ loop = glib.MainLoop()
+ loop.run()
+
+ def testFlushAsync(self):
+ def callback(stream, result):
+ try:
+ self.failUnless(stream.flush_finish(result))
+ finally:
+ loop.quit()
+
+ self.stream.flush_async(callback)
+
+ loop = glib.MainLoop()
+ loop.run()
+
+ def testSpliceAsync(self):
+ _f = open("stream.txt", "w+")
+ _f.write("testing")
+ _f.seek(0)
+ instream = gio.unix.InputStream(_f.fileno(), False)
+
+ def callback(stream, result):
+ try:
+ size = stream.splice_finish(result)
+ self.assertEqual(size, 7)
+
+ finally:
+ os.unlink("stream.txt")
+ loop.quit()
+
+ self.stream.splice_async(instream, callback)
+
+ loop = glib.MainLoop()
+ loop.run()
+
+class TestMemoryOutputStream(unittest.TestCase):
+ def setUp(self):
+ self.stream = gio.MemoryOutputStream()
+
+ def test_get_contents(self):
+ self.stream.write('foobar')
+ self.assertEquals('foobar', self.stream.get_contents())
+
+ self.stream.write('baz')
+ self.assertEquals('foobarbaz', self.stream.get_contents())
+
+
+class TestVolumeMonitor(unittest.TestCase):
+ def setUp(self):
+ self.monitor = gio.volume_monitor_get()
+
+ def testGetConnectedDrives(self):
+ drives = self.monitor.get_connected_drives()
+ self.failUnless(isinstance(drives, list))
+
+ def testGetVolumes(self):
+ volumes = self.monitor.get_volumes()
+ self.failUnless(isinstance(volumes, list))
+
+ def testGetMounts(self):
+ mounts = self.monitor.get_mounts()
+ self.failUnless(isinstance(mounts, list))
+ if not mounts:
+ return
+
+ self.failUnless(isinstance(mounts[0], gio.Mount))
+ # Bug 538601
+ icon = mounts[0].get_icon()
+ if not icon:
+ return
+ self.failUnless(isinstance(icon, gio.Icon))
+
+
+class TestContentTypeGuess(unittest.TestCase):
+ def testFromName(self):
+ mime_type = gio.content_type_guess('diagram.svg')
+ self.assertEquals('image/svg+xml', mime_type)
+
+ def testFromContents(self):
+ mime_type = gio.content_type_guess(data='<html></html>')
+ self.assertEquals('text/html', mime_type)
+
+ def testFromContentsUncertain(self):
+ mime_type, result_uncertain = gio.content_type_guess(
+ data='<html></html>', want_uncertain=True)
+ self.assertEquals('text/html', mime_type)
+ self.assertEquals(bool, type(result_uncertain))
+
+
+class TestFileInfo(unittest.TestCase):
+ def setUp(self):
+ self.fileinfo = gio.File("test_gio.py").query_info("*")
+
+ def testListAttributes(self):
+ attributes = self.fileinfo.list_attributes("standard")
+ self.failUnless(attributes)
+ self.failUnless('standard::name' in attributes)
+
+ def testGetModificationTime(self):
+ mtime = self.fileinfo.get_modification_time()
+ self.assertEqual(type(mtime), float)
+
+ def testSetModificationTime(self):
+ self.fileinfo.set_modification_time(1000)
+ mtime = self.fileinfo.get_modification_time()
+ self.assertEqual(mtime, 1000)
+
+
+class TestAppInfo(unittest.TestCase):
+ def setUp(self):
+ self.appinfo = gio.AppInfo("does-not-exist")
+
+ def testSimple(self):
+ self.assertEquals(self.appinfo.get_description(),
+ "Custom definition for does-not-exist")
+
+ def test_eq(self):
+ info1 = gio.app_info_get_all()[0]
+ info2 = info1.dup()
+ self.assert_(info1 is not info2)
+ self.assertEquals(info1, info2)
+
+ self.assertNotEqual(gio.app_info_get_all()[0], gio.app_info_get_all()[1])
+
+class TestVfs(unittest.TestCase):
+ def setUp(self):
+ self.vfs = gio.vfs_get_default()
+
+ def testGetSupportedURISchemes(self):
+ result = self.vfs.get_supported_uri_schemes()
+ self.failUnless(type(result), [])
+
+class TestVolume(unittest.TestCase):
+ def setUp(self):
+ self.monitor = gio.volume_monitor_get()
+
+ def testVolumeEnumerate(self):
+ volumes = self.monitor.get_volumes()
+ self.failUnless(isinstance(volumes, list))
+ for v in volumes:
+ if v is not None:
+ ids = v.enumerate_identifiers()
+ self.failUnless(isinstance(ids, list))
+ for id in ids:
+ if id is not None:
+ self.failUnless(isinstance(id, str))
+
+class TestFileInputStream(unittest.TestCase):
+ def setUp(self):
+ self._f = open("file.txt", "w+")
+ self._f.write("testing")
+ self._f.seek(0)
+ self.file = gio.File("file.txt")
+
+ def tearDown(self):
+ self._f.close()
+ if os.path.exists('file.txt'):
+ os.unlink("file.txt")
+
+ def testQueryInfoAsync(self):
+ def callback(stream, result):
+ try:
+ info = stream.query_info_finish(result)
+ self.failUnless(isinstance(info, gio.FileInfo))
+ self.failUnless(info.get_attribute_uint64("standard::size"), 7)
+ finally:
+ loop.quit()
+
+ inputstream = self.file.read()
+ inputstream.query_info_async("standard", callback)
+
+ loop = glib.MainLoop()
+ loop.run()
+
+class TestFileOutputStream(unittest.TestCase):
+ def setUp(self):
+ self._f = open("file.txt", "w+")
+ self._f.write("testing")
+ self._f.seek(0)
+ self.file = gio.File("file.txt")
+
+ def tearDown(self):
+ self._f.close()
+ if os.path.exists('file.txt'):
+ os.unlink("file.txt")
+
+ def testQueryInfoAsync(self):
+ def callback(stream, result):
+ try:
+ info = stream.query_info_finish(result)
+ self.failUnless(isinstance(info, gio.FileInfo))
+ self.failUnless(info.get_attribute_uint64("standard::size"), 7)
+ finally:
+ loop.quit()
+
+ outputstream = self.file.append_to()
+ outputstream.query_info_async("standard", callback)
+
+ loop = glib.MainLoop()
+ loop.run()
+
+class TestBufferedInputStream(unittest.TestCase):
+ def setUp(self):
+ self._f = open("buffer.txt", "w+")
+ self._f.write("testing")
+ self._f.seek(0)
+ stream = gio.unix.InputStream(self._f.fileno(), False)
+ self.buffered = gio.BufferedInputStream(stream)
+
+ def tearDown(self):
+ self._f.close()
+ os.unlink("buffer.txt")
+
+ def test_fill_async(self):
+ def callback(stream, result):
+ try:
+ size = stream.fill_finish(result)
+ self.failUnlessEqual(size, 4)
+ finally:
+ loop.quit()
+
+ self.buffered.fill_async(4, callback)
+
+ loop = glib.MainLoop()
+ loop.run()
+
+class TestIOStream(unittest.TestCase):
+ def setUp(self):
+ self.file = gio.File("file.txt")
+ self.iofile = self.file.create_readwrite(gio.FILE_CREATE_NONE)
+
+ def tearDown(self):
+ if os.path.exists('file.txt'):
+ os.unlink("file.txt")
+
+ def testIOStreamCloseAsync(self):
+ def callback(stream, result):
+ try:
+ self.failUnless(stream.close_finish(result))
+ finally:
+ loop.quit()
+
+ self.iofile.close_async(callback)
+
+ loop = glib.MainLoop()
+ loop.run()
+
+
+ def testQueryInfoAsync(self):
+ def callback(stream, result):
+ try:
+ info = stream.query_info_finish(result)
+ self.failUnless(isinstance(info, gio.FileInfo))
+ self.failUnless(info.get_attribute_uint64("standard::size"), 7)
+ finally:
+ loop.quit()
+
+ ostream = self.iofile.get_output_stream()
+ ostream.write("testing")
+
+ self.iofile.query_info_async("standard", callback)
+
+ loop = glib.MainLoop()
+ loop.run()