Objetos de memoria compartida en multiprocesamiento

123

Supongamos que tengo una gran matriz numpy en memoria, tengo una función funcque toma esta matriz gigante como entrada (junto con algunos otros parámetros). funccon diferentes parámetros se pueden ejecutar en paralelo. Por ejemplo:

def func(arr, param):
    # do stuff to arr, param

# build array arr

pool = Pool(processes = 6)
results = [pool.apply_async(func, [arr, param]) for param in all_params]
output = [res.get() for res in results]

Si uso la biblioteca de multiprocesamiento, esa matriz gigante se copiará varias veces en diferentes procesos.

¿Hay alguna manera de permitir que diferentes procesos compartan la misma matriz? Este objeto de matriz es de solo lectura y nunca se modificará.

Lo que es más complicado, si arr no es una matriz, sino un objeto python arbitrario, ¿hay alguna manera de compartirlo?

[EDITADO]

Leí la respuesta pero todavía estoy un poco confundido. Dado que fork () es copia en escritura, no deberíamos invocar ningún costo adicional al generar nuevos procesos en la biblioteca de multiprocesamiento de python. Pero el siguiente código sugiere que hay una gran sobrecarga:

from multiprocessing import Pool, Manager
import numpy as np; 
import time

def f(arr):
    return len(arr)

t = time.time()
arr = np.arange(10000000)
print "construct array = ", time.time() - t;


pool = Pool(processes = 6)

t = time.time()
res = pool.apply_async(f, [arr,])
res.get()
print "multiprocessing overhead = ", time.time() - t;

salida (y, por cierto, el costo aumenta a medida que aumenta el tamaño de la matriz, por lo que sospecho que todavía hay gastos generales relacionados con la copia de memoria):

construct array =  0.0178790092468
multiprocessing overhead =  0.252444982529

¿Por qué hay una sobrecarga tan grande si no copiamos la matriz? ¿Y qué parte me salva la memoria compartida?

Vendetta
fuente
Has mirado los documentos , ¿verdad?
Lev Levitsky
@FrancisAvila, ¿hay alguna manera de compartir no solo una matriz, sino también objetos arbitrarios de Python?
Vendetta
1
@LevLevitsky Tengo que preguntar, ¿hay alguna manera de compartir no solo una matriz, sino también objetos arbitrarios de Python?
Vendetta
2
Esta respuesta explica muy bien por qué los objetos arbitrarios de Python no se pueden compartir.
Janne Karila

Respuestas:

121

Si utiliza un sistema operativo que utiliza fork()semántica de copia en escritura (como cualquier unix común), siempre y cuando nunca altere su estructura de datos, estará disponible para todos los procesos secundarios sin ocupar memoria adicional. No tendrá que hacer nada especial (excepto asegurarse de no alterar el objeto).

Lo más eficiente que puede hacer para su problema sería empaquetar su matriz en una estructura de matriz eficiente (usando numpyo array), colocarla en la memoria compartida, envolverla multiprocessing.Arrayy pasarla a sus funciones. Esta respuesta muestra cómo hacer eso .

Si desea un objeto compartido grabable , deberá envolverlo con algún tipo de sincronización o bloqueo. multiprocessingproporciona dos métodos para hacer esto : uno usando memoria compartida (adecuada para valores simples, matrices o tipos de letra) o un Managerproxy, donde un proceso retiene la memoria y un administrador arbitra el acceso a ella desde otros procesos (incluso a través de una red).

El Managerenfoque se puede usar con objetos arbitrarios de Python, pero será más lento que el equivalente usando memoria compartida porque los objetos deben ser serializados / deserializados y enviados entre procesos.

Hay una gran cantidad de bibliotecas y enfoques de procesamiento paralelo disponibles en Python . multiprocessinges una biblioteca excelente y bien redondeada, pero si tiene necesidades especiales, quizás uno de los otros enfoques sea mejor.

Francis Avila
fuente
25
Solo para tener en cuenta, en Python fork () en realidad significa copiar en el acceso (porque solo acceder al objeto cambiará su recuento de referencia).
Fabio Zadrozny
3
@FabioZadrozny ¿Copiaría realmente todo el objeto o solo la página de memoria que contiene su recuento?
zigg
55
AFAIK, solo la página de memoria que contiene el recuento (entonces, 4kb en cada acceso de objeto)
Fabio Zadrozny
1
@max Use un cierre. La función dada a apply_asyncdebe hacer referencia al objeto compartido en el alcance directamente en lugar de a través de sus argumentos.
Francis Avila
3
@FrancisAvila, ¿cómo usas un cierre? ¿No debería ser elegible la función que le das a apply_async? ¿O esto es solo una restricción map_async?
GermanK
17

Me encontré con el mismo problema y escribí una pequeña clase de utilidad de memoria compartida para solucionarlo.

Estoy usando multiprocessing.RawArray(sin bloqueo), y también el acceso a las matrices no está sincronizado en absoluto (sin bloqueo), tenga cuidado de no disparar a sus propios pies.

Con la solución obtengo aceleraciones por un factor de aproximadamente 3 en un i7 de cuatro núcleos.

Aquí está el código: siéntase libre de usarlo y mejorarlo, y por favor informe cualquier error.

'''
Created on 14.05.2013

@author: martin
'''

import multiprocessing
import ctypes
import numpy as np

class SharedNumpyMemManagerError(Exception):
    pass

'''
Singleton Pattern
'''
class SharedNumpyMemManager:    

    _initSize = 1024

    _instance = None

    def __new__(cls, *args, **kwargs):
        if not cls._instance:
            cls._instance = super(SharedNumpyMemManager, cls).__new__(
                                cls, *args, **kwargs)
        return cls._instance        

    def __init__(self):
        self.lock = multiprocessing.Lock()
        self.cur = 0
        self.cnt = 0
        self.shared_arrays = [None] * SharedNumpyMemManager._initSize

    def __createArray(self, dimensions, ctype=ctypes.c_double):

        self.lock.acquire()

        # double size if necessary
        if (self.cnt >= len(self.shared_arrays)):
            self.shared_arrays = self.shared_arrays + [None] * len(self.shared_arrays)

        # next handle
        self.__getNextFreeHdl()        

        # create array in shared memory segment
        shared_array_base = multiprocessing.RawArray(ctype, np.prod(dimensions))

        # convert to numpy array vie ctypeslib
        self.shared_arrays[self.cur] = np.ctypeslib.as_array(shared_array_base)

        # do a reshape for correct dimensions            
        # Returns a masked array containing the same data, but with a new shape.
        # The result is a view on the original array
        self.shared_arrays[self.cur] = self.shared_arrays[self.cnt].reshape(dimensions)

        # update cnt
        self.cnt += 1

        self.lock.release()

        # return handle to the shared memory numpy array
        return self.cur

    def __getNextFreeHdl(self):
        orgCur = self.cur
        while self.shared_arrays[self.cur] is not None:
            self.cur = (self.cur + 1) % len(self.shared_arrays)
            if orgCur == self.cur:
                raise SharedNumpyMemManagerError('Max Number of Shared Numpy Arrays Exceeded!')

    def __freeArray(self, hdl):
        self.lock.acquire()
        # set reference to None
        if self.shared_arrays[hdl] is not None: # consider multiple calls to free
            self.shared_arrays[hdl] = None
            self.cnt -= 1
        self.lock.release()

    def __getArray(self, i):
        return self.shared_arrays[i]

    @staticmethod
    def getInstance():
        if not SharedNumpyMemManager._instance:
            SharedNumpyMemManager._instance = SharedNumpyMemManager()
        return SharedNumpyMemManager._instance

    @staticmethod
    def createArray(*args, **kwargs):
        return SharedNumpyMemManager.getInstance().__createArray(*args, **kwargs)

    @staticmethod
    def getArray(*args, **kwargs):
        return SharedNumpyMemManager.getInstance().__getArray(*args, **kwargs)

    @staticmethod    
    def freeArray(*args, **kwargs):
        return SharedNumpyMemManager.getInstance().__freeArray(*args, **kwargs)

# Init Singleton on module load
SharedNumpyMemManager.getInstance()

if __name__ == '__main__':

    import timeit

    N_PROC = 8
    INNER_LOOP = 10000
    N = 1000

    def propagate(t):
        i, shm_hdl, evidence = t
        a = SharedNumpyMemManager.getArray(shm_hdl)
        for j in range(INNER_LOOP):
            a[i] = i

    class Parallel_Dummy_PF:

        def __init__(self, N):
            self.N = N
            self.arrayHdl = SharedNumpyMemManager.createArray(self.N, ctype=ctypes.c_double)            
            self.pool = multiprocessing.Pool(processes=N_PROC)

        def update_par(self, evidence):
            self.pool.map(propagate, zip(range(self.N), [self.arrayHdl] * self.N, [evidence] * self.N))

        def update_seq(self, evidence):
            for i in range(self.N):
                propagate((i, self.arrayHdl, evidence))

        def getArray(self):
            return SharedNumpyMemManager.getArray(self.arrayHdl)

    def parallelExec():
        pf = Parallel_Dummy_PF(N)
        print(pf.getArray())
        pf.update_par(5)
        print(pf.getArray())

    def sequentialExec():
        pf = Parallel_Dummy_PF(N)
        print(pf.getArray())
        pf.update_seq(5)
        print(pf.getArray())

    t1 = timeit.Timer("sequentialExec()", "from __main__ import sequentialExec")
    t2 = timeit.Timer("parallelExec()", "from __main__ import parallelExec")

    print("Sequential: ", t1.timeit(number=1))    
    print("Parallel: ", t2.timeit(number=1))
martin.preinfalk
fuente
Me acabo de dar cuenta de que tienes que configurar tus matrices de memoria compartida antes de crear el grupo de multiprocesamiento, todavía no sé por qué, pero definitivamente no funcionará al revés.
martin.preinfalk
La razón es que ese grupo de multiprocesamiento llama a fork () cuando se crea una instancia del grupo, por lo que cualquier cosa posterior no tendrá acceso al puntero a ningún elemento compartido creado posteriormente.
Xiv
Cuando probé este código en py35 obtuve una excepción en multiprocessing.sharedctypes.py, así que supongo que este código es solo para py2.
Dr. Hillier Dániel
11

Este es el caso de uso previsto para Ray , que es una biblioteca para Python paralelo y distribuido. Bajo el capó, serializa objetos usando el diseño de datos de Apache Arrow (que es un formato de copia cero) y los almacena en un almacén de objetos de memoria compartida para que puedan acceder a ellos mediante múltiples procesos sin crear copias.

El código se vería así.

import numpy as np
import ray

ray.init()

@ray.remote
def func(array, param):
    # Do stuff.
    return 1

array = np.ones(10**6)
# Store the array in the shared memory object store once
# so it is not copied multiple times.
array_id = ray.put(array)

result_ids = [func.remote(array_id, i) for i in range(4)]
output = ray.get(result_ids)

Si no llama ray.put, la matriz seguirá almacenada en la memoria compartida, pero eso se hará una vez por invocación func, que no es lo que desea.

Tenga en cuenta que esto funcionará no solo para matrices, sino también para objetos que contengan matrices , por ejemplo, diccionarios que mapean entradas a matrices como se muestra a continuación.

Puede comparar el rendimiento de la serialización en Ray versus pickle ejecutando lo siguiente en IPython.

import numpy as np
import pickle
import ray

ray.init()

x = {i: np.ones(10**7) for i in range(20)}

# Time Ray.
%time x_id = ray.put(x)  # 2.4s
%time new_x = ray.get(x_id)  # 0.00073s

# Time pickle.
%time serialized = pickle.dumps(x)  # 2.6s
%time deserialized = pickle.loads(serialized)  # 1.9s

La serialización con Ray es solo un poco más rápida que el pickle, pero la deserialización es 1000 veces más rápida debido al uso de memoria compartida (este número, por supuesto, dependerá del objeto).

Ver la documentación de Ray . Puede leer más sobre la serialización rápida con Ray y Arrow . Tenga en cuenta que soy uno de los desarrolladores de Ray.

Robert Nishihara
fuente
1
Ray suena bien! Pero, he intentado usar esta biblioteca antes, pero desafortunadamente, me di cuenta de que Ray no es compatible con Windows. Espero que puedan apoyar Windows lo antes posible. ¡Gracias desarrolladores!
Hzzkygcs
5

Como mencionó Robert Nishihara, Apache Arrow lo hace fácil, específicamente con el almacén de objetos en memoria Plasma, en el que se basa Ray.

Hice plasma cerebral específicamente por este motivo: carga y recarga rápidas de objetos grandes en una aplicación Flask. Es un espacio de nombres de objetos de memoria compartida para objetos serializables de Apache Arrow, incluyendo pickle'd bytestrings generados por pickle.dumps(...).

La diferencia clave con Apache Ray y Plasma es que realiza un seguimiento de las ID de los objetos por usted. Cualquier proceso, subproceso o programa que se ejecute localmente puede compartir los valores de las variables llamando al nombre desde cualquier Brainobjeto.

$ pip install brain-plasma
$ plasma_store -m 10000000 -s /tmp/plasma

from brain_plasma import Brain
brain = Brain(path='/tmp/plasma/)

brain['a'] = [1]*10000

brain['a']
# >>> [1,1,1,1,...]
russellthehippo
fuente