Prefiero Python sobre Scala. Pero, como Spark está escrito de forma nativa en Scala, esperaba que mi código se ejecute más rápido en Scala que en la versión de Python por razones obvias.
Con esa suposición, pensé en aprender y escribir la versión Scala de un código de preprocesamiento muy común para aproximadamente 1 GB de datos. Los datos se obtienen de la competencia SpringLeaf en Kaggle . Solo para dar una visión general de los datos (contiene 1936 dimensiones y 145232 filas). Los datos se componen de varios tipos, por ejemplo, int, float, string, boolean. Estoy usando 6 núcleos de 8 para el procesamiento de Spark; Es por eso que utilicé minPartitions=6
para que cada núcleo tenga algo que procesar.
Código Scala
val input = sc.textFile("train.csv", minPartitions=6)
val input2 = input.mapPartitionsWithIndex { (idx, iter) =>
if (idx == 0) iter.drop(1) else iter }
val delim1 = "\001"
def separateCols(line: String): Array[String] = {
val line2 = line.replaceAll("true", "1")
val line3 = line2.replaceAll("false", "0")
val vals: Array[String] = line3.split(",")
for((x,i) <- vals.view.zipWithIndex) {
vals(i) = "VAR_%04d".format(i) + delim1 + x
}
vals
}
val input3 = input2.flatMap(separateCols)
def toKeyVal(line: String): (String, String) = {
val vals = line.split(delim1)
(vals(0), vals(1))
}
val input4 = input3.map(toKeyVal)
def valsConcat(val1: String, val2: String): String = {
val1 + "," + val2
}
val input5 = input4.reduceByKey(valsConcat)
input5.saveAsTextFile("output")
Código de Python
input = sc.textFile('train.csv', minPartitions=6)
DELIM_1 = '\001'
def drop_first_line(index, itr):
if index == 0:
return iter(list(itr)[1:])
else:
return itr
input2 = input.mapPartitionsWithIndex(drop_first_line)
def separate_cols(line):
line = line.replace('true', '1').replace('false', '0')
vals = line.split(',')
vals2 = ['VAR_%04d%s%s' %(e, DELIM_1, val.strip('\"'))
for e, val in enumerate(vals)]
return vals2
input3 = input2.flatMap(separate_cols)
def to_key_val(kv):
key, val = kv.split(DELIM_1)
return (key, val)
input4 = input3.map(to_key_val)
def vals_concat(v1, v2):
return v1 + ',' + v2
input5 = input4.reduceByKey(vals_concat)
input5.saveAsTextFile('output')
Scala Performance Etapa 0 (38 minutos), Etapa 1 (18 segundos)
Python Performance Etapa 0 (11 minutos), Etapa 1 (7 segundos)
Ambos producen diferentes gráficos de visualización DAG (debido a que ambas imágenes muestran diferentes funciones de etapa 0 para Scala ( map
) y Python ( reduceByKey
))
Pero, esencialmente, ambos códigos intentan transformar los datos en (dimension_id, cadena de la lista de valores) RDD y guardarlos en el disco. La salida se usará para calcular varias estadísticas para cada dimensión.
En cuanto al rendimiento, el código Scala para estos datos reales como este parece ejecutarse 4 veces más lento que la versión de Python. La buena noticia para mí es que me dio una buena motivación para seguir con Python. La mala noticia es que no entendí bien por qué.
fuente
Respuestas:
La respuesta original sobre el código se puede encontrar a continuación.
En primer lugar, debe distinguir entre diferentes tipos de API, cada uno con sus propias consideraciones de rendimiento.
API RDD
(estructuras puras de Python con orquestación basada en JVM)
Este es el componente que se verá más afectado por el rendimiento del código Python y los detalles de la implementación de PySpark. Si bien es poco probable que el rendimiento de Python sea un problema, hay al menos algunos factores que debe tener en cuenta:
Ejecutores basados en procesos (Python) versus ejecutores basados en subprocesos (JVM múltiples subprocesos múltiples) (Scala). Cada ejecutor de Python se ejecuta en su propio proceso. Como efecto secundario, proporciona un aislamiento más fuerte que su contraparte JVM y un cierto control sobre el ciclo de vida del ejecutor, pero un uso de memoria potencialmente significativamente mayor:
Rendimiento del código Python en sí. En general, Scala es más rápido que Python, pero variará de una tarea a otra. Además tiene varias opciones, incluyendo los ECI como Numba , extensiones C ( Cython ) o bibliotecas especializadas como Teano . Finalmente,
si no usa ML / MLlib (o simplemente la pila NumPy), considere usar PyPy como un intérprete alternativo. Ver SPARK-3094 .spark.python.worker.reuse
opción que puede usarse para elegir entre bifurcar el proceso de Python para cada tarea y reutilizar el proceso existente. La última opción parece ser útil para evitar la recolección de basura costosa (es más una impresión que un resultado de pruebas sistemáticas), mientras que la primera (predeterminada) es óptima en caso de emisiones e importaciones costosas.MLlib
(ejecución mixta de Python y JVM)
Las consideraciones básicas son más o menos las mismas que antes con algunos problemas adicionales. Si bien las estructuras básicas utilizadas con MLlib son objetos RDD Python simples, todos los algoritmos se ejecutan directamente usando Scala.
Significa un costo adicional de convertir objetos Python en objetos Scala y viceversa, un mayor uso de memoria y algunas limitaciones adicionales que cubriremos más adelante.
A partir de ahora (Spark 2.x), la API basada en RDD está en modo de mantenimiento y está programada para eliminarse en Spark 3.0 .
DataFrame API y Spark ML
(Ejecución JVM con código Python limitado al controlador)
Estas son probablemente la mejor opción para las tareas estándar de procesamiento de datos. Dado que el código de Python se limita principalmente a operaciones lógicas de alto nivel en el controlador, no debería haber diferencia de rendimiento entre Python y Scala.
Una única excepción es el uso de UDF de Python en fila que son significativamente menos eficientes que sus equivalentes Scala. Si bien hay alguna posibilidad de mejoras (ha habido un desarrollo sustancial en Spark 2.0.0), la mayor limitación es el viaje de ida y vuelta completo entre la representación interna (JVM) y el intérprete de Python. Si es posible, debe favorecer una composición de expresiones incorporadas (por ejemplo, el comportamiento UDF de Python se ha mejorado en Spark 2.0.0, pero aún es subóptimo en comparación con la ejecución nativa.
Esto
puede mejorar en el futuroha mejorado significativamente con la introducción de las UDF vectorizadas (SPARK-21190 y otras extensiones) , que utiliza Arrow Streaming para el intercambio de datos eficiente con deserialización de copia cero. Para la mayoría de las aplicaciones, sus gastos generales secundarios pueden ignorarse.También asegúrese de evitar pasar datos innecesarios entre
DataFrames
yRDDs
. Esto requiere serialización y deserialización costosas, sin mencionar la transferencia de datos hacia y desde el intérprete de Python.Vale la pena señalar que las llamadas Py4J tienen una latencia bastante alta. Esto incluye llamadas simples como:
Por lo general, no debería importar (la sobrecarga es constante y no depende de la cantidad de datos), pero en el caso de aplicaciones de software en tiempo real, puede considerar el almacenamiento en caché / reutilización de contenedores Java.
GraphX y Spark DataSets
Por ahora (Spark
GraphX1.62.1) ninguno proporciona API de PySpark, por lo que puede decir que PySpark es infinitamente peor que Scala.En la práctica, el desarrollo de GraphX se detuvo casi por completo y el proyecto se encuentra actualmente en el modo de mantenimiento con los tickets JIRA relacionados cerrados, ya que no se solucionarán . La biblioteca GraphFrames proporciona una biblioteca alternativa de procesamiento de gráficos con enlaces de Python.
Conjunto de datosSubjetivamente hablando, no hay mucho lugar para escribir estáticamente
Datasets
en Python e incluso si existiera la implementación actual de Scala es demasiado simplista y no proporciona los mismos beneficios de rendimiento queDataFrame
.Transmisión
Por lo que he visto hasta ahora, recomendaría usar Scala sobre Python. Puede cambiar en el futuro si PySpark obtiene soporte para flujos estructurados, pero en este momento Scala API parece ser mucho más robusto, completo y eficiente. Mi experiencia es bastante limitada.
La transmisión estructurada en Spark 2.x parece reducir la brecha entre los idiomas, pero por ahora todavía está en sus primeros días. Sin embargo, la API basada en RDD ya se menciona como "transmisión heredada" en la documentación de Databricks (fecha de acceso 2017-03-03)) por lo que es razonable esperar más esfuerzos de unificación.
Consideraciones de no rendimiento
Paridad de característicasNo todas las características de Spark están expuestas a través de la API PySpark. Asegúrese de verificar si las piezas que necesita ya están implementadas e intente comprender las posibles limitaciones.
Es particularmente importante cuando utiliza MLlib y contextos mixtos similares (consulte Llamar a la función Java / Scala desde una tarea ). Para ser justos, algunas partes de la API PySpark, como
Diseño APImllib.linalg
, proporcionan un conjunto de métodos más completo que Scala.La API PySpark refleja de cerca su contraparte Scala y, como tal, no es exactamente Pythonic. Significa que es bastante fácil de mapear entre idiomas, pero al mismo tiempo, el código de Python puede ser significativamente más difícil de entender.
Arquitectura complejaEl flujo de datos de PySpark es relativamente complejo en comparación con la ejecución pura de JVM. Es mucho más difícil razonar sobre programas PySpark o depuración. Además, al menos una comprensión básica de Scala y JVM en general es prácticamente imprescindible.
Spark 2.xy más alláEl cambio continuo hacia la
Dataset
API, con la API RDD congelada, brinda oportunidades y desafíos para los usuarios de Python. Si bien las partes de alto nivel de la API son mucho más fáciles de exponer en Python, las características más avanzadas son prácticamente imposibles de usar directamente .Además, las funciones nativas de Python continúan siendo ciudadanos de segunda clase en el mundo SQL. Con suerte, esto mejorará en el futuro con la serialización de Apache Arrow ( los esfuerzos actuales apuntan a los datos,
collection
pero serde UDF es un objetivo a largo plazo ).Para proyectos que dependen en gran medida de la base de código de Python, las alternativas puras de Python (como Dask o Ray ) podrían ser una alternativa interesante.
No tiene que ser uno contra el otro
La API Spark DataFrame (SQL, Dataset) proporciona una forma elegante de integrar el código Scala / Java en la aplicación PySpark. Puede usar
DataFrames
para exponer datos a un código JVM nativo y volver a leer los resultados. He explicado algunas opciones en otro lugar y puedes encontrar un ejemplo de trabajo de ida y vuelta en Python-Scala en Cómo usar una clase Scala dentro de Pyspark .Se puede aumentar aún más mediante la introducción de tipos definidos por el usuario (consulte ¿Cómo definir el esquema para el tipo personalizado en Spark SQL? ).
¿Qué tiene de malo el código provisto en la pregunta?
(Descargo de responsabilidad: punto de vista de Pythonista. Lo más probable es que me haya perdido algunos trucos de Scala)
En primer lugar, hay una parte en su código que no tiene ningún sentido. Si ya tiene
(key, value)
pares creados usandozipWithIndex
oenumerate
¿cuál es el punto de crear una cadena solo para dividirla inmediatamente después?flatMap
no funciona de forma recursiva, por lo que simplemente puede producir tuplas y omitir el seguimiento enmap
absoluto.Otra parte que encuentro problemática es
reduceByKey
. En términos generales,reduceByKey
es útil si la aplicación de la función de agregado puede reducir la cantidad de datos que se deben barajar. Dado que simplemente concatena cadenas, no hay nada que ganar aquí. Ignorando cosas de bajo nivel, como el número de referencias, la cantidad de datos que tiene que transferir es exactamente la misma que paragroupByKey
.Normalmente no me detendría en eso, pero por lo que puedo decir es un cuello de botella en su código Scala. Unir cadenas en JVM es una operación bastante costosa (ver, por ejemplo: ¿La concatenación de cadenas en scala es tan costosa como en Java? ). Significa que algo como esto,
_.reduceByKey((v1: String, v2: String) => v1 + ',' + v2)
que es equivalente ainput4.reduceByKey(valsConcat)
en su código, no es una buena idea.Si se quiere evitar
groupByKey
que usted puede tratar de utilizaraggregateByKey
conStringBuilder
. Algo similar a esto debería hacer el truco:Pero dudo que valga la pena.
Teniendo en cuenta lo anterior, he reescrito su código de la siguiente manera:
Scala :
Python :
Resultados
En
local[6]
modo (Intel (R) Xeon (R) CPU E3-1245 V2 @ 3.40GHz) con 4GB de memoria por ejecutor que se necesita (n = 3):Estoy bastante seguro de que la mayor parte de ese tiempo se dedica a barajar, serializar, deserializar y otras tareas secundarias. Solo por diversión, aquí hay un ingenuo código de subproceso único en Python que realiza la misma tarea en esta máquina en menos de un minuto:
fuente
Extensión a las respuestas anteriores -
Scala es más rápido en muchos aspectos en comparación con Python, pero hay algunas razones válidas por las que Python se está volviendo más popular que Scala, veamos algunos de ellos:
Python para Apache Spark es bastante fácil de aprender y usar. Sin embargo, esta no es la única razón por la cual Pyspark es una mejor opción que Scala. Hay más.
La API de Python para Spark puede ser más lenta en el clúster, pero al final, los científicos de datos pueden hacer mucho más en comparación con Scala. La complejidad de Scala está ausente. La interfaz es simple y completa.
Hablar sobre la legibilidad del código, el mantenimiento y la familiaridad con la API de Python para Apache Spark es mucho mejor que Scala.
Python viene con varias bibliotecas relacionadas con el aprendizaje automático y el procesamiento del lenguaje natural. Esto ayuda en el análisis de datos y también tiene estadísticas muy maduras y probadas en el tiempo. Por ejemplo, numpy, pandas, scikit-learn, seaborn y matplotlib.
Nota: La mayoría de los científicos de datos utilizan un enfoque híbrido en el que utilizan lo mejor de ambas API.
Por último, la comunidad Scala a menudo resulta ser mucho menos útil para los programadores. Esto hace de Python un aprendizaje muy valioso. Si tiene suficiente experiencia con cualquier lenguaje de programación estáticamente tipado como Java, puede dejar de preocuparse por no usar Scala por completo.
fuente