multiprocesamiento: ¿compartir un gran objeto de solo lectura entre procesos?

107

¿Los procesos secundarios generados a través del multiprocesamiento comparten objetos creados anteriormente en el programa?

Tengo la siguiente configuración:

do_some_processing(filename):
    for line in file(filename):
        if line.split(',')[0] in big_lookup_object:
            # something here

if __name__ == '__main__':
    big_lookup_object = marshal.load('file.bin')
    pool = Pool(processes=4)
    print pool.map(do_some_processing, glob.glob('*.data'))

Estoy cargando un gran objeto en la memoria, luego creo un grupo de trabajadores que necesitan hacer uso de ese gran objeto. Se accede al objeto grande de solo lectura, no necesito pasar modificaciones entre procesos.

Mi pregunta es: ¿el objeto grande está cargado en la memoria compartida, como lo estaría si generara un proceso en unix / c, o cada proceso carga su propia copia del objeto grande?

Actualización: para aclarar más: big_lookup_object es un objeto de búsqueda compartido. No necesito dividir eso y procesarlo por separado. Necesito guardar una sola copia. El trabajo que necesito dividir es leer muchos otros archivos grandes y buscar los elementos en esos archivos grandes con el objeto de búsqueda.

Actualización adicional: la base de datos es una buena solución, memcached podría ser una mejor solución y el archivo en disco (shelve o dbm) podría ser incluso mejor. En esta pregunta estaba particularmente interesado en una solución de memoria. Para la solución final, usaré hadoop, pero quería ver si también puedo tener una versión local en memoria.

Parand
fuente
su código tal como está escrito llamará marshal.loada los padres y a cada hijo (cada proceso importa el módulo).
jfs
Tienes razón, corregido.
Parand
Para "local en memoria" y si desea evitar copiar lo siguiente, podría ser útil: docs.python.org/library/…
jfs
compartir no. Los procesos generados (es decir, fork o exec, por ejemplo) es un duplicado exacto del proceso de llamada ... pero en una memoria diferente. Para que un proceso se comunique con otro, necesita comunicación entre procesos o lectura / escritura de IPC en alguna ubicación de memoria compartida .
ron

Respuestas:

50

"¿Los procesos secundarios generados a través del multiprocesamiento comparten objetos creados anteriormente en el programa?"

No (python antes de 3.8) y Sí en 3.8 ( https://docs.python.org/3/library/multiprocessing.shared_memory.html#module-multiprocessing.shared_memory )

Los procesos tienen espacio de memoria independiente.

Solución 1

Para aprovechar al máximo una estructura grande con muchos trabajadores, haga esto.

  1. Escriba cada trabajador como un "filtro": lee los resultados intermedios de stdin, funciona, escribe resultados intermedios en stdout.

  2. Conecte a todos los trabajadores como una tubería:

    process1 <source | process2 | process3 | ... | processn >result

Cada proceso lee, trabaja y escribe.

Esto es notablemente eficiente ya que todos los procesos se ejecutan al mismo tiempo. Las escrituras y lecturas pasan directamente a través de búferes compartidos entre los procesos.


Solucion 2

En algunos casos, tiene una estructura más compleja, a menudo una estructura "en abanico". En este caso, tiene un padre con varios hijos.

  1. El padre abre los datos de origen. El padre bifurca a varios hijos.

  2. El padre lee la fuente, distribuye partes de la fuente a cada hijo que se ejecuta simultáneamente.

  3. Cuando el padre llegue al final, cierre la tubería. El niño termina el archivo y termina normalmente.

Las partes secundarias son agradables de escribir porque cada niño simplemente lee sys.stdin.

El padre tiene un poco de juego de pies elegante para engendrar a todos los hijos y retener las tuberías correctamente, pero no es tan malo.

Fan-in es la estructura opuesta. Varios procesos que se ejecutan de forma independiente deben intercalar sus entradas en un proceso común. El recopilador no es tan fácil de escribir, ya que tiene que leer de muchas fuentes.

La lectura de muchas canalizaciones con nombre a menudo se realiza utilizando el selectmódulo para ver qué canalizaciones tienen entradas pendientes.


Solución 3

La búsqueda compartida es la definición de una base de datos.

Solución 3A: cargue una base de datos. Deje que los trabajadores procesen los datos en la base de datos.

Solución 3B: cree un servidor muy simple usando werkzeug (o similar) para proporcionar aplicaciones WSGI que respondan a HTTP GET para que los trabajadores puedan consultar el servidor.


Solución 4

Objeto de sistema de archivos compartido. Unix OS ofrece objetos de memoria compartida. Estos son solo archivos que se asignan a la memoria para que se realice el intercambio de E / S en lugar de más lecturas almacenadas en búfer por convención.

Puede hacer esto desde un contexto de Python de varias maneras

  1. Escriba un programa de inicio que (1) rompa su objeto gigantesco original en objetos más pequeños y (2) inicie trabajadores, cada uno con un objeto más pequeño. Los objetos más pequeños podrían ser objetos de Python encurtidos para ahorrar un poco de tiempo de lectura de archivos.

  2. Escriba un programa de inicio que (1) lea su objeto gigantesco original y escriba un archivo codificado por bytes con estructura de página utilizando seekoperaciones para garantizar que las secciones individuales sean fáciles de encontrar con búsquedas simples. Esto es lo que hace un motor de base de datos: dividir los datos en páginas, hacer que cada página sea fácil de localizar mediante un archivo seek.

    Genere trabajadores con acceso a este archivo de gran tamaño con estructura de página. Cada trabajador puede buscar las partes relevantes y hacer su trabajo allí.

S.Lott
fuente
Mis procesos no son realmente adaptadores; son todos iguales, solo procesan diferentes piezas de datos.
Parand del
A menudo se pueden estructurar como filtros. Leen su pieza de datos, hacen su trabajo y escriben su resultado para su posterior procesamiento.
S.Lott
Me gusta su solución, pero ¿qué pasa con el bloqueo de E / S? ¿Qué pasa si el padre bloquea la lectura / escritura de / hacia uno de sus hijos? Select le notifica que puede escribir, pero no dice cuánto. Lo mismo para la lectura.
Cristian Ciupitu
Estos son procesos separados: padres e hijos no interfieren entre sí. Cada byte producido en un extremo de una tubería está disponible inmediatamente en el otro extremo para ser consumido: una tubería es un búfer compartido. No estoy seguro de lo que significa su pregunta en este contexto.
S.Lott
Puedo verificar lo que dijo S.Lott. Necesitaba las mismas operaciones realizadas en un solo archivo. Entonces, el primer trabajador ejecutó su función en cada línea con el número% 2 == 0 y lo guardó en un archivo, y envió las otras líneas al siguiente proceso canalizado (que era el mismo script). El tiempo de ejecución se redujo a la mitad. Es un poco hacky, pero la sobrecarga es mucho más ligera que map / poop en el módulo de multiprocesamiento.
Vince
36

¿Los procesos secundarios generados a través del multiprocesamiento comparten objetos creados anteriormente en el programa?

Depende. Para las variables globales de solo lectura, a menudo se puede considerar así (aparte de la memoria consumida); de lo contrario, no debería.

La documentación del multiprocesamiento dice:

Better to inherit than pickle/unpickle

En Windows, muchos tipos de multiprocesamiento deben poder seleccionarse para que los procesos secundarios puedan usarlos. Sin embargo, generalmente se debe evitar enviar objetos compartidos a otros procesos que utilicen canalizaciones o colas. En su lugar, debe organizar el programa de modo que un proceso que necesita acceso a un recurso compartido creado en otro lugar pueda heredarlo de un proceso anterior.

Explicitly pass resources to child processes

En Unix, un proceso hijo puede hacer uso de un recurso compartido creado en un proceso padre utilizando un recurso global. Sin embargo, es mejor pasar el objeto como argumento al constructor del proceso hijo.

Además de hacer que el código sea (potencialmente) compatible con Windows, esto también asegura que mientras el proceso hijo siga vivo, el objeto no será recolectado como basura en el proceso padre. Esto puede ser importante si se libera algún recurso cuando el objeto se recolecta como basura en el proceso principal.

Global variables

Tenga en cuenta que si el código que se ejecuta en un proceso secundario intenta acceder a una variable global, entonces el valor que ve (si lo hay) puede no ser el mismo que el valor en el proceso principal en el momento en que se llamó a Process.start () .

Ejemplo

En Windows (CPU única):

#!/usr/bin/env python
import os, sys, time
from multiprocessing import Pool

x = 23000 # replace `23` due to small integers share representation
z = []    # integers are immutable, let's try mutable object

def printx(y):
    global x
    if y == 3:
       x = -x
    z.append(y)
    print os.getpid(), x, id(x), z, id(z) 
    print y
    if len(sys.argv) == 2 and sys.argv[1] == "sleep":
       time.sleep(.1) # should make more apparant the effect

if __name__ == '__main__':
    pool = Pool(processes=4)
    pool.map(printx, (1,2,3,4))

Con sleep:

$ python26 test_share.py sleep
2504 23000 11639492 [1] 10774408
1
2564 23000 11639492 [2] 10774408
2
2504 -23000 11639384 [1, 3] 10774408
3
4084 23000 11639492 [4] 10774408
4

Sin sleep:

$ python26 test_share.py
1148 23000 11639492 [1] 10774408
1
1148 23000 11639492 [1, 2] 10774408
2
1148 -23000 11639324 [1, 2, 3] 10774408
3
1148 -23000 11639324 [1, 2, 3, 4] 10774408
4
jfs
fuente
6
¿Eh? ¿Cómo se comparte z en los procesos?
cbare
4
@cbare: ¡Buena pregunta! De hecho, z no se comparte, como muestra la salida con sleep. La salida sin dormir muestra que un solo proceso maneja (PID = 1148) todo el trabajo; lo que vemos en el último ejemplo es el valor de z para este único proceso.
Eric O Lebigot
Esta respuesta muestra que zno se comparte. Por tanto, esto responde a la pregunta con: "no, al menos en Windows, una variable padre no se comparte entre los hijos".
Eric O Lebigot
@EOL: técnicamente tiene razón, pero en la práctica, si los datos son de solo lectura (a diferencia del zcaso), se pueden considerar compartidos.
jfs
Solo para aclarar, la declaración Tenga en cuenta que si el código que se ejecuta en un proceso hijo intenta acceder a una variable global ... en los documentos 2.7 se refiere a Python que se ejecuta en Windows.
user1071847
28

S. Lot tiene razón. Los atajos de multiprocesamiento de Python le brindan efectivamente una porción de memoria separada y duplicada.

En la mayoría de los sistemas * nix, usar una llamada de nivel inferior a le os.fork()dará, de hecho, memoria de copia sobre escritura, que podría ser lo que está pensando. AFAIK, en teoría, en el programa más simplista posible, podría leer esos datos sin tenerlos duplicados.

Sin embargo, las cosas no son tan simples en el intérprete de Python. Los datos del objeto y los metadatos se almacenan en el mismo segmento de memoria, por lo que incluso si el objeto nunca cambia, algo como un contador de referencia para ese objeto que se incrementa provocará una escritura en la memoria y, por lo tanto, una copia. Casi cualquier programa de Python que esté haciendo más que "imprimir 'hola'" provocará incrementos en el recuento de referencias, por lo que es probable que nunca se dé cuenta del beneficio de copiar sobre escritura.

Incluso si alguien lograra piratear una solución de memoria compartida en Python, intentar coordinar la recolección de basura en los procesos probablemente sería bastante doloroso.

Jarret Hardie
fuente
3
En ese caso, solo se copiará la región de memoria del recuento de referencias, no necesariamente los datos grandes de solo lectura, ¿no es así?
kawing-chiu
7

Si está ejecutando en Unix, es posible que compartan el mismo objeto, debido a cómo funciona la bifurcación (es decir, los procesos secundarios tienen una memoria separada pero es de copia en escritura, por lo que puede compartirse siempre que nadie la modifique). Intenté lo siguiente:

import multiprocessing

x = 23

def printx(y):
    print x, id(x)
    print y

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=4)
    pool.map(printx, (1,2,3,4))

y obtuvo la siguiente salida:

$ ./mtest.py
23 22995656
1
23 22995656
2
23 22995656
3
23 22995656
4

Por supuesto, esto no prueba que no se haya realizado una copia, pero debería poder verificar eso en su situación mirando la salida de pspara ver cuánta memoria real está usando cada subproceso.

Jacob Gabrielson
fuente
2
¿Y el recolector de basura? ¿Qué pasa cuando se ejecuta? ¿No cambia el diseño de la memoria?
Cristian Ciupitu
Esa es una preocupación válida. Si afectaría a Parand dependería de cómo esté usando todo esto y cuán confiable tenga que ser este código. Si no funcionara para él, recomendaría usar el módulo mmap para tener más control (suponiendo que quiera seguir con este enfoque básico).
Jacob Gabrielson
He publicado una actualización para su ejemplo: stackoverflow.com/questions/659865/…
jfs
@JacobGabrielson: Se hace la copia. La pregunta original es si se realiza la copia.
abhinavkulkarni
3

Los diferentes procesos tienen diferentes espacios de direcciones. Como ejecutar diferentes instancias del intérprete. Para eso es IPC (comunicación entre procesos).

Puede utilizar colas o canalizaciones para este propósito. También puede usar rpc sobre tcp si desea distribuir los procesos a través de una red más adelante.

http://docs.python.org/dev/library/multiprocessing.html#exchanging-objects-between-processes

Vasil
fuente
2
No creo que IPC sea apropiado para esto; se trata de datos de solo lectura a los que todo el mundo necesita acceder. No tiene sentido pasarlo entre procesos; en el peor de los casos, cada uno puede leer su propia copia. Estoy intentando ahorrar memoria al no tener una copia separada en cada proceso.
Parand del
Puede tener un proceso maestro que delegue datos para trabajar en los otros procesos esclavos. Los esclavos pueden solicitar datos o pueden enviar datos. De esta forma, no todos los procesos tendrán una copia del objeto completo.
Vasil
1
@Vasil: ¿Qué pasa si cada proceso necesita todo el conjunto de datos y solo ejecuta una operación diferente en él?
Será el
1

No está directamente relacionado con el multiprocesamiento per se, pero a partir de su ejemplo, parecería que podría usar el módulo de estantería o algo así. ¿El "big_lookup_object" realmente tiene que estar completamente en la memoria?


fuente
Buen punto, no he comparado directamente el rendimiento del disco con el de la memoria. Había asumido que habría una gran diferencia, pero en realidad no lo he probado.
Parand del
1

No, pero puede cargar sus datos como un proceso secundario y permitirle compartir sus datos con otros niños. vea abajo.

import time
import multiprocessing

def load_data( queue_load, n_processes )

    ... load data here into some_variable

    """
    Store multiple copies of the data into
    the data queue. There needs to be enough
    copies available for each process to access. 
    """

    for i in range(n_processes):
        queue_load.put(some_variable)


def work_with_data( queue_data, queue_load ):

    # Wait for load_data() to complete
    while queue_load.empty():
        time.sleep(1)

    some_variable = queue_load.get()

    """
    ! Tuples can also be used here
    if you have multiple data files
    you wish to keep seperate.  
    a,b = queue_load.get()
    """

    ... do some stuff, resulting in new_data

    # store it in the queue
    queue_data.put(new_data)


def start_multiprocess():

    n_processes = 5

    processes = []
    stored_data = []

    # Create two Queues
    queue_load = multiprocessing.Queue()
    queue_data = multiprocessing.Queue()

    for i in range(n_processes):

        if i == 0:
            # Your big data file will be loaded here...
            p = multiprocessing.Process(target = load_data,
            args=(queue_load, n_processes))

            processes.append(p)
            p.start()   

        # ... and then it will be used here with each process
        p = multiprocessing.Process(target = work_with_data,
        args=(queue_data, queue_load))

        processes.append(p)
        p.start()

    for i in range(n_processes)
        new_data = queue_data.get()
        stored_data.append(new_data)    

    for p in processes:
        p.join()
    print(processes)    
Mott la tupla
fuente
-4

Para la plataforma Linux / Unix / MacOS, forkmap es una solución rápida y sucia.

Maxim Imakaev
fuente