Punto muerto cuando se programan simultáneamente muchos trabajos de chispa

17

Uso de spark 2.4.4 que se ejecuta en modo de clúster YARN con el programador FIFO de spark.

Estoy enviando múltiples operaciones de trama de datos de chispa (es decir, escribiendo datos en S3) usando un ejecutor de grupo de subprocesos con un número variable de subprocesos. Esto funciona bien si tengo ~ 10 hilos, pero si uso cientos de hilos, parece que hay un punto muerto, sin trabajos programados de acuerdo con la interfaz de usuario de Spark.

¿Qué factores controlan cuántos trabajos se pueden programar simultáneamente? ¿Recursos del controlador (por ejemplo, memoria / núcleos)? ¿Alguna otra configuración de chispa?

EDITAR:

Aquí hay una breve sinopsis de mi código

ExecutorService pool = Executors.newFixedThreadPool(nThreads);
ExecutorCompletionService<Void> ecs = new ExecutorCompletionService<>(pool);

Dataset<Row> aHugeDf = spark.read.json(hundredsOfPaths);

List<Future<Void>> futures = listOfSeveralHundredThings
  .stream()
  .map(aThing -> ecs.submit(() -> {
    df
      .filter(col("some_column").equalTo(aThing))
      .write()
      .format("org.apache.hudi")
      .options(writeOptions)
      .save(outputPathFor(aThing));
    return null;
  }))
  .collect(Collectors.toList());

IntStream.range(0, futures.size()).forEach(i -> ecs.poll(30, TimeUnit.MINUTES));
exec.shutdownNow();

En algún momento, a medida que nThreadsaumenta, la chispa ya no parece estar programando ningún trabajo como lo demuestra:

  • ecs.poll(...) tiempo de espera eventualmente
  • La pestaña de trabajos de Spark UI que no muestra trabajos activos
  • La pestaña Ejecutores de Spark UI que no muestra tareas activas para ningún ejecutor
  • La pestaña Spark UI SQL que muestra nThreadsconsultas en ejecución sin ID de trabajo en ejecución

Mi entorno de ejecución es

  • AWS EMR 5.28.1
  • Spark 2.4.4
  • Nodo maestro = m5.4xlarge
  • Nodos centrales = 3x rd5.24xlarge
  • spark.driver.cores=24
  • spark.driver.memory=32g
  • spark.executor.memory=21g
  • spark.scheduler.mode=FIFO
Scott
fuente
¿Hay una sección específica que discute esto? He leído esos documentos varias veces en los últimos días y no he encontrado la respuesta que estoy buscando.
Scott
2
¿Puede mostrar el código que utiliza para enviar trabajos de Spark a través del ejecutor de grupo de subprocesos? Parece que el punto muerto está sucediendo antes de que se envíe el trabajo de Spark.
Salim
1
¿Puedes publicar tu código? Proporcione detalles sobre su entorno: CPU, RAM; ¿también cómo estás creando los hilos: simultáneamente o en pequeños grupos de 10?
Saheed
Lo sentimos, ¿cómo quieres decir que los trabajos no están programados? No aparecen en la interfaz de usuario de Spark, o aparecen en la lista de trabajos, pero ¿las tareas no se ejecutan? De cualquier manera, si sospecha un punto muerto, ejecute jstack -lpara obtener un volcado de subprocesos con información de bloqueo.
Daniel Darabos

Respuestas:

0

Si es posible, escriba la salida de los trabajos en AWS Elastic MapReduce hdfs (para aprovechar los cambios de nombre casi instantáneos y un mejor archivo IO de los hdf locales) y agregue un paso dstcp para mover los archivos a S3, para ahorrarse todos los problemas de manejar el entrañas de una tienda de objetos tratando de ser un sistema de archivos. También escribir en archivos hdf locales le permitirá habilitar la especulación para controlar las tareas fuera de control sin caer en las trampas de bloqueo asociadas con DirectOutputCommiter.

Si debe usar S3 como el directorio de salida, asegúrese de que estén configuradas las siguientes configuraciones de Spark

spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2
spark.speculation false

Nota: DirectParquetOutputCommitter se elimina de Spark 2.0 debido a la posibilidad de pérdida de datos. Desafortunadamente, hasta que hayamos mejorado la consistencia de S3a, tenemos que trabajar con las soluciones. Las cosas están mejorando con Hadoop 2.8

Evite los nombres clave en orden lexicográfico. Uno podría usar prefijos aleatorios / aleatorios o invertir fecha y hora para moverse. El truco consiste en nombrar las claves jerárquicamente, colocando las cosas más comunes por las que filtra en el lado izquierdo de la clave. Y nunca tenga guiones bajos en los nombres de los depósitos debido a problemas de DNS.

Habilitar fs.s3a.fast.upload uploadpartes de un solo archivo en Amazon S3 en paralelo

Consulte estos artículos para obtener más detalles.

Configuración de spark.speculation en Spark 2.1.0 al escribir en s3

https://medium.com/@subhojit20_27731/apache-spark-and-amazon-s3-gotchas-and-best-practices-a767242f3d98

Devesh mehta
fuente
AWS tiene su propio committer docs.aws.amazon.com/emr/latest/ReleaseGuide/…
mazaneicha
0

En mi opinión, es probable que estés abordando este problema mal A menos que pueda garantizar que el número de tareas por trabajo sea muy bajo, es probable que no obtenga muchas mejoras de rendimiento al paralelizar cientos de trabajos a la vez. Su clúster solo puede admitir 300 tareas a la vez, suponiendo que esté utilizando el paralelismo predeterminado de 200, es decir, solo 1.5 trabajos. Sugeriría reescribir su código para limitar el máximo de consultas simultáneas en 10. Sospecho que tiene 300 consultas con una sola tarea de varios cientos en ejecución. La mayoría de los sistemas de procesamiento de datos OLTP tienen intencionalmente un nivel bastante bajo de consultas concurrentes en comparación con los sistemas RDS más tradicionales por este motivo.

además

  1. Apache Hudi tiene un paralelismo predeterminado de varios cientos de FYI.
  2. ¿Por qué no solo particionas según tu columna de filtro?
Andrew Long
fuente