¿Cómo implementar paralelo, retrasado de tal manera que el ciclo paralelo para se detiene cuando la salida va por debajo de un umbral?

8

Supongamos que tengo el siguiente código:

from scipy import *
import multiprocessing as mp
num_cores = mp.cpu_count()
from joblib import Parallel, delayed
import matplotlib.pyplot as plt

def func(x,y):
    return y/x
def main(y, xmin,xmax, dx):
    x = arange(xmin,xmax,dx)
    output = Parallel(n_jobs=num_cores)(delayed(func)(i, y) for i in x)
    return x, asarray(output)
def demo():
    x,z = main(2.,1.,30.,.1)
    plt.plot(x,z, label='All values')
    plt.plot(x[z>.1],z[z>.1], label='desired range') ## This is better to do in main()
    plt.show()

demo()

Quiero calcular la salida solo hasta que la salida> un número dado (se puede suponer que los elementos de salida disminuyen monotónicamente con el aumento de x) y luego se detienen (NO calcular para todos los valores de x y luego ordenarlos, eso es ineficiente para mi propósito). ¿Hay alguna manera de hacer eso usando Paralelo, retrasado o cualquier otro multiprocesamiento?

usuario247534
fuente
Puedes usar numpy también. He agregado algunos números. La selección [z> .1] en la función de demostración debe hacerse en la función principal para que el código sea más eficiente.
user247534
Sé que sería complicado, pero crearía una lista, la pasaría a la función y la función agregaría el resultado a esa lista. Luego, afuera verificaría si la lista contiene un número mayor que ese y luego terminaría los hilos de alguna manera. Ahora que pienso en esto, probablemente haya métodos más inteligentes para hacer esto, como las colas
Maxxik CZ

Respuestas:

1

No se output > a given numberespecificó, así que solo inventé uno. Después de la prueba tuve que revertir la condición para una operación adecuada output < a given number.

Usaría un grupo, iniciaría los procesos con una función de devolución de llamada para verificar la condición de detención, y luego terminaría el grupo cuando esté listo. pero eso provocaría una condición de carrera que permitiría omitir los resultados de los procesos en ejecución que no se les permitió finalizar. Creo que este método tiene una modificación mínima en su código y es muy fácil de leer. El orden de la lista NO está garantizado.

Pros: muy poca sobrecarga.
Contras: podrían haber faltado resultados.

Método 1)

from scipy import *
import multiprocessing

import matplotlib.pyplot as plt


def stop_condition_callback(ret):
        output.append(ret)
        if ret < stop_condition:
            worker_pool.terminate()


def func(x, y, ):
    return y / x


def main(y, xmin, xmax, dx):
    x = arange(xmin, xmax, dx)
    print("Number of calculations: %d" % (len(x)))

    # add calculations to the pool
    for i in x:
        worker_pool.apply_async(func, (i, y,), callback=stop_condition_callback)

    # wait for the pool to finish/terminate
    worker_pool.close()
    worker_pool.join()

    print("Number of results: %d" % (len(output)))
    return x, asarray(output)


def demo():
    x, z_list = main(2., 1., 30., .1)
    plt.plot(z_list, label='desired range')
    plt.show()


output = []
stop_condition = 0.1

worker_pool = multiprocessing.Pool()
demo()

Este método tiene más sobrecarga pero permitirá procesos que han comenzado a finalizar. Método 2)

from scipy import *
import multiprocessing

import matplotlib.pyplot as plt


def stop_condition_callback(ret):
    if ret is not None:
        if ret < stop_condition:
            worker_stop.value = 1
        else:
            output.append(ret)


def func(x, y, ):
    if worker_stop.value != 0:
        return None
    return y / x


def main(y, xmin, xmax, dx):
    x = arange(xmin, xmax, dx)
    print("Number of calculations: %d" % (len(x)))

    # add calculations to the pool
    for i in x:
        worker_pool.apply_async(func, (i, y,), callback=stop_condition_callback)

    # wait for the pool to finish/terminate
    worker_pool.close()
    worker_pool.join()

    print("Number of results: %d" % (len(output)))
    return x, asarray(output)


def demo():
    x, z_list = main(2., 1., 30., .1)
    plt.plot(z_list, label='desired range')
    plt.show()


output = []
worker_stop = multiprocessing.Value('i', 0)
stop_condition = 0.1

worker_pool = multiprocessing.Pool()
demo()

Método 3) Pros: No se dejarán resultados fuera
Contras: Esto se aleja de lo que normalmente haría.

tome el Método 1 y agregue

def stopPoolButLetRunningTaskFinish(pool):
    # Pool() shutdown new task from being started, by emptying the query all worker processes draw from
    while pool._task_handler.is_alive() and pool._inqueue._reader.poll():
        pool._inqueue._reader.recv()
    # Send sentinels to all worker processes
    for a in range(len(pool._pool)):
            pool._inqueue.put(None)

Entonces cambia stop_condition_callback

def stop_condition_callback(ret):
    if ret[1] < stop_condition:
        #worker_pool.terminate()
        stopPoolButLetRunningTaskFinish(worker_pool)
    else:
        output.append(ret)
ron
fuente
0

Usaría Dask para ejecutar en paralelo, y específicamente la interfaz de futuros para la retroalimentación en tiempo real de los resultados a medida que se completan. Cuando termine, puede cancelar los futuros restantes en vuelo, arrendar los innecesarios para terminar de forma asincrónica o cerrar el clúster.

from dask.distributed import Client, as_completed
client = Client()  # defaults to ncores workers, one thread each
y, xmin, xmax, dx = 2.,1.,30.,.1

def func(x, y):
    return x, y/x
x = arange(xmin,xmax,dx)
outx = []
output = []
futs = [client.submit(func, val, y) for val in x]
for future in as_completed(futs):
    outs = future.result()
    outx.append(outs[0])
    output.append(outs[1])
    if outs[1] < 0.1:
        break

Notas: - Supongo que quiso decir "menor que", porque de lo contrario el primer valor ya pasa ( y / xmin > 0.1) - no se garantiza que las salidas estén en el orden en que las ingresó si desea obtener resultados cuando estén listos, pero con tal cálculo rápido, tal vez siempre lo sean (es por eso que el func también devolvió el valor de entrada): si deja de calcular, la salida será más corta que el conjunto completo de entradas, por lo que no estoy muy seguro de lo que desea impresión.

duradero
fuente