El multiprocesamiento hace que Python se bloquee y da un error que puede haber estado en progreso en otro hilo cuando se llamó a fork ()

89

Soy relativamente nuevo en Python y estoy tratando de implementar un módulo de multiprocesamiento para mi bucle for.

Tengo una serie de URL de imágenes almacenadas en img_urls que necesito descargar y aplicar algo de visión de Google.

if __name__ == '__main__':

    img_urls = [ALL_MY_Image_URLS]
    runAll(img_urls)
    print("--- %s seconds ---" % (time.time() - start_time)) 

Este es mi método runAll ()

def runAll(img_urls):
    num_cores = multiprocessing.cpu_count()

    print("Image URLS  {}",len(img_urls))
    if len(img_urls) > 2:
        numberOfImages = 0
    else:
        numberOfImages = 1

    start_timeProcess = time.time()

    pool = multiprocessing.Pool()
    pool.map(annotate,img_urls)
    end_timeProcess = time.time()
    print('\n Time to complete ', end_timeProcess-start_timeProcess)

    print(full_matching_pages)


def annotate(img_path):
    file =  requests.get(img_path).content
    print("file is",file)
    """Returns web annotations given the path to an image."""
    print('Process Working under ',os.getpid())
    image = types.Image(content=file)
    web_detection = vision_client.web_detection(image=image).web_detection
    report(web_detection)

Recibo esto como advertencia cuando lo ejecuto y Python se bloquea

objc[67570]: +[__NSPlaceholderDate initialize] may have been in progress in another thread when fork() was called.
objc[67570]: +[__NSPlaceholderDate initialize] may have been in progress in another thread when fork() was called. We cannot safely call it or ignore it in the fork() child process. Crashing instead. Set a breakpoint on objc_initializeAfterForkError to debug.
objc[67567]: +[__NSPlaceholderDate initialize] may have been in progress in another thread when fork() was called.
objc[67567]: +[__NSPlaceholderDate initialize] may have been in progress in another thread when fork() was called. We cannot safely call it or ignore it in the fork() child process. Crashing instead. Set a breakpoint on objc_initializeAfterForkError to debug.
objc[67568]: +[__NSPlaceholderDate initialize] may have been in progress in another thread when fork() was called.
objc[67568]: +[__NSPlaceholderDate initialize] may have been in progress in another thread when fork() was called. We cannot safely call it or ignore it in the fork() child process. Crashing instead. Set a breakpoint on objc_initializeAfterForkError to debug.
objc[67569]: +[__NSPlaceholderDate initialize] may have been in progress in another thread when fork() was called.
objc[67569]: +[__NSPlaceholderDate initialize] may have been in progress in another thread when fork() was called. We cannot safely call it or ignore it in the fork() child process. Crashing instead. Set a breakpoint on objc_initializeAfterForkError to debug.
objc[67571]: +[__NSPlaceholderDate initialize] may have been in progress in another thread when fork() was called.
objc[67571]: +[__NSPlaceholderDate initialize] may have been in progress in another thread when fork() was called. We cannot safely call it or ignore it in the fork() child process. Crashing instead. Set a breakpoint on objc_initializeAfterForkError to debug.
objc[67572]: +[__NSPlaceholderDate initialize] may have been in progress in another thread when fork() was called.
objc[67572]: +[__NSPlaceholderDate initialize] may have been in progress in another thread when fork() was called. We cannot safely call it or ignore it in the fork() child process. Crashing instead. Set a breakpoint on objc_initializeAfterForkError to debug.
SriTeja Chilakamarri
fuente
¿Estás en OSX? Entonces, tal vez este informe de errores le dé algunas pistas.
IonicSolutions
Oh, sí, estoy en OSX, gracias por indicarme el enlace.
SriTeja Chilakamarri
Aún así, no hubo suerte al intentar configurar el OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YEScomo se mencionó, todavía aparece el mismo error. @IonicSolutions
SriTeja Chilakamarri
Desafortunadamente, no tengo conocimientos específicos sobre este tema. Todo lo que puedo hacer es usar Google para encontrar problemas relacionados, por ejemplo, esta posible solución .
IonicSolutions
1
Esto se debe a que Apple ha cambiado el fork()comportamiento de macOS desde High Sierra . La OBJC_DISABLE_INITIALIZE_FORK_SAFETY=yesvariable desactiva el comportamiento de bloqueo inmediato que su nuevo marco ObjectiveC generalmente aplica ahora de forma predeterminada. Esto puede afectar a cualquier lenguaje que esté utilizando múltiples subprocesos / multiprocesamiento fork()en macOS >= 10.13, especialmente cuando se usan "extensiones nativas" / extensiones de código C.
TrinitronX

Respuestas:

218

Este error se produce debido a la seguridad adicional para restringir el subproceso múltiple en Mac OS High Sierra. Sé que esta respuesta es un poco tarde, pero resolví el problema usando el siguiente método:

Establezca una variable de entorno .bash_profile para permitir aplicaciones o scripts de subprocesos múltiples bajo las nuevas reglas de seguridad de Mac OS High Sierra.

Abra una terminal:

$ nano .bash_profile

Agregue la siguiente línea al final del archivo:

export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES

Guarde, salga, cierre la terminal y vuelva a abrir la terminal. Verifique que la variable de entorno ahora esté configurada:

$ env

Verá un resultado similar a:

TERM_PROGRAM=Apple_Terminal
SHELL=/bin/bash
TERM=xterm-256color
TMPDIR=/var/folders/pn/vasdlj3ojO#OOas4dasdffJq/T/
Apple_PubSub_Socket_Render=/private/tmp/com.apple.launchd.E7qLFJDSo/Render
TERM_PROGRAM_VERSION=404
TERM_SESSION_ID=NONE
OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES

Ahora debería poder ejecutar su script de Python con subprocesos múltiples.

Jonnyjandles
fuente
12
Esto realmente lo resolvió para mí. Quería iterar un gran marco de datos de pandas en varios subprocesos y encontré el mismo problema descrito por el operador. Esta respuesta me resolvió el problema. La única diferencia es que configuré esa variable env con el script que ejecuté:OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES python my-script.py
rodrigo-silveira
3
¡Muchas gracias! Para los interesados, esto funcionó para mí en macOS Mojave.
nmetts
Esto resolvió mi problema, pero mi script estaba usando multiprocesamiento
lollerskates
Funcionó en mi máquina con macOS Mojave, pero luego mis pruebas de Pytest ya no se ejecutan en paralelo. Antes, se estrellaba, pero al menos, era rápido ...
onekiloparsec
1
Esta variable de entorno resolvió mi problema de ejecutar ansible localmente en mi mac (catalina)
itsjwala
0

La solución que funciona para mí sin OBJC_DISABLE_INITIALIZE_FORK_SAFETYbandera en el entorno implica inicializar la multiprocessing.Poolclase justo después de main()que se inicia el programa.

Lo más probable es que esta no sea la solución más rápida posible y no estoy seguro de si funciona en todas las situaciones; sin embargo, precalentar los procesos del trabajador lo suficientemente temprano antes de que comiencen mis programas no genera ningún ... may have been in progress in another thread when fork() was callederror y sí obtengo un aumento significativo del rendimiento en comparación a lo que obtengo con código no paralelizado.

He creado una clase de conveniencia Parallelizerque comienzo muy temprano y luego uso durante todo el ciclo de vida de mi programa.

# entry point to my program
def main():
    parallelizer = Parallelizer()
    ...

Luego, cuando quiera tener paralelización:

# this function is parallelized. it is run by each child process.
def processing_function(input):
    ...
    return output

...
inputs = [...]
results = parallelizer.map(
    inputs,
    processing_function
)

Y la clase de paralelizador:

class Parallelizer:
    def __init__(self):
        self.input_queue = multiprocessing.Queue()
        self.output_queue = multiprocessing.Queue()
        self.pool = multiprocessing.Pool(multiprocessing.cpu_count(),
                                         Parallelizer._run,
                                         (self.input_queue, self.output_queue,))

    def map(self, contents, processing_func):
        size = 0
        for content in contents:
            self.input_queue.put((content, processing_func))
            size += 1
        results = []
        while size > 0:
            result = self.output_queue.get(block=True)
            results.append(result)
            size -= 1
        return results

    @staticmethod
    def _run(input_queue, output_queue):
        while True:
            content, processing_func = input_queue.get(block=True)
            result = processing_func(content)
            output_queue.put(result)

Una advertencia: el código paralelizado puede ser difícil de depurar, por lo que también he preparado una versión no paralelizada de mi clase que habilito cuando algo sale mal en los procesos secundarios:

class NullParallelizer:
    @staticmethod
    def map(contents, processing_func):
        results = []
        for content in contents:
            results.append(processing_func(content))
        return results
Stanislav Pankevich
fuente