Procesando un solo archivo de múltiples procesos

82

Tengo un solo archivo de texto grande en el que quiero procesar cada línea (hacer algunas operaciones) y almacenarlas en una base de datos. Dado que un solo programa simple está tardando demasiado, quiero que se realice a través de múltiples procesos o subprocesos. Cada hilo / proceso debe leer los datos DIFERENTES (líneas diferentes) de ese archivo único y hacer algunas operaciones en su pieza de datos (líneas) y ponerlas en la base de datos para que al final, tenga todos los datos procesados ​​y mi La base de datos se descarga con los datos que necesito.

Pero no soy capaz de entender cómo abordar esto.

pranavk
fuente
3
Buena pregunta. También tuve esta duda. Aunque elegí la opción de dividir el archivo en archivos más pequeños :)
Sushant Gupta

Respuestas:

109

Lo que busca es un patrón de productor / consumidor

Ejemplo de subprocesamiento básico

Aquí hay un ejemplo básico usando el módulo de subprocesamiento (en lugar de multiprocesamiento)

import threading
import Queue
import sys

def do_work(in_queue, out_queue):
    while True:
        item = in_queue.get()
        # process
        result = item
        out_queue.put(result)
        in_queue.task_done()

if __name__ == "__main__":
    work = Queue.Queue()
    results = Queue.Queue()
    total = 20

    # start for workers
    for i in xrange(4):
        t = threading.Thread(target=do_work, args=(work, results))
        t.daemon = True
        t.start()

    # produce data
    for i in xrange(total):
        work.put(i)

    work.join()

    # get the results
    for i in xrange(total):
        print results.get()

    sys.exit()

No compartirías el objeto de archivo con los hilos. Produciría trabajo para ellos proporcionando a la cola líneas de datos. Luego, cada hilo tomaría una línea, la procesaría y luego la devolvería a la cola.

Hay algunas funciones más avanzadas integradas en el módulo de multiprocesamiento para compartir datos, como listas y tipos especiales de cola . Existen ventajas y desventajas al utilizar multiprocesamiento frente a subprocesos y depende de si su trabajo está vinculado a la CPU o IO.

Multiprocesamiento básico Ejemplo de agrupación

Aquí hay un ejemplo realmente básico de un grupo de multiprocesamiento

from multiprocessing import Pool

def process_line(line):
    return "FOO: %s" % line

if __name__ == "__main__":
    pool = Pool(4)
    with open('file.txt') as source_file:
        # chunk the work into batches of 4 lines at a time
        results = pool.map(process_line, source_file, 4)

    print results

Un Pool es un objeto de conveniencia que gestiona sus propios procesos. Dado que un archivo abierto puede iterar sobre sus líneas, puede pasarlo a pool.map(), que lo recorrerá y entregará líneas a la función de trabajo. Map bloquea y devuelve el resultado completo cuando está hecho. Tenga en cuenta que este es un ejemplo demasiado simplificado y que pool.map()va a leer todo el archivo en la memoria de una vez antes de realizar el trabajo. Si espera tener archivos grandes, tenga esto en cuenta. Hay formas más avanzadas de diseñar una configuración de productor / consumidor.

"Pool" manual con límite y reordenación de líneas

Este es un ejemplo manual de Pool.map , pero en lugar de consumir un iterable completo de una sola vez, puede establecer un tamaño de cola para que solo lo alimente pieza por pieza tan rápido como pueda procesar. También agregué los números de línea para que pueda rastrearlos y consultarlos si lo desea, más adelante.

from multiprocessing import Process, Manager
import time
import itertools 

def do_work(in_queue, out_list):
    while True:
        item = in_queue.get()
        line_no, line = item

        # exit signal 
        if line == None:
            return

        # fake work
        time.sleep(.5)
        result = (line_no, line)

        out_list.append(result)


if __name__ == "__main__":
    num_workers = 4

    manager = Manager()
    results = manager.list()
    work = manager.Queue(num_workers)

    # start for workers    
    pool = []
    for i in xrange(num_workers):
        p = Process(target=do_work, args=(work, results))
        p.start()
        pool.append(p)

    # produce data
    with open("source.txt") as f:
        iters = itertools.chain(f, (None,)*num_workers)
        for num_and_line in enumerate(iters):
            work.put(num_and_line)

    for p in pool:
        p.join()

    # get the results
    # example:  [(1, "foo"), (10, "bar"), (0, "start")]
    print sorted(results)
jdi
fuente
1
Esto es bueno, pero ¿y si el procesamiento está vinculado a la E / S? En ese caso, el paralelismo puede ralentizar las cosas en lugar de acelerarlas. Las búsquedas dentro de una sola pista de disco son mucho más rápidas que las búsquedas entre pistas, y hacer E / S en paralelo tiende a introducir búsquedas entre pistas en lo que de otro modo sería una carga de E / S secuencial. Para obtener algún beneficio de las E / S paralelas, a veces resulta bastante útil utilizar un espejo RAID.
user1277476
2
@ jwillis0720 - Claro. (None,) * num_workerscrea una tupla de valores Ninguno igual al tamaño del número de trabajadores. Estos serán los valores centinela que le dicen a cada hilo que salga porque no hay más trabajo. La itertools.chainfunción le permite juntar varias secuencias en una secuencia virtual sin tener que copiar nada. Entonces, lo que obtenemos es que primero recorre las líneas del archivo y luego los valores de Ninguno.
jdi
2
Eso está mejor explicado que mi profesor, muy bonito +1.
licuado
1
@ ℕʘʘḆḽḘ, he editado un poco mi texto para que sea más claro. Ahora explica que el ejemplo del medio va a absorber todos los datos de su archivo en la memoria a la vez, lo que podría ser un problema si su archivo es más grande que la cantidad de RAM que tiene actualmente disponible. Luego muestro en el tercer ejemplo cómo ir línea por línea, para no consumir todo el archivo a la vez.
jdi
1
@ ℕʘʘḆḽḘ lea los documentos de pool.Map (). Dice que dividirá el iterable en trozos y los enviará a los trabajadores. Entonces terminará consumiendo todas las líneas en la memoria. Sí, iterar una línea a la vez es eficiente en la memoria, pero si termina manteniendo todas esas líneas en la memoria, entonces vuelve a leer todo el archivo.
jdi
9

Aquí hay un ejemplo realmente estúpido que preparé:

import os.path
import multiprocessing

def newlinebefore(f,n):
    f.seek(n)
    c=f.read(1)
    while c!='\n' and n > 0:
        n-=1
        f.seek(n)
        c=f.read(1)

    f.seek(n)
    return n

filename='gpdata.dat'  #your filename goes here.
fsize=os.path.getsize(filename) #size of file (in bytes)

#break the file into 20 chunks for processing.
nchunks=20
initial_chunks=range(1,fsize,fsize/nchunks)

#You could also do something like:
#initial_chunks=range(1,fsize,max_chunk_size_in_bytes) #this should work too.


with open(filename,'r') as f:
    start_byte=sorted(set([newlinebefore(f,i) for i in initial_chunks]))

end_byte=[i-1 for i in start_byte] [1:] + [None]

def process_piece(filename,start,end):
    with open(filename,'r') as f:
        f.seek(start+1)
        if(end is None):
            text=f.read()
        else: 
            nbytes=end-start+1
            text=f.read(nbytes)

    # process text here. createing some object to be returned
    # You could wrap text into a StringIO object if you want to be able to
    # read from it the way you would a file.

    returnobj=text
    return returnobj

def wrapper(args):
    return process_piece(*args)

filename_repeated=[filename]*len(start_byte)
args=zip(filename_repeated,start_byte,end_byte)

pool=multiprocessing.Pool(4)
result=pool.map(wrapper,args)

#Now take your results and write them to the database.
print "".join(result)  #I just print it to make sure I get my file back ...

La parte complicada aquí es asegurarse de dividir el archivo en caracteres de nueva línea para que no se pierda ninguna línea (o solo lea líneas parciales). Luego, cada proceso lee que es parte del archivo y devuelve un objeto que el hilo principal puede poner en la base de datos. Por supuesto, es posible que incluso deba hacer esta parte en partes para no tener que guardar toda la información en la memoria a la vez. (esto se logra con bastante facilidad, simplemente divida la lista de "argumentos" en X fragmentos y llame pool.map(wrapper,chunk) , consulte aquí )

mgilson
fuente
-3

dividiremos el archivo grande en varios archivos más pequeños y procesamos cada uno de ellos en subprocesos separados.

Tanu
fuente
esto no es lo que OP quiere !! pero solo por una idea ... no está mal.
DRPK