¿Cómo debo iniciar sesión mientras uso multiprocesamiento en Python?

239

En este momento tengo un módulo central en un marco que genera múltiples procesos utilizando el multiprocessingmódulo Python 2.6 . Debido a que utiliza multiprocessing, no hay registro de multiprocesamiento-conscientes de nivel de módulo, LOG = multiprocessing.get_logger(). Según los documentos , este registrador tiene bloqueos compartidos de proceso para que no pueda confundir las cosas sys.stderr(o cualquier archivo de control) al tener múltiples procesos que escriben en él simultáneamente.

El problema que tengo ahora es que los otros módulos en el marco no son multiprocesadores. A mi modo de ver, necesito hacer que todas las dependencias de este módulo central utilicen el registro con reconocimiento de multiprocesamiento. Eso es molesto dentro del marco, y mucho menos para todos los clientes del marco. ¿Hay alternativas en las que no estoy pensando?

cdleary
fuente
10
Los documentos a los que se vincula, indican exactamente lo contrario de lo que dice, el registrador no tiene bloqueos compartidos de proceso y las cosas se mezclan, un problema que yo también tuve.
Sebastian Blask
3
vea ejemplos en los documentos de stdlib: Iniciar sesión en un solo archivo desde múltiples procesos . Las recetas no requieren que otros módulos sean multiprocesadores.
jfs
Entonces, ¿para qué sirve multiprocessing.get_logger()? Parece que estas otras formas de hacer el registro son la funcionalidad de registro multiprocessingde poco valor.
Tim Ludwinski
44
get_logger()es el registrador utilizado por el multiprocessingpropio módulo. Es útil si desea depurar un multiprocessingproblema.
jfs

Respuestas:

69

La única forma de lidiar con esto de manera no intrusiva es:

  1. Genere cada proceso de trabajo de modo que su registro vaya a un descriptor de archivo diferente (en el disco o en la tubería). Idealmente, todas las entradas del registro deben tener una marca de tiempo.
  2. Su proceso de controlador puede hacer uno de los siguientes:
    • Si usa archivos de disco: combine los archivos de registro al final de la ejecución, ordenados por marca de tiempo
    • Si usa tuberías (recomendado): combine las entradas de registro sobre la marcha de todas las tuberías en un archivo de registro central. (Por ejemplo, periódicamente a selectpartir de los descriptores de archivo de las tuberías, realice una ordenación por fusión en las entradas de registro disponibles y vacíe al registro centralizado. Repita).
vladr
fuente
Bien, eso fue 35 años antes de pensar en eso (pensé que usaría atexit:-). El problema es que no te dará una lectura en tiempo real. Esto puede ser parte del precio del multiprocesamiento en lugar de multiproceso.
cdleary
@cdleary, utilizando el enfoque canalizado, sería lo más cercano posible en tiempo real (especialmente si stderr no está protegido en los procesos generados)
vladr
1
Por cierto, gran suposición aquí: no Windows. ¿Estás en Windows?
vladr
22
¿Por qué no utilizar simplemente un multiprocesamiento, una cola y un hilo de registro en el proceso principal? Parece más simple
Brandon Rhodes
1
@BrandonRhodes: como dije, de manera no intrusiva . El uso multiprocessing.Queueno será más sencillo si hay mucho código para volver a cablear para usar multiprocessing.Queue, y / o si el rendimiento es un problema
vladr
122

Acabo de escribir un controlador de registro propio que simplemente alimenta todo al proceso principal a través de una tubería. Solo lo he estado probando durante diez minutos, pero parece funcionar bastante bien.

( Nota: Esto está codificado RotatingFileHandler, que es mi propio caso de uso).


Actualización: @javier ahora mantiene este enfoque como un paquete disponible en Pypi: consulte multiprocesamiento-registro en Pypi, github en https://github.com/jruere/multiprocessing-logging


Actualización: Implementación!

Esto ahora usa una cola para el manejo correcto de la concurrencia, y también se recupera de los errores correctamente. He estado usando esto en producción durante varios meses, y la versión actual a continuación funciona sin problemas.

from logging.handlers import RotatingFileHandler
import multiprocessing, threading, logging, sys, traceback

class MultiProcessingLog(logging.Handler):
    def __init__(self, name, mode, maxsize, rotate):
        logging.Handler.__init__(self)

        self._handler = RotatingFileHandler(name, mode, maxsize, rotate)
        self.queue = multiprocessing.Queue(-1)

        t = threading.Thread(target=self.receive)
        t.daemon = True
        t.start()

    def setFormatter(self, fmt):
        logging.Handler.setFormatter(self, fmt)
        self._handler.setFormatter(fmt)

    def receive(self):
        while True:
            try:
                record = self.queue.get()
                self._handler.emit(record)
            except (KeyboardInterrupt, SystemExit):
                raise
            except EOFError:
                break
            except:
                traceback.print_exc(file=sys.stderr)

    def send(self, s):
        self.queue.put_nowait(s)

    def _format_record(self, record):
        # ensure that exc_info and args
        # have been stringified.  Removes any chance of
        # unpickleable things inside and possibly reduces
        # message size sent over the pipe
        if record.args:
            record.msg = record.msg % record.args
            record.args = None
        if record.exc_info:
            dummy = self.format(record)
            record.exc_info = None

        return record

    def emit(self, record):
        try:
            s = self._format_record(record)
            self.send(s)
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            self.handleError(record)

    def close(self):
        self._handler.close()
        logging.Handler.close(self)
zzzeek
fuente
44
El controlador anterior realiza la escritura de todos los archivos del proceso primario y usa solo un hilo para recibir los mensajes pasados ​​de los procesos secundarios. Si invoca el controlador en sí desde un proceso secundario generado, entonces lo está utilizando incorrectamente y obtendrá los mismos problemas que RotatingFileHandler. He usado el código anterior durante años sin problemas.
zzzeek
99
Lamentablemente, este enfoque no funciona en Windows. De docs.python.org/library/multiprocessing.html 16.6.2.12 "Tenga en cuenta que en Windows los procesos secundarios solo heredarán el nivel del registrador del proceso primario; cualquier otra personalización del registrador no se heredará". Los subprocesos no heredarán el controlador, y no puede pasarlo explícitamente porque no es seleccionable.
Noah Yetter
2
Vale la pena señalar que multiprocessing.Queueusa un hilo para adentro put(). Por lo tanto, no invoque put(es decir, registre un mensaje con el MultiProcessingLogcontrolador) antes de crear todos los subprocesos. De lo contrario, el hilo estará muerto en el proceso hijo. Una solución es llamar Queue._after_fork()al comienzo de cada proceso secundario, o utilizar multiprocessing.queues.SimpleQueueen su lugar, lo que no implica subproceso, pero está bloqueando.
Danqi Wang
55
¿Podría agregar un ejemplo simple que muestre la inicialización, así como el uso de un proceso secundario hipotético? No estoy muy seguro de cómo se supone que el proceso secundario tiene acceso a la cola sin crear una instancia de otra instancia de su clase.
JesseBuesking
11
@zzzeek, ​​esta solución es buena, pero no pude encontrar un paquete con ella o algo similar, así que creé una llamada multiprocessing-logging.
Javier
30

QueueHandleres nativo en Python 3.2+, y hace exactamente esto. Se replica fácilmente en versiones anteriores.

Los documentos de Python tienen dos ejemplos completos: iniciar sesión en un solo archivo desde múltiples procesos

Para aquellos que usan Python <3.2, simplemente copie QueueHandleren su propio código desde: https://gist.github.com/vsajip/591589 o, alternativamente, importe logutils .

Cada proceso (incluido el proceso principal) coloca su registro en el Queue, y luego un listenerhilo o proceso (se proporciona un ejemplo para cada uno) los recoge y los escribe a todos en un archivo, sin riesgo de corrupción o desorden.

fantabolous
fuente
21

A continuación hay otra solución con un enfoque en la simplicidad para cualquier otra persona (como yo) que llegue aquí desde Google. ¡El registro debería ser fácil! Solo para 3.2 o superior.

import multiprocessing
import logging
from logging.handlers import QueueHandler, QueueListener
import time
import random


def f(i):
    time.sleep(random.uniform(.01, .05))
    logging.info('function called with {} in worker thread.'.format(i))
    time.sleep(random.uniform(.01, .05))
    return i


def worker_init(q):
    # all records from worker processes go to qh and then into q
    qh = QueueHandler(q)
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    logger.addHandler(qh)


def logger_init():
    q = multiprocessing.Queue()
    # this is the handler for all log records
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter("%(levelname)s: %(asctime)s - %(process)s - %(message)s"))

    # ql gets records from the queue and sends them to the handler
    ql = QueueListener(q, handler)
    ql.start()

    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    # add the handler to the logger so records from this process are handled
    logger.addHandler(handler)

    return ql, q


def main():
    q_listener, q = logger_init()

    logging.info('hello from main thread')
    pool = multiprocessing.Pool(4, worker_init, [q])
    for result in pool.map(f, range(10)):
        pass
    pool.close()
    pool.join()
    q_listener.stop()

if __name__ == '__main__':
    main()
usuario2133814
fuente
2
Las clases QueueHandlery también QueueListenerse pueden usar en Python 2.7, disponible en el logutilspaquete.
Lev Levitsky
55
El registrador del proceso principal también debe usar un QueueHandler. En su código actual, el proceso principal está pasando por alto la cola, por lo que puede haber condiciones de carrera entre el proceso principal y los trabajadores. Todos deben iniciar sesión en la cola (a través de un QueueHandler) y solo el QueueListener debe poder iniciar sesión en StreamHandler.
Ismael EL ATIFI
Además, no tiene que inicializar el registrador en cada hijo. Simplemente inicie el registrador en el proceso primario y obtenga el registrador en cada proceso secundario.
okwap
20

Otra alternativa podría ser los diversos controladores de registro no basados ​​en archivos en el loggingpaquete :

  • SocketHandler
  • DatagramHandler
  • SyslogHandler

(y otros)

De esta manera, podría tener fácilmente un demonio de registro en algún lugar en el que podría escribir de forma segura y manejar los resultados correctamente. (Por ejemplo, un servidor de socket simple que simplemente desempaqueta el mensaje y lo emite a su propio controlador de archivos rotativo).

También SyslogHandlerse encargaría de esto por ti. Por supuesto, puede usar su propia instancia syslog, no la del sistema.

Ali Afshar
fuente
13

Una variante de los demás que mantiene separados el hilo de registro y la cola.

"""sample code for logging in subprocesses using multiprocessing

* Little handler magic - The main process uses loggers and handlers as normal.
* Only a simple handler is needed in the subprocess that feeds the queue.
* Original logger name from subprocess is preserved when logged in main
  process.
* As in the other implementations, a thread reads the queue and calls the
  handlers. Except in this implementation, the thread is defined outside of a
  handler, which makes the logger definitions simpler.
* Works with multiple handlers.  If the logger in the main process defines
  multiple handlers, they will all be fed records generated by the
  subprocesses loggers.

tested with Python 2.5 and 2.6 on Linux and Windows

"""

import os
import sys
import time
import traceback
import multiprocessing, threading, logging, sys

DEFAULT_LEVEL = logging.DEBUG

formatter = logging.Formatter("%(levelname)s: %(asctime)s - %(name)s - %(process)s - %(message)s")

class SubProcessLogHandler(logging.Handler):
    """handler used by subprocesses

    It simply puts items on a Queue for the main process to log.

    """

    def __init__(self, queue):
        logging.Handler.__init__(self)
        self.queue = queue

    def emit(self, record):
        self.queue.put(record)

class LogQueueReader(threading.Thread):
    """thread to write subprocesses log records to main process log

    This thread reads the records written by subprocesses and writes them to
    the handlers defined in the main process's handlers.

    """

    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue
        self.daemon = True

    def run(self):
        """read from the queue and write to the log handlers

        The logging documentation says logging is thread safe, so there
        shouldn't be contention between normal logging (from the main
        process) and this thread.

        Note that we're using the name of the original logger.

        """
        # Thanks Mike for the error checking code.
        while True:
            try:
                record = self.queue.get()
                # get the logger for this record
                logger = logging.getLogger(record.name)
                logger.callHandlers(record)
            except (KeyboardInterrupt, SystemExit):
                raise
            except EOFError:
                break
            except:
                traceback.print_exc(file=sys.stderr)

class LoggingProcess(multiprocessing.Process):

    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def _setupLogger(self):
        # create the logger to use.
        logger = logging.getLogger('test.subprocess')
        # The only handler desired is the SubProcessLogHandler.  If any others
        # exist, remove them. In this case, on Unix and Linux the StreamHandler
        # will be inherited.

        for handler in logger.handlers:
            # just a check for my sanity
            assert not isinstance(handler, SubProcessLogHandler)
            logger.removeHandler(handler)
        # add the handler
        handler = SubProcessLogHandler(self.queue)
        handler.setFormatter(formatter)
        logger.addHandler(handler)

        # On Windows, the level will not be inherited.  Also, we could just
        # set the level to log everything here and filter it in the main
        # process handlers.  For now, just set it from the global default.
        logger.setLevel(DEFAULT_LEVEL)
        self.logger = logger

    def run(self):
        self._setupLogger()
        logger = self.logger
        # and here goes the logging
        p = multiprocessing.current_process()
        logger.info('hello from process %s with pid %s' % (p.name, p.pid))


if __name__ == '__main__':
    # queue used by the subprocess loggers
    queue = multiprocessing.Queue()
    # Just a normal logger
    logger = logging.getLogger('test')
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    logger.setLevel(DEFAULT_LEVEL)
    logger.info('hello from the main process')
    # This thread will read from the subprocesses and write to the main log's
    # handlers.
    log_queue_reader = LogQueueReader(queue)
    log_queue_reader.start()
    # create the processes.
    for i in range(10):
        p = LoggingProcess(queue)
        p.start()
    # The way I read the multiprocessing warning about Queue, joining a
    # process before it has finished feeding the Queue can cause a deadlock.
    # Also, Queue.empty() is not realiable, so just make sure all processes
    # are finished.
    # active_children joins subprocesses when they're finished.
    while multiprocessing.active_children():
        time.sleep(.1)
ironhacker
fuente
Me gusta la idea de recuperar el nombre del registrador del registro de la cola. Esto permite usar convencional fileConfig()en MainProcess y un registrador apenas configurado en PoolWorkers (solo con setLevel(logging.NOTSET)). Como mencioné en otro comentario, estoy usando Pool, así que tuve que obtener mi Cola (proxy) del Administrador en lugar de multiprocesamiento para que pueda encurtirse. Esto me permite pasar la cola a un trabajador dentro de un diccionario (la mayoría de los cuales se deriva del uso del objeto argsparse vars()). Creo que al final este es el mejor enfoque para MS Windows que carece de fork () y rompe la solución @zzzeak.
mlt
@mlt Creo que también podría poner una Cola de multiprocesamiento en el init en lugar de usar un Administrador (vea la respuesta a stackoverflow.com/questions/25557686/… - se trata de Locks pero creo que también funciona para las Colas)
fantabolous
@fantabolous Eso no funcionará en MS Windows o cualquier otra plataforma que carece fork. De esa forma, cada proceso tendrá su propia cola inútil independiente. El segundo enfoque en el Q / A vinculado no funcionará en tales plataformas. Es una forma de código no portátil.
mlt
@mlt Interesante. Estoy usando Windows y parece que funciona bien para mí; no mucho después de la última vez que comenté, configuré un grupo de procesos que comparten un multiprocessing.Queueproceso principal y lo he estado usando constantemente desde entonces. Sin embargo, no pretendo entender por qué funciona.
fantástico
10

Todas las soluciones actuales están demasiado acopladas a la configuración de registro mediante un controlador. Mi solución tiene la siguiente arquitectura y características:

  • Puede usar cualquier configuración de registro que desee
  • El registro se realiza en un hilo de demonio
  • Apagado seguro del demonio mediante el uso de un administrador de contexto
  • La comunicación al hilo de registro se realiza por multiprocessing.Queue
  • En subprocesos, logging.Logger(y las instancias ya definidas) son parcheadas para enviar todos los registros a la cola
  • Nuevo : formatee el rastreo y el mensaje antes de enviarlos a la cola para evitar errores de decapado

El código con el ejemplo de uso y la salida se puede encontrar en el siguiente Gist: https://gist.github.com/schlamar/7003737

Schlamar
fuente
A menos que me falta algo, esto no es en realidad un hilo de utilidad, ya que nunca antes ha establecido daemon_thread.daemona True. Tenía que hacer eso para que mi programa Python salga correctamente cuando ocurra una excepción dentro del administrador de contexto.
blah238
También necesitaba excepciones de captura, registro y tragar lanzados por el objetivo funcde logged_call, si no la excepción conseguiría ilegible con otra salida registrada. Aquí está mi versión modificada de esto: gist.github.com/blah238/8ab79c4fe9cdb254f5c37abfc5dc85bf
blah238
8

Dado que podemos representar el registro multiproceso como muchos editores y un suscriptor (escucha), usar ZeroMQ para implementar la mensajería PUB-SUB es una opción.

Además, el módulo PyZMQ , los enlaces de Python para ZMQ, implementa PUBHandler , que es un objeto para publicar mensajes de registro en un zmq.PUB socket.

Hay una solución en la web para el registro centralizado desde una aplicación distribuida usando PyZMQ y PUBHandler, que puede adoptarse fácilmente para trabajar localmente con múltiples procesos de publicación.

formatters = {
    logging.DEBUG: logging.Formatter("[%(name)s] %(message)s"),
    logging.INFO: logging.Formatter("[%(name)s] %(message)s"),
    logging.WARN: logging.Formatter("[%(name)s] %(message)s"),
    logging.ERROR: logging.Formatter("[%(name)s] %(message)s"),
    logging.CRITICAL: logging.Formatter("[%(name)s] %(message)s")
}

# This one will be used by publishing processes
class PUBLogger:
    def __init__(self, host, port=config.PUBSUB_LOGGER_PORT):
        self._logger = logging.getLogger(__name__)
        self._logger.setLevel(logging.DEBUG)
        self.ctx = zmq.Context()
        self.pub = self.ctx.socket(zmq.PUB)
        self.pub.connect('tcp://{0}:{1}'.format(socket.gethostbyname(host), port))
        self._handler = PUBHandler(self.pub)
        self._handler.formatters = formatters
        self._logger.addHandler(self._handler)

    @property
    def logger(self):
        return self._logger

# This one will be used by listener process
class SUBLogger:
    def __init__(self, ip, output_dir="", port=config.PUBSUB_LOGGER_PORT):
        self.output_dir = output_dir
        self._logger = logging.getLogger()
        self._logger.setLevel(logging.DEBUG)

        self.ctx = zmq.Context()
        self._sub = self.ctx.socket(zmq.SUB)
        self._sub.bind('tcp://*:{1}'.format(ip, port))
        self._sub.setsockopt(zmq.SUBSCRIBE, "")

        handler = handlers.RotatingFileHandler(os.path.join(output_dir, "client_debug.log"), "w", 100 * 1024 * 1024, 10)
        handler.setLevel(logging.DEBUG)
        formatter = logging.Formatter("%(asctime)s;%(levelname)s - %(message)s")
        handler.setFormatter(formatter)
        self._logger.addHandler(handler)

  @property
  def sub(self):
      return self._sub

  @property
  def logger(self):
      return self._logger

#  And that's the way we actually run things:

# Listener process will forever listen on SUB socket for incoming messages
def run_sub_logger(ip, event):
    sub_logger = SUBLogger(ip)
    while not event.is_set():
        try:
            topic, message = sub_logger.sub.recv_multipart(flags=zmq.NOBLOCK)
            log_msg = getattr(logging, topic.lower())
            log_msg(message)
        except zmq.ZMQError as zmq_error:
            if zmq_error.errno == zmq.EAGAIN:
                pass


# Publisher processes loggers should be initialized as follows:

class Publisher:
    def __init__(self, stop_event, proc_id):
        self.stop_event = stop_event
        self.proc_id = proc_id
        self._logger = pub_logger.PUBLogger('127.0.0.1').logger

     def run(self):
         self._logger.info("{0} - Sending message".format(proc_id))

def run_worker(event, proc_id):
    worker = Publisher(event, proc_id)
    worker.run()

# Starting subscriber process so we won't loose publisher's messages
sub_logger_process = Process(target=run_sub_logger,
                                 args=('127.0.0.1'), stop_event,))
sub_logger_process.start()

#Starting publisher processes
for i in range(MAX_WORKERS_PER_CLIENT):
    processes.append(Process(target=run_worker,
                                 args=(stop_event, i,)))
for p in processes:
    p.start()
Samuel
fuente
6

También me gusta la respuesta de zzzeek, ​​pero Andre tiene razón en que se requiere una cola para evitar la confusión. Tuve un poco de suerte con la pipa, pero sí vi que era algo esperable. Implementarlo resultó ser más difícil de lo que pensaba, particularmente debido a que se ejecuta en Windows, donde hay algunas restricciones adicionales sobre variables globales y otras cosas (ver: ¿Cómo se implementa el multiprocesamiento de Python en Windows? )

Pero, finalmente lo conseguí funcionando. Este ejemplo probablemente no sea perfecto, por lo que los comentarios y sugerencias son bienvenidos. Tampoco admite la configuración del formateador o cualquier otra cosa que no sea el registrador raíz. Básicamente, debe reiniciar el registrador en cada uno de los procesos del grupo con la cola y configurar los otros atributos en el registrador.

Una vez más, cualquier sugerencia sobre cómo mejorar el código es bienvenida. Ciertamente todavía no conozco todos los trucos de Python :-)

import multiprocessing, logging, sys, re, os, StringIO, threading, time, Queue

class MultiProcessingLogHandler(logging.Handler):
    def __init__(self, handler, queue, child=False):
        logging.Handler.__init__(self)

        self._handler = handler
        self.queue = queue

        # we only want one of the loggers to be pulling from the queue.
        # If there is a way to do this without needing to be passed this
        # information, that would be great!
        if child == False:
            self.shutdown = False
            self.polltime = 1
            t = threading.Thread(target=self.receive)
            t.daemon = True
            t.start()

    def setFormatter(self, fmt):
        logging.Handler.setFormatter(self, fmt)
        self._handler.setFormatter(fmt)

    def receive(self):
        #print "receive on"
        while (self.shutdown == False) or (self.queue.empty() == False):
            # so we block for a short period of time so that we can
            # check for the shutdown cases.
            try:
                record = self.queue.get(True, self.polltime)
                self._handler.emit(record)
            except Queue.Empty, e:
                pass

    def send(self, s):
        # send just puts it in the queue for the server to retrieve
        self.queue.put(s)

    def _format_record(self, record):
        ei = record.exc_info
        if ei:
            dummy = self.format(record) # just to get traceback text into record.exc_text
            record.exc_info = None  # to avoid Unpickleable error

        return record

    def emit(self, record):
        try:
            s = self._format_record(record)
            self.send(s)
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            self.handleError(record)

    def close(self):
        time.sleep(self.polltime+1) # give some time for messages to enter the queue.
        self.shutdown = True
        time.sleep(self.polltime+1) # give some time for the server to time out and see the shutdown

    def __del__(self):
        self.close() # hopefully this aids in orderly shutdown when things are going poorly.

def f(x):
    # just a logging command...
    logging.critical('function number: ' + str(x))
    # to make some calls take longer than others, so the output is "jumbled" as real MP programs are.
    time.sleep(x % 3)

def initPool(queue, level):
    """
    This causes the logging module to be initialized with the necessary info
    in pool threads to work correctly.
    """
    logging.getLogger('').addHandler(MultiProcessingLogHandler(logging.StreamHandler(), queue, child=True))
    logging.getLogger('').setLevel(level)

if __name__ == '__main__':
    stream = StringIO.StringIO()
    logQueue = multiprocessing.Queue(100)
    handler= MultiProcessingLogHandler(logging.StreamHandler(stream), logQueue)
    logging.getLogger('').addHandler(handler)
    logging.getLogger('').setLevel(logging.DEBUG)

    logging.debug('starting main')

    # when bulding the pool on a Windows machine we also have to init the logger in all the instances with the queue and the level of logging.
    pool = multiprocessing.Pool(processes=10, initializer=initPool, initargs=[logQueue, logging.getLogger('').getEffectiveLevel()] ) # start worker processes
    pool.map(f, range(0,50))
    pool.close()

    logging.debug('done')
    logging.shutdown()
    print "stream output is:"
    print stream.getvalue()
Mike Miller
fuente
1
Me pregunto si if 'MainProcess' == multiprocessing.current_process().name:se puede usar en lugar de pasar child.
mlt
En caso de que alguien más esté tratando de usar un grupo de procesos en lugar de objetos de proceso separados en Windows, vale la pena mencionar que Manager se usará para pasar la cola a los subprocesos, ya que no se puede seleccionar directamente.
mlt
Esta implementación funcionó bien para mí. Lo modifiqué para que funcione con un número arbitrario de controladores. De esta forma, puede configurar su controlador raíz de una manera que no sea multiprocesamiento, luego, donde sea seguro hacer la cola, pasar los controladores raíz a esto, eliminarlos y hacer que este sea el único controlador.
Jaxor24
3

simplemente publique en alguna parte su instancia del registrador. de esa manera, los otros módulos y clientes pueden usar su API para obtener el registrador sin tener que hacerlo import multiprocessing.

Javier
fuente
1
El problema con esto es que los registradores de multiprocesamiento aparecen sin nombre, por lo que no podrá descifrar la secuencia de mensajes fácilmente. Tal vez sería posible nombrarlos después de la creación, lo que haría que fuera más razonable verlos.
cdleary
bueno, publique un registrador para cada módulo, o mejor, exporte diferentes cierres que usen el registrador con el nombre del módulo. el punto es dejar que otros módulos usen su API
Javier
Definitivamente razonable (¡y +1 de mi parte!), Pero extrañaría poder import logging; logging.basicConfig(level=logging.DEBUG); logging.debug('spam!')desde cualquier lugar y hacer que funcione correctamente.
cdleary
3
Es un fenómeno interesante que veo cuando uso Python, que nos acostumbramos tanto a poder hacer lo que queremos en 1 o 2 líneas simples que el enfoque simple y lógico en otros idiomas (por ejemplo, publicar el registrador de multiprocesamiento o envolver en un accesorio) todavía se siente como una carga. :)
Kylotan
3

Me gustó la respuesta de zzzeek. Simplemente sustituiría la Tubería por una Cola ya que si varios hilos / procesos usan el mismo extremo de la tubería para generar mensajes de registro, quedarán confusos.

André Cruz
fuente
Estaba teniendo algunos problemas con el controlador, aunque no era que los mensajes fueran confusos, es que todo dejaría de funcionar. Cambié Pipe para que sea Queue ya que eso es más apropiado. Sin embargo, los errores que estaba recibiendo no se resolvieron con eso, en última instancia, agregué un método try / except al método Receive (), muy raramente, un intento de registrar excepciones fallará y terminará siendo atrapado allí. Una vez que agregué el try / except, se ejecuta durante semanas sin problemas, y un archivo standarderr capturará aproximadamente dos excepciones errantes por semana.
zzzeek
2

¿Qué tal delegar todo el registro a otro proceso que lea todas las entradas de registro de una Cola?

LOG_QUEUE = multiprocessing.JoinableQueue()

class CentralLogger(multiprocessing.Process):
    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue
        self.log = logger.getLogger('some_config')
        self.log.info("Started Central Logging process")

    def run(self):
        while True:
            log_level, message = self.queue.get()
            if log_level is None:
                self.log.info("Shutting down Central Logging process")
                break
            else:
                self.log.log(log_level, message)

central_logger_process = CentralLogger(LOG_QUEUE)
central_logger_process.start()

Simplemente comparta LOG_QUEUE a través de cualquiera de los mecanismos multiproceso o incluso la herencia, ¡y todo funciona bien!

Sawan
fuente
1

Tengo una solución similar a la de Ironhacker, excepto que utilizo logging.exception en algunos de mis códigos y descubrí que necesitaba formatear la excepción antes de pasarla de nuevo a la cola, ya que los rastreadores no son seleccionables:

class QueueHandler(logging.Handler):
    def __init__(self, queue):
        logging.Handler.__init__(self)
        self.queue = queue
    def emit(self, record):
        if record.exc_info:
            # can't pass exc_info across processes so just format now
            record.exc_text = self.formatException(record.exc_info)
            record.exc_info = None
        self.queue.put(record)
    def formatException(self, ei):
        sio = cStringIO.StringIO()
        traceback.print_exception(ei[0], ei[1], ei[2], None, sio)
        s = sio.getvalue()
        sio.close()
        if s[-1] == "\n":
            s = s[:-1]
        return s
Richard Jones
fuente
Encontré un ejemplo completo en este sentido aquí .
Aryeh Leib Taurog
1

A continuación se muestra una clase que se puede utilizar en un entorno Windows, requiere ActivePython. También puede heredar para otros controladores de registro (StreamHandler, etc.)

class SyncronizedFileHandler(logging.FileHandler):
    MUTEX_NAME = 'logging_mutex'

    def __init__(self , *args , **kwargs):

        self.mutex = win32event.CreateMutex(None , False , self.MUTEX_NAME)
        return super(SyncronizedFileHandler , self ).__init__(*args , **kwargs)

    def emit(self, *args , **kwargs):
        try:
            win32event.WaitForSingleObject(self.mutex , win32event.INFINITE)
            ret = super(SyncronizedFileHandler , self ).emit(*args , **kwargs)
        finally:
            win32event.ReleaseMutex(self.mutex)
        return ret

Y aquí hay un ejemplo que demuestra el uso:

import logging
import random , time , os , sys , datetime
from string import letters
import win32api , win32event
from multiprocessing import Pool

def f(i):
    time.sleep(random.randint(0,10) * 0.1)
    ch = random.choice(letters)
    logging.info( ch * 30)


def init_logging():
    '''
    initilize the loggers
    '''
    formatter = logging.Formatter("%(levelname)s - %(process)d - %(asctime)s - %(filename)s - %(lineno)d - %(message)s")
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    file_handler = SyncronizedFileHandler(sys.argv[1])
    file_handler.setLevel(logging.INFO)
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)

#must be called in the parent and in every worker process
init_logging() 

if __name__ == '__main__':
    #multiprocessing stuff
    pool = Pool(processes=10)
    imap_result = pool.imap(f , range(30))
    for i , _ in enumerate(imap_result):
        pass
usuario6336812
fuente
Probablemente, usar en multiprocessing.Lock()lugar de Windows Mutex haría que la solución sea portátil.
xmedeko
1

Aquí está mi truco / solución simple ... no es el más completo, pero es fácilmente modificable y más fácil de leer y entender, creo que cualquier otra respuesta que encontré antes de escribir esto:

import logging
import multiprocessing

class FakeLogger(object):
    def __init__(self, q):
        self.q = q
    def info(self, item):
        self.q.put('INFO - {}'.format(item))
    def debug(self, item):
        self.q.put('DEBUG - {}'.format(item))
    def critical(self, item):
        self.q.put('CRITICAL - {}'.format(item))
    def warning(self, item):
        self.q.put('WARNING - {}'.format(item))

def some_other_func_that_gets_logger_and_logs(num):
    # notice the name get's discarded
    # of course you can easily add this to your FakeLogger class
    local_logger = logging.getLogger('local')
    local_logger.info('Hey I am logging this: {} and working on it to make this {}!'.format(num, num*2))
    local_logger.debug('hmm, something may need debugging here')
    return num*2

def func_to_parallelize(data_chunk):
    # unpack our args
    the_num, logger_q = data_chunk
    # since we're now in a new process, let's monkeypatch the logging module
    logging.getLogger = lambda name=None: FakeLogger(logger_q)
    # now do the actual work that happens to log stuff too
    new_num = some_other_func_that_gets_logger_and_logs(the_num)
    return (the_num, new_num)

if __name__ == '__main__':
    multiprocessing.freeze_support()
    m = multiprocessing.Manager()
    logger_q = m.Queue()
    # we have to pass our data to be parallel-processed
    # we also need to pass the Queue object so we can retrieve the logs
    parallelable_data = [(1, logger_q), (2, logger_q)]
    # set up a pool of processes so we can take advantage of multiple CPU cores
    pool_size = multiprocessing.cpu_count() * 2
    pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=4)
    worker_output = pool.map(func_to_parallelize, parallelable_data)
    pool.close() # no more tasks
    pool.join()  # wrap up current tasks
    # get the contents of our FakeLogger object
    while not logger_q.empty():
        print logger_q.get()
    print 'worker output contained: {}'.format(worker_output)
nmz787
fuente
1

Hay este gran paquete

Paquete: https://pypi.python.org/pypi/multiprocessing-logging/

código: https://github.com/jruere/multiprocessing-logging

Instalar en pc:

pip install multiprocessing-logging

Luego añade:

import multiprocessing_logging

# This enables logs inside process
multiprocessing_logging.install_mp_handler()
juan Isaza
fuente
3
Esta biblioteca está literalmente basada en otro comentario en la publicación SO actual: stackoverflow.com/a/894284/1698058 .
Chris Hunt
Orígenes: stackoverflow.com/a/894284/1663382 Agradezco el uso de ejemplo del módulo, además de la documentación en la página de inicio.
Liquidgenius
0

Una de las alternativas es escribir el registro de multiprocesamiento en un archivo conocido y registrar un atexitcontrolador para unirse a esos procesos, leerlo nuevamente en stderr; sin embargo, no obtendrá un flujo en tiempo real a los mensajes de salida en stderr de esa manera.

cdleary
fuente
El enfoque que está proponiendo a continuación es idéntico al de su comentario aquí stackoverflow.com/questions/641420/…
iruvar
0

Si tiene puntos muertos en una combinación de bloqueos, hilos y bifurcaciones en el loggingmódulo, eso se informa en el informe de error 6721 (consulte también la pregunta SO relacionada ).

Hay una pequeña solución de reparación publicada aquí .

Sin embargo, eso solo solucionará cualquier posible punto muerto logging. Eso no arreglará que las cosas estén distorsionadas. Vea las otras respuestas presentadas aquí.

Albert
fuente
0

La idea más simple como se menciona:

  • Tome el nombre del archivo y la identificación del proceso del proceso actual.
  • Configurar a [WatchedFileHandler][1]. Las razones de este controlador se discuten en detalle aquí , pero en resumen hay ciertas peores condiciones de carrera con los otros controladores de registro. Este tiene la ventana más corta para la condición de carrera.
    • Elija una ruta para guardar los registros, como / var / log / ...
usuario1460675
fuente
0

Para quien necesite esto, escribí un decorador para el paquete multiprocessing_logging que agrega el nombre del proceso actual a los registros, por lo que queda claro quién registra qué.

También ejecuta install_mp_handler (), por lo que resulta inútil ejecutarlo antes de crear un grupo.

Esto me permite ver qué trabajador crea qué mensajes de registro.

Aquí está el plano con un ejemplo:

import sys
import logging
from functools import wraps
import multiprocessing
import multiprocessing_logging

# Setup basic console logger as 'logger'
logger = logging.getLogger()
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(logging.Formatter(u'%(asctime)s :: %(levelname)s :: %(message)s'))
logger.setLevel(logging.DEBUG)
logger.addHandler(console_handler)


# Create a decorator for functions that are called via multiprocessing pools
def logs_mp_process_names(fn):
    class MultiProcessLogFilter(logging.Filter):
        def filter(self, record):
            try:
                process_name = multiprocessing.current_process().name
            except BaseException:
                process_name = __name__
            record.msg = f'{process_name} :: {record.msg}'
            return True

    multiprocessing_logging.install_mp_handler()
    f = MultiProcessLogFilter()

    # Wraps is needed here so apply / apply_async know the function name
    @wraps(fn)
    def wrapper(*args, **kwargs):
        logger.removeFilter(f)
        logger.addFilter(f)
        return fn(*args, **kwargs)

    return wrapper


# Create a test function and decorate it
@logs_mp_process_names
def test(argument):
    logger.info(f'test function called via: {argument}')


# You can also redefine undecored functions
def undecorated_function():
    logger.info('I am not decorated')


@logs_mp_process_names
def redecorated(*args, **kwargs):
    return undecorated_function(*args, **kwargs)


# Enjoy
if __name__ == '__main__':
    with multiprocessing.Pool() as mp_pool:
        # Also works with apply_async
        mp_pool.apply(test, ('mp pool',))
        mp_pool.apply(redecorated)
        logger.info('some main logs')
        test('main program')
Orsiris de Jong
fuente
-5

A mis hijos que enfrentan el mismo problema en décadas y encontraron esta pregunta en este sitio, les dejo esta respuesta.

Simplicidad vs sobrecomplicación. Solo usa otras herramientas. Python es impresionante, pero no fue diseñado para hacer algunas cosas.

El siguiente fragmento de logrotate daemon funciona para mí y no complica demasiado las cosas. Programe que se ejecute cada hora y

/var/log/mylogfile.log {
    size 1
    copytruncate
    create
    rotate 10
    missingok
    postrotate
        timeext=`date -d '1 hour ago' "+%Y-%m-%d_%H"`
        mv /var/log/mylogfile.log.1 /var/log/mylogfile-$timeext.log
    endscript
}

Así es como lo instalo (los enlaces simbólicos no funcionan para logrotate):

sudo cp /directpath/config/logrotate/myconfigname /etc/logrotate.d/myconfigname
sudo cp /etc/cron.daily/logrotate /etc/cron.hourly/logrotate
Baldr
fuente