Difundir stdin a procesos paralelos

13

Tengo una tarea que procesa una lista de archivos en stdin. El tiempo de inicio del programa es considerable, y la cantidad de tiempo que toma cada archivo varía ampliamente. Quiero generar una cantidad sustancial de estos procesos, luego enviar el trabajo a los que no estén ocupados. Hay varias herramientas de línea de comandos diferentes que casi hacen lo que quiero, lo reduje a dos opciones casi funcionales:

find . -type f | split -n r/24 -u --filter="myjob"
find . -type f | parallel --pipe -u -l 1 myjob

El problema es que splithace un round robin puro, por lo que uno de los procesos se queda atrás y queda atrás, retrasando la finalización de toda la operación; mientras que paralleldesea generar un proceso por N líneas o bytes de entrada y termino gastando demasiado tiempo en la sobrecarga de inicio.

¿Hay algo como esto que reutilizará los procesos y las líneas de alimentación a los procesos que hayan desbloqueado stdins?

BCoates
fuente
¿De dónde es ese splitcomando? El nombre entra en conflicto con la utilidad de procesamiento de texto estándar .
Gilles 'SO- deja de ser malvado'
@Gilles, es el GNU: "split (GNU coreutils) 8.13" . Usarlo como una alternativa extraña a xargs probablemente no sea el uso previsto, pero es lo más parecido a lo que quiero haber encontrado.
BCoates
2
He estado pensando en eso, y un problema fundamental es saber que una instancia de myjobestá lista para recibir más información. No hay forma de saber que un programa está listo para procesar más entradas, todo lo que puede saber es que algún búfer en algún lugar (un búfer de tubería, un búfer estándar) está listo para recibir más entrada. ¿Puede organizar que su programa envíe algún tipo de solicitud (por ejemplo, mostrar un mensaje) cuando esté listo?
Gilles 'SO- deja de ser malvado'
Suponiendo que el programa no esté usando el almacenamiento intermedio en stdin, un sistema de archivos FUSE que reacciona a las readllamadas funcionaría. Ese es un esfuerzo de programación bastante grande.
Gilles 'SO- deja de ser malvado'
¿Por qué estás usando -l 1en los parallelargumentos? IIRC, que le dice a paralelo que procese una línea de entrada por trabajo (es decir, un nombre de archivo por bifurcación de myjob, por lo que hay mucha sobrecarga de inicio).
cas

Respuestas:

1

Eso no parece posible en un caso tan general. Implica que tiene un búfer para cada proceso y puede ver los búferes desde el exterior para decidir dónde colocar la siguiente entrada (programación) ... Por supuesto, puede escribir algo (o usar un sistema por lotes como slurm)

Pero dependiendo de cuál sea el proceso, es posible que pueda preprocesar la entrada. Por ejemplo, si desea descargar archivos, actualizar entradas de una base de datos o similar, pero el 50% de ellas terminarán siendo omitidas (y por lo tanto tiene una gran diferencia de procesamiento dependiendo de la entrada), simplemente configure un preprocesador que verifica qué entradas van a tomar mucho tiempo (existe un archivo, se cambiaron los datos, etc.), por lo que se garantiza que todo lo que provenga del otro lado llevará una cantidad de tiempo bastante igual. Incluso si la heurística no es perfecta, puede terminar con una mejora considerable. Puede volcar los demás en un archivo y procesarlos de la misma manera.

Pero eso depende de su caso de uso.

estani
fuente
1

No, no hay una solución genérica. Su despachador necesita saber cuándo cada programa está listo para leer otra línea, y no hay un estándar que yo sepa que lo permita. Todo lo que puede hacer es poner una línea en STDOUT y esperar a que algo lo consuma; No hay realmente una buena manera para que el productor en una tubería sepa si el próximo consumidor está listo o no.

dannysauer
fuente
0

No lo creo. En mi revista favorita había un artículo sobre programación de bash que hacía lo que querías. Estoy dispuesto a creer que si hubiera herramientas para hacerlo, las habrían mencionado. Entonces quieres algo en la línea de:

set -m # enable job control
max_processes=8
concurrent_processes=0

child_has_ended() { concurrent_processes=$((concurrent_processes - 1)) }

trap child_has_ended SIGCHLD # that's magic calling our bash function when a child processes ends

for i in $(find . -type f)
do
  # don't do anything while there are max_processes running
  while [ ${concurrent_processes} -ge ${max_processes}]; do sleep 0.5; done 
  # increase the counter
  concurrent_processes=$((concurrent_processes + 1))
  # start a child process to actually deal with one file
  /path/to/script/to/handle/one/file $i &
done

Obviamente, puede cambiar la invocación al script de trabajo real a su gusto. La revista que mencioné inicialmente hace cosas como instalar tuberías y, en realidad, iniciar hilos de trabajo. Esté atento mkfifoa eso, pero esa ruta es mucho más complicada ya que los procesos de los trabajadores necesitan indicarle al proceso maestro que están listos para recibir más datos. Por lo tanto, necesita un quince para cada proceso de trabajo para enviar datos y un quince para que el proceso maestro reciba cosas de los trabajadores.

DESCARGO DE RESPONSABILIDAD Escribí ese guión desde la parte superior de mi cabeza. Puede tener algunos problemas de sintaxis.

Bananguin
fuente
1
Esto no parece cumplir con los requisitos: está comenzando una instancia diferente del programa para cada elemento.
Gilles 'SO- deja de ser malvado'
Por lo general, es preferible usar en find . -type f | while read ilugar de for i in $(find . -type f).
0

Para GNU Parallel puede establecer el tamaño del bloque usando --block. Sin embargo, requiere que tenga suficiente memoria para mantener 1 bloque en la memoria para cada uno de los procesos en ejecución.

Entiendo que esto no es precisamente lo que está buscando, pero puede ser una solución aceptable por ahora.

Si sus tareas en promedio toman el mismo tiempo, entonces podría usar mbuffer:

find . -type f | split -n r/24 -u --filter="mbuffer -m 2G | myjob"
Ole Tange
fuente
0

Prueba esto:

mkfifo para cada proceso

Luego tail -f | myjobespera cada quince.

Por ejemplo, la configuración de los trabajadores (procesos myjob)

mkdir /tmp/jobs
for X in 1 2 3 4
do
   mkfifo pipe$X
   tail -f pipe$X | myjob &
   jobs -l| awk '/pipe'$X'/ {print $2, "'pipe$X'"}' >> pipe-job-mapping
done

Dependiendo de su aplicación (myjob), es posible que pueda usar jobs -s para encontrar trabajos detenidos. De lo contrario, enumere los procesos ordenados por CPU y seleccione el que consuma la menor cantidad de recursos. De tener el informe de trabajo en sí mismo, por ejemplo, estableciendo una bandera en el sistema de archivos cuando quiere más trabajo.

Suponiendo que el trabajo se detiene al esperar la entrada, use

jobs -sl para averiguar el pedido de un trabajo detenido y asignarle trabajo, por ejemplo

grep "^$STOPPED_PID" pipe-to-job-mapping | while read PID PIPE
do
   cat workset > $PIPE
done

Probé esto con

garfield:~$ cd /tmp
garfield:/tmp$ mkfifo f1
garfield:/tmp$ mkfifo f2
garfield:/tmp$ tail -f f1 | sed 's/^/1 /' &
[1] 21056
garfield:/tmp$ tail -f f2 | sed 's/^/2 /' &
[2] 21058
garfield:/tmp$ echo hello > f1
1 hello
garfield:/tmp$ echo what > f2
2 what
garfield:/tmp$ echo yes > f1
1 yes

Debo admitir que esto fue inventado, así que mmm.

Johan
fuente
0

Lo que realmente se necesita para resolver esto es un mecanismo de cola de algún tipo.

¿Es posible que los trabajos lean su entrada de una Cola, como una cola de mensajes SYSV, y luego que los programas se ejecuten en paralelo simplemente empujando los valores a la cola?

Otra posibilidad es usar un directorio para la cola, así:

  1. la salida de búsqueda crea un enlace simbólico a cada archivo para procesar en un directorio, pending
  2. cada proceso de trabajo realiza uno mvdel primer archivo que ve en el directorio a un directorio hermano de pending, nombrado inprogress.
  3. si el trabajo mueve con éxito el archivo, realiza el procesamiento; de lo contrario, vuelve a buscar y mover otro nombre de archivo depending
ceniza
fuente
0

exponiendo la respuesta de @ ash, puede usar una cola de mensajes SYSV para distribuir el trabajo. Si no desea escribir su propio programa en C, hay una utilidad llamada ipcmdque puede ayudar. Esto es lo que he puesto juntos para aprobar la salida find $DIRECTORY -type fde $PARALLELvarios procesos:

set -o errexit
set -o nounset

export IPCMD_MSQID=$(ipcmd msgget)

DIRECTORY=$1
PARALLEL=$2

# clean up message queue on exit
trap 'ipcrm -q $IPCMD_MSQID' EXIT

for i in $(seq $PARALLEL); do
   {
      while true
      do
          message=$(ipcmd msgrcv) || exit
          [ -f $message ] || break
          sleep $((RANDOM/3000))
      done
   } &
done

find "$DIRECTORY" -type f | xargs ipcmd msgsnd

for i in $(seq $PARALLEL); do
   ipcmd msgsnd "/dev/null/bar"
done
wait

Aquí hay una prueba de funcionamiento:

$ for i in $(seq 20 10 100) ; do time parallel.sh /usr/lib/ $i ; done
parallel.sh /usr/lib/ $i  0.30s user 0.67s system 0% cpu 1:57.23 total
parallel.sh /usr/lib/ $i  0.28s user 0.69s system 1% cpu 1:09.58 total
parallel.sh /usr/lib/ $i  0.19s user 0.80s system 1% cpu 1:05.29 total
parallel.sh /usr/lib/ $i  0.29s user 0.73s system 2% cpu 44.417 total
parallel.sh /usr/lib/ $i  0.25s user 0.80s system 2% cpu 37.353 total
parallel.sh /usr/lib/ $i  0.21s user 0.85s system 3% cpu 32.354 total
parallel.sh /usr/lib/ $i  0.30s user 0.82s system 3% cpu 28.542 total
parallel.sh /usr/lib/ $i  0.27s user 0.88s system 3% cpu 30.219 total
parallel.sh /usr/lib/ $i  0.34s user 0.84s system 4% cpu 26.535 total
kouk
fuente
0

A menos que pueda estimar cuánto tiempo se procesará un archivo de entrada particular y los procesos de trabajo no tienen una forma de informar al planificador (como lo hacen en escenarios de computación paralela normales, a menudo a través de MPI ), generalmente no tiene suerte - pagar la penalidad de algunos trabajadores que procesan información por más tiempo que otros (debido a la desigualdad de información), o pagar la penalidad de generar un único proceso nuevo para cada archivo de entrada.

Peterph
fuente
0

GNU Parallel ha cambiado en los últimos 7 años. Entonces hoy puede hacerlo:

Este ejemplo muestra que se dan más bloques al proceso 11 y 10 que al proceso 4 y 5 porque 4 y 5 leen más lentamente:

seq 1000000 |
  parallel -j8 --tag --roundrobin --pipe --block 1k 'pv -qL {}0000 | wc' ::: 11 4 5 6 9 8 7 10
Ole Tange
fuente