Utilice la matriz numpy en la memoria compartida para multiprocesamiento

111

Me gustaría usar una matriz numpy en memoria compartida para usar con el módulo de multiprocesamiento. La dificultad es usarlo como una matriz numpy, y no solo como una matriz ctypes.

from multiprocessing import Process, Array
import scipy

def f(a):
    a[0] = -a[0]

if __name__ == '__main__':
    # Create the array
    N = int(10)
    unshared_arr = scipy.rand(N)
    arr = Array('d', unshared_arr)
    print "Originally, the first two elements of arr = %s"%(arr[:2])

    # Create, start, and finish the child processes
    p = Process(target=f, args=(arr,))
    p.start()
    p.join()

    # Printing out the changed values
    print "Now, the first two elements of arr = %s"%arr[:2]

Esto produce resultados como:

Originally, the first two elements of arr = [0.3518653236697369, 0.517794725524976]
Now, the first two elements of arr = [-0.3518653236697369, 0.517794725524976]

Se puede acceder a la matriz de una manera ctypes, por ejemplo, arr[i]tiene sentido. Sin embargo, no es una matriz numerosa y no puedo realizar operaciones como -1*arr, o arr.sum(). Supongo que una solución sería convertir la matriz ctypes en una matriz numpy. Sin embargo (además de no poder hacer que esto funcione), no creo que se comparta más.

Parece que habría una solución estándar para lo que tiene que ser un problema común.

Ian Langmore
fuente
1
¿No es lo mismo que este? stackoverflow.com/questions/5033799/…
pygabriel
1
No es exactamente la misma pregunta. La pregunta vinculada es preguntar sobre en subprocesslugar de multiprocessing.
Andrew

Respuestas:

82

Para agregar a las respuestas de @ unutbu (ya no está disponible) y @Henry Gomersall. Puede utilizar shared_arr.get_lock()para sincronizar el acceso cuando sea necesario:

shared_arr = mp.Array(ctypes.c_double, N)
# ...
def f(i): # could be anything numpy accepts as an index such another numpy array
    with shared_arr.get_lock(): # synchronize access
        arr = np.frombuffer(shared_arr.get_obj()) # no data copying
        arr[i] = -arr[i]

Ejemplo

import ctypes
import logging
import multiprocessing as mp

from contextlib import closing

import numpy as np

info = mp.get_logger().info

def main():
    logger = mp.log_to_stderr()
    logger.setLevel(logging.INFO)

    # create shared array
    N, M = 100, 11
    shared_arr = mp.Array(ctypes.c_double, N)
    arr = tonumpyarray(shared_arr)

    # fill with random values
    arr[:] = np.random.uniform(size=N)
    arr_orig = arr.copy()

    # write to arr from different processes
    with closing(mp.Pool(initializer=init, initargs=(shared_arr,))) as p:
        # many processes access the same slice
        stop_f = N // 10
        p.map_async(f, [slice(stop_f)]*M)

        # many processes access different slices of the same array
        assert M % 2 # odd
        step = N // 10
        p.map_async(g, [slice(i, i + step) for i in range(stop_f, N, step)])
    p.join()
    assert np.allclose(((-1)**M)*tonumpyarray(shared_arr), arr_orig)

def init(shared_arr_):
    global shared_arr
    shared_arr = shared_arr_ # must be inherited, not passed as an argument

def tonumpyarray(mp_arr):
    return np.frombuffer(mp_arr.get_obj())

def f(i):
    """synchronized."""
    with shared_arr.get_lock(): # synchronize access
        g(i)

def g(i):
    """no synchronization."""
    info("start %s" % (i,))
    arr = tonumpyarray(shared_arr)
    arr[i] = -1 * arr[i]
    info("end   %s" % (i,))

if __name__ == '__main__':
    mp.freeze_support()
    main()

Si no necesita acceso sincronizado o crea sus propias cerraduras, entonces no mp.Array()es necesario. Podrías usar mp.sharedctypes.RawArrayen este caso.

jfs
fuente
2
¡Hermosa respuesta! Si quiero tener más de una matriz compartida, cada una de ellas bloqueable por separado, pero con la cantidad de matrices determinada en tiempo de ejecución, ¿es una extensión sencilla de lo que ha hecho aquí?
Andrew
3
@Andrew: las matrices compartidas deben crearse antes de que se generen los procesos secundarios.
jfs
Buen comentario sobre el orden de las operaciones. Sin embargo, eso es lo que tenía en mente: crear un número de matrices compartidas especificado por el usuario y luego generar algunos procesos secundarios. ¿Es eso sencillo?
Andrew
1
@Chicony: no puede cambiar el tamaño de la matriz. Piense en ello como un bloque de memoria compartido que debe asignarse antes de que se inicien los procesos secundarios. No necesita usar toda la memoria, por ejemplo, podría pasar counta numpy.frombuffer(). Puede intentar hacerlo en un nivel inferior usando mmapo algo como posix_ipcdirectamente para implementar un análogo de RawArray redimensionable (podría implicar copiar mientras redimensiona) (o buscar una biblioteca existente). O si su tarea lo permite: copie los datos en partes (si no los necesita todos a la vez). "Cómo cambiar el tamaño de una memoria compartida" es una buena pregunta aparte.
jfs
1
@umopapisdn: Pool()define el número de procesos (el número de núcleos de CPU disponibles se utiliza por defecto). Mes el número de veces f()que se llama a la función.
jfs
21

El Arrayobjeto tiene un get_obj()método asociado con él, que devuelve la matriz ctypes que presenta una interfaz de búfer. Creo que lo siguiente debería funcionar ...

from multiprocessing import Process, Array
import scipy
import numpy

def f(a):
    a[0] = -a[0]

if __name__ == '__main__':
    # Create the array
    N = int(10)
    unshared_arr = scipy.rand(N)
    a = Array('d', unshared_arr)
    print "Originally, the first two elements of arr = %s"%(a[:2])

    # Create, start, and finish the child process
    p = Process(target=f, args=(a,))
    p.start()
    p.join()

    # Print out the changed values
    print "Now, the first two elements of arr = %s"%a[:2]

    b = numpy.frombuffer(a.get_obj())

    b[0] = 10.0
    print a[0]

Cuando se ejecuta, imprime el primer elemento de aahora 10.0, mostrando ay bson solo dos vistas en la misma memoria.

Para asegurarse de que todavía es seguro para multiprocesador, creo que tendrá que usar los métodos acquirey releaseque existen en el Arrayobjeto a, y su bloqueo integrado para asegurarse de que se acceda a todo de manera segura (aunque no soy un experto en el módulo multiprocesador).

Henry Gomersall
fuente
no funcionará sin sincronización como @unutbu demostró en su respuesta (ahora eliminada).
jfs
1
Presumiblemente, si solo desea acceder al procesamiento posterior de la matriz, puede hacerlo de manera limpia sin preocuparse por problemas de concurrencia y bloqueo.
Henry Gomersall
en este caso no es necesario mp.Array.
jfs
1
El código de procesamiento puede requerir matrices bloqueadas, pero la interpretación posterior al procesamiento de los datos no necesariamente. Supongo que esto proviene de comprender cuál es exactamente el problema. Claramente, acceder a los datos compartidos al mismo tiempo requerirá cierta protección, ¡lo cual pensé que sería obvio!
Henry Gomersall
16

Si bien las respuestas ya dadas son buenas, hay una solución mucho más fácil a este problema siempre que se cumplan dos condiciones:

  1. Está en un sistema operativo compatible con POSIX (por ejemplo, Linux, Mac OSX); y
  2. Los procesos secundarios necesitan acceso de solo lectura a la matriz compartida.

En este caso, no es necesario jugar con la creación explícita de variables compartidas, ya que los procesos secundarios se crearán mediante una bifurcación. Un niño bifurcado comparte automáticamente el espacio de memoria de los padres. En el contexto del multiprocesamiento de Python, esto significa que comparte todas las variables de nivel de módulo ; tenga en cuenta que esto no es válido para los argumentos que pasa explícitamente a sus procesos secundarios o a las funciones que llama en a multiprocessing.Poolo así.

Un simple ejemplo:

import multiprocessing
import numpy as np

# will hold the (implicitly mem-shared) data
data_array = None

# child worker function
def job_handler(num):
    # built-in id() returns unique memory ID of a variable
    return id(data_array), np.sum(data_array)

def launch_jobs(data, num_jobs=5, num_worker=4):
    global data_array
    data_array = data

    pool = multiprocessing.Pool(num_worker)
    return pool.map(job_handler, range(num_jobs))

# create some random data and execute the child jobs
mem_ids, sumvals = zip(*launch_jobs(np.random.rand(10)))

# this will print 'True' on POSIX OS, since the data was shared
print(np.all(np.asarray(mem_ids) == id(data_array)))
EelkeSpaak
fuente
3
+1 Información realmente valiosa. ¿Puede explicar por qué solo se comparten las variables de nivel de módulo? ¿Por qué las variables locales no forman parte del espacio de memoria de los padres? Por ejemplo, ¿por qué no puede funcionar esto si tengo una función F con var local V y una función G dentro de F que hace referencia a V?
Coffee_Table
5
Advertencia: esta respuesta es un poco engañosa. El proceso hijo recibe una copia del estado del proceso padre, incluidas las variables globales, en el momento de la bifurcación. Los estados no están sincronizados de ninguna manera y divergirán a partir de ese momento. Esta técnica puede ser útil en algunos escenarios (por ejemplo: bifurcar procesos secundarios ad-hoc que manejan una instantánea del proceso principal y luego terminan), pero es inútil en otros (por ejemplo: procesos secundarios de larga ejecución que tienen que compartir y sincronizar datos con el proceso principal).
David Stein
4
@EelkeSpaak: Su afirmación - "un niño bifurcado comparte automáticamente el espacio de memoria del padre" - es incorrecta. Si tengo un proceso secundario que quiere monitorear el estado del proceso principal, de una manera estrictamente de solo lectura, la bifurcación no me llevará allí: el niño solo ve una instantánea del estado principal en el momento de la bifurcación. De hecho, eso es precisamente lo que estaba tratando de hacer (siguiendo su respuesta) cuando descubrí esta limitación. De ahí la posdata de su respuesta. En pocas palabras: el estado padre no se "comparte", sino que simplemente se copia al hijo. Eso no es "compartir" en el sentido habitual.
David Stein
2
¿Me equivoco al pensar que se trata de una situación de copia sobre escritura, al menos en los sistemas posix? Es decir, después de la bifurcación, creo que la memoria se comparte hasta que se escriben nuevos datos, momento en el que se crea una copia. Así que sí, es cierto que los datos no se "comparten" exactamente, pero pueden proporcionar un aumento de rendimiento potencialmente enorme. Si su proceso es de solo lectura, ¡no habrá gastos de copia! ¿He entendido el punto correctamente?
remitente
2
@senderle Sí, ¡eso es exactamente lo que quise decir! De ahí mi punto (2) en la respuesta sobre el acceso de solo lectura.
EelkeSpaak
11

Escribí un pequeño módulo de Python que usa la memoria compartida POSIX para compartir matrices numpy entre intérpretes de Python. Quizás le resulte útil.

https://pypi.python.org/pypi/SharedArray

Así es como funciona:

import numpy as np
import SharedArray as sa

# Create an array in shared memory
a = sa.create("test1", 10)

# Attach it as a different array. This can be done from another
# python interpreter as long as it runs on the same computer.
b = sa.attach("test1")

# See how they are actually sharing the same memory block
a[0] = 42
print(b[0])

# Destroying a does not affect b.
del a
print(b[0])

# See how "test1" is still present in shared memory even though we
# destroyed the array a.
sa.list()

# Now destroy the array "test1" from memory.
sa.delete("test1")

# The array b is not affected, but once you destroy it then the
# data are lost.
print(b[0])
estera
fuente
8

Puede utilizar el sharedmemmódulo: https://bitbucket.org/cleemesser/numpy-sharedmem

Entonces, aquí está su código original, esta vez usando memoria compartida que se comporta como una matriz NumPy (tenga en cuenta la última declaración adicional que llama a una sum()función NumPy ):

from multiprocessing import Process
import sharedmem
import scipy

def f(a):
    a[0] = -a[0]

if __name__ == '__main__':
    # Create the array
    N = int(10)
    unshared_arr = scipy.rand(N)
    arr = sharedmem.empty(N)
    arr[:] = unshared_arr.copy()
    print "Originally, the first two elements of arr = %s"%(arr[:2])

    # Create, start, and finish the child process
    p = Process(target=f, args=(arr,))
    p.start()
    p.join()

    # Print out the changed values
    print "Now, the first two elements of arr = %s"%arr[:2]

    # Perform some NumPy operation
    print arr.sum()
Velimir Mlaker
fuente
1
Nota: esto ya no se está desarrollando y no parece funcionar en linux github.com/sturlamolden/sharedmem-numpy/issues/4
AD
numpy-sharedmem puede no estar en desarrollo, pero aún funciona en Linux, consulte github.com/vmlaker/benchmark-sharedmem .
Velimir Mlaker