Fixes issue #6766: Updated multiprocessing Proxy Objects to support nesting

This commit is contained in:
Davin Potts 2016-09-07 18:48:01 -05:00
parent 1aa642f6bd
commit 86a7668426
3 changed files with 192 additions and 56 deletions

View File

@@ -1682,7 +1682,9 @@ their parent process exits. The manager classes are defined in the
of processes. Objects of this type are returned by of processes. Objects of this type are returned by
:func:`multiprocessing.Manager`. :func:`multiprocessing.Manager`.
It also supports creation of shared lists and dictionaries. Its methods create and return :ref:`multiprocessing-proxy_objects` for a
number of commonly used data types to be synchronized across processes.
This notably includes shared lists and dictionaries.
.. method:: Barrier(parties[, action[, timeout]]) .. method:: Barrier(parties[, action[, timeout]])
@@ -1745,31 +1747,17 @@ their parent process exits. The manager classes are defined in the
dict(mapping) dict(mapping)
dict(sequence) dict(sequence)
Create a shared ``dict`` object and return a proxy for it. Create a shared :class:`dict` object and return a proxy for it.
.. method:: list() .. method:: list()
list(sequence) list(sequence)
Create a shared ``list`` object and return a proxy for it. Create a shared :class:`list` object and return a proxy for it.
.. note::
Modifications to mutable values or items in dict and list proxies will not
be propagated through the manager, because the proxy has no way of knowing
when its values or items are modified. To modify such an item, you can
re-assign the modified object to the container proxy::
# create a list proxy and append a mutable object (a dictionary)
lproxy = manager.list()
lproxy.append({})
# now mutate the dictionary
d = lproxy[0]
d['a'] = 1
d['b'] = 2
# at this point, the changes to d are not yet synced, but by
# reassigning the dictionary, the proxy is notified of the change
lproxy[0] = d
.. versionchanged:: 3.6
Shared objects are capable of being nested. For example, a shared
container object such as a shared list can contain other shared objects
which will all be managed and synchronized by the :class:`SyncManager`.
.. class:: Namespace .. class:: Namespace
@@ -1881,6 +1869,8 @@ client to access it remotely::
>>> s = m.get_server() >>> s = m.get_server()
>>> s.serve_forever() >>> s.serve_forever()
.. _multiprocessing-proxy_objects:
Proxy Objects Proxy Objects
~~~~~~~~~~~~~ ~~~~~~~~~~~~~
@@ -1890,8 +1880,7 @@ proxy. Multiple proxy objects may have the same referent.
A proxy object has methods which invoke corresponding methods of its referent A proxy object has methods which invoke corresponding methods of its referent
(although not every method of the referent will necessarily be available through (although not every method of the referent will necessarily be available through
the proxy). A proxy can usually be used in most of the same ways that its the proxy). In this way, a proxy can be used just like its referent can:
referent can:
.. doctest:: .. doctest::
@@ -1912,9 +1901,9 @@ the referent, whereas applying :func:`repr` will return the representation of
the proxy. the proxy.
An important feature of proxy objects is that they are picklable so they can be An important feature of proxy objects is that they are picklable so they can be
passed between processes. Note, however, that if a proxy is sent to the passed between processes. As such, a referent can contain
corresponding manager's process then unpickling it will produce the referent :ref:`multiprocessing-proxy_objects`. This permits nesting of these managed
itself. This means, for example, that one shared object can contain a second: lists, dicts, and other :ref:`multiprocessing-proxy_objects`:
.. doctest:: .. doctest::
@@ -1922,10 +1911,46 @@ itself. This means, for example, that one shared object can contain a second:
>>> b = manager.list() >>> b = manager.list()
>>> a.append(b) # referent of a now contains referent of b >>> a.append(b) # referent of a now contains referent of b
>>> print(a, b) >>> print(a, b)
[[]] [] [<ListProxy object, typeid 'list' at ...>] []
>>> b.append('hello') >>> b.append('hello')
>>> print(a, b) >>> print(a[0], b)
[['hello']] ['hello'] ['hello'] ['hello']
Similarly, dict and list proxies may be nested inside one another::
>>> l_outer = manager.list([ manager.dict() for i in range(2) ])
>>> d_first_inner = l_outer[0]
>>> d_first_inner['a'] = 1
>>> d_first_inner['b'] = 2
>>> l_outer[1]['c'] = 3
>>> l_outer[1]['z'] = 26
>>> print(l_outer[0])
{'a': 1, 'b': 2}
>>> print(l_outer[1])
{'c': 3, 'z': 26}
If standard (non-proxy) :class:`list` or :class:`dict` objects are contained
in a referent, modifications to those mutable values will not be propagated
through the manager because the proxy has no way of knowing when the values
contained within are modified. However, storing a value in a container proxy
(which triggers a ``__setitem__`` on the proxy object) does propagate through
the manager and so to effectively modify such an item, one could re-assign the
modified value to the container proxy::
# create a list proxy and append a mutable object (a dictionary)
lproxy = manager.list()
lproxy.append({})
# now mutate the dictionary
d = lproxy[0]
d['a'] = 1
d['b'] = 2
# at this point, the changes to d are not yet synced, but by
# updating the dictionary, the proxy is notified of the change
lproxy[0] = d
This approach is perhaps less convenient than employing nested
:ref:`multiprocessing-proxy_objects` for most use cases but also
demonstrates a level of control over the synchronization.
.. note:: .. note::

View File

@@ -142,7 +142,8 @@ class Server(object):
self.id_to_obj = {'0': (None, ())} self.id_to_obj = {'0': (None, ())}
self.id_to_refcount = {} self.id_to_refcount = {}
self.mutex = threading.RLock() self.id_to_local_proxy_obj = {}
self.mutex = threading.Lock()
def serve_forever(self): def serve_forever(self):
''' '''
@@ -227,7 +228,14 @@ class Server(object):
methodname = obj = None methodname = obj = None
request = recv() request = recv()
ident, methodname, args, kwds = request ident, methodname, args, kwds = request
obj, exposed, gettypeid = id_to_obj[ident] try:
obj, exposed, gettypeid = id_to_obj[ident]
except KeyError as ke:
try:
obj, exposed, gettypeid = \
self.id_to_local_proxy_obj[ident]
except KeyError as second_ke:
raise ke
if methodname not in exposed: if methodname not in exposed:
raise AttributeError( raise AttributeError(
@@ -308,7 +316,7 @@ class Server(object):
''' '''
with self.mutex: with self.mutex:
result = [] result = []
keys = list(self.id_to_obj.keys()) keys = list(self.id_to_refcount.keys())
keys.sort() keys.sort()
for ident in keys: for ident in keys:
if ident != '0': if ident != '0':
@@ -321,7 +329,8 @@ class Server(object):
''' '''
Number of shared objects Number of shared objects
''' '''
return len(self.id_to_obj) - 1 # don't count ident='0' # Doesn't use (len(self.id_to_obj) - 1) as we shouldn't count ident='0'
return len(self.id_to_refcount)
def shutdown(self, c): def shutdown(self, c):
''' '''
@@ -363,13 +372,9 @@ class Server(object):
self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid) self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
if ident not in self.id_to_refcount: if ident not in self.id_to_refcount:
self.id_to_refcount[ident] = 0 self.id_to_refcount[ident] = 0
# increment the reference count immediately, to avoid
# this object being garbage collected before a Proxy self.incref(c, ident)
# object for it can be created. The caller of create() return ident, tuple(exposed)
# is responsible for doing a decref once the Proxy object
# has been created.
self.incref(c, ident)
return ident, tuple(exposed)
def get_methods(self, c, token): def get_methods(self, c, token):
''' '''
@@ -387,15 +392,45 @@ class Server(object):
def incref(self, c, ident): def incref(self, c, ident):
with self.mutex: with self.mutex:
self.id_to_refcount[ident] += 1 try:
self.id_to_refcount[ident] += 1
except KeyError as ke:
# If no external references exist but an internal (to the
# manager) still does and a new external reference is created
# from it, restore the manager's tracking of it from the
# previously stashed internal ref.
if ident in self.id_to_local_proxy_obj:
self.id_to_refcount[ident] = 1
self.id_to_obj[ident] = \
self.id_to_local_proxy_obj[ident]
obj, exposed, gettypeid = self.id_to_obj[ident]
util.debug('Server re-enabled tracking & INCREF %r', ident)
else:
raise ke
def decref(self, c, ident): def decref(self, c, ident):
if ident not in self.id_to_refcount and \
ident in self.id_to_local_proxy_obj:
util.debug('Server DECREF skipping %r', ident)
return
with self.mutex: with self.mutex:
assert self.id_to_refcount[ident] >= 1 assert self.id_to_refcount[ident] >= 1
self.id_to_refcount[ident] -= 1 self.id_to_refcount[ident] -= 1
if self.id_to_refcount[ident] == 0: if self.id_to_refcount[ident] == 0:
del self.id_to_obj[ident], self.id_to_refcount[ident] del self.id_to_refcount[ident]
util.debug('disposing of obj with id %r', ident)
if ident not in self.id_to_refcount:
# Two-step process in case the object turns out to contain other
# proxy objects (e.g. a managed list of managed lists).
# Otherwise, deleting self.id_to_obj[ident] would trigger the
# deleting of the stored value (another managed object) which would
# in turn attempt to acquire the mutex that is already held here.
self.id_to_obj[ident] = (None, (), None) # thread-safe
util.debug('disposing of obj with id %r', ident)
with self.mutex:
del self.id_to_obj[ident]
# #
# Class to represent state of a manager # Class to represent state of a manager
@@ -658,7 +693,7 @@ class BaseProxy(object):
_mutex = util.ForkAwareThreadLock() _mutex = util.ForkAwareThreadLock()
def __init__(self, token, serializer, manager=None, def __init__(self, token, serializer, manager=None,
authkey=None, exposed=None, incref=True): authkey=None, exposed=None, incref=True, manager_owned=False):
with BaseProxy._mutex: with BaseProxy._mutex:
tls_idset = BaseProxy._address_to_local.get(token.address, None) tls_idset = BaseProxy._address_to_local.get(token.address, None)
if tls_idset is None: if tls_idset is None:
@@ -680,6 +715,12 @@ class BaseProxy(object):
self._serializer = serializer self._serializer = serializer
self._Client = listener_client[serializer][1] self._Client = listener_client[serializer][1]
# Should be set to True only when a proxy object is being created
# on the manager server; primary use case: nested proxy objects.
# RebuildProxy detects when a proxy is being created on the manager
# and sets this value appropriately.
self._owned_by_manager = manager_owned
if authkey is not None: if authkey is not None:
self._authkey = process.AuthenticationString(authkey) self._authkey = process.AuthenticationString(authkey)
elif self._manager is not None: elif self._manager is not None:
@@ -738,6 +779,10 @@ class BaseProxy(object):
return self._callmethod('#GETVALUE') return self._callmethod('#GETVALUE')
def _incref(self): def _incref(self):
if self._owned_by_manager:
util.debug('owned_by_manager skipped INCREF of %r', self._token.id)
return
conn = self._Client(self._token.address, authkey=self._authkey) conn = self._Client(self._token.address, authkey=self._authkey)
dispatch(conn, None, 'incref', (self._id,)) dispatch(conn, None, 'incref', (self._id,))
util.debug('INCREF %r', self._token.id) util.debug('INCREF %r', self._token.id)
@@ -822,19 +867,19 @@ class BaseProxy(object):
def RebuildProxy(func, token, serializer, kwds): def RebuildProxy(func, token, serializer, kwds):
''' '''
Function used for unpickling proxy objects. Function used for unpickling proxy objects.
If possible the shared object is returned, or otherwise a proxy for it.
''' '''
server = getattr(process.current_process(), '_manager_server', None) server = getattr(process.current_process(), '_manager_server', None)
if server and server.address == token.address: if server and server.address == token.address:
return server.id_to_obj[token.id][0] util.debug('Rebuild a proxy owned by manager, token=%r', token)
else: kwds['manager_owned'] = True
incref = ( if token.id not in server.id_to_local_proxy_obj:
kwds.pop('incref', True) and server.id_to_local_proxy_obj[token.id] = \
not getattr(process.current_process(), '_inheriting', False) server.id_to_obj[token.id]
) incref = (
return func(token, serializer, incref=incref, **kwds) kwds.pop('incref', True) and
not getattr(process.current_process(), '_inheriting', False)
)
return func(token, serializer, incref=incref, **kwds)
# #
# Functions to create proxies and proxy types # Functions to create proxies and proxy types

View File

@@ -1628,13 +1628,33 @@ class _TestContainers(BaseTestCase):
d = [a, b] d = [a, b]
e = self.list(d) e = self.list(d)
self.assertEqual( self.assertEqual(
e[:], [element[:] for element in e],
[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]] [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
) )
f = self.list([a]) f = self.list([a])
a.append('hello') a.append('hello')
self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']]) self.assertEqual(f[0][:], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello'])
def test_list_proxy_in_list(self):
a = self.list([self.list(range(3)) for _i in range(3)])
self.assertEqual([inner[:] for inner in a], [[0, 1, 2]] * 3)
a[0][-1] = 55
self.assertEqual(a[0][:], [0, 1, 55])
for i in range(1, 3):
self.assertEqual(a[i][:], [0, 1, 2])
self.assertEqual(a[1].pop(), 2)
self.assertEqual(len(a[1]), 2)
for i in range(0, 3, 2):
self.assertEqual(len(a[i]), 3)
del a
b = self.list()
b.append(b)
del b
def test_dict(self): def test_dict(self):
d = self.dict() d = self.dict()
@@ -1646,6 +1666,52 @@ class _TestContainers(BaseTestCase):
self.assertEqual(sorted(d.values()), [chr(i) for i in indices]) self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices]) self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
def test_dict_proxy_nested(self):
pets = self.dict(ferrets=2, hamsters=4)
supplies = self.dict(water=10, feed=3)
d = self.dict(pets=pets, supplies=supplies)
self.assertEqual(supplies['water'], 10)
self.assertEqual(d['supplies']['water'], 10)
d['supplies']['blankets'] = 5
self.assertEqual(supplies['blankets'], 5)
self.assertEqual(d['supplies']['blankets'], 5)
d['supplies']['water'] = 7
self.assertEqual(supplies['water'], 7)
self.assertEqual(d['supplies']['water'], 7)
del pets
del supplies
self.assertEqual(d['pets']['ferrets'], 2)
d['supplies']['blankets'] = 11
self.assertEqual(d['supplies']['blankets'], 11)
pets = d['pets']
supplies = d['supplies']
supplies['water'] = 7
self.assertEqual(supplies['water'], 7)
self.assertEqual(d['supplies']['water'], 7)
d.clear()
self.assertEqual(len(d), 0)
self.assertEqual(supplies['water'], 7)
self.assertEqual(pets['hamsters'], 4)
l = self.list([pets, supplies])
l[0]['marmots'] = 1
self.assertEqual(pets['marmots'], 1)
self.assertEqual(l[0]['marmots'], 1)
del pets
del supplies
self.assertEqual(l[0]['marmots'], 1)
outer = self.list([[88, 99], l])
self.assertIsInstance(outer[0], list) # Not a ListProxy
self.assertEqual(outer[-1][-1]['feed'], 3)
def test_namespace(self): def test_namespace(self):
n = self.Namespace() n = self.Namespace()
n.name = 'Bob' n.name = 'Bob'