¿Cómo puedo recuperar el valor de retorno de una función pasada al multiprocesamiento. Proceso?

190

En el código de ejemplo a continuación, me gustaría recuperar el valor de retorno de la función worker. ¿Cómo puedo hacer esto? ¿Dónde se almacena este valor?

Código de ejemplo:

import multiprocessing

def worker(procnum):
    '''worker function'''
    print str(procnum) + ' represent!'
    return procnum


if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()

    for proc in jobs:
        proc.join()
    print jobs

Salida:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[<Process(Process-1, stopped)>, <Process(Process-2, stopped)>, <Process(Process-3, stopped)>, <Process(Process-4, stopped)>, <Process(Process-5, stopped)>]

Parece que no puedo encontrar el atributo relevante en los objetos almacenados jobs.

blz
fuente

Respuestas:

189

Use la variable compartida para comunicarse. Por ejemplo así:

import multiprocessing

def worker(procnum, return_dict):
    '''worker function'''
    print str(procnum) + ' represent!'
    return_dict[procnum] = procnum


if __name__ == '__main__':
    manager = multiprocessing.Manager()
    return_dict = manager.dict()
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,return_dict))
        jobs.append(p)
        p.start()

    for proc in jobs:
        proc.join()
    print return_dict.values()
vartec
fuente
46
Recomendaría usar un multiprocessing.Queue, en lugar de un Manageraquí. El uso de Managerrequiere generar un proceso completamente nuevo, que es excesivo cuando Queuelo haría.
dano
1
@dano: Me pregunto, si usamos el objeto Queue (), no podemos asegurar el orden cuando cada proceso devuelve el valor. Quiero decir, si necesitamos el orden en el resultado, para hacer el próximo trabajo. ¿Cómo podríamos estar seguros de dónde exactamente qué salida es de qué proceso?
Catbuilts
44
@Catbuilts Puede devolver una tupla de cada proceso, donde un valor es el valor de retorno real que le interesa y el otro es un identificador único del proceso. Pero también me pregunto por qué necesita saber qué proceso está devolviendo qué valor. Si eso es lo que realmente necesita saber sobre el proceso, ¿o necesita correlacionar entre su lista de entradas y la lista de salidas? En ese caso, recomendaría usar multiprocessing.Pool.mappara procesar su lista de elementos de trabajo.
dano
55
advertencias para funciones con un solo argumento : debería usar args=(my_function_argument, ). Tenga en cuenta la ,coma aquí! De lo contrario, Python se quejará de "argumentos posicionales faltantes". Me tomó 10 minutos entenderlo. Compruebe también el uso manual (en la sección "clase de proceso").
yuqli
2
@vartec un inconveniente de usar un multiprocesamiento. El diccionario Manager () es que encurtidos (serializa) el objeto que devuelve, por lo que tiene un cuello de botella dado por la biblioteca de encurtidos de un tamaño máximo de 2GiB para que el objeto regrese. ¿Hay alguna otra forma de hacer esto evitando la serialización del objeto devuelto?
hirschme
68

Creo que el enfoque sugerido por @sega_sai es el mejor. Pero realmente necesita un ejemplo de código, así que aquí va:

import multiprocessing
from os import getpid

def worker(procnum):
    print('I am number %d in process %d' % (procnum, getpid()))
    return getpid()

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes = 3)
    print(pool.map(worker, range(5)))

Lo que imprimirá los valores de retorno:

I am number 0 in process 19139
I am number 1 in process 19138
I am number 2 in process 19140
I am number 3 in process 19139
I am number 4 in process 19140
[19139, 19138, 19140, 19139, 19140]

Si está familiarizado con map(el Python 2 incorporado) esto no debería ser demasiado desafiante. De lo contrario, eche un vistazo al enlace de sega_Sai .

Tenga en cuenta la poca cantidad de código que se necesita. (También tenga en cuenta cómo se reutilizan los procesos).

marca
fuente
1
¿Alguna idea de por qué mi getpid()devolución tiene el mismo valor? Estoy ejecutando Python3
zelusp
No estoy seguro de cómo Pool distribuye las tareas entre los trabajadores. ¿Quizás todos puedan terminar en el mismo trabajador si son realmente rápidos? ¿Sucede constantemente? También si agrega un retraso?
Mark
También pensé que era una cuestión relacionada con la velocidad, pero cuando alimento pool.mapun rango de 1,000,000 usando más de 10 procesos, veo como máximo dos pids diferentes.
zelusp
1
Entonces no estoy seguro. Creo que sería interesante abrir una pregunta separada para esto.
Marque el
Si desea enviar una función diferente a cada proceso, use pool.apply_async: docs.python.org/3/library/…
Kyle
24

Este ejemplo muestra cómo usar una lista de multiprocesamiento. Instancias de tubería para devolver cadenas de un número arbitrario de procesos:

import multiprocessing

def worker(procnum, send_end):
    '''worker function'''
    result = str(procnum) + ' represent!'
    print result
    send_end.send(result)

def main():
    jobs = []
    pipe_list = []
    for i in range(5):
        recv_end, send_end = multiprocessing.Pipe(False)
        p = multiprocessing.Process(target=worker, args=(i, send_end))
        jobs.append(p)
        pipe_list.append(recv_end)
        p.start()

    for proc in jobs:
        proc.join()
    result_list = [x.recv() for x in pipe_list]
    print result_list

if __name__ == '__main__':
    main()

Salida:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
['0 represent!', '1 represent!', '2 represent!', '3 represent!', '4 represent!']

Esta solución utiliza menos recursos que un multiprocessing.Queue qué usos

  • un tubo
  • al menos una cerradura
  • un amortiguador
  • un hilo

o un multiprocesamiento.SimpleQueue que utiliza

  • un tubo
  • al menos una cerradura

Es muy instructivo mirar la fuente de cada uno de estos tipos.

David Cullen
fuente
¿Cuál sería la mejor manera de hacerlo sin convertir las tuberías en una variable global?
Nickpick el
Puse todos los datos y códigos globales en una función principal y funciona igual. Eso responde tu pregunta?
David Cullen
¿la tubería siempre debe leerse antes de que se le pueda agregar (enviar) un nuevo valor?
Nickpick
+1, buena respuesta. Pero acerca de que la solución es más eficiente, la compensación es que está haciendo uno Pipepor proceso frente a uno Queuepara todos los procesos. No sé si eso termina siendo más eficiente en todos los casos.
sudo
2
Esta respuesta provoca un punto muerto si el objeto devuelto es grande. En lugar de hacer proc.join () primero, primero intentaría recv () el valor de retorno y luego uniría.
L. Pes
22

Por alguna razón, no pude encontrar un ejemplo general de cómo hacer esto en Queueningún lugar (incluso los ejemplos de documentos de Python no generan múltiples procesos), así que esto es lo que obtuve trabajando después de 10 intentos:

def add_helper(queue, arg1, arg2): # the func called in child processes
    ret = arg1 + arg2
    queue.put(ret)

def multi_add(): # spawns child processes
    q = Queue()
    processes = []
    rets = []
    for _ in range(0, 100):
        p = Process(target=add_helper, args=(q, 1, 2))
        processes.append(p)
        p.start()
    for p in processes:
        ret = q.get() # will block
        rets.append(ret)
    for p in processes:
        p.join()
    return rets

Queuees una cola de bloqueo segura para subprocesos que puede usar para almacenar los valores de retorno de los procesos secundarios. Por lo tanto, debe pasar la cola a cada proceso. Algo menos obvio aquí es que usted tiene que get()partir de la cola antes de que joinlos Processes o bien la cola se llena y bloquea todo.

Actualización para aquellos que están orientados a objetos (probado en Python 3.4):

from multiprocessing import Process, Queue

class Multiprocessor():

    def __init__(self):
        self.processes = []
        self.queue = Queue()

    @staticmethod
    def _wrapper(func, queue, args, kwargs):
        ret = func(*args, **kwargs)
        queue.put(ret)

    def run(self, func, *args, **kwargs):
        args2 = [func, self.queue, args, kwargs]
        p = Process(target=self._wrapper, args=args2)
        self.processes.append(p)
        p.start()

    def wait(self):
        rets = []
        for p in self.processes:
            ret = self.queue.get()
            rets.append(ret)
        for p in self.processes:
            p.join()
        return rets

# tester
if __name__ == "__main__":
    mp = Multiprocessor()
    num_proc = 64
    for _ in range(num_proc): # queue up multiple tasks running `sum`
        mp.run(sum, [1, 2, 3, 4, 5])
    ret = mp.wait() # get all results
    print(ret)
    assert len(ret) == num_proc and all(r == 15 for r in ret)
sudo
fuente
18

Para cualquier otra persona que esté buscando cómo obtener un valor de un Processuso Queue:

import multiprocessing

ret = {'foo': False}

def worker(queue):
    ret = queue.get()
    ret['foo'] = True
    queue.put(ret)

if __name__ == '__main__':
    queue = multiprocessing.Queue()
    queue.put(ret)
    p = multiprocessing.Process(target=worker, args=(queue,))
    p.start()
    print queue.get()  # Prints {"foo": True}
    p.join()
Matthew Moisen
fuente
1
cuando pongo algo en una cola en mi proceso de trabajo, nunca se llega a mi unión. ¿Alguna idea de cómo podría venir esto?
Laurens Koppenol el
@LaurensKoppenol, ¿quiere decir que su código principal se cuelga en p.join () de forma permanente y nunca continúa? ¿Tu proceso tiene un bucle infinito?
Matthew Moisen el
44
Sí, cuelga allí infinitamente. Todos mis trabajadores terminan (finaliza el ciclo dentro de la función de trabajador, se imprime la declaración posterior para todos los trabajadores) La unión no hace nada. Si Queuejoin()
elimino
@LaurensKoppenol ¿Quizás no estás llamando queue.put(ret)antes de llamar p.start()? En ese caso, el subproceso de trabajo se bloqueará para queue.get()siempre. Puede replicar esto copiando mi fragmento de arriba mientras comenta queue.put(ret).
Matthew Moisen
Edité esta respuesta, la queue.get()tiene que suceder antes de la p.join(). Funciona ahora para mí.
jfunk
10

Puede usar el exitincorporado para establecer el código de salida de un proceso. Se puede obtener del exitcodeatributo del proceso:

import multiprocessing

def worker(procnum):
    print str(procnum) + ' represent!'
    exit(procnum)

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()

    result = []
    for proc in jobs:
        proc.join()
        result.append(proc.exitcode)
    print result

Salida:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[0, 1, 2, 3, 4]
David Cullen
fuente
44
Tenga en cuenta que este enfoque podría ser confuso. Los procesos generalmente deben salir con el código de salida 0 si se completan sin error. Si tiene algo que monitorea los códigos de salida del proceso de su sistema, entonces puede verlos como errores.
ferrouswheel
1
Perfecto si solo desea generar una excepción en el proceso principal en caso de error.
crizCraig
5

El paquete de guijarros tiene un buen apalancamiento de abstracción multiprocessing.Pipeque lo hace bastante sencillo:

from pebble import concurrent

@concurrent.process
def function(arg, kwarg=0):
    return arg + kwarg

future = function(1, kwarg=1)

print(future.result())

Ejemplo de: https://pythonhosted.org/Pebble/#concurrent-decorators

erikreed
fuente
3

Pensé que simplificaría los ejemplos más simples copiados desde arriba, trabajando para mí en Py3.6. Más simple es multiprocessing.Pool:

import multiprocessing
import time

def worker(x):
    time.sleep(1)
    return x

pool = multiprocessing.Pool()
print(pool.map(worker, range(10)))

Puede establecer el número de procesos en la piscina con, por ejemplo, Pool(processes=5). Sin embargo, el valor predeterminado es el recuento de CPU, así que déjelo en blanco para las tareas vinculadas a la CPU. (Las tareas vinculadas a E / S a menudo se adaptan a los subprocesos de todos modos, ya que los subprocesos en su mayoría están esperando, por lo que pueden compartir un núcleo de CPU). PoolTambién se aplica la optimización de fragmentación .

(Tenga en cuenta que el método de trabajo no se puede anidar dentro de un método. Inicialmente definí mi método de trabajo dentro del método al que hace la llamada pool.map, para mantenerlo todo autocontenido, pero luego los procesos no pudieron importarlo, y arrojé "AttributeError : No se puede encurtir el objeto local external_method..inner_method ". Más aquí . Puede estar dentro de una clase.)

(Aprecio la impresión original de la pregunta especificada en 'represent!'lugar de hacerlo time.sleep(), pero sin ella pensé que algunos códigos se ejecutaban simultáneamente cuando no era así).


Py3 ProcessPoolExecutortambién tiene dos líneas ( .mapdevuelve un generador, por lo que necesita list()):

from concurrent.futures import ProcessPoolExecutor
with ProcessPoolExecutor() as executor:
    print(list(executor.map(worker, range(10))))

Con simples Processes:

import multiprocessing
import time

def worker(x, queue):
    time.sleep(1)
    queue.put(x)

queue = multiprocessing.SimpleQueue()
tasks = range(10)

for task in tasks:
    multiprocessing.Process(target=worker, args=(task, queue,)).start()

for _ in tasks:
    print(queue.get())

Úselo SimpleQueuesi todo lo que necesita es puty get. El primer bucle inicia todos los procesos, antes de que el segundo realice las queue.getllamadas de bloqueo . No creo que haya ninguna razón para llamar p.join()también.

Chris
fuente
2

Una solución simple:

import multiprocessing

output=[]
data = range(0,10)

def f(x):
    return x**2

def handler():
    p = multiprocessing.Pool(64)
    r=p.map(f, data)
    return r

if __name__ == '__main__':
    output.append(handler())

print(output[0])

Salida:

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
Rubens_Zimbres
fuente
2

Si está utilizando Python 3, puede usarlo concurrent.futures.ProcessPoolExecutorcomo una abstracción conveniente:

from concurrent.futures import ProcessPoolExecutor

def worker(procnum):
    '''worker function'''
    print(str(procnum) + ' represent!')
    return procnum


if __name__ == '__main__':
    with ProcessPoolExecutor() as executor:
        print(list(executor.map(worker, range(5))))

Salida:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[0, 1, 2, 3, 4]
Aleph Aleph
fuente
0

Modifiqué un poco la respuesta de vartec ya que necesitaba obtener los códigos de error de la función. (Gracias vertec !!! es un truco increíble)

Esto también se puede hacer con un manager.listpero creo que es mejor tenerlo en un dict y almacenar una lista dentro de él. De esa forma, mantenemos la función y los resultados, ya que no podemos estar seguros del orden en que se completará la lista.

from multiprocessing import Process
import time
import datetime
import multiprocessing


def func1(fn, m_list):
    print 'func1: starting'
    time.sleep(1)
    m_list[fn] = "this is the first function"
    print 'func1: finishing'
    # return "func1"  # no need for return since Multiprocess doesnt return it =(

def func2(fn, m_list):
    print 'func2: starting'
    time.sleep(3)
    m_list[fn] = "this is function 2"
    print 'func2: finishing'
    # return "func2"

def func3(fn, m_list):
    print 'func3: starting'
    time.sleep(9)
    # if fail wont join the rest because it never populate the dict
    # or do a try/except to get something in return.
    raise ValueError("failed here")
    # if we want to get the error in the manager dict we can catch the error
    try:
        raise ValueError("failed here")
        m_list[fn] = "this is third"
    except:
        m_list[fn] = "this is third and it fail horrible"
        # print 'func3: finishing'
        # return "func3"


def runInParallel(*fns):  # * is to accept any input in list
    start_time = datetime.datetime.now()
    proc = []
    manager = multiprocessing.Manager()
    m_list = manager.dict()
    for fn in fns:
        # print fn
        # print dir(fn)
        p = Process(target=fn, name=fn.func_name, args=(fn, m_list))
        p.start()
        proc.append(p)
    for p in proc:
        p.join()  # 5 is the time out

    print datetime.datetime.now() - start_time
    return m_list, proc

if __name__ == '__main__':
    manager, proc = runInParallel(func1, func2, func3)
    # print dir(proc[0])
    # print proc[0]._name
    # print proc[0].name
    # print proc[0].exitcode

    # here you can check what did fail
    for i in proc:
        print i.name, i.exitcode  # name was set up in the Process line 53

    # here will only show the function that worked and where able to populate the 
    # manager dict
    for i, j in manager.items():
        print dir(i)  # things you can do to the function
        print i, j
pelos
fuente