Rendimiento de chispa para Scala vs Python

178

Prefiero Python sobre Scala. Pero, como Spark está escrito de forma nativa en Scala, esperaba que mi código se ejecute más rápido en Scala que en la versión de Python por razones obvias.

Con esa suposición, pensé en aprender y escribir la versión Scala de un código de preprocesamiento muy común para aproximadamente 1 GB de datos. Los datos se obtienen de la competencia SpringLeaf en Kaggle . Solo para dar una visión general de los datos (contiene 1936 dimensiones y 145232 filas). Los datos se componen de varios tipos, por ejemplo, int, float, string, boolean. Estoy usando 6 núcleos de 8 para el procesamiento de Spark; Es por eso que utilicé minPartitions=6para que cada núcleo tenga algo que procesar.

Código Scala

val input = sc.textFile("train.csv", minPartitions=6)

val input2 = input.mapPartitionsWithIndex { (idx, iter) => 
  if (idx == 0) iter.drop(1) else iter }
val delim1 = "\001"

def separateCols(line: String): Array[String] = {
  val line2 = line.replaceAll("true", "1")
  val line3 = line2.replaceAll("false", "0")
  val vals: Array[String] = line3.split(",")

  for((x,i) <- vals.view.zipWithIndex) {
    vals(i) = "VAR_%04d".format(i) + delim1 + x
  }
  vals
}

val input3 = input2.flatMap(separateCols)

def toKeyVal(line: String): (String, String) = {
  val vals = line.split(delim1)
  (vals(0), vals(1))
}

val input4 = input3.map(toKeyVal)

def valsConcat(val1: String, val2: String): String = {
  val1 + "," + val2
}

val input5 = input4.reduceByKey(valsConcat)

input5.saveAsTextFile("output")

Código de Python

input = sc.textFile('train.csv', minPartitions=6)
DELIM_1 = '\001'


def drop_first_line(index, itr):
  if index == 0:
    return iter(list(itr)[1:])
  else:
    return itr

input2 = input.mapPartitionsWithIndex(drop_first_line)

def separate_cols(line):
  line = line.replace('true', '1').replace('false', '0')
  vals = line.split(',')
  vals2 = ['VAR_%04d%s%s' %(e, DELIM_1, val.strip('\"'))
           for e, val in enumerate(vals)]
  return vals2


input3 = input2.flatMap(separate_cols)

def to_key_val(kv):
  key, val = kv.split(DELIM_1)
  return (key, val)
input4 = input3.map(to_key_val)

def vals_concat(v1, v2):
  return v1 + ',' + v2

input5 = input4.reduceByKey(vals_concat)
input5.saveAsTextFile('output')

Scala Performance Etapa 0 (38 minutos), Etapa 1 (18 segundos) ingrese la descripción de la imagen aquí

Python Performance Etapa 0 (11 minutos), Etapa 1 (7 segundos) ingrese la descripción de la imagen aquí

Ambos producen diferentes gráficos de visualización DAG (debido a que ambas imágenes muestran diferentes funciones de etapa 0 para Scala ( map) y Python ( reduceByKey))

Pero, esencialmente, ambos códigos intentan transformar los datos en (dimension_id, cadena de la lista de valores) RDD y guardarlos en el disco. La salida se usará para calcular varias estadísticas para cada dimensión.

En cuanto al rendimiento, el código Scala para estos datos reales como este parece ejecutarse 4 veces más lento que la versión de Python. La buena noticia para mí es que me dio una buena motivación para seguir con Python. La mala noticia es que no entendí bien por qué.

Mrityunjay
fuente
8
Quizás esto depende del código y de la aplicación, ya que obtengo el otro resultado, que apache spark python es más lento que scala, al sumar mil millones de términos de la fórmula de Leibniz para π
Paul
3
¡Interesante pregunta! Por cierto, eche un vistazo aquí: emptypipes.org/2015/01/17/python-vs-scala-vs-spark Mientras más núcleos tenga, menos podrá ver las diferencias entre los idiomas.
Markon
¿Has considerado aceptar la respuesta existente?
10465355 dice Reinstate Monica

Respuestas:

357

La respuesta original sobre el código se puede encontrar a continuación.


En primer lugar, debe distinguir entre diferentes tipos de API, cada uno con sus propias consideraciones de rendimiento.

API RDD

(estructuras puras de Python con orquestación basada en JVM)

Este es el componente que se verá más afectado por el rendimiento del código Python y los detalles de la implementación de PySpark. Si bien es poco probable que el rendimiento de Python sea un problema, hay al menos algunos factores que debe tener en cuenta:

  • Sobrecarga de la comunicación JVM. Prácticamente todos los datos que vienen hacia y desde el ejecutor de Python tienen que pasar a través de un socket y un trabajador JVM. Si bien esta es una comunicación local relativamente eficiente, todavía no es gratuita.
  • Ejecutores basados ​​en procesos (Python) versus ejecutores basados ​​en subprocesos (JVM múltiples subprocesos múltiples) (Scala). Cada ejecutor de Python se ejecuta en su propio proceso. Como efecto secundario, proporciona un aislamiento más fuerte que su contraparte JVM y un cierto control sobre el ciclo de vida del ejecutor, pero un uso de memoria potencialmente significativamente mayor:

    • huella de memoria del intérprete
    • huella de las bibliotecas cargadas
    • Difusión menos eficiente (cada proceso requiere su propia copia de una emisión)
  • Rendimiento del código Python en sí. En general, Scala es más rápido que Python, pero variará de una tarea a otra. Además tiene varias opciones, incluyendo los ECI como Numba , extensiones C ( Cython ) o bibliotecas especializadas como Teano . Finalmente, si no usa ML / MLlib (o simplemente la pila NumPy) , considere usar PyPy como un intérprete alternativo. Ver SPARK-3094 .

  • La configuración de PySpark proporciona la spark.python.worker.reuseopción que puede usarse para elegir entre bifurcar el proceso de Python para cada tarea y reutilizar el proceso existente. La última opción parece ser útil para evitar la recolección de basura costosa (es más una impresión que un resultado de pruebas sistemáticas), mientras que la primera (predeterminada) es óptima en caso de emisiones e importaciones costosas.
  • El recuento de referencias, utilizado como el método de recolección de basura de primera línea en CPython, funciona bastante bien con las cargas de trabajo típicas de Spark (procesamiento similar a la secuencia, sin ciclos de referencia) y reduce el riesgo de pausas prolongadas de GC.

MLlib

(ejecución mixta de Python y JVM)

Las consideraciones básicas son más o menos las mismas que antes con algunos problemas adicionales. Si bien las estructuras básicas utilizadas con MLlib son objetos RDD Python simples, todos los algoritmos se ejecutan directamente usando Scala.

Significa un costo adicional de convertir objetos Python en objetos Scala y viceversa, un mayor uso de memoria y algunas limitaciones adicionales que cubriremos más adelante.

A partir de ahora (Spark 2.x), la API basada en RDD está en modo de mantenimiento y está programada para eliminarse en Spark 3.0 .

DataFrame API y Spark ML

(Ejecución JVM con código Python limitado al controlador)

Estas son probablemente la mejor opción para las tareas estándar de procesamiento de datos. Dado que el código de Python se limita principalmente a operaciones lógicas de alto nivel en el controlador, no debería haber diferencia de rendimiento entre Python y Scala.

Una única excepción es el uso de UDF de Python en fila que son significativamente menos eficientes que sus equivalentes Scala. Si bien hay alguna posibilidad de mejoras (ha habido un desarrollo sustancial en Spark 2.0.0), la mayor limitación es el viaje de ida y vuelta completo entre la representación interna (JVM) y el intérprete de Python. Si es posible, debe favorecer una composición de expresiones incorporadas (por ejemplo, el comportamiento UDF de Python se ha mejorado en Spark 2.0.0, pero aún es subóptimo en comparación con la ejecución nativa.

Esto puede mejorar en el futuro ha mejorado significativamente con la introducción de las UDF vectorizadas (SPARK-21190 y otras extensiones) , que utiliza Arrow Streaming para el intercambio de datos eficiente con deserialización de copia cero. Para la mayoría de las aplicaciones, sus gastos generales secundarios pueden ignorarse.

También asegúrese de evitar pasar datos innecesarios entre DataFramesy RDDs. Esto requiere serialización y deserialización costosas, sin mencionar la transferencia de datos hacia y desde el intérprete de Python.

Vale la pena señalar que las llamadas Py4J tienen una latencia bastante alta. Esto incluye llamadas simples como:

from pyspark.sql.functions import col

col("foo")

Por lo general, no debería importar (la sobrecarga es constante y no depende de la cantidad de datos), pero en el caso de aplicaciones de software en tiempo real, puede considerar el almacenamiento en caché / reutilización de contenedores Java.

GraphX ​​y Spark DataSets

Por ahora (Spark 1.6 2.1) ninguno proporciona API de PySpark, por lo que puede decir que PySpark es infinitamente peor que Scala.

GraphX

En la práctica, el desarrollo de GraphX ​​se detuvo casi por completo y el proyecto se encuentra actualmente en el modo de mantenimiento con los tickets JIRA relacionados cerrados, ya que no se solucionarán . La biblioteca GraphFrames proporciona una biblioteca alternativa de procesamiento de gráficos con enlaces de Python.

Conjunto de datos

Subjetivamente hablando, no hay mucho lugar para escribir estáticamente Datasetsen Python e incluso si existiera la implementación actual de Scala es demasiado simplista y no proporciona los mismos beneficios de rendimiento que DataFrame.

Transmisión

Por lo que he visto hasta ahora, recomendaría usar Scala sobre Python. Puede cambiar en el futuro si PySpark obtiene soporte para flujos estructurados, pero en este momento Scala API parece ser mucho más robusto, completo y eficiente. Mi experiencia es bastante limitada.

La transmisión estructurada en Spark 2.x parece reducir la brecha entre los idiomas, pero por ahora todavía está en sus primeros días. Sin embargo, la API basada en RDD ya se menciona como "transmisión heredada" en la documentación de Databricks (fecha de acceso 2017-03-03)) por lo que es razonable esperar más esfuerzos de unificación.

Consideraciones de no rendimiento

Paridad de características

No todas las características de Spark están expuestas a través de la API PySpark. Asegúrese de verificar si las piezas que necesita ya están implementadas e intente comprender las posibles limitaciones.

Es particularmente importante cuando utiliza MLlib y contextos mixtos similares (consulte Llamar a la función Java / Scala desde una tarea ). Para ser justos, algunas partes de la API PySpark, como mllib.linalg, proporcionan un conjunto de métodos más completo que Scala.

Diseño API

La API PySpark refleja de cerca su contraparte Scala y, como tal, no es exactamente Pythonic. Significa que es bastante fácil de mapear entre idiomas, pero al mismo tiempo, el código de Python puede ser significativamente más difícil de entender.

Arquitectura compleja

El flujo de datos de PySpark es relativamente complejo en comparación con la ejecución pura de JVM. Es mucho más difícil razonar sobre programas PySpark o depuración. Además, al menos una comprensión básica de Scala y JVM en general es prácticamente imprescindible.

Spark 2.xy más allá

El cambio continuo hacia la DatasetAPI, con la API RDD congelada, brinda oportunidades y desafíos para los usuarios de Python. Si bien las partes de alto nivel de la API son mucho más fáciles de exponer en Python, las características más avanzadas son prácticamente imposibles de usar directamente .

Además, las funciones nativas de Python continúan siendo ciudadanos de segunda clase en el mundo SQL. Con suerte, esto mejorará en el futuro con la serialización de Apache Arrow ( los esfuerzos actuales apuntan a los datos,collection pero serde UDF es un objetivo a largo plazo ).

Para proyectos que dependen en gran medida de la base de código de Python, las alternativas puras de Python (como Dask o Ray ) podrían ser una alternativa interesante.

No tiene que ser uno contra el otro

La API Spark DataFrame (SQL, Dataset) proporciona una forma elegante de integrar el código Scala / Java en la aplicación PySpark. Puede usar DataFramespara exponer datos a un código JVM nativo y volver a leer los resultados. He explicado algunas opciones en otro lugar y puedes encontrar un ejemplo de trabajo de ida y vuelta en Python-Scala en Cómo usar una clase Scala dentro de Pyspark .

Se puede aumentar aún más mediante la introducción de tipos definidos por el usuario (consulte ¿Cómo definir el esquema para el tipo personalizado en Spark SQL? ).


¿Qué tiene de malo el código provisto en la pregunta?

(Descargo de responsabilidad: punto de vista de Pythonista. Lo más probable es que me haya perdido algunos trucos de Scala)

En primer lugar, hay una parte en su código que no tiene ningún sentido. Si ya tiene (key, value)pares creados usando zipWithIndexo enumerate¿cuál es el punto de crear una cadena solo para dividirla inmediatamente después? flatMapno funciona de forma recursiva, por lo que simplemente puede producir tuplas y omitir el seguimiento en mapabsoluto.

Otra parte que encuentro problemática es reduceByKey. En términos generales, reduceByKeyes útil si la aplicación de la función de agregado puede reducir la cantidad de datos que se deben barajar. Dado que simplemente concatena cadenas, no hay nada que ganar aquí. Ignorando cosas de bajo nivel, como el número de referencias, la cantidad de datos que tiene que transferir es exactamente la misma que para groupByKey.

Normalmente no me detendría en eso, pero por lo que puedo decir es un cuello de botella en su código Scala. Unir cadenas en JVM es una operación bastante costosa (ver, por ejemplo: ¿La concatenación de cadenas en scala es tan costosa como en Java? ). Significa que algo como esto, _.reduceByKey((v1: String, v2: String) => v1 + ',' + v2) que es equivalente a input4.reduceByKey(valsConcat)en su código, no es una buena idea.

Si se quiere evitar groupByKeyque usted puede tratar de utilizar aggregateByKeycon StringBuilder. Algo similar a esto debería hacer el truco:

rdd.aggregateByKey(new StringBuilder)(
  (acc, e) => {
    if(!acc.isEmpty) acc.append(",").append(e)
    else acc.append(e)
  },
  (acc1, acc2) => {
    if(acc1.isEmpty | acc2.isEmpty)  acc1.addString(acc2)
    else acc1.append(",").addString(acc2)
  }
)

Pero dudo que valga la pena.

Teniendo en cuenta lo anterior, he reescrito su código de la siguiente manera:

Scala :

val input = sc.textFile("train.csv", 6).mapPartitionsWithIndex{
  (idx, iter) => if (idx == 0) iter.drop(1) else iter
}

val pairs = input.flatMap(line => line.split(",").zipWithIndex.map{
  case ("true", i) => (i, "1")
  case ("false", i) => (i, "0")
  case p => p.swap
})

val result = pairs.groupByKey.map{
  case (k, vals) =>  {
    val valsString = vals.mkString(",")
    s"$k,$valsString"
  }
}

result.saveAsTextFile("scalaout")

Python :

def drop_first_line(index, itr):
    if index == 0:
        return iter(list(itr)[1:])
    else:
        return itr

def separate_cols(line):
    line = line.replace('true', '1').replace('false', '0')
    vals = line.split(',')
    for (i, x) in enumerate(vals):
        yield (i, x)

input = (sc
    .textFile('train.csv', minPartitions=6)
    .mapPartitionsWithIndex(drop_first_line))

pairs = input.flatMap(separate_cols)

result = (pairs
    .groupByKey()
    .map(lambda kv: "{0},{1}".format(kv[0], ",".join(kv[1]))))

result.saveAsTextFile("pythonout")

Resultados

En local[6]modo (Intel (R) Xeon (R) CPU E3-1245 V2 @ 3.40GHz) con 4GB de memoria por ejecutor que se necesita (n = 3):

  • Scala - media: 250.00s, stdev: 12.49
  • Python - media: 246.66s, stdev: 1.15

Estoy bastante seguro de que la mayor parte de ese tiempo se dedica a barajar, serializar, deserializar y otras tareas secundarias. Solo por diversión, aquí hay un ingenuo código de subproceso único en Python que realiza la misma tarea en esta máquina en menos de un minuto:

def go():
    with open("train.csv") as fr:
        lines = [
            line.replace('true', '1').replace('false', '0').split(",")
            for line in fr]
    return zip(*lines[1:])
cero323
fuente
22
Una de las respuestas más claras, completas y útiles que he encontrado por un tiempo. ¡Gracias!
etov
¡Qué tipo tan soberbio eres!
DennisLi
-4

Extensión a las respuestas anteriores -

Scala es más rápido en muchos aspectos en comparación con Python, pero hay algunas razones válidas por las que Python se está volviendo más popular que Scala, veamos algunos de ellos:

Python para Apache Spark es bastante fácil de aprender y usar. Sin embargo, esta no es la única razón por la cual Pyspark es una mejor opción que Scala. Hay más.

La API de Python para Spark puede ser más lenta en el clúster, pero al final, los científicos de datos pueden hacer mucho más en comparación con Scala. La complejidad de Scala está ausente. La interfaz es simple y completa.

Hablar sobre la legibilidad del código, el mantenimiento y la familiaridad con la API de Python para Apache Spark es mucho mejor que Scala.

Python viene con varias bibliotecas relacionadas con el aprendizaje automático y el procesamiento del lenguaje natural. Esto ayuda en el análisis de datos y también tiene estadísticas muy maduras y probadas en el tiempo. Por ejemplo, numpy, pandas, scikit-learn, seaborn y matplotlib.

Nota: La mayoría de los científicos de datos utilizan un enfoque híbrido en el que utilizan lo mejor de ambas API.

Por último, la comunidad Scala a menudo resulta ser mucho menos útil para los programadores. Esto hace de Python un aprendizaje muy valioso. Si tiene suficiente experiencia con cualquier lenguaje de programación estáticamente tipado como Java, puede dejar de preocuparse por no usar Scala por completo.


fuente