Python: ¿Cómo puedo ejecutar funciones de Python en paralelo?

109

Investigué primero y no pude encontrar una respuesta a mi pregunta. Estoy tratando de ejecutar varias funciones en paralelo en Python.

Tengo algo como esto:

files.py

import common #common is a util class that handles all the IO stuff

dir1 = 'C:\folder1'
dir2 = 'C:\folder2'
filename = 'test.txt'
addFiles = [25, 5, 15, 35, 45, 25, 5, 15, 35, 45]

def func1():
   c = common.Common()
   for i in range(len(addFiles)):
       c.createFiles(addFiles[i], filename, dir1)
       c.getFiles(dir1)
       time.sleep(10)
       c.removeFiles(addFiles[i], dir1)
       c.getFiles(dir1)

def func2():
   c = common.Common()
   for i in range(len(addFiles)):
       c.createFiles(addFiles[i], filename, dir2)
       c.getFiles(dir2)
       time.sleep(10)
       c.removeFiles(addFiles[i], dir2)
       c.getFiles(dir2)

Quiero llamar a func1 y func2 y hacer que se ejecuten al mismo tiempo. Las funciones no interactúan entre sí ni en el mismo objeto. Ahora mismo tengo que esperar a que termine func1 antes de que comience func2. ¿Cómo hago algo como a continuación?

process.py

from files import func1, func2

runBothFunc(func1(), func2())

Quiero poder crear ambos directorios casi al mismo tiempo porque cada minuto estoy contando cuántos archivos se están creando. Si el directorio no está allí, perderá el tiempo.

lmcadory
fuente
1
Es posible que desee rediseñar esto; si cuenta el número de archivos / carpetas por minuto, está creando una condición de carrera. ¿Qué tal si cada función actualiza un contador o usa un archivo de bloqueo para asegurarse de que el proceso periódico no actualice el recuento hasta que ambas funciones hayan terminado de ejecutarse?

Respuestas:

163

Puede usar threadingo multiprocessing.

Debido a las peculiaridades de CPython , threadinges poco probable que logre un verdadero paralelismo. Por esta razón, multiprocessinggeneralmente es una mejor apuesta.

Aquí tienes un ejemplo completo:

from multiprocessing import Process

def func1():
  print 'func1: starting'
  for i in xrange(10000000): pass
  print 'func1: finishing'

def func2():
  print 'func2: starting'
  for i in xrange(10000000): pass
  print 'func2: finishing'

if __name__ == '__main__':
  p1 = Process(target=func1)
  p1.start()
  p2 = Process(target=func2)
  p2.start()
  p1.join()
  p2.join()

La mecánica de iniciar / unir procesos secundarios se puede encapsular fácilmente en una función a lo largo de las líneas de su runBothFunc:

def runInParallel(*fns):
  proc = []
  for fn in fns:
    p = Process(target=fn)
    p.start()
    proc.append(p)
  for p in proc:
    p.join()

runInParallel(func1, func2)
NPE
fuente
4
Usé su código pero las funciones aún no se iniciaron al mismo tiempo.
lmcadory
4
@Lamar McAdory: Explique qué quiere decir exactamente con "al mismo tiempo", quizás dando un ejemplo concreto de lo que hizo, lo que esperaba que sucediera y lo que realmente sucedió.
NPE
4
@Lamar: Nunca puedes tener ninguna garantía de "exactamente el mismo tiempo" y pensar que puedes es simplemente incorrecto. Dependiendo de la cantidad de cpus que tenga, la carga de la máquina, la sincronización de muchas cosas que suceden en la computadora influirán en la hora en que se inician los subprocesos / procesos. Además, dado que los procesos se inician inmediatamente después de la creación, la sobrecarga de crear un proceso también debe calcularse en la diferencia de tiempo que ve.
Martin
1
¿Es posible obtener una lista de los resultados de cada función? digamos que cada función devuelve un valor diferente, ¿se pueden agregar los valores a alguna lista que se pueda usar más adelante? ¿quizás agregar el resultado a una lista global?
pelos
1
Si mis funciones toman parámetros y cuando paso parámetros mientras los llamo desde procesos separados, no se ejecutan simultáneamente. ¿Pueden ayudar, por favor?
user2910372
18

Esto se puede hacer elegantemente con Ray , un sistema que le permite paralelizar y distribuir fácilmente su código Python.

Para paralelizar su ejemplo, necesitaría definir sus funciones con el @ray.remotedecorador y luego invocarlas con .remote.

import ray

ray.init()

dir1 = 'C:\\folder1'
dir2 = 'C:\\folder2'
filename = 'test.txt'
addFiles = [25, 5, 15, 35, 45, 25, 5, 15, 35, 45]

# Define the functions. 
# You need to pass every global variable used by the function as an argument.
# This is needed because each remote function runs in a different process,
# and thus it does not have access to the global variables defined in 
# the current process.
@ray.remote
def func1(filename, addFiles, dir):
    # func1() code here...

@ray.remote
def func2(filename, addFiles, dir):
    # func2() code here...

# Start two tasks in the background and wait for them to finish.
ray.get([func1.remote(filename, addFiles, dir1), func2.remote(filename, addFiles, dir2)]) 

Si pasa el mismo argumento a ambas funciones y el argumento es grande, una forma más eficiente de hacerlo es usando ray.put(). Esto evita que el gran argumento se serialice dos veces y cree dos copias de memoria del mismo:

largeData_id = ray.put(largeData)

ray.get([func1(largeData_id), func2(largeData_id)])

Si func1()y func2()devuelve resultados, debe volver a escribir el código de la siguiente manera:

ret_id1 = func1.remote(filename, addFiles, dir1)
ret_id2 = func1.remote(filename, addFiles, dir2)
ret1, ret2 = ray.get([ret_id1, ret_id2])

Hay una serie de ventajas de utilizar Ray sobre el módulo de multiprocesamiento . En particular, el mismo código se ejecutará en una sola máquina, así como en un grupo de máquinas. Para obtener más ventajas de Ray, consulte esta publicación relacionada .

Ion Stoica
fuente
18

Si sus funciones están principalmente haciendo trabajo de E / S (y menos trabajo de CPU) y tiene Python 3.2+, puede usar un ThreadPoolExecutor :

from concurrent.futures import ThreadPoolExecutor

def run_io_tasks_in_parallel(tasks):
    with ThreadPoolExecutor() as executor:
        running_tasks = [executor.submit(task) for task in tasks]
        for running_task in running_tasks:
            running_task.result()

run_io_tasks_in_parallel([
    lambda: print('IO task 1 running!'),
    lambda: print('IO task 2 running!'),
])

Si sus funciones hacen principalmente trabajo de CPU (y menos trabajo de E / S) y tiene Python 2.6+, puede usar el módulo de multiprocesamiento :

from multiprocessing import Process

def run_cpu_tasks_in_parallel(tasks):
    running_tasks = [Process(target=task) for task in tasks]
    for running_task in running_tasks:
        running_task.start()
    for running_task in running_tasks:
        running_task.join()

run_cpu_tasks_in_parallel([
    lambda: print('CPU task 1 running!'),
    lambda: print('CPU task 2 running!'),
])
David Foster
fuente
Esta es una buena respuesta. ¿Cómo identificar a partir del resultado de las tareas vinculadas de E / S usando concurrent.futures cuál completó? Básicamente, en lugar de funciones lamba si tenemos funciones normales, ¿cómo identificar el resultado mapeado a la función llamada?
Tragaknight
No importa, encontré una manera, en lugar de esto run_cpu_tasks_in_parallel ([lambda: print ('CPU task 1 running!'), Lambda: print ('CPU task 2 running!'),]) Use this - results = run_io_tasks_in_parallel ([lambda: {'is_something1': func1 ()}, lambda: {'is_something2': func2 ()},])
Tragaknight
5

Si es un usuario de Windows y usa Python 3, entonces esta publicación lo ayudará a hacer programación paralela en Python. Cuando ejecute una programación de grupo de biblioteca multiprocesamiento habitual, obtendrá un error con respecto a la función principal en su programa. Esto se debe al hecho de que Windows no tiene la funcionalidad fork (). La siguiente publicación ofrece una solución al problema mencionado.

http://python.6.x6.nabble.com/Multiprocessing-Pool-woes-td5047050.html

Como estaba usando Python 3, cambié el programa un poco así:

from types import FunctionType
import marshal

def _applicable(*args, **kwargs):
  name = kwargs['__pw_name']
  code = marshal.loads(kwargs['__pw_code'])
  gbls = globals() #gbls = marshal.loads(kwargs['__pw_gbls'])
  defs = marshal.loads(kwargs['__pw_defs'])
  clsr = marshal.loads(kwargs['__pw_clsr'])
  fdct = marshal.loads(kwargs['__pw_fdct'])
  func = FunctionType(code, gbls, name, defs, clsr)
  func.fdct = fdct
  del kwargs['__pw_name']
  del kwargs['__pw_code']
  del kwargs['__pw_defs']
  del kwargs['__pw_clsr']
  del kwargs['__pw_fdct']
  return func(*args, **kwargs)

def make_applicable(f, *args, **kwargs):
  if not isinstance(f, FunctionType): raise ValueError('argument must be a function')
  kwargs['__pw_name'] = f.__name__  # edited
  kwargs['__pw_code'] = marshal.dumps(f.__code__)   # edited
  kwargs['__pw_defs'] = marshal.dumps(f.__defaults__)  # edited
  kwargs['__pw_clsr'] = marshal.dumps(f.__closure__)  # edited
  kwargs['__pw_fdct'] = marshal.dumps(f.__dict__)   # edited
  return _applicable, args, kwargs

def _mappable(x):
  x,name,code,defs,clsr,fdct = x
  code = marshal.loads(code)
  gbls = globals() #gbls = marshal.loads(gbls)
  defs = marshal.loads(defs)
  clsr = marshal.loads(clsr)
  fdct = marshal.loads(fdct)
  func = FunctionType(code, gbls, name, defs, clsr)
  func.fdct = fdct
  return func(x)

def make_mappable(f, iterable):
  if not isinstance(f, FunctionType): raise ValueError('argument must be a function')
  name = f.__name__    # edited
  code = marshal.dumps(f.__code__)   # edited
  defs = marshal.dumps(f.__defaults__)  # edited
  clsr = marshal.dumps(f.__closure__)  # edited
  fdct = marshal.dumps(f.__dict__)  # edited
  return _mappable, ((i,name,code,defs,clsr,fdct) for i in iterable)

Después de esta función, el código de problema anterior también se cambia un poco así:

from multiprocessing import Pool
from poolable import make_applicable, make_mappable

def cube(x):
  return x**3

if __name__ == "__main__":
  pool    = Pool(processes=2)
  results = [pool.apply_async(*make_applicable(cube,x)) for x in range(1,7)]
  print([result.get(timeout=10) for result in results])

Y obtuve la salida como:

[1, 8, 27, 64, 125, 216]

Creo que esta publicación puede ser útil para algunos usuarios de Windows.

Arun Sooraj
fuente
4

No hay forma de garantizar que dos funciones se ejecuten en sincronía entre sí, lo que parece ser lo que desea hacer.

Lo mejor que puede hacer es dividir la función en varios pasos, luego esperar a que ambos terminen en los puntos críticos de sincronización usando Process.join las menciones de respuesta de like @ aix.

Esto es mejor que time.sleep(10)porque no puede garantizar tiempos exactos. Con la espera explícita, está diciendo que las funciones deben realizarse ejecutando ese paso antes de pasar al siguiente, en lugar de asumir que se realizará dentro de los 10 ms, lo que no está garantizado en función de lo que esté sucediendo en la máquina.

Davy8
fuente
1

Parece que tiene una sola función a la que necesita llamar en dos parámetros diferentes. Esto se puede hacer elegantemente usando una combinación de concurrent.futuresy mapcon Python 3.2+

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def sleep_secs(seconds):
  time.sleep(seconds)
  print(f'{seconds} has been processed')

secs_list = [2,4, 6, 8, 10, 12]

Ahora, si su operación está vinculada a IO, entonces puede usar el ThreadPoolExecutorcomo tal:

with ThreadPoolExecutor() as executor:
  results = executor.map(sleep_secs, secs_list)

Tenga en cuenta cómo mapse usa aquí paramap su función para la lista de argumentos.

Ahora, si su función está vinculada a la CPU, entonces puede usar ProcessPoolExecutor

with ProcessPoolExecutor() as executor:
  results = executor.map(sleep_secs, secs_list)

Si no está seguro, simplemente puede probar ambos y ver cuál le da mejores resultados.

Finalmente, si está buscando imprimir sus resultados, simplemente puede hacer esto:

with ThreadPoolExecutor() as executor:
  results = executor.map(sleep_secs, secs_list)
  for result in results:
    print(result)
BICube
fuente