¿Python Process Pool no es demoníaco?

96

¿Sería posible crear un grupo de Python que no sea demoníaco? Quiero que un grupo pueda llamar a una función que tenga otro grupo dentro.

Quiero esto porque los procesos deamon no pueden crear procesos. Específicamente, causará el error:

AssertionError: daemonic processes are not allowed to have children

Por ejemplo, considere el escenario donde function_atiene una piscina que se ejecuta y function_bque tiene una piscina que se ejecuta function_c. Esta cadena de funciones fallará porque function_bse está ejecutando en un proceso demonio y los procesos demonio no pueden crear procesos.

Max
fuente
AFAIK, no, no es posible que todos los trabajadores del grupo estén demonizados y no es posible inyectar la dependencia , por cierto, no entiendo la segunda parte de su pregunta I want a pool to be able to call a function that has another pool insidey cómo eso interfiere con el hecho de que los trabajadores están demonizados.
mouad
4
Porque si la función a tiene un grupo que ejecuta la función b, que tiene un grupo que ejecuta la función c, hay un problema en b de que se está ejecutando en un proceso demonio, y los procesos demonio no pueden crear procesos. AssertionError: daemonic processes are not allowed to have children
Max

Respuestas:

119

La multiprocessing.pool.Poolclase crea los procesos de trabajo en su __init__método, los convierte en demoníacos y los inicia, y no es posible restablecer su daemonatributo Falseantes de que se inicien (y luego ya no está permitido). Pero puede crear su propia subclase de multiprocesing.pool.Pool( multiprocessing.Pooles solo una función contenedora) y sustituir su propia multiprocessing.Processsubclase, que siempre es no demoníaca, para usarla en los procesos de trabajo.

Aquí hay un ejemplo completo de cómo hacer esto. Las partes importantes son las dos clases NoDaemonProcessy MyPoolen la parte superior y para llamar pool.close()y pool.join()en su MyPoolinstancia al final.

#!/usr/bin/env python
# -*- coding: UTF-8 -*-

import multiprocessing
# We must import this explicitly, it is not imported by the top-level
# multiprocessing module.
import multiprocessing.pool
import time

from random import randint


class NoDaemonProcess(multiprocessing.Process):
    # make 'daemon' attribute always return False
    def _get_daemon(self):
        return False
    def _set_daemon(self, value):
        pass
    daemon = property(_get_daemon, _set_daemon)

# We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool
# because the latter is only a wrapper function, not a proper class.
class MyPool(multiprocessing.pool.Pool):
    Process = NoDaemonProcess

def sleepawhile(t):
    print("Sleeping %i seconds..." % t)
    time.sleep(t)
    return t

def work(num_procs):
    print("Creating %i (daemon) workers and jobs in child." % num_procs)
    pool = multiprocessing.Pool(num_procs)

    result = pool.map(sleepawhile,
        [randint(1, 5) for x in range(num_procs)])

    # The following is not really needed, since the (daemon) workers of the
    # child's pool are killed when the child is terminated, but it's good
    # practice to cleanup after ourselves anyway.
    pool.close()
    pool.join()
    return result

def test():
    print("Creating 5 (non-daemon) workers and jobs in main process.")
    pool = MyPool(5)

    result = pool.map(work, [randint(1, 5) for x in range(5)])

    pool.close()
    pool.join()
    print(result)

if __name__ == '__main__':
    test()
Chris Arndt
fuente
1
Acabo de probar mi código de nuevo con Python 2.7 / 3.2 (después de arreglar las líneas de "impresión") en Linux y Python 2.6 / 2.7 / 3.2 OS X. Linux y Python 2.7 / 3.2 en OS X funcionan bien pero el código de hecho se bloquea con Python 2.6 en OS X (Lion). Esto parece ser un error en el módulo de multiprocesamiento, que se solucionó, pero en realidad no he revisado el rastreador de errores.
Chris Arndt
1
¡Gracias! En Windows también debe llamarmultiprocessing.freeze_support()
frmdstryr
2
Buen trabajo. Si alguien tiene una pérdida de memoria con esto, intente usar "con cierre (MyPool (process = num_cpu)) como grupo:" para deshacerse del grupo correctamente
Chris Lucian
31
¿Cuáles son las desventajas de usar en MyPoollugar del predeterminado Pool? En otras palabras, a cambio de la flexibilidad de iniciar procesos secundarios, ¿qué costos debo pagar? (Si no hubiera costos, presumiblemente el estándar Poolhabría utilizado procesos no demoníacos).
máximo
4
@machen Sí, lamentablemente eso es cierto. En Python 3.6, la Poolclase se ha refactorizado ampliamente, por Processlo que ya no es un atributo simple, sino un método, que devuelve la instancia de proceso que obtiene de un contexto . Intenté sobrescribir este método para devolver una NoDaemonPoolinstancia, pero esto da como resultado la excepción AssertionError: daemonic processes are not allowed to have childrencuando se usa el Pool.
Chris Arndt
26

Tuve la necesidad de emplear un grupo no demoníaco en Python 3.7 y terminé adaptando el código publicado en la respuesta aceptada. A continuación, se muestra el fragmento que crea el grupo no demoníaco:

class NoDaemonProcess(multiprocessing.Process):
    @property
    def daemon(self):
        return False

    @daemon.setter
    def daemon(self, value):
        pass


class NoDaemonContext(type(multiprocessing.get_context())):
    Process = NoDaemonProcess

# We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool
# because the latter is only a wrapper function, not a proper class.
class MyPool(multiprocessing.pool.Pool):
    def __init__(self, *args, **kwargs):
        kwargs['context'] = NoDaemonContext()
        super(MyPool, self).__init__(*args, **kwargs)

Como la implementación actual de multiprocessingse ha refactorizado extensamente para basarse en contextos, debemos proporcionar una NoDaemonContextclase que tenga nuestro NoDaemonProcessatributo como. MyPoolluego usará ese contexto en lugar del predeterminado.

Dicho esto, debo advertir que hay al menos 2 advertencias para este enfoque:

  1. Aún depende de los detalles de implementación del multiprocessingpaquete y, por lo tanto, podría romperse en cualquier momento.
  2. Hay razones válidas por las que multiprocessinges tan difícil usar procesos no demoníacos, muchos de los cuales se explican aquí . El más convincente en mi opinión es:

    En cuanto a permitir que los subprocesos secundarios generen hijos propios mediante el subproceso, se corre el riesgo de crear un pequeño ejército de 'nietos' zombis si los subprocesos principales o secundarios terminan antes de que el subproceso se complete y regrese.

Massimiliano
fuente
Con respecto a la advertencia: Mi caso de uso es hacer tareas paralelas, pero los nietos devuelven información a sus padres que, a su vez, devuelven información a sus padres después de realizar un procesamiento local requerido. En consecuencia, cada nivel / rama tiene una espera explícita para todas sus hojas. ¿La advertencia todavía se aplica si tiene que esperar explícitamente a que finalicen los procesos generados?
A_A
Obteniendo el error AttributeError: module 'multiprocessing' has no attribute 'pool'en Python 3.8.0
Nyxynyx
@Nyxynyx No lo olvidesimport multiprocessing.pool
Chris Arndt
22

El módulo de multiprocesamiento tiene una interfaz agradable para usar grupos con procesos o subprocesos. Dependiendo de su caso de uso actual, podría considerar usar multiprocessing.pool.ThreadPoolpara su Pool externo, lo que dará como resultado subprocesos (que permiten generar procesos desde adentro) en lugar de procesos.

Puede que esté limitado por el GIL, pero en mi caso particular (probé ambos) , el tiempo de inicio de los procesos desde el exterior, Poolcomo se creó aquí, superó con creces la solución ThreadPool.


Es muy fácil de intercambio Processesde Threads. Lea más sobre cómo usar una ThreadPoolsolución aquí o aquí .

timmwagener
fuente
Gracias, esto me ayudó mucho, gran uso de subprocesos aquí (para generar procesos que realmente funcionan bien)
trance_dude
1
Para las personas que buscan una solución práctica que probablemente se aplique a su situación, esta es la indicada.
Abanana
6

En algunas versiones de Python estándar reemplazando piscina con la costumbre puede elevar de error: AssertionError: group argument must be None for now.

Aquí encontré una solución que puede ayudar:

class NoDaemonProcess(multiprocessing.Process):
    # make 'daemon' attribute always return False
    @property
    def daemon(self):
        return False

    @daemon.setter
    def daemon(self, val):
        pass


class NoDaemonProcessPool(multiprocessing.pool.Pool):

    def Process(self, *args, **kwds):
        proc = super(NoDaemonProcessPool, self).Process(*args, **kwds)
        proc.__class__ = NoDaemonProcess

        return proc
Atterratio
fuente
4

concurrent.futures.ProcessPoolExecutorno tiene esta limitación. Puede tener un grupo de procesos anidado sin ningún problema:

from concurrent.futures import ProcessPoolExecutor as Pool
from itertools import repeat
from multiprocessing import current_process
import time

def pid():
    return current_process().pid

def _square(i):  # Runs in inner_pool
    square = i ** 2
    time.sleep(i / 10)
    print(f'{pid()=} {i=} {square=}')
    return square

def _sum_squares(i, j):  # Runs in outer_pool
    with Pool(max_workers=2) as inner_pool:
        squares = inner_pool.map(_square, (i, j))
    sum_squares = sum(squares)
    time.sleep(sum_squares ** .5)
    print(f'{pid()=}, {i=}, {j=} {sum_squares=}')
    return sum_squares

def main():
    with Pool(max_workers=3) as outer_pool:
        for sum_squares in outer_pool.map(_sum_squares, range(5), repeat(3)):
            print(f'{pid()=} {sum_squares=}')

if __name__ == "__main__":
    main()

El código de demostración anterior se probó con Python 3.8.

Crédito: respuesta por jfs

Acumenus
fuente
1
Esta es ahora claramente la mejor solución, ya que requiere cambios mínimos.
DreamFlasher
1
¡funciona perfectamente! ... como nota al margen usando un niño, ¡ multiprocessing.Pooldentro de a ProcessPoolExecutor.Pooltambién es posible!
Rafael
3

El problema que encontré fue al intentar importar globales entre módulos, lo que provocó que la línea ProcessPool () se evaluara varias veces.

globals.py

from processing             import Manager, Lock
from pathos.multiprocessing import ProcessPool
from pathos.threading       import ThreadPool

class SingletonMeta(type):
    def __new__(cls, name, bases, dict):
        dict['__deepcopy__'] = dict['__copy__'] = lambda self, *args: self
        return super(SingletonMeta, cls).__new__(cls, name, bases, dict)

    def __init__(cls, name, bases, dict):
        super(SingletonMeta, cls).__init__(name, bases, dict)
        cls.instance = None

    def __call__(cls,*args,**kw):
        if cls.instance is None:
            cls.instance = super(SingletonMeta, cls).__call__(*args, **kw)
        return cls.instance

    def __deepcopy__(self, item):
        return item.__class__.instance

class Globals(object):
    __metaclass__ = SingletonMeta
    """     
    This class is a workaround to the bug: AssertionError: daemonic processes are not allowed to have children

    The root cause is that importing this file from different modules causes this file to be reevalutated each time, 
    thus ProcessPool() gets reexecuted inside that child thread, thus causing the daemonic processes bug    
    """
    def __init__(self):
        print "%s::__init__()" % (self.__class__.__name__)
        self.shared_manager      = Manager()
        self.shared_process_pool = ProcessPool()
        self.shared_thread_pool  = ThreadPool()
        self.shared_lock         = Lock()        # BUG: Windows: global name 'lock' is not defined | doesn't affect cygwin

Luego importe de forma segura desde otra parte de su código

from globals import Globals
Globals().shared_manager      
Globals().shared_process_pool
Globals().shared_thread_pool  
Globals().shared_lock         
James McGuigan
fuente
2

He visto gente que se enfrenta a este problema utilizando celeryla bifurcación de multiprocessingllamada billiard (extensiones de grupo de multiprocesamiento), que permite que los procesos demoníacos generen hijos. El recorrido es simplemente reemplazar el multiprocessingmódulo por:

import billiard as multiprocessing
Tomasz Bartkowiak
fuente