diff options
Diffstat (limited to 'Lib/test/test_socket.py')
-rw-r--r-- | Lib/test/test_socket.py | 247 |
1 files changed, 195 insertions, 52 deletions
diff --git a/Lib/test/test_socket.py b/Lib/test/test_socket.py index 13d0eb4..5d5005b 100644 --- a/Lib/test/test_socket.py +++ b/Lib/test/test_socket.py @@ -1,5 +1,3 @@ -#!/usr/bin/env python - import unittest from test import test_support @@ -330,28 +328,29 @@ class GeneralModuleTests(unittest.TestCase): ip = socket.gethostbyname(hostname) except socket.error: # Probably name lookup wasn't set up right; skip this test - return + self.skipTest('name lookup failure') self.assertTrue(ip.find('.') >= 0, "Error resolving host to ip.") try: hname, aliases, ipaddrs = socket.gethostbyaddr(ip) except socket.error: # Probably a similar problem as above; skip this test - return + self.skipTest('address lookup failure') all_host_names = [hostname, hname] + aliases fqhn = socket.getfqdn(ip) if not fqhn in all_host_names: self.fail("Error testing host resolution mechanisms. (fqdn: %s, all: %s)" % (fqhn, repr(all_host_names))) + @unittest.skipUnless(hasattr(sys, 'getrefcount'), + 'test needs sys.getrefcount()') def testRefCountGetNameInfo(self): # Testing reference count for getnameinfo - if hasattr(sys, "getrefcount"): - try: - # On some versions, this loses a reference - orig = sys.getrefcount(__name__) - socket.getnameinfo(__name__,0) - except TypeError: - self.assertEqual(sys.getrefcount(__name__), orig, - "socket.getnameinfo loses a reference") + try: + # On some versions, this loses a reference + orig = sys.getrefcount(__name__) + socket.getnameinfo(__name__,0) + except TypeError: + self.assertEqual(sys.getrefcount(__name__), orig, + "socket.getnameinfo loses a reference") def testInterpreterCrash(self): # Making sure getnameinfo doesn't crash the interpreter @@ -414,7 +413,7 @@ class GeneralModuleTests(unittest.TestCase): # Try same call with optional protocol omitted port2 = socket.getservbyname(service) eq(port, port2) - # Try udp, but don't barf it it doesn't exist + # Try udp, but don't barf if it doesn't exist try: udpport = socket.getservbyname(service, 'udp') except socket.error: @@ -458,17 +457,17 @@ class GeneralModuleTests(unittest.TestCase): # Check that setting it to an invalid type raises TypeError self.assertRaises(TypeError, socket.setdefaulttimeout, "spam") + @unittest.skipUnless(hasattr(socket, 'inet_aton'), + 'test needs socket.inet_aton()') def testIPv4_inet_aton_fourbytes(self): - if not hasattr(socket, 'inet_aton'): - return # No inet_aton, nothing to check # Test that issue1008086 and issue767150 are fixed. # It must return 4 bytes. self.assertEqual('\x00'*4, socket.inet_aton('0.0.0.0')) self.assertEqual('\xff'*4, socket.inet_aton('255.255.255.255')) + @unittest.skipUnless(hasattr(socket, 'inet_pton'), + 'test needs socket.inet_pton()') def testIPv4toString(self): - if not hasattr(socket, 'inet_pton'): - return # No inet_pton() on this platform from socket import inet_aton as f, inet_pton, AF_INET g = lambda a: inet_pton(AF_INET, a) @@ -483,15 +482,15 @@ class GeneralModuleTests(unittest.TestCase): self.assertEqual('\xaa\xaa\xaa\xaa', g('170.170.170.170')) self.assertEqual('\xff\xff\xff\xff', g('255.255.255.255')) + @unittest.skipUnless(hasattr(socket, 'inet_pton'), + 'test needs socket.inet_pton()') def testIPv6toString(self): - if not hasattr(socket, 'inet_pton'): - return # No inet_pton() on this platform try: from socket import inet_pton, AF_INET6, has_ipv6 if not has_ipv6: - return + self.skipTest('IPv6 not available') except ImportError: - return + self.skipTest('could not import needed symbols from socket') f = lambda a: inet_pton(AF_INET6, a) self.assertEqual('\x00' * 16, f('::')) @@ -502,9 +501,9 @@ class GeneralModuleTests(unittest.TestCase): f('45ef:76cb:1a:56ef:afeb:bac:1924:aeae') ) + @unittest.skipUnless(hasattr(socket, 'inet_ntop'), + 'test needs socket.inet_ntop()') def testStringToIPv4(self): - if not hasattr(socket, 'inet_ntop'): - return # No inet_ntop() on this platform from socket import inet_ntoa as f, inet_ntop, AF_INET g = lambda a: inet_ntop(AF_INET, a) @@ -517,15 +516,15 @@ class GeneralModuleTests(unittest.TestCase): self.assertEqual('170.85.170.85', g('\xaa\x55\xaa\x55')) self.assertEqual('255.255.255.255', g('\xff\xff\xff\xff')) + @unittest.skipUnless(hasattr(socket, 'inet_ntop'), + 'test needs socket.inet_ntop()') def testStringToIPv6(self): - if not hasattr(socket, 'inet_ntop'): - return # No inet_ntop() on this platform try: from socket import inet_ntop, AF_INET6, has_ipv6 if not has_ipv6: - return + self.skipTest('IPv6 not available') except ImportError: - return + self.skipTest('could not import needed symbols from socket') f = lambda a: inet_ntop(AF_INET6, a) self.assertEqual('::', f('\x00' * 16)) @@ -565,7 +564,7 @@ class GeneralModuleTests(unittest.TestCase): my_ip_addr = socket.gethostbyname(socket.gethostname()) except socket.error: # Probably name lookup wasn't set up right; skip this test - return + self.skipTest('name lookup failure') self.assertIn(name[0], ("0.0.0.0", my_ip_addr), '%s invalid' % name[0]) self.assertEqual(name[1], port) @@ -644,9 +643,10 @@ class GeneralModuleTests(unittest.TestCase): if SUPPORTS_IPV6: socket.getaddrinfo('::1', 80) # port can be a string service name such as "http", a numeric - # port number or None + # port number (int or long), or None socket.getaddrinfo(HOST, "http") socket.getaddrinfo(HOST, 80) + socket.getaddrinfo(HOST, 80L) socket.getaddrinfo(HOST, None) # test family and socktype filters infos = socket.getaddrinfo(HOST, None, socket.AF_INET) @@ -663,6 +663,15 @@ class GeneralModuleTests(unittest.TestCase): socket.getaddrinfo(None, 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0, socket.AI_PASSIVE) + # Issue 17269: test workaround for OS X platform bug segfault + if hasattr(socket, 'AI_NUMERICSERV'): + try: + # The arguments here are undefined and the call may succeed + # or fail. All we care here is that it doesn't segfault. + socket.getaddrinfo("localhost", None, 0, 0, 0, + socket.AI_NUMERICSERV) + except socket.gaierror: + pass def check_sendall_interrupted(self, with_timeout): # socketpair() is not stricly required, but it makes things easier. @@ -683,11 +692,12 @@ class GeneralModuleTests(unittest.TestCase): c.settimeout(1.5) with self.assertRaises(ZeroDivisionError): signal.alarm(1) - c.sendall(b"x" * (1024**2)) + c.sendall(b"x" * test_support.SOCK_MAX_SIZE) if with_timeout: signal.signal(signal.SIGALRM, ok_handler) signal.alarm(1) - self.assertRaises(socket.timeout, c.sendall, b"x" * (1024**2)) + self.assertRaises(socket.timeout, c.sendall, + b"x" * test_support.SOCK_MAX_SIZE) finally: signal.signal(signal.SIGALRM, old_alarm) c.close() @@ -699,11 +709,20 @@ class GeneralModuleTests(unittest.TestCase): def test_sendall_interrupted_with_timeout(self): self.check_sendall_interrupted(True) - def testListenBacklog0(self): + def test_listen_backlog(self): + for backlog in 0, -1: + srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + srv.bind((HOST, 0)) + srv.listen(backlog) + srv.close() + + @test_support.cpython_only + def test_listen_backlog_overflow(self): + # Issue 15989 + import _testcapi srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) srv.bind((HOST, 0)) - # backlog = 0 - srv.listen(0) + self.assertRaises(OverflowError, srv.listen, _testcapi.INT_MAX + 1) srv.close() @unittest.skipUnless(SUPPORTS_IPV6, 'IPv6 required for this test.') @@ -773,10 +792,10 @@ class BasicTCPTest(SocketConnectedTest): big_chunk = 'f' * 2048 self.serv_conn.sendall(big_chunk) + @unittest.skipUnless(hasattr(socket, 'fromfd'), + 'socket.fromfd not availble') def testFromFd(self): # Testing fromfd() - if not hasattr(socket, "fromfd"): - return # On Windows, this doesn't exist fd = self.cli_conn.fileno() sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM) self.addCleanup(sock.close) @@ -809,6 +828,19 @@ class BasicTCPTest(SocketConnectedTest): self.serv_conn.send(MSG) self.serv_conn.shutdown(2) + testShutdown_overflow = test_support.cpython_only(testShutdown) + + @test_support.cpython_only + def _testShutdown_overflow(self): + import _testcapi + self.serv_conn.send(MSG) + # Issue 15989 + self.assertRaises(OverflowError, self.serv_conn.shutdown, + _testcapi.INT_MAX + 1) + self.assertRaises(OverflowError, self.serv_conn.shutdown, + 2 + (_testcapi.UINT_MAX + 1)) + self.serv_conn.shutdown(2) + @unittest.skipUnless(thread, 'Threading required for this test.') class BasicUDPTest(ThreadedUDPSocketTest): @@ -854,6 +886,8 @@ class TCPCloserTest(ThreadedTCPSocketTest): self.cli.connect((HOST, self.port)) time.sleep(1.0) +@unittest.skipUnless(hasattr(socket, 'socketpair'), + 'test needs socket.socketpair()') @unittest.skipUnless(thread, 'Threading required for this test.') class BasicSocketPairTest(SocketPairTest): @@ -882,7 +916,10 @@ class NonBlockingTCPTests(ThreadedTCPSocketTest): def testSetBlocking(self): # Testing whether set blocking works - self.serv.setblocking(0) + self.serv.setblocking(True) + self.assertIsNone(self.serv.gettimeout()) + self.serv.setblocking(False) + self.assertEqual(self.serv.gettimeout(), 0.0) start = time.time() try: self.serv.accept() @@ -894,6 +931,19 @@ class NonBlockingTCPTests(ThreadedTCPSocketTest): def _testSetBlocking(self): pass + @test_support.cpython_only + def testSetBlocking_overflow(self): + # Issue 15989 + import _testcapi + if _testcapi.UINT_MAX >= _testcapi.ULONG_MAX: + self.skipTest('needs UINT_MAX < ULONG_MAX') + self.serv.setblocking(False) + self.assertEqual(self.serv.gettimeout(), 0.0) + self.serv.setblocking(_testcapi.UINT_MAX + 1) + self.assertIsNone(self.serv.gettimeout()) + + _testSetBlocking_overflow = test_support.cpython_only(_testSetBlocking) + def testAccept(self): # Testing non-blocking accept self.serv.setblocking(0) @@ -961,8 +1011,8 @@ class FileObjectClassTestCase(SocketConnectedTest): def tearDown(self): self.serv_file.close() self.assertTrue(self.serv_file.closed) - self.serv_file = None SocketConnectedTest.tearDown(self) + self.serv_file = None def clientSetUp(self): SocketConnectedTest.clientSetUp(self) @@ -1150,6 +1200,64 @@ class LineBufferedFileObjectClassTestCase(FileObjectClassTestCase): bufsize = 1 # Default-buffered for reading; line-buffered for writing + class SocketMemo(object): + """A wrapper to keep track of sent data, needed to examine write behaviour""" + def __init__(self, sock): + self._sock = sock + self.sent = [] + + def send(self, data, flags=0): + n = self._sock.send(data, flags) + self.sent.append(data[:n]) + return n + + def sendall(self, data, flags=0): + self._sock.sendall(data, flags) + self.sent.append(data) + + def __getattr__(self, attr): + return getattr(self._sock, attr) + + def getsent(self): + return [e.tobytes() if isinstance(e, memoryview) else e for e in self.sent] + + def setUp(self): + FileObjectClassTestCase.setUp(self) + self.serv_file._sock = self.SocketMemo(self.serv_file._sock) + + def testLinebufferedWrite(self): + # Write two lines, in small chunks + msg = MSG.strip() + print >> self.serv_file, msg, + print >> self.serv_file, msg + + # second line: + print >> self.serv_file, msg, + print >> self.serv_file, msg, + print >> self.serv_file, msg + + # third line + print >> self.serv_file, '' + + self.serv_file.flush() + + msg1 = "%s %s\n"%(msg, msg) + msg2 = "%s %s %s\n"%(msg, msg, msg) + msg3 = "\n" + self.assertEqual(self.serv_file._sock.getsent(), [msg1, msg2, msg3]) + + def _testLinebufferedWrite(self): + msg = MSG.strip() + msg1 = "%s %s\n"%(msg, msg) + msg2 = "%s %s %s\n"%(msg, msg, msg) + msg3 = "\n" + l1 = self.cli_file.readline() + self.assertEqual(l1, msg1) + l2 = self.cli_file.readline() + self.assertEqual(l2, msg2) + l3 = self.cli_file.readline() + self.assertEqual(l3, msg3) + class SmallBufferedFileObjectClassTestCase(FileObjectClassTestCase): @@ -1197,7 +1305,26 @@ class NetworkConnectionNoServer(unittest.TestCase): port = test_support.find_unused_port() with self.assertRaises(socket.error) as cm: socket.create_connection((HOST, port)) - self.assertEqual(cm.exception.errno, errno.ECONNREFUSED) + + # Issue #16257: create_connection() calls getaddrinfo() against + # 'localhost'. This may result in an IPV6 addr being returned + # as well as an IPV4 one: + # >>> socket.getaddrinfo('localhost', port, 0, SOCK_STREAM) + # >>> [(2, 2, 0, '', ('127.0.0.1', 41230)), + # (26, 2, 0, '', ('::1', 41230, 0, 0))] + # + # create_connection() enumerates through all the addresses returned + # and if it doesn't successfully bind to any of them, it propagates + # the last exception it encountered. + # + # On Solaris, ENETUNREACH is returned in this circumstance instead + # of ECONNREFUSED. So, if that errno exists, add it to our list of + # expected errnos. + expected_errnos = [ errno.ECONNREFUSED, ] + if hasattr(errno, 'ENETUNREACH'): + expected_errnos.append(errno.ENETUNREACH) + + self.assertIn(cm.exception.errno, expected_errnos) def test_create_connection_timeout(self): # Issue #9792: create_connection() should not recast timeout errors @@ -1355,12 +1482,12 @@ class TCPTimeoutTest(SocketTCPTest): if not ok: self.fail("accept() returned success when we did not expect it") + @unittest.skipUnless(hasattr(signal, 'alarm'), + 'test needs signal.alarm()') def testInterruptedTimeout(self): # XXX I don't know how to do this test on MSWindows or any other # plaform that doesn't support signal.alarm() or os.kill(), though # the bug should have existed on all platforms. - if not hasattr(signal, "alarm"): - return # can only test on *nix self.serv.settimeout(5.0) # must be longer than alarm class Alarm(Exception): pass @@ -1420,6 +1547,7 @@ class TestExceptions(unittest.TestCase): self.assertTrue(issubclass(socket.gaierror, socket.error)) self.assertTrue(issubclass(socket.timeout, socket.error)) +@unittest.skipUnless(sys.platform == 'linux', 'Linux specific test') class TestLinuxAbstractNamespace(unittest.TestCase): UNIX_PATH_MAX = 108 @@ -1515,6 +1643,23 @@ class BufferIOTest(SocketConnectedTest): _testRecvFromIntoMemoryview = _testRecvFromIntoArray + def testRecvFromIntoSmallBuffer(self): + # See issue #20246. + buf = bytearray(8) + self.assertRaises(ValueError, self.cli_conn.recvfrom_into, buf, 1024) + + def _testRecvFromIntoSmallBuffer(self): + with test_support.check_py3k_warnings(): + buf = buffer(MSG) + self.serv_conn.send(buf) + + def testRecvFromIntoEmptyBuffer(self): + buf = bytearray() + self.cli_conn.recvfrom_into(buf) + self.cli_conn.recvfrom_into(buf, 0) + + _testRecvFromIntoEmptyBuffer = _testRecvFromIntoArray + TIPC_STYPE = 2000 TIPC_LOWER = 200 @@ -1534,11 +1679,11 @@ def isTipcAvailable(): for line in f: if line.startswith("tipc "): return True - if test_support.verbose: - print "TIPC module is not loaded, please 'sudo modprobe tipc'" return False -class TIPCTest (unittest.TestCase): +@unittest.skipUnless(isTipcAvailable(), + "TIPC module is not loaded, please 'sudo modprobe tipc'") +class TIPCTest(unittest.TestCase): def testRDM(self): srv = socket.socket(socket.AF_TIPC, socket.SOCK_RDM) cli = socket.socket(socket.AF_TIPC, socket.SOCK_RDM) @@ -1558,7 +1703,9 @@ class TIPCTest (unittest.TestCase): self.assertEqual(msg, MSG) -class TIPCThreadableTest (unittest.TestCase, ThreadableTest): +@unittest.skipUnless(isTipcAvailable(), + "TIPC module is not loaded, please 'sudo modprobe tipc'") +class TIPCThreadableTest(unittest.TestCase, ThreadableTest): def __init__(self, methodName = 'runTest'): unittest.TestCase.__init__(self, methodName = methodName) ThreadableTest.__init__(self) @@ -1611,13 +1758,9 @@ def test_main(): NetworkConnectionAttributesTest, NetworkConnectionBehaviourTest, ]) - if hasattr(socket, "socketpair"): - tests.append(BasicSocketPairTest) - if sys.platform == 'linux2': - tests.append(TestLinuxAbstractNamespace) - if isTipcAvailable(): - tests.append(TIPCTest) - tests.append(TIPCThreadableTest) + tests.append(BasicSocketPairTest) + tests.append(TestLinuxAbstractNamespace) + tests.extend([TIPCTest, TIPCThreadableTest]) thread_info = test_support.threading_setup() test_support.run_unittest(*tests) |