Apache Spark: impacto de volver a particionar, ordenar y almacenar en caché en una unión

10

Estoy explorando el comportamiento de Spark al unir una tabla consigo misma. Estoy usando Databricks.

Mi escenario ficticio es:

  1. Leer una tabla externa como marco de datos A (los archivos subyacentes están en formato delta)

  2. Defina el marco de datos B como el marco de datos A con solo ciertas columnas seleccionadas

  3. Unir los marcos de datos A y B en la columna 1 y la columna 2

(Sí, no tiene mucho sentido, solo estoy experimentando para entender la mecánica subyacente de Spark)

a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))

b = a.select("column1", "column2", "columnA")

c= a.join(b, how="left", on = ["column1", "column2"])

Mi primer intento fue ejecutar el código tal como está (intento 1). Luego intenté repartir y almacenar en caché (intento 2)

a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))
.repartition(col("column1"), col("column2")).cache()

Finalmente, particioné, clasifiqué y almacené en caché

 a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))
.repartition(col("column1"), col("column2")).sortWithinPartitions(col("column1"), col("column2")).cache()

Los dags respectivos generados están como adjuntos.

Mis preguntas son:

  1. ¿Por qué en el intento 1 la tabla parece estar en caché aunque el almacenamiento en caché no se haya especificado explícitamente?

  2. Por qué InMemoreTableScan siempre es seguido por otro nodo de este tipo.

  3. ¿Por qué en el intento 3 el almacenamiento en caché parece tener lugar en dos etapas?

  4. Por qué en el intento 3 WholeStageCodegen sigue a uno (y solo uno) InMemoreTableScan.

intento 1

intento 2

ingrese la descripción de la imagen aquí

Dawid
fuente
Sospecho que el lector DataFrame almacena en caché los datos automáticamente cuando la fuente es una tabla externa. Tengo una situación similar en la que estoy leyendo datos de una tabla de base de datos, mientras que puedo descargar la pestaña "SQL" en 'IU de detalles de la aplicación' me muestra el número de filas que se están descargando pero aún no se ha guardado ningún archivo en la ubicación especificada . Supongo que conoce el recuento porque tiene datos en caché en algún lugar y eso es lo que aparece en el DAG. Si lee datos de un archivo de texto localmente, no verá el estado de la memoria caché.
Salim

Respuestas:

4

Lo que observa en estos 3 planes es una mezcla de tiempo de ejecución de DataBricks y Spark.

En primer lugar, mientras se ejecuta DataBricks runtime 3.3+, el almacenamiento en caché se habilita automáticamente para todos los archivos de parquet. Configuración correspondiente para eso: spark.databricks.io.cache.enabled true

Para su segunda consulta, InMemoryTableScan ocurre dos veces porque justo cuando se llamó a join, spark intentó calcular el conjunto de datos A y el conjunto de datos B en paralelo. Suponiendo que a los diferentes ejecutores se les asignaron las tareas anteriores, ambos deberán escanear la tabla desde el caché (DataBricks).

Para el tercero, InMemoryTableScan no se refiere al almacenamiento en caché en sí mismo. Simplemente significa que cualquier plan de catalizador formado implicaba escanear la tabla en caché varias veces.

PD: no puedo visualizar el punto 4 :)

Ashvjit Singh
fuente