summaryrefslogtreecommitdiff
path: root/Lib/test/test_socket.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_socket.py')
-rw-r--r--Lib/test/test_socket.py247
1 files changed, 195 insertions, 52 deletions
diff --git a/Lib/test/test_socket.py b/Lib/test/test_socket.py
index 13d0eb4..5d5005b 100644
--- a/Lib/test/test_socket.py
+++ b/Lib/test/test_socket.py
@@ -1,5 +1,3 @@
-#!/usr/bin/env python
-
import unittest
from test import test_support
@@ -330,28 +328,29 @@ class GeneralModuleTests(unittest.TestCase):
ip = socket.gethostbyname(hostname)
except socket.error:
# Probably name lookup wasn't set up right; skip this test
- return
+ self.skipTest('name lookup failure')
self.assertTrue(ip.find('.') >= 0, "Error resolving host to ip.")
try:
hname, aliases, ipaddrs = socket.gethostbyaddr(ip)
except socket.error:
# Probably a similar problem as above; skip this test
- return
+ self.skipTest('address lookup failure')
all_host_names = [hostname, hname] + aliases
fqhn = socket.getfqdn(ip)
if not fqhn in all_host_names:
self.fail("Error testing host resolution mechanisms. (fqdn: %s, all: %s)" % (fqhn, repr(all_host_names)))
+ @unittest.skipUnless(hasattr(sys, 'getrefcount'),
+ 'test needs sys.getrefcount()')
def testRefCountGetNameInfo(self):
# Testing reference count for getnameinfo
- if hasattr(sys, "getrefcount"):
- try:
- # On some versions, this loses a reference
- orig = sys.getrefcount(__name__)
- socket.getnameinfo(__name__,0)
- except TypeError:
- self.assertEqual(sys.getrefcount(__name__), orig,
- "socket.getnameinfo loses a reference")
+ try:
+ # On some versions, this loses a reference
+ orig = sys.getrefcount(__name__)
+ socket.getnameinfo(__name__,0)
+ except TypeError:
+ self.assertEqual(sys.getrefcount(__name__), orig,
+ "socket.getnameinfo loses a reference")
def testInterpreterCrash(self):
# Making sure getnameinfo doesn't crash the interpreter
@@ -414,7 +413,7 @@ class GeneralModuleTests(unittest.TestCase):
# Try same call with optional protocol omitted
port2 = socket.getservbyname(service)
eq(port, port2)
- # Try udp, but don't barf it it doesn't exist
+ # Try udp, but don't barf if it doesn't exist
try:
udpport = socket.getservbyname(service, 'udp')
except socket.error:
@@ -458,17 +457,17 @@ class GeneralModuleTests(unittest.TestCase):
# Check that setting it to an invalid type raises TypeError
self.assertRaises(TypeError, socket.setdefaulttimeout, "spam")
+ @unittest.skipUnless(hasattr(socket, 'inet_aton'),
+ 'test needs socket.inet_aton()')
def testIPv4_inet_aton_fourbytes(self):
- if not hasattr(socket, 'inet_aton'):
- return # No inet_aton, nothing to check
# Test that issue1008086 and issue767150 are fixed.
# It must return 4 bytes.
self.assertEqual('\x00'*4, socket.inet_aton('0.0.0.0'))
self.assertEqual('\xff'*4, socket.inet_aton('255.255.255.255'))
+ @unittest.skipUnless(hasattr(socket, 'inet_pton'),
+ 'test needs socket.inet_pton()')
def testIPv4toString(self):
- if not hasattr(socket, 'inet_pton'):
- return # No inet_pton() on this platform
from socket import inet_aton as f, inet_pton, AF_INET
g = lambda a: inet_pton(AF_INET, a)
@@ -483,15 +482,15 @@ class GeneralModuleTests(unittest.TestCase):
self.assertEqual('\xaa\xaa\xaa\xaa', g('170.170.170.170'))
self.assertEqual('\xff\xff\xff\xff', g('255.255.255.255'))
+ @unittest.skipUnless(hasattr(socket, 'inet_pton'),
+ 'test needs socket.inet_pton()')
def testIPv6toString(self):
- if not hasattr(socket, 'inet_pton'):
- return # No inet_pton() on this platform
try:
from socket import inet_pton, AF_INET6, has_ipv6
if not has_ipv6:
- return
+ self.skipTest('IPv6 not available')
except ImportError:
- return
+ self.skipTest('could not import needed symbols from socket')
f = lambda a: inet_pton(AF_INET6, a)
self.assertEqual('\x00' * 16, f('::'))
@@ -502,9 +501,9 @@ class GeneralModuleTests(unittest.TestCase):
f('45ef:76cb:1a:56ef:afeb:bac:1924:aeae')
)
+ @unittest.skipUnless(hasattr(socket, 'inet_ntop'),
+ 'test needs socket.inet_ntop()')
def testStringToIPv4(self):
- if not hasattr(socket, 'inet_ntop'):
- return # No inet_ntop() on this platform
from socket import inet_ntoa as f, inet_ntop, AF_INET
g = lambda a: inet_ntop(AF_INET, a)
@@ -517,15 +516,15 @@ class GeneralModuleTests(unittest.TestCase):
self.assertEqual('170.85.170.85', g('\xaa\x55\xaa\x55'))
self.assertEqual('255.255.255.255', g('\xff\xff\xff\xff'))
+ @unittest.skipUnless(hasattr(socket, 'inet_ntop'),
+ 'test needs socket.inet_ntop()')
def testStringToIPv6(self):
- if not hasattr(socket, 'inet_ntop'):
- return # No inet_ntop() on this platform
try:
from socket import inet_ntop, AF_INET6, has_ipv6
if not has_ipv6:
- return
+ self.skipTest('IPv6 not available')
except ImportError:
- return
+ self.skipTest('could not import needed symbols from socket')
f = lambda a: inet_ntop(AF_INET6, a)
self.assertEqual('::', f('\x00' * 16))
@@ -565,7 +564,7 @@ class GeneralModuleTests(unittest.TestCase):
my_ip_addr = socket.gethostbyname(socket.gethostname())
except socket.error:
# Probably name lookup wasn't set up right; skip this test
- return
+ self.skipTest('name lookup failure')
self.assertIn(name[0], ("0.0.0.0", my_ip_addr), '%s invalid' % name[0])
self.assertEqual(name[1], port)
@@ -644,9 +643,10 @@ class GeneralModuleTests(unittest.TestCase):
if SUPPORTS_IPV6:
socket.getaddrinfo('::1', 80)
# port can be a string service name such as "http", a numeric
- # port number or None
+ # port number (int or long), or None
socket.getaddrinfo(HOST, "http")
socket.getaddrinfo(HOST, 80)
+ socket.getaddrinfo(HOST, 80L)
socket.getaddrinfo(HOST, None)
# test family and socktype filters
infos = socket.getaddrinfo(HOST, None, socket.AF_INET)
@@ -663,6 +663,15 @@ class GeneralModuleTests(unittest.TestCase):
socket.getaddrinfo(None, 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0,
socket.AI_PASSIVE)
+ # Issue 17269: test workaround for OS X platform bug segfault
+ if hasattr(socket, 'AI_NUMERICSERV'):
+ try:
+ # The arguments here are undefined and the call may succeed
+ # or fail. All we care here is that it doesn't segfault.
+ socket.getaddrinfo("localhost", None, 0, 0, 0,
+ socket.AI_NUMERICSERV)
+ except socket.gaierror:
+ pass
def check_sendall_interrupted(self, with_timeout):
# socketpair() is not stricly required, but it makes things easier.
@@ -683,11 +692,12 @@ class GeneralModuleTests(unittest.TestCase):
c.settimeout(1.5)
with self.assertRaises(ZeroDivisionError):
signal.alarm(1)
- c.sendall(b"x" * (1024**2))
+ c.sendall(b"x" * test_support.SOCK_MAX_SIZE)
if with_timeout:
signal.signal(signal.SIGALRM, ok_handler)
signal.alarm(1)
- self.assertRaises(socket.timeout, c.sendall, b"x" * (1024**2))
+ self.assertRaises(socket.timeout, c.sendall,
+ b"x" * test_support.SOCK_MAX_SIZE)
finally:
signal.signal(signal.SIGALRM, old_alarm)
c.close()
@@ -699,11 +709,20 @@ class GeneralModuleTests(unittest.TestCase):
def test_sendall_interrupted_with_timeout(self):
self.check_sendall_interrupted(True)
- def testListenBacklog0(self):
+ def test_listen_backlog(self):
+ for backlog in 0, -1:
+ srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ srv.bind((HOST, 0))
+ srv.listen(backlog)
+ srv.close()
+
+ @test_support.cpython_only
+ def test_listen_backlog_overflow(self):
+ # Issue 15989
+ import _testcapi
srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
srv.bind((HOST, 0))
- # backlog = 0
- srv.listen(0)
+ self.assertRaises(OverflowError, srv.listen, _testcapi.INT_MAX + 1)
srv.close()
@unittest.skipUnless(SUPPORTS_IPV6, 'IPv6 required for this test.')
@@ -773,10 +792,10 @@ class BasicTCPTest(SocketConnectedTest):
big_chunk = 'f' * 2048
self.serv_conn.sendall(big_chunk)
+ @unittest.skipUnless(hasattr(socket, 'fromfd'),
+ 'socket.fromfd not availble')
def testFromFd(self):
# Testing fromfd()
- if not hasattr(socket, "fromfd"):
- return # On Windows, this doesn't exist
fd = self.cli_conn.fileno()
sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
self.addCleanup(sock.close)
@@ -809,6 +828,19 @@ class BasicTCPTest(SocketConnectedTest):
self.serv_conn.send(MSG)
self.serv_conn.shutdown(2)
+ testShutdown_overflow = test_support.cpython_only(testShutdown)
+
+ @test_support.cpython_only
+ def _testShutdown_overflow(self):
+ import _testcapi
+ self.serv_conn.send(MSG)
+ # Issue 15989
+ self.assertRaises(OverflowError, self.serv_conn.shutdown,
+ _testcapi.INT_MAX + 1)
+ self.assertRaises(OverflowError, self.serv_conn.shutdown,
+ 2 + (_testcapi.UINT_MAX + 1))
+ self.serv_conn.shutdown(2)
+
@unittest.skipUnless(thread, 'Threading required for this test.')
class BasicUDPTest(ThreadedUDPSocketTest):
@@ -854,6 +886,8 @@ class TCPCloserTest(ThreadedTCPSocketTest):
self.cli.connect((HOST, self.port))
time.sleep(1.0)
+@unittest.skipUnless(hasattr(socket, 'socketpair'),
+ 'test needs socket.socketpair()')
@unittest.skipUnless(thread, 'Threading required for this test.')
class BasicSocketPairTest(SocketPairTest):
@@ -882,7 +916,10 @@ class NonBlockingTCPTests(ThreadedTCPSocketTest):
def testSetBlocking(self):
# Testing whether set blocking works
- self.serv.setblocking(0)
+ self.serv.setblocking(True)
+ self.assertIsNone(self.serv.gettimeout())
+ self.serv.setblocking(False)
+ self.assertEqual(self.serv.gettimeout(), 0.0)
start = time.time()
try:
self.serv.accept()
@@ -894,6 +931,19 @@ class NonBlockingTCPTests(ThreadedTCPSocketTest):
def _testSetBlocking(self):
pass
+ @test_support.cpython_only
+ def testSetBlocking_overflow(self):
+ # Issue 15989
+ import _testcapi
+ if _testcapi.UINT_MAX >= _testcapi.ULONG_MAX:
+ self.skipTest('needs UINT_MAX < ULONG_MAX')
+ self.serv.setblocking(False)
+ self.assertEqual(self.serv.gettimeout(), 0.0)
+ self.serv.setblocking(_testcapi.UINT_MAX + 1)
+ self.assertIsNone(self.serv.gettimeout())
+
+ _testSetBlocking_overflow = test_support.cpython_only(_testSetBlocking)
+
def testAccept(self):
# Testing non-blocking accept
self.serv.setblocking(0)
@@ -961,8 +1011,8 @@ class FileObjectClassTestCase(SocketConnectedTest):
def tearDown(self):
self.serv_file.close()
self.assertTrue(self.serv_file.closed)
- self.serv_file = None
SocketConnectedTest.tearDown(self)
+ self.serv_file = None
def clientSetUp(self):
SocketConnectedTest.clientSetUp(self)
@@ -1150,6 +1200,64 @@ class LineBufferedFileObjectClassTestCase(FileObjectClassTestCase):
bufsize = 1 # Default-buffered for reading; line-buffered for writing
+ class SocketMemo(object):
+ """A wrapper to keep track of sent data, needed to examine write behaviour"""
+ def __init__(self, sock):
+ self._sock = sock
+ self.sent = []
+
+ def send(self, data, flags=0):
+ n = self._sock.send(data, flags)
+ self.sent.append(data[:n])
+ return n
+
+ def sendall(self, data, flags=0):
+ self._sock.sendall(data, flags)
+ self.sent.append(data)
+
+ def __getattr__(self, attr):
+ return getattr(self._sock, attr)
+
+ def getsent(self):
+ return [e.tobytes() if isinstance(e, memoryview) else e for e in self.sent]
+
+ def setUp(self):
+ FileObjectClassTestCase.setUp(self)
+ self.serv_file._sock = self.SocketMemo(self.serv_file._sock)
+
+ def testLinebufferedWrite(self):
+ # Write two lines, in small chunks
+ msg = MSG.strip()
+ print >> self.serv_file, msg,
+ print >> self.serv_file, msg
+
+ # second line:
+ print >> self.serv_file, msg,
+ print >> self.serv_file, msg,
+ print >> self.serv_file, msg
+
+ # third line
+ print >> self.serv_file, ''
+
+ self.serv_file.flush()
+
+ msg1 = "%s %s\n"%(msg, msg)
+ msg2 = "%s %s %s\n"%(msg, msg, msg)
+ msg3 = "\n"
+ self.assertEqual(self.serv_file._sock.getsent(), [msg1, msg2, msg3])
+
+ def _testLinebufferedWrite(self):
+ msg = MSG.strip()
+ msg1 = "%s %s\n"%(msg, msg)
+ msg2 = "%s %s %s\n"%(msg, msg, msg)
+ msg3 = "\n"
+ l1 = self.cli_file.readline()
+ self.assertEqual(l1, msg1)
+ l2 = self.cli_file.readline()
+ self.assertEqual(l2, msg2)
+ l3 = self.cli_file.readline()
+ self.assertEqual(l3, msg3)
+
class SmallBufferedFileObjectClassTestCase(FileObjectClassTestCase):
@@ -1197,7 +1305,26 @@ class NetworkConnectionNoServer(unittest.TestCase):
port = test_support.find_unused_port()
with self.assertRaises(socket.error) as cm:
socket.create_connection((HOST, port))
- self.assertEqual(cm.exception.errno, errno.ECONNREFUSED)
+
+ # Issue #16257: create_connection() calls getaddrinfo() against
+ # 'localhost'. This may result in an IPV6 addr being returned
+ # as well as an IPV4 one:
+ # >>> socket.getaddrinfo('localhost', port, 0, SOCK_STREAM)
+ # >>> [(2, 2, 0, '', ('127.0.0.1', 41230)),
+ # (26, 2, 0, '', ('::1', 41230, 0, 0))]
+ #
+ # create_connection() enumerates through all the addresses returned
+ # and if it doesn't successfully bind to any of them, it propagates
+ # the last exception it encountered.
+ #
+ # On Solaris, ENETUNREACH is returned in this circumstance instead
+ # of ECONNREFUSED. So, if that errno exists, add it to our list of
+ # expected errnos.
+ expected_errnos = [ errno.ECONNREFUSED, ]
+ if hasattr(errno, 'ENETUNREACH'):
+ expected_errnos.append(errno.ENETUNREACH)
+
+ self.assertIn(cm.exception.errno, expected_errnos)
def test_create_connection_timeout(self):
# Issue #9792: create_connection() should not recast timeout errors
@@ -1355,12 +1482,12 @@ class TCPTimeoutTest(SocketTCPTest):
if not ok:
self.fail("accept() returned success when we did not expect it")
+ @unittest.skipUnless(hasattr(signal, 'alarm'),
+ 'test needs signal.alarm()')
def testInterruptedTimeout(self):
# XXX I don't know how to do this test on MSWindows or any other
# plaform that doesn't support signal.alarm() or os.kill(), though
# the bug should have existed on all platforms.
- if not hasattr(signal, "alarm"):
- return # can only test on *nix
self.serv.settimeout(5.0) # must be longer than alarm
class Alarm(Exception):
pass
@@ -1420,6 +1547,7 @@ class TestExceptions(unittest.TestCase):
self.assertTrue(issubclass(socket.gaierror, socket.error))
self.assertTrue(issubclass(socket.timeout, socket.error))
+@unittest.skipUnless(sys.platform == 'linux', 'Linux specific test')
class TestLinuxAbstractNamespace(unittest.TestCase):
UNIX_PATH_MAX = 108
@@ -1515,6 +1643,23 @@ class BufferIOTest(SocketConnectedTest):
_testRecvFromIntoMemoryview = _testRecvFromIntoArray
+ def testRecvFromIntoSmallBuffer(self):
+ # See issue #20246.
+ buf = bytearray(8)
+ self.assertRaises(ValueError, self.cli_conn.recvfrom_into, buf, 1024)
+
+ def _testRecvFromIntoSmallBuffer(self):
+ with test_support.check_py3k_warnings():
+ buf = buffer(MSG)
+ self.serv_conn.send(buf)
+
+ def testRecvFromIntoEmptyBuffer(self):
+ buf = bytearray()
+ self.cli_conn.recvfrom_into(buf)
+ self.cli_conn.recvfrom_into(buf, 0)
+
+ _testRecvFromIntoEmptyBuffer = _testRecvFromIntoArray
+
TIPC_STYPE = 2000
TIPC_LOWER = 200
@@ -1534,11 +1679,11 @@ def isTipcAvailable():
for line in f:
if line.startswith("tipc "):
return True
- if test_support.verbose:
- print "TIPC module is not loaded, please 'sudo modprobe tipc'"
return False
-class TIPCTest (unittest.TestCase):
+@unittest.skipUnless(isTipcAvailable(),
+ "TIPC module is not loaded, please 'sudo modprobe tipc'")
+class TIPCTest(unittest.TestCase):
def testRDM(self):
srv = socket.socket(socket.AF_TIPC, socket.SOCK_RDM)
cli = socket.socket(socket.AF_TIPC, socket.SOCK_RDM)
@@ -1558,7 +1703,9 @@ class TIPCTest (unittest.TestCase):
self.assertEqual(msg, MSG)
-class TIPCThreadableTest (unittest.TestCase, ThreadableTest):
+@unittest.skipUnless(isTipcAvailable(),
+ "TIPC module is not loaded, please 'sudo modprobe tipc'")
+class TIPCThreadableTest(unittest.TestCase, ThreadableTest):
def __init__(self, methodName = 'runTest'):
unittest.TestCase.__init__(self, methodName = methodName)
ThreadableTest.__init__(self)
@@ -1611,13 +1758,9 @@ def test_main():
NetworkConnectionAttributesTest,
NetworkConnectionBehaviourTest,
])
- if hasattr(socket, "socketpair"):
- tests.append(BasicSocketPairTest)
- if sys.platform == 'linux2':
- tests.append(TestLinuxAbstractNamespace)
- if isTipcAvailable():
- tests.append(TIPCTest)
- tests.append(TIPCThreadableTest)
+ tests.append(BasicSocketPairTest)
+ tests.append(TestLinuxAbstractNamespace)
+ tests.extend([TIPCTest, TIPCThreadableTest])
thread_info = test_support.threading_setup()
test_support.run_unittest(*tests)