gh-132775: Use _PyObject_GetXIData (With Fallback) (gh-134440)

This change includes some semi-related refactoring of queues and channels.
This commit is contained in:
Eric Snow 2025-05-22 06:50:06 -06:00 committed by GitHub
parent d706eb9e0f
commit d0eedfa10e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 485 additions and 437 deletions

View File

@ -36,9 +36,6 @@ Uncaught in the interpreter:
""".strip())
UNBOUND = 2 # error; this should not happen.
class WorkerContext(_thread.WorkerContext):
@classmethod
@ -47,23 +44,13 @@ class WorkerContext(_thread.WorkerContext):
if isinstance(fn, str):
# XXX Circle back to this later.
raise TypeError('scripts not supported')
if args or kwargs:
raise ValueError(f'a script does not take args or kwargs, got {args!r} and {kwargs!r}')
data = textwrap.dedent(fn)
kind = 'script'
# Make sure the script compiles.
# Ideally we wouldn't throw away the resulting code
# object. However, there isn't much to be done until
# code objects are shareable and/or we do a better job
# of supporting code objects in _interpreters.exec().
compile(data, '<string>', 'exec')
else:
# Functions defined in the __main__ module can't be pickled,
# so they can't be used here. In the future, we could possibly
# borrow from multiprocessing to work around this.
data = pickle.dumps((fn, args, kwargs))
kind = 'function'
return (data, kind)
task = (fn, args, kwargs)
data = pickle.dumps(task)
return data
if initializer is not None:
try:
@ -86,24 +73,20 @@ class WorkerContext(_thread.WorkerContext):
except BaseException as exc:
# Send the captured exception out on the results queue,
# but still leave it unhandled for the interpreter to handle.
err = pickle.dumps(exc)
_interpqueues.put(resultsid, (None, err), 1, UNBOUND)
_interpqueues.put(resultsid, (None, exc))
raise # re-raise
@classmethod
def _send_script_result(cls, resultsid):
_interpqueues.put(resultsid, (None, None), 0, UNBOUND)
_interpqueues.put(resultsid, (None, None))
@classmethod
def _call(cls, func, args, kwargs, resultsid):
with cls._capture_exc(resultsid):
res = func(*args or (), **kwargs or {})
# Send the result back.
try:
_interpqueues.put(resultsid, (res, None), 0, UNBOUND)
except _interpreters.NotShareableError:
res = pickle.dumps(res)
_interpqueues.put(resultsid, (res, None), 1, UNBOUND)
with cls._capture_exc(resultsid):
_interpqueues.put(resultsid, (res, None))
@classmethod
def _call_pickled(cls, pickled, resultsid):
@ -134,8 +117,7 @@ class WorkerContext(_thread.WorkerContext):
_interpreters.incref(self.interpid)
maxsize = 0
fmt = 0
self.resultsid = _interpqueues.create(maxsize, fmt, UNBOUND)
self.resultsid = _interpqueues.create(maxsize)
self._exec(f'from {__name__} import WorkerContext')
@ -166,17 +148,8 @@ class WorkerContext(_thread.WorkerContext):
pass
def run(self, task):
data, kind = task
if kind == 'script':
raise NotImplementedError('script kind disabled')
script = f"""
with WorkerContext._capture_exc({self.resultsid}):
{textwrap.indent(data, ' ')}
WorkerContext._send_script_result({self.resultsid})"""
elif kind == 'function':
script = f'WorkerContext._call_pickled({data!r}, {self.resultsid})'
else:
raise NotImplementedError(kind)
data = task
script = f'WorkerContext._call_pickled({data!r}, {self.resultsid})'
try:
self._exec(script)
@ -199,15 +172,13 @@ WorkerContext._send_script_result({self.resultsid})"""
continue
else:
break
(res, excdata), pickled, unboundop = obj
(res, exc), unboundop = obj
assert unboundop is None, unboundop
if excdata is not None:
if exc is not None:
assert res is None, res
assert pickled
assert exc_wrapper is not None
exc = pickle.loads(excdata)
raise exc from exc_wrapper
return pickle.loads(res) if pickled else res
return res
class BrokenInterpreterPool(_thread.BrokenThreadPool):

View File

@ -55,15 +55,23 @@ def create(*, unbounditems=UNBOUND):
"""
unbound = _serialize_unbound(unbounditems)
unboundop, = unbound
cid = _channels.create(unboundop)
recv, send = RecvChannel(cid), SendChannel(cid, _unbound=unbound)
cid = _channels.create(unboundop, -1)
recv, send = RecvChannel(cid), SendChannel(cid)
send._set_unbound(unboundop, unbounditems)
return recv, send
def list_all():
"""Return a list of (recv, send) for all open channels."""
return [(RecvChannel(cid), SendChannel(cid, _unbound=unbound))
for cid, unbound in _channels.list_all()]
channels = []
for cid, unboundop, _ in _channels.list_all():
chan = _, send = RecvChannel(cid), SendChannel(cid)
if not hasattr(send, '_unboundop'):
send._set_unbound(unboundop)
else:
assert send._unbound[0] == op
channels.append(chan)
return channels
class _ChannelEnd:
@ -175,16 +183,33 @@ class SendChannel(_ChannelEnd):
_end = 'send'
def __new__(cls, cid, *, _unbound=None):
if _unbound is None:
try:
op = _channels.get_channel_defaults(cid)
_unbound = (op,)
except ChannelNotFoundError:
_unbound = _serialize_unbound(UNBOUND)
self = super().__new__(cls, cid)
self._unbound = _unbound
return self
# def __new__(cls, cid, *, _unbound=None):
# if _unbound is None:
# try:
# op = _channels.get_channel_defaults(cid)
# _unbound = (op,)
# except ChannelNotFoundError:
# _unbound = _serialize_unbound(UNBOUND)
# self = super().__new__(cls, cid)
# self._unbound = _unbound
# return self
def _set_unbound(self, op, items=None):
assert not hasattr(self, '_unbound')
if items is None:
items = _resolve_unbound(op)
unbound = (op, items)
self._unbound = unbound
return unbound
@property
def unbounditems(self):
try:
_, items = self._unbound
except AttributeError:
op, _ = _channels.get_queue_defaults(self._id)
_, items = self._set_unbound(op)
return items
@property
def is_closed(self):
@ -192,61 +217,61 @@ class SendChannel(_ChannelEnd):
return info.closed or info.closing
def send(self, obj, timeout=None, *,
unbound=None,
unbounditems=None,
):
"""Send the object (i.e. its data) to the channel's receiving end.
This blocks until the object is received.
"""
if unbound is None:
unboundop, = self._unbound
if unbounditems is None:
unboundop = -1
else:
unboundop, = _serialize_unbound(unbound)
unboundop, = _serialize_unbound(unbounditems)
_channels.send(self._id, obj, unboundop, timeout=timeout, blocking=True)
def send_nowait(self, obj, *,
unbound=None,
unbounditems=None,
):
"""Send the object to the channel's receiving end.
If the object is immediately received then return True
(else False). Otherwise this is the same as send().
"""
if unbound is None:
unboundop, = self._unbound
if unbounditems is None:
unboundop = -1
else:
unboundop, = _serialize_unbound(unbound)
unboundop, = _serialize_unbound(unbounditems)
# XXX Note that at the moment channel_send() only ever returns
# None. This should be fixed when channel_send_wait() is added.
# See bpo-32604 and gh-19829.
return _channels.send(self._id, obj, unboundop, blocking=False)
def send_buffer(self, obj, timeout=None, *,
unbound=None,
unbounditems=None,
):
"""Send the object's buffer to the channel's receiving end.
This blocks until the object is received.
"""
if unbound is None:
unboundop, = self._unbound
if unbounditems is None:
unboundop = -1
else:
unboundop, = _serialize_unbound(unbound)
unboundop, = _serialize_unbound(unbounditems)
_channels.send_buffer(self._id, obj, unboundop,
timeout=timeout, blocking=True)
def send_buffer_nowait(self, obj, *,
unbound=None,
unbounditems=None,
):
"""Send the object's buffer to the channel's receiving end.
If the object is immediately received then return True
(else False). Otherwise this is the same as send().
"""
if unbound is None:
unboundop, = self._unbound
if unbounditems is None:
unboundop = -1
else:
unboundop, = _serialize_unbound(unbound)
unboundop, = _serialize_unbound(unbounditems)
return _channels.send_buffer(self._id, obj, unboundop, blocking=False)
def close(self):

View File

@ -63,29 +63,34 @@ def _resolve_unbound(flag):
return resolved
def create(maxsize=0, *, syncobj=False, unbounditems=UNBOUND):
def create(maxsize=0, *, unbounditems=UNBOUND):
"""Return a new cross-interpreter queue.
The queue may be used to pass data safely between interpreters.
"syncobj" sets the default for Queue.put()
and Queue.put_nowait().
"unbounditems" likewise sets the default. See Queue.put() for
"unbounditems" sets the default for Queue.put(); see that method for
supported values. The default value is UNBOUND, which replaces
the unbound item.
"""
fmt = _SHARED_ONLY if syncobj else _PICKLED
unbound = _serialize_unbound(unbounditems)
unboundop, = unbound
qid = _queues.create(maxsize, fmt, unboundop)
return Queue(qid, _fmt=fmt, _unbound=unbound)
qid = _queues.create(maxsize, unboundop, -1)
self = Queue(qid)
self._set_unbound(unboundop, unbounditems)
return self
def list_all():
"""Return a list of all open queues."""
return [Queue(qid, _fmt=fmt, _unbound=(unboundop,))
for qid, fmt, unboundop in _queues.list_all()]
queues = []
for qid, unboundop, _ in _queues.list_all():
self = Queue(qid)
if not hasattr(self, '_unbound'):
self._set_unbound(unboundop)
else:
assert self._unbound[0] == unboundop
queues.append(self)
return queues
_known_queues = weakref.WeakValueDictionary()
@ -93,28 +98,17 @@ _known_queues = weakref.WeakValueDictionary()
class Queue:
"""A cross-interpreter queue."""
def __new__(cls, id, /, *, _fmt=None, _unbound=None):
def __new__(cls, id, /):
# There is only one instance for any given ID.
if isinstance(id, int):
id = int(id)
else:
raise TypeError(f'id must be an int, got {id!r}')
if _fmt is None:
if _unbound is None:
_fmt, op = _queues.get_queue_defaults(id)
_unbound = (op,)
else:
_fmt, _ = _queues.get_queue_defaults(id)
elif _unbound is None:
_, op = _queues.get_queue_defaults(id)
_unbound = (op,)
try:
self = _known_queues[id]
except KeyError:
self = super().__new__(cls)
self._id = id
self._fmt = _fmt
self._unbound = _unbound
_known_queues[id] = self
_queues.bind(id)
return self
@ -143,10 +137,27 @@ class Queue:
def __getstate__(self):
return None
def _set_unbound(self, op, items=None):
assert not hasattr(self, '_unbound')
if items is None:
items = _resolve_unbound(op)
unbound = (op, items)
self._unbound = unbound
return unbound
@property
def id(self):
return self._id
@property
def unbounditems(self):
try:
_, items = self._unbound
except AttributeError:
op, _ = _queues.get_queue_defaults(self._id)
_, items = self._set_unbound(op)
return items
@property
def maxsize(self):
try:
@ -165,77 +176,56 @@ class Queue:
return _queues.get_count(self._id)
def put(self, obj, timeout=None, *,
syncobj=None,
unbound=None,
unbounditems=None,
_delay=10 / 1000, # 10 milliseconds
):
"""Add the object to the queue.
This blocks while the queue is full.
If "syncobj" is None (the default) then it uses the
queue's default, set with create_queue().
For most objects, the object received through Queue.get() will
be a new one, equivalent to the original and not sharing any
actual underlying data. The notable exceptions include
cross-interpreter types (like Queue) and memoryview, where the
underlying data is actually shared. Furthermore, some types
can be sent through a queue more efficiently than others. This
group includes various immutable types like int, str, bytes, and
tuple (if the items are likewise efficiently shareable). See interpreters.is_shareable().
If "syncobj" is false then all objects are supported,
at the expense of worse performance.
If "syncobj" is true then the object must be "shareable".
Examples of "shareable" objects include the builtin singletons,
str, and memoryview. One benefit is that such objects are
passed through the queue efficiently.
The key difference, though, is conceptual: the corresponding
object returned from Queue.get() will be strictly equivalent
to the given obj. In other words, the two objects will be
effectively indistinguishable from each other, even if the
object is mutable. The received object may actually be the
same object, or a copy (immutable values only), or a proxy.
Regardless, the received object should be treated as though
the original has been shared directly, whether or not it
actually is. That's a slightly different and stronger promise
than just (initial) equality, which is all "syncobj=False"
can promise.
"unbound" controls the behavior of Queue.get() for the given
"unbounditems" controls the behavior of Queue.get() for the given
object if the current interpreter (calling put()) is later
destroyed.
If "unbound" is None (the default) then it uses the
If "unbounditems" is None (the default) then it uses the
queue's default, set with create_queue(),
which is usually UNBOUND.
If "unbound" is UNBOUND_ERROR then get() will raise an
If "unbounditems" is UNBOUND_ERROR then get() will raise an
ItemInterpreterDestroyed exception if the original interpreter
has been destroyed. This does not otherwise affect the queue;
the next call to put() will work like normal, returning the next
item in the queue.
If "unbound" is UNBOUND_REMOVE then the item will be removed
If "unbounditems" is UNBOUND_REMOVE then the item will be removed
from the queue as soon as the original interpreter is destroyed.
Be aware that this will introduce an imbalance between put()
and get() calls.
If "unbound" is UNBOUND then it is returned by get() in place
If "unbounditems" is UNBOUND then it is returned by get() in place
of the unbound item.
"""
if syncobj is None:
fmt = self._fmt
if unbounditems is None:
unboundop = -1
else:
fmt = _SHARED_ONLY if syncobj else _PICKLED
if unbound is None:
unboundop, = self._unbound
else:
unboundop, = _serialize_unbound(unbound)
unboundop, = _serialize_unbound(unbounditems)
if timeout is not None:
timeout = int(timeout)
if timeout < 0:
raise ValueError(f'timeout value must be non-negative')
end = time.time() + timeout
if fmt is _PICKLED:
obj = pickle.dumps(obj)
while True:
try:
_queues.put(self._id, obj, fmt, unboundop)
_queues.put(self._id, obj, unboundop)
except QueueFull as exc:
if timeout is not None and time.time() >= end:
raise # re-raise
@ -243,18 +233,12 @@ class Queue:
else:
break
def put_nowait(self, obj, *, syncobj=None, unbound=None):
if syncobj is None:
fmt = self._fmt
def put_nowait(self, obj, *, unbounditems=None):
if unbounditems is None:
unboundop = -1
else:
fmt = _SHARED_ONLY if syncobj else _PICKLED
if unbound is None:
unboundop, = self._unbound
else:
unboundop, = _serialize_unbound(unbound)
if fmt is _PICKLED:
obj = pickle.dumps(obj)
_queues.put(self._id, obj, fmt, unboundop)
unboundop, = _serialize_unbound(unbounditems)
_queues.put(self._id, obj, unboundop)
def get(self, timeout=None, *,
_delay=10 / 1000, # 10 milliseconds
@ -265,7 +249,7 @@ class Queue:
If the next item's original interpreter has been destroyed
then the "next object" is determined by the value of the
"unbound" argument to put().
"unbounditems" argument to put().
"""
if timeout is not None:
timeout = int(timeout)
@ -274,7 +258,7 @@ class Queue:
end = time.time() + timeout
while True:
try:
obj, fmt, unboundop = _queues.get(self._id)
obj, unboundop = _queues.get(self._id)
except QueueEmpty as exc:
if timeout is not None and time.time() >= end:
raise # re-raise
@ -284,10 +268,6 @@ class Queue:
if unboundop is not None:
assert obj is None, repr(obj)
return _resolve_unbound(unboundop)
if fmt == _PICKLED:
obj = pickle.loads(obj)
else:
assert fmt == _SHARED_ONLY
return obj
def get_nowait(self):
@ -297,16 +277,12 @@ class Queue:
is the same as get().
"""
try:
obj, fmt, unboundop = _queues.get(self._id)
obj, unboundop = _queues.get(self._id)
except QueueEmpty as exc:
raise # re-raise
if unboundop is not None:
assert obj is None, repr(obj)
return _resolve_unbound(unboundop)
if fmt == _PICKLED:
obj = pickle.loads(obj)
else:
assert fmt == _SHARED_ONLY
return obj

View File

@ -247,7 +247,7 @@ def _run_action(cid, action, end, state):
def clean_up_channels():
for cid, _ in _channels.list_all():
for cid, _, _ in _channels.list_all():
try:
_channels.destroy(cid)
except _channels.ChannelNotFoundError:
@ -373,11 +373,11 @@ class ChannelTests(TestBase):
self.assertIsInstance(cid, _channels.ChannelID)
def test_sequential_ids(self):
before = [cid for cid, _ in _channels.list_all()]
before = [cid for cid, _, _ in _channels.list_all()]
id1 = _channels.create(REPLACE)
id2 = _channels.create(REPLACE)
id3 = _channels.create(REPLACE)
after = [cid for cid, _ in _channels.list_all()]
after = [cid for cid, _, _ in _channels.list_all()]
self.assertEqual(id2, int(id1) + 1)
self.assertEqual(id3, int(id2) + 1)

View File

@ -377,11 +377,11 @@ class TestSendRecv(TestBase):
if not unbound:
extraargs = ''
elif unbound is channels.UNBOUND:
extraargs = ', unbound=channels.UNBOUND'
extraargs = ', unbounditems=channels.UNBOUND'
elif unbound is channels.UNBOUND_ERROR:
extraargs = ', unbound=channels.UNBOUND_ERROR'
extraargs = ', unbounditems=channels.UNBOUND_ERROR'
elif unbound is channels.UNBOUND_REMOVE:
extraargs = ', unbound=channels.UNBOUND_REMOVE'
extraargs = ', unbounditems=channels.UNBOUND_REMOVE'
else:
raise NotImplementedError(repr(unbound))
interp = interpreters.create()
@ -454,11 +454,11 @@ class TestSendRecv(TestBase):
with self.assertRaises(channels.ChannelEmptyError):
rch.recv_nowait()
sch.send_nowait(b'ham', unbound=channels.UNBOUND_REMOVE)
sch.send_nowait(b'ham', unbounditems=channels.UNBOUND_REMOVE)
self.assertEqual(_channels.get_count(rch.id), 1)
interp = common(rch, sch, channels.UNBOUND_REMOVE, 1)
self.assertEqual(_channels.get_count(rch.id), 3)
sch.send_nowait(42, unbound=channels.UNBOUND_REMOVE)
sch.send_nowait(42, unbounditems=channels.UNBOUND_REMOVE)
self.assertEqual(_channels.get_count(rch.id), 4)
del interp
self.assertEqual(_channels.get_count(rch.id), 2)
@ -484,11 +484,11 @@ class TestSendRecv(TestBase):
_run_output(interp, dedent(f"""
from test.support.interpreters import channels
sch = channels.SendChannel({sch.id})
sch.send_nowait(1, unbound=channels.UNBOUND)
sch.send_nowait(2, unbound=channels.UNBOUND_ERROR)
sch.send_nowait(1, unbounditems=channels.UNBOUND)
sch.send_nowait(2, unbounditems=channels.UNBOUND_ERROR)
sch.send_nowait(3)
sch.send_nowait(4, unbound=channels.UNBOUND_REMOVE)
sch.send_nowait(5, unbound=channels.UNBOUND)
sch.send_nowait(4, unbounditems=channels.UNBOUND_REMOVE)
sch.send_nowait(5, unbounditems=channels.UNBOUND)
"""))
self.assertEqual(_channels.get_count(rch.id), 5)
@ -522,8 +522,8 @@ class TestSendRecv(TestBase):
rch = channels.RecvChannel({rch.id})
sch = channels.SendChannel({sch.id})
obj1 = rch.recv()
sch.send_nowait(2, unbound=channels.UNBOUND)
sch.send_nowait(obj1, unbound=channels.UNBOUND_REMOVE)
sch.send_nowait(2, unbounditems=channels.UNBOUND)
sch.send_nowait(obj1, unbounditems=channels.UNBOUND_REMOVE)
"""))
_run_output(interp2, dedent(f"""
from test.support.interpreters import channels
@ -535,21 +535,21 @@ class TestSendRecv(TestBase):
self.assertEqual(_channels.get_count(rch.id), 0)
sch.send_nowait(3)
_run_output(interp1, dedent("""
sch.send_nowait(4, unbound=channels.UNBOUND)
sch.send_nowait(4, unbounditems=channels.UNBOUND)
# interp closed here
sch.send_nowait(5, unbound=channels.UNBOUND_REMOVE)
sch.send_nowait(6, unbound=channels.UNBOUND)
sch.send_nowait(5, unbounditems=channels.UNBOUND_REMOVE)
sch.send_nowait(6, unbounditems=channels.UNBOUND)
"""))
_run_output(interp2, dedent("""
sch.send_nowait(7, unbound=channels.UNBOUND_ERROR)
sch.send_nowait(7, unbounditems=channels.UNBOUND_ERROR)
# interp closed here
sch.send_nowait(obj1, unbound=channels.UNBOUND_ERROR)
sch.send_nowait(obj2, unbound=channels.UNBOUND_REMOVE)
sch.send_nowait(8, unbound=channels.UNBOUND)
sch.send_nowait(obj1, unbounditems=channels.UNBOUND_ERROR)
sch.send_nowait(obj2, unbounditems=channels.UNBOUND_REMOVE)
sch.send_nowait(8, unbounditems=channels.UNBOUND)
"""))
_run_output(interp1, dedent("""
sch.send_nowait(9, unbound=channels.UNBOUND_REMOVE)
sch.send_nowait(10, unbound=channels.UNBOUND)
sch.send_nowait(9, unbounditems=channels.UNBOUND_REMOVE)
sch.send_nowait(10, unbounditems=channels.UNBOUND)
"""))
self.assertEqual(_channels.get_count(rch.id), 10)

View File

@ -9,6 +9,7 @@ from test.support import import_helper, Py_DEBUG
_queues = import_helper.import_module('_interpqueues')
from test.support import interpreters
from test.support.interpreters import queues, _crossinterp
import test._crossinterp_definitions as defs
from .utils import _run_output, TestBase as _TestBase
@ -42,7 +43,7 @@ class LowLevelTests(TestBase):
importlib.reload(queues)
def test_create_destroy(self):
qid = _queues.create(2, 0, REPLACE)
qid = _queues.create(2, REPLACE, -1)
_queues.destroy(qid)
self.assertEqual(get_num_queues(), 0)
with self.assertRaises(queues.QueueNotFoundError):
@ -56,7 +57,7 @@ class LowLevelTests(TestBase):
'-c',
dedent(f"""
import {_queues.__name__} as _queues
_queues.create(2, 0, {REPLACE})
_queues.create(2, {REPLACE}, -1)
"""),
)
self.assertEqual(stdout, '')
@ -67,13 +68,13 @@ class LowLevelTests(TestBase):
def test_bind_release(self):
with self.subTest('typical'):
qid = _queues.create(2, 0, REPLACE)
qid = _queues.create(2, REPLACE, -1)
_queues.bind(qid)
_queues.release(qid)
self.assertEqual(get_num_queues(), 0)
with self.subTest('bind too much'):
qid = _queues.create(2, 0, REPLACE)
qid = _queues.create(2, REPLACE, -1)
_queues.bind(qid)
_queues.bind(qid)
_queues.release(qid)
@ -81,7 +82,7 @@ class LowLevelTests(TestBase):
self.assertEqual(get_num_queues(), 0)
with self.subTest('nested'):
qid = _queues.create(2, 0, REPLACE)
qid = _queues.create(2, REPLACE, -1)
_queues.bind(qid)
_queues.bind(qid)
_queues.release(qid)
@ -89,7 +90,7 @@ class LowLevelTests(TestBase):
self.assertEqual(get_num_queues(), 0)
with self.subTest('release without binding'):
qid = _queues.create(2, 0, REPLACE)
qid = _queues.create(2, REPLACE, -1)
with self.assertRaises(queues.QueueError):
_queues.release(qid)
@ -132,13 +133,13 @@ class QueueTests(TestBase):
with self.subTest('same interpreter'):
queue2 = queues.create()
queue1.put(queue2, syncobj=True)
queue1.put(queue2)
queue3 = queue1.get()
self.assertIs(queue3, queue2)
with self.subTest('from current interpreter'):
queue4 = queues.create()
queue1.put(queue4, syncobj=True)
queue1.put(queue4)
out = _run_output(interp, dedent("""
queue4 = queue1.get()
print(queue4.id)
@ -149,7 +150,7 @@ class QueueTests(TestBase):
with self.subTest('from subinterpreter'):
out = _run_output(interp, dedent("""
queue5 = queues.create()
queue1.put(queue5, syncobj=True)
queue1.put(queue5)
print(queue5.id)
"""))
qid = int(out)
@ -198,7 +199,7 @@ class TestQueueOps(TestBase):
def test_empty(self):
queue = queues.create()
before = queue.empty()
queue.put(None, syncobj=True)
queue.put(None)
during = queue.empty()
queue.get()
after = queue.empty()
@ -213,7 +214,7 @@ class TestQueueOps(TestBase):
queue = queues.create(3)
for _ in range(3):
actual.append(queue.full())
queue.put(None, syncobj=True)
queue.put(None)
actual.append(queue.full())
for _ in range(3):
queue.get()
@ -227,16 +228,16 @@ class TestQueueOps(TestBase):
queue = queues.create()
for _ in range(3):
actual.append(queue.qsize())
queue.put(None, syncobj=True)
queue.put(None)
actual.append(queue.qsize())
queue.get()
actual.append(queue.qsize())
queue.put(None, syncobj=True)
queue.put(None)
actual.append(queue.qsize())
for _ in range(3):
queue.get()
actual.append(queue.qsize())
queue.put(None, syncobj=True)
queue.put(None)
actual.append(queue.qsize())
queue.get()
actual.append(queue.qsize())
@ -245,70 +246,32 @@ class TestQueueOps(TestBase):
def test_put_get_main(self):
expected = list(range(20))
for syncobj in (True, False):
kwds = dict(syncobj=syncobj)
with self.subTest(f'syncobj={syncobj}'):
queue = queues.create()
for i in range(20):
queue.put(i, **kwds)
actual = [queue.get() for _ in range(20)]
queue = queues.create()
for i in range(20):
queue.put(i)
actual = [queue.get() for _ in range(20)]
self.assertEqual(actual, expected)
self.assertEqual(actual, expected)
def test_put_timeout(self):
for syncobj in (True, False):
kwds = dict(syncobj=syncobj)
with self.subTest(f'syncobj={syncobj}'):
queue = queues.create(2)
queue.put(None, **kwds)
queue.put(None, **kwds)
with self.assertRaises(queues.QueueFull):
queue.put(None, timeout=0.1, **kwds)
queue.get()
queue.put(None, **kwds)
queue = queues.create(2)
queue.put(None)
queue.put(None)
with self.assertRaises(queues.QueueFull):
queue.put(None, timeout=0.1)
queue.get()
queue.put(None)
def test_put_nowait(self):
for syncobj in (True, False):
kwds = dict(syncobj=syncobj)
with self.subTest(f'syncobj={syncobj}'):
queue = queues.create(2)
queue.put_nowait(None, **kwds)
queue.put_nowait(None, **kwds)
with self.assertRaises(queues.QueueFull):
queue.put_nowait(None, **kwds)
queue.get()
queue.put_nowait(None, **kwds)
queue = queues.create(2)
queue.put_nowait(None)
queue.put_nowait(None)
with self.assertRaises(queues.QueueFull):
queue.put_nowait(None)
queue.get()
queue.put_nowait(None)
def test_put_syncobj(self):
for obj in [
None,
True,
10,
'spam',
b'spam',
(0, 'a'),
]:
with self.subTest(repr(obj)):
queue = queues.create()
queue.put(obj, syncobj=True)
obj2 = queue.get()
self.assertEqual(obj2, obj)
queue.put(obj, syncobj=True)
obj2 = queue.get_nowait()
self.assertEqual(obj2, obj)
for obj in [
[1, 2, 3],
{'a': 13, 'b': 17},
]:
with self.subTest(repr(obj)):
queue = queues.create()
with self.assertRaises(interpreters.NotShareableError):
queue.put(obj, syncobj=True)
def test_put_not_syncobj(self):
def test_put_full_fallback(self):
for obj in [
None,
True,
@ -323,11 +286,11 @@ class TestQueueOps(TestBase):
with self.subTest(repr(obj)):
queue = queues.create()
queue.put(obj, syncobj=False)
queue.put(obj)
obj2 = queue.get()
self.assertEqual(obj2, obj)
queue.put(obj, syncobj=False)
queue.put(obj)
obj2 = queue.get_nowait()
self.assertEqual(obj2, obj)
@ -341,24 +304,9 @@ class TestQueueOps(TestBase):
with self.assertRaises(queues.QueueEmpty):
queue.get_nowait()
def test_put_get_default_syncobj(self):
def test_put_get_full_fallback(self):
expected = list(range(20))
queue = queues.create(syncobj=True)
for methname in ('get', 'get_nowait'):
with self.subTest(f'{methname}()'):
get = getattr(queue, methname)
for i in range(20):
queue.put(i)
actual = [get() for _ in range(20)]
self.assertEqual(actual, expected)
obj = [1, 2, 3] # lists are not shareable
with self.assertRaises(interpreters.NotShareableError):
queue.put(obj)
def test_put_get_default_not_syncobj(self):
expected = list(range(20))
queue = queues.create(syncobj=False)
queue = queues.create()
for methname in ('get', 'get_nowait'):
with self.subTest(f'{methname}()'):
get = getattr(queue, methname)
@ -384,7 +332,7 @@ class TestQueueOps(TestBase):
with self.subTest(f'{methname}()'):
interp.exec(dedent(f"""
orig = b'spam'
queue.put(orig, syncobj=True)
queue.put(orig)
obj = queue.{methname}()
assert obj == orig, 'expected: obj == orig'
assert obj is not orig, 'expected: obj is not orig'
@ -399,7 +347,7 @@ class TestQueueOps(TestBase):
for methname in ('get', 'get_nowait'):
with self.subTest(f'{methname}()'):
obj1 = b'spam'
queue1.put(obj1, syncobj=True)
queue1.put(obj1)
out = _run_output(
interp,
@ -416,7 +364,7 @@ class TestQueueOps(TestBase):
obj2 = b'eggs'
print(id(obj2))
assert queue2.qsize() == 0, 'expected: queue2.qsize() == 0'
queue2.put(obj2, syncobj=True)
queue2.put(obj2)
assert queue2.qsize() == 1, 'expected: queue2.qsize() == 1'
"""))
self.assertEqual(len(queues.list_all()), 2)
@ -433,11 +381,11 @@ class TestQueueOps(TestBase):
if not unbound:
extraargs = ''
elif unbound is queues.UNBOUND:
extraargs = ', unbound=queues.UNBOUND'
extraargs = ', unbounditems=queues.UNBOUND'
elif unbound is queues.UNBOUND_ERROR:
extraargs = ', unbound=queues.UNBOUND_ERROR'
extraargs = ', unbounditems=queues.UNBOUND_ERROR'
elif unbound is queues.UNBOUND_REMOVE:
extraargs = ', unbound=queues.UNBOUND_REMOVE'
extraargs = ', unbounditems=queues.UNBOUND_REMOVE'
else:
raise NotImplementedError(repr(unbound))
interp = interpreters.create()
@ -447,8 +395,8 @@ class TestQueueOps(TestBase):
queue = queues.Queue({queue.id})
obj1 = b'spam'
obj2 = b'eggs'
queue.put(obj1, syncobj=True{extraargs})
queue.put(obj2, syncobj=True{extraargs})
queue.put(obj1{extraargs})
queue.put(obj2{extraargs})
"""))
self.assertEqual(queue.qsize(), presize + 2)
@ -501,11 +449,11 @@ class TestQueueOps(TestBase):
with self.assertRaises(queues.QueueEmpty):
queue.get_nowait()
queue.put(b'ham', unbound=queues.UNBOUND_REMOVE)
queue.put(b'ham', unbounditems=queues.UNBOUND_REMOVE)
self.assertEqual(queue.qsize(), 1)
interp = common(queue, queues.UNBOUND_REMOVE, 1)
self.assertEqual(queue.qsize(), 3)
queue.put(42, unbound=queues.UNBOUND_REMOVE)
queue.put(42, unbounditems=queues.UNBOUND_REMOVE)
self.assertEqual(queue.qsize(), 4)
del interp
self.assertEqual(queue.qsize(), 2)
@ -523,11 +471,11 @@ class TestQueueOps(TestBase):
_run_output(interp, dedent(f"""
from test.support.interpreters import queues
queue = queues.Queue({queue.id})
queue.put(1, syncobj=True, unbound=queues.UNBOUND)
queue.put(2, syncobj=True, unbound=queues.UNBOUND_ERROR)
queue.put(3, syncobj=True)
queue.put(4, syncobj=True, unbound=queues.UNBOUND_REMOVE)
queue.put(5, syncobj=True, unbound=queues.UNBOUND)
queue.put(1, unbounditems=queues.UNBOUND)
queue.put(2, unbounditems=queues.UNBOUND_ERROR)
queue.put(3)
queue.put(4, unbounditems=queues.UNBOUND_REMOVE)
queue.put(5, unbounditems=queues.UNBOUND)
"""))
self.assertEqual(queue.qsize(), 5)
@ -555,13 +503,13 @@ class TestQueueOps(TestBase):
interp1 = interpreters.create()
interp2 = interpreters.create()
queue.put(1, syncobj=True)
queue.put(1)
_run_output(interp1, dedent(f"""
from test.support.interpreters import queues
queue = queues.Queue({queue.id})
obj1 = queue.get()
queue.put(2, syncobj=True, unbound=queues.UNBOUND)
queue.put(obj1, syncobj=True, unbound=queues.UNBOUND_REMOVE)
queue.put(2, unbounditems=queues.UNBOUND)
queue.put(obj1, unbounditems=queues.UNBOUND_REMOVE)
"""))
_run_output(interp2, dedent(f"""
from test.support.interpreters import queues
@ -572,21 +520,21 @@ class TestQueueOps(TestBase):
self.assertEqual(queue.qsize(), 0)
queue.put(3)
_run_output(interp1, dedent("""
queue.put(4, syncobj=True, unbound=queues.UNBOUND)
queue.put(4, unbounditems=queues.UNBOUND)
# interp closed here
queue.put(5, syncobj=True, unbound=queues.UNBOUND_REMOVE)
queue.put(6, syncobj=True, unbound=queues.UNBOUND)
queue.put(5, unbounditems=queues.UNBOUND_REMOVE)
queue.put(6, unbounditems=queues.UNBOUND)
"""))
_run_output(interp2, dedent("""
queue.put(7, syncobj=True, unbound=queues.UNBOUND_ERROR)
queue.put(7, unbounditems=queues.UNBOUND_ERROR)
# interp closed here
queue.put(obj1, syncobj=True, unbound=queues.UNBOUND_ERROR)
queue.put(obj2, syncobj=True, unbound=queues.UNBOUND_REMOVE)
queue.put(8, syncobj=True, unbound=queues.UNBOUND)
queue.put(obj1, unbounditems=queues.UNBOUND_ERROR)
queue.put(obj2, unbounditems=queues.UNBOUND_REMOVE)
queue.put(8, unbounditems=queues.UNBOUND)
"""))
_run_output(interp1, dedent("""
queue.put(9, syncobj=True, unbound=queues.UNBOUND_REMOVE)
queue.put(10, syncobj=True, unbound=queues.UNBOUND)
queue.put(9, unbounditems=queues.UNBOUND_REMOVE)
queue.put(10, unbounditems=queues.UNBOUND)
"""))
self.assertEqual(queue.qsize(), 10)
@ -642,12 +590,12 @@ class TestQueueOps(TestBase):
break
except queues.QueueEmpty:
continue
queue2.put(obj, syncobj=True)
queue2.put(obj)
t = threading.Thread(target=f)
t.start()
orig = b'spam'
queue1.put(orig, syncobj=True)
queue1.put(orig)
obj = queue2.get()
t.join()

View File

@ -20,9 +20,11 @@
#endif
#define REGISTERS_HEAP_TYPES
#define HAS_FALLBACK
#define HAS_UNBOUND_ITEMS
#include "_interpreters_common.h"
#undef HAS_UNBOUND_ITEMS
#undef HAS_FALLBACK
#undef REGISTERS_HEAP_TYPES
@ -523,7 +525,7 @@ typedef struct _channelitem {
int64_t interpid;
_PyXIData_t *data;
_waiting_t *waiting;
int unboundop;
unboundop_t unboundop;
struct _channelitem *next;
} _channelitem;
@ -536,7 +538,7 @@ _channelitem_ID(_channelitem *item)
static void
_channelitem_init(_channelitem *item,
int64_t interpid, _PyXIData_t *data,
_waiting_t *waiting, int unboundop)
_waiting_t *waiting, unboundop_t unboundop)
{
if (interpid < 0) {
interpid = _get_interpid(data);
@ -583,7 +585,7 @@ _channelitem_clear(_channelitem *item)
static _channelitem *
_channelitem_new(int64_t interpid, _PyXIData_t *data,
_waiting_t *waiting, int unboundop)
_waiting_t *waiting, unboundop_t unboundop)
{
_channelitem *item = GLOBAL_MALLOC(_channelitem);
if (item == NULL) {
@ -694,7 +696,7 @@ _channelqueue_free(_channelqueue *queue)
static int
_channelqueue_put(_channelqueue *queue,
int64_t interpid, _PyXIData_t *data,
_waiting_t *waiting, int unboundop)
_waiting_t *waiting, unboundop_t unboundop)
{
_channelitem *item = _channelitem_new(interpid, data, waiting, unboundop);
if (item == NULL) {
@ -798,7 +800,7 @@ _channelqueue_remove(_channelqueue *queue, _channelitem_id_t itemid,
}
queue->count -= 1;
int unboundop;
unboundop_t unboundop;
_channelitem_popped(item, p_data, p_waiting, &unboundop);
}
@ -1083,16 +1085,18 @@ typedef struct _channel {
PyThread_type_lock mutex;
_channelqueue *queue;
_channelends *ends;
struct {
int unboundop;
struct _channeldefaults {
unboundop_t unboundop;
xidata_fallback_t fallback;
} defaults;
int open;
struct _channel_closing *closing;
} _channel_state;
static _channel_state *
_channel_new(PyThread_type_lock mutex, int unboundop)
_channel_new(PyThread_type_lock mutex, struct _channeldefaults defaults)
{
assert(check_unbound(defaults.unboundop));
_channel_state *chan = GLOBAL_MALLOC(_channel_state);
if (chan == NULL) {
return NULL;
@ -1109,7 +1113,7 @@ _channel_new(PyThread_type_lock mutex, int unboundop)
GLOBAL_FREE(chan);
return NULL;
}
chan->defaults.unboundop = unboundop;
chan->defaults = defaults;
chan->open = 1;
chan->closing = NULL;
return chan;
@ -1130,7 +1134,7 @@ _channel_free(_channel_state *chan)
static int
_channel_add(_channel_state *chan, int64_t interpid,
_PyXIData_t *data, _waiting_t *waiting, int unboundop)
_PyXIData_t *data, _waiting_t *waiting, unboundop_t unboundop)
{
int res = -1;
PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
@ -1611,7 +1615,7 @@ done:
struct channel_id_and_info {
int64_t id;
int unboundop;
struct _channeldefaults defaults;
};
static struct channel_id_and_info *
@ -1628,7 +1632,7 @@ _channels_list_all(_channels *channels, int64_t *count)
for (int64_t i=0; ref != NULL; ref = ref->next, i++) {
ids[i] = (struct channel_id_and_info){
.id = ref->cid,
.unboundop = ref->chan->defaults.unboundop,
.defaults = ref->chan->defaults,
};
}
*count = channels->numopen;
@ -1714,13 +1718,13 @@ _channel_finish_closing(_channel_state *chan) {
// Create a new channel.
static int64_t
channel_create(_channels *channels, int unboundop)
channel_create(_channels *channels, struct _channeldefaults defaults)
{
PyThread_type_lock mutex = PyThread_allocate_lock();
if (mutex == NULL) {
return ERR_CHANNEL_MUTEX_INIT;
}
_channel_state *chan = _channel_new(mutex, unboundop);
_channel_state *chan = _channel_new(mutex, defaults);
if (chan == NULL) {
PyThread_free_lock(mutex);
return -1;
@ -1752,7 +1756,7 @@ channel_destroy(_channels *channels, int64_t cid)
// Optionally request to be notified when it is received.
static int
channel_send(_channels *channels, int64_t cid, PyObject *obj,
_waiting_t *waiting, int unboundop)
_waiting_t *waiting, unboundop_t unboundop, xidata_fallback_t fallback)
{
PyThreadState *tstate = _PyThreadState_GET();
PyInterpreterState *interp = tstate->interp;
@ -1779,7 +1783,7 @@ channel_send(_channels *channels, int64_t cid, PyObject *obj,
PyThread_release_lock(mutex);
return -1;
}
if (_PyObject_GetXIDataNoFallback(tstate, obj, data) != 0) {
if (_PyObject_GetXIData(tstate, obj, fallback, data) != 0) {
PyThread_release_lock(mutex);
GLOBAL_FREE(data);
return -1;
@ -1823,7 +1827,8 @@ channel_clear_sent(_channels *channels, int64_t cid, _waiting_t *waiting)
// Like channel_send(), but strictly wait for the object to be received.
static int
channel_send_wait(_channels *channels, int64_t cid, PyObject *obj,
int unboundop, PY_TIMEOUT_T timeout)
unboundop_t unboundop, PY_TIMEOUT_T timeout,
xidata_fallback_t fallback)
{
// We use a stack variable here, so we must ensure that &waiting
// is not held by any channel item at the point this function exits.
@ -1834,7 +1839,7 @@ channel_send_wait(_channels *channels, int64_t cid, PyObject *obj,
}
/* Queue up the object. */
int res = channel_send(channels, cid, obj, &waiting, unboundop);
int res = channel_send(channels, cid, obj, &waiting, unboundop, fallback);
if (res < 0) {
assert(waiting.status == WAITING_NO_STATUS);
goto finally;
@ -2005,6 +2010,20 @@ channel_is_associated(_channels *channels, int64_t cid, int64_t interpid,
return (end != NULL && end->open);
}
static int
channel_get_defaults(_channels *channels, int64_t cid, struct _channeldefaults *defaults)
{
PyThread_type_lock mutex = NULL;
_channel_state *channel = NULL;
int err = _channels_lookup(channels, cid, &mutex, &channel);
if (err != 0) {
return err;
}
*defaults = channel->defaults;
PyThread_release_lock(mutex);
return 0;
}
static int
_channel_get_count(_channels *channels, int64_t cid, Py_ssize_t *p_count)
{
@ -2881,20 +2900,27 @@ clear_interpreter(void *data)
static PyObject *
channelsmod_create(PyObject *self, PyObject *args, PyObject *kwds)
{
static char *kwlist[] = {"unboundop", NULL};
int unboundop;
if (!PyArg_ParseTupleAndKeywords(args, kwds, "i:create", kwlist,
&unboundop))
static char *kwlist[] = {"unboundop", "fallback", NULL};
int unboundarg = -1;
int fallbackarg = -1;
if (!PyArg_ParseTupleAndKeywords(args, kwds, "|ii:create", kwlist,
&unboundarg, &fallbackarg))
{
return NULL;
}
if (!check_unbound(unboundop)) {
PyErr_Format(PyExc_ValueError,
"unsupported unboundop %d", unboundop);
struct _channeldefaults defaults = {0};
if (resolve_unboundop(unboundarg, UNBOUND_REPLACE,
&defaults.unboundop) < 0)
{
return NULL;
}
if (resolve_fallback(fallbackarg, _PyXIDATA_FULL_FALLBACK,
&defaults.fallback) < 0)
{
return NULL;
}
int64_t cid = channel_create(&_globals.channels, unboundop);
int64_t cid = channel_create(&_globals.channels, defaults);
if (cid < 0) {
(void)handle_channel_error(-1, self, cid);
return NULL;
@ -2987,7 +3013,9 @@ channelsmod_list_all(PyObject *self, PyObject *Py_UNUSED(ignored))
}
assert(cidobj != NULL);
PyObject *item = Py_BuildValue("Oi", cidobj, cur->unboundop);
PyObject *item = Py_BuildValue("Oii", cidobj,
cur->defaults.unboundop,
cur->defaults.fallback);
Py_DECREF(cidobj);
if (item == NULL) {
Py_SETREF(ids, NULL);
@ -3075,40 +3103,54 @@ receive end.");
static PyObject *
channelsmod_send(PyObject *self, PyObject *args, PyObject *kwds)
{
static char *kwlist[] = {"cid", "obj", "unboundop", "blocking", "timeout",
NULL};
static char *kwlist[] = {"cid", "obj", "unboundop", "fallback",
"blocking", "timeout", NULL};
struct channel_id_converter_data cid_data = {
.module = self,
};
PyObject *obj;
int unboundop = UNBOUND_REPLACE;
int unboundarg = -1;
int fallbackarg = -1;
int blocking = 1;
PyObject *timeout_obj = NULL;
if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&O|i$pO:channel_send", kwlist,
if (!PyArg_ParseTupleAndKeywords(args, kwds,
"O&O|ii$pO:channel_send", kwlist,
channel_id_converter, &cid_data, &obj,
&unboundop, &blocking, &timeout_obj))
&unboundarg, &fallbackarg,
&blocking, &timeout_obj))
{
return NULL;
}
if (!check_unbound(unboundop)) {
PyErr_Format(PyExc_ValueError,
"unsupported unboundop %d", unboundop);
return NULL;
}
int64_t cid = cid_data.cid;
PY_TIMEOUT_T timeout;
if (PyThread_ParseTimeoutArg(timeout_obj, blocking, &timeout) < 0) {
return NULL;
}
struct _channeldefaults defaults = {-1, -1};
if (unboundarg < 0 || fallbackarg < 0) {
int err = channel_get_defaults(&_globals.channels, cid, &defaults);
if (handle_channel_error(err, self, cid)) {
return NULL;
}
}
unboundop_t unboundop;
if (resolve_unboundop(unboundarg, defaults.unboundop, &unboundop) < 0) {
return NULL;
}
xidata_fallback_t fallback;
if (resolve_fallback(fallbackarg, defaults.fallback, &fallback) < 0) {
return NULL;
}
/* Queue up the object. */
int err = 0;
if (blocking) {
err = channel_send_wait(&_globals.channels, cid, obj, unboundop, timeout);
err = channel_send_wait(
&_globals.channels, cid, obj, unboundop, timeout, fallback);
}
else {
err = channel_send(&_globals.channels, cid, obj, NULL, unboundop);
err = channel_send(
&_globals.channels, cid, obj, NULL, unboundop, fallback);
}
if (handle_channel_error(err, self, cid)) {
return NULL;
@ -3126,32 +3168,44 @@ By default this waits for the object to be received.");
static PyObject *
channelsmod_send_buffer(PyObject *self, PyObject *args, PyObject *kwds)
{
static char *kwlist[] = {"cid", "obj", "unboundop", "blocking", "timeout",
NULL};
static char *kwlist[] = {"cid", "obj", "unboundop", "fallback",
"blocking", "timeout", NULL};
struct channel_id_converter_data cid_data = {
.module = self,
};
PyObject *obj;
int unboundop = UNBOUND_REPLACE;
int blocking = 1;
int unboundarg = -1;
int fallbackarg = -1;
int blocking = -1;
PyObject *timeout_obj = NULL;
if (!PyArg_ParseTupleAndKeywords(args, kwds,
"O&O|i$pO:channel_send_buffer", kwlist,
"O&O|ii$pO:channel_send_buffer", kwlist,
channel_id_converter, &cid_data, &obj,
&unboundop, &blocking, &timeout_obj)) {
&unboundarg, &fallbackarg,
&blocking, &timeout_obj))
{
return NULL;
}
if (!check_unbound(unboundop)) {
PyErr_Format(PyExc_ValueError,
"unsupported unboundop %d", unboundop);
return NULL;
}
int64_t cid = cid_data.cid;
PY_TIMEOUT_T timeout;
if (PyThread_ParseTimeoutArg(timeout_obj, blocking, &timeout) < 0) {
return NULL;
}
struct _channeldefaults defaults = {-1, -1};
if (unboundarg < 0 || fallbackarg < 0) {
int err = channel_get_defaults(&_globals.channels, cid, &defaults);
if (handle_channel_error(err, self, cid)) {
return NULL;
}
}
unboundop_t unboundop;
if (resolve_unboundop(unboundarg, defaults.unboundop, &unboundop) < 0) {
return NULL;
}
xidata_fallback_t fallback;
if (resolve_fallback(fallbackarg, defaults.fallback, &fallback) < 0) {
return NULL;
}
PyObject *tempobj = PyMemoryView_FromObject(obj);
if (tempobj == NULL) {
@ -3162,10 +3216,11 @@ channelsmod_send_buffer(PyObject *self, PyObject *args, PyObject *kwds)
int err = 0;
if (blocking) {
err = channel_send_wait(
&_globals.channels, cid, tempobj, unboundop, timeout);
&_globals.channels, cid, tempobj, unboundop, timeout, fallback);
}
else {
err = channel_send(&_globals.channels, cid, tempobj, NULL, unboundop);
err = channel_send(
&_globals.channels, cid, tempobj, NULL, unboundop, fallback);
}
Py_DECREF(tempobj);
if (handle_channel_error(err, self, cid)) {
@ -3197,7 +3252,7 @@ channelsmod_recv(PyObject *self, PyObject *args, PyObject *kwds)
cid = cid_data.cid;
PyObject *obj = NULL;
int unboundop = 0;
unboundop_t unboundop = 0;
int err = channel_recv(&_globals.channels, cid, &obj, &unboundop);
if (err == ERR_CHANNEL_EMPTY && dflt != NULL) {
// Use the default.
@ -3388,17 +3443,14 @@ channelsmod_get_channel_defaults(PyObject *self, PyObject *args, PyObject *kwds)
}
int64_t cid = cid_data.cid;
PyThread_type_lock mutex = NULL;
_channel_state *channel = NULL;
int err = _channels_lookup(&_globals.channels, cid, &mutex, &channel);
struct _channeldefaults defaults;
int err = channel_get_defaults(&_globals.channels, cid, &defaults);
if (handle_channel_error(err, self, cid)) {
return NULL;
}
int unboundop = channel->defaults.unboundop;
PyThread_release_lock(mutex);
PyObject *defaults = Py_BuildValue("i", unboundop);
return defaults;
PyObject *res = Py_BuildValue("ii", defaults.unboundop, defaults.fallback);
return res;
}
PyDoc_STRVAR(channelsmod_get_channel_defaults_doc,

View File

@ -9,9 +9,11 @@
#include "pycore_crossinterp.h" // _PyXIData_t
#define REGISTERS_HEAP_TYPES
#define HAS_FALLBACK
#define HAS_UNBOUND_ITEMS
#include "_interpreters_common.h"
#undef HAS_UNBOUND_ITEMS
#undef HAS_FALLBACK
#undef REGISTERS_HEAP_TYPES
@ -401,14 +403,13 @@ typedef struct _queueitem {
meaning the interpreter has been destroyed. */
int64_t interpid;
_PyXIData_t *data;
int fmt;
int unboundop;
unboundop_t unboundop;
struct _queueitem *next;
} _queueitem;
static void
_queueitem_init(_queueitem *item,
int64_t interpid, _PyXIData_t *data, int fmt, int unboundop)
int64_t interpid, _PyXIData_t *data, unboundop_t unboundop)
{
if (interpid < 0) {
interpid = _get_interpid(data);
@ -422,7 +423,6 @@ _queueitem_init(_queueitem *item,
*item = (_queueitem){
.interpid = interpid,
.data = data,
.fmt = fmt,
.unboundop = unboundop,
};
}
@ -446,14 +446,14 @@ _queueitem_clear(_queueitem *item)
}
static _queueitem *
_queueitem_new(int64_t interpid, _PyXIData_t *data, int fmt, int unboundop)
_queueitem_new(int64_t interpid, _PyXIData_t *data, int unboundop)
{
_queueitem *item = GLOBAL_MALLOC(_queueitem);
if (item == NULL) {
PyErr_NoMemory();
return NULL;
}
_queueitem_init(item, interpid, data, fmt, unboundop);
_queueitem_init(item, interpid, data, unboundop);
return item;
}
@ -476,10 +476,9 @@ _queueitem_free_all(_queueitem *item)
static void
_queueitem_popped(_queueitem *item,
_PyXIData_t **p_data, int *p_fmt, int *p_unboundop)
_PyXIData_t **p_data, unboundop_t *p_unboundop)
{
*p_data = item->data;
*p_fmt = item->fmt;
*p_unboundop = item->unboundop;
// We clear them here, so they won't be released in _queueitem_clear().
item->data = NULL;
@ -527,16 +526,16 @@ typedef struct _queue {
_queueitem *first;
_queueitem *last;
} items;
struct {
int fmt;
struct _queuedefaults {
xidata_fallback_t fallback;
int unboundop;
} defaults;
} _queue;
static int
_queue_init(_queue *queue, Py_ssize_t maxsize, int fmt, int unboundop)
_queue_init(_queue *queue, Py_ssize_t maxsize, struct _queuedefaults defaults)
{
assert(check_unbound(unboundop));
assert(check_unbound(defaults.unboundop));
PyThread_type_lock mutex = PyThread_allocate_lock();
if (mutex == NULL) {
return ERR_QUEUE_ALLOC;
@ -547,10 +546,7 @@ _queue_init(_queue *queue, Py_ssize_t maxsize, int fmt, int unboundop)
.items = {
.maxsize = maxsize,
},
.defaults = {
.fmt = fmt,
.unboundop = unboundop,
},
.defaults = defaults,
};
return 0;
}
@ -631,8 +627,7 @@ _queue_unlock(_queue *queue)
}
static int
_queue_add(_queue *queue, int64_t interpid, _PyXIData_t *data,
int fmt, int unboundop)
_queue_add(_queue *queue, int64_t interpid, _PyXIData_t *data, int unboundop)
{
int err = _queue_lock(queue);
if (err < 0) {
@ -648,7 +643,7 @@ _queue_add(_queue *queue, int64_t interpid, _PyXIData_t *data,
return ERR_QUEUE_FULL;
}
_queueitem *item = _queueitem_new(interpid, data, fmt, unboundop);
_queueitem *item = _queueitem_new(interpid, data, unboundop);
if (item == NULL) {
_queue_unlock(queue);
return -1;
@ -668,8 +663,7 @@ _queue_add(_queue *queue, int64_t interpid, _PyXIData_t *data,
}
static int
_queue_next(_queue *queue,
_PyXIData_t **p_data, int *p_fmt, int *p_unboundop)
_queue_next(_queue *queue, _PyXIData_t **p_data, int *p_unboundop)
{
int err = _queue_lock(queue);
if (err < 0) {
@ -688,7 +682,7 @@ _queue_next(_queue *queue,
}
queue->items.count -= 1;
_queueitem_popped(item, p_data, p_fmt, p_unboundop);
_queueitem_popped(item, p_data, p_unboundop);
_queue_unlock(queue);
return 0;
@ -1035,8 +1029,7 @@ finally:
struct queue_id_and_info {
int64_t id;
int fmt;
int unboundop;
struct _queuedefaults defaults;
};
static struct queue_id_and_info *
@ -1053,8 +1046,7 @@ _queues_list_all(_queues *queues, int64_t *p_count)
for (int64_t i=0; ref != NULL; ref = ref->next, i++) {
ids[i].id = ref->qid;
assert(ref->queue != NULL);
ids[i].fmt = ref->queue->defaults.fmt;
ids[i].unboundop = ref->queue->defaults.unboundop;
ids[i].defaults = ref->queue->defaults;
}
*p_count = queues->count;
@ -1090,13 +1082,14 @@ _queue_free(_queue *queue)
// Create a new queue.
static int64_t
queue_create(_queues *queues, Py_ssize_t maxsize, int fmt, int unboundop)
queue_create(_queues *queues, Py_ssize_t maxsize,
struct _queuedefaults defaults)
{
_queue *queue = GLOBAL_MALLOC(_queue);
if (queue == NULL) {
return ERR_QUEUE_ALLOC;
}
int err = _queue_init(queue, maxsize, fmt, unboundop);
int err = _queue_init(queue, maxsize, defaults);
if (err < 0) {
GLOBAL_FREE(queue);
return (int64_t)err;
@ -1125,7 +1118,8 @@ queue_destroy(_queues *queues, int64_t qid)
// Push an object onto the queue.
static int
queue_put(_queues *queues, int64_t qid, PyObject *obj, int fmt, int unboundop)
queue_put(_queues *queues, int64_t qid, PyObject *obj, unboundop_t unboundop,
xidata_fallback_t fallback)
{
PyThreadState *tstate = PyThreadState_Get();
@ -1138,27 +1132,27 @@ queue_put(_queues *queues, int64_t qid, PyObject *obj, int fmt, int unboundop)
assert(queue != NULL);
// Convert the object to cross-interpreter data.
_PyXIData_t *data = _PyXIData_New();
if (data == NULL) {
_PyXIData_t *xidata = _PyXIData_New();
if (xidata == NULL) {
_queue_unmark_waiter(queue, queues->mutex);
return -1;
}
if (_PyObject_GetXIDataNoFallback(tstate, obj, data) != 0) {
if (_PyObject_GetXIData(tstate, obj, fallback, xidata) != 0) {
_queue_unmark_waiter(queue, queues->mutex);
GLOBAL_FREE(data);
GLOBAL_FREE(xidata);
return -1;
}
assert(_PyXIData_INTERPID(data) ==
assert(_PyXIData_INTERPID(xidata) ==
PyInterpreterState_GetID(tstate->interp));
// Add the data to the queue.
int64_t interpid = -1; // _queueitem_init() will set it.
int res = _queue_add(queue, interpid, data, fmt, unboundop);
int res = _queue_add(queue, interpid, xidata, unboundop);
_queue_unmark_waiter(queue, queues->mutex);
if (res != 0) {
// We may chain an exception here:
(void)_release_xid_data(data, 0);
GLOBAL_FREE(data);
(void)_release_xid_data(xidata, 0);
GLOBAL_FREE(xidata);
return res;
}
@ -1169,7 +1163,7 @@ queue_put(_queues *queues, int64_t qid, PyObject *obj, int fmt, int unboundop)
// XXX Support a "wait" mutex?
static int
queue_get(_queues *queues, int64_t qid,
PyObject **res, int *p_fmt, int *p_unboundop)
PyObject **res, int *p_unboundop)
{
int err;
*res = NULL;
@ -1185,7 +1179,7 @@ queue_get(_queues *queues, int64_t qid,
// Pop off the next item from the queue.
_PyXIData_t *data = NULL;
err = _queue_next(queue, &data, p_fmt, p_unboundop);
err = _queue_next(queue, &data, p_unboundop);
_queue_unmark_waiter(queue, queues->mutex);
if (err != 0) {
return err;
@ -1216,6 +1210,20 @@ queue_get(_queues *queues, int64_t qid,
return 0;
}
static int
queue_get_defaults(_queues *queues, int64_t qid,
struct _queuedefaults *p_defaults)
{
_queue *queue = NULL;
int err = _queues_lookup(queues, qid, &queue);
if (err != 0) {
return err;
}
*p_defaults = queue->defaults;
_queue_unmark_waiter(queue, queues->mutex);
return 0;
}
static int
queue_get_maxsize(_queues *queues, int64_t qid, Py_ssize_t *p_maxsize)
{
@ -1474,22 +1482,28 @@ qidarg_converter(PyObject *arg, void *ptr)
static PyObject *
queuesmod_create(PyObject *self, PyObject *args, PyObject *kwds)
{
static char *kwlist[] = {"maxsize", "fmt", "unboundop", NULL};
static char *kwlist[] = {"maxsize", "unboundop", "fallback", NULL};
Py_ssize_t maxsize;
int fmt;
int unboundop;
if (!PyArg_ParseTupleAndKeywords(args, kwds, "nii:create", kwlist,
&maxsize, &fmt, &unboundop))
int unboundarg = -1;
int fallbackarg = -1;
if (!PyArg_ParseTupleAndKeywords(args, kwds, "n|ii:create", kwlist,
&maxsize, &unboundarg, &fallbackarg))
{
return NULL;
}
if (!check_unbound(unboundop)) {
PyErr_Format(PyExc_ValueError,
"unsupported unboundop %d", unboundop);
struct _queuedefaults defaults = {0};
if (resolve_unboundop(unboundarg, UNBOUND_REPLACE,
&defaults.unboundop) < 0)
{
return NULL;
}
if (resolve_fallback(fallbackarg, _PyXIDATA_FULL_FALLBACK,
&defaults.fallback) < 0)
{
return NULL;
}
int64_t qid = queue_create(&_globals.queues, maxsize, fmt, unboundop);
int64_t qid = queue_create(&_globals.queues, maxsize, defaults);
if (qid < 0) {
(void)handle_queue_error((int)qid, self, qid);
return NULL;
@ -1511,7 +1525,7 @@ queuesmod_create(PyObject *self, PyObject *args, PyObject *kwds)
}
PyDoc_STRVAR(queuesmod_create_doc,
"create(maxsize, fmt, unboundop) -> qid\n\
"create(maxsize, unboundop, fallback) -> qid\n\
\n\
Create a new cross-interpreter queue and return its unique generated ID.\n\
It is a new reference as though bind() had been called on the queue.\n\
@ -1560,8 +1574,9 @@ queuesmod_list_all(PyObject *self, PyObject *Py_UNUSED(ignored))
}
struct queue_id_and_info *cur = qids;
for (int64_t i=0; i < count; cur++, i++) {
PyObject *item = Py_BuildValue("Lii", cur->id, cur->fmt,
cur->unboundop);
PyObject *item = Py_BuildValue("Lii", cur->id,
cur->defaults.unboundop,
cur->defaults.fallback);
if (item == NULL) {
Py_SETREF(ids, NULL);
break;
@ -1575,34 +1590,44 @@ finally:
}
PyDoc_STRVAR(queuesmod_list_all_doc,
"list_all() -> [(qid, fmt)]\n\
"list_all() -> [(qid, unboundop, fallback)]\n\
\n\
Return the list of IDs for all queues.\n\
Each corresponding default format is also included.");
Each corresponding default unbound op and fallback is also included.");
static PyObject *
queuesmod_put(PyObject *self, PyObject *args, PyObject *kwds)
{
static char *kwlist[] = {"qid", "obj", "fmt", "unboundop", NULL};
static char *kwlist[] = {"qid", "obj", "unboundop", "fallback", NULL};
qidarg_converter_data qidarg = {0};
PyObject *obj;
int fmt;
int unboundop;
if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&Oii:put", kwlist,
qidarg_converter, &qidarg, &obj, &fmt,
&unboundop))
int unboundarg = -1;
int fallbackarg = -1;
if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&O|ii$p:put", kwlist,
qidarg_converter, &qidarg, &obj,
&unboundarg, &fallbackarg))
{
return NULL;
}
int64_t qid = qidarg.id;
if (!check_unbound(unboundop)) {
PyErr_Format(PyExc_ValueError,
"unsupported unboundop %d", unboundop);
struct _queuedefaults defaults = {-1, -1};
if (unboundarg < 0 || fallbackarg < 0) {
int err = queue_get_defaults(&_globals.queues, qid, &defaults);
if (handle_queue_error(err, self, qid)) {
return NULL;
}
}
unboundop_t unboundop;
if (resolve_unboundop(unboundarg, defaults.unboundop, &unboundop) < 0) {
return NULL;
}
xidata_fallback_t fallback;
if (resolve_fallback(fallbackarg, defaults.fallback, &fallback) < 0) {
return NULL;
}
/* Queue up the object. */
int err = queue_put(&_globals.queues, qid, obj, fmt, unboundop);
int err = queue_put(&_globals.queues, qid, obj, unboundop, fallback);
// This is the only place that raises QueueFull.
if (handle_queue_error(err, self, qid)) {
return NULL;
@ -1612,7 +1637,7 @@ queuesmod_put(PyObject *self, PyObject *args, PyObject *kwds)
}
PyDoc_STRVAR(queuesmod_put_doc,
"put(qid, obj, fmt)\n\
"put(qid, obj)\n\
\n\
Add the object's data to the queue.");
@ -1628,27 +1653,26 @@ queuesmod_get(PyObject *self, PyObject *args, PyObject *kwds)
int64_t qid = qidarg.id;
PyObject *obj = NULL;
int fmt = 0;
int unboundop = 0;
int err = queue_get(&_globals.queues, qid, &obj, &fmt, &unboundop);
int err = queue_get(&_globals.queues, qid, &obj, &unboundop);
// This is the only place that raises QueueEmpty.
if (handle_queue_error(err, self, qid)) {
return NULL;
}
if (obj == NULL) {
return Py_BuildValue("Oii", Py_None, fmt, unboundop);
return Py_BuildValue("Oi", Py_None, unboundop);
}
PyObject *res = Py_BuildValue("OiO", obj, fmt, Py_None);
PyObject *res = Py_BuildValue("OO", obj, Py_None);
Py_DECREF(obj);
return res;
}
PyDoc_STRVAR(queuesmod_get_doc,
"get(qid) -> (obj, fmt)\n\
"get(qid) -> (obj, unboundop)\n\
\n\
Return a new object from the data at the front of the queue.\n\
The object's format is also returned.\n\
The unbound op is also returned.\n\
\n\
If there is nothing to receive then raise QueueEmpty.");
@ -1748,17 +1772,14 @@ queuesmod_get_queue_defaults(PyObject *self, PyObject *args, PyObject *kwds)
}
int64_t qid = qidarg.id;
_queue *queue = NULL;
int err = _queues_lookup(&_globals.queues, qid, &queue);
struct _queuedefaults defaults;
int err = queue_get_defaults(&_globals.queues, qid, &defaults);
if (handle_queue_error(err, self, qid)) {
return NULL;
}
int fmt = queue->defaults.fmt;
int unboundop = queue->defaults.unboundop;
_queue_unmark_waiter(queue, _globals.queues.mutex);
PyObject *defaults = Py_BuildValue("ii", fmt, unboundop);
return defaults;
PyObject *res = Py_BuildValue("ii", defaults.unboundop, defaults.fallback);
return res;
}
PyDoc_STRVAR(queuesmod_get_queue_defaults_doc,

View File

@ -39,10 +39,37 @@ _get_interpid(_PyXIData_t *data)
}
#ifdef HAS_FALLBACK
static int
resolve_fallback(int arg, xidata_fallback_t dflt,
xidata_fallback_t *p_fallback)
{
if (arg < 0) {
*p_fallback = dflt;
return 0;
}
xidata_fallback_t fallback;
if (arg == _PyXIDATA_XIDATA_ONLY) {
fallback =_PyXIDATA_XIDATA_ONLY;
}
else if (arg == _PyXIDATA_FULL_FALLBACK) {
fallback = _PyXIDATA_FULL_FALLBACK;
}
else {
PyErr_Format(PyExc_ValueError, "unsupported fallback %d", arg);
return -1;
}
*p_fallback = fallback;
return 0;
}
#endif
/* unbound items ************************************************************/
#ifdef HAS_UNBOUND_ITEMS
typedef int unboundop_t;
#define UNBOUND_REMOVE 1
#define UNBOUND_ERROR 2
#define UNBOUND_REPLACE 3
@ -53,6 +80,7 @@ _get_interpid(_PyXIData_t *data)
// object is released but the underlying data is copied (with the "raw"
// allocator) and used when the item is popped off the queue.
#ifndef NDEBUG
static int
check_unbound(int unboundop)
{
@ -65,5 +93,31 @@ check_unbound(int unboundop)
return 0;
}
}
#endif
static int
resolve_unboundop(int arg, unboundop_t dflt, unboundop_t *p_unboundop)
{
if (arg < 0) {
*p_unboundop = dflt;
return 0;
}
unboundop_t op;
if (arg == UNBOUND_REMOVE) {
op = UNBOUND_REMOVE;
}
else if (arg == UNBOUND_ERROR) {
op = UNBOUND_ERROR;
}
else if (arg == UNBOUND_REPLACE) {
op = UNBOUND_REPLACE;
}
else {
PyErr_Format(PyExc_ValueError, "unsupported unboundop %d", arg);
return -1;
}
*p_unboundop = op;
return 0;
}
#endif

View File

@ -1839,6 +1839,7 @@ _sharednsitem_set_value(_PyXI_namespace_item *item, PyObject *value)
return -1;
}
PyThreadState *tstate = PyThreadState_Get();
// XXX Use _PyObject_GetXIDataWithFallback()?
if (_PyObject_GetXIDataNoFallback(tstate, value, item->xidata) != 0) {
PyMem_RawFree(item->xidata);
item->xidata = NULL;