Spark: ¿Por qué Python supera significativamente a Scala en mi caso de uso?

16

Para comparar el rendimiento de Spark al usar Python y Scala, creé el mismo trabajo en ambos idiomas y comparé el tiempo de ejecución. Esperaba que ambos trabajos tomaran aproximadamente la misma cantidad de tiempo, pero el trabajo de Python solo tomó 27min, mientras que el trabajo de Scala tomó 37min(¡casi un 40% más!). También implementé el mismo trabajo en Java y también me tomó 37minutes. ¿Cómo es posible que Python sea mucho más rápido?

Ejemplo mínimo verificable:

Trabajo de Python:

# Configuration
conf = pyspark.SparkConf()
conf.set("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")
conf.set("spark.executor.instances", "4")
conf.set("spark.executor.cores", "8")
sc = pyspark.SparkContext(conf=conf)

# 960 Files from a public dataset in 2 batches
input_files = "s3a://commoncrawl/crawl-data/CC-MAIN-2019-35/segments/1566027312025.20/warc/CC-MAIN-20190817203056-20190817225056-00[0-5]*"
input_files2 = "s3a://commoncrawl/crawl-data/CC-MAIN-2019-35/segments/1566027312128.3/warc/CC-MAIN-20190817102624-20190817124624-00[0-3]*"

# Count occurances of a certain string
logData = sc.textFile(input_files)
logData2 = sc.textFile(input_files2)
a = logData.filter(lambda value: value.startswith('WARC-Type: response')).count()
b = logData2.filter(lambda value: value.startswith('WARC-Type: response')).count()

print(a, b)

Trabajo Scala:

// Configuration
config.set("spark.executor.instances", "4")
config.set("spark.executor.cores", "8")
val sc = new SparkContext(config)
sc.setLogLevel("WARN")
sc.hadoopConfiguration.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")

// 960 Files from a public dataset in 2 batches 
val input_files = "s3a://commoncrawl/crawl-data/CC-MAIN-2019-35/segments/1566027312025.20/warc/CC-MAIN-20190817203056-20190817225056-00[0-5]*"
val input_files2 = "s3a://commoncrawl/crawl-data/CC-MAIN-2019-35/segments/1566027312128.3/warc/CC-MAIN-20190817102624-20190817124624-00[0-3]*"

// Count occurances of a certain string
val logData1 = sc.textFile(input_files)
val logData2 = sc.textFile(input_files2)
val num1 = logData1.filter(line => line.startsWith("WARC-Type: response")).count()
val num2 = logData2.filter(line => line.startsWith("WARC-Type: response")).count()

println(s"Lines with a: $num1, Lines with b: $num2")

Simplemente mirando el código, parecen ser idénticos. Miré a los DAG y no me dieron ninguna idea (o al menos me falta el conocimiento para llegar a una explicación basada en ellos).

Realmente agradecería cualquier puntero.

maestromusica
fuente
Los comentarios no son para discusión extendida; Esta conversación se ha movido al chat .
Samuel Liew
1
Hubiera comenzado el análisis, antes de preguntar nada, cronometrando los bloques y declaraciones correspondientes para ver si había un lugar en particular donde la versión de Python es más rápida. Entonces es posible que haya podido agudizar la pregunta a 'por qué esta declaración de Python es más rápida'.
Terry Jan Reedy

Respuestas:

11

Su suposición básica, que Scala o Java deberían ser más rápidos para esta tarea específica, es simplemente incorrecta. Puede verificarlo fácilmente con aplicaciones locales mínimas. Scala uno:

import scala.io.Source
import java.time.{Duration, Instant}

object App {
  def main(args: Array[String]) {
    val Array(filename, string) = args

    val start = Instant.now()

    Source
      .fromFile(filename)
      .getLines
      .filter(line => line.startsWith(string))
      .length

    val stop = Instant.now()
    val duration = Duration.between(start, stop).toMillis
    println(s"${start},${stop},${duration}")
  }
}

Python uno

import datetime
import sys

if __name__ == "__main__":
    _, filename, string = sys.argv
    start = datetime.datetime.now()
    with open(filename) as fr:
        # Not idiomatic or the most efficient but that's what
        # PySpark will use
        sum(1 for _ in filter(lambda line: line.startswith(string), fr))

    end = datetime.datetime.now()
    duration = round((end - start).total_seconds() * 1000)
    print(f"{start},{end},{duration}")

Resultados (300 repeticiones cada uno, Python 3.7.6, Scala 2.11.12), Posts.xmldesde el volcado de datos hermeneutics.stackexchange.com con una combinación de patrones coincidentes y no coincidentes:

diagramas de caja de durartion en millis para los programas anteriores

  • Python 273.50 (258.84, 288.16)
  • Scala 634.13 (533.81, 734.45)

Como puede ver, Python no solo es sistemáticamente más rápido, sino que también es más consistente (menor difusión).

El mensaje para llevar es: no creas FUD sin fundamento : los idiomas pueden ser más rápidos o más lentos en tareas específicas o con entornos específicos (por ejemplo, aquí Scala puede verse afectado por el inicio de JVM y / o GC y / o JIT), pero si reclamas como "XYZ es X4 más rápido" o "XYZ es lento en comparación con ZYX (..) Aproximadamente, 10 veces más lento" generalmente significa que alguien escribió un código realmente malo para probar cosas.

Editar :

Para abordar algunas inquietudes planteadas en los comentarios:

  • En el código OP, los datos se pasan principalmente en una dirección (JVM -> Python) y no se requiere una serialización real (esta ruta específica solo pasa la cadena de bytes tal como está y decodifica en UTF-8 en el otro lado). Eso es lo más barato posible cuando se trata de "serialización".
  • Lo que se devuelve es solo un número entero por partición, por lo que en esa dirección el impacto es insignificante.
  • La comunicación se realiza a través de sockets locales (toda la comunicación en el trabajador más allá de la conexión inicial y la autenticación se realiza utilizando el descriptor de archivo devuelto local_connect_and_auth, y no es más que un archivo asociado a un socket ). Una vez más, tan barato como se pone cuando se trata de comunicación entre procesos.
  • Teniendo en cuenta la diferencia en el rendimiento bruto que se muestra arriba (mucho más alto que lo que ve en su programa), hay un amplio margen para los gastos generales enumerados anteriormente.
  • Este caso es completamente diferente de los casos en los que se deben pasar objetos simples o complejos desde y hacia el intérprete de Python en una forma accesible para ambas partes como volcados compatibles con encurtidos (los ejemplos más notables incluyen UDF de estilo antiguo, algunas partes de viejos -style MLLib).

Edición 2 :

Como jasper-m estaba preocupado por el costo de inicio aquí, uno puede demostrar fácilmente que Python todavía tiene una ventaja significativa sobre Scala, incluso si el tamaño de entrada aumenta significativamente.

Aquí están los resultados para 2003360 líneas / 5.6G (la misma entrada, solo duplicada varias veces, 30 repeticiones), que excede todo lo que puede esperar en una sola tarea de Spark.

ingrese la descripción de la imagen aquí

  • Python 22809.57 (21466.26, 24152.87)
  • Scala 27315.28 (24367.24, 30263.31)

Tenga en cuenta los intervalos de confianza no superpuestos.

Edición 3 :

Para abordar otro comentario de Jasper-M:

La mayor parte de todo el procesamiento todavía ocurre dentro de una JVM en el caso de Spark.

Eso es simplemente incorrecto en este caso particular:

  • El trabajo en cuestión es el trabajo de mapa con reducción global única utilizando RDD de PySpark.
  • PySpark RDD (a diferencia de, digamos DataFrame) implementa gran cantidad de funcionalidades de forma nativa en Python, con entrada de excepción, salida y comunicación entre nodos.
  • Dado que es un trabajo de una sola etapa, y el resultado final es lo suficientemente pequeño como para ser ignorado, la responsabilidad principal de JVM (si se tratara de un punto crítico, esto se implementa principalmente en Java, no en Scala) es invocar el formato de entrada de Hadoop y enviar los datos a través del socket archivo a Python.
  • La parte de lectura es idéntica para JVM y Python API, por lo que puede considerarse una sobrecarga constante. Tampoco califica como la mayor parte del procesamiento , incluso para un trabajo tan simple como este.
usuario10938362
fuente
3
Excelente enfoque del problema. Gracias por compartir esto
Alexandros Biratsis
1
@egordoe Alexandros dijo "no se invoca UDF aquí", no que "Python no se invoca", eso hace toda la diferencia. La sobrecarga de serialización es importante cuando los datos se intercambian entre sistemas (es decir, cuando desea pasar datos a un UDF y viceversa).
user10938362
1
@egordoe Claramente confundes dos cosas: la sobrecarga de la serialización, que es un problema cuando los objetos no triviales se pasan de un lado a otro. Y sobrecarga de comunicación. Aquí hay poca o ninguna sobrecarga de serialización, porque simplemente pasa y decodifica las cadenas de bytes, y eso sucede principalmente en la dirección, ya que de regreso obtienes un entero por partición. La comunicación es motivo de cierta preocupación, pero el paso de datos a través de sockets locales es eficiente, ya que realmente se convierte en comunicación entre procesos. Si eso no está claro, recomiendo leer la fuente, no es difícil y será esclarecedor.
usuario10938362
1
Además, los métodos de serialización simplemente no son iguales. Como muestra el caso de Spark, los buenos métodos de serialización pueden reducir los costos al nivel en el que ya no es un problema (ver Pandas UDF con flecha) y cuando sucede, otros factores pueden dominar (ver, por ejemplo, comparaciones de rendimiento entre las funciones de la ventana Scala y sus equivalentes con Pandas UDF: Python gana por un margen mucho más alto allí que en esta pregunta).
user10938362
1
¿Y tu punto es @ Jasper-M? Las tareas individuales de Spark suelen ser lo suficientemente pequeñas como para tener una carga de trabajo comparable a esta. No me tome por el camino equivocado, pero si tiene algún contraejemplo real que invalide esta o toda la pregunta, publíquela. Ya he señalado que las acciones secundarias contribuyen en cierta medida a este valor, pero no dominan el costo. Todos somos ingenieros (de algún tipo) aquí, hablemos de números y códigos, no creencias, ¿de acuerdo?
user10938362
4

El trabajo de Scala lleva más tiempo porque tiene una configuración incorrecta y, por lo tanto, los trabajos de Python y Scala han recibido recursos desiguales.

Hay dos errores en el código:

val sc = new SparkContext(config) // LINE #1
sc.setLogLevel("WARN")
sc.hadoopConfiguration.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")
sc.hadoopConfiguration.set("spark.executor.instances", "4") // LINE #4
sc.hadoopConfiguration.set("spark.executor.cores", "8") // LINE #5
  1. LÍNEA 1. Una vez que se ha ejecutado la línea, la configuración de recursos del trabajo Spark ya está establecida y arreglada. A partir de este momento, no hay forma de ajustar nada. Ni el número de ejecutores ni el número de núcleos por ejecutor.
  2. LÍNEA 4-5. sc.hadoopConfigurationes un lugar incorrecto para configurar cualquier configuración de Spark. Debe establecerse en la configinstancia a la que pasa new SparkContext(config).

[AGREGADO] Teniendo en cuenta lo anterior, propondría cambiar el código del trabajo de Scala a

config.set("spark.executor.instances", "4")
config.set("spark.executor.cores", "8")
val sc = new SparkContext(config) // LINE #1
sc.setLogLevel("WARN")
sc.hadoopConfiguration.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")

y vuelva a probarlo de nuevo. Apuesto a que la versión de Scala será X veces más rápida ahora.

egordoe
fuente
¿Verifiqué que ambos trabajos ejecutan 32 tareas en paralelo, así que no creo que sea el culpable?
maestromusica
gracias por la edición, intentaremos probarlo ahora mismo
maestromusica
hola @maestromusica debe ser algo en la configuración de recursos porque, intrínsecamente, Python puede no superar a Scala en este caso de uso en particular. Otra razón puede ser algunos factores aleatorios no correlacionados, es decir, la carga del clúster en el momento particular y similar. Por cierto, ¿qué modo usas? independiente, local, hilo?
egordoe
Sí, he verificado que esta respuesta es incorrecta. El tiempo de ejecución es el mismo. También imprimí la configuración en ambos casos y es idéntica.
maestromusica
1
Creo que tienes razón. Hice esta pregunta para investigar todas las demás posibilidades, como un error en el código o tal vez que no entendí algo. Gracias por tu contribución.
maestromusica