Test suite for poll() interface (SF patch #100852)
This commit is contained in:
parent
cf96dc800e
commit
3227cc8c09
43
Lib/test/output/test_poll
Normal file
43
Lib/test/output/test_poll
Normal file
@ -0,0 +1,43 @@
|
||||
test_poll
|
||||
Running poll test 1
|
||||
This is a test.
|
||||
This is a test.
|
||||
This is a test.
|
||||
This is a test.
|
||||
This is a test.
|
||||
This is a test.
|
||||
This is a test.
|
||||
This is a test.
|
||||
This is a test.
|
||||
This is a test.
|
||||
This is a test.
|
||||
This is a test.
|
||||
Poll test 1 complete
|
||||
Running poll test 2
|
||||
timeout = 0
|
||||
timeout = 1000
|
||||
'testing...\012'
|
||||
timeout = 2000
|
||||
'testing...\012'
|
||||
timeout = 4000
|
||||
'testing...\012'
|
||||
timeout = 8000
|
||||
'testing...\012'
|
||||
timeout = 16000
|
||||
'testing...\012'
|
||||
timeout = -1
|
||||
'testing...\012'
|
||||
timeout = -1
|
||||
'testing...\012'
|
||||
timeout = -1
|
||||
'testing...\012'
|
||||
timeout = -1
|
||||
'testing...\012'
|
||||
timeout = -1
|
||||
'testing...\012'
|
||||
timeout = -1
|
||||
timeout = -1
|
||||
timeout = -1
|
||||
timeout = -1
|
||||
timeout = -1
|
||||
Poll test 2 complete
|
171
Lib/test/test_poll.py
Normal file
171
Lib/test/test_poll.py
Normal file
@ -0,0 +1,171 @@
|
||||
|
||||
# Test case for the os.poll() function
|
||||
|
||||
import sys, os, select, random
|
||||
from test.test_support import verbose, TestSkipped, TESTFN
|
||||
|
||||
try:
|
||||
select.poll
|
||||
except AttributeError:
|
||||
raise TestSkipped, "select.poll not defined -- skipping test_poll"
|
||||
|
||||
|
||||
def find_ready_matching(ready, flag):
|
||||
match = []
|
||||
for fd, mode in ready:
|
||||
if mode & flag:
|
||||
match.append(fd)
|
||||
return match
|
||||
|
||||
def test_poll1():
|
||||
"""Basic functional test of poll object
|
||||
|
||||
Create a bunch of pipe and test that poll works with them.
|
||||
"""
|
||||
print 'Running poll test 1'
|
||||
p = select.poll()
|
||||
|
||||
NUM_PIPES = 12
|
||||
MSG = " This is a test."
|
||||
MSG_LEN = len(MSG)
|
||||
readers = []
|
||||
writers = []
|
||||
r2w = {}
|
||||
w2r = {}
|
||||
|
||||
for i in range(NUM_PIPES):
|
||||
rd, wr = os.pipe()
|
||||
p.register(rd, select.POLLIN)
|
||||
p.register(wr, select.POLLOUT)
|
||||
readers.append(rd)
|
||||
writers.append(wr)
|
||||
r2w[rd] = wr
|
||||
w2r[wr] = rd
|
||||
|
||||
while writers:
|
||||
ready = p.poll()
|
||||
ready_writers = find_ready_matching(ready, select.POLLOUT)
|
||||
if not ready_writers:
|
||||
raise RuntimeError, "no pipes ready for writing"
|
||||
wr = random.choice(ready_writers)
|
||||
os.write(wr, MSG)
|
||||
|
||||
ready = p.poll()
|
||||
ready_readers = find_ready_matching(ready, select.POLLIN)
|
||||
if not ready_readers:
|
||||
raise RuntimeError, "no pipes ready for reading"
|
||||
rd = random.choice(ready_readers)
|
||||
buf = os.read(rd, MSG_LEN)
|
||||
assert len(buf) == MSG_LEN
|
||||
print buf
|
||||
os.close(r2w[rd])
|
||||
writers.remove(r2w[rd])
|
||||
|
||||
poll_unit_tests()
|
||||
print 'Poll test 1 complete'
|
||||
|
||||
def poll_unit_tests():
|
||||
# returns NVAL for invalid file descriptor
|
||||
FD = 42
|
||||
try:
|
||||
os.close(FD)
|
||||
except OSError:
|
||||
pass
|
||||
p = select.poll()
|
||||
p.register(FD)
|
||||
r = p.poll()
|
||||
assert r[0] == (FD, select.POLLNVAL)
|
||||
|
||||
f = open(TESTFN, 'w')
|
||||
fd = f.fileno()
|
||||
p = select.poll()
|
||||
p.register(f)
|
||||
r = p.poll()
|
||||
assert r[0][0] == fd
|
||||
f.close()
|
||||
r = p.poll()
|
||||
assert r[0] == (fd, select.POLLNVAL)
|
||||
os.unlink(TESTFN)
|
||||
|
||||
# type error for invalid arguments
|
||||
p = select.poll()
|
||||
try:
|
||||
p.register(p)
|
||||
except TypeError:
|
||||
pass
|
||||
else:
|
||||
print "Bogus register call did not raise TypeError"
|
||||
try:
|
||||
p.unregister(p)
|
||||
except TypeError:
|
||||
pass
|
||||
else:
|
||||
print "Bogus unregister call did not raise TypeError"
|
||||
|
||||
# can't unregister non-existent object
|
||||
p = select.poll()
|
||||
try:
|
||||
p.unregister(3)
|
||||
except KeyError:
|
||||
pass
|
||||
else:
|
||||
print "Bogus unregister call did not raise KeyError"
|
||||
|
||||
# Test error cases
|
||||
pollster = select.poll()
|
||||
class Nope:
|
||||
pass
|
||||
|
||||
class Almost:
|
||||
def fileno(self):
|
||||
return 'fileno'
|
||||
|
||||
try:
|
||||
pollster.register( Nope(), 0 )
|
||||
except TypeError: pass
|
||||
else: print 'expected TypeError exception, not raised'
|
||||
|
||||
try:
|
||||
pollster.register( Almost(), 0 )
|
||||
except TypeError: pass
|
||||
else: print 'expected TypeError exception, not raised'
|
||||
|
||||
|
||||
# Another test case for poll(). This is copied from the test case for
|
||||
# select(), modified to use poll() instead.
|
||||
|
||||
def test_poll2():
|
||||
print 'Running poll test 2'
|
||||
cmd = 'for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 1; done'
|
||||
p = os.popen(cmd, 'r')
|
||||
pollster = select.poll()
|
||||
pollster.register( p, select.POLLIN )
|
||||
for tout in (0, 1000, 2000, 4000, 8000, 16000) + (-1,)*10:
|
||||
if verbose:
|
||||
print 'timeout =', tout
|
||||
fdlist = pollster.poll(tout)
|
||||
if (fdlist == []):
|
||||
continue
|
||||
if fdlist[0] == (p.fileno(),select.POLLHUP):
|
||||
line = p.readline()
|
||||
if line != "":
|
||||
print 'error: pipe seems to be closed, but still returns data'
|
||||
continue
|
||||
|
||||
elif fdlist[0] == (p.fileno(),select.POLLIN):
|
||||
line = p.readline()
|
||||
if verbose:
|
||||
print `line`
|
||||
if not line:
|
||||
if verbose:
|
||||
print 'EOF'
|
||||
break
|
||||
continue
|
||||
else:
|
||||
print 'Unexpected return value from select.poll:', fdlist
|
||||
p.close()
|
||||
print 'Poll test 2 complete'
|
||||
|
||||
test_poll1()
|
||||
test_poll2()
|
||||
|
Loading…
x
Reference in New Issue
Block a user